Module refinery.lib.scripts.ps1.lexer

Expand source code Browse git
from __future__ import annotations

import enum
import re

from collections.abc import Generator
from dataclasses import dataclass, field

from refinery.lib.scripts.ps1.token import (
    _KEYWORDS,
    _VARIABLE_PATTERN_CORE,
    DASHES,
    DOUBLE_QUOTES,
    SINGLE_QUOTES,
    WHITESPACE,
    Ps1Token,
    Ps1TokenKind,
)


class Ps1LexerMode(enum.Enum):
    """
    The two tokenization contexts of the lexer. In EXPRESSION mode, dashed
    words such as `-eq` lex as operators; in ARGUMENT (command) mode they lex
    as command parameters and many punctuation characters are merged into
    bareword tokens.
    """
    EXPRESSION = 'expression'
    ARGUMENT = 'argument'

# Two-character operators, keyed by their normalized spelling: `tokenize`
# replaces any dash-like character with '-' before looking a pair up here.
_TWO_CHAR_OPS: dict[str, Ps1TokenKind] = {
    '+=' : Ps1TokenKind.PLUS_ASSIGN,
    '-=' : Ps1TokenKind.DASH_ASSIGN,
    '*=' : Ps1TokenKind.STAR_ASSIGN,
    '/=' : Ps1TokenKind.SLASH_ASSIGN,
    '%=' : Ps1TokenKind.PERCENT_ASSIGN,
    '++' : Ps1TokenKind.INCREMENT,
    '--' : Ps1TokenKind.DECREMENT,
    '..' : Ps1TokenKind.DOTDOT,
    '::' : Ps1TokenKind.DOUBLE_COLON,
    '&&' : Ps1TokenKind.DOUBLE_AMPERSAND,
    '||' : Ps1TokenKind.DOUBLE_PIPE,
    '@(' : Ps1TokenKind.AT_LPAREN,
    '@{' : Ps1TokenKind.AT_LBRACE,
    '$(' : Ps1TokenKind.DOLLAR_LPAREN,
}

# Single-character operators and punctuation; consulted by `tokenize` after
# every multi-character alternative has been ruled out.
_ONE_CHAR_OPS: dict[str, Ps1TokenKind] = {
    '+' : Ps1TokenKind.PLUS,
    '-' : Ps1TokenKind.DASH,
    '*' : Ps1TokenKind.STAR,
    '/' : Ps1TokenKind.SLASH,
    '%' : Ps1TokenKind.PERCENT,
    '.' : Ps1TokenKind.DOT,
    ',' : Ps1TokenKind.COMMA,
    ';' : Ps1TokenKind.SEMICOLON,
    '!' : Ps1TokenKind.EXCLAIM,
    '(' : Ps1TokenKind.LPAREN,
    ')' : Ps1TokenKind.RPAREN,
    '{' : Ps1TokenKind.LBRACE,
    '}' : Ps1TokenKind.RBRACE,
    '[' : Ps1TokenKind.LBRACKET,
    ']' : Ps1TokenKind.RBRACKET,
    '|' : Ps1TokenKind.PIPE,
    '&' : Ps1TokenKind.AMPERSAND,
    '=' : Ps1TokenKind.EQUALS,
}

# Maps the lowercase name of every dashed PowerShell operator (comparison,
# bitwise, type, string and logical operators such as `eq`, `band`, `is`,
# `match`, `not`) to its canonical spelling with the leading dash, e.g.
# 'eq' -> '-eq'. Used by `Ps1Lexer._try_dash_operator` for membership tests
# and for emitting the canonical operator text.
_DASH_OPERATORS: dict[str, str] = {
    _word: F'-{_word}' for _word in (
        'and as band bnot bor bxor ccontains ceq cge cgt cin cle clike clt '
        'cmatch cne cnotcontains cnotin cnotlike cnotmatch contains creplace '
        'csplit eq f ge gt icontains ieq ige igt iin ile ilike ilt imatch in '
        'ine inotcontains inotin inotlike inotmatch ireplace is isnot isplit '
        'join le like lt match ne not notcontains notin notlike notmatch or '
        'replace shl shr split xor'
    ).split()
}

# Characters that unconditionally end a bareword/generic token.
_FORCE_START_NEW_TOKEN = frozenset(' \t\r\n|&;,{}()')
# Characters that end a numeric literal in expression mode instead of being
# glued onto it as part of a generic token (see the number branch in
# `Ps1Lexer.tokenize`).
_FORCE_NEW_TOKEN_AFTER_NUMBER = frozenset('!#%*+-./<=>]')

# Characters that may directly follow a variable in argument mode without
# forcing a re-scan of the whole run as a generic token: member access,
# indexing, and assignment.
_VARIABLE_STOPS_NO_RESCAN = frozenset('.[=')

# Stream redirection operators.
_REDIRECTION_PATTERN = re.compile(
    r'[1-6*](?:>>|>&[12]|>)'  # explicit stream: 2>&1, 2>>, 2>
    r'|>>|>&1|>'              # bare: >>, >&1, >
    r'|<',                    # input
)

# Integer literals: hexadecimal, binary, and decimal forms, each allowing
# underscore digit separators and an optional long (`l`/`L`) suffix.
_INTEGER_PATTERN = re.compile(
    r'0[xX][0-9a-fA-F][0-9a-fA-F_]*(?:l|L)?'
    r'|0[bB][01][01_]*(?:l|L)?'
    r'|[0-9][0-9_]*(?:l|L)?',
)

# Real literals: fraction and/or exponent forms with an optional decimal
# (`d`) or byte multiplier (kb/mb/gb/tb/pb) suffix; the final alternative
# matches integer-looking literals that carry such a suffix.
_REAL_PATTERN = re.compile(
    r'(?:'
    r'(?:[0-9]*\.[0-9]+|[0-9]+\.)(?:[eE][+-]?[0-9]+)?'
    r'|[0-9]+[eE][+-]?[0-9]+'
    r')(?:[dD]|[kKmMgGtTpP][bB])?'
    r'|[0-9]+(?:\.[0-9]+)?(?:[dD]|[kKmMgGtTpP][bB])',
)

# Matches the variable name following a `$` or `@` sigil; the core pattern
# is shared with the token module.
_VARIABLE_PATTERN = re.compile(_VARIABLE_PATTERN_CORE, re.IGNORECASE)

# A run of ASCII letters after a dash - a candidate dashed operator name.
_DASH_WORD = re.compile(r'[a-zA-Z]+')

# Characters that terminate a `-Parameter` token.
_PARAMETER_TERMINATORS = frozenset(' \t\r\n{}();,|&.[')

# Non-alphanumeric characters that may start a variable name after the
# sigil (e.g. `$_`, `$?`, `${name}`, `$$`, `$^`).
_VARIABLE_START_CHARS = '_?{$^'


@dataclass
class Ps1Lexer:
    """
    A mode-aware tokenizer for PowerShell source code.

    The lexer keeps a stack of `Ps1LexerMode` values because PowerShell
    tokenizes differently depending on context: in EXPRESSION mode dashed
    words like `-eq` are operators, while in ARGUMENT (command) mode they
    are command parameters and runs of punctuation are merged into bareword
    tokens. The consumer of `tokenize` can steer the active mode by sending
    a `Ps1LexerMode` back into the generator after each yielded token.
    """

    # The complete script text being tokenized.
    source: str
    # Current scan offset into `source`.
    pos: int = 0
    # Mode stack; the top entry is the active mode. Lexing starts in
    # EXPRESSION mode.
    mode_stack: list[Ps1LexerMode] = field(default_factory=lambda: [Ps1LexerMode.EXPRESSION])

    @property
    def mode(self) -> Ps1LexerMode:
        """The currently active lexer mode (top of the mode stack)."""
        return self.mode_stack[-1]

    @mode.setter
    def mode(self, value: Ps1LexerMode):
        # Replaces the top of the stack rather than pushing a new entry.
        self.mode_stack[-1] = value

    def push_mode(self, mode: Ps1LexerMode):
        """Enter a new mode, preserving the current one underneath."""
        self.mode_stack.append(mode)

    def pop_mode(self):
        """Leave the current mode; the bottom-most entry is never popped."""
        if len(self.mode_stack) > 1:
            self.mode_stack.pop()

    def _at_end(self) -> bool:
        """True once the scan position has reached the end of the source."""
        return self.pos >= len(self.source)

    def _skip_whitespace(self) -> bool:
        """
        Advance past whitespace and backtick line continuations (a backtick
        immediately followed by LF or CRLF). Returns True if anything was
        skipped. Newline characters are expected to fall through here so
        that `tokenize` can emit them as NEWLINE tokens (i.e. WHITESPACE is
        assumed not to contain newlines - defined in the token module).
        """
        start = self.pos
        src = self.source
        length = len(src)
        while self.pos < length:
            c = src[self.pos]
            if c in WHITESPACE:
                self.pos += 1
            elif c == '`' and self.pos + 1 < length and src[self.pos + 1] == '\n':
                self.pos += 2
            elif c == '`' and self.pos + 2 < length and src[self.pos + 1:self.pos + 3] == '\r\n':
                self.pos += 3
            else:
                break
        return self.pos > start

    def _read_line_comment(self) -> None:
        """Advance to the end of a `#` line comment, stopping before the newline."""
        src = self.source
        length = len(src)
        while self.pos < length and src[self.pos] != '\n':
            self.pos += 1

    def _read_block_comment(self) -> None:
        """
        Advance past a `<# ... #>` block comment starting at the current
        position; an unterminated comment consumes the rest of the input.
        """
        src = self.source
        length = len(src)
        self.pos += 2  # skip the opening '<#'
        while self.pos < length - 1:
            if src[self.pos] == '#' and src[self.pos + 1] == '>':
                self.pos += 2
                return
            self.pos += 1
        self.pos = length

    def _try_skip_expandable(self) -> bool:
        """
        Inside expandable (double-quoted) string content: skip a backtick
        escape sequence or a complete `$( ... )` subexpression at the
        current position. Returns True if anything was consumed.
        """
        src = self.source
        length = len(src)
        c = src[self.pos]
        if c == '`' and self.pos + 1 < length:
            self.pos += 2
            return True
        if c == '$' and self.pos + 1 < length and src[self.pos + 1] == '(':
            self.pos += 2
            self._skip_subexpression_content()
            return True
        return False

    def _read_string(self, quote_set: frozenset[str], expandable: bool = False) -> str:
        """
        Read a quoted string starting at the current position and return its
        raw text including the quotes. A doubled quote character is the
        PowerShell escape for a literal quote and does not end the string.
        When `expandable` is set, backtick escapes and `$( ... )`
        subexpressions are skipped so quotes inside them cannot terminate
        the string. An unterminated string runs to end of input.
        """
        start = self.pos
        src = self.source
        length = len(src)
        self.pos += 1  # skip the opening quote
        while self.pos < length:
            c = src[self.pos]
            if expandable and self._try_skip_expandable():
                continue
            if c in quote_set:
                self.pos += 1
                # A second quote immediately after means an escaped quote;
                # keep scanning.
                if self.pos < length and src[self.pos] in quote_set:
                    self.pos += 1
                    continue
                return src[start:self.pos]
            self.pos += 1
        return src[start:self.pos]

    def _skip_subexpression_content(self):
        """
        Skip the body of a `$( ... )` subexpression up to and including the
        matching closing parenthesis. Tracks nested parentheses and steps
        over backtick escapes, strings, and here-strings so parentheses
        inside them are ignored. An unbalanced body consumes the rest of
        the input.
        """
        src = self.source
        length = len(src)
        depth = 1
        while self.pos < length:
            c = src[self.pos]
            if c == '`' and self.pos + 1 < length:
                self.pos += 2
                continue
            if c == '(':
                depth += 1
                self.pos += 1
                continue
            if c == ')':
                depth -= 1
                self.pos += 1
                if depth == 0:
                    return
                continue
            # A here-string opener is `@'` or `@"` followed directly by a
            # newline.
            if c == '@' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if self.pos + 2 < length and src[self.pos + 2] in '\r\n':
                    if nc in SINGLE_QUOTES:
                        self._read_here_string(SINGLE_QUOTES)
                        continue
                    if nc in DOUBLE_QUOTES:
                        self._read_here_string(DOUBLE_QUOTES, expandable=True)
                        continue
            if c in SINGLE_QUOTES:
                self._read_string(SINGLE_QUOTES)
                continue
            if c in DOUBLE_QUOTES:
                self._read_string(DOUBLE_QUOTES, expandable=True)
                continue
            self.pos += 1

    def _skip_here_string_header(self):
        """Skip the `@'`/`@"` opener and the (CR)LF that follows it."""
        src = self.source
        length = len(src)
        self.pos += 2
        if self.pos < length and src[self.pos] == '\r':
            self.pos += 1
        if self.pos < length and src[self.pos] == '\n':
            self.pos += 1

    def _check_here_string_terminator(self, quote_set: frozenset[str]) -> bool:
        """
        Called with the position on a newline character: test whether the
        next line begins with the here-string terminator (closing quote
        followed by `@`). If so, advance past the terminator and return
        True; otherwise leave the position unchanged and return False.
        """
        src = self.source
        length = len(src)
        if src[self.pos] == '\r' and self.pos + 1 < length and src[self.pos + 1] == '\n':
            nl_end = self.pos + 2
        else:
            nl_end = self.pos + 1
        if nl_end + 1 < length and src[nl_end] in quote_set and src[nl_end + 1] == '@':
            self.pos = nl_end + 2
            return True
        return False

    def _read_here_string(self, quote_set: frozenset[str], expandable: bool = False) -> str:
        """
        Read a here-string (`@' ... '@` or `@" ... "@`) and return its raw
        text including the delimiters. In expandable here-strings, backtick
        escapes and `$( ... )` subexpressions are skipped. An unterminated
        here-string runs to end of input.
        """
        start = self.pos
        src = self.source
        length = len(src)
        self._skip_here_string_header()
        while self.pos < length:
            c = src[self.pos]
            if c in '\r\n':
                if self._check_here_string_terminator(quote_set):
                    return src[start:self.pos]
            if expandable and self._try_skip_expandable():
                continue
            self.pos += 1
        return src[start:self.pos]

    def _read_variable(self, prefix: str) -> Ps1Token:
        """
        Read a `$`- or `@`-prefixed variable at the current position; the
        name is matched by `_VARIABLE_PATTERN`. An `@` prefix yields a
        SPLAT_VARIABLE token, otherwise a VARIABLE token. If the pattern
        does not match, the token consists of the prefix alone.
        """
        start = self.pos
        self.pos += len(prefix)
        m = _VARIABLE_PATTERN.match(self.source, self.pos)
        if m:
            self.pos = m.end()
        kind = Ps1TokenKind.SPLAT_VARIABLE if prefix == '@' else Ps1TokenKind.VARIABLE
        return Ps1Token(kind, self.source[start:self.pos], start)

    def _read_number(self) -> Ps1Token | None:
        """
        Try to read a numeric literal at the current position, or return
        None. Real literals are attempted first; a trailing `.` that is
        really the start of member access, a `..` range, or an identifier
        is backed out, possibly downgrading the token to INTEGER. Plain
        integers additionally pick up a trailing byte multiplier suffix
        (kb/mb/gb/tb/pb) and then become REAL tokens.
        """
        src = self.source
        m = _REAL_PATTERN.match(src, self.pos)
        if m:
            text = m.group()
            end = m.end()
            if text.endswith('.') and end < len(src):
                nc = src[end]
                # `1..5` and `1.ToString()` must not consume the dot.
                if nc == '.' or nc.isalpha() or nc in '_$@{':
                    text = text[:-1]
                    end -= 1
                    # Without the dot the literal may be a plain integer.
                    if text and text.replace('_', '').isdigit():
                        start = self.pos
                        self.pos = end
                        return Ps1Token(Ps1TokenKind.INTEGER, text, start)
            start = self.pos
            self.pos = end
            return Ps1Token(Ps1TokenKind.REAL, text, start)
        m = _INTEGER_PATTERN.match(src, self.pos)
        if m:
            start = self.pos
            self.pos = m.end()
            # A byte multiplier suffix turns the integer into a REAL token.
            if (
                self.pos + 1 < len(src)
                and src[self.pos].lower() in 'kmgtp'
                and src[self.pos + 1].lower() == 'b'
            ):
                text = src[start:self.pos + 2]
                self.pos += 2
                return Ps1Token(Ps1TokenKind.REAL, text, start)
            return Ps1Token(Ps1TokenKind.INTEGER, m.group(), start)
        return None

    def _try_dash_operator(self) -> Ps1Token | None:
        """
        Try to read a dashed operator such as `-eq` (matched
        case-insensitively; emitted with the canonical lowercase spelling
        from `_DASH_OPERATORS`). Restores the position and returns None
        when the word after the dash is not a known operator.
        """
        src = self.source
        start = self.pos
        self.pos += 1
        m = _DASH_WORD.match(src, self.pos)
        if not m:
            self.pos = start
            return None
        word = m.group().lower()
        if word in _DASH_OPERATORS:
            self.pos = m.end()
            return Ps1Token(Ps1TokenKind.OPERATOR, _DASH_OPERATORS[word], start)
        self.pos = start
        return None

    def _try_parameter(self) -> Ps1Token | None:
        """
        In ARGUMENT mode: try to read a command parameter such as `-Name`
        or `-Switch:`. A trailing colon is included in the token. If a
        quote appears inside the candidate, the whole run is re-read as a
        generic token instead. Returns None when the character after the
        dash cannot start a parameter name.
        """
        src = self.source
        length = len(src)
        start = self.pos
        self.pos += 1
        if self.pos >= length:
            self.pos = start
            return None
        c = src[self.pos]
        if not (c.isalpha() or c == '_' or c == '?'):
            self.pos = start
            return None
        self.pos += 1
        while self.pos < length:
            c = src[self.pos]
            if c in _PARAMETER_TERMINATORS or c.isspace():
                break
            if c in SINGLE_QUOTES or c in DOUBLE_QUOTES:
                # e.g. -foo"bar" - not a parameter; rescan as a bareword.
                self.pos = start
                return self._read_generic_token()
            if c == ':':
                self.pos += 1
                break
            self.pos += 1
        return Ps1Token(Ps1TokenKind.PARAMETER, src[start:self.pos], start)

    def _try_redirection(self) -> Ps1Token | None:
        """Try to read a redirection operator (e.g. `2>&1`, `>>`, `<`)."""
        m = _REDIRECTION_PATTERN.match(self.source, self.pos)
        if m:
            start = self.pos
            self.pos = m.end()
            return Ps1Token(Ps1TokenKind.REDIRECTION, m.group(), start)
        return None

    def _read_identifier(self) -> Ps1Token:
        """
        Read a command-name-like identifier: alphanumerics, underscores,
        backtick escapes, and - outside EXPRESSION mode - embedded dash
        characters (as in `Invoke-Expression`).
        """
        src = self.source
        length = len(src)
        start = self.pos
        c = src[self.pos]
        if c == '`' and self.pos + 1 < length:
            self.pos += 2
        else:
            self.pos += 1
        while self.pos < length:
            ch = src[self.pos]
            if ch == '`' and self.pos + 1 < length:
                self.pos += 2
            elif ch.isalnum() or ch == '_':
                self.pos += 1
            elif ch in DASHES and self.mode != Ps1LexerMode.EXPRESSION:
                self.pos += 1
            else:
                break
        return Ps1Token(Ps1TokenKind.GENERIC_TOKEN, src[start:self.pos], start)

    def _read_generic_token(self) -> Ps1Token:
        """
        Read a bareword token, greedily consuming everything up to the next
        hard boundary (`_FORCE_START_NEW_TOKEN`). Embedded quoted strings,
        backtick escapes, variables, and `$( ... )` subexpressions are
        swallowed into the token; if a variable or subexpression occurs the
        token kind is GENERIC_EXPAND, otherwise GENERIC_TOKEN.
        """
        start = self.pos
        src = self.source
        length = len(src)
        has_expansion = False
        while self.pos < length:
            c = src[self.pos]
            if c == '`' and self.pos + 1 < length:
                self.pos += 2
                continue
            if c in SINGLE_QUOTES:
                self._read_string(SINGLE_QUOTES)
                continue
            if c in DOUBLE_QUOTES:
                self._read_string(DOUBLE_QUOTES, expandable=True)
                continue
            if c == '$' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if nc == '(':
                    has_expansion = True
                    self.pos += 2
                    self._skip_subexpression_content()
                    continue
                if nc.isalnum() or nc in _VARIABLE_START_CHARS:
                    m = _VARIABLE_PATTERN.match(src, self.pos + 1)
                    if m:
                        has_expansion = True
                        self.pos = m.end()
                        continue
                self.pos += 1
                continue
            if c in _FORCE_START_NEW_TOKEN:
                break
            self.pos += 1
        kind = Ps1TokenKind.GENERIC_EXPAND if has_expansion else Ps1TokenKind.GENERIC_TOKEN
        return Ps1Token(kind, src[start:self.pos], start)

    def _emit(self, token: Ps1Token) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        """
        Yield a single token and apply the mode hint, if any, that the
        consumer sends back into the generator.
        """
        mode_hint = yield token
        if mode_hint is not None:
            self.mode = mode_hint

    def _emit_keyword_or_token(self, token: Ps1Token) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        """
        Emit `token`, upgrading its kind when its text is (case-
        insensitively) a PowerShell keyword.
        """
        kw = _KEYWORDS.get(token.value.lower())
        if kw is not None:
            yield from self._emit(Ps1Token(kw, token.value, token.offset))
        else:
            yield from self._emit(token)

    def tokenize(self) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        """
        Main tokenizer loop. Yields `Ps1Token` objects and terminates with a
        single EOF token. After each yielded token the consumer may send a
        `Ps1LexerMode` back into the generator to switch the active lexing
        mode; this is how a parser distinguishes expression context from
        command-argument context. Comments are skipped, not emitted.
        """
        src = self.source
        length = len(src)

        while True:
            self._skip_whitespace()
            if self._at_end():
                yield Ps1Token(Ps1TokenKind.EOF, '', self.pos)
                return

            start = self.pos
            c = src[self.pos]

            # Newlines are significant in PowerShell and become tokens.
            if c == '\r' and self.pos + 1 < length and src[self.pos + 1] == '\n':
                self.pos += 2
                yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\r\n', start))
                continue
            if c == '\n':
                self.pos += 1
                yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\n', start))
                continue

            # Normalize dash-like characters to '-' for operator lookup.
            c2 = src[self.pos:self.pos + 2]
            if len(c2) == 2:
                d0 = '-' if c2[0] in DASHES else c2[0]
                d1 = '-' if c2[1] in DASHES else c2[1]
                c2 = d0 + d1

            if c2 == '<#':
                self._read_block_comment()
                continue
            if c == '#':
                self._read_line_comment()
                continue

            # Here-strings: `@'` / `@"` (the newline requirement is checked
            # inside `_read_here_string`'s header handling).
            if c == '@' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if nc in SINGLE_QUOTES:
                    text = self._read_here_string(SINGLE_QUOTES)
                    yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_VERBATIM, text, start))
                    continue
                if nc in DOUBLE_QUOTES:
                    text = self._read_here_string(DOUBLE_QUOTES, expandable=True)
                    yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_EXPAND, text, start))
                    continue

            # In ARGUMENT mode, operator-looking pairs glued to further text
            # (e.g. `..\path`, `--flag`) are part of a bareword argument.
            if c2 in ('..', '--', '++', '::', '+=', '-=', '*=', '/=', '%=') and self.mode == Ps1LexerMode.ARGUMENT:
                after = self.pos + 2
                if after < length and src[after] not in _FORCE_START_NEW_TOKEN:
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit(token)
                        continue

            if c2 in _TWO_CHAR_OPS:
                self.pos += 2
                kind = _TWO_CHAR_OPS[c2]
                yield from self._emit(Ps1Token(kind, c2, start))
                continue

            # Loop labels such as `:outer`.
            if c == ':' and self.pos + 1 < length and (src[self.pos + 1].isalpha() or src[self.pos + 1] == '_'):
                self.pos += 1
                while self.pos < length and (src[self.pos].isalnum() or src[self.pos] == '_'):
                    self.pos += 1
                yield from self._emit(Ps1Token(Ps1TokenKind.LABEL, src[start:self.pos], start))
                continue

            # Variables (`$name`) and splatted variables (`@name`; `@(` and
            # `@{` were already handled as two-character operators).
            if c == '$' or (c == '@' and self.pos + 1 < length and src[self.pos + 1] not in '({'):
                nc = src[self.pos + 1] if self.pos + 1 < length else ''
                if nc and (nc.isalnum() or nc in _VARIABLE_START_CHARS):
                    token = self._read_variable(c)
                    # In ARGUMENT mode a variable glued to more text (e.g.
                    # `$x.txt`) is rescanned as one bareword.
                    if self.mode == Ps1LexerMode.ARGUMENT and self.pos < length:
                        fc = src[self.pos]
                        if fc not in _FORCE_START_NEW_TOKEN and fc not in _VARIABLE_STOPS_NO_RESCAN:
                            self.pos = start
                            token = self._read_generic_token()
                    yield from self._emit(token)
                    continue

            if c in SINGLE_QUOTES:
                text = self._read_string(SINGLE_QUOTES)
                yield from self._emit(Ps1Token(Ps1TokenKind.STRING_VERBATIM, text, start))
                continue
            if c in DOUBLE_QUOTES:
                text = self._read_string(DOUBLE_QUOTES, expandable=True)
                yield from self._emit(Ps1Token(Ps1TokenKind.STRING_EXPAND, text, start))
                continue

            # Numbered stream redirections like `2>` only apply in ARGUMENT
            # mode; otherwise `2` is a numeric literal.
            if c in '123456' and self.pos + 1 < length and src[self.pos + 1] == '>':
                if self.mode == Ps1LexerMode.ARGUMENT:
                    redir = self._try_redirection()
                    if redir:
                        yield from self._emit(redir)
                        continue

            if c.isdigit() or (c == '.' and self.pos + 1 < length and src[self.pos + 1].isdigit()):
                token = self._read_number()
                if token:
                    nc = src[self.pos] if self.pos < length else None
                    # A number glued to further text is rescanned as a
                    # bareword, except before operator characters in
                    # EXPRESSION mode (e.g. `1+2`).
                    if nc is not None and nc not in _FORCE_START_NEW_TOKEN and not nc.isspace():
                        if self.mode == Ps1LexerMode.ARGUMENT or (
                            nc not in _FORCE_NEW_TOKEN_AFTER_NUMBER
                        ):
                            self.pos = start
                            token = self._read_generic_token()
                    yield from self._emit(token)
                    continue

            # A dash starts an operator in EXPRESSION mode and a parameter
            # in ARGUMENT mode.
            if c in DASHES:
                if self.mode == Ps1LexerMode.EXPRESSION:
                    op = self._try_dash_operator()
                    if op:
                        yield from self._emit(op)
                        continue
                elif self.mode == Ps1LexerMode.ARGUMENT:
                    param = self._try_parameter()
                    if param:
                        yield from self._emit(param)
                        continue

            redir = self._try_redirection()
            if redir:
                yield from self._emit(redir)
                continue

            # In ARGUMENT mode, dot-prefixed barewords such as `.\run.ps1`.
            if self.mode == Ps1LexerMode.ARGUMENT:
                if c == '.' and self.pos + 1 < length:
                    nc = src[self.pos + 1]
                    if nc not in _FORCE_START_NEW_TOKEN and nc != '$' and nc not in SINGLE_QUOTES and nc not in DOUBLE_QUOTES:
                        token = self._read_generic_token()
                        if token.value:
                            yield from self._emit(token)
                            continue

            # In ARGUMENT mode, operator characters glued to further text
            # start a bareword (e.g. `*.txt`).
            if self.mode == Ps1LexerMode.ARGUMENT and c in '*/%=!+':
                if self.pos + 1 < length and src[self.pos + 1] not in _FORCE_START_NEW_TOKEN:
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit(token)
                        continue

            if c in _ONE_CHAR_OPS or c in DASHES:
                self.pos += 1
                # Unicode dash variants are not keys of _ONE_CHAR_OPS; they
                # fall back to DASH.
                kind = _ONE_CHAR_OPS.get(c) or Ps1TokenKind.DASH
                yield from self._emit(Ps1Token(kind, c, start))
                continue

            # ARGUMENT mode reads full barewords; keywords are recognized.
            if self.mode == Ps1LexerMode.ARGUMENT:
                if c.isalpha() or c == '_' or c == '\\' or c == '`':
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit_keyword_or_token(token)
                        continue

            # EXPRESSION mode reads identifiers, rescanning as a bareword
            # when glued to a string or variable.
            if c.isalpha() or c == '_' or c == '`':
                token = self._read_identifier()
                if self.pos < length and src[self.pos] not in _FORCE_START_NEW_TOKEN:
                    if self.mode == Ps1LexerMode.ARGUMENT or (
                        src[self.pos] in SINGLE_QUOTES
                        or src[self.pos] in DOUBLE_QUOTES
                        or src[self.pos] == '$'
                    ):
                        self.pos = start
                        token = self._read_generic_token()
                if token.value:
                    yield from self._emit_keyword_or_token(token)
                    continue

            # Fallback: emit the unrecognized character as a generic token.
            self.pos += 1
            yield from self._emit(Ps1Token(Ps1TokenKind.GENERIC_TOKEN, c, start))

Classes

class Ps1LexerMode (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Ps1LexerMode(enum.Enum):
    EXPRESSION = 'expression'
    ARGUMENT = 'argument'

Ancestors

  • enum.Enum

Class variables

var EXPRESSION

Lexer mode for expression context, in which dashed words such as -eq lex as operators.

var ARGUMENT

Lexer mode for command-argument context, in which dashed words lex as command parameters and barewords are merged greedily.

class Ps1Lexer (source, pos=0, mode_stack=<factory>)

Ps1Lexer(source: 'str', pos: 'int' = 0, mode_stack: 'list[Ps1LexerMode]' = &lt;factory&gt;)

Expand source code Browse git
@dataclass
class Ps1Lexer:
    source: str
    pos: int = 0
    mode_stack: list[Ps1LexerMode] = field(default_factory=lambda: [Ps1LexerMode.EXPRESSION])

    @property
    def mode(self) -> Ps1LexerMode:
        return self.mode_stack[-1]

    @mode.setter
    def mode(self, value: Ps1LexerMode):
        self.mode_stack[-1] = value

    def push_mode(self, mode: Ps1LexerMode):
        self.mode_stack.append(mode)

    def pop_mode(self):
        if len(self.mode_stack) > 1:
            self.mode_stack.pop()

    def _at_end(self) -> bool:
        return self.pos >= len(self.source)

    def _skip_whitespace(self) -> bool:
        start = self.pos
        src = self.source
        length = len(src)
        while self.pos < length:
            c = src[self.pos]
            if c in WHITESPACE:
                self.pos += 1
            elif c == '`' and self.pos + 1 < length and src[self.pos + 1] == '\n':
                self.pos += 2
            elif c == '`' and self.pos + 2 < length and src[self.pos + 1:self.pos + 3] == '\r\n':
                self.pos += 3
            else:
                break
        return self.pos > start

    def _read_line_comment(self) -> None:
        src = self.source
        length = len(src)
        while self.pos < length and src[self.pos] != '\n':
            self.pos += 1

    def _read_block_comment(self) -> None:
        src = self.source
        length = len(src)
        self.pos += 2
        while self.pos < length - 1:
            if src[self.pos] == '#' and src[self.pos + 1] == '>':
                self.pos += 2
                return
            self.pos += 1
        self.pos = length

    def _try_skip_expandable(self) -> bool:
        src = self.source
        length = len(src)
        c = src[self.pos]
        if c == '`' and self.pos + 1 < length:
            self.pos += 2
            return True
        if c == '$' and self.pos + 1 < length and src[self.pos + 1] == '(':
            self.pos += 2
            self._skip_subexpression_content()
            return True
        return False

    def _read_string(self, quote_set: frozenset[str], expandable: bool = False) -> str:
        start = self.pos
        src = self.source
        length = len(src)
        self.pos += 1
        while self.pos < length:
            c = src[self.pos]
            if expandable and self._try_skip_expandable():
                continue
            if c in quote_set:
                self.pos += 1
                if self.pos < length and src[self.pos] in quote_set:
                    self.pos += 1
                    continue
                return src[start:self.pos]
            self.pos += 1
        return src[start:self.pos]

    def _skip_subexpression_content(self):
        src = self.source
        length = len(src)
        depth = 1
        while self.pos < length:
            c = src[self.pos]
            if c == '`' and self.pos + 1 < length:
                self.pos += 2
                continue
            if c == '(':
                depth += 1
                self.pos += 1
                continue
            if c == ')':
                depth -= 1
                self.pos += 1
                if depth == 0:
                    return
                continue
            if c == '@' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if self.pos + 2 < length and src[self.pos + 2] in '\r\n':
                    if nc in SINGLE_QUOTES:
                        self._read_here_string(SINGLE_QUOTES)
                        continue
                    if nc in DOUBLE_QUOTES:
                        self._read_here_string(DOUBLE_QUOTES, expandable=True)
                        continue
            if c in SINGLE_QUOTES:
                self._read_string(SINGLE_QUOTES)
                continue
            if c in DOUBLE_QUOTES:
                self._read_string(DOUBLE_QUOTES, expandable=True)
                continue
            self.pos += 1

    def _skip_here_string_header(self):
        src = self.source
        length = len(src)
        self.pos += 2
        if self.pos < length and src[self.pos] == '\r':
            self.pos += 1
        if self.pos < length and src[self.pos] == '\n':
            self.pos += 1

    def _check_here_string_terminator(self, quote_set: frozenset[str]) -> bool:
        src = self.source
        length = len(src)
        if src[self.pos] == '\r' and self.pos + 1 < length and src[self.pos + 1] == '\n':
            nl_end = self.pos + 2
        else:
            nl_end = self.pos + 1
        if nl_end + 1 < length and src[nl_end] in quote_set and src[nl_end + 1] == '@':
            self.pos = nl_end + 2
            return True
        return False

    def _read_here_string(self, quote_set: frozenset[str], expandable: bool = False) -> str:
        start = self.pos
        src = self.source
        length = len(src)
        self._skip_here_string_header()
        while self.pos < length:
            c = src[self.pos]
            if c in '\r\n':
                if self._check_here_string_terminator(quote_set):
                    return src[start:self.pos]
            if expandable and self._try_skip_expandable():
                continue
            self.pos += 1
        return src[start:self.pos]

    def _read_variable(self, prefix: str) -> Ps1Token:
        start = self.pos
        self.pos += len(prefix)
        m = _VARIABLE_PATTERN.match(self.source, self.pos)
        if m:
            self.pos = m.end()
        kind = Ps1TokenKind.SPLAT_VARIABLE if prefix == '@' else Ps1TokenKind.VARIABLE
        return Ps1Token(kind, self.source[start:self.pos], start)

    def _read_number(self) -> Ps1Token | None:
        src = self.source
        m = _REAL_PATTERN.match(src, self.pos)
        if m:
            text = m.group()
            end = m.end()
            if text.endswith('.') and end < len(src):
                nc = src[end]
                if nc == '.' or nc.isalpha() or nc in '_$@{':
                    text = text[:-1]
                    end -= 1
                    if text and text.replace('_', '').isdigit():
                        start = self.pos
                        self.pos = end
                        return Ps1Token(Ps1TokenKind.INTEGER, text, start)
            start = self.pos
            self.pos = end
            return Ps1Token(Ps1TokenKind.REAL, text, start)
        m = _INTEGER_PATTERN.match(src, self.pos)
        if m:
            start = self.pos
            self.pos = m.end()
            if (
                self.pos + 1 < len(src)
                and src[self.pos].lower() in 'kmgtp'
                and src[self.pos + 1].lower() == 'b'
            ):
                text = src[start:self.pos + 2]
                self.pos += 2
                return Ps1Token(Ps1TokenKind.REAL, text, start)
            return Ps1Token(Ps1TokenKind.INTEGER, m.group(), start)
        return None

    def _try_dash_operator(self) -> Ps1Token | None:
        src = self.source
        start = self.pos
        self.pos += 1
        m = _DASH_WORD.match(src, self.pos)
        if not m:
            self.pos = start
            return None
        word = m.group().lower()
        if word in _DASH_OPERATORS:
            self.pos = m.end()
            return Ps1Token(Ps1TokenKind.OPERATOR, _DASH_OPERATORS[word], start)
        self.pos = start
        return None

    def _try_parameter(self) -> Ps1Token | None:
        src = self.source
        length = len(src)
        start = self.pos
        self.pos += 1
        if self.pos >= length:
            self.pos = start
            return None
        c = src[self.pos]
        if not (c.isalpha() or c == '_' or c == '?'):
            self.pos = start
            return None
        self.pos += 1
        while self.pos < length:
            c = src[self.pos]
            if c in _PARAMETER_TERMINATORS or c.isspace():
                break
            if c in SINGLE_QUOTES or c in DOUBLE_QUOTES:
                self.pos = start
                return self._read_generic_token()
            if c == ':':
                self.pos += 1
                break
            self.pos += 1
        return Ps1Token(Ps1TokenKind.PARAMETER, src[start:self.pos], start)

    def _try_redirection(self) -> Ps1Token | None:
        m = _REDIRECTION_PATTERN.match(self.source, self.pos)
        if m:
            start = self.pos
            self.pos = m.end()
            return Ps1Token(Ps1TokenKind.REDIRECTION, m.group(), start)
        return None

    def _read_identifier(self) -> Ps1Token:
        """Read a bareword identifier starting at the cursor.

        Backtick escape pairs are consumed as single units. Outside of
        expression mode, dash characters (anything in ``DASHES``) are part of
        the word, so command names like ``Get-Item`` stay intact. The result
        is always a GENERIC_TOKEN; keyword classification happens later.
        """
        src = self.source
        end = len(src)
        anchor = self.pos
        # The mode cannot change mid-word, so hoist the check.
        allow_dashes = self.mode != Ps1LexerMode.EXPRESSION
        # Consume the first character, treating a backtick escape as a pair.
        self.pos += 2 if (src[anchor] == '`' and anchor + 1 < end) else 1
        while self.pos < end:
            ch = src[self.pos]
            if ch == '`' and self.pos + 1 < end:
                self.pos += 2
            elif ch.isalnum() or ch == '_' or (allow_dashes and ch in DASHES):
                self.pos += 1
            else:
                break
        return Ps1Token(Ps1TokenKind.GENERIC_TOKEN, src[anchor:self.pos], anchor)

    def _read_generic_token(self) -> Ps1Token:
        """Read an argument-style bareword that may glue together literals,
        quoted strings, variables, and ``$(...)`` subexpressions.

        Embedded variables and subexpressions mark the token as expandable
        (GENERIC_EXPAND); otherwise a plain GENERIC_TOKEN is produced.
        Scanning stops at any character in ``_FORCE_START_NEW_TOKEN``.
        """
        src = self.source
        end = len(src)
        anchor = self.pos
        expandable = False
        while self.pos < end:
            ch = src[self.pos]
            if ch == '`' and self.pos + 1 < end:
                # Backtick escape: the escaped character is glued in verbatim.
                self.pos += 2
            elif ch in SINGLE_QUOTES:
                self._read_string(SINGLE_QUOTES)
            elif ch in DOUBLE_QUOTES:
                self._read_string(DOUBLE_QUOTES, expandable=True)
            elif ch == '$' and self.pos + 1 < end:
                follower = src[self.pos + 1]
                if follower == '(':
                    # Embedded subexpression $( ... ): skip its whole body.
                    expandable = True
                    self.pos += 2
                    self._skip_subexpression_content()
                elif (
                    (follower.isalnum() or follower in _VARIABLE_START_CHARS)
                    and (m := _VARIABLE_PATTERN.match(src, self.pos + 1))
                ):
                    expandable = True
                    self.pos = m.end()
                else:
                    # A lone dollar sign is just part of the bareword.
                    self.pos += 1
            elif ch in _FORCE_START_NEW_TOKEN:
                break
            else:
                self.pos += 1
        if expandable:
            return Ps1Token(Ps1TokenKind.GENERIC_EXPAND, src[anchor:self.pos], anchor)
        return Ps1Token(Ps1TokenKind.GENERIC_TOKEN, src[anchor:self.pos], anchor)

    def _emit(self, token: Ps1Token) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        mode_hint = yield token
        if mode_hint is not None:
            self.mode = mode_hint

    def _emit_keyword_or_token(self, token: Ps1Token) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        """Emit *token*, reclassifying it as a keyword when its (lowercased)
        text appears in ``_KEYWORDS``."""
        keyword_kind = _KEYWORDS.get(token.value.lower())
        if keyword_kind is None:
            yield from self._emit(token)
        else:
            yield from self._emit(Ps1Token(keyword_kind, token.value, token.offset))

    def tokenize(self) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
        """Lex the source and yield Ps1Token values, ending with one EOF token.

        This is a coroutine-style generator: after each yielded token the
        consumer may ``send()`` a Ps1LexerMode, which becomes the mode used
        for subsequent tokens (see ``_emit``). The branch order below encodes
        tokenization precedence and must not be rearranged.
        """
        src = self.source
        length = len(src)

        while True:
            self._skip_whitespace()
            if self._at_end():
                # A single EOF token terminates the stream.
                yield Ps1Token(Ps1TokenKind.EOF, '', self.pos)
                return

            start = self.pos
            c = src[self.pos]

            # Newlines act as statement separators and are emitted explicitly.
            if c == '\r' and self.pos + 1 < length and src[self.pos + 1] == '\n':
                self.pos += 2
                yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\r\n', start))
                continue
            if c == '\n':
                self.pos += 1
                yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\n', start))
                continue

            # Two-character lookahead; any character in DASHES (dash variants)
            # is normalized to the ASCII '-' used as key in the op tables.
            c2 = src[self.pos:self.pos + 2]
            if len(c2) == 2:
                d0 = '-' if c2[0] in DASHES else c2[0]
                d1 = '-' if c2[1] in DASHES else c2[1]
                c2 = d0 + d1

            # Comments are consumed without producing tokens.
            if c2 == '<#':
                self._read_block_comment()
                continue
            if c == '#':
                self._read_line_comment()
                continue

            # Here-strings: @'...'@ is verbatim, @"..."@ is expandable.
            if c == '@' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if nc in SINGLE_QUOTES:
                    text = self._read_here_string(SINGLE_QUOTES)
                    yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_VERBATIM, text, start))
                    continue
                if nc in DOUBLE_QUOTES:
                    text = self._read_here_string(DOUBLE_QUOTES, expandable=True)
                    yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_EXPAND, text, start))
                    continue

            # In argument mode an operator-looking pair immediately followed
            # by more token text is glued into one generic token instead.
            if c2 in ('..', '--', '++', '::', '+=', '-=', '*=', '/=', '%=') and self.mode == Ps1LexerMode.ARGUMENT:
                after = self.pos + 2
                if after < length and src[after] not in _FORCE_START_NEW_TOKEN:
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit(token)
                        continue

            if c2 in _TWO_CHAR_OPS:
                self.pos += 2
                kind = _TWO_CHAR_OPS[c2]
                yield from self._emit(Ps1Token(kind, c2, start))
                continue

            # Loop labels such as ':outer'.
            if c == ':' and self.pos + 1 < length and (src[self.pos + 1].isalpha() or src[self.pos + 1] == '_'):
                self.pos += 1
                while self.pos < length and (src[self.pos].isalnum() or src[self.pos] == '_'):
                    self.pos += 1
                yield from self._emit(Ps1Token(Ps1TokenKind.LABEL, src[start:self.pos], start))
                continue

            # Variable references beginning with '$' or '@' (the '@(' / '@{'
            # operators were handled above by the two-character table).
            if c == '$' or (c == '@' and self.pos + 1 < length and src[self.pos + 1] not in '({'):
                nc = src[self.pos + 1] if self.pos + 1 < length else ''
                if nc and (nc.isalnum() or nc in _VARIABLE_START_CHARS):
                    token = self._read_variable(c)
                    if self.mode == Ps1LexerMode.ARGUMENT and self.pos < length:
                        fc = src[self.pos]
                        # A variable glued to further bareword text becomes a
                        # single generic token: rewind and rescan.
                        if fc not in _FORCE_START_NEW_TOKEN and fc not in _VARIABLE_STOPS_NO_RESCAN:
                            self.pos = start
                            token = self._read_generic_token()
                    yield from self._emit(token)
                    continue

            if c in SINGLE_QUOTES:
                text = self._read_string(SINGLE_QUOTES)
                yield from self._emit(Ps1Token(Ps1TokenKind.STRING_VERBATIM, text, start))
                continue
            if c in DOUBLE_QUOTES:
                text = self._read_string(DOUBLE_QUOTES, expandable=True)
                yield from self._emit(Ps1Token(Ps1TokenKind.STRING_EXPAND, text, start))
                continue

            # Numbered stream redirections such as '2>' (argument mode only).
            if c in '123456' and self.pos + 1 < length and src[self.pos + 1] == '>':
                if self.mode == Ps1LexerMode.ARGUMENT:
                    redir = self._try_redirection()
                    if redir:
                        yield from self._emit(redir)
                        continue

            # Numeric literals; a number glued to further text is rescanned as
            # a generic token (always in argument mode; in expression mode
            # only when the follower is not in _FORCE_NEW_TOKEN_AFTER_NUMBER).
            if c.isdigit() or (c == '.' and self.pos + 1 < length and src[self.pos + 1].isdigit()):
                token = self._read_number()
                if token:
                    nc = src[self.pos] if self.pos < length else None
                    if nc is not None and nc not in _FORCE_START_NEW_TOKEN and not nc.isspace():
                        if self.mode == Ps1LexerMode.ARGUMENT or (
                            nc not in _FORCE_NEW_TOKEN_AFTER_NUMBER
                        ):
                            self.pos = start
                            token = self._read_generic_token()
                    yield from self._emit(token)
                    continue

            # A dash starts an operator (-eq, -match, ...) in expression mode
            # and a parameter (-Path) in argument mode.
            if c in DASHES:
                if self.mode == Ps1LexerMode.EXPRESSION:
                    op = self._try_dash_operator()
                    if op:
                        yield from self._emit(op)
                        continue
                elif self.mode == Ps1LexerMode.ARGUMENT:
                    param = self._try_parameter()
                    if param:
                        yield from self._emit(param)
                        continue

            # Remaining redirection forms, independent of lexer mode.
            redir = self._try_redirection()
            if redir:
                yield from self._emit(redir)
                continue

            # Argument-mode barewords that begin with characters which would
            # otherwise be read as operators.
            if self.mode == Ps1LexerMode.ARGUMENT:
                if c == '.' and self.pos + 1 < length:
                    nc = src[self.pos + 1]
                    if nc not in _FORCE_START_NEW_TOKEN and nc != '$' and nc not in SINGLE_QUOTES and nc not in DOUBLE_QUOTES:
                        token = self._read_generic_token()
                        if token.value:
                            yield from self._emit(token)
                            continue

            if self.mode == Ps1LexerMode.ARGUMENT and c in '*/%=!+':
                if self.pos + 1 < length and src[self.pos + 1] not in _FORCE_START_NEW_TOKEN:
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit(token)
                        continue

            # Single-character operators; any dash variant that survived the
            # branches above maps to the DASH kind.
            if c in _ONE_CHAR_OPS or c in DASHES:
                self.pos += 1
                kind = _ONE_CHAR_OPS.get(c) or Ps1TokenKind.DASH
                yield from self._emit(Ps1Token(kind, c, start))
                continue

            # Argument-mode words are read greedily and may be keywords.
            if self.mode == Ps1LexerMode.ARGUMENT:
                if c.isalpha() or c == '_' or c == '\\' or c == '`':
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit_keyword_or_token(token)
                        continue

            # Expression-mode identifiers; when glued to a string or variable
            # they are rescanned as a single generic token.
            if c.isalpha() or c == '_' or c == '`':
                token = self._read_identifier()
                if self.pos < length and src[self.pos] not in _FORCE_START_NEW_TOKEN:
                    if self.mode == Ps1LexerMode.ARGUMENT or (
                        src[self.pos] in SINGLE_QUOTES
                        or src[self.pos] in DOUBLE_QUOTES
                        or src[self.pos] == '$'
                    ):
                        self.pos = start
                        token = self._read_generic_token()
                if token.value:
                    yield from self._emit_keyword_or_token(token)
                    continue

            # Fallback: emit the unrecognized character as a one-char token.
            self.pos += 1
            yield from self._emit(Ps1Token(Ps1TokenKind.GENERIC_TOKEN, c, start))

Instance variables

var source

The PowerShell source text that the lexer scans.

var mode_stack

The stack of active lexer modes; the `mode` property returns its top entry.

var pos

The current scan offset into `source`.

var mode
Expand source code Browse git
@property
def mode(self) -> Ps1LexerMode:
    """The active lexer mode: the topmost entry of ``mode_stack``."""
    return self.mode_stack[-1]

Methods

def push_mode(self, mode)
Expand source code Browse git
def push_mode(self, mode: Ps1LexerMode):
    """Make *mode* the active lexer mode by pushing it onto the mode stack."""
    self.mode_stack += [mode]
def pop_mode(self)
Expand source code Browse git
def pop_mode(self):
    """Drop the current lexer mode; the bottom (base) mode is never removed."""
    stack = self.mode_stack
    if len(stack) > 1:
        stack.pop()
def tokenize(self)
Expand source code Browse git
def tokenize(self) -> Generator[Ps1Token, Ps1LexerMode | None, None]:
    """Lex the source and yield Ps1Token values, ending with one EOF token.

    This is a coroutine-style generator: after each yielded token the
    consumer may ``send()`` a Ps1LexerMode, which becomes the mode used
    for subsequent tokens (see ``_emit``). The branch order below encodes
    tokenization precedence and must not be rearranged.
    """
    src = self.source
    length = len(src)

    while True:
        self._skip_whitespace()
        if self._at_end():
            # A single EOF token terminates the stream.
            yield Ps1Token(Ps1TokenKind.EOF, '', self.pos)
            return

        start = self.pos
        c = src[self.pos]

        # Newlines act as statement separators and are emitted explicitly.
        if c == '\r' and self.pos + 1 < length and src[self.pos + 1] == '\n':
            self.pos += 2
            yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\r\n', start))
            continue
        if c == '\n':
            self.pos += 1
            yield from self._emit(Ps1Token(Ps1TokenKind.NEWLINE, '\n', start))
            continue

        # Two-character lookahead; any character in DASHES (dash variants)
        # is normalized to the ASCII '-' used as key in the op tables.
        c2 = src[self.pos:self.pos + 2]
        if len(c2) == 2:
            d0 = '-' if c2[0] in DASHES else c2[0]
            d1 = '-' if c2[1] in DASHES else c2[1]
            c2 = d0 + d1

        # Comments are consumed without producing tokens.
        if c2 == '<#':
            self._read_block_comment()
            continue
        if c == '#':
            self._read_line_comment()
            continue

        # Here-strings: @'...'@ is verbatim, @"..."@ is expandable.
        if c == '@' and self.pos + 1 < length:
            nc = src[self.pos + 1]
            if nc in SINGLE_QUOTES:
                text = self._read_here_string(SINGLE_QUOTES)
                yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_VERBATIM, text, start))
                continue
            if nc in DOUBLE_QUOTES:
                text = self._read_here_string(DOUBLE_QUOTES, expandable=True)
                yield from self._emit(Ps1Token(Ps1TokenKind.HSTRING_EXPAND, text, start))
                continue

        # In argument mode an operator-looking pair immediately followed
        # by more token text is glued into one generic token instead.
        if c2 in ('..', '--', '++', '::', '+=', '-=', '*=', '/=', '%=') and self.mode == Ps1LexerMode.ARGUMENT:
            after = self.pos + 2
            if after < length and src[after] not in _FORCE_START_NEW_TOKEN:
                token = self._read_generic_token()
                if token.value:
                    yield from self._emit(token)
                    continue

        if c2 in _TWO_CHAR_OPS:
            self.pos += 2
            kind = _TWO_CHAR_OPS[c2]
            yield from self._emit(Ps1Token(kind, c2, start))
            continue

        # Loop labels such as ':outer'.
        if c == ':' and self.pos + 1 < length and (src[self.pos + 1].isalpha() or src[self.pos + 1] == '_'):
            self.pos += 1
            while self.pos < length and (src[self.pos].isalnum() or src[self.pos] == '_'):
                self.pos += 1
            yield from self._emit(Ps1Token(Ps1TokenKind.LABEL, src[start:self.pos], start))
            continue

        # Variable references beginning with '$' or '@' (the '@(' / '@{'
        # operators were handled above by the two-character table).
        if c == '$' or (c == '@' and self.pos + 1 < length and src[self.pos + 1] not in '({'):
            nc = src[self.pos + 1] if self.pos + 1 < length else ''
            if nc and (nc.isalnum() or nc in _VARIABLE_START_CHARS):
                token = self._read_variable(c)
                if self.mode == Ps1LexerMode.ARGUMENT and self.pos < length:
                    fc = src[self.pos]
                    # A variable glued to further bareword text becomes a
                    # single generic token: rewind and rescan.
                    if fc not in _FORCE_START_NEW_TOKEN and fc not in _VARIABLE_STOPS_NO_RESCAN:
                        self.pos = start
                        token = self._read_generic_token()
                yield from self._emit(token)
                continue

        if c in SINGLE_QUOTES:
            text = self._read_string(SINGLE_QUOTES)
            yield from self._emit(Ps1Token(Ps1TokenKind.STRING_VERBATIM, text, start))
            continue
        if c in DOUBLE_QUOTES:
            text = self._read_string(DOUBLE_QUOTES, expandable=True)
            yield from self._emit(Ps1Token(Ps1TokenKind.STRING_EXPAND, text, start))
            continue

        # Numbered stream redirections such as '2>' (argument mode only).
        if c in '123456' and self.pos + 1 < length and src[self.pos + 1] == '>':
            if self.mode == Ps1LexerMode.ARGUMENT:
                redir = self._try_redirection()
                if redir:
                    yield from self._emit(redir)
                    continue

        # Numeric literals; a number glued to further text is rescanned as
        # a generic token (always in argument mode; in expression mode
        # only when the follower is not in _FORCE_NEW_TOKEN_AFTER_NUMBER).
        if c.isdigit() or (c == '.' and self.pos + 1 < length and src[self.pos + 1].isdigit()):
            token = self._read_number()
            if token:
                nc = src[self.pos] if self.pos < length else None
                if nc is not None and nc not in _FORCE_START_NEW_TOKEN and not nc.isspace():
                    if self.mode == Ps1LexerMode.ARGUMENT or (
                        nc not in _FORCE_NEW_TOKEN_AFTER_NUMBER
                    ):
                        self.pos = start
                        token = self._read_generic_token()
                yield from self._emit(token)
                continue

        # A dash starts an operator (-eq, -match, ...) in expression mode
        # and a parameter (-Path) in argument mode.
        if c in DASHES:
            if self.mode == Ps1LexerMode.EXPRESSION:
                op = self._try_dash_operator()
                if op:
                    yield from self._emit(op)
                    continue
            elif self.mode == Ps1LexerMode.ARGUMENT:
                param = self._try_parameter()
                if param:
                    yield from self._emit(param)
                    continue

        # Remaining redirection forms, independent of lexer mode.
        redir = self._try_redirection()
        if redir:
            yield from self._emit(redir)
            continue

        # Argument-mode barewords that begin with characters which would
        # otherwise be read as operators.
        if self.mode == Ps1LexerMode.ARGUMENT:
            if c == '.' and self.pos + 1 < length:
                nc = src[self.pos + 1]
                if nc not in _FORCE_START_NEW_TOKEN and nc != '$' and nc not in SINGLE_QUOTES and nc not in DOUBLE_QUOTES:
                    token = self._read_generic_token()
                    if token.value:
                        yield from self._emit(token)
                        continue

        if self.mode == Ps1LexerMode.ARGUMENT and c in '*/%=!+':
            if self.pos + 1 < length and src[self.pos + 1] not in _FORCE_START_NEW_TOKEN:
                token = self._read_generic_token()
                if token.value:
                    yield from self._emit(token)
                    continue

        # Single-character operators; any dash variant that survived the
        # branches above maps to the DASH kind.
        if c in _ONE_CHAR_OPS or c in DASHES:
            self.pos += 1
            kind = _ONE_CHAR_OPS.get(c) or Ps1TokenKind.DASH
            yield from self._emit(Ps1Token(kind, c, start))
            continue

        # Argument-mode words are read greedily and may be keywords.
        if self.mode == Ps1LexerMode.ARGUMENT:
            if c.isalpha() or c == '_' or c == '\\' or c == '`':
                token = self._read_generic_token()
                if token.value:
                    yield from self._emit_keyword_or_token(token)
                    continue

        # Expression-mode identifiers; when glued to a string or variable
        # they are rescanned as a single generic token.
        if c.isalpha() or c == '_' or c == '`':
            token = self._read_identifier()
            if self.pos < length and src[self.pos] not in _FORCE_START_NEW_TOKEN:
                if self.mode == Ps1LexerMode.ARGUMENT or (
                    src[self.pos] in SINGLE_QUOTES
                    or src[self.pos] in DOUBLE_QUOTES
                    or src[self.pos] == '$'
                ):
                    self.pos = start
                    token = self._read_generic_token()
            if token.value:
                yield from self._emit_keyword_or_token(token)
                continue

        # Fallback: emit the unrecognized character as a one-char token.
        self.pos += 1
        yield from self._emit(Ps1Token(Ps1TokenKind.GENERIC_TOKEN, c, start))