Module refinery.lib.batch.lexer

Expand source code Browse git
from __future__ import annotations

import array
import codecs
import enum
import io
import itertools
import ntpath
import re

from dataclasses import dataclass, field
from typing import Callable, ClassVar, Generator

from refinery.lib.batch.const import (
    AMPERSAND,
    ANGLE_CLOSE,
    ANGLE_OPEN,
    ANGLES,
    ASTERIX,
    AT,
    CARET,
    COLON,
    COMMA,
    DOLLAR,
    EQUALS,
    LINEBREAK,
    NINE,
    PAREN_CLOSE,
    PAREN_OPEN,
    PERCENT,
    PIPE,
    QUOTE,
    SEMICOLON,
    SEPARATORS,
    SLASH,
    WHITESPACE,
    ZERO,
)
from refinery.lib.batch.model import (
    ArgVar,
    ArgVarFlags,
    Ctrl,
    EmulatorException,
    MissingVariable,
    Redirect,
    RedirectIO,
    Token,
    UnexpectedEOF,
    UnexpectedToken,
    Word,
)
from refinery.lib.batch.state import BatchState
from refinery.lib.batch.util import batchint, u16, uncaret, unquote
from refinery.lib.types import buf


class Mode(enum.IntEnum):
    """
    The states of the batch lexer. The lexer keeps a stack of these; Text is the base mode.
    """
    Text = 0
    Whitespace = 1
    Quote = 2
    Label = 3
    Gap = 4
    SetStarted = 5
    SetRegular = 6
    SetQuoted = 7


class EV(enum.IntEnum):
    # Scan phases used while expanding a percent-variable; each phase is
    # numerically equal to the character constant that it is named after.
    New = PERCENT   # initial phase of a scan
    Env = DOLLAR    # entered when a dollar sign is seen
    Mod = COLON     # entered when a colon is seen in the initial phase

# Maps single separator characters to the control token that is emitted for them.
SeparatorMap = dict((
    (AT, Ctrl.At),
    (COMMA, Ctrl.Comma),
    (EQUALS, Ctrl.Equals),
    (PAREN_CLOSE, Ctrl.EndGroup),
    (PAREN_OPEN, Ctrl.NewGroup),
    (SEMICOLON, Ctrl.Semicolon),
))


@dataclass
class BatchLexerCursor:
    """
    Snapshot-able read state of the lexer: script position, mode stack, the token under
    construction, and the state of any pending variable substitution.
    """
    offset: int = 0
    modes: list[Mode] = field(default_factory=list)
    token: array.array[int] = field(
        default_factory=lambda: array.array('H'))
    subst_offset: int = 0
    subst_buffer: array.array[int] = field(
        default_factory=lambda: array.array('H'))

    def eof(self, size: int):
        """
        Return whether nothing is left to read, given the total size of the script.
        """
        if self.offset < size:
            return False
        pending = len(self.subst_buffer)
        return pending == 0 or self.subst_offset >= pending

    def copy(self):
        """
        Return an independent copy; the mode stack and both buffers are duplicated.
        """
        modes = list(self.modes)
        token = array.array('H', self.token)
        subst = array.array('H', self.subst_buffer)
        return self.__class__(self.offset, modes, token, self.subst_offset, subst)

    @property
    def substituting(self):
        """
        Whether substituted text is waiting in the substitution buffer.
        """
        return len(self.subst_buffer) != 0


class BatchLexer:

    labels: dict[str, int]
    code: memoryview

    pending_redirect: RedirectIO | None

    cursor: BatchLexerCursor
    resume: BatchLexerCursor | None

    class _register:
        """
        Decorator that files a handler function under one or more lexer modes.
        """
        # Each handler receives the current mode and char. Its boolean result states
        # whether the character was processed and may be consumed by the caller.
        handlers: ClassVar[dict[Mode, Callable[
            [BatchLexer, Mode, int], Generator[Token, None, bool]
        ]]] = {}

        def __init__(self, *modes: Mode):
            self.modes = modes

        def __call__(self, handler):
            table = self.handlers
            table.update((mode, handler) for mode in self.modes)
            return handler

    def __init__(self, data: str | buf | BatchLexer, state: BatchState | None = None):
        """
        Create a lexer from script text, or from another lexer whose parsed text, code,
        labels and state are then shared. A separate state cannot be combined with an
        existing lexer.
        """
        if not isinstance(data, BatchLexer):
            self.state = BatchState() if state is None else state
            self.preparse(data)
            return
        if state is not None:
            raise NotImplementedError
        self.text, self.code = data.text, data.code
        self.labels = data.labels
        self.state = data.state

    def parse_group(self):
        """
        Record that one more parenthesized group has been opened.
        """
        self.group = self.group + 1

    def parse_label(self):
        """
        Enter label mode; this is only permitted from plain text mode at the base of the
        mode stack.
        """
        current = self.mode
        if len(self.modes) != 1 or current != Mode.Text:
            raise EmulatorException(F'Switching to LABEL while in mode {current.name}')
        self.mode_switch(Mode.Label)

    def parse_set(self):
        """
        Enter the mode for SET arguments; this is only permitted from plain text mode at
        the base of the mode stack.
        """
        current = self.mode
        if len(self.modes) != 1 or current != Mode.Text:
            raise EmulatorException(F'Switching to SET while in mode {current.name}')
        self.mode_switch(Mode.SetStarted)

    @property
    def environment(self):
        """
        The environment of the attached batch state.
        """
        state = self.state
        return state.environment

    def parse_arg_variable(self, var: ArgVar):
        """
        Expand a batch parameter reference like %1, %* or %~dp1 to its string value.

        An offset of Ellipsis represents %* and produces the entire command line; the ~
        modifiers may not be used with it. Offset 0 produces the script name and offset n
        the n-th argument. A missing argument expands to the empty string. Without any
        flags, the argument is returned as it is. The batch syntax for the modifiers is:

            %~1         - expands %1 removing any surrounding quotes (")
            %~f1        - expands %1 to a fully qualified path name
            %~d1        - expands %1 to a drive letter only
            %~p1        - expands %1 to a path only
            %~n1        - expands %1 to a file name only
            %~x1        - expands %1 to a file extension only
            %~s1        - expanded path contains short names only
            %~a1        - expands %1 to file attributes
            %~t1        - expands %1 to date/time of file
            %~z1        - expands %1 to size of file
            %~$PATH:1   - searches the directories listed in PATH and expands %1 to the
                           fully qualified name of the first match, or to the empty
                           string when nothing is found

        Modifiers can be combined, e.g. %~dp1, %~nx1, %~dp$PATH:1 or %~ftza1. Attributes,
        date/time, size, and the selected path components are produced in this order,
        separated by single spaces. The short name modifier implies the full path when no
        path component was selected.
        """
        state = self.state
        flags = var.flags
        index = var.offset

        if index is (...):
            return state.command_line
        if index < 1:
            argval = state.name
        else:
            args = state.args
            if index > len(args):
                return ''
            argval = args[index - 1]

        if not flags:
            return argval

        has_path = 0 != ArgVarFlags.FullPath & flags

        if flags.StripQuotes and argval.startswith('"') and argval.endswith('"'):
            argval = argval[1:-1]

        if flags.ShortName and not has_path:
            flags |= ArgVarFlags.FullPath
            has_path = True

        pieces = []

        if flags.Attributes:
            pieces.append('--a--------') # TODO: placeholder
        if flags.DateTime:
            stamp = state.start_time.isoformat(' ', 'minutes')
            pieces.append(F' {stamp}')
        if flags.FileSize:
            pieces.append(F' {state.sizeof_file(argval)}')
        if has_path:
            pieces.append(' ')
            drv, rest = ntpath.splitdrive(state.resolve_path(argval))
            head, name = ntpath.split(rest)
            name, ext = ntpath.splitext(name)
            if flags.DriveLetter:
                pieces.append(drv)
            if flags.FilePath:
                pieces.append(ntpath.join(head))
            if flags.FileName:
                pieces.append(name)
            if flags.FileExtension:
                pieces.append(ext)

        return ''.join(pieces).lstrip()

    @property
    def modes(self):
        """
        The mode stack of the current cursor.
        """
        cursor = self.cursor
        return cursor.modes

    def reset(self, offset: int):
        """
        Discard all lexing state and start over at the given script offset in text mode.
        """
        self.cursor = BatchLexerCursor(offset, [Mode.Text])
        self.resume = None
        self.pending_redirect = None
        self.group = 0
        self.first_after_gap = True
        self.quote = self.caret = self.white = False

    def mode_reset(self):
        """
        Drop every mode above the base of the mode stack.
        """
        stack = self.modes
        stack[1:] = []

    def mode_finish(self):
        """
        Leave the current mode. Raises a RuntimeError when only the base mode is left.
        """
        stack = self.modes
        if len(stack) > 1:
            stack.pop()
            return
        raise RuntimeError('Trying to exit base mode.')

    def mode_switch(self, mode: Mode):
        """
        Enter the given mode by pushing it onto the mode stack.
        """
        stack = self.modes
        stack.append(mode)

    @property
    def mode(self):
        """
        The mode on top of the mode stack. Assigning to it replaces the top entry.
        """
        stack = self.modes
        return stack[-1]

    @mode.setter
    def mode(self, value: Mode):
        stack = self.modes
        stack[-1] = value

    @property
    def substituting(self):
        """
        Whether the current cursor holds substituted text that is waiting to be read.
        """
        cursor = self.cursor
        return cursor.substituting

    @property
    def eof(self):
        """
        Whether the script is exhausted and the substitution buffer is empty.
        """
        cursor = self.cursor
        if cursor.offset < len(self.code):
            return False
        return not cursor.subst_buffer

    def quick_save(self):
        """
        Store a copy of the current cursor that quick_load can restore later.
        """
        snapshot = self.cursor.copy()
        self.resume = snapshot

    def quick_load(self):
        """
        Restore the cursor stored by quick_save and discard the stored copy. Raises a
        RuntimeError when no copy was stored.
        """
        snapshot = self.resume
        if snapshot is None:
            raise RuntimeError
        self.cursor, self.resume = snapshot, None

    def current_char(self):
        """
        Return the code unit at the current read position without consuming it.

        Pending substitution output is read first. Outside of a substitution, a percent
        sign in the script is skipped, variable expansion is attempted, and the lookup is
        repeated at the resulting position.

        Raises:
            UnexpectedEOF: when the read position is beyond the end of the script.
        """
        cursor = self.cursor
        code = self.code
        if subst := cursor.subst_buffer:
            offset = cursor.subst_offset
            if offset < (n := len(subst)):
                return subst[offset]
            # The read position is past the buffer: continue in the script instead.
            offset += cursor.offset - n
            try:
                return code[offset]
            except IndexError:
                raise UnexpectedEOF
        try:
            char = code[cursor.offset]
        except IndexError:
            # The probe for a percent sign must not leak a bare IndexError when the
            # script ends here, e.g. directly after a redirection or a lone percent.
            raise UnexpectedEOF
        if char != PERCENT:
            return char
        cursor.offset += 1
        self.fill_substitution_buffer()
        return self.current_char()

    def consume_char(self):
        """
        Advance the read position by one code unit. Substituted text is consumed first;
        the substitution buffer is cleared once its last unit has been consumed. Raises
        an EOFError when advancing beyond the end of the script.
        """
        cursor = self.cursor
        pending = cursor.subst_buffer
        if not pending:
            advanced = cursor.offset + 1
            if advanced > len(self.code):
                raise EOFError('Consumed a character beyond EOF.')
            cursor.offset = advanced
            return
        advanced = cursor.subst_offset + 1
        if advanced < len(pending):
            cursor.subst_offset = advanced
        else:
            del pending[:]
            cursor.subst_offset = -1

    def next_char(self):
        """
        Consume the current code unit and return the one that follows it.
        """
        self.consume_char()
        following = self.current_char()
        return following

    def parse_env_variable(self, var: str):
        """
        Expand the text found between two percent signs.

        The empty name (from %%) expands to a single percent sign. Otherwise, the part
        before the first colon is looked up through the state; an optional modifier
        after the colon is applied to a non-empty value:

            old=new        - replace every occurrence of old with new
            ~old=new       - replace only the first occurrence of old
            ~offset        - substring from offset; negative offsets count from the end
            ~offset,length - as above, limited to length characters; a negative length
                             instead excludes that many characters at the end

        Raises:
            EmulatorException: when a modifier without an equals sign does not start
                with a tilde.
        """
        if var == '':
            return '%'
        name, _, modifier = var.partition(':')
        base = self.state.envar(name)
        if not modifier or not base:
            return base
        if '=' in modifier:
            old, _, new = modifier.partition('=')
            if old.startswith('~'):
                # The count is passed positionally because str.replace accepts it as a
                # keyword argument only since Python 3.13.
                return base.replace(old[1:], new, 1)
            return base.replace(old, new)
        if not modifier.startswith('~'):
            raise EmulatorException
        offset, _, length = modifier[1:].partition(',')
        offset = batchint(offset)
        size = len(base)
        if offset < 0:
            offset = max(0, size + offset)
        if not length:
            end = size
        elif (span := batchint(length)) < 0:
            # A negative length is relative to the end of the value, not to the offset:
            # %VAR:~2,-2% yields everything between the first and the last 2 characters.
            end = max(0, size + span)
        else:
            end = offset + span
        return base[offset:end]

    def emit_token(self):
        """
        Flush the token buffer. A non-empty token completes a pending redirect as its
        target, after which gap mode is entered; otherwise it is yielded as a Word. The
        generator returns whether a redirect was completed.
        """
        buffer = self.cursor.token
        redirected = False
        if buffer and (text := u16(buffer)):
            redirect = self.pending_redirect
            if not redirect:
                yield Word(text)
            else:
                redirect.target = unquote(text)
                self.pending_redirect = None
                self.mode_switch(Mode.Gap)
                yield redirect
                redirected = True
        del buffer[:]
        self.first_after_gap = False
        return redirected

    def tokens(self, offset: int) -> Generator[Token]:
        """
        Lex the script from the given offset and yield the resulting tokens. Each code
        unit is passed to the handler of the active mode; it is consumed here when the
        handler reports that it was processed.
        """
        self.reset(offset)
        dispatch = self._register.handlers
        total = len(self.code)

        while not self.cursor.eof(total):
            char = self.current_char()
            mode = self.mode
            processed = yield from dispatch[mode](self, mode, char)
            if processed:
                self.consume_char()

        if not self.first_after_gap:
            yield from self.emit_token()

    def check_line_break(self, mode: Mode, char: int):
        """
        Consume a line break. Unless a caret precedes it, the current token is flushed,
        the mode stack is reset, and a NewLine control token is yielded. The generator
        returns whether the character was a line break.
        """
        if char == LINEBREAK:
            if not self.caret:
                # caret is not reset until the next char!
                yield from self.emit_token()
                self.white, self.quote = True, False
                self.mode_reset()
                yield Ctrl.NewLine
            self.consume_char()
            return True
        return False

    def check_caret(self, char: int):
        """
        Handle caret escapes: a character following a caret is appended to the token
        verbatim, and a caret itself arms the escape. Returns whether the character was
        consumed here.
        """
        if self.caret:
            self.cursor.token.append(char)
            self.caret = False
        elif char == CARET:
            self.caret = True
        else:
            return False
        self.consume_char()
        return True

    def check_command_separators(self, char: int):
        """
        Handle characters that end a command: a closing parenthesis while a group is
        open, an ampersand, or a pipe. The current token is flushed, the mode stack is
        reset, and the matching control token is yielded. The generator returns whether
        the character was consumed here.
        """
        if char == PAREN_CLOSE and (depth := self.group) > 0:
            yield from self.emit_token()
            yield Ctrl.EndGroup
            self.mode_reset()
            self.consume_char()
            self.group = depth - 1
            return True
        separator = {AMPERSAND: Ctrl.Ampersand, PIPE: Ctrl.Pipe}.get(char)
        if separator is None:
            return False
        yield from self.emit_token()
        self.mode_reset()
        self.consume_char()
        yield separator
        return True

    def check_quote_start(self, char: int):
        """
        On a quote character, append it to the token, enter quote mode, clear the caret
        flag, and consume the character. Returns whether the character was a quote.
        """
        if char == QUOTE:
            self.cursor.token.append(char)
            self.mode_switch(Mode.Quote)
            self.caret = False
            self.consume_char()
            return True
        return False

    def check_redirect_io(self, char: int):
        """
        Handle a redirection operator that starts at the given angle bracket. All of the
        operator's characters are consumed here. The generator returns False when the
        character is not an angle bracket and True otherwise.

        For the form that ends in an ampersand followed by a digit, a complete RedirectIO
        is yielded right away. Otherwise, a RedirectIO without target is stored as the
        pending redirect and gap mode is entered; emit_token later assigns the target.
        """
        if char not in ANGLES:
            return False

        output = char != ANGLE_OPEN
        token = self.cursor.token

        # A token that consists of exactly one digit directly in front of the operator
        # is taken as the source number of the redirect and removed from the token.
        if len(token) == 1 and (src := token[0] - ZERO) in range(10):
            del token[:]
            source = src
        else:
            # default source: 0 for input and 1 for output
            source = int(output)

        char = self.next_char()

        if not output:
            how = Redirect.In
        elif char == ANGLE_CLOSE:
            # a second closing bracket selects append mode
            how = Redirect.OutAppend
            char = self.next_char()
        else:
            how = Redirect.OutCreate

        # flush whatever preceded the operator before dealing with the target
        yield from self.emit_token()

        if char != AMPERSAND:
            self.pending_redirect = RedirectIO(how, source)
            self.mode_switch(Mode.Gap)
        else:
            # an ampersand must be followed by a single digit
            char = self.next_char()
            if char not in range(ZERO, NINE + 1):
                raise UnexpectedToken(char)
            self.consume_char()
            yield RedirectIO(how, source, char - ZERO)

        return True

    def fill_substitution_buffer(self):
        """
        Try to expand the variable reference that begins at the cursor offset; the caller
        has already skipped the opening percent sign. The script is scanned up to the end
        of the line. When an expansion is found, it is placed into the substitution
        buffer and the cursor offset is moved behind the reference. Nothing is done while
        a substitution is still being read.
        """
        if (cursor := self.cursor).substituting:
            return

        code = self.code
        # offset at which lexing continues afterwards; negative means "leave unchanged"
        var_resume = -1
        # offset directly after a dollar sign, if one was seen
        var_dollar = -1
        # candidate argument variable; set to None once that reading is ruled out
        var_cmdarg = ArgVar()
        variable = None
        phase = EV.New
        q = ArgVarFlags.StripQuotes

        for current in range((current := cursor.offset), len(code)):
            char = code[current]
            if char == LINEBREAK:
                break
            elif char == PERCENT:
                # closing percent sign: everything since the cursor offset is the name
                try:
                    var_name = u16(self.code[cursor.offset:current])
                    variable = u16(self.parse_env_variable(var_name))
                except MissingVariable:
                    if var_resume < 0:
                        var_resume = current + 1
                    break
            elif var_cmdarg:
                if ZERO <= char <= NINE:
                    # a digit completes an argument variable
                    var_cmdarg.offset = char - ZERO
                    variable = u16(self.parse_arg_variable(var_cmdarg))
                elif char == ASTERIX and cursor.offset == current:
                    # an asterisk directly after the percent sign; Ellipsis marks it
                    var_cmdarg.offset = (...)
                    variable = u16(self.parse_arg_variable(var_cmdarg))
            if variable is not None:
                cursor.subst_offset = 0
                cursor.subst_buffer.extend(variable)
                var_resume = current + 1
                break
            if phase == EV.Mod:
                var_cmdarg = None
            elif phase == EV.Env:
                if char == COLON:
                    if var_cmdarg:
                        assert var_dollar > 0
                        # the text between dollar sign and colon is stored as the path
                        var_cmdarg.path = u16(self.code[var_dollar:current])
                    var_resume = current + 1
            else:
                if char == DOLLAR:
                    var_dollar = current + 1
                    phase = EV.Env
                    continue
                if char == COLON:
                    var_cmdarg = None
                    var_resume = current + 1
                    phase = EV.Mod
                    continue
                if not var_cmdarg:
                    continue
                try:
                    var_cmdarg.flags |= ArgVarFlags.FromToken(char)
                except KeyError:
                    # not a modifier character: this cannot be an argument variable
                    var_cmdarg = None
                    continue
                # modifiers are only accepted while the StripQuotes flag is among them
                if q not in var_cmdarg.flags:
                    var_cmdarg = None
        if var_resume >= 0:
            cursor.offset = var_resume

    @_register(Mode.Label)
    def gobble_label(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for label mode: every character up to the line break joins the token.
        """
        line_ended = yield from self.check_line_break(mode, char)
        if line_ended:
            return False
        self.cursor.token.append(char)
        return True

    @_register(Mode.Quote)
    def gobble_quote(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for quote mode: characters join the token until a line break occurs or
        the closing quote, which is part of the token, ends the mode.
        """
        line_ended = yield from self.check_line_break(mode, char)
        if line_ended:
            return False
        self.cursor.token.append(char)
        if char == QUOTE:
            self.mode_finish()
        return True

    @_register(Mode.Whitespace)
    def gobble_whitespace(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for whitespace mode: collects a run of whitespace and yields it as one
        Word as soon as a different character shows up. That character is not consumed.
        """
        if char not in WHITESPACE:
            self.mode_finish()
            run = self.cursor.token
            yield Word(u16(run))
            del run[:]
            return False
        self.cursor.token.append(char)
        return True

    @_register(Mode.SetQuoted)
    def gobble_quoted_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for a SET argument that began with a quote. After each further quote, a
        snapshot of the cursor is stored; when the line or the command ends later on,
        the cursor is rewound to the most recent snapshot before the token is emitted.
        """
        if char == QUOTE:
            # The quote is consumed and appended here so that the snapshot is taken
            # right behind it; False keeps the caller from consuming another character.
            self.consume_char()
            self.cursor.token.append(QUOTE)
            self.quick_save()
            return False

        if char == LINEBREAK:
            if self.resume is None:
                # no further quote was seen: the token ends with the line
                yield from self.emit_token()
                self.mode_reset()
                yield Ctrl.NewLine
                return True
            elif self.caret:
                # the line break is consumed without being added to the token
                self.caret = False
                return True
            else:
                # rewind to the position behind the last quote and emit from there
                self.quick_load()
                yield from self.emit_token()
                return False

        if self.resume is not None:
            if char == CARET:
                self.caret = not self.caret
            elif not self.caret:
                if (char == PAREN_CLOSE and self.group > 0) or char in (PIPE, AMPERSAND):
                    self.quick_load()
                    yield from self.emit_token()
                    self.mode_finish()
                    # after a quick load, the ending quote was already consumed.
                    return False

        self.cursor.token.append(char)
        return True

    @_register(Mode.SetStarted)
    def gobble_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the arguments of SET up to the equals sign. The checks below are
        order-sensitive; a helper that handled the character has also consumed it, which
        is why False is returned in those cases.
        """
        token = self.cursor.token
        if (yield from self.check_line_break(mode, char)):
            return False
        if char in WHITESPACE:
            # flush the current word and collect the whitespace run as its own token
            yield from self.emit_token()
            token.append(char)
            self.mode_switch(Mode.Whitespace)
            return True
        if char == SLASH and not self.pending_redirect:
            # a slash starts a new token
            yield from self.emit_token()
            token.append(char)
            return True
        if not token and char == QUOTE:
            # a quote at the very start of a token selects the quoted form of SET
            self.caret = False
            token.append(char)
            self.mode = Mode.SetQuoted
            return True
        if self.check_caret(char):
            return False
        if char == EQUALS:
            # the token in front of the equals sign is emitted; the rest of the
            # argument is handled in SetRegular mode
            yield from self.emit_token()
            yield Ctrl.Equals
            self.mode = Mode.SetRegular
            return True
        if self.check_quote_start(char):
            return False
        if (yield from self.check_command_separators(char)):
            return False
        if (yield from self.check_redirect_io(char)):
            return False
        token.append(char)
        return True

    def common_token_checks(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Run the checks shared by several modes, in this order: line break, caret, quote
        start, command separators, I/O redirection. The first check that handles the
        character ends the sequence; the generator returns whether any of them did.
        """
        if (yield from self.check_line_break(mode, char)):
            return True
        if self.check_caret(char):
            return True
        if self.check_quote_start(char):
            return True
        if (yield from self.check_command_separators(char)):
            return True
        return (yield from self.check_redirect_io(char))

    @_register(Mode.SetRegular)
    def gobble_set_regular(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the part of a SET argument after the equals sign. Characters join
        the token; whitespace completes a pending redirect with the token collected so
        far as its target.
        """
        handled = yield from self.common_token_checks(mode, char)
        if handled:
            return False
        redirect = self.pending_redirect
        if redirect and char in WHITESPACE:
            target = self.cursor.token
            self.pending_redirect = None
            redirect.target = unquote(u16(target))
            del target[:]
            yield redirect
        self.cursor.token.append(char)
        return True

    @_register(Mode.Gap)
    def gobble_gap(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for gap mode: skips separator characters and ends the mode at the first
        character that is not a separator, without consuming that character.
        """
        # This handler emits no tokens; the empty delegation makes it a generator.
        yield from ()
        if char not in SEPARATORS:
            self.mode_finish()
            self.first_after_gap = True
            return False
        return True

    @_register(Mode.Text)
    def gobble_txt(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the base text mode. After the common checks, it splits words at
        whitespace and slashes, recognizes labels and comments at a colon, and maps the
        characters in SeparatorMap to their control tokens. The generator returns True
        when the caller should consume the character.
        """
        if (yield from self.common_token_checks(mode, char)):
            return False
        if char in WHITESPACE:
            # flush the current word and collect the whitespace run as its own token
            yield from self.emit_token()
            self.cursor.token.append(char)
            self.mode_switch(Mode.Whitespace)
            return True
        if char == SLASH and not self.pending_redirect:
            # a slash ends the current word; the slash itself is appended further down
            yield from self.emit_token()
        if char == COLON:
            if (yield from self.emit_token()):
                # the token completed a redirect; the colon is looked at again afterwards
                return False
            elif self.next_char() == COLON:
                # two colons in a row
                yield Ctrl.Comment
                return True
            else:
                # next_char has consumed the colon already
                yield Ctrl.Label
                return False
        try:
            token = SeparatorMap[char]
        except KeyError:
            pass
        else:
            if (yield from self.emit_token()):
                return False
            else:
                yield token
                return True
        self.cursor.token.append(char)
        return True

    @staticmethod
    def label(text: str, uppercase=True):
        """
        Extract the label name from the text that follows the colon of a label line.
        Leading whitespace is skipped. The text is split at spaces, tabs and vertical
        tabs, and each piece is passed through uncaret; the label ends with the first
        piece for which uncaret reports a false first result. By default, the label is
        converted to upper case.
        """
        parts = re.split('([\x20\t\v])', text.lstrip())
        # Even indices hold the text pieces, odd indices the separators between them.
        for k in range(0, len(parts), 2):
            trailing, piece = uncaret(parts[k], True)
            if not trailing:
                parts[k] = piece
                del parts[k + 1:]
                break
            parts[k] = piece[:-1]
        name = ''.join(parts)
        return name.upper() if uppercase else name

    def preparse(self, text: str | buf):
        """
        Convert the script into an array of UTF-16 code units in which lines are joined
        by single line break units, and record the offset of every label. Binary input
        is decoded as UTF-8 with replacement of invalid sequences. For each label, only
        its first occurrence is kept; the recorded offset is that of its colon.
        """
        self.labels = labels = {}

        if not isinstance(text, str):
            text = codecs.decode(text, 'utf8', errors='replace')

        lines = text.splitlines(keepends=False)
        tail = text[-10:]

        if tail.splitlines() != F'{tail}\n'.splitlines():
            # the text had a trailing line break, which is swallowed by the splitlines method
            lines.append('')

        units = array.array('H')

        for index, line in enumerate(lines):
            if index:
                units.append(LINEBREAK)
            raw = line.encode('utf-16le')
            if not raw:
                continue
            view = memoryview(raw).cast('H')
            start = len(units)
            match = re.search('^@?[\\s]*:', line)
            if match is not None:
                end = match.end()
                name = self.label(u16(view[end:]))
                if name:
                    labels.setdefault(name, start + end - 1)
            units.extend(view)

        self.text = text
        self.code = memoryview(units)

    # Fail at class creation time unless every mode has exactly the registered handlers.
    if set(Mode).symmetric_difference(_register.handlers):
        raise NotImplementedError('Not all handlers were implemented.')

Classes

class Mode (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class Mode(enum.IntEnum):
    """
    The states of the batch lexer. The lexer keeps a stack of these; Text is the base mode.
    """
    Text = 0
    Whitespace = 1
    Quote = 2
    Label = 3
    Gap = 4
    SetStarted = 5
    SetRegular = 6
    SetQuoted = 7

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var Text

Base mode for ordinary command text (value 0); it always stays at the bottom of the mode stack.

var Whitespace

Active while a run of whitespace is collected into a single token.

var Quote

Active inside a quoted string; it ends at the closing quote.

var Label

Entered through parse_label; all characters up to the line break are collected into the token.

var Gap

Entered around redirections; separator characters are skipped until another character appears.

var SetStarted

Entered through parse_set; handles the arguments of SET up to the equals sign.

var SetRegular

Handles the part of a SET argument that follows the equals sign.

var SetQuoted

Handles a SET argument that begins with a quote.

class EV (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class EV(enum.IntEnum):
    # Scan phases used while expanding a percent-variable; each phase is
    # numerically equal to the character constant that it is named after.
    New = PERCENT   # initial phase of a scan
    Env = DOLLAR    # entered when a dollar sign is seen
    Mod = COLON     # entered when a colon is seen in the initial phase

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var New

Initial phase of a variable scan; equal to the PERCENT character constant.

var Env

Phase entered when a dollar sign is seen; equal to the DOLLAR character constant.

var Mod

Phase entered when a colon is seen in the initial phase; equal to the COLON character constant.

class BatchLexerCursor (offset=0, modes=<factory>, token=<factory>, subst_offset=0, subst_buffer=<factory>)

BatchLexerCursor(offset: 'int' = 0, modes: 'list[Mode]' = <factory>, token: 'array.array[int]' = <factory>, subst_offset: 'int' = 0, subst_buffer: 'array.array[int]' = <factory>)

Expand source code Browse git
@dataclass
class BatchLexerCursor:
    """
    Snapshot-able read state of the lexer: script position, mode stack, the token under
    construction, and the state of any pending variable substitution.
    """
    offset: int = 0
    modes: list[Mode] = field(default_factory=list)
    token: array.array[int] = field(
        default_factory=lambda: array.array('H'))
    subst_offset: int = 0
    subst_buffer: array.array[int] = field(
        default_factory=lambda: array.array('H'))

    def eof(self, size: int):
        """
        Return whether nothing is left to read, given the total size of the script.
        """
        if self.offset < size:
            return False
        pending = len(self.subst_buffer)
        return pending == 0 or self.subst_offset >= pending

    def copy(self):
        """
        Return an independent copy; the mode stack and both buffers are duplicated.
        """
        modes = list(self.modes)
        token = array.array('H', self.token)
        subst = array.array('H', self.subst_buffer)
        return self.__class__(self.offset, modes, token, self.subst_offset, subst)

    @property
    def substituting(self):
        """
        Whether substituted text is waiting in the substitution buffer.
        """
        return len(self.subst_buffer) != 0

Instance variables

var modes

Stack of lexer modes; the last entry is the active mode.

var token

UTF-16 code units of the token that is currently being assembled.

var subst_buffer

UTF-16 code units of an expanded variable that have not been fully read yet.

var offset

Read position within the lexer's code buffer.

var subst_offset

Read position within subst_buffer.

var substituting
Expand source code Browse git
@property
def substituting(self):
    """
    Whether substituted text is waiting in the substitution buffer.
    """
    return len(self.subst_buffer) != 0

Methods

def eof(self, size)
Expand source code Browse git
def eof(self, size: int):
    """
    Return whether nothing is left to read, given the total size of the script.
    """
    if self.offset < size:
        return False
    pending = len(self.subst_buffer)
    return pending == 0 or self.subst_offset >= pending
def copy(self)
Expand source code Browse git
def copy(self):
    """
    Return an independent copy; the mode stack and both buffers are duplicated.
    """
    modes = list(self.modes)
    token = array.array('H', self.token)
    subst = array.array('H', self.subst_buffer)
    return self.__class__(self.offset, modes, token, self.subst_offset, subst)
class BatchLexer (data, state=None)
Expand source code Browse git
class BatchLexer:
    """
    Lexer for batch script source. The input is converted to 16-bit code units once by
    `preparse`; `tokens` then yields the tokens that start at a given offset.
    """

    # Maps a label name to an offset into `code`; populated by `preparse`.
    labels: dict[str, int]
    # The script as a memoryview of 16-bit code units; created by `preparse`.
    code: memoryview

    # A redirection whose target has not been read yet, or None.
    pending_redirect: RedirectIO | None

    # The current read state.
    cursor: BatchLexerCursor
    # Copy of the cursor stored by `quick_save` and restored by `quick_load`, or None.
    resume: BatchLexerCursor | None

    class _register:
        # Decorator that records a method as the handler for each of the given modes.
        # A handler is given the current mode and char. It returns a boolean indicating
        # whether or not the character was processed and may be consumed.
        handlers: ClassVar[dict[Mode, Callable[
            [BatchLexer, Mode, int], Generator[Token, None, bool]
        ]]] = {}

        def __init__(self, *modes: Mode):
            self.modes = modes

        def __call__(self, handler):
            # the table is shared on the class, it is updated in place
            self.handlers.update(dict.fromkeys(self.modes, handler))
            return handler

    def __init__(self, data: str | buf | BatchLexer, state: BatchState | None = None):
        """
        Create a lexer from script text or bytes, or from another lexer. In the latter case
        the text, code, labels and state of the other lexer are shared, and passing a state
        raises NotImplementedError. Otherwise a new BatchState is created when none is given
        and the input is handed to `preparse`.
        """
        if not isinstance(data, BatchLexer):
            self.state = BatchState() if state is None else state
            self.preparse(data)
            return
        if state is not None:
            raise NotImplementedError
        self.text = data.text
        self.code = data.code
        self.labels = data.labels
        self.state = data.state

    def parse_group(self):
        """
        Increase the group counter by one.
        """
        self.group = self.group + 1

    def parse_label(self):
        """
        Enter label mode. Raises EmulatorException unless text mode is the only mode.
        """
        current = self.mode
        if not (current == Mode.Text and len(self.modes) == 1):
            raise EmulatorException(F'Switching to LABEL while in mode {current.name}')
        self.mode_switch(Mode.Label)

    def parse_set(self):
        """
        Enter the initial SET mode. Raises EmulatorException unless text mode is the only
        mode.
        """
        current = self.mode
        if not (current == Mode.Text and len(self.modes) == 1):
            raise EmulatorException(F'Switching to SET while in mode {current.name}')
        self.mode_switch(Mode.SetStarted)

    @property
    def environment(self):
        """
        The `environment` attribute of the attached state.
        """
        state = self.state
        return state.environment

    def parse_arg_variable(self, var: ArgVar):
        """
        Expand a batch argument variable such as %1, %~dp0 or %* to its string value. An
        offset of (...) selects the whole command line, offset 0 the script name, and an
        offset beyond the available arguments yields the empty string. The modifiers follow
        the help text of cmd.exe:

        %* in a batch script refers to all the arguments (e.g. %1 %2 %3
            %4 %5 ...)
        Substitution of batch parameters (%n) has been enhanced.  You can
        now use the following optional syntax:
            %~1         - expands %1 removing any surrounding quotes (")
            %~f1        - expands %1 to a fully qualified path name
            %~d1        - expands %1 to a drive letter only
            %~p1        - expands %1 to a path only
            %~n1        - expands %1 to a file name only
            %~x1        - expands %1 to a file extension only
            %~s1        - expanded path contains short names only
            %~a1        - expands %1 to file attributes
            %~t1        - expands %1 to date/time of file
            %~z1        - expands %1 to size of file
            %~$PATH:1   - searches the directories listed in the PATH
                           environment variable and expands %1 to the fully
                           qualified name of the first one found.  If the
                           environment variable name is not defined or the
                           file is not found by the search, then this
                           modifier expands to the empty string
        The modifiers can be combined to get compound results:
            %~dp1       - expands %1 to a drive letter and path only
            %~nx1       - expands %1 to a file name and extension only
            %~dp$PATH:1 - searches the directories listed in the PATH
                           environment variable for %1 and expands to the
                           drive letter and path of the first one found.
            %~ftza1     - expands %1 to a DIR like output line
        In the above examples %1 and PATH can be replaced by other
        valid values.  The %~ syntax is terminated by a valid argument
        number.  The %~ modifiers may not be used with %*
        """
        state = self.state
        flags = var.flags

        if (k := var.offset) is (...):
            return state.command_line
        if (j := k - 1) < 0:
            argval = state.name
        elif j < len(args := state.args):
            argval = args[j]
        else:
            return ''

        if not flags:
            return argval

        has_path = 0 != ArgVarFlags.FullPath & flags

        if flags.StripQuotes and argval.startswith('"') and argval.endswith('"'):
            argval = argval[1:-1]

        if flags.ShortName and not has_path:
            flags |= ArgVarFlags.FullPath
            has_path = True

        with io.StringIO() as out:
            # the parts are separated by spaces, the leading one is stripped at the end
            if flags.Attributes:
                out.write('--a--------') # TODO: placeholder
            if flags.DateTime:
                dt = state.start_time.isoformat(' ', 'minutes')
                out.write(F' {dt}')
            if flags.FileSize:
                out.write(F' {state.sizeof_file(argval)}')
            if has_path:
                out.write(' ')
                full_path = state.resolve_path(argval)
                drv, rest = ntpath.splitdrive(full_path)
                *pp, name = ntpath.split(rest)
                name, ext = ntpath.splitext(name)
                if flags.DriveLetter:
                    out.write(drv)
                if flags.FilePath:
                    folder = ntpath.join(*pp)
                    # ntpath.split drops the separator between directory and file name; the
                    # path part has to end in one so that it can be followed directly by a
                    # file name, as in %~dp0file.txt or the full path produced by %~f1.
                    if folder and folder[-1] not in '\\/':
                        folder += '\\'
                    out.write(folder)
                if flags.FileName:
                    out.write(name)
                if flags.FileExtension:
                    out.write(ext)
            return out.getvalue().lstrip()

    @property
    def modes(self):
        """
        The mode list of the current cursor.
        """
        cursor = self.cursor
        return cursor.modes

    def reset(self, offset: int):
        """
        Restore the initial lexer state with a new cursor at the given offset whose only
        mode is text mode.
        """
        self.cursor = BatchLexerCursor(offset, [Mode.Text])
        self.resume = None
        self.pending_redirect = None
        self.group = 0
        self.first_after_gap = True
        self.quote = self.caret = self.white = False

    def mode_reset(self):
        """
        Drop every mode except the first one.
        """
        stack = self.modes
        stack[1:] = []

    def mode_finish(self):
        """
        Leave the active mode. Raises RuntimeError when only one mode is left.
        """
        stack = self.modes
        if len(stack) > 1:
            stack.pop()
            return
        raise RuntimeError('Trying to exit base mode.')

    def mode_switch(self, mode: Mode):
        """
        Make the given mode the active one by appending it to the mode list.
        """
        stack = self.modes
        stack.append(mode)

    @property
    def mode(self):
        """
        The active mode, which is the last entry of the mode list. Assigning to this
        property replaces that entry.
        """
        stack = self.modes
        return stack[-1]

    @mode.setter
    def mode(self, value: Mode):
        stack = self.modes
        stack[-1] = value

    @property
    def substituting(self):
        """
        The `substituting` property of the current cursor.
        """
        cursor = self.cursor
        return cursor.substituting

    @property
    def eof(self):
        """
        Whether the cursor offset has reached the end of the code and the substitution
        buffer is empty.
        """
        cursor = self.cursor
        if cursor.offset < len(self.code):
            return False
        return not cursor.subst_buffer

    def quick_save(self):
        """
        Store a copy of the current cursor in `resume`.
        """
        snapshot = self.cursor.copy()
        self.resume = snapshot

    def quick_load(self):
        """
        Make the cursor stored in `resume` the current one and clear `resume`. Raises
        RuntimeError when nothing is stored.
        """
        saved = self.resume
        if saved is None:
            raise RuntimeError
        self.cursor, self.resume = saved, None

    def current_char(self):
        """
        Return the code unit at the current read position without consuming it. While the
        substitution buffer is non-empty, it is read first. A percent sign in the source
        is skipped and triggers `fill_substitution_buffer`, after which the read is retried.

        Raises UnexpectedEOF when the read position is past the end of the code.
        """
        cursor = self.cursor
        code = self.code
        if not (subst := cursor.subst_buffer):
            offset = cursor.offset
            # The bounds check keeps a read at the end of the input on the UnexpectedEOF
            # path below instead of leaking a bare IndexError from this comparison.
            if offset < len(code) and code[offset] == PERCENT:
                cursor.offset += 1
                self.fill_substitution_buffer()
                return self.current_char()
        else:
            offset = cursor.subst_offset
            if offset >= (n := len(subst)):
                # reading past the buffer continues in the source at the cursor offset
                offset -= n
                offset += cursor.offset
            else:
                return subst[offset]
        try:
            return code[offset]
        except IndexError:
            raise UnexpectedEOF from None

    def consume_char(self):
        """
        Advance the read position by one code unit. A non-empty substitution buffer is
        advanced first; it is emptied and its offset set to -1 once its last unit has been
        consumed. Otherwise the source offset is advanced, and EOFError is raised when that
        would move it past the end of the code.
        """
        cursor = self.cursor
        subst = cursor.subst_buffer
        if not subst:
            advanced = cursor.offset + 1
            if advanced > len(self.code):
                raise EOFError('Consumed a character beyond EOF.')
            cursor.offset = advanced
            return
        advanced = cursor.subst_offset + 1
        if advanced < len(subst):
            cursor.subst_offset = advanced
        else:
            del subst[:]
            cursor.subst_offset = -1

    def next_char(self):
        """
        Consume the current code unit and return the one that follows.
        """
        self.consume_char()
        upcoming = self.current_char()
        return upcoming

    def parse_env_variable(self, var: str):
        """
        Expand the text found between two percent signs. The empty string expands to a
        literal percent sign. Otherwise the part before the first colon is the variable
        name, looked up via `state.envar`, and the optional remainder is a modifier:

        - `old=new` replaces `old` by `new`; when `old` is prefixed with a tilde, only
          the first occurrence is replaced.
        - `~offset` or `~offset,length` selects a substring. A negative offset counts
          from the end of the value, and a negative length excludes that many characters
          at the end of the value.

        An empty value or a missing modifier returns the value unchanged. A modifier that
        is neither a replacement nor starts with a tilde raises EmulatorException.
        """
        if var == '':
            return '%'
        name, _, modifier = var.partition(':')
        base = self.state.envar(name)
        if not modifier or not base:
            return base
        if '=' in modifier:
            old, _, new = modifier.partition('=')
            if old.startswith('~'):
                # count is passed positionally: str.replace accepts it as a keyword
                # argument only from Python 3.13 onwards.
                return base.replace(old[1:], new, 1)
            return base.replace(old, new)
        if not modifier.startswith('~'):
            raise EmulatorException
        offset, _, length = modifier[1:].partition(',')
        offset = batchint(offset)
        if offset < 0:
            offset = max(0, len(base) + offset)
        if not length:
            end = len(base)
        elif (count := batchint(length)) < 0:
            # a negative length is relative to the end of the value, e.g. ~1,-1 removes
            # the first and the last character
            end = max(0, len(base) + count)
        else:
            end = offset + count
        return base[offset:end]

    def emit_token(self):
        """
        Yield the contents of the token buffer, if any, and clear the buffer. When a
        redirect is pending, the token becomes its target: the redirect is yielded in
        place of a word and gap mode is entered. The generator returns whether that
        happened.
        """
        buffer = self.cursor.token
        switched = False
        token = u16(buffer) if buffer else None
        if token:
            pending = self.pending_redirect
            if not pending:
                yield Word(token)
            else:
                pending.target = unquote(token)
                self.pending_redirect = None
                self.mode_switch(Mode.Gap)
                yield pending
                switched = True
        del buffer[:]
        self.first_after_gap = False
        return switched

    def tokens(self, offset: int) -> Generator[Token]:
        """
        Reset the lexer to the given offset and yield tokens until the input is exhausted.
        Each code unit is passed to the handler of the active mode and is consumed when
        the handler returns a true value. A final token is emitted at the end unless
        `first_after_gap` is set.
        """
        self.reset(offset)
        dispatch = self._register.handlers
        read = self.current_char
        advance = self.consume_char
        limit = len(self.code)

        while True:
            if self.cursor.eof(limit):
                break
            char = read()
            mode = self.mode
            handler = dispatch[mode]
            consumed = yield from handler(self, mode, char)
            if consumed:
                advance()

        if self.first_after_gap:
            return
        yield from self.emit_token()

    def check_line_break(self, mode: Mode, char: int):
        """
        Handle a line break and return whether `char` was one. Unless a caret is pending,
        the current token is emitted, all modes but the first are dropped and a NewLine
        control token is yielded. The line break is consumed in either case.
        """
        if char == LINEBREAK:
            if not self.caret:
                # caret is not reset until the next char!
                yield from self.emit_token()
                self.white = True
                self.quote = False
                self.mode_reset()
                yield Ctrl.NewLine
            self.consume_char()
            return True
        return False

    def check_caret(self, char: int):
        """
        Handle caret escapes and return whether `char` was consumed by this check. With a
        caret pending, `char` is appended to the token verbatim and the flag is cleared.
        Otherwise a caret sets the flag, and any other character is left alone.
        """
        if not self.caret:
            if char != CARET:
                return False
            self.caret = True
        else:
            self.cursor.token.append(char)
            self.caret = False
        self.consume_char()
        return True

    def check_command_separators(self, char: int):
        """
        Handle a closing parenthesis inside a group, an ampersand, or a pipe. The current
        token is emitted, all modes but the first are dropped, the character is consumed
        and the matching control token is yielded. Returns whether `char` was handled.
        """
        if char == PAREN_CLOSE:
            depth = self.group
            if depth > 0:
                yield from self.emit_token()
                yield Ctrl.EndGroup
                self.mode_reset()
                self.consume_char()
                self.group = depth - 1
                return True
        if char == AMPERSAND:
            separator = Ctrl.Ampersand
        elif char == PIPE:
            separator = Ctrl.Pipe
        else:
            return False
        yield from self.emit_token()
        self.mode_reset()
        self.consume_char()
        yield separator
        return True

    def check_quote_start(self, char: int):
        """
        When `char` is a quote: append it to the token, enter quote mode, clear the caret
        flag, consume the character and return True. Otherwise return False.
        """
        if char == QUOTE:
            self.cursor.token.append(char)
            self.mode_switch(Mode.Quote)
            self.caret = False
            self.consume_char()
            return True
        return False

    def check_redirect_io(self, char: int):
        """
        Handle a redirection that starts with an angle bracket at `char`. Returns False
        when `char` is no angle bracket. Otherwise the operator is consumed and True is
        returned after either yielding a complete RedirectIO (for the `&digit` form) or
        storing a pending redirect whose target is filled in by `emit_token`.
        """
        if char not in ANGLES:
            return False

        output = char != ANGLE_OPEN
        token = self.cursor.token

        # A token that consists of exactly one digit is the source handle of the redirect
        # and is removed from the buffer; otherwise the source is 1 for output redirects
        # and 0 for input redirects.
        if len(token) == 1 and (src := token[0] - ZERO) in range(10):
            del token[:]
            source = src
        else:
            source = int(output)

        char = self.next_char()

        if not output:
            how = Redirect.In
        elif char == ANGLE_CLOSE:
            # a second closing angle bracket selects append mode
            how = Redirect.OutAppend
            char = self.next_char()
        else:
            how = Redirect.OutCreate

        # whatever was collected before the operator is emitted as its own token
        yield from self.emit_token()

        if char != AMPERSAND:
            self.pending_redirect = RedirectIO(how, source)
            self.mode_switch(Mode.Gap)
        else:
            # an ampersand must be followed by a single digit naming the target handle
            char = self.next_char()
            if char not in range(ZERO, NINE + 1):
                raise UnexpectedToken(char)
            self.consume_char()
            yield RedirectIO(how, source, char - ZERO)

        return True

    def fill_substitution_buffer(self):
        """
        Scan the source from the cursor offset to the end of the line for a variable
        expression and, when one is found, put its expansion into the substitution buffer.
        `current_char` calls this after skipping a percent sign. Nothing happens while a
        substitution is already in progress.
        """
        if (cursor := self.cursor).substituting:
            return

        code = self.code
        # offset at which reading continues in the source; negative means unchanged
        var_resume = -1
        # offset of the first code unit after a dollar sign
        var_dollar = -1
        # candidate argument variable; set to None once ruled out
        var_cmdarg = ArgVar()
        # the expansion as code units, once known
        variable = None
        phase = EV.New
        q = ArgVarFlags.StripQuotes

        for current in range((current := cursor.offset), len(code)):
            char = code[current]
            if char == LINEBREAK:
                break
            elif char == PERCENT:
                # a second percent sign: everything since the cursor offset is passed to
                # parse_env_variable
                try:
                    var_name = u16(self.code[cursor.offset:current])
                    variable = u16(self.parse_env_variable(var_name))
                except MissingVariable:
                    if var_resume < 0:
                        var_resume = current + 1
                    break
            elif var_cmdarg:
                if ZERO <= char <= NINE:
                    # a digit terminates an argument variable
                    var_cmdarg.offset = char - ZERO
                    variable = u16(self.parse_arg_variable(var_cmdarg))
                elif char == ASTERIX and cursor.offset == current:
                    # an asterisk at the very start marks the offset with (...)
                    var_cmdarg.offset = (...)
                    variable = u16(self.parse_arg_variable(var_cmdarg))
            if variable is not None:
                cursor.subst_offset = 0
                cursor.subst_buffer.extend(variable)
                var_resume = current + 1
                break
            if phase == EV.Mod:
                var_cmdarg = None
            elif phase == EV.Env:
                if char == COLON:
                    if var_cmdarg:
                        assert var_dollar > 0
                        # the text between dollar sign and colon is stored as the path
                        var_cmdarg.path = u16(self.code[var_dollar:current])
                    var_resume = current + 1
            else:
                if char == DOLLAR:
                    var_dollar = current + 1
                    phase = EV.Env
                    continue
                if char == COLON:
                    var_cmdarg = None
                    var_resume = current + 1
                    phase = EV.Mod
                    continue
                if not var_cmdarg:
                    continue
                try:
                    var_cmdarg.flags |= ArgVarFlags.FromToken(char)
                except KeyError:
                    # not a modifier character, so this is no argument variable
                    var_cmdarg = None
                    continue
                # the candidate is dropped unless the StripQuotes flag has been set
                if q not in var_cmdarg.flags:
                    var_cmdarg = None
        if var_resume >= 0:
            cursor.offset = var_resume

    @_register(Mode.Label)
    def gobble_label(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for label mode: everything up to the next line break goes into the token.
        """
        line_ended = yield from self.check_line_break(mode, char)
        if line_ended:
            return False
        self.cursor.token.append(char)
        return True

    @_register(Mode.Quote)
    def gobble_quote(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for quote mode: characters are appended to the token, and a quote
        character additionally leaves the mode. Line breaks are handled first.
        """
        line_ended = yield from self.check_line_break(mode, char)
        if line_ended:
            return False
        self.cursor.token.append(char)
        if char != QUOTE:
            return True
        self.mode_finish()
        return True

    @_register(Mode.Whitespace)
    def gobble_whitespace(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for whitespace mode: whitespace is collected in the token. The first other
        character leaves the mode and yields the collected whitespace as a word; that
        character is not consumed.
        """
        buffer = self.cursor.token
        if char in WHITESPACE:
            buffer.append(char)
            return True
        self.mode_finish()
        yield Word(u16(buffer))
        del buffer[:]
        return False

    @_register(Mode.SetQuoted)
    def gobble_quoted_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the quoted SET mode, which `gobble_set` enters when the token starts
        with a quote. After each further quote a copy of the cursor is stored so that the
        lexer can later return to the state right after the most recent quote.
        """
        if char == QUOTE:
            # consume and record the quote, then remember the state right after it
            self.consume_char()
            self.cursor.token.append(QUOTE)
            self.quick_save()
            return False

        if char == LINEBREAK:
            if self.resume is None:
                # no further quote was seen: the line ends the token
                yield from self.emit_token()
                self.mode_reset()
                yield Ctrl.NewLine
                return True
            elif self.caret:
                # a pending caret: the line break is consumed without being recorded
                self.caret = False
                return True
            else:
                # go back to the state after the last quote and emit the token from there
                self.quick_load()
                yield from self.emit_token()
                return False

        if self.resume is not None:
            if char == CARET:
                self.caret = not self.caret
            elif not self.caret:
                if (char == PAREN_CLOSE and self.group > 0) or char in (PIPE, AMPERSAND):
                    self.quick_load()
                    yield from self.emit_token()
                    self.mode_finish()
                    # after a quick load, the ending quote was already consumed.
                    return False

        self.cursor.token.append(char)
        return True

    @_register(Mode.SetStarted)
    def gobble_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the initial SET mode, which `parse_set` enters. It remains active until
        an equals sign switches to the regular SET mode or a quote at the start of a token
        switches to the quoted SET mode.
        """
        token = self.cursor.token
        if (yield from self.check_line_break(mode, char)):
            return False
        if char in WHITESPACE:
            # whitespace ends the current token and is collected in whitespace mode
            yield from self.emit_token()
            token.append(char)
            self.mode_switch(Mode.Whitespace)
            return True
        if char == SLASH and not self.pending_redirect:
            # a slash begins a new token
            yield from self.emit_token()
            token.append(char)
            return True
        if not token and char == QUOTE:
            # a quote at the very start of a token replaces this mode by the quoted one
            self.caret = False
            token.append(char)
            self.mode = Mode.SetQuoted
            return True
        if self.check_caret(char):
            return False
        if char == EQUALS:
            yield from self.emit_token()
            yield Ctrl.Equals
            self.mode = Mode.SetRegular
            return True
        if self.check_quote_start(char):
            return False
        if (yield from self.check_command_separators(char)):
            return False
        if (yield from self.check_redirect_io(char)):
            return False
        token.append(char)
        return True

    def common_token_checks(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Run the shared checks in order (line break, caret, quote start, command separators,
        redirection) and stop at the first one that handles `char`. Returns whether any
        check handled it.
        """
        handled = yield from self.check_line_break(mode, char)
        if not handled:
            handled = self.check_caret(char)
        if not handled:
            handled = self.check_quote_start(char)
        if not handled:
            handled = yield from self.check_command_separators(char)
        if not handled:
            handled = yield from self.check_redirect_io(char)
        return handled

    @_register(Mode.SetRegular)
    def gobble_set_regular(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for the regular SET mode: after the common checks, every character is
        appended to the token. Whitespace completes a pending redirect, which receives the
        unquoted token collected so far as its target and is yielded.
        """
        if (yield from self.common_token_checks(mode, char)):
            return False
        buffer = self.cursor.token
        redirect = self.pending_redirect
        if redirect and char in WHITESPACE:
            self.pending_redirect = None
            redirect.target = unquote(u16(buffer))
            del buffer[:]
            yield redirect
        buffer.append(char)
        return True

    @_register(Mode.Gap)
    def gobble_gap(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for gap mode: separator characters are consumed without producing output.
        The first other character leaves the mode and sets `first_after_gap`; it is not
        consumed.
        """
        # yields nothing, but has to be a generator like every other handler
        yield from ()
        if char not in SEPARATORS:
            self.mode_finish()
            self.first_after_gap = True
            return False
        return True

    @_register(Mode.Text)
    def gobble_txt(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
        """
        Handler for text mode. After the common checks it splits the input at whitespace,
        slashes and the characters in SeparatorMap, and recognizes labels and comments
        that start with a colon.
        """
        if (yield from self.common_token_checks(mode, char)):
            return False
        if char in WHITESPACE:
            # whitespace ends the current token and is collected in whitespace mode
            yield from self.emit_token()
            self.cursor.token.append(char)
            self.mode_switch(Mode.Whitespace)
            return True
        if char == SLASH and not self.pending_redirect:
            # a slash begins a new token; it is appended further below
            yield from self.emit_token()
        if char == COLON:
            if (yield from self.emit_token()):
                # the token completed a redirect and gap mode was entered
                return False
            elif self.next_char() == COLON:
                # two colons in a row; the second one is consumed by returning True
                yield Ctrl.Comment
                return True
            else:
                yield Ctrl.Label
                return False
        try:
            token = SeparatorMap[char]
        except KeyError:
            pass
        else:
            if (yield from self.emit_token()):
                return False
            else:
                yield token
                return True
        self.cursor.token.append(char)
        return True

    @staticmethod
    def label(text: str, uppercase=True):
        """
        Compute a label name from the text that follows the colon of a label line. The text
        is stripped on the left and split at spaces, tabs and vertical tabs. Each chunk is
        passed through `uncaret`; the label ends with the first chunk for which `uncaret`
        returns a false first value, and every earlier chunk loses its last character. The
        result is converted to upper case unless `uppercase` is false.
        """
        parts = re.split('([\x20\t\v])', text.lstrip())
        # even indices hold the text chunks, odd indices the separators between them
        for k in range(0, len(parts), 2):
            tq, part = uncaret(parts[k], True)
            if not tq:
                parts[k] = part
                del parts[k + 1:]
                break
            parts[k] = part[:-1]
        label = ''.join(parts)
        return label.upper() if uppercase else label

    def preparse(self, text: str | buf):
        """
        Prepare the lexer for the given script. Input that is not a string is decoded as
        UTF-8 with replacement of invalid data. The text is split into lines which are
        stored as 16-bit code units joined by LINEBREAK, and the offset of every label
        line is recorded under the name computed by `label`; the first occurrence of a
        name wins. Sets `labels`, `text` and `code`.
        """
        self.labels = {}

        if not isinstance(text, str):
            text = codecs.decode(text, 'utf8', errors='replace')

        lines = text.splitlines(keepends=False)
        tail = text[-10:]

        if tail.splitlines() != F'{tail}\n'.splitlines():
            # the text had a trailing line break, which is swallowed by the splitlines method
            lines.append('')

        units = array.array('H')

        for index, line in enumerate(lines):
            if index > 0:
                units.append(LINEBREAK)
            raw = line.encode('utf-16le')
            if not raw:
                continue
            view = memoryview(raw).cast('H')
            match = re.search('^@?[\\s]*:', line)
            if match:
                end = match.end()
                name = self.label(u16(view[end:]))
                if name:
                    # the stored offset is the position of the colon
                    self.labels.setdefault(name, len(units) + end - 1)
            units.extend(view)

        self.text = text
        self.code = memoryview(units)

    # Runs once while the class body is evaluated: every mode needs exactly one handler.
    if set(Mode) ^ set(_register.handlers):
        raise NotImplementedError('Not all handlers were implemented.')

Class variables

var labels

Dictionary that maps a label name to an offset into code; populated by preparse.

var code

The script as a memoryview of 16-bit code units; created by preparse.

var pending_redirect

A RedirectIO whose target has not been read yet, or None.

var cursor

The BatchLexerCursor that holds the current read state.

var resume

Copy of the cursor stored by quick_save and restored by quick_load, or None.

Static methods

def label(text, uppercase=True)
Expand source code Browse git
@staticmethod
def label(text: str, uppercase=True):
    """
    Compute a label name from the text that follows the colon of a label line. The text
    is stripped on the left and split at spaces, tabs and vertical tabs. Each chunk is
    passed through `uncaret`; the label ends with the first chunk for which `uncaret`
    returns a false first value, and every earlier chunk loses its last character. The
    result is converted to upper case unless `uppercase` is false.
    """
    parts = re.split('([\x20\t\v])', text.lstrip())
    # even indices hold the text chunks, odd indices the separators between them
    for k in range(0, len(parts), 2):
        tq, part = uncaret(parts[k], True)
        if not tq:
            parts[k] = part
            del parts[k + 1:]
            break
        parts[k] = part[:-1]
    label = ''.join(parts)
    return label.upper() if uppercase else label

Instance variables

var environment
Expand source code Browse git
@property
def environment(self):
    """
    The `environment` attribute of the attached state.
    """
    state = self.state
    return state.environment
var modes
Expand source code Browse git
@property
def modes(self):
    """
    The mode list of the current cursor.
    """
    cursor = self.cursor
    return cursor.modes
var mode
Expand source code Browse git
@property
def mode(self):
    """
    The active mode, which is the last entry of the mode list.
    """
    stack = self.modes
    return stack[-1]
var substituting
Expand source code Browse git
@property
def substituting(self):
    """
    The `substituting` property of the current cursor.
    """
    cursor = self.cursor
    return cursor.substituting
var eof
Expand source code Browse git
@property
def eof(self):
    """
    Whether the cursor offset has reached the end of the code and the substitution
    buffer is empty.
    """
    cursor = self.cursor
    if cursor.offset < len(self.code):
        return False
    return not cursor.subst_buffer

Methods

def parse_group(self)
Expand source code Browse git
def parse_group(self):
    """
    Increase the group counter by one.
    """
    self.group = self.group + 1
def parse_label(self)
Expand source code Browse git
def parse_label(self):
    """
    Enter label mode. Raises EmulatorException unless text mode is the only mode.
    """
    current = self.mode
    if not (current == Mode.Text and len(self.modes) == 1):
        raise EmulatorException(F'Switching to LABEL while in mode {current.name}')
    self.mode_switch(Mode.Label)
def parse_set(self)
Expand source code Browse git
def parse_set(self):
    """
    Enter the initial SET mode. Raises EmulatorException unless text mode is the only
    mode.
    """
    current = self.mode
    if not (current == Mode.Text and len(self.modes) == 1):
        raise EmulatorException(F'Switching to SET while in mode {current.name}')
    self.mode_switch(Mode.SetStarted)
def parse_arg_variable(self, var)

%* in a batch script refers to all the arguments (e.g. %1 %2 %3 %4 %5 …) Substitution of batch parameters (%n) has been enhanced. You can now use the following optional syntax: %~1 - expands %1 removing any surrounding quotes (") %~f1 - expands %1 to a fully qualified path name %~d1 - expands %1 to a drive letter only %~p1 - expands %1 to a path only %~n1 - expands %1 to a file name only %~x1 - expands %1 to a file extension only %~s1 - expanded path contains short names only %~a1 - expands %1 to file attributes %~t1 - expands %1 to date/time of file %~z1 - expands %1 to size of file %~$PATH:1 - searches the directories listed in the PATH environment variable and expands %1 to the fully qualified name of the first one found. If the environment variable name is not defined or the file is not found by the search, then this modifier expands to the empty string The modifiers can be combined to get compound results: %~dp1 - expands %1 to a drive letter and path only %~nx1 - expands %1 to a file name and extension only %~dp$PATH:1 - searches the directories listed in the PATH environment variable for %1 and expands to the drive letter and path of the first one found. %~ftza1 - expands %1 to a DIR like output line In the above examples %1 and PATH can be replaced by other valid values. The %~ syntax is terminated by a valid argument number. The %~ modifiers may not be used with %*

Expand source code Browse git
def parse_arg_variable(self, var: ArgVar):
    """
    Expand a batch argument variable such as %1, %~dp0 or %* to its string value. An
    offset of (...) selects the whole command line, offset 0 the script name, and an
    offset beyond the available arguments yields the empty string. The modifiers follow
    the help text of cmd.exe:

    %* in a batch script refers to all the arguments (e.g. %1 %2 %3
        %4 %5 ...)
    Substitution of batch parameters (%n) has been enhanced.  You can
    now use the following optional syntax:
        %~1         - expands %1 removing any surrounding quotes (")
        %~f1        - expands %1 to a fully qualified path name
        %~d1        - expands %1 to a drive letter only
        %~p1        - expands %1 to a path only
        %~n1        - expands %1 to a file name only
        %~x1        - expands %1 to a file extension only
        %~s1        - expanded path contains short names only
        %~a1        - expands %1 to file attributes
        %~t1        - expands %1 to date/time of file
        %~z1        - expands %1 to size of file
        %~$PATH:1   - searches the directories listed in the PATH
                       environment variable and expands %1 to the fully
                       qualified name of the first one found.  If the
                       environment variable name is not defined or the
                       file is not found by the search, then this
                       modifier expands to the empty string
    The modifiers can be combined to get compound results:
        %~dp1       - expands %1 to a drive letter and path only
        %~nx1       - expands %1 to a file name and extension only
        %~dp$PATH:1 - searches the directories listed in the PATH
                       environment variable for %1 and expands to the
                       drive letter and path of the first one found.
        %~ftza1     - expands %1 to a DIR like output line
    In the above examples %1 and PATH can be replaced by other
    valid values.  The %~ syntax is terminated by a valid argument
    number.  The %~ modifiers may not be used with %*
    """
    state = self.state
    flags = var.flags

    if (k := var.offset) is (...):
        return state.command_line
    if (j := k - 1) < 0:
        argval = state.name
    elif j < len(args := state.args):
        argval = args[j]
    else:
        return ''

    if not flags:
        return argval

    has_path = 0 != ArgVarFlags.FullPath & flags

    if flags.StripQuotes and argval.startswith('"') and argval.endswith('"'):
        argval = argval[1:-1]

    if flags.ShortName and not has_path:
        flags |= ArgVarFlags.FullPath
        has_path = True

    with io.StringIO() as out:
        # the parts are separated by spaces, the leading one is stripped at the end
        if flags.Attributes:
            out.write('--a--------') # TODO: placeholder
        if flags.DateTime:
            dt = state.start_time.isoformat(' ', 'minutes')
            out.write(F' {dt}')
        if flags.FileSize:
            out.write(F' {state.sizeof_file(argval)}')
        if has_path:
            out.write(' ')
            full_path = state.resolve_path(argval)
            drv, rest = ntpath.splitdrive(full_path)
            *pp, name = ntpath.split(rest)
            name, ext = ntpath.splitext(name)
            if flags.DriveLetter:
                out.write(drv)
            if flags.FilePath:
                folder = ntpath.join(*pp)
                # ntpath.split drops the separator between directory and file name; the
                # path part has to end in one so that it can be followed directly by a
                # file name, as in %~dp0file.txt or the full path produced by %~f1.
                if folder and folder[-1] not in '\\/':
                    folder += '\\'
                out.write(folder)
            if flags.FileName:
                out.write(name)
            if flags.FileExtension:
                out.write(ext)
        return out.getvalue().lstrip()
def reset(self, offset)
Expand source code Browse git
def reset(self, offset: int):
    """
    Restore the initial lexer state with a new cursor at the given offset whose only
    mode is text mode.
    """
    self.cursor = BatchLexerCursor(offset)
    self.modes.append(Mode.Text)
    self.resume = None
    self.pending_redirect = None
    self.group = 0
    self.first_after_gap = True
    self.quote = self.caret = self.white = False
def mode_reset(self)
Expand source code Browse git
def mode_reset(self):
    """
    Drop every mode except the first one.
    """
    stack = self.modes
    stack[1:] = []
def mode_finish(self)
Expand source code Browse git
def mode_finish(self):
    """
    Leave the active mode. Raises RuntimeError when only one mode is left.
    """
    stack = self.modes
    if len(stack) > 1:
        stack.pop()
        return
    raise RuntimeError('Trying to exit base mode.')
def mode_switch(self, mode)
Expand source code Browse git
def mode_switch(self, mode: Mode):
    """
    Make the given mode the active one by appending it to the mode list.
    """
    stack = self.modes
    stack.append(mode)
def quick_save(self)
Expand source code Browse git
def quick_save(self):
    """
    Store a copy of the current cursor in `resume`.
    """
    snapshot = self.cursor.copy()
    self.resume = snapshot
def quick_load(self)
Expand source code Browse git
def quick_load(self):
    """
    Make the cursor stored in `resume` the current one and clear `resume`. Raises
    RuntimeError when nothing is stored.
    """
    saved = self.resume
    if saved is None:
        raise RuntimeError
    self.cursor, self.resume = saved, None
def current_char(self)
Expand source code Browse git
def current_char(self):
    """
    Return the code unit at the current read position without consuming it. While the
    substitution buffer is non-empty, it is read first. A percent sign in the source
    is skipped and triggers `fill_substitution_buffer`, after which the read is retried.

    Raises UnexpectedEOF when the read position is past the end of the code.
    """
    cursor = self.cursor
    code = self.code
    if not (subst := cursor.subst_buffer):
        offset = cursor.offset
        # The bounds check keeps a read at the end of the input on the UnexpectedEOF
        # path below instead of leaking a bare IndexError from this comparison.
        if offset < len(code) and code[offset] == PERCENT:
            cursor.offset += 1
            self.fill_substitution_buffer()
            return self.current_char()
    else:
        offset = cursor.subst_offset
        if offset >= (n := len(subst)):
            # reading past the buffer continues in the source at the cursor offset
            offset -= n
            offset += cursor.offset
        else:
            return subst[offset]
    try:
        return code[offset]
    except IndexError:
        raise UnexpectedEOF from None
def consume_char(self)
Expand source code Browse git
def consume_char(self):
    """
    Advance the read position by one code unit. A non-empty substitution buffer is
    advanced first; it is emptied and its offset set to -1 once its last unit has been
    consumed. Otherwise the source offset is advanced, and EOFError is raised when that
    would move it past the end of the code.
    """
    cursor = self.cursor
    subst = cursor.subst_buffer
    if not subst:
        advanced = cursor.offset + 1
        if advanced > len(self.code):
            raise EOFError('Consumed a character beyond EOF.')
        cursor.offset = advanced
        return
    advanced = cursor.subst_offset + 1
    if advanced < len(subst):
        cursor.subst_offset = advanced
    else:
        del subst[:]
        cursor.subst_offset = -1
def next_char(self)
Expand source code Browse git
def next_char(self):
    """
    Consume the current code unit and return the one that follows.
    """
    self.consume_char()
    upcoming = self.current_char()
    return upcoming
def parse_env_variable(self, var)
Expand source code Browse git
def parse_env_variable(self, var: str):
    """
    Expand the text found between two percent signs. The empty string expands to a
    literal percent sign. Otherwise the part before the first colon is the variable
    name, looked up via `state.envar`, and the optional remainder is a modifier:

    - `old=new` replaces `old` by `new`; when `old` is prefixed with a tilde, only
      the first occurrence is replaced.
    - `~offset` or `~offset,length` selects a substring. A negative offset counts
      from the end of the value, and a negative length excludes that many characters
      at the end of the value.

    An empty value or a missing modifier returns the value unchanged. A modifier that
    is neither a replacement nor starts with a tilde raises EmulatorException.
    """
    if var == '':
        return '%'
    name, _, modifier = var.partition(':')
    base = self.state.envar(name)
    if not modifier or not base:
        return base
    if '=' in modifier:
        old, _, new = modifier.partition('=')
        if old.startswith('~'):
            # count is passed positionally: str.replace accepts it as a keyword
            # argument only from Python 3.13 onwards.
            return base.replace(old[1:], new, 1)
        return base.replace(old, new)
    if not modifier.startswith('~'):
        raise EmulatorException
    offset, _, length = modifier[1:].partition(',')
    offset = batchint(offset)
    if offset < 0:
        offset = max(0, len(base) + offset)
    if not length:
        end = len(base)
    elif (count := batchint(length)) < 0:
        # a negative length is relative to the end of the value, e.g. ~1,-1 removes
        # the first and the last character
        end = max(0, len(base) + count)
    else:
        end = offset + count
    return base[offset:end]
def emit_token(self)
Expand source code Browse git
def emit_token(self):
    """
    Flush the cursor's token buffer as a token.

    If the buffer decodes to a non-empty string, one token is yielded: the pending redirect with
    the unquoted string as its target when a redirect is pending, and a `Word` otherwise. In the
    redirect case the pending redirect is cleared and the lexer switches to `Mode.Gap` before
    the redirect is yielded. The buffer is always emptied and `first_after_gap` is always reset.

    Returns:
        True if a pending redirect was completed and yielded, False otherwise.
    """
    buffer = self.cursor.token
    redirected = False
    text = u16(buffer) if buffer else None
    if text:
        redirect = self.pending_redirect
        if not redirect:
            yield Word(text)
        else:
            redirect.target = unquote(text)
            self.pending_redirect = None
            self.mode_switch(Mode.Gap)
            yield redirect
            redirected = True
    del buffer[:]
    self.first_after_gap = False
    return redirected
def tokens(self, offset)
Expand source code Browse git
def tokens(self, offset: int) -> Generator[Token]:
    """
    Lex the code starting at the given offset and yield the resulting tokens.

    After calling `self.reset(offset)`, the handler registered for the current mode is invoked
    with the current character until the cursor reports the end of input. A handler yields
    tokens and returns whether this loop should consume the character; handlers that consumed
    input themselves return a falsy value. Unless `first_after_gap` is set when the loop ends,
    the remaining token buffer is flushed via `emit_token`.
    """
    self.reset(offset)
    dispatch = self._register.handlers
    peek = self.current_char
    advance = self.consume_char
    end = len(self.code)

    while True:
        if self.cursor.eof(end):
            break
        char = peek()
        mode = self.mode
        consumed = yield from dispatch[mode](self, mode, char)
        if consumed:
            advance()

    if not self.first_after_gap:
        yield from self.emit_token()
def check_line_break(self, mode, char)
Expand source code Browse git
def check_line_break(self, mode: Mode, char: int):
    """
    Handle a line break character.

    Any other character is ignored and False is returned. For a line break that is not preceded
    by an active caret, the token buffer is flushed, the `white` and `quote` flags and the mode
    stack are reset, and `Ctrl.NewLine` is yielded. When a caret is active, nothing is emitted.
    In both cases the line break is consumed.

    Returns:
        True if `char` was a line break (and was consumed), False otherwise.
    """
    is_break = char == LINEBREAK
    if is_break:
        # The caret flag is deliberately left untouched here: it is only reset by the handling
        # of the next character.
        if not self.caret:
            yield from self.emit_token()
            self.white = True
            self.quote = False
            self.mode_reset()
            yield Ctrl.NewLine
        self.consume_char()
    return is_break
def check_caret(self, char)
Expand source code Browse git
def check_caret(self, char: int):
    """
    Handle caret escaping.

    When the caret flag is set, `char` is appended verbatim to the token buffer and the flag is
    cleared. When the flag is not set and `char` is a caret, the flag is set. In both cases the
    character is consumed.

    Returns:
        True if the character was handled (and consumed) here, False otherwise.
    """
    escaped = self.caret
    if not escaped and char != CARET:
        return False
    if escaped:
        self.cursor.token.append(char)
    self.caret = not escaped
    self.consume_char()
    return True
def check_command_separators(self, char)
Expand source code Browse git
def check_command_separators(self, char: int):
    """
    Handle characters that terminate a command: a group-closing parenthesis, `&`, and `|`.

    A closing parenthesis only counts while the group depth is positive; it flushes the token
    buffer, yields `Ctrl.EndGroup`, resets the mode, consumes the character and decrements the
    group depth. An ampersand or pipe flushes the token buffer, resets the mode, consumes the
    character and then yields the matching control token.

    Returns:
        True if the character was handled (and consumed) here, False otherwise.
    """
    if char == PAREN_CLOSE:
        depth = self.group
        if depth > 0:
            yield from self.emit_token()
            yield Ctrl.EndGroup
            self.mode_reset()
            self.consume_char()
            self.group = depth - 1
            return True
    if char == AMPERSAND:
        separator = Ctrl.Ampersand
    elif char == PIPE:
        separator = Ctrl.Pipe
    else:
        return False
    yield from self.emit_token()
    self.mode_reset()
    self.consume_char()
    yield separator
    return True
def check_quote_start(self, char)
Expand source code Browse git
def check_quote_start(self, char: int):
    """
    Handle an opening double quote.

    The quote is appended to the token buffer, the lexer switches to `Mode.Quote`, the caret
    flag is cleared, and the character is consumed.

    Returns:
        True if `char` was a quote (and was consumed), False otherwise.
    """
    if char == QUOTE:
        self.cursor.token.append(char)
        self.mode_switch(Mode.Quote)
        self.caret = False
        self.consume_char()
        return True
    return False
def check_redirect_io(self, char)
Expand source code Browse git
def check_redirect_io(self, char: int):
    """
    Handle an I/O redirection that starts with an angle bracket.

    If the token buffer holds exactly one decimal digit, that digit is taken as the source
    handle and removed from the buffer; otherwise the source defaults to 1 for output and 0 for
    input redirection. A second closing angle bracket selects append mode. Whatever remains in
    the token buffer is flushed before the redirect itself is processed.

    When the operator is followed by an ampersand and a digit, a complete `RedirectIO` with
    that digit as its target handle is yielded. Otherwise, an incomplete `RedirectIO` is stored
    as the pending redirect and the lexer switches to `Mode.Gap`.

    Returns:
        True if `char` started a redirection, False otherwise.

    Raises:
        UnexpectedToken: when the ampersand is not followed by a decimal digit.
    """
    if char not in ANGLES:
        return False

    pending = self.cursor.token
    is_output = char != ANGLE_OPEN
    source = int(is_output)

    if len(pending) == 1:
        digit = pending[0] - ZERO
        if 0 <= digit <= 9:
            del pending[:]
            source = digit

    char = self.next_char()

    if is_output:
        if char == ANGLE_CLOSE:
            how = Redirect.OutAppend
            char = self.next_char()
        else:
            how = Redirect.OutCreate
    else:
        how = Redirect.In

    yield from self.emit_token()

    if char == AMPERSAND:
        char = self.next_char()
        if not ZERO <= char <= NINE:
            raise UnexpectedToken(char)
        self.consume_char()
        yield RedirectIO(how, source, char - ZERO)
    else:
        self.pending_redirect = RedirectIO(how, source)
        self.mode_switch(Mode.Gap)

    return True
def fill_substitution_buffer(self)
Expand source code Browse git
def fill_substitution_buffer(self):
    """
    Look ahead from the cursor for a variable reference and queue its expansion.

    The scan walks `self.code` starting at `cursor.offset` and stops at the first line break, at
    the end of the code, or as soon as a variable was resolved. Two kinds of references are
    recognized:

    - A percent sign: everything between the cursor and that percent sign is decoded and handed
      to `parse_env_variable`.
    - While `var_cmdarg` is still truthy, a decimal digit or (only as the very first scanned
      character) an asterisk: the collected `ArgVar` is handed to `parse_arg_variable`.

    On success, the expansion is appended to `cursor.subst_buffer`, `cursor.subst_offset` is
    reset to zero, and `cursor.offset` is moved past the character that completed the reference.
    When nothing was resolved, `cursor.offset` is only moved if a resume position was recorded
    during the scan. The method always returns `None` and does nothing while
    `cursor.substituting` is truthy.
    """
    if (cursor := self.cursor).substituting:
        return

    code = self.code
    # Position at which lexing continues after the scan; negative means "leave the cursor alone".
    var_resume = -1
    # Position directly after a dollar sign, used as the start of the ArgVar path.
    var_dollar = -1
    # Candidate argument variable; set to None as soon as the input can no longer be one.
    # NOTE(review): the truthiness of a fresh ArgVar() is defined outside this file section.
    var_cmdarg = ArgVar()
    variable = None
    phase = EV.New
    q = ArgVarFlags.StripQuotes

    for current in range((current := cursor.offset), len(code)):
        char = code[current]
        if char == LINEBREAK:
            # Variable references never span lines.
            break
        elif char == PERCENT:
            try:
                # u16 is used in both directions here: presumably it decodes code units to text
                # and encodes text back to code units -- confirm against its definition.
                var_name = u16(self.code[cursor.offset:current])
                variable = u16(self.parse_env_variable(var_name))
            except MissingVariable:
                # Unless an earlier resume position was recorded, skip past this percent sign.
                if var_resume < 0:
                    var_resume = current + 1
                break
        elif var_cmdarg:
            if ZERO <= char <= NINE:
                var_cmdarg.offset = char - ZERO
                variable = u16(self.parse_arg_variable(var_cmdarg))
            elif char == ASTERIX and cursor.offset == current:
                # An asterisk is only accepted as the very first scanned character; the offset
                # is then set to Ellipsis.
                var_cmdarg.offset = (...)
                variable = u16(self.parse_arg_variable(var_cmdarg))
        if variable is not None:
            # A reference was resolved: queue its value and continue lexing right after it.
            cursor.subst_offset = 0
            cursor.subst_buffer.extend(variable)
            var_resume = current + 1
            break
        if phase == EV.Mod:
            # After a colon in the initial phase, this can no longer be an argument variable.
            var_cmdarg = None
        elif phase == EV.Env:
            # Phase entered after a dollar sign was seen.
            if char == COLON:
                if var_cmdarg:
                    assert var_dollar > 0
                    # The text between the dollar sign and this colon becomes the ArgVar path.
                    var_cmdarg.path = u16(self.code[var_dollar:current])
                var_resume = current + 1
        else:
            # Initial phase (EV.New): characters are either phase switches or ArgVar flags.
            if char == DOLLAR:
                var_dollar = current + 1
                phase = EV.Env
                continue
            if char == COLON:
                var_cmdarg = None
                var_resume = current + 1
                phase = EV.Mod
                continue
            if not var_cmdarg:
                continue
            try:
                var_cmdarg.flags |= ArgVarFlags.FromToken(char)
            except KeyError:
                # Not a known flag character, so this is not an argument variable.
                var_cmdarg = None
                continue
            # Flags are only accepted while the StripQuotes flag is part of the set.
            if q not in var_cmdarg.flags:
                var_cmdarg = None
    if var_resume >= 0:
        cursor.offset = var_resume
def gobble_label(self, mode, char)
Expand source code Browse git
@_register(Mode.Label)
def gobble_label(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.Label`: collect every character up to the next line break.

    Returns:
        False if a line break was handled (and consumed) by `check_line_break`; True if `char`
        was appended to the token buffer and still has to be consumed by the caller.
    """
    line_ended = yield from self.check_line_break(mode, char)
    if line_ended:
        return False
    self.cursor.token.append(char)
    return True
def gobble_quote(self, mode, char)
Expand source code Browse git
@_register(Mode.Quote)
def gobble_quote(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.Quote`: collect characters until the closing quote.

    Every character other than a line break is appended to the token buffer; a quote character
    additionally finishes the current mode.

    Returns:
        False if a line break was handled (and consumed) by `check_line_break`; True if `char`
        was appended to the token buffer and still has to be consumed by the caller.
    """
    line_ended = yield from self.check_line_break(mode, char)
    if line_ended:
        return False
    buffer = self.cursor.token
    buffer.append(char)
    if char == QUOTE:
        self.mode_finish()
    return True
def gobble_whitespace(self, mode, char)
Expand source code Browse git
@_register(Mode.Whitespace)
def gobble_whitespace(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.Whitespace`: collect a run of whitespace into a single `Word`.

    Whitespace characters are appended to the token buffer. The first other character finishes
    the mode and causes the collected buffer to be yielded as a `Word` and cleared; that
    character itself is left for the next handler.

    Returns:
        True if `char` was whitespace and has to be consumed by the caller, False otherwise.
    """
    if char not in WHITESPACE:
        self.mode_finish()
        buffer = self.cursor.token
        yield Word(u16(buffer))
        del buffer[:]
        return False
    self.cursor.token.append(char)
    return True
def gobble_quoted_set(self, mode, char)
Expand source code Browse git
@_register(Mode.SetQuoted)
def gobble_quoted_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.SetQuoted`.

    Each quote character is consumed, appended to the token buffer, and followed by a call to
    `quick_save`. What happens at a line break or at a command separator depends on whether
    `self.resume` is set:

    - Line break with no resume state: the token is emitted, the mode is reset, and
      `Ctrl.NewLine` is yielded.
    - Line break with resume state and an active caret: the caret is cleared and the line break
      is skipped.
    - Line break with resume state and no caret: `quick_load` is called and the token emitted.
    - Unescaped `|`, `&`, or a group-closing parenthesis with resume state: `quick_load` is
      called, the token is emitted, and the mode is finished.

    All other characters are appended to the token buffer.

    Returns:
        True if the caller has to consume `char`, False if the cursor was already positioned
        by this handler.
    """
    if char == QUOTE:
        self.consume_char()
        self.cursor.token.append(QUOTE)
        self.quick_save()
        return False

    has_checkpoint = self.resume is not None

    if char == LINEBREAK:
        if not has_checkpoint:
            yield from self.emit_token()
            self.mode_reset()
            yield Ctrl.NewLine
            return True
        if self.caret:
            self.caret = False
            return True
        self.quick_load()
        yield from self.emit_token()
        return False

    if has_checkpoint:
        if char == CARET:
            self.caret = not self.caret
        elif not self.caret:
            closes_group = char == PAREN_CLOSE and self.group > 0
            if closes_group or char in (PIPE, AMPERSAND):
                self.quick_load()
                yield from self.emit_token()
                self.mode_finish()
                # After the quick load, the ending quote has already been consumed.
                return False

    self.cursor.token.append(char)
    return True
def gobble_set(self, mode, char)
Expand source code Browse git
@_register(Mode.SetStarted)
def gobble_set(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.SetStarted`.

    The checks are applied in this order:

    1. A line break is handled by `check_line_break`.
    2. Whitespace flushes the token and starts a whitespace run.
    3. A slash (unless a redirect is pending) flushes the token and starts a new one.
    4. A quote at the very start of a token switches to `Mode.SetQuoted`.
    5. Caret handling.
    6. An equals sign flushes the token, yields `Ctrl.Equals` and switches to `Mode.SetRegular`.
    7. Quote start, command separators, and I/O redirection.

    Any other character is appended to the token buffer.

    Returns:
        True if the caller has to consume `char`, False if one of the checks already did.
    """
    buffer = self.cursor.token

    line_ended = yield from self.check_line_break(mode, char)
    if line_ended:
        return False

    if char in WHITESPACE:
        yield from self.emit_token()
        buffer.append(char)
        self.mode_switch(Mode.Whitespace)
        return True

    if char == SLASH and not self.pending_redirect:
        yield from self.emit_token()
        buffer.append(char)
        return True

    if char == QUOTE and not buffer:
        self.caret = False
        buffer.append(char)
        self.mode = Mode.SetQuoted
        return True

    if self.check_caret(char):
        return False

    if char == EQUALS:
        yield from self.emit_token()
        yield Ctrl.Equals
        self.mode = Mode.SetRegular
        return True

    if self.check_quote_start(char):
        return False

    for check in (self.check_command_separators, self.check_redirect_io):
        if (yield from check(char)):
            return False

    buffer.append(char)
    return True
def common_token_checks(self, mode, char)
Expand source code Browse git
def common_token_checks(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Run the checks shared by several handlers, stopping at the first one that succeeds.

    The order is: line break, caret, quote start, command separators, I/O redirection. Tokens
    yielded by the generator-based checks are passed through.

    Returns:
        The result of the first check that returned a truthy value, or the (falsy) result of
        the last check if none did.
    """
    handled = yield from self.check_line_break(mode, char)
    if not handled:
        handled = self.check_caret(char)
    if not handled:
        handled = self.check_quote_start(char)
    if not handled:
        handled = yield from self.check_command_separators(char)
    if not handled:
        handled = yield from self.check_redirect_io(char)
    return handled
def gobble_set_regular(self, mode, char)
Expand source code Browse git
@_register(Mode.SetRegular)
def gobble_set_regular(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.SetRegular`.

    After the common token checks, a whitespace character that follows a pending redirect
    completes that redirect: the unquoted token buffer becomes its target, the buffer is
    cleared, and the redirect is yielded. The character is appended to the token buffer in
    every case that reaches the end of the handler.

    Returns:
        False if one of the common checks handled the character, True if the caller has to
        consume it.
    """
    handled = yield from self.common_token_checks(mode, char)
    if handled:
        return False
    redirect = self.pending_redirect
    if redirect and char in WHITESPACE:
        buffer = self.cursor.token
        self.pending_redirect = None
        redirect.target = unquote(u16(buffer))
        del buffer[:]
        yield redirect
    self.cursor.token.append(char)
    return True
def gobble_gap(self, mode, char)
Expand source code Browse git
@_register(Mode.Gap)
def gobble_gap(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.Gap`: skip separator characters without producing tokens.

    The first character that is not a separator finishes the mode and sets `first_after_gap`;
    that character is left for the next handler.

    Returns:
        True if `char` is a separator and has to be consumed by the caller, False otherwise.
    """
    # Yield nothing, but make this function a generator like every other handler.
    yield from ()
    if char not in SEPARATORS:
        self.mode_finish()
        self.first_after_gap = True
        return False
    return True
def gobble_txt(self, mode, char)
Expand source code Browse git
@_register(Mode.Text)
def gobble_txt(self, mode: Mode, char: int) -> Generator[Token, None, bool]:
    """
    Handler for `Mode.Text`, the default lexer mode.

    After the common token checks, the character is treated as follows:

    - Whitespace flushes the token and starts a whitespace run.
    - A slash (unless a redirect is pending) flushes the token and then starts the next one.
    - A colon flushes the token; unless that completed a redirect, a second colon yields
      `Ctrl.Comment` and anything else yields `Ctrl.Label`.
    - A character listed in `SeparatorMap` flushes the token; unless that completed a redirect,
      the mapped control token is yielded.
    - Everything else is appended to the token buffer.

    Returns:
        True if the caller has to consume the current character, False otherwise.
    """
    handled = yield from self.common_token_checks(mode, char)
    if handled:
        return False

    if char in WHITESPACE:
        yield from self.emit_token()
        self.cursor.token.append(char)
        self.mode_switch(Mode.Whitespace)
        return True

    if char == SLASH and not self.pending_redirect:
        yield from self.emit_token()

    if char == COLON:
        redirected = yield from self.emit_token()
        if redirected:
            return False
        if self.next_char() == COLON:
            yield Ctrl.Comment
            return True
        yield Ctrl.Label
        return False

    if char in SeparatorMap:
        separator = SeparatorMap[char]
        redirected = yield from self.emit_token()
        if redirected:
            return False
        yield separator
        return True

    self.cursor.token.append(char)
    return True
def preparse(self, text)
Expand source code Browse git
def preparse(self, text: str | buf):
    """
    Prepare a batch script for lexing.

    Input that is not a string is decoded as UTF-8 with replacement of invalid sequences. The
    text is split into lines, and the lines are re-joined with a single `LINEBREAK` code unit
    each into an array of 16-bit code units. While doing so, every line that starts with an
    optional `@`, optional whitespace and a colon is passed (without that prefix) to
    `self.label`; if that returns a label name, the offset of the colon is recorded for the
    first occurrence of that name.

    Sets `self.labels`, `self.text` (the decoded text) and `self.code` (a memoryview of the
    code unit array).
    """
    self.labels = {}

    if not isinstance(text, str):
        text = codecs.decode(text, 'utf8', errors='replace')

    lines = text.splitlines(keepends=False)
    tail = text[-10:]
    if tail.splitlines() != (tail + '\n').splitlines():
        # The text ended in a line break, which the splitlines method swallowed.
        lines.append('')

    units = array.array('H')
    for index, line in enumerate(lines):
        if index > 0:
            units.append(LINEBREAK)
        raw = line.encode('utf-16le')
        if not raw:
            continue
        view = memoryview(raw).cast('H')
        start = len(units)
        match = re.search('^@?[\\s]*:', line)
        if match:
            end = match.end()
            name = self.label(u16(view[end:]))
            if name:
                # Record the position of the colon that introduces the label.
                self.labels.setdefault(name, start + end - 1)
        units.extend(view)

    self.text = text
    self.code = memoryview(units)