Module refinery.units.meta.ef

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import os
import os.path
import sys

from pathlib import Path
from datetime import datetime
from typing import Iterable, Dict, Set, Optional

from refinery.lib.meta import metavars
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import exception_to_string
from refinery.units import Arg, Unit


class ef(Unit):
    """
    Short for "emit file". The unit reads files from disk and outputs them individually. Has the ability to
    read large files in chunks.
    """

    def __init__(self,
        *filenames: Arg(metavar='FILEMASK', nargs='+', type=str, help=(
            'A list of file masks. Each matching file will be read from disk and '
            'emitted. The file masks can include format string expressions which '
            'will be substituted from the current meta variables. The masks can '
            'use wild-card expressions, but this feature is disabled by default on '
            'Posix platforms, where it has to be enabled explicitly using the -w '
            'switch. On Windows, the feature is enabled by default and can be '
            'disabled using the -t switch.'
        )),
        list: Arg.Switch('-l', help='Only lists files with metadata.') = False,
        meta: Arg.Switch('-m', help=(
            'Adds the atime, mtime, ctime, and size metadata variables.'
        )) = False,
        size: Arg.Bounds('-s', range=True, help=(
            'If specified, only files are read whose size is in the given range.')) = None,
        read: Arg.Number('-r', help=(
            'If specified, files will be read in chunks of size N and each '
            'chunk is emitted as one element in the output list.'
        )) = 0,
        wild: Arg.Switch('-w', group='W', help='Force use of wildcard patterns in file masks.') = False,
        tame: Arg.Switch('-t', group='W', help='Disable wildcard patterns in file masks.') = False,
        symlinks: Arg.Switch('-y', help='Follow symbolic links and junctions, these are ignored by default.') = False,
        linewise: Arg.Switch('-i', help=(
            'Read the file linewise. By default, one line is read at a time. '
            'In line mode, the --read argument can be used to read the given '
            'number of lines in each chunk.'
        )) = False
    ):
        # The two switches select opposite behaviors; reject the contradictory combination early.
        if wild and tame:
            raise ValueError('Cannot be both wild and tame!')
        super().__init__(
            size=size,
            read=read,
            list=list,
            meta=meta,
            wild=wild,
            tame=tame,
            symlinks=symlinks,
            linewise=linewise,
            filenames=filenames
        )

    def _read_chunks(self, fd):
        """
        Generate successive buffers from the open file object `fd`. Each buffer is the result of
        one `fd.read` call with the configured `read` argument; the generator ends as soon as a
        read returns an empty result.
        """
        while True:
            buffer = fd.read(self.args.read)
            if not buffer:
                break
            yield buffer

    def _read_lines(self, fd):
        """
        Generate the contents of the open file object `fd` line by line. When the `read` argument
        is zero or one, every line is yielded on its own. Otherwise, up to `read` consecutive lines
        are concatenated and yielded as a single buffer; the final buffer may contain fewer lines.
        """
        count = self.args.read or 1
        if count == 1:
            while True:
                buffer = fd.readline()
                if not buffer:
                    break
                yield buffer
            return
        with MemoryFile() as out:
            while True:
                for _ in range(count):
                    buffer = fd.readline()
                    if not buffer:
                        break
                    out.write(buffer)
                # Nothing was written in this round: the input is exhausted.
                if not out.tell():
                    break
                yield out.getvalue()
                # Reuse the same buffer object for the next group of lines.
                out.seek(0)
                out.truncate()

    def _absolute_path(self, path_string: str):
        """
        Convert `path_string` to an absolute `Path`. On Windows, the result additionally receives
        the extended-length path prefix unless its first component already starts with it.
        """
        path = Path(path_string).absolute()
        if os.name == 'nt' and not path.parts[0].startswith('\\\\?\\'):
            # The pathlib glob method will simply fail mid-traversal if it attempts to descend into
            # a folder or to a file whose path exceeds MAX_PATH on Windows. As a workaround, we use
            # UNC paths throughout and truncate to relative paths after enumeration.
            path = Path(F'\\\\?\\{path!s}')
        return path

    def _glob(self, pattern: str) -> Iterable[Path]:
        """
        Generate the absolute paths matching the wildcard mask `pattern`. A mask without any
        wildcard character yields exactly one path, namely the absolute version of the mask itself,
        regardless of whether it exists. Otherwise, the mask is split at the first wildcard into a
        literal base directory and a relative glob pattern which is evaluated with `Path.glob`.

        While the enumeration is running, `os.scandir` is temporarily replaced by a wrapper that
        skips symbolic links (unless the `symlinks` argument is set), detects link loops, and turns
        scanning errors into warnings instead of aborting the traversal. The original function is
        restored when the generator finishes or is closed.
        """
        if pattern.endswith('**'):
            pattern += '/*'
        wildcard = re.search(R'[\[\?\*]', pattern)
        if wildcard is None:
            yield self._absolute_path(pattern)
            return
        k = wildcard.start()
        base, pattern = pattern[:k], pattern[k:]
        path = self._absolute_path(base or '.')
        last = path.parts[-1]
        # A path that equals its own parent is a bare anchor (for example the file system root on
        # Posix). Its last part is the anchor itself rather than a partial file name; prepending it
        # would produce a non-relative pattern, which Path.glob rejects.
        if path.parent != path and base.endswith(last):
            # /base/something.*
            pattern = F'{last}{pattern}'
            path = path.parent

        scandir = os.scandir

        class EmptyIterator:
            # Stand-in for the object returned by os.scandir: usable both as a context manager and
            # as an iterator, but it never produces an entry.
            def __enter__(self): return self
            def __exit__(self, *_, **__): pass
            def __next__(self): raise StopIteration
            def __iter__(self): return self

        if sys.version_info >= (3, 12):
            def islink(path):
                return os.path.islink(path) or os.path.isjunction(path)
        else:
            def islink(path):
                try:
                    return bool(os.readlink(path))
                except OSError:
                    return False

        # Resolved targets of all links that were already followed, used for loop detection.
        paths_scanned = set()

        def _patched_scandir(path):
            if islink(path):
                if not self.args.symlinks:
                    return EmptyIterator()
                try:
                    rp = os.path.realpath(path, strict=True)
                except OSError:
                    return EmptyIterator()
                if rp in paths_scanned:
                    self.log_warn(F'file system loop at: {path!s}')
                    return EmptyIterator()
                paths_scanned.add(rp)
                path = rp
            try:
                return scandir(path)
            except Exception as e:
                self.log_warn('error calling scandir:', exception_to_string(e))
                return EmptyIterator()

        try:
            # NOTE: this replaces os.scandir for the whole process until the generator is done.
            os.scandir = _patched_scandir
            for match in path.glob(pattern):
                yield match
        finally:
            os.scandir = scandir

    def process(self, data):
        """
        For every configured file mask, substitute format expressions, enumerate the matching
        paths, and emit one output per file (or per chunk / group of lines when the `read` or
        `linewise` arguments are set). In list mode, only the encoded path is emitted. Errors
        that occur while inspecting or reading an individual file are logged and the file is
        skipped; they do not abort the processing of the remaining files.
        """
        meta = metavars(data)
        size = self.args.size
        size = size and range(size.start, size.stop, size.step)
        meta.ghost = True
        wild = (os.name == 'nt' or self.args.wild) and not self.args.tame
        root = self._absolute_path('.')
        paths = self._glob if wild else lambda mask: [self._absolute_path(mask)]
        do_meta = self.args.meta
        do_stat = size or do_meta

        class SkipErrors:
            # Context manager that suppresses exceptions derived from Exception and logs one
            # message per matched exception category until reset() is called again.
            unit = self

            def __init__(self):
                self._history: Set[type] = set()
                # The table is searched in insertion order and the first matching entry wins, so
                # subclasses have to be listed before their base classes: PermissionError and
                # FileNotFoundError both derive from OSError.
                self._message: Dict[type, Optional[str]] = {
                    ValueError: (
                        None
                    ), PermissionError: (
                        'access error while scanning: {}'
                    ), FileNotFoundError: (
                        'file unexpectedly not found: {}'
                    ), OSError: (
                        'system error while scanning: {}'
                    ), Exception: (
                        'unknown error while reading: {}'
                    ),
                }
                self.path = None

            def reset(self, path):
                self._history.clear()
                self.path = path
                return self

            def __enter__(self):
                return self

            def __exit__(self, et, ev, trace):
                if et is None:
                    return False
                for t, msg in self._message.items():
                    if issubclass(et, t):
                        if t not in self._history:
                            self._history.add(t)
                            if msg is not None:
                                self.unit.log_info(msg.format(self.path))
                        return True
                else:
                    # Not derived from Exception (e.g. GeneratorExit): let it propagate.
                    return False

        for mask in self.args.filenames:
            mask = meta.format_str(mask, self.codec, [data])
            self.log_debug('scanning for mask:', mask)
            skip_errors = SkipErrors()
            for path in paths(mask):
                skip_errors.reset(path)
                filesize = None
                # Start with fresh labels for every file so that a failed stat call cannot cause
                # the metadata of a previously processed file to be attached to this one.
                kwargs = dict()
                with skip_errors:
                    # Raises ValueError when the path is not located below the working directory;
                    # this is suppressed silently and the absolute path is used instead.
                    path = path.relative_to(root)
                with skip_errors:
                    if wild and not path.is_file():
                        continue
                with skip_errors:
                    if do_stat:
                        stat = path.stat()
                        filesize = stat.st_size
                    if do_meta:
                        kwargs.update(
                            size=filesize,
                            atime=datetime.fromtimestamp(stat.st_atime).isoformat(' ', 'seconds'),
                            ctime=datetime.fromtimestamp(stat.st_ctime).isoformat(' ', 'seconds'),
                            mtime=datetime.fromtimestamp(stat.st_mtime).isoformat(' ', 'seconds')
                        )
                if size is not None and filesize not in size:
                    continue
                with skip_errors:
                    if self.args.list:
                        yield self.labelled(str(path).encode(self.codec), **kwargs)
                        continue
                    with path.open('rb') as stream:
                        # NOTE(review): chunks and lines are emitted without path or metadata
                        # labels, unlike whole files - confirm that this is intended.
                        if self.args.linewise:
                            yield from self._read_lines(stream)
                        elif self.args.read:
                            yield from self._read_chunks(stream)
                        else:
                            # Use a separate name: rebinding `data` would replace the input chunk
                            # that is passed to format_str for all subsequent masks.
                            content = stream.read()
                            self.log_info(lambda: F'reading: {path!s} ({len(content)} bytes)')
                            yield self.labelled(content, path=path.as_posix(), **kwargs)

Classes

class ef (*filenames, list=False, meta=False, size=None, read=0, wild=False, tame=False, symlinks=False, linewise=False)

Short for "emit file". The unit reads files from disk and outputs them individually. Has the ability to read large files in chunks.

Expand source code Browse git
class ef(Unit):
    """
    Short for "emit file". The unit reads files from disk and outputs them individually. Has the ability to
    read large files in chunks.
    """

    def __init__(self,
        *filenames: Arg(metavar='FILEMASK', nargs='+', type=str, help=(
            'A list of file masks. Each matching file will be read from disk and '
            'emitted. The file masks can include format string expressions which '
            'will be substituted from the current meta variables. The masks can '
            'use wild-card expressions, but this feature is disabled by default on '
            'Posix platforms, where it has to be enabled explicitly using the -w '
            'switch. On Windows, the feature is enabled by default and can be '
            'disabled using the -t switch.'
        )),
        list: Arg.Switch('-l', help='Only lists files with metadata.') = False,
        meta: Arg.Switch('-m', help=(
            'Adds the atime, mtime, ctime, and size metadata variables.'
        )) = False,
        size: Arg.Bounds('-s', range=True, help=(
            'If specified, only files are read whose size is in the given range.')) = None,
        read: Arg.Number('-r', help=(
            'If specified, files will be read in chunks of size N and each '
            'chunk is emitted as one element in the output list.'
        )) = 0,
        wild: Arg.Switch('-w', group='W', help='Force use of wildcard patterns in file masks.') = False,
        tame: Arg.Switch('-t', group='W', help='Disable wildcard patterns in file masks.') = False,
        symlinks: Arg.Switch('-y', help='Follow symbolic links and junctions, these are ignored by default.') = False,
        linewise: Arg.Switch('-i', help=(
            'Read the file linewise. By default, one line is read at a time. '
            'In line mode, the --read argument can be used to read the given '
            'number of lines in each chunk.'
        )) = False
    ):
        # The two switches select opposite behaviors; reject the contradictory combination early.
        if wild and tame:
            raise ValueError('Cannot be both wild and tame!')
        super().__init__(
            size=size,
            read=read,
            list=list,
            meta=meta,
            wild=wild,
            tame=tame,
            symlinks=symlinks,
            linewise=linewise,
            filenames=filenames
        )

    def _read_chunks(self, fd):
        """
        Generate successive buffers from the open file object `fd`. Each buffer is the result of
        one `fd.read` call with the configured `read` argument; the generator ends as soon as a
        read returns an empty result.
        """
        while True:
            buffer = fd.read(self.args.read)
            if not buffer:
                break
            yield buffer

    def _read_lines(self, fd):
        """
        Generate the contents of the open file object `fd` line by line. When the `read` argument
        is zero or one, every line is yielded on its own. Otherwise, up to `read` consecutive lines
        are concatenated and yielded as a single buffer; the final buffer may contain fewer lines.
        """
        count = self.args.read or 1
        if count == 1:
            while True:
                buffer = fd.readline()
                if not buffer:
                    break
                yield buffer
            return
        with MemoryFile() as out:
            while True:
                for _ in range(count):
                    buffer = fd.readline()
                    if not buffer:
                        break
                    out.write(buffer)
                # Nothing was written in this round: the input is exhausted.
                if not out.tell():
                    break
                yield out.getvalue()
                # Reuse the same buffer object for the next group of lines.
                out.seek(0)
                out.truncate()

    def _absolute_path(self, path_string: str):
        """
        Convert `path_string` to an absolute `Path`. On Windows, the result additionally receives
        the extended-length path prefix unless its first component already starts with it.
        """
        path = Path(path_string).absolute()
        if os.name == 'nt' and not path.parts[0].startswith('\\\\?\\'):
            # The pathlib glob method will simply fail mid-traversal if it attempts to descend into
            # a folder or to a file whose path exceeds MAX_PATH on Windows. As a workaround, we use
            # UNC paths throughout and truncate to relative paths after enumeration.
            path = Path(F'\\\\?\\{path!s}')
        return path

    def _glob(self, pattern: str) -> Iterable[Path]:
        """
        Generate the absolute paths matching the wildcard mask `pattern`. A mask without any
        wildcard character yields exactly one path, namely the absolute version of the mask itself,
        regardless of whether it exists. Otherwise, the mask is split at the first wildcard into a
        literal base directory and a relative glob pattern which is evaluated with `Path.glob`.

        While the enumeration is running, `os.scandir` is temporarily replaced by a wrapper that
        skips symbolic links (unless the `symlinks` argument is set), detects link loops, and turns
        scanning errors into warnings instead of aborting the traversal. The original function is
        restored when the generator finishes or is closed.
        """
        if pattern.endswith('**'):
            pattern += '/*'
        wildcard = re.search(R'[\[\?\*]', pattern)
        if wildcard is None:
            yield self._absolute_path(pattern)
            return
        k = wildcard.start()
        base, pattern = pattern[:k], pattern[k:]
        path = self._absolute_path(base or '.')
        last = path.parts[-1]
        # A path that equals its own parent is a bare anchor (for example the file system root on
        # Posix). Its last part is the anchor itself rather than a partial file name; prepending it
        # would produce a non-relative pattern, which Path.glob rejects.
        if path.parent != path and base.endswith(last):
            # /base/something.*
            pattern = F'{last}{pattern}'
            path = path.parent

        scandir = os.scandir

        class EmptyIterator:
            # Stand-in for the object returned by os.scandir: usable both as a context manager and
            # as an iterator, but it never produces an entry.
            def __enter__(self): return self
            def __exit__(self, *_, **__): pass
            def __next__(self): raise StopIteration
            def __iter__(self): return self

        if sys.version_info >= (3, 12):
            def islink(path):
                return os.path.islink(path) or os.path.isjunction(path)
        else:
            def islink(path):
                try:
                    return bool(os.readlink(path))
                except OSError:
                    return False

        # Resolved targets of all links that were already followed, used for loop detection.
        paths_scanned = set()

        def _patched_scandir(path):
            if islink(path):
                if not self.args.symlinks:
                    return EmptyIterator()
                try:
                    rp = os.path.realpath(path, strict=True)
                except OSError:
                    return EmptyIterator()
                if rp in paths_scanned:
                    self.log_warn(F'file system loop at: {path!s}')
                    return EmptyIterator()
                paths_scanned.add(rp)
                path = rp
            try:
                return scandir(path)
            except Exception as e:
                self.log_warn('error calling scandir:', exception_to_string(e))
                return EmptyIterator()

        try:
            # NOTE: this replaces os.scandir for the whole process until the generator is done.
            os.scandir = _patched_scandir
            for match in path.glob(pattern):
                yield match
        finally:
            os.scandir = scandir

    def process(self, data):
        """
        For every configured file mask, substitute format expressions, enumerate the matching
        paths, and emit one output per file (or per chunk / group of lines when the `read` or
        `linewise` arguments are set). In list mode, only the encoded path is emitted. Errors
        that occur while inspecting or reading an individual file are logged and the file is
        skipped; they do not abort the processing of the remaining files.
        """
        meta = metavars(data)
        size = self.args.size
        size = size and range(size.start, size.stop, size.step)
        meta.ghost = True
        wild = (os.name == 'nt' or self.args.wild) and not self.args.tame
        root = self._absolute_path('.')
        paths = self._glob if wild else lambda mask: [self._absolute_path(mask)]
        do_meta = self.args.meta
        do_stat = size or do_meta

        class SkipErrors:
            # Context manager that suppresses exceptions derived from Exception and logs one
            # message per matched exception category until reset() is called again.
            unit = self

            def __init__(self):
                self._history: Set[type] = set()
                # The table is searched in insertion order and the first matching entry wins, so
                # subclasses have to be listed before their base classes: PermissionError and
                # FileNotFoundError both derive from OSError.
                self._message: Dict[type, Optional[str]] = {
                    ValueError: (
                        None
                    ), PermissionError: (
                        'access error while scanning: {}'
                    ), FileNotFoundError: (
                        'file unexpectedly not found: {}'
                    ), OSError: (
                        'system error while scanning: {}'
                    ), Exception: (
                        'unknown error while reading: {}'
                    ),
                }
                self.path = None

            def reset(self, path):
                self._history.clear()
                self.path = path
                return self

            def __enter__(self):
                return self

            def __exit__(self, et, ev, trace):
                if et is None:
                    return False
                for t, msg in self._message.items():
                    if issubclass(et, t):
                        if t not in self._history:
                            self._history.add(t)
                            if msg is not None:
                                self.unit.log_info(msg.format(self.path))
                        return True
                else:
                    # Not derived from Exception (e.g. GeneratorExit): let it propagate.
                    return False

        for mask in self.args.filenames:
            mask = meta.format_str(mask, self.codec, [data])
            self.log_debug('scanning for mask:', mask)
            skip_errors = SkipErrors()
            for path in paths(mask):
                skip_errors.reset(path)
                filesize = None
                # Start with fresh labels for every file so that a failed stat call cannot cause
                # the metadata of a previously processed file to be attached to this one.
                kwargs = dict()
                with skip_errors:
                    # Raises ValueError when the path is not located below the working directory;
                    # this is suppressed silently and the absolute path is used instead.
                    path = path.relative_to(root)
                with skip_errors:
                    if wild and not path.is_file():
                        continue
                with skip_errors:
                    if do_stat:
                        stat = path.stat()
                        filesize = stat.st_size
                    if do_meta:
                        kwargs.update(
                            size=filesize,
                            atime=datetime.fromtimestamp(stat.st_atime).isoformat(' ', 'seconds'),
                            ctime=datetime.fromtimestamp(stat.st_ctime).isoformat(' ', 'seconds'),
                            mtime=datetime.fromtimestamp(stat.st_mtime).isoformat(' ', 'seconds')
                        )
                if size is not None and filesize not in size:
                    continue
                with skip_errors:
                    if self.args.list:
                        yield self.labelled(str(path).encode(self.codec), **kwargs)
                        continue
                    with path.open('rb') as stream:
                        # NOTE(review): chunks and lines are emitted without path or metadata
                        # labels, unlike whole files - confirm that this is intended.
                        if self.args.linewise:
                            yield from self._read_lines(stream)
                        elif self.args.read:
                            yield from self._read_chunks(stream)
                        else:
                            # Use a separate name: rebinding `data` would replace the input chunk
                            # that is passed to format_str for all subsequent masks.
                            content = stream.read()
                            self.log_info(lambda: F'reading: {path!s} ({len(content)} bytes)')
                            yield self.labelled(content, path=path.as_posix(), **kwargs)

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members