Module refinery.lib.batch.state
Expand source code Browse git
from __future__ import annotations
import ntpath
from datetime import datetime
from enum import Enum
from random import randrange, seed
from uuid import uuid4
from refinery.lib.batch.model import MissingVariable
from refinery.lib.dt import date_from_timestamp, isodate
class ErrorZero(int, Enum):
    """
    Integer enumeration with a single member, `Val`, whose numeric value is zero. Unlike the
    plain integer zero, the member evaluates as true in a boolean context, and both its string
    and debug representation are the text `0`.
    """
    Val = 0

    def __str__(self):
        return '0'

    def __repr__(self):
        # Same text as the string conversion.
        return '0'

    def __bool__(self):
        # Truthy on purpose: an expression such as `value or 0` keeps this member.
        return True
class RetainVariable(str, Enum):
    """
    String enumeration with a single member, `Val`, whose value is the empty string. It appears
    as a possible value type in environment dictionaries and as a return type of
    `BatchState.envar`.
    """
    Val = ''
# Template for the default process environment. Values are passed through `str.format` with
# the keyword `u` (user name) and `h` (host name) before use, so `{u}` and `{h}` below are
# placeholders and not literal text.
_DEFAULT_ENVIRONMENT = {
    'ALLUSERSPROFILE'           : r'C:\ProgramData',
    'APPDATA'                   : r'C:\Users\{u}\AppData\Roaming',
    'CommonProgramFiles'        : r'C:\Program Files\Common Files',
    'CommonProgramFiles(x86)'   : r'C:\Program Files (x86)\Common Files',
    'CommonProgramW6432'        : r'C:\Program Files\Common Files',
    'COMPUTERNAME'              : r'{h}',
    'ComSpec'                   : r'C:\WINDOWS\system32\cmd.exe',
    'DriverData'                : r'C:\Windows\System32\Drivers\DriverData',
    'HOMEDRIVE'                 : r'C:',
    'HOMEPATH'                  : r'\Users\{u}',
    'LOCALAPPDATA'              : r'C:\Users\{u}\AppData\Local',
    # Two leading backslashes are intended here: this is a UNC-style server name.
    'LOGONSERVER'               : r'\\{h}',
    'NUMBER_OF_PROCESSORS'      : r'16',
    'OneDrive'                  : r'C:\Users\{u}\OneDrive',
    'OS'                        : r'Windows_NT',
    'PATHEXT'                   : r'.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC',
    'PROCESSOR_ARCHITECTURE'    : r'AMD64',
    'PROCESSOR_IDENTIFIER'      : r'Intel64 Family 6 Model 158 Stepping 10, GenuineIntel',
    'PROCESSOR_LEVEL'           : r'6',
    'PROCESSOR_REVISION'        : r'99ff',
    'ProgramData'               : r'C:\ProgramData',
    'ProgramW6432'              : r'C:\Program Files',
    'ProgramFiles'              : r'C:\Program Files',
    'ProgramFiles(x86)'         : r'C:\Program Files (x86)',
    'PUBLIC'                    : r'C:\Users\Public',
    'SESSIONNAME'               : r'Console',
    'SystemDrive'               : r'C:',
    'SystemRoot'                : r'C:\WINDOWS',
    'TEMP'                      : r'C:\Users\{u}\AppData\Local\Temp',
    'TMP'                       : r'C:\Users\{u}\AppData\Local\Temp',
    'USERDOMAIN'                : r'{h}',
    'USERDOMAIN_ROAMINGPROFILE' : r'{h}',
    'USERNAME'                  : r'{u}',
    'USERPROFILE'               : r'C:\Users\{u}',
    'WINDIR'                    : r'C:\WINDOWS',
    # The entries are joined with semicolons. The last three end in exactly one backslash;
    # they are written as regular (non-raw) literals because a raw string literal cannot end
    # in a single backslash, and r'...\\' would produce two of them.
    'PATH': ';'.join(
        [
            r'C:\Windows',
            r'C:\Windows\System32',
            r'C:\Windows\System32\Wbem',
            'C:\\Windows\\System32\\WindowsPowerShell\\v1.0\\',
            'C:\\Windows\\System32\\OpenSSH\\',
            'C:\\Program Files\\dotnet\\',
        ]
    ),
}
class BatchState:
    """
    Mutable state of an emulated batch interpreter session: the environment variable stack,
    the stacks for the delayed expansion and command extension switches, FOR loop variables,
    the current working directory, the current error level, and an in-memory file system that
    maps normalized paths to text contents.
    """
    name: str
    args: list[str]
    now: datetime
    start_time: datetime
    environment_stack: list[dict[str, str | RetainVariable]]
    delayexpand_stack: list[bool]
    cmdextended_stack: list[bool]
    _for_loops: list[dict[str, str]]
    file_system: dict[str, str]

    def __init__(
        self,
        delayexpand: bool = False,
        extensions_enabled: bool = True,
        extensions_version: int = 2,
        environment: dict | None = None,
        file_system: dict | None = None,
        username: str = 'Administrator',
        hostname: str | None = None,
        now: int | float | str | datetime | None = None,
        cwd: str = 'C:\\',
        filename: str | None = '',
        echo: bool = True,
        codec: str = 'cp1252',
    ):
        """
        Set up a fresh interpreter state.

        - `delayexpand`, `extensions_enabled`: initial (bottom) entries of the respective stacks.
        - `extensions_version`: reported through the `CMDEXTVERSION` variable.
        - `environment`: initial environment; missing default variables are added to it in
          place. A new dictionary is created when this is `None`.
        - `file_system`: mapping used as the virtual file system; the given object is used
          directly. A new dictionary is created when this is `None`.
        - `username`, `hostname`: substituted into the default environment; the host name
          defaults to a random UUID string.
        - `now`: the emulated clock, given as a `datetime`, a numeric timestamp, or a date
          string; defaults to the current local time.
        - `cwd`: initial working directory, must be an absolute path.
        - `filename`: script name; a random `<uuid>.bat` name is used when empty or `None`.
        - `echo`, `codec`: stored unchanged on the instance.

        Note that the global `random` generator is seeded from the emulated clock.
        """
        self.extensions_version = extensions_version
        # Test for None rather than truthiness: an empty dictionary supplied by the caller
        # must be used as-is, exactly like a non-empty one, instead of being replaced.
        if file_system is None:
            file_system = {}
        if environment is None:
            environment = {}
        if hostname is None:
            hostname = str(uuid4())
        for key, value in _DEFAULT_ENVIRONMENT.items():
            # Variables are stored under upper case names; see the lookup in envar.
            environment.setdefault(key.upper(), value.format(h=hostname, u=username))
        if isinstance(now, str):
            now = isodate(now)
        if isinstance(now, (int, float)):
            now = date_from_timestamp(now)
        if now is None:
            now = datetime.now()
        self.cwd = cwd
        self.now = now
        self.start_time = now
        seed(self.now.timestamp())
        self.hostname = hostname
        self.username = username
        self.labels = {}
        self._for_loops = []
        self.environment_stack = [environment]
        self.delayexpand_stack = [delayexpand]
        self.cmdextended_stack = [extensions_enabled]
        self.file_system = file_system
        self.dirstack = []
        self.linebreaks = []
        self.name = filename or F'{uuid4()}.bat'
        self.args = []
        self._cmd = ''
        # Goes through the property setter, which requires the environment stack to exist.
        self.ec = None
        self.echo = echo
        self.codec = codec

    @property
    def cwd(self):
        """
        The current working directory as a normalized, lower case absolute path. It has no
        trailing backslash unless it is the root of a drive.
        """
        return self._cwd

    @cwd.setter
    def cwd(self, new: str):
        new = new.replace('/', '\\')
        if not new.endswith('\\'):
            new = F'{new}\\'
        if not ntpath.isabs(new):
            # A relative path can only be resolved against an existing working directory;
            # there is none yet when the very first assignment is relative.
            base = getattr(self, '_cwd', None)
            if base is None:
                raise ValueError(F'Invalid absolute path: {new}')
            new = ntpath.join(base, new)
        if not ntpath.isabs(new):
            raise ValueError(F'Invalid absolute path: {new}')
        # normpath removes the trailing backslash again, except for a drive root.
        self._cwd = ntpath.normcase(ntpath.normpath(new))

    @property
    def ec(self) -> int | ErrorZero:
        """
        The current error level. Assigning a falsy value (such as `None`) stores zero; every
        assignment also updates the `ERRORLEVEL` entry of the current environment.
        """
        return self.errorlevel

    @ec.setter
    def ec(self, value: int | ErrorZero | None):
        ec = value or 0
        self.environment['ERRORLEVEL'] = str(ec)
        self.errorlevel = ec

    @property
    def command_line(self):
        """
        The command line string. Assigning it also replaces `args` with its whitespace-split
        parts.
        """
        return self._cmd

    @command_line.setter
    def command_line(self, value: str):
        self._cmd = value
        self.args = value.split()

    def envar(self, name: str, default: str | None = None) -> str | RetainVariable:
        """
        Look up the variable `name`, ignoring case. Entries of the current environment take
        precedence; otherwise the dynamic variables `DATE`, `TIME`, `RANDOM`, `ERRORLEVEL`,
        `CD`, `CMDCMDLINE`, `CMDEXTVERSION`, and `HIGHESTNUMANODENUMBER` are computed. When the
        name is unknown, `default` is returned if it is not `None`; otherwise, the exception
        `MissingVariable` is raised.
        """
        name = name.upper()
        if name in (e := self.environment):
            return e[name]
        elif name == 'DATE':
            return self.now.strftime('%Y-%m-%d')
        elif name == 'TIME':
            # Hour padded with a space to width two, then minutes, seconds, and the first two
            # digits of the microsecond field after a comma.
            time = self.now.strftime('%M:%S,%f')
            return F'{self.now.hour:2d}:{time:.8}'
        elif name == 'RANDOM':
            # NOTE(review): randrange excludes its upper bound, so 32767 is never produced.
            # The help text of the SET command describes RANDOM as ranging up to 32767;
            # confirm whether the bound should be 32768 (this would change seeded output).
            return str(randrange(0, 32767))
        elif name == 'ERRORLEVEL':
            return str(self.ec)
        elif name == 'CD':
            return self.cwd
        elif name == 'CMDCMDLINE':
            line = self.envar('COMSPEC', 'cmd.exe')
            if args := self.args:
                args = ' '.join(args)
                line = F'{line} /c "{args}"'
            return line
        elif name == 'CMDEXTVERSION':
            return str(self.extensions_version)
        elif name == 'HIGHESTNUMANODENUMBER':
            return '0'
        elif default is not None:
            return default
        else:
            raise MissingVariable

    def resolve_path(self, path: str) -> str:
        """
        Convert `path` to the normalized, lower case form that is used as a key in the virtual
        file system. Relative paths are resolved against the current working directory.
        """
        if not ntpath.isabs(path):
            # The working directory carries no trailing backslash unless it is a drive root,
            # so the separator has to be inserted by a proper path join.
            path = ntpath.join(self.cwd, path)
        return ntpath.normcase(ntpath.normpath(path))

    def create_file(self, path: str, data: str = ''):
        """
        Create the file at `path` with the contents `data`, replacing any existing file.
        """
        self.file_system[self.resolve_path(path)] = data

    def append_file(self, path: str, data: str):
        """
        Append `data` to the file at `path`; the file is created if it does not exist.
        """
        path = self.resolve_path(path)
        existing = self.file_system.get(path)
        if existing:
            data = F'{existing}{data}'
        self.file_system[path] = data

    def remove_file(self, path: str):
        """
        Delete the file at `path`. Nothing happens if it does not exist.
        """
        self.file_system.pop(self.resolve_path(path), None)

    def ingest_file(self, path: str) -> str | None:
        """
        Return the contents of the file at `path`, or `None` if it does not exist.
        """
        return self.file_system.get(self.resolve_path(path))

    def exists_file(self, path: str) -> bool:
        """
        Return whether a file exists at `path`.
        """
        return self.resolve_path(path) in self.file_system

    def sizeof_file(self, path: str) -> int:
        """
        Return the length of the contents of the file at `path`, or `-1` if there is no such
        file. An existing empty file has size zero.
        """
        data = self.ingest_file(path)
        if data is None:
            return -1
        return len(data)

    def new_forloop(self) -> dict[str, str]:
        """
        Open a new FOR loop scope and return its variable dictionary. The new scope starts
        out as a copy of the variables of the enclosing loop, if there is one.
        """
        new = {}
        old = self.for_loop_variables
        if old is not None:
            new.update(old)
        self._for_loops.append(new)
        return new

    def end_forloop(self):
        """
        Close the innermost FOR loop scope.
        """
        self._for_loops.pop()

    @property
    def environment(self):
        """
        The environment dictionary on top of the environment stack.
        """
        return self.environment_stack[-1]

    @property
    def delayexpand(self):
        """
        The delayed expansion switch on top of its stack; assignment replaces the top entry.
        """
        return self.delayexpand_stack[-1]

    @delayexpand.setter
    def delayexpand(self, v):
        self.delayexpand_stack[-1] = v

    @property
    def cmdextended(self):
        """
        The command extension switch on top of its stack; assignment replaces the top entry.
        """
        return self.cmdextended_stack[-1]

    @cmdextended.setter
    def cmdextended(self, v):
        self.cmdextended_stack[-1] = v

    @property
    def for_loop_variables(self):
        """
        The variable dictionary of the innermost FOR loop, or `None` outside of any loop.
        """
        if loops := self._for_loops:
            return loops[-1]
        else:
            return None
Classes
class ErrorZero (*args, **kwds)-
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating-point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
Expand source code Browse git
class ErrorZero(int, Enum): Val = 0 def __bool__(self): return True def __str__(self): return '0' __repr__ = __str__Ancestors
- builtins.int
- enum.Enum
Class variables
var Val-
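The only member of this enumeration. Its value is the integer 0, but it is truthy and both its string and debug representation are '0'.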
class RetainVariable (*args, **kwds)-
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
Expand source code Browse git
class RetainVariable(str, Enum): Val = ''Ancestors
- builtins.str
- enum.Enum
Class variables
var Val-
The only member of this enumeration. Its value is the empty string.
class BatchState (delayexpand=False, extensions_enabled=True, extensions_version=2, environment=None, file_system=None, username='Administrator', hostname=None, now=None, cwd='C:\\', filename='', echo=True, codec='cp1252')-
Expand source code Browse git
class BatchState: name: str args: list[str] now: datetime start_time: datetime environment_stack: list[dict[str, str | RetainVariable]] delayexpand_stack: list[bool] cmdextended_stack: list[bool] _for_loops: list[dict[str, str]] file_system: dict[str, str] def __init__( self, delayexpand: bool = False, extensions_enabled: bool = True, extensions_version: int = 2, environment: dict | None = None, file_system: dict | None = None, username: str = 'Administrator', hostname: str | None = None, now: int | float | str | datetime | None = None, cwd: str = 'C:\\', filename: str | None = '', echo: bool = True, codec: str = 'cp1252', ): self.extensions_version = extensions_version file_system = file_system or {} environment = environment or {} if hostname is None: hostname = str(uuid4()) for key, value in _DEFAULT_ENVIRONMENT.items(): environment.setdefault( key.upper(), value.format(h=hostname, u=username) ) if isinstance(now, str): now = isodate(now) if isinstance(now, (int, float)): now = date_from_timestamp(now) if now is None: now = datetime.now() self.cwd = cwd self.now = now self.start_time = now seed(self.now.timestamp()) self.hostname = hostname self.username = username self.labels = {} self._for_loops = [] self.environment_stack = [environment] self.delayexpand_stack = [delayexpand] self.cmdextended_stack = [extensions_enabled] self.file_system = file_system self.dirstack = [] self.linebreaks = [] self.name = filename or F'{uuid4()}.bat' self.args = [] self._cmd = '' self.ec = None self.echo = echo self.codec = codec @property def cwd(self): return self._cwd @cwd.setter def cwd(self, new: str): new = new.replace('/', '\\') if not new.endswith('\\'): new = F'{new}\\' if not ntpath.isabs(new): new = ntpath.join(self.cwd, new) if not ntpath.isabs(new): raise ValueError(F'Invalid absolute path: {new}') self._cwd = ntpath.normcase(ntpath.normpath(new)) @property def ec(self) -> int | ErrorZero: return self.errorlevel @ec.setter def ec(self, value: int | ErrorZero | 
None): ec = value or 0 self.environment['ERRORLEVEL'] = str(ec) self.errorlevel = ec @property def command_line(self): return self._cmd @command_line.setter def command_line(self, value: str): self._cmd = value self.args = value.split() def envar(self, name: str, default: str | None = None) -> str | RetainVariable: name = name.upper() if name in (e := self.environment): return e[name] elif name == 'DATE': return self.now.strftime('%Y-%m-%d') elif name == 'TIME': time = self.now.strftime('%M:%S,%f') return F'{self.now.hour:2d}:{time:.8}' elif name == 'RANDOM': return str(randrange(0, 32767)) elif name == 'ERRORLEVEL': return str(self.ec) elif name == 'CD': return self.cwd elif name == 'CMDCMDLINE': line = self.envar('COMSPEC', 'cmd.exe') if args := self.args: args = ' '.join(args) line = F'{line} /c "{args}"' return line elif name == 'CMDEXTVERSION': return str(self.extensions_version) elif name == 'HIGHESTNUMANODENUMBER': return '0' elif default is not None: return default else: raise MissingVariable def resolve_path(self, path: str) -> str: if not ntpath.isabs(path): path = F'{self.cwd}{path}' return ntpath.normcase(ntpath.normpath(path)) def create_file(self, path: str, data: str = ''): self.file_system[self.resolve_path(path)] = data def append_file(self, path: str, data: str): path = self.resolve_path(path) if left := self.file_system.get(path, None): data = F'{left}{data}' self.file_system[path] = data def remove_file(self, path: str): self.file_system.pop(self.resolve_path(path), None) def ingest_file(self, path: str) -> str | None: return self.file_system.get(self.resolve_path(path)) def exists_file(self, path: str) -> bool: return self.resolve_path(path) in self.file_system def sizeof_file(self, path: str) -> int: if data := self.ingest_file(path): return len(data) return -1 def new_forloop(self) -> dict[str, str]: new = {} old = self.for_loop_variables if old is not None: new.update(old) self._for_loops.append(new) return new def end_forloop(self): 
self._for_loops.pop() @property def environment(self): return self.environment_stack[-1] @property def delayexpand(self): return self.delayexpand_stack[-1] @delayexpand.setter def delayexpand(self, v): self.delayexpand_stack[-1] = v @property def cmdextended(self): return self.cmdextended_stack[-1] @cmdextended.setter def cmdextended(self, v): self.cmdextended_stack[-1] = v @property def for_loop_variables(self): if vars := self._for_loops: return vars[-1] else: return NoneClass variables
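var name-
Annotated as str. Set from the filename argument; when that is empty or None, a random name of the form <uuid>.bat is used.
var args-
Annotated as list[str]. Starts out empty and is replaced by the whitespace-split parts of the command line whenever command_line is assigned.
var now-
Annotated as datetime. Taken from the now argument (a datetime, a numeric timestamp, or a date string); defaults to the current local time.
var start_time-
Annotated as datetime. Set to the same value as now when the state is constructed.
var environment_stack-
Annotated as list[dict[str, str | RetainVariable]]. Its last entry is exposed as the environment property.
var delayexpand_stack-
Annotated as list[bool]. Its last entry is exposed as the delayexpand property.
var cmdextended_stack-
Annotated as list[bool]. Its last entry is exposed as the cmdextended property.
var file_system-
Annotated as dict[str, str]. Maps paths, as normalized by resolve_path, to file contents.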
Instance variables
var cwd-
Expand source code Browse git
@property def cwd(self): return self._cwd var ec-
Expand source code Browse git
@property def ec(self) -> int | ErrorZero: return self.errorlevel var command_line-
Expand source code Browse git
@property def command_line(self): return self._cmd var environment-
Expand source code Browse git
@property def environment(self): return self.environment_stack[-1] var delayexpand-
Expand source code Browse git
@property def delayexpand(self): return self.delayexpand_stack[-1] var cmdextended-
Expand source code Browse git
@property def cmdextended(self): return self.cmdextended_stack[-1] var for_loop_variables-
Expand source code Browse git
@property def for_loop_variables(self): if vars := self._for_loops: return vars[-1] else: return None
Methods
def envar(self, name, default=None)-
Expand source code Browse git
def envar(self, name: str, default: str | None = None) -> str | RetainVariable: name = name.upper() if name in (e := self.environment): return e[name] elif name == 'DATE': return self.now.strftime('%Y-%m-%d') elif name == 'TIME': time = self.now.strftime('%M:%S,%f') return F'{self.now.hour:2d}:{time:.8}' elif name == 'RANDOM': return str(randrange(0, 32767)) elif name == 'ERRORLEVEL': return str(self.ec) elif name == 'CD': return self.cwd elif name == 'CMDCMDLINE': line = self.envar('COMSPEC', 'cmd.exe') if args := self.args: args = ' '.join(args) line = F'{line} /c "{args}"' return line elif name == 'CMDEXTVERSION': return str(self.extensions_version) elif name == 'HIGHESTNUMANODENUMBER': return '0' elif default is not None: return default else: raise MissingVariable def resolve_path(self, path)-
Expand source code Browse git
def resolve_path(self, path: str) -> str: if not ntpath.isabs(path): path = F'{self.cwd}{path}' return ntpath.normcase(ntpath.normpath(path)) def create_file(self, path, data='')-
Expand source code Browse git
def create_file(self, path: str, data: str = ''): self.file_system[self.resolve_path(path)] = data def append_file(self, path, data)-
Expand source code Browse git
def append_file(self, path: str, data: str): path = self.resolve_path(path) if left := self.file_system.get(path, None): data = F'{left}{data}' self.file_system[path] = data def remove_file(self, path)-
Expand source code Browse git
def remove_file(self, path: str): self.file_system.pop(self.resolve_path(path), None) def ingest_file(self, path)-
Expand source code Browse git
def ingest_file(self, path: str) -> str | None: return self.file_system.get(self.resolve_path(path)) def exists_file(self, path)-
Expand source code Browse git
def exists_file(self, path: str) -> bool: return self.resolve_path(path) in self.file_system def sizeof_file(self, path)-
Expand source code Browse git
def sizeof_file(self, path: str) -> int: if data := self.ingest_file(path): return len(data) return -1 def new_forloop(self)-
Expand source code Browse git
def new_forloop(self) -> dict[str, str]: new = {} old = self.for_loop_variables if old is not None: new.update(old) self._for_loops.append(new) return new def end_forloop(self)-
Expand source code Browse git
def end_forloop(self): self._for_loops.pop()