Module refinery.lib.batch.emulator
Expand source code Browse git
from __future__ import annotations
import itertools
import ntpath
import re
import uuid
from dataclasses import dataclass, field, fields
from enum import Enum
from io import StringIO
from typing import Callable, ClassVar, Generator, Iterable, TypeVar
from refinery.lib.batch.help import HelpOutput
from refinery.lib.batch.model import (
AbortExecution,
ArgVarFlags,
AstCondition,
AstError,
AstFor,
AstForParserMode,
AstForVariant,
AstGroup,
AstIf,
AstIfCmp,
AstIfVariant,
AstLabel,
AstNode,
AstPipeline,
AstSequence,
AstStatement,
Ctrl,
EmulatorException,
Exit,
Goto,
InputLocked,
InvalidLabel,
MissingVariable,
)
from refinery.lib.batch.parser import BatchParser
from refinery.lib.batch.state import BatchState, ErrorZero
from refinery.lib.batch.synth import SynCommand, SynNodeBase, synthesize
from refinery.lib.batch.util import batchint, uncaret, unquote
from refinery.lib.deobfuscation import cautious_parse, names_in_expression
from refinery.lib.patterns import indicators
from refinery.lib.types import buf
_T = TypeVar('_T')
def winfnmatch(pattern: str, path: str, cwd: str):
    """
    Test whether a path matches a Windows wildcard pattern that is interpreted relative to the
    directory cwd. Only `*` and `?` are wildcards; brackets are matched literally because the
    bracket wildcard does not exist in Batch.
    """
    translated = []
    for piece in re.split('([*?])', pattern):
        # The split alternates between verbatim text and single wildcard characters.
        if piece == '*':
            translated.append('.*')
        elif piece == '?':
            translated.append('.')
        else:
            translated.append(re.escape(piece))
    body = ''.join(translated)
    root = re.escape(cwd.rstrip('\\'))
    return re.match(rF'(?s:{root}\\{body})$', path) is not None
def _fuse(*iters):
with StringIO() as io:
for it in iters:
if isinstance(it, str):
io.write(it)
continue
for i in it:
io.write(i)
return io.getvalue()
def _onoff(v: str) -> bool:
vc = v.upper()
if vc == 'ON':
return True
if vc == 'OFF':
return False
raise ValueError(v)
class DevNull:
    """
    A stream stand-in that never holds data: reads return the empty string, iteration ends
    immediately, and writes are discarded while reporting the full length as written.
    """

    @property
    def closed(self):
        # This stream always reports itself as closed.
        return True

    def __iter__(self):
        return self

    def __next__(self):
        # There are never any lines to iterate.
        raise StopIteration

    def getvalue(self):
        return ''

    def read(self, size: int | None = -1, /) -> str:
        return ''

    def readline(self, size: int = -1, /) -> str:
        return ''

    def write(self, s: str, /) -> int:
        # Nothing is stored; the caller is told that everything was written.
        return len(s)

    def seek(self, k: int, whence: int = 0, /):
        return None

    def detach(self):
        raise NotImplementedError
class Error(str):
    """
    A string subclass used to tell error messages apart from other values that the emulator's
    generators emit.
    """


ErrorCannotFindFile = Error('The system cannot find the file specified.')
@dataclass
class IO:
    """
    The three standard streams of a command. The streams can be accessed by name (i, o, e) or by
    their numeric handle (0, 1, 2); any other handle raises an IndexError.
    """
    i: DevNull | StringIO = field(default_factory=StringIO)
    o: DevNull | StringIO = field(default_factory=StringIO)
    e: DevNull | StringIO = field(default_factory=StringIO)

    def __iter__(self):
        for name in ('i', 'o', 'e'):
            yield getattr(self, name)

    def __setitem__(self, k, v):
        for handle, name in enumerate(('i', 'o', 'e')):
            if k == handle:
                setattr(self, name, v)
                return
        raise IndexError(k)

    def __getitem__(self, k):
        for handle, name in enumerate(('i', 'o', 'e')):
            if k == handle:
                return getattr(self, name)
        raise IndexError(k)
@dataclass
class BatchEmulatorConfig:
    """
    Switches that control what the emulator emits into its trace and which control flow commands
    it actually follows.
    """
    # Emit ECHO ON / ECHO OFF even when they do not change the current echo state.
    show_nops: bool = False
    # Do not flag commands without a registered handler as junk.
    show_junk: bool = False
    # Emit labels that are not comments.
    show_labels: bool = False
    # Do not flag SET commands as junk (SET /P is never flagged).
    show_sets: bool = False
    # Emit REM commands and labels that are used as comments.
    show_comments: bool = False
    # Emit GOTO commands without following them.
    skip_goto: bool = False
    # Execute CALL targets without emitting their trace.
    skip_call: bool = False
    # Emit EXIT commands without ending the execution.
    skip_exit: bool = False
class BatchEmulator:
class _node:
    """
    Decorator that registers a method as the trace handler for one AST node class. All
    registrations are collected in the class-level handlers table.
    """
    handlers: ClassVar[dict[
        type[AstNode],
        Callable[[
            BatchEmulator,
            AstNode,
            IO,
            bool,
        ], Generator[SynNodeBase[AstNode] | Error]]
    ]] = {}

    def __init__(self, key: type[AstNode]):
        self.key = key

    def __call__(self, handler):
        # The handler is returned unchanged, which allows the decorator to be stacked.
        self.handlers[self.key] = handler
        return handler
class _command:
    """
    Decorator that registers a method as the handler for one command verb. Verbs are stored in
    upper case; registering the same verb again replaces the earlier handler.
    """
    handlers: ClassVar[dict[
        str,
        Callable[[
            BatchEmulator,
            SynCommand,
            IO,
            bool,
        ], Generator[str, None, int | None] | int | ErrorZero | None]
    ]] = {}

    def __init__(self, key: str):
        self.key = key.upper()

    def __call__(self, handler):
        # The handler is returned unchanged, which allows the decorator to be stacked.
        self.handlers[self.key] = handler
        return handler
def __init__(
    self,
    data: str | buf | BatchParser,
    state: BatchState | None = None,
    cfg: BatchEmulatorConfig | None = None,
    std: IO | None = None,
):
    """
    Create an emulator for the given batch code. A fresh configuration and fresh standard streams
    are created when cfg or std are omitted; data and state are handed to the BatchParser.
    """
    self.stack = []
    self.parser = BatchParser(data, state)
    self.std = std or IO()
    self.cfg = cfg or BatchEmulatorConfig()
    # Labels passed since the last SET or IF; GOTO uses this set to detect infinite loops.
    self.block_labels = set()
def spawn(self, data: str | buf | BatchParser, state: BatchState | None = None, std: IO | None = None):
    """
    Create a child emulator for the given code that shares the configuration of this emulator.
    """
    return BatchEmulator(data=data, state=state, cfg=self.cfg, std=std)
@property
def state(self):
    """The state object of the underlying parser."""
    return self.parser.state

@property
def environment(self):
    """The environment variables of the current state."""
    return self.state.environment

@property
def delayexpand(self):
    """The delayed expansion setting of the current state."""
    return self.state.delayexpand
def clone_state(
    self,
    delayexpand: bool | None = None,
    cmdextended: bool | None = None,
    environment: dict | None | ellipsis = ...,
    filename: str | None = None,
):
    """
    Create a new state for a child emulator. File system, user name, host name, time, and working
    directory are taken over from the current state. When no environment is specified, a copy of
    the current one is used. The extension setting is inherited when omitted, but delayed
    expansion is turned off unless it is requested explicitly.
    """
    source = self.state
    return BatchState(
        False if delayexpand is None else delayexpand,
        source.cmdextended if cmdextended is None else cmdextended,
        environment=dict(source.environment) if environment is ... else environment,
        file_system=source.file_system,
        username=source.username,
        hostname=source.hostname,
        now=source.now,
        cwd=source.cwd,
        filename=filename,
    )
def get_for_variable_regex(self, vars: Iterable[str]):
    """
    Compile the pattern that finds references to the given FOR-loop variables, for example `%i`,
    `%~nxi`, or `%~$PATH:i`. Group 1 contains the optional tilde modifiers, group 2 the optional
    `$VARIABLE:` search path modifier, and group 3 the name of the variable.
    """
    # Variable names are escaped because they end up inside a character class.
    names = ''.join(re.escape(name) for name in vars)
    # This is a raw string: the dollar sign and the word class need exactly one backslash.
    return re.compile(RF'%((?:~[fdpnxsatz]*)?)((?:\$\w+:)?)([{names}])')
def expand_delayed_variables(self, block: str):
    """
    Replace every `!name!` reference in the given text. The text between the exclamation marks is
    resolved by the lexer; when the variable is missing, the reference is replaced by whatever
    follows the first colon in it, which is the empty string if there is no colon.
    """
    resolve = self.parser.lexer.parse_env_variable

    def substitute(match: re.Match[str]):
        reference = match[1]
        try:
            return resolve(reference)
        except MissingVariable:
            return reference.partition(':')[2]

    return re.sub(r'!([^!\n]*)!', substitute, block)
def expand_forloop_variables(self, block: str, vars: dict[str, str] | None):
    """
    Replace all references to the given FOR-loop variables in a text by their current values.
    The text is returned unchanged when there are no variables.
    """
    def expansion(match: re.Match[str]):
        # NOTE(review): the modifier flags are parsed here but never applied to the value, and
        # the search path modifier in group 2 is ignored as well - confirm whether this is
        # intentionally unfinished.
        flags = ArgVarFlags.Empty
        for flag in match[1]:
            flags |= ArgVarFlags.FromToken(ord(flag))
        return _vars[match[3]]
    if not vars:
        return block
    _vars = vars
    return self.get_for_variable_regex(vars).sub(expansion, block)
def contains_for_variable(self, ast: AstNode, vars: Iterable[str]):
    """
    Determine whether any string inside the given AST node references one of the given FOR-loop
    variables. Lists, dictionary values, and the fields of nested nodes are searched recursively;
    the parent field of a node is not followed.
    """
    search = self.get_for_variable_regex(vars).search

    def probe(item):
        if isinstance(item, list):
            return any(map(probe, item))
        if isinstance(item, dict):
            return any(map(probe, item.values()))
        if isinstance(item, Enum):
            return False
        if isinstance(item, str):
            return search(item) is not None
        if isinstance(item, AstNode):
            return any(
                probe(getattr(item, tf.name))
                for tf in fields(item) if tf.name != 'parent'
            )
        return False

    return probe(ast) # type:ignore
def expand_ast_node(self, ast: _T) -> _T:
    """
    Return a copy of the given AST node in which delayed variables (if delayed expansion is on)
    and FOR-loop variables are expanded in all strings. The node itself is returned when there is
    nothing to expand. Enumeration values and the parent field of nodes are taken over unchanged.
    """
    delayed = self.delayexpand
    loopvars = self.state.for_loop_variables
    if not (loopvars or delayed):
        return ast

    def rebuild(item):
        if isinstance(item, list):
            return [rebuild(v) for v in item]
        if isinstance(item, dict):
            return {k: rebuild(v) for k, v in item.items()}
        if isinstance(item, Enum):
            return item
        if isinstance(item, str):
            text = self.expand_delayed_variables(item) if delayed else item
            return self.expand_forloop_variables(text, loopvars)
        if isinstance(item, AstNode):
            attributes = {}
            for tf in fields(item):
                attribute = getattr(item, tf.name)
                attributes[tf.name] = attribute if tf.name == 'parent' else rebuild(attribute)
            return item.__class__(**attributes)
        return item

    return rebuild(ast) # type:ignore
def execute_find_or_findstr(self, cmd: SynCommand, std: IO, findstr: bool):
    """
    Shared implementation of FIND and FINDSTR. The command is emitted first; then switches, search
    strings, and file names are parsed from the arguments and matching lines are written to the
    standard output. Standard input is always part of the searched inputs.

    The generator returns 0 when something was found and 1 when nothing was found, when the
    arguments are invalid, or when a file could not be read.
    """
    needles = []
    paths: list[str | ellipsis] = [...]
    flags = {}
    it = iter(cmd.args)
    yield cmd

    for arg in it:
        if not arg.startswith('/'):
            if findstr:
                # FINDSTR treats a space-separated argument as several search strings.
                needles.extend(unquote(arg).split())
            elif not arg.startswith('"'):
                return 1
            else:
                # FIND searches for the quoted string as a whole.
                needles.append(unquote(arg))
            break
        name, has_param, value = arg[1:].partition(':')
        name = name.upper()
        if name in ('OFF', 'OFFLINE'):
            continue
        elif len(name) > 1:
            return 1
        elif name == 'C':
            needles.append(unquote(value))
        elif name == 'F' and findstr:
            if (p := self.state.ingest_file(value)) is None:
                return 1
            paths.extend(p.splitlines(False))
        elif name == 'G' and findstr:
            if (n := self.state.ingest_file(value)) is None:
                return 1
            needles.extend(n.splitlines(False))
        elif has_param:
            flags[name] = value
        else:
            flags[name] = True

    valid_flags = 'VNI'
    if findstr:
        valid_flags += 'BELRSXMOPADQ'
    for v in flags:
        if v not in valid_flags:
            return 1

    prefix_filename = False
    state = self.state

    for arg in it:
        pattern = unquote(arg)
        if '*' in pattern or '?' in pattern:
            prefix_filename = True
            for path in state.file_system:
                # winfnmatch expects the pattern first and the candidate path second.
                if winfnmatch(pattern, path, state.cwd):
                    paths.append(path)
        else:
            paths.append(pattern)

    if len(paths) > 1:
        prefix_filename = True

    # The /I switch requests a case-insensitive search.
    re_flags = re.IGNORECASE if 'I' in flags else 0
    patterns = []

    for needle in needles:
        if not findstr or 'L' in flags:
            needle = re.escape(needle)
        if 'X' in flags:
            needle = F'^{needle}$'
        elif 'B' in flags:
            needle = F'^{needle}'
        elif 'E' in flags:
            needle = F'{needle}$'
        patterns.append(re.compile(needle, re_flags))

    _V = 'V' in flags # noqa; Prints only lines that do not contain a match.
    _P = 'P' in flags # noqa; Skip files with non-printable characters.
    _O = 'O' in flags # noqa; Prints character offset before each matching line.
    _N = 'N' in flags # noqa; Prints the line number before each line that matches.
    _M = 'M' in flags # noqa; Prints only the filename if a file contains a match.

    nothing_found = True

    for path in paths:
        if path is (...):
            data = std.i.read()
        else:
            data = state.ingest_file(path)
        if data is None:
            return 1
        if _P and not re.fullmatch('[\\s!-~]+', data):
            continue
        # Character offsets are counted per input and on the unmodified lines.
        offset = 0
        for n, line in enumerate(data.splitlines(True), 1):
            start = offset
            offset += len(line)
            hit = None
            for pattern in patterns:
                if (hit := pattern.search(line)) is not None:
                    break
            # With /V a line is selected only if none of the search strings match.
            if _V == (hit is not None):
                continue
            nothing_found = False
            if _M:
                # Only the file name is reported, once, and the rest of the file is skipped.
                if path is not (...):
                    std.o.write(F'{path}\r\n')
                break
            out = line
            if _O:
                o = start + (hit.start() if hit else 0)
                out = F'{o}:{out}'
            if _N:
                out = F'{n}:{out}'
            if prefix_filename:
                out = F'{path}:{out}'
            std.o.write(out)

    return int(nothing_found)
@_command('TYPE')
def execute_type(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate TYPE: write the contents of the given file to the standard output. The command is
    emitted into the trace like for the other command handlers. The generator returns 0 on
    success; when the file cannot be read, an error is emitted and 1 is returned.
    """
    yield cmd
    path = cmd.argument_string.strip()
    data = self.state.ingest_file(path)
    if data is None:
        yield ErrorCannotFindFile
        return 1
    std.o.write(data)
    return 0
@_command('FIND')
def execute_find(self, cmd: SynCommand, std: IO, *_):
    """Emulate FIND; the work is delegated to the shared FIND/FINDSTR implementation."""
    return self.execute_find_or_findstr(cmd, std, findstr=False)

@_command('FINDSTR')
def execute_findstr(self, cmd: SynCommand, std: IO, *_):
    """Emulate FINDSTR; the work is delegated to the shared FIND/FINDSTR implementation."""
    return self.execute_find_or_findstr(cmd, std, findstr=True)
@_command('SET')
def execute_set(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate SET in its three forms: plain assignment, prompted assignment (/P), and arithmetic
    (/A). The command is always emitted; unless it is a prompt, it is flagged as junk when the
    configuration does not ask for SET commands to be shown.

    For /A, each comma-separated part is either an assignment (optionally with a compound
    operator like `+=`) or a bare expression. Variables that are read are taken from the
    environment and default to 0. The last computed value is written to the standard output.

    Raises an EmulatorException for an empty SET or for unexpected variable access in arithmetic
    expressions, and InputLocked when a prompt cannot read a complete line.
    """
    if not (args := cmd.args):
        raise EmulatorException('Empty SET instruction')
    if cmd.verb.upper() != 'SET':
        raise RuntimeError
    # Since variables can be used in GOTO, a SET can be used to change the behavior of a GOTO.
    self.block_labels.clear()

    arithmetic = False
    quote_mode = False
    prompt = None

    it = iter(args)
    tk = next(it)

    if tk.upper() == '/P':
        if std.i.closed:
            prompt = ''
        elif not (prompt := std.i.readline()).endswith('\n'):
            raise InputLocked
        else:
            prompt = prompt.rstrip('\r\n')
        # A default avoids leaking StopIteration out of this generator for a bare "SET /P".
        tk = next(it, '')
    else:
        cmd.junk = not self.cfg.show_sets

    yield cmd

    if tk.upper() == '/A':
        arithmetic = True
        tk = next(it, '')

    args = [tk, *it, *cmd.trailing_spaces]

    if arithmetic:
        def defang(s: str):
            # Encode characters that cannot occur in an expression as identifier fragments.
            def r(m: re.Match[str]):
                return F'_{prefix}{ord(m[0]):X}_'
            return re.sub(r'[^-\s()!~*/%+><&^|_\w]', r, s)

        def refang(s: str): # noqa
            # Reverse the encoding performed by defang.
            def r(m: re.Match[str]):
                return chr(int(m[1], 16))
            return re.sub(rf'_{prefix}([A-F0-9]+)_', r, s)

        prefix = F'{uuid.uuid4().time_mid:X}'
        namespace = {}
        value = None

        if not (program := ''.join(args)):
            std.e.write('The syntax of the command is incorrect.\r\n')
            return ErrorZero.Val

        for assignment in program.split(','):
            assignment = assignment.strip()
            if not assignment:
                std.e.write('Missing operand.\r\n')
                return ErrorZero.Val
            # The minus sign is listed first so that it is a literal and not part of a range.
            parts = re.split(r'([-*+^|/%&]|<<|>>|)=', assignment, maxsplit=1)
            if len(parts) == 3:
                name, operator, definition = parts
            else:
                # A bare expression without an assignment is evaluated, but not stored.
                name, operator, definition = '', '', assignment
            name = name.strip().upper()
            definition = re.sub(r'\b0([0-7]+)\b', r'0o\1', definition)
            if operator:
                definition = F'{name}{operator}({definition})'
            definition = defang(definition)
            expression = cautious_parse(definition)
            names = names_in_expression(expression)
            if names.stored or names.others:
                raise EmulatorException('Arithmetic SET had unexpected variable access.')
            for var in names.loaded:
                if var in namespace:
                    continue
                # Each variable is looked up under its own decoded name.
                original = refang(var).upper()
                try:
                    namespace[var] = batchint(self.environment[original])
                except (KeyError, ValueError):
                    namespace[var] = 0
            code = compile(expression, filename='[ast]', mode='eval')
            # NOTE: eval only ever sees what cautious_parse accepted and what passed the name
            # check above; that vetting must not be weakened since the input is untrusted.
            value = eval(code, namespace, {})
            if name:
                self.environment[name] = str(value)
                namespace[defang(name)] = value

        if value is None:
            std.e.write('The syntax of the command is incorrect.')
            return
        else:
            std.o.write(F'{value!s}\r\n')
    else:
        try:
            eq = args.index(Ctrl.Equals)
        except ValueError:
            assignment = cmd.argument_string
            if assignment.startswith('"'):
                quote_mode = True
                assignment, _, unquoted = assignment[1:].rpartition('"')
                assignment = assignment or unquoted
            else:
                assignment = ''.join(args)
            name, _, content = assignment.partition('=')
        else:
            content = ''.join(args[eq + 1:])
            # The index refers to args, which no longer contains a leading /P switch.
            name = args[eq - 1] if eq else ''
        name = name.upper()
        trailing_caret, content = uncaret(content, quote_mode)
        if trailing_caret:
            content = content[:-1]
        if prompt is not None:
            # For SET /P, the text after the equals sign is the prompt shown to the user and
            # the variable receives the line that was read from the standard input.
            if (qc := content.strip()).startswith('"'):
                _, _, qc = qc. partition('"') # noqa
                qc, _, r = qc.rpartition('"') # noqa
                content = qc or r
            std.o.write(content)
            content = prompt
        if name:
            if content:
                self.environment[name] = content
            else:
                self.environment.pop(name, None)
@_command('CALL')
def execute_call(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate CALL. If the argument starts with a colon, execution continues in a child emulator at
    the given label of the current script; anything after the label name is ignored for the
    lookup. Otherwise, the argument is treated as the path to a script which is run with the
    current environment; when that file cannot be read, only the command itself is emitted.

    Raises InvalidLabel when the label does not exist.
    """
    cmdl = cmd.argument_string
    empty, colon, label = cmdl.partition(':')
    if colon and not empty:
        # Only the first word is the label, the remaining words are call arguments.
        words = label.split(maxsplit=1)
        label = words[0] if words else ''
        try:
            offset = self.parser.lexer.labels[label.upper()]
        except KeyError as KE:
            raise InvalidLabel(label) from KE
        emu = self.spawn(self.parser, std=std)
    else:
        offset = 0
        path = cmdl.strip()
        code = self.state.ingest_file(path)
        if code is None:
            yield cmd
            return
        state = self.clone_state(environment=self.state.environment, filename=path)
        emu = self.spawn(code, std=std, state=state)
    if self.cfg.skip_call:
        # The callee has to start at the label as well when its trace is not emitted.
        emu.execute(offset, called=True)
    else:
        yield from emu.trace(offset, called=True)
@_command('SETLOCAL')
def execute_setlocal(self, cmd: SynCommand, *_):
    """
    Emulate SETLOCAL: push the delayed expansion setting, the extension setting, and a copy of
    the environment onto their stacks. Each of the options ENABLEDELAYEDEXPANSION,
    DISABLEDELAYEDEXPANSION, ENABLEEXTENSIONS, and DISABLEEXTENSIONS may be given, in any
    combination; settings that are not mentioned keep their current value.
    """
    yield cmd
    state = self.state
    delay = state.delayexpand
    cmdxt = state.cmdextended
    for setting in cmd.argument_string.upper().split():
        if setting == 'ENABLEDELAYEDEXPANSION':
            delay = True
        elif setting == 'DISABLEDELAYEDEXPANSION':
            delay = False
        elif setting == 'ENABLEEXTENSIONS':
            cmdxt = True
        elif setting == 'DISABLEEXTENSIONS':
            cmdxt = False
    state.delayexpand_stack.append(delay)
    state.cmdextended_stack.append(cmdxt)
    state.environment_stack.append(dict(self.environment))
@_command('ENDLOCAL')
def execute_endlocal(self, cmd: SynCommand, *_):
    """
    Emulate ENDLOCAL: drop the environment and the settings that were pushed by the most recent
    SETLOCAL. Nothing happens when only the outermost environment is left.
    """
    yield cmd
    state = self.state
    if len(state.environment_stack) > 1:
        state.environment_stack.pop()
        state.delayexpand_stack.pop()
        # SETLOCAL pushes the extension setting too, so it has to be restored as well.
        if len(state.cmdextended_stack) > 1:
            state.cmdextended_stack.pop()
@_command('GOTO')
def execute_goto(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate GOTO. When the configuration asks to skip GOTO, the command is only emitted.
    Otherwise the jump is signalled by raising Goto, and `GOTO :EOF` raises Exit with the current
    error level. Only the first word after an optional colon is used as the label.

    A jump to a label that was already passed since the last SET or IF is reported as an infinite
    loop instead of being followed. A GOTO without any label writes an error message and raises
    AbortExecution.
    """
    if self.cfg.skip_goto:
        yield cmd
        return
    it = iter(cmd.args)
    mark = False
    for label in it:
        if not isinstance(label, Ctrl):
            break
        if label == Ctrl.Label:
            mark = True
            # The label follows the colon; it is empty when the colon is the last token.
            label = next(it, '')
            break
    else:
        std.e.write('No batch label specified to GOTO command.\r\n')
        raise AbortExecution
    # An empty or blank label must not crash the emulator; it is handled as an unknown label.
    words = label.split(maxsplit=1)
    label = words[0] if words else ''
    key = label.upper()
    if mark and key == 'EOF':
        raise Exit(int(self.state.ec), False)
    if key not in self.block_labels:
        raise Goto(label)
    else:
        yield Error(F'Infinite Loop detected for label {key}')
@_command('EXIT')
def execute_exit(self, cmd: SynCommand, *_):
    """
    Emulate EXIT. Leading /B switches are consumed, the first other argument is the exit code; a
    missing or non-numeric code counts as 0. The command is emitted, and unless the configuration
    asks to skip EXIT, Exit is raised with the code and a flag that is False if /B was given.
    """
    terminate = True
    token = 0
    for arg in cmd.args:
        if arg.upper() != '/B':
            token = arg
            break
        terminate = False
    try:
        code = int(token)
    except ValueError:
        code = 0
    yield cmd
    if not self.cfg.skip_exit:
        raise Exit(code, terminate)
@_command('CHDIR')
@_command('CD')
def execute_chdir(self, cmd: SynCommand, *_):
    """
    Emulate CD and CHDIR: emit the command, then use the stripped argument string as the new
    working directory.
    """
    yield cmd
    target = cmd.argument_string.strip()
    self.state.cwd = target
@_command('PUSHD')
def execute_pushd(self, cmd: SynCommand, *_):
    """
    Emulate PUSHD: emit the command, remember the current working directory on the directory
    stack, and change to the directory given as argument.
    """
    yield cmd
    state = self.state
    state.dirstack.append(state.cwd)
    # The directory is changed directly: the CHDIR handler is a generator, so merely calling it
    # would not run any of its code, and iterating it would emit the command a second time.
    state.cwd = cmd.argument_string.strip()
@_command('POPD')
def execute_popd(self, cmd: SynCommand, *_):
    """
    Emulate POPD: emit the command and return to the directory that was pushed last. An empty
    directory stack leaves the working directory unchanged.
    """
    yield cmd
    state = self.state
    try:
        previous = state.dirstack.pop()
    except IndexError:
        return
    state.cwd = previous
@_command('ECHO')
def execute_echo(self, cmd: SynCommand, std: IO, in_group: bool):
    """
    Emulate ECHO. The arguments ON and OFF switch the echo state; such a command is only emitted
    when it changes the state or when the configuration asks for no-ops to be shown. Any other
    argument is written to the standard output, and without arguments the echo state is reported.
    """
    cmdl = cmd.argument_string
    mode = cmdl.strip().lower()
    previous = self.state.echo
    if mode in ('on', 'off'):
        enable = mode == 'on'
        if self.cfg.show_nops or previous is (not enable):
            yield cmd
        self.state.echo = enable
        return
    yield cmd
    if not mode:
        status = 'on' if self.state.echo else 'off'
        std.o.write(F'ECHO is {status}.\r\n')
        return
    # Inside a group, the echoed text always ends with a space.
    if in_group and not cmdl.endswith(' '):
        cmdl += ' '
    std.o.write(F'{cmdl}\r\n')
@_command('CLS')
def execute_cls(self, cmd: SynCommand, *_):
    """Emit CLS without any further effect; the error level is left unchanged."""
    yield cmd
@_command('ERASE')
@_command('DEL')
def execute_del(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate DEL and ERASE: remove all files from the virtual file system that match one of the
    given patterns. Leading switches are parsed, but only /P (prompt for confirmation) has an
    effect. The generator returns 0 on success and 1 after emitting a syntax error when there are
    no arguments or no file names.

    Raises InputLocked when a confirmation cannot be read from the standard input.
    """
    if not cmd.args:
        yield Error('The syntax of the command is incorrect')
        return 1
    yield cmd
    flags = {}
    paths = []
    it = iter(cmd.args)
    for arg in it:
        if not arg.startswith('/') or len(arg) < 2:
            # The first argument that is not a switch starts the list of file names.
            paths = [arg, *it]
            break
        flag = arg.upper()
        if flag[:3] == '/A:':
            flags['A'] = flag[3:]
        else:
            flags[flag[1]] = True
    if not paths:
        # Only switches were given. Iterating with a for loop instead of calling next() also
        # prevents a StopIteration from escaping this generator in that case.
        yield Error('The syntax of the command is incorrect')
        return 1
    _P = 'P' in flags # Prompts for confirmation before deleting each file.
    state = self.state
    cwd = state.cwd
    for pattern in paths:
        # The file system is copied because files are removed while iterating.
        for path in list(state.file_system):
            if not winfnmatch(pattern, path, cwd):
                continue
            if _P and state.exists_file(pattern):
                std.o.write(F'{pattern}, Delete (Y/N)? ')
                decision = None
                while decision not in ('y', 'n'):
                    confirmation = std.i.readline()
                    if not confirmation.endswith('\n'):
                        raise InputLocked
                    decision = confirmation[:1].lower()
                if decision == 'n':
                    continue
            state.remove_file(path)
    return 0
@_command('START')
def execute_start(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate START: emit the command and, if the started program is a file that can be read from
    the virtual file system, run it as a batch script in a child emulator.
    """
    yield cmd
    it = iter(cmd.ast.fragments)
    it = itertools.islice(it, cmd.argument_offset, None)
    title = None
    start = None
    cwd = self.state.cwd
    # The ellipsis makes clone_state copy the current environment.
    env = ...
    for arg in it:
        if title is None:
            # Only the first fragment can be the window title, and only if it contains a quote.
            if '"' not in arg:
                title = ''
            else:
                title = unquote(arg)
                continue
        if arg.isspace():
            continue
        if not arg.startswith('/'):
            # The first argument that is not a switch is the program to start.
            start = unquote(arg)
            break
        if (flag := arg.upper()) in ('/NODE', '/AFFINITY', '/MACHINE'):
            # These switches take a parameter, which is skipped.
            # NOTE(review): next() raises StopIteration if the switch is the last fragment, and
            # the fragment that is consumed may be whitespace - confirm against the tokenizer.
            next(it)
        elif flag == '/D':
            cwd = next(it)
        elif flag == '/I':
            env = None
    if start and (batch := self.state.ingest_file(start)):
        state = self.clone_state(environment=env)
        state.cwd = cwd
        # All fragments after the program name form the command line of the child.
        state.command_line = _fuse(it).strip()
        shell = self.spawn(batch, state, std)
        yield from shell.trace()
@_command('CMD')
def execute_cmd(self, cmd: SynCommand, std: IO, *_):
    """
    Emulate CMD: emit the command, parse the switches, and run the command line that follows /C,
    /K, or /R in a child emulator. The generator returns 0 without running anything when none of
    these switches is present.

    Raises a ValueError when /E or /V have a parameter other than ON or OFF.
    """
    yield cmd
    it = iter(cmd.ast.fragments)
    command = None
    quiet = False
    strip = False
    codec = 'cp1252'
    delayexpand = None
    cmdextended = None
    for arg in it:
        if arg.isspace() or not arg.startswith('/'):
            continue
        name, _, flag = arg[1:].partition(':')
        flag = flag.upper()
        name = name.upper()
        # A tuple is required here: a substring test would also accept an empty switch name.
        if name in ('C', 'K', 'R'):
            # Everything after the switch is the command line.
            command = _fuse(it)
            break
        elif name == 'Q':
            quiet = True
        elif name == 'S':
            strip = True
        elif name == 'U':
            codec = 'utf-16le'
        elif name == 'E':
            cmdextended = _onoff(flag)
        elif name == 'V':
            delayexpand = _onoff(flag)
    else:
        return 0
    # The outer quotes of the command line are removed if /S was given, or if the command line
    # does not consist of exactly one quoted string that contains whitespace and none of the
    # special characters.
    if (stripped := re.search('^\\s*"(.*)"', command)) and (strip
        or command.count('"') != 2
        or re.search('[&<>()@^|]', stripped[1])
        or re.search('\\s', stripped[1]) is None
    ):
        command = stripped[1]
    state = self.clone_state(delayexpand=delayexpand, cmdextended=cmdextended)
    state.codec = codec
    state.echo = not quiet
    shell = self.spawn(command, state, std)
    yield from shell.trace()
@_command('ARP')
@_command('AT')
@_command('ATBROKER')
@_command('BGINFO')
@_command('BITSADMIN')
@_command('CERTUTIL')
@_command('CLIP')
@_command('CMSTP')
@_command('COMPACT')
@_command('CONTROL')
@_command('CSCRIPT')
@_command('CURL')
@_command('DEFRAG')
@_command('DISKSHADOW')
@_command('ESENTUTL')
@_command('EXPAND')
@_command('EXPLORER')
@_command('EXTRAC32')
@_command('FODHELPER')
@_command('FORFILES')
@_command('FTP')
@_command('HOSTNAME')
@_command('INSTALLUTIL')
@_command('IPCONFIG')
@_command('LOGOFF')
@_command('MAKECAB')
@_command('MAVINJECT')
@_command('MOUNTVOL')
@_command('MSBUILD')
@_command('MSHTA')
@_command('MSIEXEC')
@_command('MSTSC')
@_command('NET')
@_command('NET1')
@_command('NETSH')
@_command('NSLOOKUP')
@_command('ODBCCONF')
@_command('PATHPING')
@_command('PING')
@_command('POWERSHELL')
@_command('PRESENTATIONHOST')
@_command('PWSH')
@_command('REG')
@_command('REGSVR32')
@_command('ROUTE')
@_command('RUNDLL32')
@_command('SCP')
@_command('SDCLT')
@_command('SETX')
@_command('SFTP')
@_command('SHUTDOWN')
@_command('SSH')
@_command('SUBST')
@_command('SYNCAPPVPUBLISHINGSERVER')
@_command('SYSTEMINFO')
@_command('TAR')
@_command('TELNET')
@_command('TFTP')
@_command('TIMEOUT')
@_command('TRACERT')
@_command('VSSADMIN')
@_command('WBADMIN')
@_command('WHERE')
@_command('WHOAMI')
@_command('WINRM')
@_command('WINRS')
@_command('WSCRIPT')
def execute_unimplemented_program(self, cmd: SynCommand, *_):
    """
    Handler for programs that are recognized but not emulated: the command is emitted and the
    generator returns 0.
    """
    yield cmd
    return 0
@_command('CLS')
def execute_unimplemented_command_unmodified_ec(self, cmd: SynCommand, *_):
    """
    Handler for commands that are recognized but not emulated and that leave the error level
    untouched: the command is emitted and nothing is returned.
    """
    yield cmd
# TYPE is deliberately not part of this list: a registration here replaces the handler that
# actually emulates TYPE, because the most recent registration for a verb wins. The programs
# COMPACT, SHUTDOWN, SUBST, and SYSTEMINFO are registered with the unimplemented programs.
@_command('ASSOC')
@_command('ATTRIB')
@_command('BCDEDIT')
@_command('BREAK')
@_command('CACLS')
@_command('CHCP')
@_command('CHKDSK')
@_command('CHKNTFS')
@_command('COLOR')
@_command('COMP')
@_command('CONVERT')
@_command('COPY')
@_command('DATE')
@_command('DIR')
@_command('DISKPART')
@_command('DOSKEY')
@_command('DRIVERQUERY')
@_command('FC')
@_command('FORMAT')
@_command('FSUTIL')
@_command('FTYPE')
@_command('GPRESULT')
@_command('ICACLS')
@_command('LABEL')
@_command('MD')
@_command('MKDIR')
@_command('MKLINK')
@_command('MODE')
@_command('MORE')
@_command('MOVE')
@_command('OPENFILES')
@_command('PATH')
@_command('PAUSE')
@_command('PRINT')
@_command('PROMPT')
@_command('RD')
@_command('RECOVER')
@_command('REN')
@_command('RENAME')
@_command('REPLACE')
@_command('RMDIR')
@_command('ROBOCOPY')
@_command('SC')
@_command('SCHTASKS')
@_command('SHIFT')
@_command('SORT')
@_command('TASKKILL')
@_command('TASKLIST')
@_command('TIME')
@_command('TITLE')
@_command('TREE')
@_command('VER')
@_command('VERIFY')
@_command('VOL')
@_command('WMIC')
@_command('XCOPY')
def execute_unimplemented_command(self, cmd: SynCommand, *_):
    """
    Handler for commands that are recognized but not emulated: the command is emitted and the
    generator returns 0.
    """
    yield cmd
    return 0
@_command('REM')
def execute_rem(self, cmd: SynCommand, *_):
    """Emit the REM command only if the configuration asks for comments to be shown."""
    if self.cfg.show_comments:
        yield cmd
@_command('HELP')
def execute_help(self, cmd: SynCommand, std: IO, *_):
    """Emit the command, write the help text stored for HELP to the standard output, return 0."""
    yield cmd
    std.o.write(HelpOutput['HELP'])
    return 0
def execute_command(self, cmd: SynCommand, std: IO, in_group: bool):
    """
    Execute a single synthesized command: look up the handler for its verb, apply the redirects
    of the command to the given streams, run the handler, store redirected output as files, and
    update the error level with the result of the handler.
    """
    verb = cmd.verb.upper().strip()
    handler = self._command.handlers.get(verb)
    if handler is None:
        # Retry without the file extension if it is one of the executable extensions.
        base, ext = ntpath.splitext(verb)
        handler = None
        if any(ext == pe.upper() for pe in self.state.envar('PATHEXT', '').split(';')):
            handler = self._command.handlers.get(base)
    if handler is None:
        # Unknown commands are only emitted.
        if self.state.exists_file(verb):
            self.state.ec = 0
        elif not indicators.winfpath.value.fullmatch(verb):
            if '\uFFFD' in verb or not verb.isprintable():
                self.state.ec = 9009
                cmd.junk = True
            else:
                cmd.junk = not self.cfg.show_junk
        yield cmd
        return
    # Maps stream handles to the files that receive their output once the handler is done.
    paths: dict[int, str] = {}
    for src, r in cmd.ast.redirects.items():
        # Only the three standard handles are supported; handle 0 must be an input redirect
        # and the other two must not be.
        if not 0 <= src <= 2 or (src == 0) != r.is_input:
            continue
        if isinstance((target := r.target), str):
            if target.upper() == 'NUL':
                std[src] = DevNull()
            else:
                data = self.state.ingest_file(target)
                if src == 0:
                    if data is None:
                        yield ErrorCannotFindFile
                        return
                    std.i = StringIO(data)
                else:
                    if r.is_out_append:
                        # Start with the existing contents and continue writing at the end.
                        buffer = StringIO(data)
                        buffer.seek(0, 2)
                    else:
                        buffer = StringIO()
                    std[src] = buffer
                    paths[src] = target
        elif src == 1 and target == 2:
            std.o = std.e
        elif src == 2 and target == 1:
            std.e = std.o
    if '/?' in cmd.args:
        # NOTE(review): this lookup presumably fails for verbs without a stored help text, and
        # for verbs that were resolved by stripping their extension - confirm.
        std.o.write(HelpOutput[verb])
        self.state.ec = 0
        return
    if (result := handler(self, cmd, std, in_group)) is None:
        pass
    elif not isinstance(result, (int, ErrorZero)):
        # The handler is a generator: forward what it emits and use its return value.
        result = (yield from result)
    for k, path in paths.items():
        self.state.create_file(path, std[k].getvalue())
    if result is not None:
        self.state.ec = result
@_node(AstPipeline)
def trace_pipeline(self, pipeline: AstPipeline, std: IO, in_group: bool):
    """
    Trace a pipeline. The output of each part becomes the input of the next one and the last part
    writes to the output stream of the pipeline. The pipeline itself is only emitted when it has
    more than one part.
    """
    parts = pipeline.parts
    total = len(parts)
    streams = IO(*std)
    if total > 1:
        yield synthesize(pipeline)
    for index, part in enumerate(parts):
        if index > 0:
            # Rewind what the previous part has written and use it as input.
            streams.i = streams.o
            streams.i.seek(0)
        streams.o = std.o if index == total - 1 else StringIO()
        if isinstance(part, AstGroup):
            yield from self.trace_group(part, streams, in_group)
        else:
            expanded = self.expand_ast_node(part)
            yield from self.execute_command(synthesize(expanded), streams, in_group)
@_node(AstSequence)
def trace_sequence(self, sequence: AstSequence, std: IO, in_group: bool):
    """
    Trace a sequence of statements. The head is always traced; a conditional statement from the
    tail is skipped when its condition requires a failure and the error level is zero, or when it
    requires a success and the error level is not zero.
    """
    yield from self.trace_statement(sequence.head, std, in_group)
    for cs in sequence.tail:
        if cs.condition == AstCondition.Failure and not self.state.ec:
            continue
        if cs.condition == AstCondition.Success and self.state.ec:
            continue
        yield from self.trace_statement(cs.statement, std, in_group)
@_node(AstIf)
def trace_if(self, _if: AstIf, std: IO, in_group: bool):
    """
    Trace an IF statement: emit it, expand its variables, evaluate the condition, and trace the
    branch that was selected. The set of labels used for loop detection is reset.

    Raises a RuntimeError for an unknown comparison operator.
    """
    yield synthesize(_if)
    _if = self.expand_ast_node(_if)
    self.block_labels.clear()
    variant = _if.variant
    if variant == AstIfVariant.ErrorLevel:
        outcome = _if.var_int <= self.state.ec
    elif variant == AstIfVariant.CmdExtVersion:
        outcome = _if.var_int <= self.state.extensions_version
    elif variant == AstIfVariant.Exist:
        outcome = self.state.exists_file(_if.var_str)
    elif variant == AstIfVariant.Defined:
        outcome = _if.var_str.upper() in self.state.environment
    else:
        lhs = _if.lhs
        rhs = _if.rhs
        cmp = _if.cmp
        assert lhs is not None
        assert rhs is not None
        if cmp == AstIfCmp.STR:
            if _if.casefold:
                lhs = lhs.casefold() if isinstance(lhs, str) else lhs
                rhs = rhs.casefold() if isinstance(rhs, str) else rhs
            outcome = lhs == rhs
        else:
            comparisons = {
                AstIfCmp.GTR: lambda: lhs > rhs,
                AstIfCmp.GEQ: lambda: lhs >= rhs,
                AstIfCmp.NEQ: lambda: lhs != rhs,
                AstIfCmp.EQU: lambda: lhs == rhs,
                AstIfCmp.LSS: lambda: lhs < rhs,
                AstIfCmp.LEQ: lambda: lhs <= rhs,
            }
            compare = comparisons.get(cmp)
            if compare is None:
                raise RuntimeError(cmp)
            outcome = compare()
    if _if.negated:
        outcome = not outcome
    if outcome:
        yield from self.trace_sequence(_if.then_do, std, in_group)
    elif (_else := _if.else_do):
        yield from self.trace_sequence(_else, std, in_group)
@_node(AstFor)
def trace_for(self, _for: AstFor, std: IO, in_group: bool):
    """
    Trace a FOR loop. The loop itself is emitted unless it is a numeric loop with a single
    specification whose body does not use the loop variable. For the file parsing variant, the
    lines to be parsed are the output of a command, the literal specification, or the contents of
    all matching files; every line is split into tokens which are assigned to consecutive
    variable names starting at the loop variable. For all other variants, the loop variable is
    assigned each entry of the specification in turn.

    Raises an EmulatorException when the tokens require more variable names than are available.
    """
    state = self.state
    cwd = state.cwd
    vars = state.new_forloop()
    body = _for.body
    name = _for.variable
    vars[name] = ''
    try:
        if (
            self.contains_for_variable(body, vars)
            or _for.variant != AstForVariant.NumericLoop
            or len(_for.spec) != 1
        ):
            yield synthesize(_for)
        if _for.variant == AstForVariant.FileParsing:
            if _for.mode == AstForParserMode.Command:
                emulator = self.spawn(_for.specline, self.clone_state(filename=state.name))
                yield from emulator.trace()
                lines = emulator.std.o.getvalue().splitlines()
            elif _for.mode == AstForParserMode.Literal:
                lines = _for.spec
            else:
                def lines_from_files():
                    fs = state.file_system
                    for spec in _for.spec:
                        for path, content in fs.items():
                            # winfnmatch expects the pattern first and the path second.
                            if not winfnmatch(spec, path, cwd):
                                continue
                            yield from content.splitlines(False)
                lines = lines_from_files()
            opt = _for.options
            tokens = sorted(opt.tokens)
            # Without delimiters there is nothing to split at; an empty character class would
            # not even be a valid regular expression.
            split = re.compile('[{}]+'.format(re.escape(opt.delims))) if opt.delims else None
            count = tokens[-1] + 1 if tokens else 0
            first_variable = ord(name)
            if opt.asterisk:
                # The additional token receives the remainder of the line.
                tokens.append(count)
            for n, line in enumerate(lines):
                if n < opt.skip:
                    continue
                if opt.comment and line.startswith(opt.comment):
                    continue
                if count and split:
                    tokenized = split.split(line, maxsplit=count)
                else:
                    tokenized = (line,)
                for k, tok in enumerate(tokens):
                    target = chr(first_variable + k)
                    if not target.isalpha():
                        raise EmulatorException('Ran out of variables in FOR-Loop.')
                    try:
                        vars[target] = tokenized[tok]
                    except IndexError:
                        vars[target] = ''
                yield from self.trace_sequence(body, std, in_group)
        else:
            for entry in _for.spec:
                vars[name] = entry
                yield from self.trace_sequence(body, std, in_group)
    finally:
        # The loop has to be closed even if the body leaves it through an exception, which is
        # how GOTO and EXIT are implemented.
        state.end_forloop()
@_node(AstGroup)
def trace_group(self, group: AstGroup, std: IO, in_group: bool):
    """
    Trace all sequences of a group and emit the group itself afterwards. The sequences are always
    traced as being inside a group, regardless of the in_group argument.
    """
    for sequence in group.fragments:
        yield from self.trace_sequence(sequence, std, True)
    yield synthesize(group)
@_node(AstLabel)
def trace_label(self, label: AstLabel, *_):
    """
    Trace a label. Labels that serve as comments and proper labels are each only emitted when
    the configuration asks for them.
    """
    if label.comment:
        if self.cfg.show_comments:
            yield synthesize(label)
    else:
        if self.cfg.show_labels:
            yield synthesize(label)
        # Remember the label: GOTO reports a jump back to it as an infinite loop.
        self.block_labels.add(label.label.upper())
def trace_statement(self, statement: AstStatement, std: IO, in_group: bool):
    """
    Trace a statement with the handler that is registered for its exact class. Raises a
    RuntimeError when there is no such handler.
    """
    handler = self._node.handlers.get(statement.__class__)
    if handler is None:
        raise RuntimeError(statement)
    yield from handler(self, statement, std, in_group)
def emulate_commands(self, allow_junk=False):
    """
    Run the emulation and generate the string representation of every command that is traced.
    Commands that are flagged as junk are only included if allow_junk is set.
    """
    for syn in self.trace():
        if isinstance(syn, SynCommand) and (allow_junk or not syn.junk):
            yield str(syn)
def emulate_to_depth(self, depth: int = 0):
    """
    Run the emulation and generate the string representation of every traced node whose AST
    depth does not exceed the given depth.
    """
    for syn in self.trace():
        if isinstance(syn, SynNodeBase) and syn.ast.depth <= depth:
            yield str(syn)
def emulate(self, offset: int = 0):
    """
    Run the emulation from the given offset and generate the string representation of the traced
    nodes, omitting junk commands and nodes that would duplicate code which was already
    generated as part of an enclosing or enclosed node.

    Raises a RuntimeError if the same node would be generated twice in a row.
    """
    # The AST node that was generated most recently.
    last: AstNode | None = None
    # The AST node of the junk command that was skipped most recently.
    junk: AstNode | None = None
    for syn in self.trace(offset):
        if not isinstance(syn, SynNodeBase):
            continue
        ast = syn.ast
        if isinstance(syn, SynCommand) and syn.junk:
            junk = ast
            continue
        if junk is not None:
            # Skip a node that wraps skipped junk, unless it also wraps the last generated node.
            if junk.is_descendant_of(ast):
                if not last or not last.is_descendant_of(ast):
                    continue
        if last is not None:
            if ast.is_descendant_of(last):
                # we already synthesized a parent construct, like a FOR loop or IF block
                continue
            if last.is_descendant_of(ast):
                # we synthesized a command and no longer need to synthesize an AST node that
                # wraps it, like a group
                continue
        if isinstance(ast, AstPipeline):
            if len(ast.parts) == 1:
                continue
        if last is ast:
            raise RuntimeError('Emulator attempted to synthesize the same command twice.')
        last = ast
        yield str(syn)
def execute(self, offset: int = 0, called: bool = False):
    """Run the emulation from the given offset and discard everything that is traced."""
    for _ in self.trace(offset, called=called):
        pass
def trace(self, offset: int = 0, called: bool = False):
    """
    Run the emulation from the given offset and generate everything that the statement handlers
    emit. Parser errors are generated as Error strings. If the state has a file name, the script
    text is first stored under that name.

    Raises InvalidLabel for a jump to an unknown label. An Exit that ends the whole execution is
    re-raised if called is set, so that it also ends the calling emulator.
    """
    if (name := self.state.name):
        self.state.create_file(name, self.parser.lexer.text)
    length = len(self.parser.lexer.code)
    labels = self.parser.lexer.labels
    while offset < length:
        try:
            for sequence in self.parser.parse(offset):
                if isinstance(sequence, AstError):
                    yield Error(sequence.error)
                    continue
                yield from self.trace_sequence(sequence, self.std, False)
        except Goto as goto:
            # Restart parsing at the offset of the label.
            try:
                offset = labels[goto.label.upper()]
            except KeyError:
                raise InvalidLabel(goto.label) from goto
            continue
        except Exit as exit:
            self.state.ec = exit.code
            if exit.exit and called:
                raise
            else:
                break
        except AbortExecution:
            self.state.ec = 1
            break
        else:
            # The script was parsed and traced to its end.
            break
Functions
def winfnmatch(pattern, path, cwd)-
A function similar to the fnmatch module, but using only Windows wildcards. In Batch, the bracket wildcard does not exist.
Expand source code Browse git
def winfnmatch(pattern: str, path: str, cwd: str): """ A function similar to the fnmatch module, but using only Windows wildcards. In Batch, the bracket wildcard does not exist. """ parts = re.split('([*?])', pattern) regex = StringIO() it = iter(parts) verbatim = next(it) for wildcard in it: regex.write(re.escape(verbatim)) if wildcard == '*': regex.write('.*') if wildcard == '?': regex.write('.') verbatim = next(it) regex.write(re.escape(verbatim)) cwd = re.escape(cwd.rstrip('\\')) pattern = rF'(?s:{cwd}\\{regex.getvalue()})$' return bool(re.match(pattern, path))
Classes
class DevNull-
Expand source code Browse git
class DevNull: def getvalue(self): return '' def __iter__(self): return self def __next__(self): raise StopIteration def detach(self): raise NotImplementedError def readline(self, size: int = -1, /) -> str: return '' def read(self, size: int | None = -1, /) -> str: return '' def write(self, s: str, /) -> int: return len(s) def seek(self, k: int, whence: int = 0, /): return @property def closed(self): return TrueInstance variables
var closed-
Expand source code Browse git
@property def closed(self): return True
Methods
def getvalue(self)-
Expand source code Browse git
def getvalue(self): return '' def detach(self)-
Expand source code Browse git
def detach(self): raise NotImplementedError def readline(self, size=-1, /)-
Expand source code Browse git
def readline(self, size: int = -1, /) -> str:
    """Return the empty string regardless of the requested size."""
    return ''
def read(self, size=-1, /)-
Expand source code Browse git
def read(self, size: int | None = -1, /) -> str:
    """Return the empty string regardless of the requested size."""
    return ''
def write(self, s, /)-
Expand source code Browse git
def write(self, s: str, /) -> int:
    """Discard the given string and report its full length as written."""
    return len(s)
def seek(self, k, whence=0, /)-
Expand source code Browse git
def seek(self, k: int, whence: int = 0, /):
    """Accept any position and do nothing."""
    return
class Error (...)-
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to 'utf-8'. errors defaults to 'strict'.
Expand source code Browse git
class Error(str): passAncestors
- builtins.str
class IO (i=<factory>, o=<factory>, e=<factory>)-
IO(i: 'DevNull | StringIO' = <factory>, o: 'DevNull | StringIO' = <factory>, e: 'DevNull | StringIO' = <factory>)
Expand source code Browse git
@dataclass
class IO:
    """
    The three streams of a command: input (i), output (o), and error (e). Apart from attribute
    access, the streams can be addressed by the indices 0, 1, and 2, respectively, and iterating
    over the object produces them in that order.
    """
    i: DevNull | StringIO = field(default_factory=StringIO)
    o: DevNull | StringIO = field(default_factory=StringIO)
    e: DevNull | StringIO = field(default_factory=StringIO)

    @staticmethod
    def _slot(k):
        # Translates a stream index to the name of the attribute that holds the stream; any
        # value that does not compare equal to 0, 1, or 2 is rejected with an IndexError.
        for index, name in enumerate('ioe'):
            if k == index:
                return name
        raise IndexError(k)

    def __iter__(self):
        for name in 'ioe':
            yield getattr(self, name)

    def __setitem__(self, k, v):
        setattr(self, self._slot(k), v)

    def __getitem__(self, k):
        return getattr(self, self._slot(k))
var i-
The input stream (index 0); a DevNull or StringIO, a fresh StringIO by default.
var o-
The output stream (index 1); a DevNull or StringIO, a fresh StringIO by default.
var e-
The error stream (index 2); a DevNull or StringIO, a fresh StringIO by default.
class BatchEmulatorConfig (show_nops=False, show_junk=False, show_labels=False, show_sets=False, show_comments=False, skip_goto=False, skip_call=False, skip_exit=False)-
BatchEmulatorConfig(show_nops: 'bool' = False, show_junk: 'bool' = False, show_labels: 'bool' = False, show_sets: 'bool' = False, show_comments: 'bool' = False, skip_goto: 'bool' = False, skip_call: 'bool' = False, skip_exit: 'bool' = False)
Expand source code Browse git
@dataclass
class BatchEmulatorConfig:
    """
    Options that control which commands a BatchEmulator reports and which control flow
    commands it actually carries out. All options are disabled by default.
    """
    # Also report ECHO ON and ECHO OFF commands that do not change the current echo state.
    show_nops: bool = False
    # Do not flag commands as junk whose verb is unknown, does not name an existing file, does
    # not look like a file path, and is printable.
    show_junk: bool = False
    # Report labels that are not comments.
    show_labels: bool = False
    # Do not flag SET commands as junk; SET /P is never flagged.
    show_sets: bool = False
    # Report REM commands and labels that are comments.
    show_comments: bool = False
    # Report GOTO commands, but do not jump.
    skip_goto: bool = False
    # Run the target of a CALL without reporting what it executes.
    skip_call: bool = False
    # Report EXIT commands, but do not stop the emulation.
    skip_exit: bool = False
Instance variables
var show_nops-
Whether ECHO ON and ECHO OFF commands that do not change the current echo state are reported as well.
var show_junk-
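Whether commands with an unknown, printable verb that neither names an existing file nor looks like a file path are kept instead of being flagged as junk.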
var show_labels-
Whether labels that are not comments are reported.
var show_sets-
Whether SET commands are kept instead of being flagged as junk; SET /P is never flagged.
var show_comments-
Whether REM commands and labels that are comments are reported.
var skip_goto-
Whether GOTO commands are only reported instead of being followed.
var skip_call-
Whether the target of a CALL is executed without reporting the commands that it runs.
var skip_exit-
Whether EXIT commands are only reported instead of ending the emulation.
class BatchEmulator (data, state=None, cfg=None, std=None)-
Expand source code Browse git
class BatchEmulator:
    """
    Emulates a batch script. The script is parsed by a BatchParser, and the resulting AST is
    walked by BatchEmulator.trace, which yields synthesized nodes and Error strings while it
    updates the BatchState of the parser and the streams of an IO object.
    """

    class _node:
        """
        Decorator that registers a method as the handler for one type of AST node.
        """
        handlers: ClassVar[dict[
            type[AstNode],
            Callable[[
                BatchEmulator,
                AstNode,
                IO,
                bool,
            ], Generator[SynNodeBase[AstNode] | Error]]
        ]] = {}

        def __init__(self, key: type[AstNode]):
            self.key = key

        def __call__(self, handler):
            self.handlers[self.key] = handler
            return handler

    class _command:
        """
        Decorator that registers a method as the handler for a command verb. Verbs are stored in
        upper case; registering a verb again replaces the handler that was registered before.
        """
        handlers: ClassVar[dict[
            str,
            Callable[[
                BatchEmulator,
                SynCommand,
                IO,
                bool,
            ], Generator[str, None, int | None] | int | ErrorZero | None]
        ]] = {}

        def __init__(self, key: str):
            self.key = key.upper()

        def __call__(self, handler):
            self.handlers[self.key] = handler
            return handler

    def __init__(
        self,
        data: str | buf | BatchParser,
        state: BatchState | None = None,
        cfg: BatchEmulatorConfig | None = None,
        std: IO | None = None,
    ):
        """
        Creates a parser for the given data and state. A default configuration and fresh streams
        are created when cfg and std are not given.
        """
        self.stack = []
        self.parser = BatchParser(data, state)
        self.std = std or IO()
        self.cfg = cfg or BatchEmulatorConfig()
        # Upper-cased names of the labels that were passed since the last SET or IF; a GOTO to
        # one of these labels is reported as an infinite loop.
        self.block_labels = set()

    def spawn(self, data: str | buf | BatchParser, state: BatchState | None = None, std: IO | None = None):
        """
        Creates another emulator for the given data which shares the configuration of this one.
        """
        return BatchEmulator(
            data,
            state,
            self.cfg,
            std,
        )

    @property
    def state(self):
        """The state object of the parser."""
        return self.parser.state

    @property
    def environment(self):
        """The environment of the current state."""
        return self.state.environment

    @property
    def delayexpand(self):
        """The delayed expansion setting of the current state."""
        return self.state.delayexpand

    def clone_state(
        self,
        delayexpand: bool | None = None,
        cmdextended: bool | None = None,
        environment: dict | None | ellipsis = ...,
        filename: str | None = None,
    ):
        """
        Creates a new BatchState that shares file system, user name, host name, time, and working
        directory with the current one. Delayed expansion is off unless specified, the command
        extension setting is inherited unless specified, and the environment is a copy of the
        current one unless the environment argument is given.
        """
        state = self.state
        if delayexpand is None:
            delayexpand = False
        if cmdextended is None:
            cmdextended = state.cmdextended
        if environment is ...:
            environment = dict(state.environment)
        return BatchState(
            delayexpand,
            cmdextended,
            environment=environment,
            file_system=state.file_system,
            username=state.username,
            hostname=state.hostname,
            now=state.now,
            cwd=state.cwd,
            filename=filename,
        )

    def get_for_variable_regex(self, vars: Iterable[str]):
        """
        Compiles a regular expression for references to the given FOR loop variables. Group 1
        holds the modifiers after the percent sign, group 3 holds the variable name.
        """
        # NOTE(review): inside this raw string, the second group reads as a literal backslash
        # followed by an end-of-string anchor, so it can never match anything but the empty
        # string. This looks like an escaping leftover; confirm what was intended here.
        return re.compile(RF'%((?:~[fdpnxsatz]*)?)((?:\\$\\w+)?)([{"".join(vars)}])')

    def expand_delayed_variables(self, block: str):
        """
        Replaces every !NAME! reference in the given block by the value that the lexer computes
        for it. For a missing variable, the replacement is the part of the reference that follows
        its first colon, which is the empty string when there is no colon.
        """
        def expansion(match: re.Match[str]):
            name = match.group(1)
            try:
                return parse(name)
            except MissingVariable:
                _, _, rest = name.partition(':')
                return rest
        parse = self.parser.lexer.parse_env_variable
        return re.sub(r'!([^!\n]*)!', expansion, block)

    def expand_forloop_variables(self, block: str, vars: dict[str, str] | None):
        """
        Replaces references to the given FOR loop variables in the block by their values. The
        block is returned unchanged when there are no variables.
        """
        def expansion(match: re.Match[str]):
            # NOTE(review): the modifier flags are parsed but do not affect the result yet.
            flags = ArgVarFlags.Empty
            for flag in match[1]:
                flags |= ArgVarFlags.FromToken(ord(flag))
            return _vars[match[3]]
        if not vars:
            return block
        _vars = vars
        return self.get_for_variable_regex(vars).sub(expansion, block)

    def contains_for_variable(self, ast: AstNode, vars: Iterable[str]):
        """
        Checks whether any string in the given AST node, including strings in nested lists,
        dictionary values, and child nodes, references one of the given FOR loop variables.
        """
        def check(token):
            if isinstance(token, list):
                return any(check(v) for v in token)
            if isinstance(token, dict):
                return any(check(v) for v in token.values())
            if isinstance(token, Enum):
                return False
            if isinstance(token, str):
                return bool(checker(token))
            if isinstance(token, AstNode):
                for tf in fields(token):
                    # The parent link points back up the tree and is not followed.
                    if tf.name == 'parent':
                        continue
                    if check(getattr(token, tf.name)):
                        return True
            return False
        checker = self.get_for_variable_regex(vars).search
        return check(ast) # type:ignore

    def expand_ast_node(self, ast: _T) -> _T:
        """
        Returns a copy of the given AST node in which delayed variables (if delayed expansion is
        enabled) and FOR loop variables have been expanded in all strings. The node itself is
        returned when neither kind of expansion applies.
        """
        def expand(token):
            if isinstance(token, list):
                return [expand(v) for v in token]
            if isinstance(token, dict):
                return {k: expand(v) for k, v in token.items()}
            if isinstance(token, Enum):
                return token
            if isinstance(token, str):
                if delayexpand:
                    token = self.expand_delayed_variables(token)
                return self.expand_forloop_variables(token, variables)
            if isinstance(token, AstNode):
                new = {}
                for tf in fields(token):
                    value = getattr(token, tf.name)
                    # The parent link is carried over as it is.
                    if tf.name != 'parent':
                        value = expand(value)
                    new[tf.name] = value
                return token.__class__(**new)
            return token
        delayexpand = self.delayexpand
        variables = self.state.for_loop_variables
        if not variables and not delayexpand:
            return ast
        return expand(ast) # type:ignore

    def execute_find_or_findstr(self, cmd: SynCommand, std: IO, findstr: bool):
        """
        Shared implementation of FIND and FINDSTR. Yields the command, writes the reported lines
        to the output stream, and returns 0 when something was found and 1 otherwise. The value 1
        is also returned for invalid switches and for files that cannot be read.
        """
        needles = []
        paths: list[str | ellipsis] = [...]
        flags = {}
        it = iter(cmd.args)
        arg = None
        yield cmd
        for arg in it:
            if not arg.startswith('/'):
                # This is the search string, and FIND requires it to be quoted.
                if not findstr and not arg.startswith('"'):
                    return 1
                needles.extend(unquote(arg).split())
                break
            name, has_param, value = arg[1:].partition(':')
            name = name.upper()
            if name in ('OFF', 'OFFLINE'):
                continue
            elif len(name) > 1:
                return 1
            elif name == 'C':
                needles.append(unquote(value))
            elif name == 'F' and findstr:
                if (p := self.state.ingest_file(value)) is None:
                    return 1
                paths.extend(p.splitlines(False))
            elif name == 'G' and findstr:
                if (n := self.state.ingest_file(value)) is None:
                    return 1
                needles.extend(n.splitlines(False))
            elif has_param:
                flags[name] = value
            else:
                flags[name] = True
        valid_flags = 'VNI'
        if findstr:
            valid_flags += 'BELRSXMOPADQ'
        for v in flags:
            if v not in valid_flags:
                return 1
        prefix_filename = False
        state = self.state
        for arg in it:
            pattern = unquote(arg)
            if '*' in pattern or '?' in pattern:
                prefix_filename = True
                for path in state.file_system:
                    # The wildcard pattern is the first argument of winfnmatch, the candidate
                    # path is the second one.
                    if winfnmatch(pattern, path, state.cwd):
                        paths.append(path)
            else:
                paths.append(pattern)
        if len(paths) > 1:
            prefix_filename = True
        for n, needle in enumerate(needles):
            if not findstr or 'L' in flags:
                needle = re.escape(needle)
            if 'X' in flags:
                needle = F'^{needle}$'
            elif 'B' in flags:
                needle = F'^{needle}'
            elif 'E' in flags:
                needle = F'{needle}$'
            needles[n] = needle
        _V = 'V' in flags # noqa; Prints only lines that do not contain a match.
        _P = 'P' in flags # noqa; Skip files with non-printable characters.
        _O = 'O' in flags # noqa; Prints character offset before each matching line.
        _N = 'N' in flags # noqa; Prints the line number before each line that matches.
        _M = 'M' in flags # noqa; Prints only the filename if a file contains a match.
        # The switch /I requests a search that is not case-sensitive.
        search_flags = re.IGNORECASE if 'I' in flags else 0
        nothing_found = True
        offset = 0
        for path in paths:
            # The ellipsis is the placeholder for the input stream.
            if path is (...):
                data = std.i.read()
            else:
                data = state.ingest_file(path)
            if data is None:
                return 1
            if _P and not re.fullmatch('[\\s!-~]+', data):
                continue
            for n, line in enumerate(data.splitlines(True), 1):
                for needle in needles:
                    hit = re.search(needle, line, search_flags)
                    if _V == bool(hit):
                        continue
                    nothing_found = False
                    if not _M:
                        if _O:
                            o = offset + (hit.start() if hit else 0)
                            line = F'{o}:{line}'
                        if _N:
                            line = F'{n}:{line}'
                        if prefix_filename:
                            line = F'{path}:{line}'
                        std.o.write(line)
                    elif path is not (...):
                        std.o.write(path)
                    # One report per line is enough, the remaining needles are skipped.
                    break
                offset += len(line)
        return int(nothing_found)

    # NOTE(review): the verb TYPE is registered again for execute_unimplemented_command further
    # down in this class, and the later registration replaces this handler. Confirm which of the
    # two is supposed to handle TYPE.
    @_command('TYPE')
    def execute_type(self, cmd: SynCommand, std: IO, *_):
        """
        Writes the contents of the given file to the output stream and returns 0. When the file
        cannot be read, ErrorCannotFindFile is yielded and 1 is returned.
        """
        path = cmd.argument_string.strip()
        data = self.state.ingest_file(path)
        if data is None:
            yield ErrorCannotFindFile
            return 1
        else:
            std.o.write(data)
            return 0

    @_command('FIND')
    def execute_find(self, cmd: SynCommand, std: IO, *_):
        """Handles FIND, see execute_find_or_findstr."""
        return self.execute_find_or_findstr(cmd, std, findstr=False)

    @_command('FINDSTR')
    def execute_findstr(self, cmd: SynCommand, std: IO, *_):
        """Handles FINDSTR, see execute_find_or_findstr."""
        return self.execute_find_or_findstr(cmd, std, findstr=True)

    @_command('SET')
    def execute_set(self, cmd: SynCommand, std: IO, *_):
        """
        Handles SET: plain assignments, prompted assignments (/P), which read one line from the
        input stream, and arithmetic assignments (/A). An assignment of an empty value removes
        the variable from the environment.

        Raises EmulatorException for a SET without arguments and for arithmetic expressions with
        unexpected variable access, and InputLocked when a prompt cannot read a complete line.
        """
        if not (args := cmd.args):
            raise EmulatorException('Empty SET instruction')
        if cmd.verb.upper() != 'SET':
            raise RuntimeError
        # Since variables can be used in GOTO, a SET can be used to change the behavior of a GOTO.
        self.block_labels.clear()
        arithmetic = False
        quote_mode = False
        prompt = None
        it = iter(args)
        tk = next(it)
        if tk.upper() == '/P':
            if std.i.closed:
                prompt = ''
            elif not (prompt := std.i.readline()).endswith('\n'):
                raise InputLocked
            else:
                prompt = prompt.rstrip('\r\n')
            tk = next(it)
        else:
            cmd.junk = not self.cfg.show_sets
        yield cmd
        if tk.upper() == '/A':
            arithmetic = True
            try:
                tk = next(it)
            except StopIteration:
                tk = ''
        args = [tk, *it, *cmd.trailing_spaces]
        if arithmetic:
            def defang(s: str):
                # Encodes each character that is neither an operator nor part of a Python
                # identifier, which turns any variable name into a valid identifier.
                def r(m: re.Match[str]):
                    return F'_{prefix}{ord(m[0]):X}_'
                return re.sub(r'[^-\s()!~*/%+><&^|_\w]', r, s)

            def refang(s: str): # noqa
                # The inverse of defang.
                def r(m: re.Match[str]):
                    return chr(int(m[1], 16))
                return re.sub(rf'_{prefix}([A-F0-9]+)_', r, s)

            prefix = F'{uuid.uuid4().time_mid:X}'
            namespace = {}
            translate = {}
            value = None
            if not (program := ''.join(args)):
                std.e.write('The syntax of the command is incorrect.\r\n')
                return ErrorZero.Val
            for assignment in program.split(','):
                assignment = assignment.strip()
                if not assignment:
                    std.e.write('Missing operand.\r\n')
                    return ErrorZero.Val
                # The minus sign comes first in the character class: between the percent sign
                # and the ampersand, it would denote a range that does not contain the minus.
                parts = re.split(r'([-*+^|/%&]|<<|>>|)=', assignment, maxsplit=1)
                if len(parts) == 3:
                    name, operator, definition = parts
                else:
                    # A bare expression without any assignment is only evaluated.
                    name, operator, definition = '', '', assignment
                # Spaces around the name are not part of it.
                name = name.strip().upper()
                definition = re.sub(r'\b0([0-7]+)\b', r'0o\1', definition)
                if operator:
                    definition = F'{name}{operator}({definition})'
                definition = defang(definition)
                expression = cautious_parse(definition)
                names = names_in_expression(expression)
                if names.stored or names.others:
                    raise EmulatorException('Arithmetic SET had unexpected variable access.')
                for var in names.loaded:
                    # The environment has to be queried for the variable that the expression
                    # reads, not for the one that the expression is assigned to.
                    original = refang(var).upper()
                    translate[original] = var
                    if var in namespace:
                        continue
                    try:
                        namespace[var] = batchint(self.environment[original])
                    except (KeyError, ValueError):
                        namespace[var] = 0
                code = compile(expression, filename='[ast]', mode='eval')
                # NOTE(review): this evaluates an expression that originates from the emulated
                # script. It is only evaluated after cautious_parse and names_in_expression
                # accepted it; this relies on these helpers rejecting everything unsafe.
                value = eval(code, namespace, {})
                if name:
                    self.environment[name] = str(value)
                    namespace[defang(name)] = value
            if value is None:
                std.e.write('The syntax of the command is incorrect.')
                return
            else:
                std.o.write(F'{value!s}\r\n')
        else:
            try:
                eq = args.index(Ctrl.Equals)
            except ValueError:
                assignment = cmd.argument_string
                if assignment.startswith('"'):
                    quote_mode = True
                    assignment, _, unquoted = assignment[1:].rpartition('"')
                    assignment = assignment or unquoted
                else:
                    assignment = ''.join(args)
                name, _, content = assignment.partition('=')
            else:
                with StringIO() as io:
                    for k in range(eq + 1, len(args)):
                        io.write(args[k])
                    content = io.getvalue()
                name = cmd.args[eq - 1] if eq else ''
            name = name.upper()
            trailing_caret, content = uncaret(content, quote_mode)
            if trailing_caret:
                content = content[:-1]
            if prompt is not None:
                # For SET /P, the text after the equals sign is the prompt message: it is
                # written to the output, and the line that was read becomes the value.
                if (qc := content.strip()).startswith('"'):
                    _, _, qc = qc. partition('"') # noqa
                    qc, _, r = qc.rpartition('"') # noqa
                    content = qc or r
                std.o.write(content)
                content = prompt
            if name:
                if content:
                    self.environment[name] = content
                else:
                    self.environment.pop(name, None)

    @_command('CALL')
    def execute_call(self, cmd: SynCommand, std: IO, *_):
        """
        Handles CALL. An argument that starts with a colon calls a label in the current script;
        anything else is treated as the path of a script, which is run in a new state that
        shares the environment. When the script file cannot be read, the command is yielded
        and nothing is executed.

        Raises InvalidLabel when the label does not exist.
        """
        cmdl = cmd.argument_string
        empty, colon, label = cmdl.partition(':')
        if colon and not empty:
            try:
                offset = self.parser.lexer.labels[label.upper()]
            except KeyError as KE:
                raise InvalidLabel(label) from KE
            emu = self.spawn(self.parser, std=std)
        else:
            offset = 0
            path = cmdl.strip()
            code = self.state.ingest_file(path)
            if code is None:
                yield cmd
                return
            state = self.clone_state(environment=self.state.environment, filename=path)
            emu = self.spawn(code, std=std, state=state)
        if self.cfg.skip_call:
            # The offset has to be passed on here as well: otherwise, the call of a label would
            # run the script from its very beginning.
            emu.execute(offset, called=True)
        else:
            yield from emu.trace(offset, called=True)

    @_command('SETLOCAL')
    def execute_setlocal(self, cmd: SynCommand, *_):
        """
        Handles SETLOCAL: pushes a copy of the environment as well as the delayed expansion and
        command extension settings, either of which can be changed by the argument.
        """
        yield cmd
        setting = cmd.argument_string.strip().upper()
        delay = {
            'DISABLEDELAYEDEXPANSION': False,
            'ENABLEDELAYEDEXPANSION' : True,
        }.get(setting, self.state.delayexpand)
        cmdxt = {
            'DISABLEEXTENSIONS': False,
            'ENABLEEXTENSIONS' : True,
        }.get(setting, self.state.cmdextended)
        self.state.delayexpand_stack.append(delay)
        self.state.cmdextended_stack.append(cmdxt)
        self.state.environment_stack.append(dict(self.environment))

    @_command('ENDLOCAL')
    def execute_endlocal(self, cmd: SynCommand, *_):
        """
        Handles ENDLOCAL: restores what the most recent SETLOCAL saved. Nothing happens when
        only the initial environment is left.
        """
        yield cmd
        if len(self.state.environment_stack) > 1:
            self.state.environment_stack.pop()
            self.state.delayexpand_stack.pop()
            # SETLOCAL also pushes a command extension setting, which has to be removed again.
            if len(self.state.cmdextended_stack) > 1:
                self.state.cmdextended_stack.pop()

    @_command('GOTO')
    def execute_goto(self, cmd: SynCommand, std: IO, *_):
        """
        Handles GOTO. With the skip_goto option, the command is only yielded. Otherwise, Goto is
        raised for the label, or Exit for the target :EOF. A jump to a label that was passed
        since the last SET or IF is not carried out; an Error is yielded instead.

        Raises AbortExecution when no label is given.
        """
        if self.cfg.skip_goto:
            yield cmd
            return
        it = iter(cmd.args)
        mark = False
        for label in it:
            if not isinstance(label, Ctrl):
                break
            if label == Ctrl.Label:
                mark = True
                # The label name is the token that follows the label marker.
                for label in it:
                    break
                else:
                    label = ''
                break
        else:
            std.e.write('No batch label specified to GOTO command.\r\n')
            raise AbortExecution
        label, *_ = label.split(maxsplit=1)
        key = label.upper()
        if mark and key == 'EOF':
            raise Exit(int(self.state.ec), False)
        if key not in self.block_labels:
            raise Goto(label)
        else:
            yield Error(F'Infinite Loop detected for label {key}')

    @_command('EXIT')
    def execute_exit(self, cmd: SynCommand, *_):
        """
        Handles EXIT: yields the command and raises Exit with the given exit code unless the
        skip_exit option is set. An exit code that is not an integer is treated as 0; the switch
        /B is passed to Exit as a cleared exit flag.
        """
        it = iter(cmd.args)
        exit = True
        token = 0
        for arg in it:
            if arg.upper() == '/B':
                exit = False
                continue
            token = arg
            break
        try:
            code = int(token)
        except ValueError:
            code = 0
        yield cmd
        if self.cfg.skip_exit:
            return
        raise Exit(code, exit)

    @_command('CHDIR')
    @_command('CD')
    def execute_chdir(self, cmd: SynCommand, *_):
        """Handles CD and CHDIR: the stripped argument string becomes the working directory."""
        yield cmd
        self.state.cwd = cmd.argument_string.strip()

    @_command('PUSHD')
    def execute_pushd(self, cmd: SynCommand, *_):
        """
        Handles PUSHD: saves the working directory on the directory stack and then changes it to
        the stripped argument string.
        """
        yield cmd
        self.state.dirstack.append(self.state.cwd)
        # The directory is changed directly: execute_chdir is a generator function, and calling
        # it without consuming the result would not change anything.
        self.state.cwd = cmd.argument_string.strip()

    @_command('POPD')
    def execute_popd(self, cmd: SynCommand, *_):
        """
        Handles POPD: restores the working directory that was saved last. Nothing happens when
        the directory stack is empty.
        """
        yield cmd
        try:
            self.state.cwd = self.state.dirstack.pop()
        except IndexError:
            pass

    @_command('ECHO')
    def execute_echo(self, cmd: SynCommand, std: IO, in_group: bool):
        """
        Handles ECHO. The arguments ON and OFF change the echo state; these commands are only
        yielded when they change the state or when the show_nops option is set. Any other
        argument string is written to the output stream, and without arguments, the current echo
        state is reported.
        """
        cmdl = cmd.argument_string
        mode = cmdl.strip().lower()
        current_state = self.state.echo
        if mode == 'on':
            if self.cfg.show_nops or current_state is False:
                yield cmd
            self.state.echo = True
            return
        if mode == 'off':
            if self.cfg.show_nops or current_state is True:
                yield cmd
            self.state.echo = False
            return
        yield cmd
        if mode:
            if in_group and not cmdl.endswith(' '):
                cmdl += ' '
            std.o.write(F'{cmdl}\r\n')
        else:
            mode = 'on' if self.state.echo else 'off'
            std.o.write(F'ECHO is {mode}.\r\n')

    @_command('CLS')
    def execute_cls(self, cmd: SynCommand, *_):
        """Handles CLS: the command is yielded and has no other effect."""
        yield cmd

    @_command('ERASE')
    @_command('DEL')
    def execute_del(self, cmd: SynCommand, std: IO, *_):
        """
        Handles DEL and ERASE: removes all files from the file system that match one of the given
        patterns relative to the working directory. With the switch /P, a confirmation is read
        from the input stream before a file is removed. The switches /F, /S, /Q, and /A are
        accepted, but have no effect.

        Returns 0, or 1 after yielding an Error when no file pattern is given. Raises InputLocked
        when a confirmation cannot be read.
        """
        if not cmd.args:
            yield Error('The syntax of the command is incorrect')
            return 1
        else:
            yield cmd
        flags = {}
        paths = []
        it = iter(cmd.args)
        for arg in it:
            if arg.startswith('/') and 1 < len(arg):
                flag = arg.upper()
                if flag[:3] == '/A:':
                    flags['A'] = flag[3:]
                else:
                    flags[flag[1]] = True
                continue
            # The first argument that is not a switch and everything after it are patterns.
            paths = [arg, *it]
            break
        if not paths:
            # Only switches were given; advancing the iterator beyond its end would raise a
            # StopIteration, which is an error inside of a generator.
            yield Error('The syntax of the command is incorrect')
            return 1
        _P = 'P' in flags # Prompts for confirmation before deleting each file.
        state = self.state
        cwd = state.cwd
        for pattern in paths:
            # The file system is modified inside the loop, so this iterates over a copy.
            for path in list(state.file_system):
                if not winfnmatch(pattern, path, cwd):
                    continue
                if _P and state.exists_file(pattern):
                    std.o.write(F'{pattern}, Delete (Y/N)? ')
                    decision = None
                    while decision not in ('y', 'n'):
                        confirmation = std.i.readline()
                        if not confirmation.endswith('\n'):
                            raise InputLocked
                        decision = confirmation[:1].lower()
                    if decision == 'n':
                        continue
                state.remove_file(path)
        return 0

    @_command('START')
    def execute_start(self, cmd: SynCommand, std: IO, *_):
        """
        Handles START: when the started program is a file that can be read, it is emulated as a
        batch script in a new state. The switch /D sets the working directory of that state and
        the switch /I starts it without a copy of the current environment.
        """
        yield cmd
        it = iter(cmd.ast.fragments)
        it = itertools.islice(it, cmd.argument_offset, None)
        title = None
        start = None
        cwd = self.state.cwd
        env = ...
        for arg in it:
            if title is None:
                # Only a quoted first argument is a window title.
                if '"' not in arg:
                    title = ''
                else:
                    title = unquote(arg)
                    continue
            if arg.isspace():
                continue
            if not arg.startswith('/'):
                start = unquote(arg)
                break
            if (flag := arg.upper()) in ('/NODE', '/AFFINITY', '/MACHINE'):
                # These switches have a parameter, which is skipped.
                next(it)
            elif flag == '/D':
                cwd = next(it)
            elif flag == '/I':
                env = None
        if start and (batch := self.state.ingest_file(start)):
            state = self.clone_state(environment=env)
            state.cwd = cwd
            state.command_line = _fuse(it).strip()
            shell = self.spawn(batch, state, std)
            yield from shell.trace()

    @_command('CMD')
    def execute_cmd(self, cmd: SynCommand, std: IO, *_):
        """
        Handles CMD: the command line that follows one of the switches /C, /K, or /R is emulated
        in a new state. The switches /Q, /S, /U, /E, and /V configure that state. Returns 0
        without emulating anything when there is no command line.
        """
        yield cmd
        it = iter(cmd.ast.fragments)
        command = None
        quiet = False
        strip = False
        codec = 'cp1252'
        delayexpand = None
        cmdextended = None
        for arg in it:
            if arg.isspace() or not arg.startswith('/'):
                continue
            name, _, flag = arg[1:].partition(':')
            flag = flag.upper()
            name = name.upper()
            if name in 'CKR':
                command = _fuse(it)
                break
            elif name == 'Q':
                quiet = True
            elif name == 'S':
                strip = True
            elif name == 'U':
                codec = 'utf-16le'
            elif name == 'E':
                cmdextended = _onoff(flag)
            elif name == 'V':
                delayexpand = _onoff(flag)
        else:
            return 0
        # The outer quotes of the command line are kept only if all of these hold: /S was not
        # given, there are exactly two quotes, and the quoted text contains white space but none
        # of the special characters.
        if (stripped := re.search('^\\s*"(.*)"', command)) and (
            strip
            or command.count('"') != 2
            or re.search('[&<>()@^|]', stripped[1])
            or re.search('\\s', stripped[1]) is None
        ):
            command = stripped[1]
        state = self.clone_state(delayexpand=delayexpand, cmdextended=cmdextended)
        state.codec = codec
        state.echo = not quiet
        shell = self.spawn(command, state, std)
        yield from shell.trace()

    @_command('ARP')
    @_command('AT')
    @_command('ATBROKER')
    @_command('BGINFO')
    @_command('BITSADMIN')
    @_command('CERTUTIL')
    @_command('CLIP')
    @_command('CMSTP')
    @_command('COMPACT')
    @_command('CONTROL')
    @_command('CSCRIPT')
    @_command('CURL')
    @_command('DEFRAG')
    @_command('DISKSHADOW')
    @_command('ESENTUTL')
    @_command('EXPAND')
    @_command('EXPLORER')
    @_command('EXTRAC32')
    @_command('FODHELPER')
    @_command('FORFILES')
    @_command('FTP')
    @_command('HOSTNAME')
    @_command('HOSTNAME')
    @_command('INSTALLUTIL')
    @_command('IPCONFIG')
    @_command('LOGOFF')
    @_command('MAKECAB')
    @_command('MAVINJECT')
    @_command('MOUNTVOL')
    @_command('MSBUILD')
    @_command('MSHTA')
    @_command('MSIEXEC')
    @_command('MSTSC')
    @_command('NET')
    @_command('NET1')
    @_command('NETSH')
    @_command('NSLOOKUP')
    @_command('ODBCCONF')
    @_command('PATHPING')
    @_command('PING')
    @_command('POWERSHELL')
    @_command('PRESENTATIONHOST')
    @_command('PWSH')
    @_command('REG')
    @_command('REGSVR32')
    @_command('ROUTE')
    @_command('RUNDLL32')
    @_command('SCP')
    @_command('SDCLT')
    @_command('SETX')
    @_command('SFTP')
    @_command('SHUTDOWN')
    @_command('SSH')
    @_command('SUBST')
    @_command('SYNCAPPVPUBLISHINGSERVER')
    @_command('SYSTEMINFO')
    @_command('TAR')
    @_command('TELNET')
    @_command('TFTP')
    @_command('TIMEOUT')
    @_command('TRACERT')
    @_command('VSSADMIN')
    @_command('WBADMIN')
    @_command('WHERE')
    @_command('WHOAMI')
    @_command('WINRM')
    @_command('WINRS')
    @_command('WSCRIPT')
    def execute_unimplemented_program(self, cmd: SynCommand, *_):
        """Handles programs that are not emulated: yields the command and returns 0."""
        yield cmd
        return 0

    @_command('CLS')
    def execute_unimplemented_command_unmodified_ec(self, cmd: SynCommand, *_):
        """Handles commands that are not emulated and do not change the exit code."""
        yield cmd

    @_command('ASSOC')
    @_command('ATTRIB')
    @_command('BCDEDIT')
    @_command('BREAK')
    @_command('CACLS')
    @_command('CHCP')
    @_command('CHKDSK')
    @_command('CHKNTFS')
    @_command('COLOR')
    @_command('COMP')
    @_command('COMPACT')
    @_command('CONVERT')
    @_command('COPY')
    @_command('DATE')
    @_command('DIR')
    @_command('DISKPART')
    @_command('DOSKEY')
    @_command('DRIVERQUERY')
    @_command('FC')
    @_command('FORMAT')
    @_command('FSUTIL')
    @_command('FTYPE')
    @_command('GPRESULT')
    @_command('ICACLS')
    @_command('LABEL')
    @_command('MD')
    @_command('MKDIR')
    @_command('MKLINK')
    @_command('MODE')
    @_command('MORE')
    @_command('MOVE')
    @_command('OPENFILES')
    @_command('PATH')
    @_command('PAUSE')
    @_command('PRINT')
    @_command('PROMPT')
    @_command('RD')
    @_command('RECOVER')
    @_command('REN')
    @_command('RENAME')
    @_command('REPLACE')
    @_command('RMDIR')
    @_command('ROBOCOPY')
    @_command('SC')
    @_command('SCHTASKS')
    @_command('SHIFT')
    @_command('SHUTDOWN')
    @_command('SORT')
    @_command('SUBST')
    @_command('SYSTEMINFO')
    @_command('TASKKILL')
    @_command('TASKLIST')
    @_command('TIME')
    @_command('TITLE')
    @_command('TREE')
    @_command('TYPE')
    @_command('VER')
    @_command('VERIFY')
    @_command('VOL')
    @_command('WMIC')
    @_command('XCOPY')
    def execute_unimplemented_command(self, cmd: SynCommand, *_):
        """Handles commands that are not emulated: yields the command and returns 0."""
        yield cmd
        return 0

    @_command('REM')
    def execute_rem(self, cmd: SynCommand, *_):
        """Handles REM: the command is only yielded with the show_comments option."""
        if self.cfg.show_comments:
            yield cmd

    @_command('HELP')
    def execute_help(self, cmd: SynCommand, std: IO, *_):
        """Handles HELP: writes the general help text to the output stream."""
        yield cmd
        std.o.write(HelpOutput['HELP'])
        return 0

    def execute_command(self, cmd: SynCommand, std: IO, in_group: bool):
        """
        Executes a single command: looks up the handler for its verb, applies the redirects of
        the command to the given streams, runs the handler, writes redirected output to the file
        system, and stores the result of the handler as the exit code.

        A command without a handler is yielded without being executed; it is flagged as junk
        depending on its verb and the show_junk option.
        """
        verb = cmd.verb.upper().strip()
        handler = self._command.handlers.get(verb)
        if handler is None:
            # The verb may carry one of the executable extensions, as in PING.EXE.
            base, ext = ntpath.splitext(verb)
            handler = None
            if any(ext == pe.upper() for pe in self.state.envar('PATHEXT', '').split(';')):
                handler = self._command.handlers.get(base)
        if handler is None:
            if self.state.exists_file(verb):
                self.state.ec = 0
            elif not indicators.winfpath.value.fullmatch(verb):
                if '\uFFFD' in verb or not verb.isprintable():
                    self.state.ec = 9009
                    cmd.junk = True
                else:
                    cmd.junk = not self.cfg.show_junk
            yield cmd
            return
        paths: dict[int, str] = {}
        for src, r in cmd.ast.redirects.items():
            # Only the three standard streams can be redirected, and only the input stream can
            # be the source of an input redirect.
            if not 0 <= src <= 2 or (src == 0) != r.is_input:
                continue
            if isinstance((target := r.target), str):
                if target.upper() == 'NUL':
                    std[src] = DevNull()
                else:
                    data = self.state.ingest_file(target)
                    if src == 0:
                        if data is None:
                            yield ErrorCannotFindFile
                            return
                        std.i = StringIO(data)
                    else:
                        if r.is_out_append:
                            # Start with the existing file contents and write to the end.
                            buffer = StringIO(data)
                            buffer.seek(0, 2)
                        else:
                            buffer = StringIO()
                        std[src] = buffer
                        paths[src] = target
            elif src == 1 and target == 2:
                std.o = std.e
            elif src == 2 and target == 1:
                std.e = std.o
        if '/?' in cmd.args:
            std.o.write(HelpOutput[verb])
            self.state.ec = 0
            return
        if (result := handler(self, cmd, std, in_group)) is None:
            pass
        elif not isinstance(result, (int, ErrorZero)):
            # The handler is a generator whose return value is the exit code.
            result = (yield from result)
        for k, path in paths.items():
            self.state.create_file(path, std[k].getvalue())
        if result is not None:
            self.state.ec = result

    @_node(AstPipeline)
    def trace_pipeline(self, pipeline: AstPipeline, std: IO, in_group: bool):
        """
        Executes the parts of a pipeline in order: the output of each part becomes the input of
        the next one, and the output of the last part is written to the given output stream.
        """
        length = len(pipeline.parts)
        streams = IO(*std)
        if length > 1:
            yield synthesize(pipeline)
        for k, part in enumerate(pipeline.parts, 1):
            if k != 1:
                streams.i = streams.o
                streams.i.seek(0)
            if k == length:
                streams.o = std.o
            else:
                streams.o = StringIO()
            if isinstance(part, AstGroup):
                it = self.trace_group(part, streams, in_group)
            else:
                ast = self.expand_ast_node(part)
                cmd = synthesize(ast)
                it = self.execute_command(cmd, streams, in_group)
            yield from it

    @_node(AstSequence)
    def trace_sequence(self, sequence: AstSequence, std: IO, in_group: bool):
        """
        Executes the head of a sequence and then each statement of its tail whose condition is
        compatible with the current exit code.
        """
        yield from self.trace_statement(sequence.head, std, in_group)
        for cs in sequence.tail:
            if cs.condition == AstCondition.Failure:
                if bool(self.state.ec) is False:
                    continue
            if cs.condition == AstCondition.Success:
                if bool(self.state.ec) is True:
                    continue
            yield from self.trace_statement(cs.statement, std, in_group)

    @_node(AstIf)
    def trace_if(self, _if: AstIf, std: IO, in_group: bool):
        """
        Evaluates the condition of an IF statement after variable expansion and executes the
        matching branch, if there is one.
        """
        yield synthesize(_if)
        _if = self.expand_ast_node(_if)
        self.block_labels.clear()
        if _if.variant == AstIfVariant.ErrorLevel:
            condition = _if.var_int <= self.state.ec
        elif _if.variant == AstIfVariant.CmdExtVersion:
            condition = _if.var_int <= self.state.extensions_version
        elif _if.variant == AstIfVariant.Exist:
            condition = self.state.exists_file(_if.var_str)
        elif _if.variant == AstIfVariant.Defined:
            condition = _if.var_str.upper() in self.state.environment
        else:
            lhs = _if.lhs
            rhs = _if.rhs
            cmp = _if.cmp
            assert lhs is not None
            assert rhs is not None
            if cmp == AstIfCmp.STR:
                if _if.casefold:
                    if isinstance(lhs, str):
                        lhs = lhs.casefold()
                    if isinstance(rhs, str):
                        rhs = rhs.casefold()
                condition = lhs == rhs
            elif cmp == AstIfCmp.GTR:
                condition = lhs > rhs
            elif cmp == AstIfCmp.GEQ:
                condition = lhs >= rhs
            elif cmp == AstIfCmp.NEQ:
                condition = lhs != rhs
            elif cmp == AstIfCmp.EQU:
                condition = lhs == rhs
            elif cmp == AstIfCmp.LSS:
                condition = lhs < rhs
            elif cmp == AstIfCmp.LEQ:
                condition = lhs <= rhs
            else:
                raise RuntimeError(cmp)
        if _if.negated:
            condition = not condition
        if condition:
            yield from self.trace_sequence(_if.then_do, std, in_group)
        elif (_else := _if.else_do):
            yield from self.trace_sequence(_else, std, in_group)

    @_node(AstFor)
    def trace_for(self, _for: AstFor, std: IO, in_group: bool):
        """
        Executes the body of a FOR loop once for each value of the loop variables. For the file
        parsing variant, the values are the tokens of the lines of a command output, of the
        literal specification, or of the files that match the specification. For all other
        variants, the loop variable takes each entry of the specification.

        Raises EmulatorException when the tokens require more variables than are available.
        """
        state = self.state
        cwd = state.cwd
        vars = state.new_forloop()
        body = _for.body
        name = _for.variable
        vars[name] = ''
        if (
            self.contains_for_variable(body, vars)
            or _for.variant != AstForVariant.NumericLoop
            or len(_for.spec) != 1
        ):
            yield synthesize(_for)
        if _for.variant == AstForVariant.FileParsing:
            if _for.mode == AstForParserMode.Command:
                emulator = self.spawn(_for.specline, self.clone_state(filename=state.name))
                yield from emulator.trace()
                lines = emulator.std.o.getvalue().splitlines()
            elif _for.mode == AstForParserMode.Literal:
                lines = _for.spec
            else:
                def lines_from_files():
                    fs = state.file_system
                    for name in _for.spec:
                        for path, content in fs.items():
                            # The wildcard pattern is the first argument of winfnmatch, the
                            # candidate path is the second one.
                            if not winfnmatch(name, path, cwd):
                                continue
                            yield from content.splitlines(False)
                lines = lines_from_files()
            opt = _for.options
            tokens = sorted(opt.tokens)
            split = re.compile('[{}]+'.format(re.escape(opt.delims)))
            count = tokens[-1] + 1 if tokens else 0
            first_variable = ord(name)
            if opt.asterisk:
                # The asterisk requests the unsplit remainder of the line as one more token.
                tokens.append(count)
            for n, line in enumerate(lines):
                if n < opt.skip:
                    continue
                if opt.comment and line.startswith(opt.comment):
                    continue
                if count:
                    tokenized = split.split(line, maxsplit=count)
                else:
                    tokenized = (line,)
                for k, tok in enumerate(tokens):
                    # The tokens are assigned to consecutive variables, starting at the loop
                    # variable.
                    name = chr(first_variable + k)
                    if not name.isalpha():
                        raise EmulatorException('Ran out of variables in FOR-Loop.')
                    try:
                        vars[name] = tokenized[tok]
                    except IndexError:
                        vars[name] = ''
                yield from self.trace_sequence(body, std, in_group)
        else:
            for entry in _for.spec:
                vars[name] = entry
                yield from self.trace_sequence(body, std, in_group)
        state.end_forloop()

    @_node(AstGroup)
    def trace_group(self, group: AstGroup, std: IO, in_group: bool):
        """Executes all sequences of a group and then yields the synthesized group."""
        for sequence in group.fragments:
            yield from self.trace_sequence(sequence, std, True)
        yield synthesize(group)

    @_node(AstLabel)
    def trace_label(self, label: AstLabel, *_):
        """
        Yields the label if the configuration asks for it; labels that are not comments are
        recorded for the infinite loop detection of GOTO.
        """
        if label.comment:
            if self.cfg.show_comments:
                yield synthesize(label)
        else:
            if self.cfg.show_labels:
                yield synthesize(label)
            self.block_labels.add(label.label.upper())

    def trace_statement(self, statement: AstStatement, std: IO, in_group: bool):
        """
        Dispatches the statement to the handler that is registered for its type. Raises a
        RuntimeError when there is no such handler.
        """
        try:
            handler = self._node.handlers[statement.__class__]
        except KeyError:
            raise RuntimeError(statement)
        yield from handler(self, statement, std, in_group)

    def emulate_commands(self, allow_junk=False):
        """
        Emulates the script and yields the string representation of each executed command;
        commands that are flagged as junk are skipped unless allow_junk is set.
        """
        for syn in self.trace():
            if not isinstance(syn, SynCommand):
                continue
            if not allow_junk and syn.junk:
                continue
            yield str(syn)

    def emulate_to_depth(self, depth: int = 0):
        """
        Emulates the script and yields the string representation of each synthesized node whose
        AST node is not nested deeper than the given depth.
        """
        for syn in self.trace():
            if not isinstance(syn, SynNodeBase):
                continue
            if syn.ast.depth <= depth:
                yield str(syn)

    def emulate(self, offset: int = 0):
        """
        Emulates the script from the given offset and yields string representations of the
        synthesized nodes, omitting junk commands and nodes that are already covered by, or
        that cover, the node that was yielded last.
        """
        last: AstNode | None = None
        junk: AstNode | None = None
        for syn in self.trace(offset):
            if not isinstance(syn, SynNodeBase):
                continue
            ast = syn.ast
            if isinstance(syn, SynCommand) and syn.junk:
                junk = ast
                continue
            if junk is not None:
                if junk.is_descendant_of(ast):
                    if not last or not last.is_descendant_of(ast):
                        continue
            if last is not None:
                if ast.is_descendant_of(last):
                    # we already synthesized a parent construct, like a FOR loop or IF block
                    continue
                if last.is_descendant_of(ast):
                    # we synthesized a command and no longer need to synthesize an AST node that
                    # wraps it, like a group
                    continue
            if isinstance(ast, AstPipeline):
                if len(ast.parts) == 1:
                    continue
            if last is ast:
                raise RuntimeError('Emulator attempted to synthesize the same command twice.')
            last = ast
            yield str(syn)

    def execute(self, offset: int = 0, called: bool = False):
        """Runs the emulation from the given offset and discards everything that it yields."""
        for _ in self.trace(offset, called=called):
            pass

    def trace(self, offset: int = 0, called: bool = False):
        """
        Emulates the script from the given offset and yields everything that the statement
        handlers produce as well as an Error for each parser error. A Goto restarts parsing at
        the offset of the target label; Exit and AbortExecution end the emulation.

        Raises InvalidLabel for a Goto to an unknown label. An Exit whose exit flag is set is
        raised again when called is set.
        """
        if (name := self.state.name):
            self.state.create_file(name, self.parser.lexer.text)
        length = len(self.parser.lexer.code)
        labels = self.parser.lexer.labels
        while offset < length:
            try:
                for sequence in self.parser.parse(offset):
                    if isinstance(sequence, AstError):
                        yield Error(sequence.error)
                        continue
                    yield from self.trace_sequence(sequence, self.std, False)
            except Goto as goto:
                try:
                    offset = labels[goto.label.upper()]
                except KeyError:
                    raise InvalidLabel(goto.label) from goto
                continue
            except Exit as exit:
                self.state.ec = exit.code
                if exit.exit and called:
                    raise
                else:
                    break
            except AbortExecution:
                self.state.ec = 1
                break
            else:
                break
var state-
Expand source code Browse git
@property
def state(self):
    """The state object of the parser."""
    return self.parser.state
var environment-
Expand source code Browse git
@property
def environment(self):
    """The environment of the current state."""
    return self.state.environment
var delayexpand-
Expand source code Browse git
@property
def delayexpand(self):
    """The delayed expansion setting of the current state."""
    return self.state.delayexpand
Methods
def spawn(self, data, state=None, std=None)-
Expand source code Browse git
def spawn(self, data: str | buf | BatchParser, state: BatchState | None = None, std: IO | None = None):
    """
    Creates another emulator for the given data which shares the configuration of this one.
    """
    return BatchEmulator(
        data,
        state,
        self.cfg,
        std,
    )
def clone_state(self, delayexpand=None, cmdextended=None, environment=Ellipsis, filename=None)-
Expand source code Browse git
def clone_state(
    self,
    delayexpand: bool | None = None,
    cmdextended: bool | None = None,
    environment: dict | None | ellipsis = ...,
    filename: str | None = None,
):
    """
    Creates a new BatchState that shares file system, user name, host name, time, and working
    directory with the current one. Delayed expansion is off unless specified, the command
    extension setting is inherited unless specified, and the environment is a copy of the
    current one unless the environment argument is given.
    """
    state = self.state
    if delayexpand is None:
        delayexpand = False
    if cmdextended is None:
        cmdextended = state.cmdextended
    # The ellipsis is the default because None is a value that callers pass on purpose.
    if environment is ...:
        environment = dict(state.environment)
    return BatchState(
        delayexpand,
        cmdextended,
        environment=environment,
        file_system=state.file_system,
        username=state.username,
        hostname=state.hostname,
        now=state.now,
        cwd=state.cwd,
        filename=filename,
    )
def get_for_variable_regex(self, vars)-
Expand source code Browse git
def get_for_variable_regex(self, vars: Iterable[str]):
    """
    Compiles a regular expression for references to the given FOR loop variables. Group 1 holds
    the modifiers after the percent sign, group 3 holds the variable name.
    """
    # NOTE(review): inside this raw string, the second group reads as a literal backslash
    # followed by an end-of-string anchor, so it can never match anything but the empty string.
    # This looks like an escaping leftover; confirm what was intended here. The variable names
    # are also inserted into the character class without escaping.
    return re.compile(RF'%((?:~[fdpnxsatz]*)?)((?:\\$\\w+)?)([{"".join(vars)}])')
def expand_delayed_variables(self, block)-
Expand source code Browse git
def expand_delayed_variables(self, block: str):
    """
    Replaces every !NAME! reference in the given block by the value that the lexer computes for
    it. For a missing variable, the replacement is the part of the reference that follows its
    first colon, which is the empty string when there is no colon.
    """
    def expansion(match: re.Match[str]):
        name = match.group(1)
        try:
            return parse(name)
        except MissingVariable:
            _, _, rest = name.partition(':')
            return rest
    parse = self.parser.lexer.parse_env_variable
    return re.sub(r'!([^!\n]*)!', expansion, block)
def expand_forloop_variables(self, block, vars)-
Expand source code Browse git
def expand_forloop_variables(self, block: str, vars: dict[str, str] | None):
    """
    Replace FOR loop variable references in the given text by their current values. When no
    variables are defined, the text is returned unchanged.
    """
    if not vars:
        return block
    table = vars

    def substitute(hit: re.Match[str]):
        # The modifier characters are translated to flags, but the flags do not influence
        # the substituted value.
        flags = ArgVarFlags.Empty
        for token in map(ord, hit[1]):
            flags |= ArgVarFlags.FromToken(token)
        return table[hit[3]]

    pattern = self.get_for_variable_regex(vars)
    return pattern.sub(substitute, block)
Expand source code Browse git
def contains_for_variable(self, ast: AstNode, vars: Iterable[str]):
    """
    Determine whether any string reachable from the given AST node references one of the
    given FOR loop variables. Lists, dictionary values and the dataclass fields of nested
    AST nodes are searched recursively; the `parent` field is skipped and enumeration
    values are never searched.
    """
    find = self.get_for_variable_regex(vars).search

    def probe(item):
        if isinstance(item, list):
            return any(map(probe, item))
        if isinstance(item, dict):
            return any(map(probe, item.values()))
        if isinstance(item, Enum):
            return False
        if isinstance(item, str):
            return find(item) is not None
        if isinstance(item, AstNode):
            return any(
                probe(getattr(item, f.name))
                for f in fields(item) if f.name != 'parent'
            )
        return False

    return probe(ast)  # type:ignore
Expand source code Browse git
def expand_ast_node(self, ast: _T) -> _T:
    """
    Return a copy of the given AST node in which all strings have been expanded: delayed
    variables first (when delayed expansion is on), FOR loop variables second. When neither
    applies, the input object itself is returned. The `parent` field of AST nodes is copied
    over without being expanded.
    """
    delayed = self.delayexpand
    loopvars = self.state.for_loop_variables
    if not (loopvars or delayed):
        return ast

    def rebuild(item):
        if isinstance(item, list):
            return list(map(rebuild, item))
        if isinstance(item, dict):
            return {key: rebuild(value) for key, value in item.items()}
        if isinstance(item, Enum):
            return item
        if isinstance(item, str):
            text = self.expand_delayed_variables(item) if delayed else item
            return self.expand_forloop_variables(text, loopvars)
        if isinstance(item, AstNode):
            kwargs = {}
            for f in fields(item):
                member = getattr(item, f.name)
                kwargs[f.name] = member if f.name == 'parent' else rebuild(member)
            return item.__class__(**kwargs)
        return item

    return rebuild(ast)  # type:ignore
Expand source code Browse git
def execute_find_or_findstr(self, cmd: SynCommand, std: IO, findstr: bool):
    """
    Shared implementation of the FIND and FINDSTR commands.

    Leading arguments that start with a slash are parsed as switches. The first argument
    that is not a switch is the search string; all remaining arguments are file names or
    wildcard patterns that are resolved against the virtual file system. Standard input is
    always the first searched source.

    The command is yielded once before any parsing happens. The generator returns 0 when at
    least one line was selected and 1 otherwise; 1 is also returned for malformed switches
    and for files that cannot be read.
    """
    needles = []
    paths: list[str | ellipsis] = [...]
    flags = {}
    it = iter(cmd.args)
    arg = None

    yield cmd

    for arg in it:
        if not arg.startswith('/'):
            if not findstr and not arg.startswith('"'):
                # FIND requires the search string to be quoted.
                return 1
            if findstr:
                # FINDSTR treats a space-separated string as several search strings.
                needles.extend(unquote(arg).split())
            else:
                # FIND searches for the quoted string literally, spaces included.
                needles.append(unquote(arg))
            break
        name, has_param, value = arg[1:].partition(':')
        name = name.upper()
        if name in ('OFF', 'OFFLINE'):
            continue
        elif len(name) > 1:
            return 1
        elif name == 'C':
            needles.append(unquote(value))
        elif name == 'F' and findstr:
            if (p := self.state.ingest_file(value)) is None:
                return 1
            paths.extend(p.splitlines(False))
        elif name == 'G' and findstr:
            if (n := self.state.ingest_file(value)) is None:
                return 1
            needles.extend(n.splitlines(False))
        elif has_param:
            flags[name] = value
        else:
            flags[name] = True

    valid_flags = 'VNI'
    if findstr:
        valid_flags += 'BELRSXMOPADQ'
    for v in flags:
        if v not in valid_flags:
            return 1

    prefix_filename = False
    state = self.state

    for arg in it:
        pattern = unquote(arg)
        if '*' in pattern or '?' in pattern:
            prefix_filename = True
            for path in state.file_system:
                # winfnmatch expects the wildcard pattern first and the candidate path second.
                if winfnmatch(pattern, path, state.cwd):
                    paths.append(path)
        else:
            paths.append(pattern)

    if len(paths) > 1:
        prefix_filename = True

    for n, needle in enumerate(needles):
        if not findstr or 'L' in flags:
            needle = re.escape(needle)
        if 'X' in flags:
            needle = F'^{needle}$'
        elif 'B' in flags:
            needle = F'^{needle}'
        elif 'E' in flags:
            needle = F'{needle}$'
        needles[n] = needle

    _V = 'V' in flags # noqa; Prints only lines that do not contain a match.
    _P = 'P' in flags # noqa; Skip files with non-printable characters.
    _O = 'O' in flags # noqa; Prints character offset before each matching line.
    _N = 'N' in flags # noqa; Prints the line number before each line that matches.
    _M = 'M' in flags # noqa; Prints only the filename if a file contains a match.
    # The /I switch requests a case-insensitive search.
    search_flags = re.IGNORECASE if 'I' in flags else 0

    nothing_found = True

    for path in paths:
        if path is (...):
            data = std.i.read()
        else:
            data = state.ingest_file(path)
        if data is None:
            return 1
        if _P and not re.fullmatch('[\\s!-~]+', data):
            continue
        # Character offsets are counted from the start of each searched source.
        offset = 0
        for n, line in enumerate(data.splitlines(True), 1):
            for needle in needles:
                hit = re.search(needle, line, search_flags)
                if _V == bool(hit):
                    continue
                nothing_found = False
                if not _M:
                    # Build the output separately so that the offset bookkeeping below keeps
                    # using the length of the unmodified input line.
                    out = line
                    if _O:
                        o = offset + (hit.start() if hit else 0)
                        out = F'{o}:{out}'
                    if _N:
                        out = F'{n}:{out}'
                    if prefix_filename:
                        out = F'{path}:{out}'
                    std.o.write(out)
                elif path is not (...):
                    std.o.write(path)
                # A selected line is reported once, regardless of how many needles match.
                break
            offset += len(line)

    return int(nothing_found)
Expand source code Browse git
@_command('TYPE')
def execute_type(self, cmd: SynCommand, std: IO, *_):
    """
    Write the contents of the file named by the argument string to standard output. When the
    file cannot be read, the file-not-found error is yielded and 1 is returned; otherwise 0.
    """
    target = cmd.argument_string.strip()
    contents = self.state.ingest_file(target)
    if contents is not None:
        std.o.write(contents)
        return 0
    yield ErrorCannotFindFile
    return 1
Expand source code Browse git
@_command('FIND')
def execute_find(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for FIND; delegates to the shared FIND/FINDSTR implementation.
    """
    emulation = self.execute_find_or_findstr(cmd, std, findstr=False)
    return emulation
Expand source code Browse git
@_command('FINDSTR')
def execute_findstr(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for FINDSTR; delegates to the shared FIND/FINDSTR implementation.
    """
    emulation = self.execute_find_or_findstr(cmd, std, findstr=True)
    return emulation
Expand source code Browse git
@_command('SET')
def execute_set(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for SET in its three forms: plain assignment, prompted assignment (/P) and
    arithmetic (/A).

    - With /P, one line is read from standard input and assigned to the variable; the text
      after the equals sign is written to standard output as the prompt. `InputLocked` is
      raised when no complete line is available on an open input stream.
    - With /A, each comma-separated expression is evaluated. Variables read by an expression
      are taken from the environment (0 when missing or not numeric), the result is stored
      in the target variable, and the last result is written to standard output.
    - Otherwise, the variable is assigned the given content, or removed when the content is
      empty.

    Raises `EmulatorException` when there are no arguments, or when an arithmetic expression
    accesses names in an unexpected way.
    """
    if not (args := cmd.args):
        raise EmulatorException('Empty SET instruction')
    if cmd.verb.upper() != 'SET':
        raise RuntimeError

    # Since variables can be used in GOTO, a SET can be used to change the behavior of a GOTO.
    self.block_labels.clear()

    arithmetic = False
    quote_mode = False
    prompt = None

    it = iter(args)
    tk = next(it)

    if tk.upper() == '/P':
        if std.i.closed:
            prompt = ''
        elif not (prompt := std.i.readline()).endswith('\n'):
            raise InputLocked
        else:
            prompt = prompt.rstrip('\r\n')
        # A bare "SET /P" has no further token; a StopIteration must not escape a generator.
        tk = next(it, '')
    else:
        cmd.junk = not self.cfg.show_sets

    yield cmd

    if tk.upper() == '/A':
        arithmetic = True
        try:
            tk = next(it)
        except StopIteration:
            tk = ''

    args = [tk, *it, *cmd.trailing_spaces]

    if arithmetic:
        def defang(s: str):
            # Encode characters that cannot be part of the parsed expression.
            def r(m: re.Match[str]):
                return F'_{prefix}{ord(m[0]):X}_'
            return re.sub(r'[^-\s()!~*/%+><&^|_\w]', r, s)

        def refang(s: str): # noqa
            # Inverse of defang.
            def r(m: re.Match[str]):
                return chr(int(m[1], 16))
            return re.sub(rf'_{prefix}([A-F0-9]+)_', r, s)

        prefix = F'{uuid.uuid4().time_mid:X}'
        namespace = {}
        value = None

        if not (program := ''.join(args)):
            std.e.write('The syntax of the command is incorrect.\r\n')
            return ErrorZero.Val

        for assignment in program.split(','):
            assignment = assignment.strip()
            if not assignment:
                std.e.write('Missing operand.\r\n')
                return ErrorZero.Val
            # The hyphen is placed first in the class so that it is a literal and "-=" is
            # recognized as a compound assignment.
            parts = re.split(r'([-*+^|/%&]|<<|>>|)=', assignment, maxsplit=1)
            if len(parts) == 3:
                name, operator, definition = parts
            else:
                # An expression without assignment is only evaluated.
                name, operator, definition = '', '', assignment
            name = name.strip().upper()
            # Numbers with a leading zero are octal.
            definition = re.sub(r'\b0([0-7]+)\b', r'0o\1', definition)
            if operator:
                definition = F'{name}{operator}({definition})'
            definition = defang(definition)
            expression = cautious_parse(definition)
            names = names_in_expression(expression)
            if names.stored or names.others:
                raise EmulatorException('Arithmetic SET had unexpected variable access.')
            for var in names.loaded:
                if var in namespace:
                    continue
                # The environment is keyed by the decoded, upper-cased variable name.
                original = refang(var).upper()
                try:
                    namespace[var] = batchint(self.environment[original])
                except (KeyError, ValueError):
                    namespace[var] = 0
            code = compile(expression, filename='[ast]', mode='eval')
            # NOTE(review): this evaluates an expression that originates from the emulated
            # script; it relies on the checks performed by cautious_parse and
            # names_in_expression above.
            value = eval(code, namespace, {})
            if name:
                self.environment[name] = str(value)
                namespace[defang(name)] = value

        if value is None:
            std.e.write('The syntax of the command is incorrect.')
            return
        else:
            std.o.write(F'{value!s}\r\n')
    else:
        try:
            eq = args.index(Ctrl.Equals)
        except ValueError:
            assignment = cmd.argument_string
            if assignment.startswith('"'):
                quote_mode = True
                assignment, _, unquoted = assignment[1:].rpartition('"')
                assignment = assignment or unquoted
            else:
                assignment = ''.join(args)
            name, _, content = assignment.partition('=')
        else:
            content = ''.join(args[eq + 1:])
            # The index refers to the rebuilt argument list, from which a leading /P has
            # already been removed.
            name = args[eq - 1] if eq else ''
        name = name.upper()
        trailing_caret, content = uncaret(content, quote_mode)
        if trailing_caret:
            content = content[:-1]
        if prompt is not None:
            if (qc := content.strip()).startswith('"'):
                _, _, qc = qc. partition('"') # noqa
                qc, _, r = qc.rpartition('"') # noqa
                content = qc or r
            # The text after the equals sign is the prompt; the variable receives the input.
            std.o.write(content)
            content = prompt
        if name:
            if content:
                self.environment[name] = content
            else:
                self.environment.pop(name, None)
Expand source code Browse git
@_command('CALL')
def execute_call(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for CALL. An argument string that begins with a colon calls a label in the
    current script; anything else is treated as the path of a script in the virtual file
    system. When that script cannot be read, the command itself is yielded instead.

    The callee runs in a new emulator that writes to the same standard streams. When the
    `skip_call` option is set, the callee is executed without yielding its trace.

    Raises `InvalidLabel` when the label is not defined.
    """
    cmdl = cmd.argument_string
    empty, colon, label = cmdl.partition(':')

    if colon and not empty:
        # Only the first word names the label; anything after it is an argument.
        words = label.split()
        label = words[0] if words else ''
        try:
            offset = self.parser.lexer.labels[label.upper()]
        except KeyError as KE:
            raise InvalidLabel(label) from KE
        emu = self.spawn(self.parser, std=std)
    else:
        offset = 0
        path = cmdl.strip()
        code = self.state.ingest_file(path)
        if code is None:
            yield cmd
            return
        state = self.clone_state(environment=self.state.environment, filename=path)
        emu = self.spawn(code, std=std, state=state)

    if self.cfg.skip_call:
        # The offset has to be forwarded here as well, otherwise a label call would start
        # at the beginning of the script.
        emu.execute(offset, called=True)
    else:
        yield from emu.trace(offset, called=True)
Expand source code Browse git
@_command('SETLOCAL')
def execute_setlocal(self, cmd: SynCommand, *_):
    """
    Handler for SETLOCAL. Pushes the delayed expansion setting, the command extension
    setting and a copy of the environment onto their respective stacks. Each of the four
    recognized options may be given, in any combination and letter case; settings that are
    not mentioned keep their current value.
    """
    yield cmd
    state = self.state
    delay = state.delayexpand
    cmdxt = state.cmdextended
    # Several options can be given at once, e.g. "EnableExtensions EnableDelayedExpansion".
    for option in cmd.argument_string.upper().split():
        if option == 'ENABLEDELAYEDEXPANSION':
            delay = True
        elif option == 'DISABLEDELAYEDEXPANSION':
            delay = False
        elif option == 'ENABLEEXTENSIONS':
            cmdxt = True
        elif option == 'DISABLEEXTENSIONS':
            cmdxt = False
    state.delayexpand_stack.append(delay)
    state.cmdextended_stack.append(cmdxt)
    state.environment_stack.append(dict(self.environment))
Expand source code Browse git
@_command('ENDLOCAL')
def execute_endlocal(self, cmd: SynCommand, *_):
    """
    Handler for ENDLOCAL. Drops the most recent environment, delayed expansion setting and
    command extension setting from their stacks. Nothing happens when the environment stack
    holds only a single entry.
    """
    yield cmd
    state = self.state
    if len(state.environment_stack) > 1:
        state.environment_stack.pop()
        state.delayexpand_stack.pop()
        # SETLOCAL pushes onto this stack as well, so it has to be unwound here too.
        if len(state.cmdextended_stack) > 1:
            state.cmdextended_stack.pop()
Expand source code Browse git
@_command('GOTO')
def execute_goto(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for GOTO. When the `skip_goto` option is set, the command is yielded and nothing
    else happens. Otherwise:

    - `Exit` is raised for the target `:EOF` (with the colon),
    - an error is yielded when the target label was already seen since the label record was
      last cleared, which indicates an infinite loop,
    - `Goto` is raised in all other cases.

    Raises `AbortExecution` after writing an error message when no label is given.
    """
    if self.cfg.skip_goto:
        yield cmd
        return
    it = iter(cmd.args)
    mark = False
    for label in it:
        if not isinstance(label, Ctrl):
            break
        if label == Ctrl.Label:
            mark = True
            # The label is the token that follows the colon, if there is one.
            label = next(it, '')
            break
    else:
        std.e.write('No batch label specified to GOTO command.\r\n')
        raise AbortExecution
    # Only the first word is the label; an empty or blank label stays empty.
    words = label.split(maxsplit=1)
    label = words[0] if words else ''
    key = label.upper()
    if mark and key == 'EOF':
        raise Exit(int(self.state.ec), False)
    if key not in self.block_labels:
        raise Goto(label)
    else:
        yield Error(F'Infinite Loop detected for label {key}')
Expand source code Browse git
@_command('EXIT')
def execute_exit(self, cmd: SynCommand, *_):
    """
    Handler for EXIT. Leading /B switches clear the exit flag; the first other argument is
    the exit code, which falls back to 0 when it is missing or not an integer. The command
    is yielded and `Exit` is raised afterwards, unless the `skip_exit` option is set.
    """
    terminate = True
    token = 0
    for arg in cmd.args:
        if arg.upper() != '/B':
            token = arg
            break
        terminate = False
    try:
        code = int(token)
    except ValueError:
        code = 0
    yield cmd
    if not self.cfg.skip_exit:
        raise Exit(code, terminate)
Expand source code Browse git
@_command('CHDIR')
@_command('CD')
def execute_chdir(self, cmd: SynCommand, *_):
    """
    Handler for CD and CHDIR. Sets the working directory of the state to the given path. A
    leading /D switch is removed from the path, and the working directory is left unchanged
    when no path is given.
    """
    yield cmd
    target = cmd.argument_string.strip()
    if target[:2].upper() == '/D' and (len(target) == 2 or target[2].isspace()):
        # The /D switch additionally changes the drive; it is not part of the path.
        target = target[2:].strip()
    if target:
        self.state.cwd = target
Expand source code Browse git
@_command('PUSHD')
def execute_pushd(self, cmd: SynCommand, *_):
    """
    Handler for PUSHD. Stores the current working directory on the directory stack and then
    changes to the directory given in the argument string, if there is one.
    """
    yield cmd
    state = self.state
    state.dirstack.append(state.cwd)
    # The directory is set here directly: the CD handler is a generator that would also
    # yield the command a second time.
    if (target := cmd.argument_string.strip()):
        state.cwd = target
Expand source code Browse git
@_command('POPD')
def execute_popd(self, cmd: SynCommand, *_):
    """
    Handler for POPD. Restores the working directory that was stored last on the directory
    stack; nothing happens when the stack is empty.
    """
    yield cmd
    state = self.state
    try:
        previous = state.dirstack.pop()
    except IndexError:
        return
    state.cwd = previous
Expand source code Browse git
@_command('ECHO')
def execute_echo(self, cmd: SynCommand, std: IO, in_group: bool):
    """
    Handler for ECHO.

    - `ECHO ON` and `ECHO OFF` change the echo setting of the state. The command is yielded
      only when the `show_nops` option is set or when the setting actually changes.
    - Without an argument, the current echo setting is written to standard output.
    - Otherwise, the argument string is written to standard output. Inside a group, a
      trailing space is appended when there is none.
    """
    text = cmd.argument_string
    mode = text.strip().lower()
    previous = self.state.echo

    if mode in ('on', 'off'):
        enable = mode == 'on'
        if self.cfg.show_nops or previous is (not enable):
            yield cmd
        self.state.echo = enable
        return

    yield cmd

    if not mode:
        mode = 'on' if self.state.echo else 'off'
        std.o.write(F'ECHO is {mode}.\r\n')
        return
    if in_group and not text.endswith(' '):
        text += ' '
    std.o.write(F'{text}\r\n')
Expand source code Browse git
@_command('CLS')
def execute_cls(self, cmd: SynCommand, *_):
    """
    Handler for CLS. The command is yielded; nothing else happens.
    """
    yield cmd
Expand source code Browse git
@_command('ERASE')
@_command('DEL')
def execute_del(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for DEL and ERASE. Leading arguments that start with a slash are switches, all
    following arguments are wildcard patterns. Every file in the virtual file system that
    matches one of the patterns is removed.

    With /P, a confirmation is read from standard input for each file; `InputLocked` is
    raised when no complete line is available. The switches /F, /S, /Q and /A are accepted
    but have no effect.

    Returns 0 on success and 1, after yielding an error, when no file pattern is given.
    """
    if not cmd.args:
        yield Error('The syntax of the command is incorrect')
        return 1
    else:
        yield cmd
    flags = {}
    paths = []
    it = iter(cmd.args)
    for arg in it:
        if not (arg.startswith('/') and 1 < len(arg)):
            paths = [arg, *it]
            break
        flag = arg.upper()
        if flag[:3] == '/A:':
            flags['A'] = flag[3:]
            continue
        flags[flag[1]] = True
    if not paths:
        # Only switches were given.
        yield Error('The syntax of the command is incorrect')
        return 1
    _P = 'P' in flags # Prompts for confirmation before deleting each file.
    state = self.state
    cwd = state.cwd
    for pattern in paths:
        # Iterate over a copy because files are removed during the loop.
        for path in list(state.file_system):
            if not winfnmatch(pattern, path, cwd):
                continue
            if _P and state.exists_file(path):
                # The prompt names the file that is about to be deleted.
                std.o.write(F'{path}, Delete (Y/N)? ')
                decision = None
                while decision not in ('y', 'n'):
                    confirmation = std.i.readline()
                    if not confirmation.endswith('\n'):
                        raise InputLocked
                    decision = confirmation[:1].lower()
                if decision == 'n':
                    continue
            state.remove_file(path)
    return 0
Expand source code Browse git
@_command('START')
def execute_start(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for START. The first fragment that contains a quote is taken as the window
    title. The switches /D (working directory) and /I (do not inherit the environment) are
    honored; /NODE, /AFFINITY and /MACHINE consume their value and are otherwise ignored.
    The first fragment that is not a switch names the program to start; when it can be read
    from the virtual file system, it is emulated as a script in a new state, with the
    remaining fragments as its command line.
    """
    yield cmd
    it = iter(cmd.ast.fragments)
    it = itertools.islice(it, cmd.argument_offset, None)

    def option_value(default=None):
        # Fetch the value of a switch, skipping the whitespace that separates the two. A
        # bare next() must be avoided: StopIteration cannot escape from a generator.
        for fragment in it:
            if not fragment.isspace():
                return unquote(fragment)
        return default

    title = None
    start = None
    cwd = self.state.cwd
    env = ...
    for arg in it:
        if title is None:
            if '"' not in arg:
                title = ''
            else:
                title = unquote(arg)
                continue
        if arg.isspace():
            continue
        if not arg.startswith('/'):
            start = unquote(arg)
            break
        if (flag := arg.upper()) in ('/NODE', '/AFFINITY', '/MACHINE'):
            option_value()
        elif flag == '/D':
            cwd = option_value(cwd)
        elif flag == '/I':
            env = None
    if start and (batch := self.state.ingest_file(start)):
        state = self.clone_state(environment=env)
        state.cwd = cwd
        state.command_line = _fuse(it).strip()
        shell = self.spawn(batch, state, std)
        yield from shell.trace()
Expand source code Browse git
@_command('CMD')
def execute_cmd(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for CMD. The switches /Q, /S, /U, /E and /V configure the new shell; everything
    after the first /C, /K or /R switch is the command line to run. The command line is
    emulated in a new state and its trace is yielded. When there is no command line, the
    generator returns 0 after yielding the command.

    Raises `ValueError` when the value of /E or /V is neither ON nor OFF.
    """
    yield cmd
    it = iter(cmd.ast.fragments)
    command = None
    quiet = False
    strip = False
    codec = 'cp1252'
    delayexpand = None
    cmdextended = None

    for arg in it:
        if arg.isspace() or not arg.startswith('/'):
            continue
        name, _, flag = arg[1:].partition(':')
        flag = flag.upper()
        name = name.upper()
        # Compare against a tuple: a substring test on 'CKR' would also accept '' and 'CK'.
        if name in ('C', 'K', 'R'):
            command = _fuse(it)
            break
        elif name == 'Q':
            quiet = True
        elif name == 'S':
            strip = True
        elif name == 'U':
            codec = 'utf-16le'
        elif name == 'E':
            cmdextended = _onoff(flag)
        elif name == 'V':
            delayexpand = _onoff(flag)
    else:
        return 0

    # Remove the outer quotes from the command line when /S is given or when any one of the
    # three conditions below holds.
    if (stripped := re.search('^\\s*"(.*)"', command)) and (
        strip
        or command.count('"') != 2
        or re.search('[&<>()@^|]', stripped[1])
        or re.search('\\s', stripped[1]) is None
    ):
        command = stripped[1]

    state = self.clone_state(delayexpand=delayexpand, cmdextended=cmdextended)
    state.codec = codec
    state.echo = not quiet
    shell = self.spawn(command, state, std)
    yield from shell.trace()
Expand source code Browse git
@_command('ARP')
@_command('AT')
@_command('ATBROKER')
@_command('BGINFO')
@_command('BITSADMIN')
@_command('CERTUTIL')
@_command('CLIP')
@_command('CMSTP')
@_command('COMPACT')
@_command('CONTROL')
@_command('CSCRIPT')
@_command('CURL')
@_command('DEFRAG')
@_command('DISKSHADOW')
@_command('ESENTUTL')
@_command('EXPAND')
@_command('EXPLORER')
@_command('EXTRAC32')
@_command('FODHELPER')
@_command('FORFILES')
@_command('FTP')
@_command('HOSTNAME')
@_command('INSTALLUTIL')
@_command('IPCONFIG')
@_command('LOGOFF')
@_command('MAKECAB')
@_command('MAVINJECT')
@_command('MOUNTVOL')
@_command('MSBUILD')
@_command('MSHTA')
@_command('MSIEXEC')
@_command('MSTSC')
@_command('NET')
@_command('NET1')
@_command('NETSH')
@_command('NSLOOKUP')
@_command('ODBCCONF')
@_command('PATHPING')
@_command('PING')
@_command('POWERSHELL')
@_command('PRESENTATIONHOST')
@_command('PWSH')
@_command('REG')
@_command('REGSVR32')
@_command('ROUTE')
@_command('RUNDLL32')
@_command('SCP')
@_command('SDCLT')
@_command('SETX')
@_command('SFTP')
@_command('SHUTDOWN')
@_command('SSH')
@_command('SUBST')
@_command('SYNCAPPVPUBLISHINGSERVER')
@_command('SYSTEMINFO')
@_command('TAR')
@_command('TELNET')
@_command('TFTP')
@_command('TIMEOUT')
@_command('TRACERT')
@_command('VSSADMIN')
@_command('WBADMIN')
@_command('WHERE')
@_command('WHOAMI')
@_command('WINRM')
@_command('WINRS')
@_command('WSCRIPT')
def execute_unimplemented_program(self, cmd: SynCommand, *_):
    """
    Handler for external programs that are not emulated. The command is yielded and the
    generator returns 0.
    """
    yield cmd
    return 0
Expand source code Browse git
@_command('CLS')
def execute_unimplemented_command_unmodified_ec(self, cmd: SynCommand, *_):
    """
    Handler for commands that are not emulated and that have no result: the command is
    yielded and the generator returns nothing.
    """
    yield cmd
Expand source code Browse git
@_command('ASSOC')
@_command('ATTRIB')
@_command('BCDEDIT')
@_command('BREAK')
@_command('CACLS')
@_command('CHCP')
@_command('CHKDSK')
@_command('CHKNTFS')
@_command('COLOR')
@_command('COMP')
@_command('CONVERT')
@_command('COPY')
@_command('DATE')
@_command('DIR')
@_command('DISKPART')
@_command('DOSKEY')
@_command('DRIVERQUERY')
@_command('FC')
@_command('FORMAT')
@_command('FSUTIL')
@_command('FTYPE')
@_command('GPRESULT')
@_command('ICACLS')
@_command('LABEL')
@_command('MD')
@_command('MKDIR')
@_command('MKLINK')
@_command('MODE')
@_command('MORE')
@_command('MOVE')
@_command('OPENFILES')
@_command('PATH')
@_command('PAUSE')
@_command('PRINT')
@_command('PROMPT')
@_command('RD')
@_command('RECOVER')
@_command('REN')
@_command('RENAME')
@_command('REPLACE')
@_command('RMDIR')
@_command('ROBOCOPY')
@_command('SC')
@_command('SCHTASKS')
@_command('SHIFT')
@_command('SORT')
@_command('TASKKILL')
@_command('TASKLIST')
@_command('TIME')
@_command('TITLE')
@_command('TREE')
@_command('VER')
@_command('VERIFY')
@_command('VOL')
@_command('WMIC')
@_command('XCOPY')
def execute_unimplemented_command(self, cmd: SynCommand, *_):
    """
    Handler for commands that are not emulated. The command is yielded and the generator
    returns 0. TYPE is not listed because it has an emulating handler; COMPACT, SHUTDOWN,
    SUBST and SYSTEMINFO are registered with the handler for unimplemented programs.
    """
    yield cmd
    return 0
Expand source code Browse git
@_command('REM')
def execute_rem(self, cmd: SynCommand, *_):
    """
    Handler for REM. The comment is yielded only when the `show_comments` option is set.
    """
    if not self.cfg.show_comments:
        return
    yield cmd
Expand source code Browse git
@_command('HELP')
def execute_help(self, cmd: SynCommand, std: IO, *_):
    """
    Handler for HELP. Yields the command, writes the help text stored under the key `HELP`
    to standard output and returns 0.
    """
    yield cmd
    text = HelpOutput['HELP']
    std.o.write(text)
    return 0
Expand source code Browse git
def execute_command(self, cmd: SynCommand, std: IO, in_group: bool):
    """
    Execute a single synthesized command: look up the handler for its verb, set up the
    redirected standard streams, run the handler, write redirected output back to the
    virtual file system and update the exit code of the state.
    """
    verb = cmd.verb.upper().strip()
    handler = self._command.handlers.get(verb)

    if handler is None:
        # Retry without the file extension if the extension is listed in PATHEXT.
        base, ext = ntpath.splitext(verb)
        handler = None
        if any(ext == pe.upper() for pe in self.state.envar('PATHEXT', '').split(';')):
            handler = self._command.handlers.get(base)

    if handler is None:
        # There is no handler for this verb: the command is yielded without being executed.
        # NOTE(review): the nesting of this block was reconstructed from a flattened
        # listing; confirm against the original source.
        if self.state.exists_file(verb):
            self.state.ec = 0
        elif not indicators.winfpath.value.fullmatch(verb):
            if '\uFFFD' in verb or not verb.isprintable():
                self.state.ec = 9009
                cmd.junk = True
            else:
                cmd.junk = not self.cfg.show_junk
        yield cmd
        return

    # Maps the stream index to the path of the file that receives its output.
    paths: dict[int, str] = {}

    for src, r in cmd.ast.redirects.items():
        # Only the streams 0, 1 and 2 are handled; 0 must be an input redirect and the other
        # two must be output redirects.
        if not 0 <= src <= 2 or (src == 0) != r.is_input:
            continue
        if isinstance((target := r.target), str):
            if target.upper() == 'NUL':
                std[src] = DevNull()
            else:
                data = self.state.ingest_file(target)
                if src == 0:
                    if data is None:
                        yield ErrorCannotFindFile
                        return
                    std.i = StringIO(data)
                else:
                    if r.is_out_append:
                        # Start with the current file contents and continue at their end.
                        buffer = StringIO(data)
                        buffer.seek(0, 2)
                    else:
                        buffer = StringIO()
                    std[src] = buffer
                    paths[src] = target
        elif src == 1 and target == 2:
            std.o = std.e
        elif src == 2 and target == 1:
            std.e = std.o

    if '/?' in cmd.args:
        # A help request is answered without running the handler.
        std.o.write(HelpOutput[verb])
        self.state.ec = 0
        return

    # A handler either returns its result directly or returns a generator; in the latter
    # case the generator is traced and its return value is the result.
    if (result := handler(self, cmd, std, in_group)) is None:
        pass
    elif not isinstance(result, (int, ErrorZero)):
        result = (yield from result)

    # Store the redirected output in the virtual file system.
    for k, path in paths.items():
        self.state.create_file(path, std[k].getvalue())

    # A result of None leaves the exit code untouched.
    if result is not None:
        self.state.ec = result
Expand source code Browse git
@_node(AstPipeline)
def trace_pipeline(self, pipeline: AstPipeline, std: IO, in_group: bool):
    """
    Trace all parts of a pipeline in order. The output of each part becomes the input of the
    next one; the last part writes to the output stream that was passed in. The pipeline
    itself is yielded first when it has more than one part.
    """
    parts = pipeline.parts
    total = len(parts)
    streams = IO(*std)
    if total > 1:
        yield synthesize(pipeline)
    for index, part in enumerate(parts, 1):
        if index > 1:
            # What the previous part has written is read from the start by this part.
            streams.i = streams.o
            streams.i.seek(0)
        streams.o = std.o if index == total else StringIO()
        if isinstance(part, AstGroup):
            yield from self.trace_group(part, streams, in_group)
            continue
        command = synthesize(self.expand_ast_node(part))
        yield from self.execute_command(command, streams, in_group)
Expand source code Browse git
@_node(AstSequence)
def trace_sequence(self, sequence: AstSequence, std: IO, in_group: bool):
    """
    Trace the head of a sequence and then each statement of its tail. A statement with the
    failure condition is skipped when the exit code is zero, and a statement with the
    success condition is skipped when the exit code is not zero.
    """
    yield from self.trace_statement(sequence.head, std, in_group)
    for conditional in sequence.tail:
        requirement = conditional.condition
        if requirement == AstCondition.Failure and not self.state.ec:
            continue
        if requirement == AstCondition.Success and self.state.ec:
            continue
        yield from self.trace_statement(conditional.statement, std, in_group)
Expand source code Browse git
@_node(AstIf)
def trace_if(self, _if: AstIf, std: IO, in_group: bool):
    """
    Yield the IF statement, evaluate its condition on the expanded node and trace the branch
    that is selected by the outcome. The record of seen labels is cleared in the process.

    Raises `RuntimeError` for an unknown comparison operator.
    """
    yield synthesize(_if)

    _if = self.expand_ast_node(_if)
    self.block_labels.clear()
    variant = _if.variant

    if variant == AstIfVariant.ErrorLevel:
        outcome = _if.var_int <= self.state.ec
    elif variant == AstIfVariant.CmdExtVersion:
        outcome = _if.var_int <= self.state.extensions_version
    elif variant == AstIfVariant.Exist:
        outcome = self.state.exists_file(_if.var_str)
    elif variant == AstIfVariant.Defined:
        outcome = _if.var_str.upper() in self.state.environment
    else:
        lhs = _if.lhs
        rhs = _if.rhs
        cmp = _if.cmp
        assert lhs is not None
        assert rhs is not None
        if cmp == AstIfCmp.STR:
            if _if.casefold:
                lhs = lhs.casefold() if isinstance(lhs, str) else lhs
                rhs = rhs.casefold() if isinstance(rhs, str) else rhs
            outcome = lhs == rhs
        else:
            comparisons = {
                AstIfCmp.GTR: lambda: lhs > rhs,
                AstIfCmp.GEQ: lambda: lhs >= rhs,
                AstIfCmp.NEQ: lambda: lhs != rhs,
                AstIfCmp.EQU: lambda: lhs == rhs,
                AstIfCmp.LSS: lambda: lhs < rhs,
                AstIfCmp.LEQ: lambda: lhs <= rhs,
            }
            compare = comparisons.get(cmp)
            if compare is None:
                raise RuntimeError(cmp)
            outcome = compare()

    if _if.negated:
        outcome = not outcome

    if outcome:
        yield from self.trace_sequence(_if.then_do, std, in_group)
    elif (alternative := _if.else_do):
        yield from self.trace_sequence(alternative, std, in_group)
Expand source code Browse git
@_node(AstFor)
def trace_for(self, _for: AstFor, std: IO, in_group: bool):
    """
    Trace a FOR loop: the body is traced once for every item, with the loop variables set
    accordingly.

    For the file parsing variant, the lines come from the output of an emulated command,
    from the literal specification, or from the files in the virtual file system that match
    the specification. Each line is split at the delimiters and the selected tokens are
    assigned to consecutive variables, starting with the loop variable. For all other
    variants, the loop variable is assigned each entry of the specification.

    The loop statement itself is yielded unless it is a numeric loop with a single entry
    whose body does not reference the loop variable. The loop variables are discarded when
    the loop ends, also when it is left through an exception.

    Raises `EmulatorException` when the tokens require more variables than are available.
    """
    state = self.state
    cwd = state.cwd
    vars = state.new_forloop()
    # The scope created above has to be closed even when the body leaves the loop through an
    # exception, as GOTO and EXIT do.
    try:
        body = _for.body
        name = _for.variable
        vars[name] = ''

        if (
            self.contains_for_variable(body, vars)
            or _for.variant != AstForVariant.NumericLoop
            or len(_for.spec) != 1
        ):
            yield synthesize(_for)

        if _for.variant == AstForVariant.FileParsing:
            if _for.mode == AstForParserMode.Command:
                emulator = self.spawn(_for.specline, self.clone_state(filename=state.name))
                yield from emulator.trace()
                lines = emulator.std.o.getvalue().splitlines()
            elif _for.mode == AstForParserMode.Literal:
                lines = _for.spec
            else:
                def lines_from_files():
                    fs = state.file_system
                    for pattern in _for.spec:
                        for path, content in fs.items():
                            # winfnmatch expects the wildcard pattern first, then the path.
                            if not winfnmatch(pattern, path, cwd):
                                continue
                            yield from content.splitlines(False)
                lines = lines_from_files()
            opt = _for.options
            tokens = sorted(opt.tokens)
            # Without delimiters, lines are not split; an empty character class would also
            # not be a valid regular expression.
            split = re.compile('[{}]+'.format(re.escape(opt.delims))) if opt.delims else None
            count = tokens[-1] + 1 if tokens else 0
            first_variable = ord(name)
            if opt.asterisk:
                # The additional token holds the remainder of the line.
                tokens.append(count)
            for n, line in enumerate(lines):
                if n < opt.skip:
                    continue
                if opt.comment and line.startswith(opt.comment):
                    continue
                if count and split is not None:
                    tokenized = split.split(line, maxsplit=count)
                else:
                    tokenized = (line,)
                for k, tok in enumerate(tokens):
                    name = chr(first_variable + k)
                    if not name.isalpha():
                        raise EmulatorException('Ran out of variables in FOR-Loop.')
                    try:
                        vars[name] = tokenized[tok]
                    except IndexError:
                        vars[name] = ''
                yield from self.trace_sequence(body, std, in_group)
        else:
            for entry in _for.spec:
                vars[name] = entry
                yield from self.trace_sequence(body, std, in_group)
    finally:
        state.end_forloop()
Expand source code Browse git
@_node(AstGroup)
def trace_group(self, group: AstGroup, std: IO, in_group: bool):
    """
    Trace every sequence of a group, flagging each one as being inside a group, and yield
    the group itself afterwards.
    """
    inside = True
    for member in group.fragments:
        yield from self.trace_sequence(member, std, inside)
    yield synthesize(group)
Expand source code Browse git
@_node(AstLabel)
def trace_label(self, label: AstLabel, *_):
    """
    Handle a label line. A label that is a comment is yielded when the `show_comments`
    option is set. Any other label is yielded when the `show_labels` option is set and is
    always added to the record of seen labels.
    """
    if not label.comment:
        if self.cfg.show_labels:
            yield synthesize(label)
        self.block_labels.add(label.label.upper())
    elif self.cfg.show_comments:
        yield synthesize(label)
Expand source code Browse git
def trace_statement(self, statement: AstStatement, std: IO, in_group: bool):
    """
    Dispatch a statement to the handler that is registered for its class and yield whatever
    that handler yields. Raises `RuntimeError` when no handler is registered.
    """
    registry = self._node.handlers
    kind = statement.__class__
    try:
        handler = registry[kind]
    except KeyError:
        raise RuntimeError(statement)
    else:
        yield from handler(self, statement, std, in_group)
Expand source code Browse git
def emulate_commands(self, allow_junk=False):
    """
    Emulate the script and yield the string form of every executed command. Commands that
    are flagged as junk are left out unless `allow_junk` is set.
    """
    for syn in self.trace():
        if not isinstance(syn, SynCommand):
            continue
        if allow_junk or not syn.junk:
            yield str(syn)
Expand source code Browse git
def emulate_to_depth(self, depth: int = 0):
    """
    Emulate the script and yield the string form of every synthesized node whose AST depth
    does not exceed the given depth.
    """
    for syn in self.trace():
        if isinstance(syn, SynNodeBase) and syn.ast.depth <= depth:
            yield str(syn)
Expand source code Browse git
def emulate(self, offset: int = 0):
    """
    Emulate the script from the given offset and yield the string form of the synthesized
    nodes. Junk commands are left out, and so are nodes that would repeat or wrap something
    that was already yielded.

    Raises `RuntimeError` when the same node would be yielded twice in a row.
    """
    previous: AstNode | None = None
    skipped: AstNode | None = None
    for syn in self.trace(offset):
        if not isinstance(syn, SynNodeBase):
            continue
        node = syn.ast
        if isinstance(syn, SynCommand) and syn.junk:
            skipped = node
            continue
        if skipped is not None and skipped.is_descendant_of(node):
            if not previous or not previous.is_descendant_of(node):
                continue
        if previous is not None:
            if node.is_descendant_of(previous):
                # we already synthesized a parent construct, like a FOR loop or IF block
                continue
            if previous.is_descendant_of(node):
                # we synthesized a command and no longer need to synthesize an AST node that
                # wraps it, like a group
                continue
        if isinstance(node, AstPipeline) and len(node.parts) == 1:
            continue
        if previous is node:
            raise RuntimeError('Emulator attempted to synthesize the same command twice.')
        previous = node
        yield str(syn)
Expand source code Browse git
def execute(self, offset: int = 0, called: bool = False):
    """
    Run the emulation from the given offset to completion and discard everything it yields.
    """
    events = self.trace(offset, called=called)
    for _event in events:
        continue
Expand source code Browse git
def trace(self, offset: int = 0, called: bool = False):
    """
    Emulate the script from the given offset and yield everything that the traced sequences
    yield; parser errors are yielded as error objects.

    - A GOTO restarts parsing at the offset of the target label; `InvalidLabel` is raised
      when the label does not exist.
    - An EXIT stores its code as the exit code and ends the trace. It is re-raised when its
      exit flag is set and this emulator was called from another one.
    - An aborted execution sets the exit code to 1 and ends the trace.

    When the state has a file name, the script text is stored under that name first.
    """
    if (name := self.state.name):
        self.state.create_file(name, self.parser.lexer.text)
    lexer = self.parser.lexer
    length = len(lexer.code)
    labels = lexer.labels
    while offset < length:
        try:
            for sequence in self.parser.parse(offset):
                if not isinstance(sequence, AstError):
                    yield from self.trace_sequence(sequence, self.std, False)
                else:
                    yield Error(sequence.error)
        except Goto as jump:
            target = jump.label
            try:
                offset = labels[target.upper()]
            except KeyError:
                raise InvalidLabel(target) from jump
        except Exit as request:
            self.state.ec = request.code
            if request.exit and called:
                raise
            return
        except AbortExecution:
            self.state.ec = 1
            return
        else:
            return