Module refinery.lib.batch.state
Expand source code Browse git
from __future__ import annotations
import ntpath
from datetime import datetime
from random import randrange, seed
from typing import NamedTuple
from uuid import uuid4
from refinery.lib.batch.const import WHITESPACE
from refinery.lib.dt import date_from_timestamp, isodate
class ParserConfig(NamedTuple):
uncaret: bool = True
unquote: bool = False
toupper: bool = False
stop_letters: bytes = WHITESPACE
stop_strings: tuple[bytes, ...] = ()
_DEFAULT_ENVIRONMENT = {
'ALLUSERSPROFILE' : r'C:\ProgramData',
'APPDATA' : r'C:\Users\{u}\AppData\Roaming',
'CommonProgramFiles' : r'C:\Program Files\Common Files',
'CommonProgramFiles(x86)' : r'C:\Program Files (x86)\Common Files',
'CommonProgramW6432' : r'C:\Program Files\Common Files',
'COMPUTERNAME' : r'{h}',
'ComSpec' : r'C:\WINDOWS\system32\cmd.exe',
'DriverData' : r'C:\Windows\System32\Drivers\DriverData',
'HOMEDRIVE' : r'C:',
'HOMEPATH' : r'\Users\{u}',
'LOCALAPPDATA' : r'C:\Users\{u}\AppData\Local',
'LOGONSERVER' : r'\\{h}',
'NUMBER_OF_PROCESSORS' : r'16',
'OneDrive' : r'C:\Users\{u}\OneDrive',
'OS' : r'Windows_NT',
'PATHEXT' : r'.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC',
'PROCESSOR_ARCHITECTURE' : r'AMD64',
'PROCESSOR_IDENTIFIER' : r'Intel64 Family 6 Model 158 Stepping 10, GenuineIntel',
'PROCESSOR_LEVEL' : r'6',
'PROCESSOR_REVISION' : r'99ff',
'ProgramData' : r'C:\ProgramData',
'ProgramW6432' : r'C:\Program Files',
'ProgramFiles' : r'C:\Program Files',
'ProgramFiles(x86)' : r'C:\Program Files (x86)',
'PUBLIC' : r'C:\Users\Public',
'SESSIONNAME' : r'Console',
'SystemDrive' : r'C:',
'SystemRoot' : r'C:\WINDOWS',
'TEMP' : r'C:\Users\{u}\AppData\Local\Temp',
'TMP' : r'C:\Users\{u}\AppData\Local\Temp',
'USERDOMAIN' : r'{h}',
'USERDOMAIN_ROAMINGPROFILE' : r'{h}',
'USERNAME' : r'{u}',
'USERPROFILE' : r'C:\Users\{u}',
'WINDIR' : r'C:\WINDOWS',
'PATH': ';'.join(
[
r'C:\Windows',
r'C:\Windows\System32',
r'C:\Windows\System32\Wbem',
r'C:\Windows\System32\WindowsPowerShell\v1.0\\',
r'C:\Windows\System32\OpenSSH\\',
r'C:\Program Files\dotnet\\',
]
),
}
class BatchState:
name: str
args: list[str]
environments: list[dict[str, str]]
delayexpands: list[bool]
ext_settings: list[bool]
file_system: dict[str, str]
def __init__(
self,
delayed_expansion: bool = False,
extensions_enabled: bool = True,
extensions_version: int = 2,
environment: dict | None = None,
file_system: dict | None = None,
username: str = 'Administrator',
hostname: str | None = None,
now: int | float | str | datetime | None = None,
cwd: str = 'C:\\',
):
self.delayed_expansion = delayed_expansion
self.extensions_version = extensions_version
self.extensions_enabled = extensions_enabled
self.file_system_seed = file_system or {}
self.environment_seed = environment or {}
if hostname is None:
hostname = str(uuid4())
for key, value in _DEFAULT_ENVIRONMENT.items():
self.environment_seed.setdefault(
key.upper(),
value.format(h=hostname, u=username)
)
if isinstance(now, str):
now = isodate(now)
if isinstance(now, (int, float)):
now = date_from_timestamp(now)
if now is None:
now = datetime.now()
self.cwd = cwd
self.now = now
self.hostname = hostname
self.username = username
seed(self.now.timestamp())
self.reset()
@property
def cwd(self):
return self._cwd
@cwd.setter
def cwd(self, new: str):
new = new.replace('/', '\\')
if not new.endswith('\\'):
new = F'{new}\\'
if not ntpath.isabs(new):
new = ntpath.join(self.cwd, new)
if not ntpath.isabs(new):
raise ValueError(F'Invalid absolute path: {new}')
self._cwd = ntpath.normcase(ntpath.normpath(new))
@property
def ec(self) -> int:
return self.errorlevel
@ec.setter
def ec(self, value: int | None):
ec = value or 0
self.environment['ERRORLEVEL'] = str(ec)
self.errorlevel = ec
def reset(self):
self.labels = {}
self.environments = [dict(self.environment_seed)]
self.delayexpands = [self.delayed_expansion]
self.ext_settings = [self.extensions_enabled]
self.file_system = dict(self.file_system_seed)
self.dirstack = []
self.linebreaks = []
self.name = F'{uuid4()}.bat'
self.args = []
self._cmd = ''
self.ec = None
@property
def command_line(self):
return self._cmd
@command_line.setter
def command_line(self, value: str):
self._cmd = value
self.args = value.split()
def envar(self, name: str) -> str:
name = name.upper()
if name in (e := self.environment):
return e[name]
elif name == 'DATE':
return self.now.strftime('%Y-%m-%d')
elif name == 'TIME':
time = self.now.strftime('%M:%S,%f')
return F'{self.now.hour:2d}:{time:.8}'
elif name == 'RANDOM':
return str(randrange(0, 32767))
elif name == 'ERRORLEVEL':
return str(self.ec)
elif name == 'CD':
return self.cwd
elif name == 'CMDCMDLINE':
line = self.envar('COMSPEC')
if args := self.args:
args = ' '.join(args)
line = F'{line} /c "{args}"'
return line
elif name == 'CMDEXTVERSION':
return str(self.extensions_version)
elif name == 'HIGHESTNUMANODENUMBER':
return '0'
else:
return ''
def _resolved(self, path: str) -> str:
if not ntpath.isabs(path):
path = F'{self.cwd}{path}'
return ntpath.normcase(ntpath.normpath(path))
def create_file(self, path: str, data: str = ''):
self.file_system[self._resolved(path)] = data
def append_file(self, path: str, data: str):
path = self._resolved(path)
if left := self.file_system.get(path, None):
data = F'{left}{data}'
self.file_system[path] = data
def remove_file(self, path: str):
self.file_system.pop(self._resolved(path), None)
def ingest_file(self, path: str) -> str | None:
return self.file_system.get(self._resolved(path))
def exists_file(self, path: str) -> bool:
return self._resolved(path) in self.file_system
@property
def environment(self):
return self.environments[-1]
@property
def delayexpand(self):
return self.delayexpands[-1]
@property
def ext_setting(self):
return self.ext_settings[-1]
Classes
class ParserConfig (uncaret=True, unquote=False, toupper=False, stop_letters=b' \t\x0b', stop_strings=())-
ParserConfig(uncaret, unquote, toupper, stop_letters, stop_strings)
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = ()Ancestors
- builtins.tuple
Instance variables
var uncaret-
Alias for field number 0
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = () var unquote-
Alias for field number 1
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = () var toupper-
Alias for field number 2
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = () var stop_letters-
Alias for field number 3
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = () var stop_strings-
Alias for field number 4
Expand source code Browse git
class ParserConfig(NamedTuple): uncaret: bool = True unquote: bool = False toupper: bool = False stop_letters: bytes = WHITESPACE stop_strings: tuple[bytes, ...] = ()
class BatchState (delayed_expansion=False, extensions_enabled=True, extensions_version=2, environment=None, file_system=None, username='Administrator', hostname=None, now=None, cwd='C:\\')-
Expand source code Browse git
class BatchState: name: str args: list[str] environments: list[dict[str, str]] delayexpands: list[bool] ext_settings: list[bool] file_system: dict[str, str] def __init__( self, delayed_expansion: bool = False, extensions_enabled: bool = True, extensions_version: int = 2, environment: dict | None = None, file_system: dict | None = None, username: str = 'Administrator', hostname: str | None = None, now: int | float | str | datetime | None = None, cwd: str = 'C:\\', ): self.delayed_expansion = delayed_expansion self.extensions_version = extensions_version self.extensions_enabled = extensions_enabled self.file_system_seed = file_system or {} self.environment_seed = environment or {} if hostname is None: hostname = str(uuid4()) for key, value in _DEFAULT_ENVIRONMENT.items(): self.environment_seed.setdefault( key.upper(), value.format(h=hostname, u=username) ) if isinstance(now, str): now = isodate(now) if isinstance(now, (int, float)): now = date_from_timestamp(now) if now is None: now = datetime.now() self.cwd = cwd self.now = now self.hostname = hostname self.username = username seed(self.now.timestamp()) self.reset() @property def cwd(self): return self._cwd @cwd.setter def cwd(self, new: str): new = new.replace('/', '\\') if not new.endswith('\\'): new = F'{new}\\' if not ntpath.isabs(new): new = ntpath.join(self.cwd, new) if not ntpath.isabs(new): raise ValueError(F'Invalid absolute path: {new}') self._cwd = ntpath.normcase(ntpath.normpath(new)) @property def ec(self) -> int: return self.errorlevel @ec.setter def ec(self, value: int | None): ec = value or 0 self.environment['ERRORLEVEL'] = str(ec) self.errorlevel = ec def reset(self): self.labels = {} self.environments = [dict(self.environment_seed)] self.delayexpands = [self.delayed_expansion] self.ext_settings = [self.extensions_enabled] self.file_system = dict(self.file_system_seed) self.dirstack = [] self.linebreaks = [] self.name = F'{uuid4()}.bat' self.args = [] self._cmd = '' self.ec = None @property def command_line(self): return self._cmd @command_line.setter def command_line(self, value: str): self._cmd = value self.args = value.split() def envar(self, name: str) -> str: name = name.upper() if name in (e := self.environment): return e[name] elif name == 'DATE': return self.now.strftime('%Y-%m-%d') elif name == 'TIME': time = self.now.strftime('%M:%S,%f') return F'{self.now.hour:2d}:{time:.8}' elif name == 'RANDOM': return str(randrange(0, 32767)) elif name == 'ERRORLEVEL': return str(self.ec) elif name == 'CD': return self.cwd elif name == 'CMDCMDLINE': line = self.envar('COMSPEC') if args := self.args: args = ' '.join(args) line = F'{line} /c "{args}"' return line elif name == 'CMDEXTVERSION': return str(self.extensions_version) elif name == 'HIGHESTNUMANODENUMBER': return '0' else: return '' def _resolved(self, path: str) -> str: if not ntpath.isabs(path): path = F'{self.cwd}{path}' return ntpath.normcase(ntpath.normpath(path)) def create_file(self, path: str, data: str = ''): self.file_system[self._resolved(path)] = data def append_file(self, path: str, data: str): path = self._resolved(path) if left := self.file_system.get(path, None): data = F'{left}{data}' self.file_system[path] = data def remove_file(self, path: str): self.file_system.pop(self._resolved(path), None) def ingest_file(self, path: str) -> str | None: return self.file_system.get(self._resolved(path)) def exists_file(self, path: str) -> bool: return self._resolved(path) in self.file_system @property def environment(self): return self.environments[-1] @property def delayexpand(self): return self.delayexpands[-1] @property def ext_setting(self): return self.ext_settings[-1]Class variables
var name-
The type of the None singleton.
var args-
The type of the None singleton.
var environments-
The type of the None singleton.
var delayexpands-
The type of the None singleton.
var ext_settings-
The type of the None singleton.
var file_system-
The type of the None singleton.
Instance variables
var cwd-
Expand source code Browse git
@property def cwd(self): return self._cwd var ec-
Expand source code Browse git
@property def ec(self) -> int: return self.errorlevel var command_line-
Expand source code Browse git
@property def command_line(self): return self._cmd var environment-
Expand source code Browse git
@property def environment(self): return self.environments[-1] var delayexpand-
Expand source code Browse git
@property def delayexpand(self): return self.delayexpands[-1] var ext_setting-
Expand source code Browse git
@property def ext_setting(self): return self.ext_settings[-1]
Methods
def reset(self)-
Expand source code Browse git
def reset(self): self.labels = {} self.environments = [dict(self.environment_seed)] self.delayexpands = [self.delayed_expansion] self.ext_settings = [self.extensions_enabled] self.file_system = dict(self.file_system_seed) self.dirstack = [] self.linebreaks = [] self.name = F'{uuid4()}.bat' self.args = [] self._cmd = '' self.ec = None def envar(self, name)-
Expand source code Browse git
def envar(self, name: str) -> str: name = name.upper() if name in (e := self.environment): return e[name] elif name == 'DATE': return self.now.strftime('%Y-%m-%d') elif name == 'TIME': time = self.now.strftime('%M:%S,%f') return F'{self.now.hour:2d}:{time:.8}' elif name == 'RANDOM': return str(randrange(0, 32767)) elif name == 'ERRORLEVEL': return str(self.ec) elif name == 'CD': return self.cwd elif name == 'CMDCMDLINE': line = self.envar('COMSPEC') if args := self.args: args = ' '.join(args) line = F'{line} /c "{args}"' return line elif name == 'CMDEXTVERSION': return str(self.extensions_version) elif name == 'HIGHESTNUMANODENUMBER': return '0' else: return '' def create_file(self, path, data='')-
Expand source code Browse git
def create_file(self, path: str, data: str = ''): self.file_system[self._resolved(path)] = data def append_file(self, path, data)-
Expand source code Browse git
def append_file(self, path: str, data: str): path = self._resolved(path) if left := self.file_system.get(path, None): data = F'{left}{data}' self.file_system[path] = data def remove_file(self, path)-
Expand source code Browse git
def remove_file(self, path: str): self.file_system.pop(self._resolved(path), None) def ingest_file(self, path)-
Expand source code Browse git
def ingest_file(self, path: str) -> str | None: return self.file_system.get(self._resolved(path)) def exists_file(self, path)-
Expand source code Browse git
def exists_file(self, path: str) -> bool: return self._resolved(path) in self.file_system