Module refinery.lib.batch.state

Expand source code Browse git
from __future__ import annotations

import ntpath

from datetime import datetime
from enum import Enum
from random import randrange, seed
from uuid import uuid4

from refinery.lib.batch.model import MissingVariable
from refinery.lib.dt import date_from_timestamp, isodate


class ErrorZero(int, Enum):
    """
    An integer zero that is nevertheless truthy. Its single member compares equal to `0` and is
    rendered as `'0'`, but an expression like `value or 0` keeps it instead of discarding it.
    """
    Val = 0

    def __str__(self):
        return '0'

    def __repr__(self):
        return '0'

    def __bool__(self):
        # Deliberately truthy even though the integer value is zero.
        return True


class RetainVariable(str, Enum):
    """
    A string enumeration with a single member whose value is the empty string. It is one of the
    permitted value types of the environment dictionaries in `BatchState.environment_stack`.
    """
    # NOTE(review): presumably a sentinel telling variable expansion to retain (not replace) a
    # variable reference; confirm against the callers, which are not part of this module.
    Val = ''


# Default environment variables of the emulated Windows machine. The placeholders `{u}` and `{h}`
# are substituted with the user name and host name by `str.format` in `BatchState.__init__`, so
# literal braces must not be added to any value without escaping them.
_DEFAULT_ENVIRONMENT = {
    'ALLUSERSPROFILE'           : r'C:\ProgramData',
    'APPDATA'                   : r'C:\Users\{u}\AppData\Roaming',
    'CommonProgramFiles'        : r'C:\Program Files\Common Files',
    'CommonProgramFiles(x86)'   : r'C:\Program Files (x86)\Common Files',
    'CommonProgramW6432'        : r'C:\Program Files\Common Files',
    'COMPUTERNAME'              : r'{h}',
    'ComSpec'                   : r'C:\WINDOWS\system32\cmd.exe',
    'DriverData'                : r'C:\Windows\System32\Drivers\DriverData',
    'HOMEDRIVE'                 : r'C:',
    'HOMEPATH'                  : r'\Users\{u}',
    'LOCALAPPDATA'              : r'C:\Users\{u}\AppData\Local',
    'LOGONSERVER'               : r'\\{h}',
    'NUMBER_OF_PROCESSORS'      : r'16',
    'OneDrive'                  : r'C:\Users\{u}\OneDrive',
    'OS'                        : r'Windows_NT',
    'PATHEXT'                   : r'.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC',
    'PROCESSOR_ARCHITECTURE'    : r'AMD64',
    'PROCESSOR_IDENTIFIER'      : r'Intel64 Family 6 Model 158 Stepping 10, GenuineIntel',
    'PROCESSOR_LEVEL'           : r'6',
    'PROCESSOR_REVISION'        : r'99ff',
    'ProgramData'               : r'C:\ProgramData',
    'ProgramW6432'              : r'C:\Program Files',
    'ProgramFiles'              : r'C:\Program Files',
    'ProgramFiles(x86)'         : r'C:\Program Files (x86)',
    'PUBLIC'                    : r'C:\Users\Public',
    'SESSIONNAME'               : r'Console',
    'SystemDrive'               : r'C:',
    'SystemRoot'                : r'C:\WINDOWS',
    'TEMP'                      : r'C:\Users\{u}\AppData\Local\Temp',
    'TMP'                       : r'C:\Users\{u}\AppData\Local\Temp',
    'USERDOMAIN'                : r'{h}',
    'USERDOMAIN_ROAMINGPROFILE' : r'{h}',
    'USERNAME'                  : r'{u}',
    'USERPROFILE'               : r'C:\Users\{u}',
    'WINDIR'                    : r'C:\WINDOWS',
    'PATH': ';'.join(
        [
            r'C:\Windows',
            r'C:\Windows\System32',
            r'C:\Windows\System32\Wbem',
            # A raw string cannot end in a single backslash, and writing two of them in a raw
            # string yields two literal backslashes. The entries with a trailing separator are
            # therefore spelled as regular strings with escaped backslashes.
            'C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\',
            'C:\\Windows\\System32\\OpenSSH\\',
            'C:\\Program Files\\dotnet\\',
        ]
    ),
}


class BatchState:
    """
    Mutable state of an emulated batch interpreter session: the stack of environments, the
    current working directory, a virtual file system, the exit code, and FOR-loop variables.
    """

    name: str
    args: list[str]

    now: datetime
    start_time: datetime

    environment_stack: list[dict[str, str | RetainVariable]]
    delayexpand_stack: list[bool]
    cmdextended_stack: list[bool]

    _for_loops: list[dict[str, str]]
    file_system: dict[str, str]

    def __init__(
        self,
        delayexpand: bool = False,
        extensions_enabled: bool = True,
        extensions_version: int = 2,
        environment: dict | None = None,
        file_system: dict | None = None,
        username: str = 'Administrator',
        hostname: str | None = None,
        now: int | float | str | datetime | None = None,
        cwd: str = 'C:\\',
        filename: str | None = '',
        echo: bool = True,
        codec: str = 'cp1252',
    ):
        """
        Set up a fresh interpreter state.

        A dictionary passed as `environment` or `file_system` is used directly, even when it is
        empty, so the caller can observe later changes. The environment is filled in place with
        every entry of `_DEFAULT_ENVIRONMENT` whose upper-cased key is not yet present; the
        placeholders `{u}` and `{h}` in those defaults are replaced by `username` and `hostname`.
        A random UUID serves as the host name when none is given.

        The value of `now` may be a datetime, a numeric timestamp, or a string parsed by
        `isodate`; it defaults to the current local time. The global `random` generator is seeded
        with the timestamp of `now`. A random `<uuid>.bat` is used when `filename` is empty.
        """
        self.extensions_version = extensions_version
        # Test against None rather than truthiness: an empty dictionary supplied by the caller
        # must be shared in the same way as a populated one.
        if file_system is None:
            file_system = {}
        if environment is None:
            environment = {}
        if hostname is None:
            hostname = str(uuid4())
        for key, value in _DEFAULT_ENVIRONMENT.items():
            environment.setdefault(
                key.upper(),
                value.format(h=hostname, u=username)
            )
        if isinstance(now, str):
            now = isodate(now)
        if isinstance(now, (int, float)):
            now = date_from_timestamp(now)
        if now is None:
            now = datetime.now()
        self.cwd = cwd
        self.now = now
        self.start_time = now
        seed(self.now.timestamp())
        self.hostname = hostname
        self.username = username
        self.labels = {}
        self._for_loops = []
        self.environment_stack = [environment]
        self.delayexpand_stack = [delayexpand]
        self.cmdextended_stack = [extensions_enabled]
        self.file_system = file_system
        self.dirstack = []
        self.linebreaks = []
        self.name = filename or F'{uuid4()}.bat'
        self.args = []
        self._cmd = ''
        # The setter requires the environment stack, which exists at this point.
        self.ec = None
        self.echo = echo
        self.codec = codec

    @property
    def cwd(self):
        """
        The current working directory as a normalized, lower-cased absolute path. It carries no
        trailing backslash unless it is a drive root.
        """
        return self._cwd

    @cwd.setter
    def cwd(self, new: str):
        """
        Change the working directory. Forward slashes are converted to backslashes and relative
        paths are resolved against the previous working directory.

        Raises:
            ValueError: if the result is not an absolute path, which includes the case of a
                relative path being assigned before any working directory was set.
        """
        new = new.replace('/', '\\')
        if not new.endswith('\\'):
            new = F'{new}\\'
        if not ntpath.isabs(new):
            base = getattr(self, '_cwd', None)
            if base is None:
                # There is nothing to resolve against yet (first assignment in __init__).
                raise ValueError(F'Invalid absolute path: {new}')
            new = ntpath.join(base, new)
        if not ntpath.isabs(new):
            raise ValueError(F'Invalid absolute path: {new}')
        self._cwd = ntpath.normcase(ntpath.normpath(new))

    @property
    def ec(self) -> int | ErrorZero:
        """
        The exit code of the most recent command, as stored in `self.errorlevel`.
        """
        return self.errorlevel

    @ec.setter
    def ec(self, value: int | ErrorZero | None):
        """
        Store a new exit code and mirror it into the `ERRORLEVEL` variable of the active
        environment. Falsy values, including `None`, are stored as the integer 0.
        """
        ec = value or 0
        self.environment['ERRORLEVEL'] = str(ec)
        self.errorlevel = ec

    @property
    def command_line(self):
        """
        The raw command line string that was last assigned.
        """
        return self._cmd

    @command_line.setter
    def command_line(self, value: str):
        """
        Store the command line and split it on whitespace to populate `self.args`.
        """
        self._cmd = value
        self.args = value.split()

    def envar(self, name: str, default: str | None = None) -> str | RetainVariable:
        """
        Return the value of the environment variable `name`. The name is converted to upper case
        before it is looked up in the active environment. When it is not set there, the dynamic
        variables `DATE`, `TIME`, `RANDOM`, `ERRORLEVEL`, `CD`, `CMDCMDLINE`, `CMDEXTVERSION`,
        and `HIGHESTNUMANODENUMBER` are computed on the fly.

        Returns:
            The variable value, or `default` if the variable is unknown and a default was given.

        Raises:
            MissingVariable: if the variable is unknown and `default` is `None`.
        """
        name = name.upper()
        if name in (e := self.environment):
            return e[name]
        elif name == 'DATE':
            return self.now.strftime('%Y-%m-%d')
        elif name == 'TIME':
            # %f has six digits; cutting the string to 8 characters keeps two of them.
            stamp = self.now.strftime('%M:%S,%f')
            return F'{self.now.hour:2d}:{stamp:.8}'
        elif name == 'RANDOM':
            # The upper bound of randrange is exclusive; 32767 itself must be reachable.
            return str(randrange(0, 32768))
        elif name == 'ERRORLEVEL':
            return str(self.ec)
        elif name == 'CD':
            return self.cwd
        elif name == 'CMDCMDLINE':
            line = self.envar('COMSPEC', 'cmd.exe')
            if args := self.args:
                args = ' '.join(args)
                line = F'{line} /c "{args}"'
            return line
        elif name == 'CMDEXTVERSION':
            return str(self.extensions_version)
        elif name == 'HIGHESTNUMANODENUMBER':
            return '0'
        elif default is not None:
            return default
        else:
            raise MissingVariable

    def resolve_path(self, path: str) -> str:
        """
        Convert `path` to a normalized, lower-cased absolute path. Relative paths are resolved
        against the current working directory.
        """
        if not ntpath.isabs(path):
            # The working directory has no trailing backslash unless it is a drive root, so the
            # two parts have to be joined rather than concatenated.
            path = ntpath.join(self.cwd, path)
        return ntpath.normcase(ntpath.normpath(path))

    def create_file(self, path: str, data: str = ''):
        """
        Create the file at `path` in the virtual file system, or overwrite it, with `data`.
        """
        self.file_system[self.resolve_path(path)] = data

    def append_file(self, path: str, data: str):
        """
        Append `data` to the file at `path`; the file is created if it does not exist.
        """
        path = self.resolve_path(path)
        if left := self.file_system.get(path, None):
            data = F'{left}{data}'
        self.file_system[path] = data

    def remove_file(self, path: str):
        """
        Delete the file at `path` from the virtual file system; missing files are ignored.
        """
        self.file_system.pop(self.resolve_path(path), None)

    def ingest_file(self, path: str) -> str | None:
        """
        Return the contents of the file at `path`, or `None` if there is no such file.
        """
        return self.file_system.get(self.resolve_path(path))

    def exists_file(self, path: str) -> bool:
        """
        Return whether a file exists at `path` in the virtual file system.
        """
        return self.resolve_path(path) in self.file_system

    def sizeof_file(self, path: str) -> int:
        """
        Return the length of the contents of the file at `path`, or -1 if the file does not
        exist. An existing empty file has size 0.
        """
        data = self.ingest_file(path)
        if data is None:
            return -1
        return len(data)

    def new_forloop(self) -> dict[str, str]:
        """
        Push a new FOR-loop variable scope and return it. The new scope starts out as a copy of
        the innermost existing scope, if there is one.
        """
        new = {}
        old = self.for_loop_variables
        if old is not None:
            new.update(old)
        self._for_loops.append(new)
        return new

    def end_forloop(self):
        """
        Discard the innermost FOR-loop variable scope.
        """
        self._for_loops.pop()

    @property
    def environment(self):
        """
        The active environment, i.e. the topmost dictionary on the environment stack.
        """
        return self.environment_stack[-1]

    @property
    def delayexpand(self):
        """
        The active delayed-expansion flag, i.e. the top of `delayexpand_stack`.
        """
        return self.delayexpand_stack[-1]

    @delayexpand.setter
    def delayexpand(self, v):
        self.delayexpand_stack[-1] = v

    @property
    def cmdextended(self):
        """
        The active command-extensions flag, i.e. the top of `cmdextended_stack`.
        """
        return self.cmdextended_stack[-1]

    @cmdextended.setter
    def cmdextended(self, v):
        self.cmdextended_stack[-1] = v

    @property
    def for_loop_variables(self):
        """
        The innermost FOR-loop variable scope, or `None` when no loop is active.
        """
        scopes = self._for_loops
        return scopes[-1] if scopes else None

Classes

class ErrorZero (*args, **kwds)

int([x]) -> integer int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating-point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

>>> int('0b100', base=0)
4
Expand source code Browse git
class ErrorZero(int, Enum):
    """
    An integer zero that is nevertheless truthy. Its single member compares equal to `0` and is
    rendered as `'0'`, but an expression like `value or 0` keeps it instead of discarding it.
    """
    Val = 0

    def __str__(self):
        return '0'

    def __repr__(self):
        return '0'

    def __bool__(self):
        # Deliberately truthy even though the integer value is zero.
        return True

Ancestors

  • builtins.int
  • enum.Enum

Class variables

var Val

The only member of the enumeration. Its integer value is 0, but it evaluates as true and is rendered as '0'.

class RetainVariable (*args, **kwds)

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.

Expand source code Browse git
class RetainVariable(str, Enum):
    """
    A string enumeration with a single member whose value is the empty string. It is one of the
    permitted value types of the environment dictionaries in `BatchState.environment_stack`.
    """
    # NOTE(review): presumably a sentinel telling variable expansion to retain (not replace) a
    # variable reference; confirm against the callers, which are not part of this module.
    Val = ''

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var Val

The only member of the enumeration. Its string value is the empty string.

class BatchState (delayexpand=False, extensions_enabled=True, extensions_version=2, environment=None, file_system=None, username='Administrator', hostname=None, now=None, cwd='C:\\', filename='', echo=True, codec='cp1252')
Expand source code Browse git
class BatchState:
    """
    Mutable state of an emulated batch interpreter session: the stack of environments, the
    current working directory, a virtual file system, the exit code, and FOR-loop variables.
    """

    name: str
    args: list[str]

    now: datetime
    start_time: datetime

    environment_stack: list[dict[str, str | RetainVariable]]
    delayexpand_stack: list[bool]
    cmdextended_stack: list[bool]

    _for_loops: list[dict[str, str]]
    file_system: dict[str, str]

    def __init__(
        self,
        delayexpand: bool = False,
        extensions_enabled: bool = True,
        extensions_version: int = 2,
        environment: dict | None = None,
        file_system: dict | None = None,
        username: str = 'Administrator',
        hostname: str | None = None,
        now: int | float | str | datetime | None = None,
        cwd: str = 'C:\\',
        filename: str | None = '',
        echo: bool = True,
        codec: str = 'cp1252',
    ):
        """
        Set up a fresh interpreter state.

        A dictionary passed as `environment` or `file_system` is used directly, even when it is
        empty, so the caller can observe later changes. The environment is filled in place with
        every entry of `_DEFAULT_ENVIRONMENT` whose upper-cased key is not yet present; the
        placeholders `{u}` and `{h}` in those defaults are replaced by `username` and `hostname`.
        A random UUID serves as the host name when none is given.

        The value of `now` may be a datetime, a numeric timestamp, or a string parsed by
        `isodate`; it defaults to the current local time. The global `random` generator is seeded
        with the timestamp of `now`. A random `<uuid>.bat` is used when `filename` is empty.
        """
        self.extensions_version = extensions_version
        # Test against None rather than truthiness: an empty dictionary supplied by the caller
        # must be shared in the same way as a populated one.
        if file_system is None:
            file_system = {}
        if environment is None:
            environment = {}
        if hostname is None:
            hostname = str(uuid4())
        for key, value in _DEFAULT_ENVIRONMENT.items():
            environment.setdefault(
                key.upper(),
                value.format(h=hostname, u=username)
            )
        if isinstance(now, str):
            now = isodate(now)
        if isinstance(now, (int, float)):
            now = date_from_timestamp(now)
        if now is None:
            now = datetime.now()
        self.cwd = cwd
        self.now = now
        self.start_time = now
        seed(self.now.timestamp())
        self.hostname = hostname
        self.username = username
        self.labels = {}
        self._for_loops = []
        self.environment_stack = [environment]
        self.delayexpand_stack = [delayexpand]
        self.cmdextended_stack = [extensions_enabled]
        self.file_system = file_system
        self.dirstack = []
        self.linebreaks = []
        self.name = filename or F'{uuid4()}.bat'
        self.args = []
        self._cmd = ''
        # The setter requires the environment stack, which exists at this point.
        self.ec = None
        self.echo = echo
        self.codec = codec

    @property
    def cwd(self):
        """
        The current working directory as a normalized, lower-cased absolute path. It carries no
        trailing backslash unless it is a drive root.
        """
        return self._cwd

    @cwd.setter
    def cwd(self, new: str):
        """
        Change the working directory. Forward slashes are converted to backslashes and relative
        paths are resolved against the previous working directory.

        Raises:
            ValueError: if the result is not an absolute path, which includes the case of a
                relative path being assigned before any working directory was set.
        """
        new = new.replace('/', '\\')
        if not new.endswith('\\'):
            new = F'{new}\\'
        if not ntpath.isabs(new):
            base = getattr(self, '_cwd', None)
            if base is None:
                # There is nothing to resolve against yet (first assignment in __init__).
                raise ValueError(F'Invalid absolute path: {new}')
            new = ntpath.join(base, new)
        if not ntpath.isabs(new):
            raise ValueError(F'Invalid absolute path: {new}')
        self._cwd = ntpath.normcase(ntpath.normpath(new))

    @property
    def ec(self) -> int | ErrorZero:
        """
        The exit code of the most recent command, as stored in `self.errorlevel`.
        """
        return self.errorlevel

    @ec.setter
    def ec(self, value: int | ErrorZero | None):
        """
        Store a new exit code and mirror it into the `ERRORLEVEL` variable of the active
        environment. Falsy values, including `None`, are stored as the integer 0.
        """
        ec = value or 0
        self.environment['ERRORLEVEL'] = str(ec)
        self.errorlevel = ec

    @property
    def command_line(self):
        """
        The raw command line string that was last assigned.
        """
        return self._cmd

    @command_line.setter
    def command_line(self, value: str):
        """
        Store the command line and split it on whitespace to populate `self.args`.
        """
        self._cmd = value
        self.args = value.split()

    def envar(self, name: str, default: str | None = None) -> str | RetainVariable:
        """
        Return the value of the environment variable `name`. The name is converted to upper case
        before it is looked up in the active environment. When it is not set there, the dynamic
        variables `DATE`, `TIME`, `RANDOM`, `ERRORLEVEL`, `CD`, `CMDCMDLINE`, `CMDEXTVERSION`,
        and `HIGHESTNUMANODENUMBER` are computed on the fly.

        Returns:
            The variable value, or `default` if the variable is unknown and a default was given.

        Raises:
            MissingVariable: if the variable is unknown and `default` is `None`.
        """
        name = name.upper()
        if name in (e := self.environment):
            return e[name]
        elif name == 'DATE':
            return self.now.strftime('%Y-%m-%d')
        elif name == 'TIME':
            # %f has six digits; cutting the string to 8 characters keeps two of them.
            stamp = self.now.strftime('%M:%S,%f')
            return F'{self.now.hour:2d}:{stamp:.8}'
        elif name == 'RANDOM':
            # The upper bound of randrange is exclusive; 32767 itself must be reachable.
            return str(randrange(0, 32768))
        elif name == 'ERRORLEVEL':
            return str(self.ec)
        elif name == 'CD':
            return self.cwd
        elif name == 'CMDCMDLINE':
            line = self.envar('COMSPEC', 'cmd.exe')
            if args := self.args:
                args = ' '.join(args)
                line = F'{line} /c "{args}"'
            return line
        elif name == 'CMDEXTVERSION':
            return str(self.extensions_version)
        elif name == 'HIGHESTNUMANODENUMBER':
            return '0'
        elif default is not None:
            return default
        else:
            raise MissingVariable

    def resolve_path(self, path: str) -> str:
        """
        Convert `path` to a normalized, lower-cased absolute path. Relative paths are resolved
        against the current working directory.
        """
        if not ntpath.isabs(path):
            # The working directory has no trailing backslash unless it is a drive root, so the
            # two parts have to be joined rather than concatenated.
            path = ntpath.join(self.cwd, path)
        return ntpath.normcase(ntpath.normpath(path))

    def create_file(self, path: str, data: str = ''):
        """
        Create the file at `path` in the virtual file system, or overwrite it, with `data`.
        """
        self.file_system[self.resolve_path(path)] = data

    def append_file(self, path: str, data: str):
        """
        Append `data` to the file at `path`; the file is created if it does not exist.
        """
        path = self.resolve_path(path)
        if left := self.file_system.get(path, None):
            data = F'{left}{data}'
        self.file_system[path] = data

    def remove_file(self, path: str):
        """
        Delete the file at `path` from the virtual file system; missing files are ignored.
        """
        self.file_system.pop(self.resolve_path(path), None)

    def ingest_file(self, path: str) -> str | None:
        """
        Return the contents of the file at `path`, or `None` if there is no such file.
        """
        return self.file_system.get(self.resolve_path(path))

    def exists_file(self, path: str) -> bool:
        """
        Return whether a file exists at `path` in the virtual file system.
        """
        return self.resolve_path(path) in self.file_system

    def sizeof_file(self, path: str) -> int:
        """
        Return the length of the contents of the file at `path`, or -1 if the file does not
        exist. An existing empty file has size 0.
        """
        data = self.ingest_file(path)
        if data is None:
            return -1
        return len(data)

    def new_forloop(self) -> dict[str, str]:
        """
        Push a new FOR-loop variable scope and return it. The new scope starts out as a copy of
        the innermost existing scope, if there is one.
        """
        new = {}
        old = self.for_loop_variables
        if old is not None:
            new.update(old)
        self._for_loops.append(new)
        return new

    def end_forloop(self):
        """
        Discard the innermost FOR-loop variable scope.
        """
        self._for_loops.pop()

    @property
    def environment(self):
        """
        The active environment, i.e. the topmost dictionary on the environment stack.
        """
        return self.environment_stack[-1]

    @property
    def delayexpand(self):
        """
        The active delayed-expansion flag, i.e. the top of `delayexpand_stack`.
        """
        return self.delayexpand_stack[-1]

    @delayexpand.setter
    def delayexpand(self, v):
        self.delayexpand_stack[-1] = v

    @property
    def cmdextended(self):
        """
        The active command-extensions flag, i.e. the top of `cmdextended_stack`.
        """
        return self.cmdextended_stack[-1]

    @cmdextended.setter
    def cmdextended(self, v):
        self.cmdextended_stack[-1] = v

    @property
    def for_loop_variables(self):
        """
        The innermost FOR-loop variable scope, or `None` when no loop is active.
        """
        scopes = self._for_loops
        return scopes[-1] if scopes else None

Class variables

var name

Name of the batch file (str): the filename argument of the constructor, or a random "<uuid>.bat" when that argument is empty.

var args

The whitespace-separated tokens of the current command line (list[str]); replaced whenever command_line is assigned.

var now

The emulated current time (datetime); the dynamic variables DATE and TIME are computed from it.

var start_time

The time at which the state was created (datetime); the constructor initializes it to the same value as now.

var environment_stack

Stack of environment dictionaries (list[dict[str, str | RetainVariable]]); the last entry is the active environment.

var delayexpand_stack

Stack of delayed-expansion flags (list[bool]); the last entry is the active setting.

var cmdextended_stack

Stack of command-extension flags (list[bool]); the last entry is the active setting.

var file_system

The virtual file system (dict[str, str]); it maps normalized absolute paths to file contents.

Instance variables

var cwd
Expand source code Browse git
@property
def cwd(self):
    """
    Return the current working directory as stored in `self._cwd`.
    """
    return self._cwd
var ec
Expand source code Browse git
@property
def ec(self) -> int | ErrorZero:
    """
    Return the current exit code as stored in `self.errorlevel`.
    """
    return self.errorlevel
var command_line
Expand source code Browse git
@property
def command_line(self):
    """
    Return the raw command line string as stored in `self._cmd`.
    """
    return self._cmd
var environment
Expand source code Browse git
@property
def environment(self):
    """
    Return the active environment, i.e. the last entry of `self.environment_stack`.
    """
    return self.environment_stack[-1]
var delayexpand
Expand source code Browse git
@property
def delayexpand(self):
    """
    Return the active delayed-expansion flag, i.e. the last entry of `self.delayexpand_stack`.
    """
    return self.delayexpand_stack[-1]
var cmdextended
Expand source code Browse git
@property
def cmdextended(self):
    """
    Return the active command-extensions flag, i.e. the last entry of `self.cmdextended_stack`.
    """
    return self.cmdextended_stack[-1]
var for_loop_variables
Expand source code Browse git
@property
def for_loop_variables(self):
    """
    Return the innermost FOR-loop variable scope, or `None` when no loop is active.
    """
    scopes = self._for_loops
    return scopes[-1] if scopes else None

Methods

def envar(self, name, default=None)
Expand source code Browse git
def envar(self, name: str, default: str | None = None) -> str | RetainVariable:
    """
    Return the value of the environment variable `name`. The name is converted to upper case
    before it is looked up in the active environment. When it is not set there, the dynamic
    variables `DATE`, `TIME`, `RANDOM`, `ERRORLEVEL`, `CD`, `CMDCMDLINE`, `CMDEXTVERSION`, and
    `HIGHESTNUMANODENUMBER` are computed on the fly.

    Returns:
        The variable value, or `default` if the variable is unknown and a default was given.

    Raises:
        MissingVariable: if the variable is unknown and `default` is `None`.
    """
    name = name.upper()
    if name in (e := self.environment):
        return e[name]
    elif name == 'DATE':
        return self.now.strftime('%Y-%m-%d')
    elif name == 'TIME':
        # %f has six digits; cutting the string to 8 characters keeps two of them.
        stamp = self.now.strftime('%M:%S,%f')
        return F'{self.now.hour:2d}:{stamp:.8}'
    elif name == 'RANDOM':
        # The upper bound of randrange is exclusive; 32767 itself must be reachable.
        return str(randrange(0, 32768))
    elif name == 'ERRORLEVEL':
        return str(self.ec)
    elif name == 'CD':
        return self.cwd
    elif name == 'CMDCMDLINE':
        line = self.envar('COMSPEC', 'cmd.exe')
        if args := self.args:
            args = ' '.join(args)
            line = F'{line} /c "{args}"'
        return line
    elif name == 'CMDEXTVERSION':
        return str(self.extensions_version)
    elif name == 'HIGHESTNUMANODENUMBER':
        return '0'
    elif default is not None:
        return default
    else:
        raise MissingVariable
def resolve_path(self, path)
Expand source code Browse git
def resolve_path(self, path: str) -> str:
    """
    Convert `path` to a normalized, lower-cased absolute path. Relative paths are resolved
    against the current working directory `self.cwd`.
    """
    if not ntpath.isabs(path):
        # A normalized working directory has no trailing backslash unless it is a drive root,
        # so the two parts have to be joined rather than concatenated.
        path = ntpath.join(self.cwd, path)
    return ntpath.normcase(ntpath.normpath(path))
def create_file(self, path, data='')
Expand source code Browse git
def create_file(self, path: str, data: str = ''):
    """
    Store `data` under the resolved form of `path`, replacing any previous contents.
    """
    target = self.resolve_path(path)
    self.file_system[target] = data
def append_file(self, path, data)
Expand source code Browse git
def append_file(self, path: str, data: str):
    """
    Append `data` to the file stored under the resolved form of `path`. The file is created
    when it does not exist yet.
    """
    target = self.resolve_path(path)
    existing = self.file_system.get(target, None)
    self.file_system[target] = F'{existing}{data}' if existing else data
def remove_file(self, path)
Expand source code Browse git
def remove_file(self, path: str):
    """
    Remove the file stored under the resolved form of `path`; a missing file is ignored.
    """
    target = self.resolve_path(path)
    self.file_system.pop(target, None)
def ingest_file(self, path)
Expand source code Browse git
def ingest_file(self, path: str) -> str | None:
    """
    Return the contents stored under the resolved form of `path`, or `None` when there is no
    such file.
    """
    target = self.resolve_path(path)
    return self.file_system.get(target)
def exists_file(self, path)
Expand source code Browse git
def exists_file(self, path: str) -> bool:
    """
    Return whether the virtual file system has an entry for the resolved form of `path`.
    """
    target = self.resolve_path(path)
    return target in self.file_system
def sizeof_file(self, path)
Expand source code Browse git
def sizeof_file(self, path: str) -> int:
    """
    Return the length of the contents of the file at `path`, or -1 if the file does not exist.
    An existing empty file has size 0.
    """
    data = self.ingest_file(path)
    # Compare against None explicitly: an empty string is a valid, existing file.
    if data is None:
        return -1
    return len(data)
def new_forloop(self)
Expand source code Browse git
def new_forloop(self) -> dict[str, str]:
    """
    Push a new FOR-loop variable scope onto `self._for_loops` and return it. The new scope is
    a copy of the current innermost scope, or empty when there is none.
    """
    inherited = self.for_loop_variables
    scope = {} if inherited is None else dict(inherited)
    self._for_loops.append(scope)
    return scope
def end_forloop(self)
Expand source code Browse git
def end_forloop(self):
    """
    Discard the innermost FOR-loop variable scope by popping the last entry of
    `self._for_loops`.
    """
    self._for_loops.pop()