Module refinery.units.sinks.dump

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import io
import os
import os.path

from itertools import cycle
from string import Formatter

from refinery.units import Arg, Unit, RefineryCriticalException
from refinery.lib.meta import metavars


class dump(Unit):
    """
    Dump incoming data to files on disk. It is possible to specify filenames with format fields.
    Any metadata field on an incoming chunk is available. Additionally, any field that can be
    populated by the `refinery.cm` unit is also available. These include the following:

        {ext}    : Automatically guessed file extension
        {crc32}  : CRC32 checksum of the data
        {index}  : Index of the data in the input stream, starting at 0
        {size}   : Size of the data in bytes
        {md5}    : MD5 hash of the data
        {sha1}   : SHA1 hash of the data
        {sha256} : SHA-256 hash of the data
        {path}   : Associated path; defaults to {sha256} if none is given.

    When not using formatted file names, the unit ingests as many incoming inputs as filenames were
    specified on the command line. Unless connected to a terminal, the remaining inputs will be
    forwarded on STDOUT. The `-t` or `--tee` switch can be used to forward all inputs, under all
    circumstances, regardless of whether or not they have been processed.

    If no file is specified, all ingested inputs are concatenated and written to the clipboard. This
    will only succeed when the data can successfully be encoded.
    """

    def __init__(
        self, *files: Arg(metavar='file', type=str, help='Optionally formatted filename.'),
        tee    : Arg.Switch('-t', help='Forward all inputs to STDOUT.') = False,
        stream : Arg.Switch('-s', help='Dump all incoming data to the same file.') = False,
        plain  : Arg.Switch('-p', help='Never apply any formatting to file names.') = False,
        force  : Arg.Switch('-f', help='Remove files if necessary to create dump path.') = False,
    ):
        if stream and len(files) != 1:
            raise ValueError('Can only use exactly one file in stream mode.')
        super().__init__(files=files, tee=tee, stream=stream, force=force)
        # The output stream that is currently open; None while nothing is open.
        self.stream = None
        # Formatting is active only when it was not disabled via the plain switch and at
        # least one of the given file names actually contains a format field.
        self._formatted = not plain and any(self._has_format(f) for f in files)
        self._reset()

    @staticmethod
    def _has_format(filename):
        """
        Return whether the given file name is a string that contains at least one format field
        whose name has an alphanumeric character in it. Anything that is not a string is never
        considered to be formatted.
        """
        if not isinstance(filename, str):
            return False
        formatter = Formatter()
        # Formatter.parse yields tuples whose second entry is the field name; literal-only
        # segments and empty field names are skipped by the trailing condition.
        return any(
            any(t.isalnum() for t in fields)
            for _, fields, *__ in formatter.parse(filename) if fields
        )

    def _reset(self):
        """
        Rewind the unit: clear the exhausted flag, restart the iteration over the given file
        names and close any stream that is still open.
        """
        self.exhausted = False
        # Formatted names are reused for every chunk, plain names are consumed one by one.
        self.paths = cycle(self.args.files) if self._formatted else iter(self.args.files)
        self._close()

    @property
    def _clipcopy(self):
        """
        True when no file names were given, in which case the data goes to the clipboard.
        """
        return not self.args.files

    def _components(self, path):
        """
        Split a path into the list of its components, ordered from the root to the leaf.
        """
        def _reversed_components(path):
            # Repeatedly strip the last component until os.path.split has nothing left to
            # strip; what remains of the path is emitted as the final (root-most) part.
            while True:
                path, component = os.path.split(path)
                if not component:
                    break
                yield component
            yield path
        components = list(_reversed_components(path))
        components.reverse()
        return components

    def _open(self, path, unc=False):
        """
        Open the given path for writing and return the stream. Objects that have a `close`
        attribute are assumed to be streams already and are returned unchanged. All missing
        parent directories are created. If a regular file blocks the creation of a parent
        directory, it is removed when the force switch is set; otherwise, a
        `RefineryCriticalException` is raised. On Windows, a `FileNotFoundError` triggers a
        single retry with the path prefixed by `\\\\?\\`; the `unc` flag marks that retry.
        Any other `OSError` during directory creation is logged and a stream to the null
        device is returned instead.
        """
        if hasattr(path, 'close'):
            return path
        path = os.path.abspath(path)
        base = os.path.dirname(path)
        if not unc:
            self.log_info('opening:', path)
        try:
            os.makedirs(base, exist_ok=True)
        except FileExistsError:
            self.log_info('existed:', path)
            # Walk the path from the root to find the first component that is a file.
            part, components = '', self._components(path)
            while components:
                component, *components = components
                part = os.path.join(part, component)
                if os.path.exists(part) and os.path.isfile(part):
                    if self.args.force:
                        os.unlink(part)
                        return self._open(path, unc)
                    break
            raise RefineryCriticalException(F'Unable to dump to {path} because {part} is a file.')
        except FileNotFoundError:
            if unc or os.name != 'nt':
                raise
            path = F'\\\\?\\{path}'
            return self._open(path, unc=True)
        except OSError as e:
            # NOTE(review): log_info() without arguments presumably reports whether messages
            # at info level are emitted, so that the path is logged exactly once.
            if not self.log_info():
                self.log_warn('opening:', path)
            # Not every OSError carries an (errno, strerror) argument pair, so indexing
            # e.args[1] could raise an IndexError here; fall back to the string form.
            self.log_warn('errored:', e.strerror or str(e))
            return open(os.devnull, 'wb')
        else:
            mode = 'ab' if self.args.stream else 'wb'
            return open(path, mode)

    def _close(self, final=False):
        """
        Flush and close the current stream, if there is one. In stream mode, the stream is
        only flushed and remains open unless `final` is set. In clipboard mode, the buffered
        data is copied to the clipboard before the buffer is closed.
        """
        if not self.stream:
            return
        self.stream.flush()
        if self.args.stream and not final:
            return
        if self._clipcopy:
            if os.name == 'nt':
                from refinery.lib.winclip import ClipBoard, CF
                try:
                    img = self._image.open(self.stream)
                    with io.BytesIO() as out:
                        img.save(out, 'BMP')
                        # The buffer is closed when this block exits, so the bitmap has to
                        # be read here. The first 14 bytes are the BMP file header, which
                        # is skipped so that the remainder can be copied as CF.DIB.
                        out.seek(14, io.SEEK_SET)
                        dib = out.read()
                except Exception:
                    # Best effort: whatever cannot be converted to a bitmap is copied as text.
                    with ClipBoard(CF.TEXT) as cpb:
                        cpb.copy(self.stream.getvalue())
                else:
                    with ClipBoard(CF.DIB) as cpb:
                        cpb.copy(dib)
            else:
                data = self.stream.getvalue()
                data = data.decode(self.codec, errors='backslashreplace')
                self._pyperclip.copy(data)
        self.stream.close()
        self.stream = None

    # NOTE(review): Unit.Requires presumably turns the following two functions into lazily
    # imported optional dependencies; confirm against refinery.units.
    @Unit.Requires('pyperclip')
    def _pyperclip():
        import pyperclip
        return pyperclip

    @Unit.Requires('Pillow', 'formats')
    def _image():
        from PIL import Image
        return Image

    def process(self, data: bytes):
        """
        Write one chunk to the current output and yield it again when it is to be forwarded.
        Chunks are forwarded when the tee switch is set, or when the file names have been
        exhausted and `self.isatty` is false.
        """
        forward_input_data = self.args.tee
        if self._clipcopy:
            self.stream.write(data)
        elif not self.exhausted:
            if not self.stream:
                # This should happen only when the unit is called from Python code
                # rather than via the command line.
                try:
                    path = next(self.paths)
                except StopIteration:
                    raise RefineryCriticalException('the list of filenames was exhausted.')
                else:
                    with self._open(path) as stream:
                        stream.write(data)
            else:
                self.stream.write(data)
                self.log_debug(F'wrote 0x{len(data):08X} bytes')
                self._close()
        else:
            forward_input_data = forward_input_data or not self.isatty
            if not forward_input_data:
                size = metavars(data).size
                self.log_warn(F'discarding unprocessed chunk of size {size!s}.')
        if forward_input_data:
            yield data

    def filter(self, chunks):
        """
        Prepare the output stream for each visible chunk before passing the chunk on. In
        clipboard mode, a single in-memory buffer collects all data. Otherwise, the next
        file name is taken for each chunk (or only once in stream mode), formatted with the
        metadata of the chunk if formatting is enabled, and opened. When the file names run
        out, the unit is marked as exhausted. Chunks that are not visible are skipped.
        """
        if self.exhausted:
            self._reset()

        nostream = not self.args.stream
        clipcopy = self._clipcopy

        if clipcopy:
            self.stream = io.BytesIO()

        for index, chunk in enumerate(chunks, 0):
            if not chunk.visible:
                continue
            if not clipcopy and not self.exhausted and (nostream or not self.stream):
                try:
                    path = next(self.paths)
                except StopIteration:
                    self.exhausted = True
                else:
                    # The check against self._formatted ensures that the plain switch is
                    # honored: with it, file names containing braces are used verbatim.
                    if self._formatted and self._has_format(path):
                        meta = metavars(chunk)
                        meta.ghost = True
                        meta.update_index(index)
                        path = meta.format_str(path, self.codec, [chunk])
                    self.stream = self._open(path)
            yield chunk

        self._close(final=True)
        self.exhausted = True

Classes

class dump (*files, tee=False, stream=False, plain=False, force=False)

Dump incoming data to files on disk. It is possible to specify filenames with format fields. Any metadata field on an incoming chunk is available. Additionally, any field that can be populated by the cm unit is also available. These include the following:

{ext}    : Automatically guessed file extension
{crc32}  : CRC32 checksum of the data
{index}  : Index of the data in the input stream, starting at 0
{size}   : Size of the data in bytes
{md5}    : MD5 hash of the data
{sha1}   : SHA1 hash of the data
{sha256} : SHA-256 hash of the data
{path}   : Associated path; defaults to {sha256} if none is given.

When not using formatted file names, the unit ingests as many incoming inputs as filenames were specified on the command line. Unless connected to a terminal, the remaining inputs will be forwarded on STDOUT. The -t or --tee switch can be used to forward all inputs, under all circumstances, regardless of whether or not they have been processed.

If no file is specified, all ingested inputs are concatenated and written to the clipboard. This will only succeed when the data can successfully be encoded.

Expand source code Browse git
class dump(Unit):
    """
    Dump incoming data to files on disk. It is possible to specify filenames with format fields.
    Any metadata field on an incoming chunk is available. Additionally, any field that can be
    populated by the `refinery.cm` unit is also available. These include the following:

        {ext}    : Automatically guessed file extension
        {crc32}  : CRC32 checksum of the data
        {index}  : Index of the data in the input stream, starting at 0
        {size}   : Size of the data in bytes
        {md5}    : MD5 hash of the data
        {sha1}   : SHA1 hash of the data
        {sha256} : SHA-256 hash of the data
        {path}   : Associated path; defaults to {sha256} if none is given.

    When not using formatted file names, the unit ingests as many incoming inputs as filenames were
    specified on the command line. Unless connected to a terminal, the remaining inputs will be
    forwarded on STDOUT. The `-t` or `--tee` switch can be used to forward all inputs, under all
    circumstances, regardless of whether or not they have been processed.

    If no file is specified, all ingested inputs are concatenated and written to the clipboard. This
    will only succeed when the data can successfully be encoded.
    """

    def __init__(
        self, *files: Arg(metavar='file', type=str, help='Optionally formatted filename.'),
        tee    : Arg.Switch('-t', help='Forward all inputs to STDOUT.') = False,
        stream : Arg.Switch('-s', help='Dump all incoming data to the same file.') = False,
        plain  : Arg.Switch('-p', help='Never apply any formatting to file names.') = False,
        force  : Arg.Switch('-f', help='Remove files if necessary to create dump path.') = False,
    ):
        if stream and len(files) != 1:
            raise ValueError('Can only use exactly one file in stream mode.')
        super().__init__(files=files, tee=tee, stream=stream, force=force)
        # The output stream that is currently open; None while nothing is open.
        self.stream = None
        # Formatting is active only when it was not disabled via the plain switch and at
        # least one of the given file names actually contains a format field.
        self._formatted = not plain and any(self._has_format(f) for f in files)
        self._reset()

    @staticmethod
    def _has_format(filename):
        """
        Return whether the given file name is a string that contains at least one format field
        whose name has an alphanumeric character in it. Anything that is not a string is never
        considered to be formatted.
        """
        if not isinstance(filename, str):
            return False
        formatter = Formatter()
        # Formatter.parse yields tuples whose second entry is the field name; literal-only
        # segments and empty field names are skipped by the trailing condition.
        return any(
            any(t.isalnum() for t in fields)
            for _, fields, *__ in formatter.parse(filename) if fields
        )

    def _reset(self):
        """
        Rewind the unit: clear the exhausted flag, restart the iteration over the given file
        names and close any stream that is still open.
        """
        self.exhausted = False
        # Formatted names are reused for every chunk, plain names are consumed one by one.
        self.paths = cycle(self.args.files) if self._formatted else iter(self.args.files)
        self._close()

    @property
    def _clipcopy(self):
        """
        True when no file names were given, in which case the data goes to the clipboard.
        """
        return not self.args.files

    def _components(self, path):
        """
        Split a path into the list of its components, ordered from the root to the leaf.
        """
        def _reversed_components(path):
            # Repeatedly strip the last component until os.path.split has nothing left to
            # strip; what remains of the path is emitted as the final (root-most) part.
            while True:
                path, component = os.path.split(path)
                if not component:
                    break
                yield component
            yield path
        components = list(_reversed_components(path))
        components.reverse()
        return components

    def _open(self, path, unc=False):
        """
        Open the given path for writing and return the stream. Objects that have a `close`
        attribute are assumed to be streams already and are returned unchanged. All missing
        parent directories are created. If a regular file blocks the creation of a parent
        directory, it is removed when the force switch is set; otherwise, a
        `RefineryCriticalException` is raised. On Windows, a `FileNotFoundError` triggers a
        single retry with the path prefixed by `\\\\?\\`; the `unc` flag marks that retry.
        Any other `OSError` during directory creation is logged and a stream to the null
        device is returned instead.
        """
        if hasattr(path, 'close'):
            return path
        path = os.path.abspath(path)
        base = os.path.dirname(path)
        if not unc:
            self.log_info('opening:', path)
        try:
            os.makedirs(base, exist_ok=True)
        except FileExistsError:
            self.log_info('existed:', path)
            # Walk the path from the root to find the first component that is a file.
            part, components = '', self._components(path)
            while components:
                component, *components = components
                part = os.path.join(part, component)
                if os.path.exists(part) and os.path.isfile(part):
                    if self.args.force:
                        os.unlink(part)
                        return self._open(path, unc)
                    break
            raise RefineryCriticalException(F'Unable to dump to {path} because {part} is a file.')
        except FileNotFoundError:
            if unc or os.name != 'nt':
                raise
            path = F'\\\\?\\{path}'
            return self._open(path, unc=True)
        except OSError as e:
            # NOTE(review): log_info() without arguments presumably reports whether messages
            # at info level are emitted, so that the path is logged exactly once.
            if not self.log_info():
                self.log_warn('opening:', path)
            # Not every OSError carries an (errno, strerror) argument pair, so indexing
            # e.args[1] could raise an IndexError here; fall back to the string form.
            self.log_warn('errored:', e.strerror or str(e))
            return open(os.devnull, 'wb')
        else:
            mode = 'ab' if self.args.stream else 'wb'
            return open(path, mode)

    def _close(self, final=False):
        """
        Flush and close the current stream, if there is one. In stream mode, the stream is
        only flushed and remains open unless `final` is set. In clipboard mode, the buffered
        data is copied to the clipboard before the buffer is closed.
        """
        if not self.stream:
            return
        self.stream.flush()
        if self.args.stream and not final:
            return
        if self._clipcopy:
            if os.name == 'nt':
                from refinery.lib.winclip import ClipBoard, CF
                try:
                    img = self._image.open(self.stream)
                    with io.BytesIO() as out:
                        img.save(out, 'BMP')
                        # The buffer is closed when this block exits, so the bitmap has to
                        # be read here. The first 14 bytes are the BMP file header, which
                        # is skipped so that the remainder can be copied as CF.DIB.
                        out.seek(14, io.SEEK_SET)
                        dib = out.read()
                except Exception:
                    # Best effort: whatever cannot be converted to a bitmap is copied as text.
                    with ClipBoard(CF.TEXT) as cpb:
                        cpb.copy(self.stream.getvalue())
                else:
                    with ClipBoard(CF.DIB) as cpb:
                        cpb.copy(dib)
            else:
                data = self.stream.getvalue()
                data = data.decode(self.codec, errors='backslashreplace')
                self._pyperclip.copy(data)
        self.stream.close()
        self.stream = None

    # NOTE(review): Unit.Requires presumably turns the following two functions into lazily
    # imported optional dependencies; confirm against refinery.units.
    @Unit.Requires('pyperclip')
    def _pyperclip():
        import pyperclip
        return pyperclip

    @Unit.Requires('Pillow', 'formats')
    def _image():
        from PIL import Image
        return Image

    def process(self, data: bytes):
        """
        Write one chunk to the current output and yield it again when it is to be forwarded.
        Chunks are forwarded when the tee switch is set, or when the file names have been
        exhausted and `self.isatty` is false.
        """
        forward_input_data = self.args.tee
        if self._clipcopy:
            self.stream.write(data)
        elif not self.exhausted:
            if not self.stream:
                # This should happen only when the unit is called from Python code
                # rather than via the command line.
                try:
                    path = next(self.paths)
                except StopIteration:
                    raise RefineryCriticalException('the list of filenames was exhausted.')
                else:
                    with self._open(path) as stream:
                        stream.write(data)
            else:
                self.stream.write(data)
                self.log_debug(F'wrote 0x{len(data):08X} bytes')
                self._close()
        else:
            forward_input_data = forward_input_data or not self.isatty
            if not forward_input_data:
                size = metavars(data).size
                self.log_warn(F'discarding unprocessed chunk of size {size!s}.')
        if forward_input_data:
            yield data

    def filter(self, chunks):
        """
        Prepare the output stream for each visible chunk before passing the chunk on. In
        clipboard mode, a single in-memory buffer collects all data. Otherwise, the next
        file name is taken for each chunk (or only once in stream mode), formatted with the
        metadata of the chunk if formatting is enabled, and opened. When the file names run
        out, the unit is marked as exhausted. Chunks that are not visible are skipped.
        """
        if self.exhausted:
            self._reset()

        nostream = not self.args.stream
        clipcopy = self._clipcopy

        if clipcopy:
            self.stream = io.BytesIO()

        for index, chunk in enumerate(chunks, 0):
            if not chunk.visible:
                continue
            if not clipcopy and not self.exhausted and (nostream or not self.stream):
                try:
                    path = next(self.paths)
                except StopIteration:
                    self.exhausted = True
                else:
                    # The check against self._formatted ensures that the plain switch is
                    # honored: with it, file names containing braces are used verbatim.
                    if self._formatted and self._has_format(path):
                        meta = metavars(chunk)
                        meta.ghost = True
                        meta.update_index(index)
                        path = meta.format_str(path, self.codec, [chunk])
                    self.stream = self._open(path)
            yield chunk

        self._close(final=True)
        self.exhausted = True

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members