Module refinery.lib.batch.state

Expand source code Browse git
from __future__ import annotations

import ntpath

from datetime import datetime
from random import randrange, seed
from typing import NamedTuple
from uuid import uuid4

from refinery.lib.batch.const import WHITESPACE
from refinery.lib.dt import date_from_timestamp, isodate


class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()


# Template for the environment of a freshly created BatchState. The values are passed
# through str.format with the placeholders {u} (user name) and {h} (host name); keys are
# upper-cased by the consumer before they are stored.
_DEFAULT_ENVIRONMENT = {
    'ALLUSERSPROFILE'           : r'C:\ProgramData',
    'APPDATA'                   : r'C:\Users\{u}\AppData\Roaming',
    'CommonProgramFiles'        : r'C:\Program Files\Common Files',
    'CommonProgramFiles(x86)'   : r'C:\Program Files (x86)\Common Files',
    'CommonProgramW6432'        : r'C:\Program Files\Common Files',
    'COMPUTERNAME'              : r'{h}',
    'ComSpec'                   : r'C:\WINDOWS\system32\cmd.exe',
    'DriverData'                : r'C:\Windows\System32\Drivers\DriverData',
    'HOMEDRIVE'                 : r'C:',
    'HOMEPATH'                  : r'\Users\{u}',
    'LOCALAPPDATA'              : r'C:\Users\{u}\AppData\Local',
    'LOGONSERVER'               : r'\\{h}',
    'NUMBER_OF_PROCESSORS'      : r'16',
    'OneDrive'                  : r'C:\Users\{u}\OneDrive',
    'OS'                        : r'Windows_NT',
    'PATHEXT'                   : r'.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC',
    'PROCESSOR_ARCHITECTURE'    : r'AMD64',
    'PROCESSOR_IDENTIFIER'      : r'Intel64 Family 6 Model 158 Stepping 10, GenuineIntel',
    'PROCESSOR_LEVEL'           : r'6',
    'PROCESSOR_REVISION'        : r'99ff',
    'ProgramData'               : r'C:\ProgramData',
    'ProgramW6432'              : r'C:\Program Files',
    'ProgramFiles'              : r'C:\Program Files',
    'ProgramFiles(x86)'         : r'C:\Program Files (x86)',
    'PUBLIC'                    : r'C:\Users\Public',
    'SESSIONNAME'               : r'Console',
    'SystemDrive'               : r'C:',
    'SystemRoot'                : r'C:\WINDOWS',
    'TEMP'                      : r'C:\Users\{u}\AppData\Local\Temp',
    'TMP'                       : r'C:\Users\{u}\AppData\Local\Temp',
    'USERDOMAIN'                : r'{h}',
    'USERDOMAIN_ROAMINGPROFILE' : r'{h}',
    'USERNAME'                  : r'{u}',
    'USERPROFILE'               : r'C:\Users\{u}',
    'WINDIR'                    : r'C:\WINDOWS',
    'PATH': ';'.join(
        [
            r'C:\Windows',
            r'C:\Windows\System32',
            r'C:\Windows\System32\Wbem',
            # A raw string cannot end in a single backslash, and r'...\\' keeps BOTH
            # backslashes. The entries with a trailing separator are therefore written
            # as regular string literals so that they end in exactly one backslash.
            'C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\',
            'C:\\Windows\\System32\\OpenSSH\\',
            'C:\\Program Files\\dotnet\\',
        ]
    ),
}


class BatchState:
    """
    Mutable state of an emulated batch interpreter session.

    The state holds three parallel lists (environments, delayed expansion flags, command
    extension flags) whose last entry is the active one, a virtual file system that maps
    normalized absolute paths to file contents, the current working directory, and the
    current error level. `reset` restores everything except the working directory to the
    values that were passed to the constructor.
    """

    name: str
    args: list[str]

    environments: list[dict[str, str]]
    delayexpands: list[bool]
    ext_settings: list[bool]
    file_system: dict[str, str]

    def __init__(
        self,
        delayed_expansion: bool = False,
        extensions_enabled: bool = True,
        extensions_version: int = 2,
        environment: dict | None = None,
        file_system: dict | None = None,
        username: str = 'Administrator',
        hostname: str | None = None,
        now: int | float | str | datetime | None = None,
        cwd: str = 'C:\\',
    ):
        """
        Args:
            delayed_expansion: Initial delayed expansion flag.
            extensions_enabled: Initial command extension flag.
            extensions_version: Value reported for CMDEXTVERSION.
            environment: Variables that take precedence over the built-in defaults. The
                mapping is copied and its keys are upper-cased; it is never modified.
            file_system: Initial contents of the virtual file system.
            username: Substituted for the user name in the default environment.
            hostname: Substituted for the host name in the default environment; a random
                UUID string is used when omitted.
            now: The emulated point in time. A string is parsed with `isodate`, a number
                is converted with `date_from_timestamp`, and if the result is `None` the
                current local time is used.
            cwd: Initial working directory.
        """
        self.delayed_expansion = delayed_expansion
        self.extensions_version = extensions_version
        self.extensions_enabled = extensions_enabled
        self.file_system_seed = file_system or {}
        # Build a private copy instead of filling the caller's dictionary with defaults.
        # Keys are upper-cased because envar() only ever looks up upper-cased names, so a
        # mixed-case key supplied by the caller would otherwise be unreachable.
        self.environment_seed = {
            key.upper(): value for key, value in (environment or {}).items()}
        if hostname is None:
            hostname = str(uuid4())
        for key, value in _DEFAULT_ENVIRONMENT.items():
            self.environment_seed.setdefault(
                key.upper(),
                value.format(h=hostname, u=username)
            )
        if isinstance(now, str):
            now = isodate(now)
        if isinstance(now, (int, float)):
            now = date_from_timestamp(now)
        if now is None:
            now = datetime.now()
        self.cwd = cwd
        self.now = now
        self.hostname = hostname
        self.username = username
        # Seeding the global generator makes the RANDOM variable reproducible for a
        # fixed value of `now`.
        seed(self.now.timestamp())
        self.reset()

    @property
    def cwd(self):
        """
        The current working directory as a normalized, case-folded absolute path.
        """
        return self._cwd

    @cwd.setter
    def cwd(self, new: str):
        """
        Change the working directory. Forward slashes are accepted as separators and a
        relative path is interpreted relative to the current directory.

        Raises:
            ValueError: If the result is not an absolute path.
        """
        new = new.replace('/', '\\')
        if not new.endswith('\\'):
            new = F'{new}\\'
        if not ntpath.isabs(new):
            new = ntpath.join(self.cwd, new)
        if not ntpath.isabs(new):
            raise ValueError(F'Invalid absolute path: {new}')
        # normpath removes the trailing separator again unless the path is a drive root,
        # so the stored value does not reliably end in a backslash.
        self._cwd = ntpath.normcase(ntpath.normpath(new))

    @property
    def ec(self) -> int:
        """
        The current error level.
        """
        return self.errorlevel

    @ec.setter
    def ec(self, value: int | None):
        """
        Set the error level; `None` is stored as zero. The value is also written to the
        ERRORLEVEL variable of the active environment.
        """
        ec = value or 0
        self.environment['ERRORLEVEL'] = str(ec)
        self.errorlevel = ec

    def reset(self):
        """
        Restore the state to the seed values given to the constructor and assign a new
        random script name. The working directory is left untouched.
        """
        self.labels = {}
        self.environments = [dict(self.environment_seed)]
        self.delayexpands = [self.delayed_expansion]
        self.ext_settings = [self.extensions_enabled]
        self.file_system = dict(self.file_system_seed)
        self.dirstack = []
        self.linebreaks = []
        self.name = F'{uuid4()}.bat'
        self.args = []
        self._cmd = ''
        # Must come after the environments are in place: the setter writes ERRORLEVEL
        # into the active environment.
        self.ec = None

    @property
    def command_line(self):
        """
        The command line string that was last assigned.
        """
        return self._cmd

    @command_line.setter
    def command_line(self, value: str):
        """
        Store the command line and split it on whitespace to populate `args`.
        """
        self._cmd = value
        self.args = value.split()

    def envar(self, name: str) -> str:
        """
        Return the value of the variable `name`; the lookup is case-insensitive. Variables
        in the active environment take precedence over the computed ones (DATE, TIME,
        RANDOM, ERRORLEVEL, CD, CMDCMDLINE, CMDEXTVERSION, HIGHESTNUMANODENUMBER). An
        unknown variable yields the empty string.
        """
        name = name.upper()
        if name in (e := self.environment):
            return e[name]
        elif name == 'DATE':
            return self.now.strftime('%Y-%m-%d')
        elif name == 'TIME':
            # Minutes, seconds and the first two fractional digits, preceded by the
            # hour padded with a space rather than a zero.
            time = self.now.strftime('%M:%S,%f')
            return F'{self.now.hour:2d}:{time:.8}'
        elif name == 'RANDOM':
            # The upper bound of randrange is exclusive; cmd.exe documents RANDOM as a
            # number between 0 and 32767, so 32767 itself must be a possible result.
            return str(randrange(0, 32768))
        elif name == 'ERRORLEVEL':
            # Only reached when ERRORLEVEL was removed from the active environment,
            # because the ec setter stores it there.
            return str(self.ec)
        elif name == 'CD':
            return self.cwd
        elif name == 'CMDCMDLINE':
            line = self.envar('COMSPEC')
            if args := self.args:
                args = ' '.join(args)
                line = F'{line} /c "{args}"'
            return line
        elif name == 'CMDEXTVERSION':
            return str(self.extensions_version)
        elif name == 'HIGHESTNUMANODENUMBER':
            return '0'
        else:
            return ''

    def _resolved(self, path: str) -> str:
        """
        Convert `path` to the normalized, case-folded absolute form that is used as key in
        the virtual file system. Relative paths are resolved against the working directory.
        """
        if not ntpath.isabs(path):
            # The working directory only ends in a separator when it is a drive root, so
            # the parts have to be joined rather than concatenated: plain concatenation
            # turned c:\users + a.txt into c:\usersa.txt.
            path = ntpath.join(self.cwd, path)
        return ntpath.normcase(ntpath.normpath(path))

    def create_file(self, path: str, data: str = ''):
        """
        Create the file at `path` with the given contents, replacing any existing file.
        """
        self.file_system[self._resolved(path)] = data

    def append_file(self, path: str, data: str):
        """
        Append `data` to the file at `path`; the file is created if it does not exist.
        """
        path = self._resolved(path)
        if left := self.file_system.get(path, None):
            data = F'{left}{data}'
        self.file_system[path] = data

    def remove_file(self, path: str):
        """
        Delete the file at `path`; a missing file is silently ignored.
        """
        self.file_system.pop(self._resolved(path), None)

    def ingest_file(self, path: str) -> str | None:
        """
        Return the contents of the file at `path`, or `None` if there is no such file.
        """
        return self.file_system.get(self._resolved(path))

    def exists_file(self, path: str) -> bool:
        """
        Return whether a file exists at `path`.
        """
        return self._resolved(path) in self.file_system

    @property
    def environment(self):
        """
        The active environment, i.e. the last entry of `environments`.
        """
        return self.environments[-1]

    @property
    def delayexpand(self):
        """
        The active delayed expansion flag, i.e. the last entry of `delayexpands`.
        """
        return self.delayexpands[-1]

    @property
    def ext_setting(self):
        """
        The active command extension flag, i.e. the last entry of `ext_settings`.
        """
        return self.ext_settings[-1]

Classes

class ParserConfig (uncaret=True, unquote=False, toupper=False, stop_letters=b' \t\x0b', stop_strings=())

ParserConfig(uncaret, unquote, toupper, stop_letters, stop_strings)

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()

Ancestors

  • builtins.tuple

Instance variables

var uncaret

Alias for field number 0

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # field number 0
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()
var unquote

Alias for field number 1

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # field number 1
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()
var toupper

Alias for field number 2

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # field number 2
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()
var stop_letters

Alias for field number 3

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # field number 3
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()
var stop_strings

Alias for field number 4

Expand source code Browse git
class ParserConfig(NamedTuple):
    """
    Immutable bundle of five parser options. Every field has a default, so `ParserConfig()`
    is a valid configuration.

    NOTE(review): the consumer of these options is not part of this module; the per-field
    remarks below are inferred from the field names and types and should be confirmed
    against the parser that reads them.
    """
    # presumably: whether caret (^) escapes are resolved while parsing - TODO confirm
    uncaret: bool = True
    # presumably: whether surrounding double quotes are removed - TODO confirm
    unquote: bool = False
    # presumably: whether the parsed text is converted to upper case - TODO confirm
    toupper: bool = False
    # defaults to the WHITESPACE constant imported from refinery.lib.batch.const;
    # presumably the set of single bytes that terminate a token - TODO confirm
    stop_letters: bytes = WHITESPACE
    # field number 4
    # empty by default; presumably multi-byte sequences that terminate a token - TODO confirm
    stop_strings: tuple[bytes, ...] = ()
class BatchState (delayed_expansion=False, extensions_enabled=True, extensions_version=2, environment=None, file_system=None, username='Administrator', hostname=None, now=None, cwd='C:\\')
Expand source code Browse git
class BatchState:
    """
    Mutable state of an emulated batch interpreter session.

    The state holds three parallel lists (environments, delayed expansion flags, command
    extension flags) whose last entry is the active one, a virtual file system that maps
    normalized absolute paths to file contents, the current working directory, and the
    current error level. `reset` restores everything except the working directory to the
    values that were passed to the constructor.
    """

    name: str
    args: list[str]

    environments: list[dict[str, str]]
    delayexpands: list[bool]
    ext_settings: list[bool]
    file_system: dict[str, str]

    def __init__(
        self,
        delayed_expansion: bool = False,
        extensions_enabled: bool = True,
        extensions_version: int = 2,
        environment: dict | None = None,
        file_system: dict | None = None,
        username: str = 'Administrator',
        hostname: str | None = None,
        now: int | float | str | datetime | None = None,
        cwd: str = 'C:\\',
    ):
        """
        Args:
            delayed_expansion: Initial delayed expansion flag.
            extensions_enabled: Initial command extension flag.
            extensions_version: Value reported for CMDEXTVERSION.
            environment: Variables that take precedence over the built-in defaults. The
                mapping is copied and its keys are upper-cased; it is never modified.
            file_system: Initial contents of the virtual file system.
            username: Substituted for the user name in the default environment.
            hostname: Substituted for the host name in the default environment; a random
                UUID string is used when omitted.
            now: The emulated point in time. A string is parsed with `isodate`, a number
                is converted with `date_from_timestamp`, and if the result is `None` the
                current local time is used.
            cwd: Initial working directory.
        """
        self.delayed_expansion = delayed_expansion
        self.extensions_version = extensions_version
        self.extensions_enabled = extensions_enabled
        self.file_system_seed = file_system or {}
        # Build a private copy instead of filling the caller's dictionary with defaults.
        # Keys are upper-cased because envar() only ever looks up upper-cased names, so a
        # mixed-case key supplied by the caller would otherwise be unreachable.
        self.environment_seed = {
            key.upper(): value for key, value in (environment or {}).items()}
        if hostname is None:
            hostname = str(uuid4())
        for key, value in _DEFAULT_ENVIRONMENT.items():
            self.environment_seed.setdefault(
                key.upper(),
                value.format(h=hostname, u=username)
            )
        if isinstance(now, str):
            now = isodate(now)
        if isinstance(now, (int, float)):
            now = date_from_timestamp(now)
        if now is None:
            now = datetime.now()
        self.cwd = cwd
        self.now = now
        self.hostname = hostname
        self.username = username
        # Seeding the global generator makes the RANDOM variable reproducible for a
        # fixed value of `now`.
        seed(self.now.timestamp())
        self.reset()

    @property
    def cwd(self):
        """
        The current working directory as a normalized, case-folded absolute path.
        """
        return self._cwd

    @cwd.setter
    def cwd(self, new: str):
        """
        Change the working directory. Forward slashes are accepted as separators and a
        relative path is interpreted relative to the current directory.

        Raises:
            ValueError: If the result is not an absolute path.
        """
        new = new.replace('/', '\\')
        if not new.endswith('\\'):
            new = F'{new}\\'
        if not ntpath.isabs(new):
            new = ntpath.join(self.cwd, new)
        if not ntpath.isabs(new):
            raise ValueError(F'Invalid absolute path: {new}')
        # normpath removes the trailing separator again unless the path is a drive root,
        # so the stored value does not reliably end in a backslash.
        self._cwd = ntpath.normcase(ntpath.normpath(new))

    @property
    def ec(self) -> int:
        """
        The current error level.
        """
        return self.errorlevel

    @ec.setter
    def ec(self, value: int | None):
        """
        Set the error level; `None` is stored as zero. The value is also written to the
        ERRORLEVEL variable of the active environment.
        """
        ec = value or 0
        self.environment['ERRORLEVEL'] = str(ec)
        self.errorlevel = ec

    def reset(self):
        """
        Restore the state to the seed values given to the constructor and assign a new
        random script name. The working directory is left untouched.
        """
        self.labels = {}
        self.environments = [dict(self.environment_seed)]
        self.delayexpands = [self.delayed_expansion]
        self.ext_settings = [self.extensions_enabled]
        self.file_system = dict(self.file_system_seed)
        self.dirstack = []
        self.linebreaks = []
        self.name = F'{uuid4()}.bat'
        self.args = []
        self._cmd = ''
        # Must come after the environments are in place: the setter writes ERRORLEVEL
        # into the active environment.
        self.ec = None

    @property
    def command_line(self):
        """
        The command line string that was last assigned.
        """
        return self._cmd

    @command_line.setter
    def command_line(self, value: str):
        """
        Store the command line and split it on whitespace to populate `args`.
        """
        self._cmd = value
        self.args = value.split()

    def envar(self, name: str) -> str:
        """
        Return the value of the variable `name`; the lookup is case-insensitive. Variables
        in the active environment take precedence over the computed ones (DATE, TIME,
        RANDOM, ERRORLEVEL, CD, CMDCMDLINE, CMDEXTVERSION, HIGHESTNUMANODENUMBER). An
        unknown variable yields the empty string.
        """
        name = name.upper()
        if name in (e := self.environment):
            return e[name]
        elif name == 'DATE':
            return self.now.strftime('%Y-%m-%d')
        elif name == 'TIME':
            # Minutes, seconds and the first two fractional digits, preceded by the
            # hour padded with a space rather than a zero.
            time = self.now.strftime('%M:%S,%f')
            return F'{self.now.hour:2d}:{time:.8}'
        elif name == 'RANDOM':
            # The upper bound of randrange is exclusive; cmd.exe documents RANDOM as a
            # number between 0 and 32767, so 32767 itself must be a possible result.
            return str(randrange(0, 32768))
        elif name == 'ERRORLEVEL':
            # Only reached when ERRORLEVEL was removed from the active environment,
            # because the ec setter stores it there.
            return str(self.ec)
        elif name == 'CD':
            return self.cwd
        elif name == 'CMDCMDLINE':
            line = self.envar('COMSPEC')
            if args := self.args:
                args = ' '.join(args)
                line = F'{line} /c "{args}"'
            return line
        elif name == 'CMDEXTVERSION':
            return str(self.extensions_version)
        elif name == 'HIGHESTNUMANODENUMBER':
            return '0'
        else:
            return ''

    def _resolved(self, path: str) -> str:
        """
        Convert `path` to the normalized, case-folded absolute form that is used as key in
        the virtual file system. Relative paths are resolved against the working directory.
        """
        if not ntpath.isabs(path):
            # The working directory only ends in a separator when it is a drive root, so
            # the parts have to be joined rather than concatenated: plain concatenation
            # turned c:\users + a.txt into c:\usersa.txt.
            path = ntpath.join(self.cwd, path)
        return ntpath.normcase(ntpath.normpath(path))

    def create_file(self, path: str, data: str = ''):
        """
        Create the file at `path` with the given contents, replacing any existing file.
        """
        self.file_system[self._resolved(path)] = data

    def append_file(self, path: str, data: str):
        """
        Append `data` to the file at `path`; the file is created if it does not exist.
        """
        path = self._resolved(path)
        if left := self.file_system.get(path, None):
            data = F'{left}{data}'
        self.file_system[path] = data

    def remove_file(self, path: str):
        """
        Delete the file at `path`; a missing file is silently ignored.
        """
        self.file_system.pop(self._resolved(path), None)

    def ingest_file(self, path: str) -> str | None:
        """
        Return the contents of the file at `path`, or `None` if there is no such file.
        """
        return self.file_system.get(self._resolved(path))

    def exists_file(self, path: str) -> bool:
        """
        Return whether a file exists at `path`.
        """
        return self._resolved(path) in self.file_system

    @property
    def environment(self):
        """
        The active environment, i.e. the last entry of `environments`.
        """
        return self.environments[-1]

    @property
    def delayexpand(self):
        """
        The active delayed expansion flag, i.e. the last entry of `delayexpands`.
        """
        return self.delayexpands[-1]

    @property
    def ext_setting(self):
        """
        The active command extension flag, i.e. the last entry of `ext_settings`.
        """
        return self.ext_settings[-1]

Class variables

var name

Name of the batch script (`str`). `reset` assigns a fresh random name of the form `<uuid>.bat`.

var args

Whitespace-separated tokens of the command line (`list[str]`). Populated by the `command_line` setter and emptied by `reset`.

var environments

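List of environment variable mappings (`list[dict[str, str]]`). The last entry is the active environment, which is what the `environment` property returns.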

var delayexpands

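List of delayed expansion flags (`list[bool]`). The last entry is the active setting, which is what the `delayexpand` property returns.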

var ext_settings

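List of command extension flags (`list[bool]`). The last entry is the active setting, which is what the `ext_setting` property returns.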

var file_system

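Virtual file system (`dict[str, str]`) that maps normalized absolute paths to file contents. `reset` re-creates it from the `file_system` constructor argument.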

Instance variables

var cwd
Expand source code Browse git
@property
def cwd(self):
    """
    Return the working directory stored in the `_cwd` attribute.
    """
    current = self._cwd
    return current
var ec
Expand source code Browse git
@property
def ec(self) -> int:
    """
    Return the current error level, which is stored in the `errorlevel` attribute.
    """
    level = self.errorlevel
    return level
var command_line
Expand source code Browse git
@property
def command_line(self):
    """
    Return the command line string stored in the `_cmd` attribute.
    """
    line = self._cmd
    return line
var environment
Expand source code Browse git
@property
def environment(self):
    """
    Return the active environment, i.e. the last entry of `environments`.
    """
    layers = self.environments
    return layers[-1]
var delayexpand
Expand source code Browse git
@property
def delayexpand(self):
    """
    Return the active delayed expansion flag, i.e. the last entry of `delayexpands`.
    """
    flags = self.delayexpands
    return flags[-1]
var ext_setting
Expand source code Browse git
@property
def ext_setting(self):
    """
    Return the active command extension flag, i.e. the last entry of `ext_settings`.
    """
    flags = self.ext_settings
    return flags[-1]

Methods

def reset(self)
Expand source code Browse git
def reset(self):
    """
    Return the state to its seed values: fresh copies of the seed environment and the seed
    file system, single-entry setting lists, empty bookkeeping containers, a new random
    script name, an empty command line, and a cleared error level.
    """
    # Script identity and command line.
    self.name = F'{uuid4()}.bat'
    self._cmd = ''
    self.args = []
    # Bookkeeping containers start out empty.
    self.labels = {}
    self.dirstack = []
    self.linebreaks = []
    # Copies are taken so that later modifications never leak back into the seeds.
    self.file_system = {**self.file_system_seed}
    self.environments = [{**self.environment_seed}]
    self.delayexpands = [self.delayed_expansion]
    self.ext_settings = [self.extensions_enabled]
    # Assigned last, after the environments have been set up.
    self.ec = None
def envar(self, name)
Expand source code Browse git
def envar(self, name: str) -> str:
    """
    Return the value of the variable `name`; the lookup is case-insensitive. Variables in
    the active environment take precedence over the computed ones (DATE, TIME, RANDOM,
    ERRORLEVEL, CD, CMDCMDLINE, CMDEXTVERSION, HIGHESTNUMANODENUMBER). An unknown variable
    yields the empty string.
    """
    name = name.upper()
    if name in (e := self.environment):
        return e[name]
    elif name == 'DATE':
        return self.now.strftime('%Y-%m-%d')
    elif name == 'TIME':
        # Minutes, seconds and the first two fractional digits, preceded by the hour
        # padded with a space rather than a zero.
        time = self.now.strftime('%M:%S,%f')
        return F'{self.now.hour:2d}:{time:.8}'
    elif name == 'RANDOM':
        # The upper bound of randrange is exclusive; cmd.exe documents RANDOM as a number
        # between 0 and 32767, so 32767 itself must be a possible result.
        return str(randrange(0, 32768))
    elif name == 'ERRORLEVEL':
        return str(self.ec)
    elif name == 'CD':
        return self.cwd
    elif name == 'CMDCMDLINE':
        line = self.envar('COMSPEC')
        if args := self.args:
            args = ' '.join(args)
            line = F'{line} /c "{args}"'
        return line
    elif name == 'CMDEXTVERSION':
        return str(self.extensions_version)
    elif name == 'HIGHESTNUMANODENUMBER':
        return '0'
    else:
        return ''
def create_file(self, path, data='')
Expand source code Browse git
def create_file(self, path: str, data: str = ''):
    """
    Store `data` as the contents of the file at `path`, replacing any previous contents.
    The file is empty when no data is given.
    """
    key = self._resolved(path)
    self.file_system[key] = data
def append_file(self, path, data)
Expand source code Browse git
def append_file(self, path: str, data: str):
    """
    Append `data` to the file at `path`. A missing or empty file simply receives `data`
    as its new contents.
    """
    key = self._resolved(path)
    existing = self.file_system.get(key)
    self.file_system[key] = F'{existing}{data}' if existing else data
def remove_file(self, path)
Expand source code Browse git
def remove_file(self, path: str):
    """
    Delete the file at `path` from the virtual file system; nothing happens when there is
    no such file.
    """
    key = self._resolved(path)
    self.file_system.pop(key, None)
def ingest_file(self, path)
Expand source code Browse git
def ingest_file(self, path: str) -> str | None:
    """
    Return the contents of the file at `path`, or `None` when there is no such file.
    """
    key = self._resolved(path)
    return self.file_system.get(key)
def exists_file(self, path)
Expand source code Browse git
def exists_file(self, path: str) -> bool:
    """
    Return whether the virtual file system contains a file at `path`.
    """
    key = self._resolved(path)
    return key in self.file_system