Module refinery.units.formats.pe.dotnet.dnfields
Expand source code Browse git
from __future__ import annotations
import re
import struct
from collections import Counter
from typing import NamedTuple
from refinery.lib.dotnet import integer_from_ldc
from refinery.lib.dotnet.header import DotNetHeader
from refinery.lib.dotnet.signatures import FieldSig, SzArrayTypeSig, parse_signature
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.units.formats.pe.dotnet import CodePath
class FieldInfo(NamedTuple):
    """
    Layout guess for one constant field, as produced by the heuristics in `dnfields.unpack`.
    """
    # Type name of the field, or of a single array element; lower-cased for the output label.
    type: str
    # Number of elements; values above 1 are appended to the type label as "[n]".
    elements: int
    # Size of a single element in bytes; the extracted blob spans elements * size bytes.
    size: int
    # Offset in the input where the initializing opcode sequence was matched, used to look up
    # the containing method path. It is 0 when the layout came directly from the signature.
    offset: int
class dnfields(PathExtractorUnit):
    """
    Extract data from constant field variables in classes of .NET executables.
    Since the .NET header stores only the offset and not the size of constant fields, heuristics
    are used to search for opcode sequences that load the data and additional heuristics are used
    to guess the size of the data type.
    """

    @classmethod
    def handles(cls, data):
        """
        Return the verdict of `is_likely_pe_dotnet` for the given input.
        """
        from refinery.lib.id import is_likely_pe_dotnet
        return is_likely_pe_dotnet(data)

    def unpack(self, data):
        """
        Yield one `UnpackResult` per recovered field, sorted by path.

        The first pass walks the FieldRVA table and slices the raw field data out of the input;
        the size of each slice comes either from the parsed field signature or from a scan for
        the opcode sequence that initializes an array from the field. The second pass covers
        all remaining fields and recovers those that are assigned from a single string literal.
        If the FieldRVA table is empty, nothing is extracted at all.
        """
        header = DotNetHeader(data, parse_resources=False)
        tables = header.meta.Streams.Tables
        fields = tables.FieldRVA
        cpaths = CodePath(header)

        if not fields:
            return

        # Layout guesses are cached per field signature and shared by both helpers below.
        icache: dict[bytes, FieldInfo] = {}
        memory = memoryview(data)

        # Type name patterns and the corresponding element size in bytes; checked in order.
        size_by_typename = tuple(
            (re.compile(expression, flags=re.IGNORECASE), size) for expression, size in (
                ('^s?byte$'        , 1), # noqa
                ('^s?char$'        , 2), # noqa
                ('^[us]?int.?16$'  , 2), # noqa
                ('^[us]?int.?32$'  , 4), # noqa
                ('^[us]?int.?64$'  , 8), # noqa
            )
        )

        def _load_pattern(t: int, capture_type: bool) -> bytes:
            """
            Build the regular expression for the opcode sequence that creates an array and
            references field index `t` via ldtoken. If `capture_type` is set, the row and
            table of the newarr type token are captured as two additional groups.
            """
            if capture_type:
                newarr = BR'\x8D(...)([\x01\x02])'
            else:
                newarr = BR'\x8D...[\x01\x02]'
            ldtoken = BR'\xD0\x%02x\x%02x\x%02x\x04' % (
                (t >> 0x00) & 0xFF,
                (t >> 0x08) & 0xFF,
                (t >> 0x10) & 0xFF,
            )
            return B''.join((
                BR'(\x20....|\x1F.|[\x17-\x1E])',  # ldc.i4 count
                newarr,                            # newarr col|row
                BR'\x25',                          # dup
                ldtoken,                           # ldtoken t
                BR'(?:.{0,12}?',                   # ...
                BR'\x80(...)\x04)?',               # stsfld variable
            ))

        def _resolve_field_name(raw: bytes | None, fallback: str | None) -> str | None:
            """
            Translate the three token bytes captured after stsfld into the name of the
            corresponding row in table 4. The fallback is returned when nothing was captured
            or when the lookup fails.
            """
            name = raw
            if name:
                try:
                    row, = struct.unpack('<L', B'%s\0' % name)
                    name = tables[4][row - 1].Name
                except Exception as E:
                    # Best effort only: the match may be a false positive of the byte scan.
                    self.log_info(F'attempt to parse field name failed: {E!s}')
                    name = None
            if name is None:
                name = fallback
            return name

        def _guess_field_info(
            t: int,
            signature: bytes,
            field_name: str | None = None,
        ) -> tuple[str | None, FieldInfo | None]:
            """
            Guess the layout of field `t` from the type token of the newarr instruction that
            precedes its ldtoken. Only element types listed in `size_by_typename` are
            accepted. Returns the pair `(None, None)` when no usable match exists.
            """
            info = icache.get(signature)
            if info is not None and field_name is not None:
                return field_name, info
            for match in re.finditer(_load_pattern(t, True), memory, flags=re.DOTALL):
                if info is None:
                    ldc, j, r, raw_name = match.groups()
                    count = integer_from_ldc(ldc)
                    # j: row index (3 bytes, zero-padded to 4), r: table index (1 byte)
                    j, r = struct.unpack('<LB', B'%s\0%s' % (j, r))
                    typename = tables[r][j - 1].TypeName
                else:
                    # The layout is cached, only the field name is still being looked for.
                    raw_name = match.group(4)
                    typename = info.type
                for matcher, size in size_by_typename:
                    if not matcher.match(typename):
                        continue
                    name = _resolve_field_name(raw_name, field_name)
                    if info is None:
                        info = FieldInfo(typename, count, size, match.start())
                        icache[signature] = info
                    return name, info
            return None, None

        def _guess_array_info(
            t: int,
            signature: bytes,
            element_name: str,
            element_size: int,
            field_name: str | None = None,
        ) -> tuple[str | None, FieldInfo | None]:
            """
            Guess the layout of field `t` when the element type and size are already known
            from the signature; only the element count is taken from the first match of the
            opcode sequence. Returns the pair `(None, None)` when there is no match.
            """
            info = icache.get(signature)
            if info is not None and field_name is not None:
                return field_name, info
            for match in re.finditer(_load_pattern(t, False), memory, flags=re.DOTALL):
                ldc, raw_name = match.groups()
                count = integer_from_ldc(ldc)
                name = _resolve_field_name(raw_name, field_name)
                if info is None:
                    info = FieldInfo(element_name, count, element_size, match.start())
                    icache[signature] = info
                return name, info
            return None, None

        iwidth = len(str(len(fields)))
        remaining_field_indices = set(range(len(tables.Field)))
        unpack = []
        name_count = Counter(tables[rv.Field].Name for rv in fields)

        for k, rv in enumerate(fields):
            _index = rv.Field.Index
            field = tables.Field[_index - 1]
            remaining_field_indices.discard(_index - 1)
            if not field.Flags.HasFieldRVA:
                continue
            fname = field.Name
            signature = bytes(field.Signature)
            offset = header.pe.rva_to_offset(rv.RVA)
            guess = None
            try:
                sig = parse_signature(signature)
            except Exception:
                sig = None
            if isinstance(sig, FieldSig):
                ftype = sig.field_type
                if isinstance(ftype, SzArrayTypeSig):
                    element = ftype.element
                    if element.byte_size is not None and element.byte_size > 0:
                        fname, guess = _guess_array_info(
                            _index, signature, element.name, element.byte_size, fname)
                elif ftype.byte_size is not None and ftype.byte_size > 0:
                    # Fixed-size type: the signature alone determines the layout.
                    guess = FieldInfo(ftype.name, 1, ftype.byte_size, 0)
            if guess is None:
                fname, guess = _guess_field_info(_index, signature, fname)
            if guess is None or fname is None:
                self.log_warn(lambda: F'field {k:0{iwidth}d} with signature {field.Signature.hex()}: unable to guess type information')
                continue
            if not fname.isprintable() or name_count[fname] > 1:
                # Unprintable or ambiguous names are replaced by a numbered placeholder.
                fname = F'Field{k + 1:0{iwidth}d}'
            type_label = guess.type.lower()
            if guess.elements > 1:
                type_label += F'[{guess.elements}]'
            self.log_debug(
                F'field {k:0{iwidth}d}; token 0x{_index:06X}; RVA 0x{rv.RVA:04X}; count {guess.elements}; type {guess.type}; name {fname}')
            end = offset + guess.elements * guess.size
            path = cpaths.method_path(guess.offset) if guess.offset else ''
            unpack.append(UnpackResult(
                F'{path}/{fname}', memory[offset:end], name=fname, type=type_label))

        for _index in remaining_field_indices:
            field = tables.Field[_index]
            index = _index + 1
            name = field.Name
            if field.Flags.HasFieldRVA:
                self.log_warn(F'field {name} has RVA flag set, but no RVA was found')
            token = index.to_bytes(3, 'little')
            values = {}
            for match in re.finditer((
                BR'\x72(?P<token>...)\x70'          # ldstr
                BR'(?:\x6F(?P<function>...)\x0A)?'  # call GetBytes
                BR'\x80%s\x04'                      # stsfld
            ) % re.escape(token), data, re.DOTALL):
                md = match.groupdict()
                fn_token = md.get('function')
                # A missing call and an all-zero function token are both treated as "no call".
                fn_index = int.from_bytes(fn_token, 'little') if fn_token else 0
                if fn_index:
                    # NOTE(review): other table lookups in this method subtract 1 from the
                    # row index; confirm that MemberRef is meant to be indexed without it.
                    fn_name = tables.MemberRef[fn_index].Name
                    if fn_name != 'GetBytes':
                        self.log_info(F'skipping string assignment passing through call to {fn_name}')
                        continue
                us_index = int.from_bytes(md['token'], 'little')
                values[match.start()] = header.meta.Streams.US[us_index].encode(self.codec)
            if not values:
                continue
            # Only fields with exactly one assignment are emitted; others are skipped.
            if len(values) == 1:
                offset, value = values.popitem()
                path = cpaths.method_path(offset)
                unpack.append(UnpackResult(F'{path}/{name}', value, name=name, type='string'))

        unpack.sort(key=lambda u: u.path)
        yield from unpack
Classes
class FieldInfo (type, elements, size, offset)-
FieldInfo(type, elements, size, offset)
Expand source code Browse git
class FieldInfo(NamedTuple): type: str elements: int size: int offset: intAncestors
- builtins.tuple
Instance variables
var type-
Alias for field number 0
Expand source code Browse git
class FieldInfo(NamedTuple): type: str elements: int size: int offset: int var elements-
Alias for field number 1
Expand source code Browse git
class FieldInfo(NamedTuple): type: str elements: int size: int offset: int var size-
Alias for field number 2
Expand source code Browse git
class FieldInfo(NamedTuple): type: str elements: int size: int offset: int var offset-
Alias for field number 3
Expand source code Browse git
class FieldInfo(NamedTuple): type: str elements: int size: int offset: int
class dnfields (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')-
Extract data from constant field variables in classes of .NET executables.
Since the .NET header stores only the offset and not the size of constant fields, heuristics are used to search for opcode sequences that load the data and additional heuristics are used to guess the size of the data type.
Expand source code Browse git
class dnfields(PathExtractorUnit): """ Extract data from constant field variables in classes of .NET executables. Since the .NET header stores only the offset and not the size of constant fields, heuristics are used to search for opcode sequences that load the data and additional heuristics are used to guess the size of the data type. """ @classmethod def handles(cls, data): from refinery.lib.id import is_likely_pe_dotnet return is_likely_pe_dotnet(data) def unpack(self, data): header = DotNetHeader(data, parse_resources=False) tables = header.meta.Streams.Tables fields = tables.FieldRVA cpaths = CodePath(header) if not fields: return icache: dict[bytes, FieldInfo] = {} memory = memoryview(data) def _guess_field_info(t: int, signature: bytes, field_name: str | None = None, sizemap: dict = { '^s?byte$' : 1, '^s?char$' : 2, '^[us]?int.?16$' : 2, '^[us]?int.?32$' : 4, '^[us]?int.?64$' : 8, }) -> tuple[str | None, FieldInfo | None]: try: info = icache[signature] except KeyError: info = None else: if field_name is not None: return field_name, info pattern = ( BR'(\x20....|\x1F.|[\x17-\x1E])' # ldc.i4 count BR'\x8D(...)([\x01\x02])' # newarr col|row BR'\x25' # dup BR'\xD0\x%02x\x%02x\x%02x\x04' # ldtoken t BR'(?:.{0,12}?' # ... BR'\x80(...)\x04)?' 
% ( # stsfld variable (t >> 0x00) & 0xFF, (t >> 0x08) & 0xFF, (t >> 0x10) & 0xFF ) ) for match in re.finditer(pattern, memory, flags=re.DOTALL): if info is None: count, j, r, name = match.groups() count = integer_from_ldc(count) j, r = struct.unpack('<LB', B'%s\0%s' % (j, r)) typename = tables[r][j - 1].TypeName else: name = match.group(4) typename = info.type for pattern, size in sizemap.items(): if not re.match(pattern, typename, flags=re.IGNORECASE): continue if name: try: name = struct.unpack('<L', B'%s\0' % name) name = name[0] name = tables[4][name - 1].Name except Exception as E: self.log_info(F'attempt to parse field name failed: {E!s}') name = None if name is None: name = field_name if info is None: info = FieldInfo(typename, count, size, match.start()) icache[signature] = info return name, info return None, None def _guess_array_info( t: int, signature: bytes, element_name: str, element_size: int, field_name: str | None = None, ) -> tuple[str | None, FieldInfo | None]: try: info = icache[signature] except KeyError: info = None else: if field_name is not None: return field_name, info pattern = ( BR'(\x20....|\x1F.|[\x17-\x1E])' # ldc.i4 count BR'\x8D...[\x01\x02]' # newarr col|row BR'\x25' # dup BR'\xD0\x%02x\x%02x\x%02x\x04' # ldtoken t BR'(?:.{0,12}?' # ... BR'\x80(...)\x04)?' 
% ( # stsfld variable (t >> 0x00) & 0xFF, (t >> 0x08) & 0xFF, (t >> 0x10) & 0xFF ) ) for match in re.finditer(pattern, memory, flags=re.DOTALL): count_bytes, name = match.groups() count = integer_from_ldc(count_bytes) if name: try: name = struct.unpack('<L', B'%s\0' % name) name = name[0] name = tables[4][name - 1].Name except Exception as E: self.log_info( F'attempt to parse field name failed: {E!s}') name = None if name is None: name = field_name if info is None: info = FieldInfo( element_name, count, element_size, match.start()) icache[signature] = info return name, info return None, None iwidth = len(str(len(fields))) rwidth = max(len(F'{field.RVA:X}') for field in fields) rwidth = max(rwidth, 4) remaining_field_indices = set(range(len(tables.Field))) unpack = [] name_count = Counter(tables[rv.Field].Name for rv in fields) name_width = len(str(len(fields))) for k, rv in enumerate(fields): _index = rv.Field.Index field = tables.Field[_index - 1] remaining_field_indices.discard(_index - 1) if not field.Flags.HasFieldRVA: continue fname = field.Name type = None signature = bytes(field.Signature) offset = header.pe.rva_to_offset(rv.RVA) guess = None try: sig = parse_signature(signature) except Exception: sig = None if isinstance(sig, FieldSig): ftype = sig.field_type if isinstance(ftype, SzArrayTypeSig): element = ftype.element if element.byte_size is not None and element.byte_size > 0: fname, guess = _guess_array_info( _index, signature, element.name, element.byte_size, fname) elif ftype.byte_size is not None and ftype.byte_size > 0: guess = FieldInfo(ftype.name, 1, ftype.byte_size, 0) if guess is None: fname, guess = _guess_field_info(_index, signature, fname) if guess is None or fname is None: self.log_warn(lambda: F'field {k:0{iwidth}d} with signature {field.Signature.hex()}: unable to guess type information') continue if not fname.isprintable() or name_count[fname] > 1: fname = F'Field{k + 1:0{name_width}d}' type = guess.type.lower() if guess.elements > 1: 
type += F'[{guess.elements}]' self.log_debug( F'field {k:0{iwidth}d}; token 0x{_index:06X}; RVA 0x{rv.RVA:04X}; count {guess.elements}; type {guess.type}; name {fname}') end = offset + guess.elements * guess.size path = cpaths.method_path(guess.offset) if guess.offset else '' unpack.append(UnpackResult(F'{path}/{fname}', memory[offset:end], name=fname, type=type)) for _index in remaining_field_indices: field = tables.Field[_index] index = _index + 1 name = field.Name if field.Flags.HasFieldRVA: self.log_warn(F'field {name} has RVA flag set, but no RVA was found') token = index.to_bytes(3, 'little') values = {} for match in re.finditer(( BR'\x72(?P<token>...)\x70' # ldstr BR'(?:\x6F(?P<function>...)\x0A)?' # call GetBytes BR'\x80%s\x04' # stsfld ) % re.escape(token), data, re.DOTALL): md = match.groupdict() fn_token = md.get('function') fn_index = fn_token and int.from_bytes(fn_token, 'little') or None if fn_index is not None: fn_name = tables.MemberRef[fn_index].Name if fn_name != 'GetBytes': self.log_info(F'skipping string assignment passing through call to {fn_name}') continue k = int.from_bytes(md['token'], 'little') values[match.start()] = header.meta.Streams.US[k].encode(self.codec) if not values: continue if len(values) == 1: offset, value = values.popitem() path = cpaths.method_path(offset) unpack.append(UnpackResult(F'{path}/{name}', value, name=name, type='string')) unpack.sort(key=lambda u: u.path) yield from unpackAncestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data): header = DotNetHeader(data, parse_resources=False) tables = header.meta.Streams.Tables fields = tables.FieldRVA cpaths = CodePath(header) if not fields: return icache: dict[bytes, FieldInfo] = {} memory = memoryview(data) def _guess_field_info(t: int, signature: bytes, field_name: str | None = None, sizemap: dict = { '^s?byte$' : 1, '^s?char$' : 2, '^[us]?int.?16$' : 2, '^[us]?int.?32$' : 4, '^[us]?int.?64$' : 8, }) -> tuple[str | None, FieldInfo | None]: try: info = icache[signature] except KeyError: info = None else: if field_name is not None: return field_name, info pattern = ( BR'(\x20....|\x1F.|[\x17-\x1E])' # ldc.i4 count BR'\x8D(...)([\x01\x02])' # newarr col|row BR'\x25' # dup BR'\xD0\x%02x\x%02x\x%02x\x04' # ldtoken t BR'(?:.{0,12}?' # ... BR'\x80(...)\x04)?' % ( # stsfld variable (t >> 0x00) & 0xFF, (t >> 0x08) & 0xFF, (t >> 0x10) & 0xFF ) ) for match in re.finditer(pattern, memory, flags=re.DOTALL): if info is None: count, j, r, name = match.groups() count = integer_from_ldc(count) j, r = struct.unpack('<LB', B'%s\0%s' % (j, r)) typename = tables[r][j - 1].TypeName else: name = match.group(4) typename = info.type for pattern, size in sizemap.items(): if not re.match(pattern, typename, flags=re.IGNORECASE): continue if name: try: name = struct.unpack('<L', B'%s\0' % name) name = name[0] name = tables[4][name - 1].Name except Exception as E: self.log_info(F'attempt to parse field name failed: {E!s}') name = None if name is None: name = field_name if info is None: info = FieldInfo(typename, count, size, match.start()) icache[signature] = info return name, info return None, None def _guess_array_info( t: int, signature: bytes, element_name: str, element_size: int, field_name: str | None = None, ) -> tuple[str | None, FieldInfo | None]: try: info = icache[signature] except KeyError: info = None else: if field_name is not None: return field_name, info pattern = ( BR'(\x20....|\x1F.|[\x17-\x1E])' # ldc.i4 count BR'\x8D...[\x01\x02]' # 
newarr col|row BR'\x25' # dup BR'\xD0\x%02x\x%02x\x%02x\x04' # ldtoken t BR'(?:.{0,12}?' # ... BR'\x80(...)\x04)?' % ( # stsfld variable (t >> 0x00) & 0xFF, (t >> 0x08) & 0xFF, (t >> 0x10) & 0xFF ) ) for match in re.finditer(pattern, memory, flags=re.DOTALL): count_bytes, name = match.groups() count = integer_from_ldc(count_bytes) if name: try: name = struct.unpack('<L', B'%s\0' % name) name = name[0] name = tables[4][name - 1].Name except Exception as E: self.log_info( F'attempt to parse field name failed: {E!s}') name = None if name is None: name = field_name if info is None: info = FieldInfo( element_name, count, element_size, match.start()) icache[signature] = info return name, info return None, None iwidth = len(str(len(fields))) rwidth = max(len(F'{field.RVA:X}') for field in fields) rwidth = max(rwidth, 4) remaining_field_indices = set(range(len(tables.Field))) unpack = [] name_count = Counter(tables[rv.Field].Name for rv in fields) name_width = len(str(len(fields))) for k, rv in enumerate(fields): _index = rv.Field.Index field = tables.Field[_index - 1] remaining_field_indices.discard(_index - 1) if not field.Flags.HasFieldRVA: continue fname = field.Name type = None signature = bytes(field.Signature) offset = header.pe.rva_to_offset(rv.RVA) guess = None try: sig = parse_signature(signature) except Exception: sig = None if isinstance(sig, FieldSig): ftype = sig.field_type if isinstance(ftype, SzArrayTypeSig): element = ftype.element if element.byte_size is not None and element.byte_size > 0: fname, guess = _guess_array_info( _index, signature, element.name, element.byte_size, fname) elif ftype.byte_size is not None and ftype.byte_size > 0: guess = FieldInfo(ftype.name, 1, ftype.byte_size, 0) if guess is None: fname, guess = _guess_field_info(_index, signature, fname) if guess is None or fname is None: self.log_warn(lambda: F'field {k:0{iwidth}d} with signature {field.Signature.hex()}: unable to guess type information') continue if not fname.isprintable() or 
name_count[fname] > 1: fname = F'Field{k + 1:0{name_width}d}' type = guess.type.lower() if guess.elements > 1: type += F'[{guess.elements}]' self.log_debug( F'field {k:0{iwidth}d}; token 0x{_index:06X}; RVA 0x{rv.RVA:04X}; count {guess.elements}; type {guess.type}; name {fname}') end = offset + guess.elements * guess.size path = cpaths.method_path(guess.offset) if guess.offset else '' unpack.append(UnpackResult(F'{path}/{fname}', memory[offset:end], name=fname, type=type)) for _index in remaining_field_indices: field = tables.Field[_index] index = _index + 1 name = field.Name if field.Flags.HasFieldRVA: self.log_warn(F'field {name} has RVA flag set, but no RVA was found') token = index.to_bytes(3, 'little') values = {} for match in re.finditer(( BR'\x72(?P<token>...)\x70' # ldstr BR'(?:\x6F(?P<function>...)\x0A)?' # call GetBytes BR'\x80%s\x04' # stsfld ) % re.escape(token), data, re.DOTALL): md = match.groupdict() fn_token = md.get('function') fn_index = fn_token and int.from_bytes(fn_token, 'little') or None if fn_index is not None: fn_name = tables.MemberRef[fn_index].Name if fn_name != 'GetBytes': self.log_info(F'skipping string assignment passing through call to {fn_name}') continue k = int.from_bytes(md['token'], 'little') values[match.start()] = header.meta.Streams.US[k].encode(self.codec) if not values: continue if len(values) == 1: offset, value = values.popitem() path = cpaths.method_path(offset) unpack.append(UnpackResult(F'{path}/{name}', value, name=name, type='string')) unpack.sort(key=lambda u: u.path) yield from unpack
Inherited members
PathExtractorUnit: CustomJoinBehaviour, CustomPathSeparator, FilterEverything, Requires, act, assemble, codec, console, filter, finish, handles, is_quiet, is_reversible, isatty, labelled, leniency, log_always, log_debug, log_detach, log_fail, log_info, log_level, log_warn, logger, name, nozzle, optional_dependencies, read, read1, required_dependencies, reset, run, source, superinit
UnitBase: