Module refinery.units.formats
A package containing several sub-packages for various data formats.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
A package containing several sub-packages for various data formats.
"""
from __future__ import annotations
import abc
import collections
import fnmatch
import os
import re
from zlib import adler32
from collections import Counter
from typing import ByteString, Iterable, Callable, List, Union, Optional
from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.meta import metavars, ByteStringWrapper, LazyMetaOracle
from refinery.lib.xml import XMLNodeBase
def pathspec(expression):
    """
    Normalizes a path which is separated by backward or forward slashes to be
    separated by forward slashes.
    """
    # Forward slashes are already the target separator, so this reduces to
    # substituting every backslash with a forward slash.
    return expression.replace('\\', '/')
class UnpackResult:
    """
    Represents a single item unpacked from a container: a path, the item data
    (either raw bytes or a callable producing them on demand), and optional
    metadata. Metadata entries whose value is None are discarded on creation.
    """

    def __init__(
        self,
        _br__path: str,
        _br__data: Union[ByteString, Callable[[], ByteString]],
        **_br__meta
    ):
        self.path = _br__path
        self.data = _br__data
        self.meta = _br__meta
        # Prune None-valued metadata in place; iterate over a snapshot because
        # the dictionary is modified during the loop.
        for key, value in list(_br__meta.items()):
            if value is None:
                del _br__meta[key]

    def get_data(self) -> ByteString:
        """
        Return the item data; when it was provided as a callable, invoke it once
        and cache the result on the instance.
        """
        data = self.data
        if callable(data):
            self.data = data = data()
        return data
class EndOfStringNotFound(ValueError):
    """
    Raised when the end of a string could not be determined while parsing.
    """
    # The fixed message carried by every instance of this exception.
    _MESSAGE = 'end of string could not be determined'

    def __init__(self) -> None:
        super().__init__(self._MESSAGE)
class PathPattern:
    """
    Wraps a single path pattern for matching extracted item paths. The pattern may be
    a wildcard expression (the default), a regular expression string (`regex=True`),
    or an already compiled `re.Pattern`. The `fuzzy` level widens matching: 0 requires
    a full match, 1 permits an arbitrary prefix, and 2 matches on any substring.

    Bug fix: the constructor previously returned early for compiled patterns without
    setting `matchers` and `fuzzy` (so `check` raised AttributeError), and never set
    `stops` for string regex patterns (so `reach` and `repr` raised AttributeError).
    All attributes are now initialized on every code path.
    """

    def __init__(self, pp: Union[str, re.Pattern], regex=False, fuzzy=0):
        # Initialize all attributes up front so that every construction path leaves
        # the object in a consistent state.
        self.stops = []
        self.fuzzy = fuzzy
        if isinstance(pp, re.Pattern):
            self.pattern = pp
            # Reuse the compiled pattern's source to build the fuzzy variants below.
            pp = pp.pattern
        elif not regex:
            # For wildcard patterns, record the prefixes at which a partial path can
            # still reach a match; translate the wildcard into a regex and strip the
            # trailing \Z anchor added by fnmatch.translate.
            self.stops = [stop for stop in re.split(R'(.*?[/*?])', pp) if stop]
            pp, _, _ = fnmatch.translate(pp).partition(r'\Z')
        p1 = re.compile(pp)
        p2 = re.compile(F'.*?{pp}')
        # Indexed by fuzzy level: full match, full match with arbitrary prefix, search.
        self.matchers = [p1.fullmatch, p2.fullmatch, p1.search]

    def reach(self, path):
        """
        Returns True when the given (partial) path could still lead to a match; with
        no recorded stops, any path is considered reachable.
        """
        if not any(self.stops):
            return True
        for stop in self.stops:
            if fnmatch.fnmatch(path, stop):
                return True
        return False

    def check(self, path, fuzzy=0):
        """
        Matches the given path against the pattern; the effective fuzzy level is the
        maximum of the argument and the level set at construction, capped at 2.
        """
        fuzzy = min(max(fuzzy, self.fuzzy), 2)
        return self.matchers[fuzzy](path)

    def __repr__(self):
        return F'<PathPattern:{"//".join(self.stops) or "RE"}>'
class PathExtractorUnit(Unit, abstract=True):
    """
    Abstract base class for units that extract items from a container format.
    Deriving classes implement `unpack` to enumerate contained items as
    `UnpackResult` objects; this class implements the shared logic: filtering by
    path patterns, path normalization, name generation for empty paths,
    de-duplication of colliding paths, and optional listing of paths only.
    """
    # Separator used when re-joining normalized path components for output.
    _custom_path_separator = '/'

    def __init__(
        self,
        *paths: Arg.String(metavar='path', nargs='*', help=(
            'Wildcard pattern for the path of the item to be extracted. Each item is returned '
            'as a separate output of this unit. Paths may contain wildcards; The default '
            'argument is a single wildcard, which means that every item will be extracted. If '
            'a given path yields no results, the unit performs increasingly fuzzy searches '
            'with it. This can be disabled using the --exact switch.')),
        list: Arg.Switch('-l',
            help='Return all matching paths as UTF8-encoded output chunks.') = False,
        join_path: Arg.Switch('-j', group='PATH',
            help='Join path names from container with previous path names.') = False,
        drop_path: Arg.Switch('-d', group='PATH',
            help='Do not modify the path variable for output chunks.') = False,
        fuzzy: Arg.Counts('-z', group='MATCH', help=(
            'Specify once to add a leading wildcard to each patterns, twice to also add a '
            'trailing wildcard.')) = 0,
        exact: Arg.Switch('-e', group='MATCH',
            help='Path patterns never match on substrings.') = False,
        regex: Arg.Switch('-r',
            help='Use regular expressions instead of wildcard patterns.') = False,
        path: Arg('-P', metavar='NAME', help=(
            'Name of the meta variable to receive the extracted path. The default value is '
            '"{default}".')) = b'path',
        **keywords
    ):
        # Note that join_path/drop_path are stored under the shorter names
        # join/drop on self.args.
        super().__init__(
            paths=paths,
            list=list,
            join=join_path,
            drop=drop_path,
            path=path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            **keywords
        )

    @property
    def _patterns(self) -> List[PathPattern]:
        """
        Compiles the path arguments into a list of `PathPattern` objects. When no
        path arguments were given, a single match-everything pattern is used.
        """
        paths = self.args.paths
        if not paths:
            # Default pattern depends on the chosen syntax: regex or wildcard.
            if self.args.regex:
                paths = ['.*']
            else:
                paths = [u'*']
        else:
            def to_string(t: Union[str, bytes]) -> str:
                # Path arguments may arrive as bytes; decode with the unit codec.
                if isinstance(t, str):
                    return t
                return t.decode(self.codec)
            paths = [to_string(p) for p in paths]
        for path in paths:
            self.log_debug('path:', path)
        return [
            PathPattern(
                path,
                self.args.regex,
                self.args.fuzzy,
            ) for path in paths
        ]

    @abc.abstractmethod
    def unpack(self, data: ByteString) -> Iterable[UnpackResult]:
        """
        Deriving classes override this method to enumerate all items contained in
        the input data as `UnpackResult` objects.
        """
        raise NotImplementedError

    def process(self, data: ByteString) -> ByteString:
        """
        Unpacks the input and yields every item whose path matches at least one of
        the configured patterns, escalating the fuzzy level for patterns that did
        not match anything (unless --exact was given).
        """
        meta = metavars(data)
        results: List[UnpackResult] = list(self.unpack(data))
        patterns = self._patterns
        metavar = self.args.path.decode(self.codec)
        occurrences = collections.defaultdict(int)  # path -> number of results using it
        checksums = collections.defaultdict(set)    # path -> content checksums seen so far
        root = ''
        uuid = 0

        def get_data(result: UnpackResult):
            # Materialize the (possibly lazily computed) data of a result; in
            # lenient mode, a partial result is accepted instead of raising.
            try:
                data = result.get_data()
            except RefineryPartialResult as error:
                if not self.args.lenient:
                    raise
                result.data = data = error.partial
            return data

        def _uuid():
            # Generate a unique name component from the input's crc32 metadata and
            # a running counter.
            nonlocal uuid
            crc = meta['crc32'].decode('ascii').upper()
            uid = uuid
            uuid += 1
            return F'_{crc}.{uid:04X}'

        def normalize(_path: str) -> str:
            # Join the item path onto the current root and collapse components
            # consisting only of dots: a component of k dots deletes itself and up
            # to k-1 preceding components (so ".." also removes its parent).
            parts = re.split(r'[\\/]', F'{root}/{_path}')
            while True:
                for k, part in enumerate(parts):
                    if not part.strip('.'):
                        break
                else:
                    break
                size = len(part)
                j = max(k - size, 0)
                del parts[j:k + 1]
            path = self._custom_path_separator.join(parts)
            return path

        if self.args.join:
            # With --join, prefix all item paths with the previously extracted
            # path stored in the metadata variable, when present.
            try:
                root = ByteStringWrapper(meta[metavar], self.codec)
            except KeyError:
                pass

        for result in results:
            path = normalize(result.path)
            if not path:
                # Item without a usable path: generate a name, appending a file
                # extension guessed from the file magic unless it is generic.
                from refinery.lib.mime import FileMagicInfo
                path = _uuid()
                ext = FileMagicInfo(get_data(result)).extension
                if ext != 'bin':
                    path = F'{path}.{ext}'
                self.log_warn(F'read chunk with empty path; using generated name {path}')
            result.path = path
            occurrences[path] += 1

        for result in results:
            path = result.path
            if occurrences[path] > 1:
                # Several results share this path: results with identical content
                # are skipped, the remainder receive a versioned suffix.
                checksum = adler32(get_data(result))
                if checksum in checksums[path]:
                    continue
                checksums[path].add(checksum)
                counter = len(checksums[path])
                base, extension = os.path.splitext(path)
                width = len(str(occurrences[path]))
                if any(F'{base}.v{c:0{width}d}{extension}' in occurrences for c in range(occurrences[path])):
                    # A versioned name would collide with an existing path; fall
                    # back to a generated unique name instead.
                    result.path = F'{base}.{_uuid()}{extension}'
                else:
                    result.path = F'{base}.v{counter:0{width}d}{extension}'
                self.log_warn(F'read chunk with duplicate path; deduplicating to {result.path}')

        for p in patterns:
            # Retry each pattern with increasing fuzziness until it matched at
            # least one item; --exact disables the escalation, and an explicit
            # fuzzy level stops after the first round.
            for fuzzy in range(3):
                done = self.args.exact
                for result in results:
                    path = result.path
                    if not p.check(path, fuzzy):
                        continue
                    done = True
                    if self.args.list:
                        # In list mode, only the matching paths are emitted.
                        yield self.labelled(path.encode(self.codec), **result.meta)
                        continue
                    if not self.args.drop:
                        result.meta[metavar] = path
                    try:
                        chunk = get_data(result)
                    except Exception as error:
                        # In debug mode, surface the error; otherwise log and
                        # continue with the remaining items.
                        if self.log_debug():
                            raise
                        self.log_warn(F'extraction failure for {path}: {error!s}')
                    else:
                        self.log_debug(F'extraction success for {path}')
                        yield self.labelled(chunk, **result.meta)
                if done or self.args.fuzzy:
                    break
class XMLToPathExtractorUnit(PathExtractorUnit, abstract=True):
    """
    Abstract path extractor base class for units that extract items from XML
    documents. It adds a --format option and logic to derive readable item paths
    from XML tags and attributes.
    """

    def __init__(
        self, *paths,
        format: Arg('-f', type=str, metavar='F', help=(
            'A format expression to be applied for computing the path of an item. This must use '
            'metadata that is available on the item. The current tag can be accessed as {0}. If '
            'no format is specified, the unit attempts to derive a good attribute from the XML '
            'tree to use for generating paths.'
        )) = None,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', **keywords
    ):
        super().__init__(
            *paths,
            format=format,
            list=list,
            path=path,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
            **keywords
        )

    @staticmethod
    def _normalize_val(attr: str) -> str:
        # Sanitize an attribute value for use as a path component: convert
        # brackets to parentheses, then collapse runs of the characters in _bad
        # (path separators and other punctuation) to a space or a dot.
        _bad = '[/\\$&%#:.]'
        attr = attr.replace('[', '(')
        attr = attr.replace(']', ')')
        attr = re.sub(F'\\s*{_bad}+\\s+', ' ', attr)
        attr = re.sub(F'\\s*{_bad}+\\s*', '.', attr)
        return attr.strip()

    @staticmethod
    def _normalize_key(attribute: str) -> str:
        # Strip any namespace prefix (everything up to the last colon) from an
        # attribute name.
        _, _, a = attribute.rpartition(':')
        return a

    def _make_path_builder(
        self,
        meta: LazyMetaOracle,
        root: XMLNodeBase
    ) -> Callable[[XMLNodeBase, Optional[int]], str]:
        """
        Returns a function that computes a path string for an XML node. A
        user-supplied format expression takes precedence; otherwise, when one
        attribute key has clean values on more than two thirds of all nodes, its
        value is used; the final fallback is the (normalized) tag name, optionally
        suffixed with a child index.
        """
        path_attributes = Counter()

        def walk(node: XMLNodeBase):
            # Count, per attribute key, the nodes whose value is short and clean
            # enough to serve as a path component; returns the number of nodes
            # visited in this subtree.
            total = 1
            for key, val in node.attributes.items():
                if re.fullmatch(R'[-\s\w+,.;@(){}]{1,64}', self._normalize_val(val)):
                    path_attributes[key] += 1
            for child in node.children:
                total += walk(child)
            return total

        total = walk(root)
        if not path_attributes:
            path_attribute = None
            count = 0
        else:
            path_attribute, count = path_attributes.most_common(1)[0]
        if 3 * count <= 2 * total:
            # The best candidate attribute is too rare (at most two thirds of all
            # nodes carry it); do not use attribute-based naming.
            path_attribute = None
        nkey = self._normalize_key
        nval = self._normalize_val
        node_format = self.args.format

        def path_builder(node: XMLNodeBase, index: Optional[int] = None) -> str:
            attrs = node.attributes
            if node_format and meta:
                # Try the user-supplied format first; fall through when it
                # references metadata that is unavailable for this node.
                try:
                    return meta.format_str(
                        node_format,
                        self.codec,
                        node.tag, **{
                            nkey(key): nval(val)
                            for key, val in attrs.items()
                        }
                    )
                except KeyError:
                    pass
            if path_attribute is not None and path_attribute in attrs:
                return self._normalize_val(attrs[path_attribute])
            # Fallback: use the tag name, disambiguated by child index when given.
            out = nval(node.tag)
            if index is not None:
                out = F'{out}/{index}'
            return out

        return path_builder
Sub-modules
refinery.units.formats.a3x
refinery.units.formats.archive
refinery.units.formats.bat
refinery.units.formats.csv
refinery.units.formats.deserialize_php
refinery.units.formats.dexstr
refinery.units.formats.email
refinery.units.formats.evtx
refinery.units.formats.exe
-
A package with units for generic executables. Usually, PE, ELF, and MachO formats are covered.
refinery.units.formats.hexload
refinery.units.formats.html
refinery.units.formats.httpresponse
refinery.units.formats.ifps
-
The code is based on the logic implemented in IFPSTools: https://github.com/Wack0/IFPSTools
refinery.units.formats.ifpsstr
refinery.units.formats.java
-
Units that process Java related binary formats such as class files and serialized Java objects.
refinery.units.formats.json
refinery.units.formats.lnk
refinery.units.formats.macho
refinery.units.formats.msgpack
refinery.units.formats.msi
refinery.units.formats.office
-
These units process data formats related to Microsoft Office.
refinery.units.formats.pcap
refinery.units.formats.pcap_http
refinery.units.formats.pdf
refinery.units.formats.pe
-
A package containing Portable Executable (PE) file related units.
refinery.units.formats.pkcs7
refinery.units.formats.pkcs7sig
refinery.units.formats.pyc
refinery.units.formats.stego
refinery.units.formats.tnetmtm
refinery.units.formats.winreg
refinery.units.formats.xml
Functions
def pathspec(expression)
-
Normalizes a path which is separated by backward or forward slashes to be separated by forward slashes.
Expand source code Browse git
def pathspec(expression): """ Normalizes a path which is separated by backward or forward slashes to be separated by forward slashes. """ return '/'.join(re.split(R'[\\\/]', expression))
Classes
class UnpackResult (_br__path, _br__data, **_br__meta)
-
Expand source code Browse git
class UnpackResult: def get_data(self) -> ByteString: if callable(self.data): self.data = self.data() return self.data def __init__(self, _br__path: str, _br__data: Union[ByteString, Callable[[], ByteString]], **_br__meta): self.path = _br__path self.data = _br__data self.meta = _br__meta for key in [key for key, value in _br__meta.items() if value is None]: del _br__meta[key]
Methods
def get_data(self)
-
Expand source code Browse git
def get_data(self) -> ByteString: if callable(self.data): self.data = self.data() return self.data
class EndOfStringNotFound
-
Raised when the end of a string could not be determined while parsing.
Expand source code Browse git
class EndOfStringNotFound(ValueError): def __init__(self): super().__init__('end of string could not be determined')
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class PathPattern (pp, regex=False, fuzzy=0)
-
Expand source code Browse git
class PathPattern: def __init__(self, pp: Union[str, re.Pattern], regex=False, fuzzy=0): if isinstance(pp, re.Pattern): self.stops = [] self.pattern = pp return elif not regex: self.stops = [stop for stop in re.split(R'(.*?[/*?])', pp) if stop] pp, _, _ = fnmatch.translate(pp).partition(r'\Z') p1 = re.compile(pp) p2 = re.compile(F'.*?{pp}') self.matchers = [p1.fullmatch, p2.fullmatch, p1.search] self.fuzzy = fuzzy def reach(self, path): if not any(self.stops): return True for stop in self.stops: if fnmatch.fnmatch(path, stop): return True return False def check(self, path, fuzzy=0): fuzzy = min(max(fuzzy, self.fuzzy), 2) return self.matchers[fuzzy](path) def __repr__(self): return F'<PathPattern:{"//".join(self.stops) or "RE"}>'
Methods
def reach(self, path)
-
Expand source code Browse git
def reach(self, path): if not any(self.stops): return True for stop in self.stops: if fnmatch.fnmatch(path, stop): return True return False
def check(self, path, fuzzy=0)
-
Expand source code Browse git
def check(self, path, fuzzy=0): fuzzy = min(max(fuzzy, self.fuzzy), 2) return self.matchers[fuzzy](path)
class PathExtractorUnit (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', **keywords)
-
Expand source code Browse git
class PathExtractorUnit(Unit, abstract=True): _custom_path_separator = '/' def __init__( self, *paths: Arg.String(metavar='path', nargs='*', help=( 'Wildcard pattern for the path of the item to be extracted. Each item is returned ' 'as a separate output of this unit. Paths may contain wildcards; The default ' 'argument is a single wildcard, which means that every item will be extracted. If ' 'a given path yields no results, the unit performs increasingly fuzzy searches ' 'with it. This can be disabled using the --exact switch.')), list: Arg.Switch('-l', help='Return all matching paths as UTF8-encoded output chunks.') = False, join_path: Arg.Switch('-j', group='PATH', help='Join path names from container with previous path names.') = False, drop_path: Arg.Switch('-d', group='PATH', help='Do not modify the path variable for output chunks.') = False, fuzzy: Arg.Counts('-z', group='MATCH', help=( 'Specify once to add a leading wildcard to each patterns, twice to also add a ' 'trailing wildcard.')) = 0, exact: Arg.Switch('-e', group='MATCH', help='Path patterns never match on substrings.') = False, regex: Arg.Switch('-r', help='Use regular expressions instead of wildcard patterns.') = False, path: Arg('-P', metavar='NAME', help=( 'Name of the meta variable to receive the extracted path. 
The default value is ' '"{default}".')) = b'path', **keywords ): super().__init__( paths=paths, list=list, join=join_path, drop=drop_path, path=path, fuzzy=fuzzy, exact=exact, regex=regex, **keywords ) @property def _patterns(self): paths = self.args.paths if not paths: if self.args.regex: paths = ['.*'] else: paths = [u'*'] else: def to_string(t: Union[str, bytes]) -> str: if isinstance(t, str): return t return t.decode(self.codec) paths = [to_string(p) for p in paths] for path in paths: self.log_debug('path:', path) return [ PathPattern( path, self.args.regex, self.args.fuzzy, ) for path in paths ] @abc.abstractmethod def unpack(self, data: ByteString) -> Iterable[UnpackResult]: raise NotImplementedError def process(self, data: ByteString) -> ByteString: meta = metavars(data) results: List[UnpackResult] = list(self.unpack(data)) patterns = self._patterns metavar = self.args.path.decode(self.codec) occurrences = collections.defaultdict(int) checksums = collections.defaultdict(set) root = '' uuid = 0 def get_data(result: UnpackResult): try: data = result.get_data() except RefineryPartialResult as error: if not self.args.lenient: raise result.data = data = error.partial return data def _uuid(): nonlocal uuid crc = meta['crc32'].decode('ascii').upper() uid = uuid uuid += 1 return F'_{crc}.{uid:04X}' def normalize(_path: str) -> str: parts = re.split(r'[\\/]', F'{root}/{_path}') while True: for k, part in enumerate(parts): if not part.strip('.'): break else: break size = len(part) j = max(k - size, 0) del parts[j:k + 1] path = self._custom_path_separator.join(parts) return path if self.args.join: try: root = ByteStringWrapper(meta[metavar], self.codec) except KeyError: pass for result in results: path = normalize(result.path) if not path: from refinery.lib.mime import FileMagicInfo path = _uuid() ext = FileMagicInfo(get_data(result)).extension if ext != 'bin': path = F'{path}.{ext}' self.log_warn(F'read chunk with empty path; using generated name {path}') result.path 
= path occurrences[path] += 1 for result in results: path = result.path if occurrences[path] > 1: checksum = adler32(get_data(result)) if checksum in checksums[path]: continue checksums[path].add(checksum) counter = len(checksums[path]) base, extension = os.path.splitext(path) width = len(str(occurrences[path])) if any(F'{base}.v{c:0{width}d}{extension}' in occurrences for c in range(occurrences[path])): result.path = F'{base}.{_uuid()}{extension}' else: result.path = F'{base}.v{counter:0{width}d}{extension}' self.log_warn(F'read chunk with duplicate path; deduplicating to {result.path}') for p in patterns: for fuzzy in range(3): done = self.args.exact for result in results: path = result.path if not p.check(path, fuzzy): continue done = True if self.args.list: yield self.labelled(path.encode(self.codec), **result.meta) continue if not self.args.drop: result.meta[metavar] = path try: chunk = get_data(result) except Exception as error: if self.log_debug(): raise self.log_warn(F'extraction failure for {path}: {error!s}') else: self.log_debug(F'extraction success for {path}') yield self.labelled(chunk, **result.meta) if done or self.args.fuzzy: break
Ancestors
Subclasses
- XMLToPathExtractorUnit
- a3x
- ArchiveUnit
- xtnuitka
- xtmail
- vsect
- jvdasm
- xtjson
- vbastr
- xtdoc
- xtone
- xtrtf
- xtvba
- xtpdf
- dnfields
- dnmr
- dnrc
- dnsfx
- perc
- winreg
- carve_pe
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
@abc.abstractmethod def unpack(self, data: ByteString) -> Iterable[UnpackResult]: raise NotImplementedError
Inherited members
class XMLToPathExtractorUnit (*paths, format=None, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', **keywords)
-
Expand source code Browse git
class XMLToPathExtractorUnit(PathExtractorUnit, abstract=True): def __init__( self, *paths, format: Arg('-f', type=str, metavar='F', help=( 'A format expression to be applied for computing the path of an item. This must use ' 'metadata that is available on the item. The current tag can be accessed as {0}. If ' 'no format is specified, the unit attempts to derive a good attribute from the XML ' 'tree to use for generating paths.' )) = None, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', **keywords ): super().__init__( *paths, format=format, list=list, path=path, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex, **keywords ) @staticmethod def _normalize_val(attr: str): _bad = '[/\\$&%#:.]' attr = attr.replace('[', '(') attr = attr.replace(']', ')') attr = re.sub(F'\\s*{_bad}+\\s+', ' ', attr) attr = re.sub(F'\\s*{_bad}+\\s*', '.', attr) return attr.strip() @staticmethod def _normalize_key(attribute: str): _, _, a = attribute.rpartition(':') return a def _make_path_builder( self, meta: LazyMetaOracle, root: XMLNodeBase ) -> Callable[[XMLNodeBase, Optional[int]], str]: path_attributes = Counter() def walk(node: XMLNodeBase): total = 1 for key, val in node.attributes.items(): if re.fullmatch(R'[-\s\w+,.;@(){}]{1,64}', self._normalize_val(val)): path_attributes[key] += 1 for child in node.children: total += walk(child) return total total = walk(root) if not path_attributes: path_attribute = None count = 0 else: path_attribute, count = path_attributes.most_common(1)[0] if 3 * count <= 2 * total: path_attribute = None nkey = self._normalize_key nval = self._normalize_val node_format = self.args.format def path_builder(node: XMLNodeBase, index: Optional[int] = None) -> str: attrs = node.attributes if node_format and meta: try: return meta.format_str( node_format, self.codec, node.tag, **{ nkey(key): nval(val) for key, val in attrs.items() } ) except KeyError: pass if path_attribute is not None 
and path_attribute in attrs: return self._normalize_val(attrs[path_attribute]) out = nval(node.tag) if index is not None: out = F'{out}/{index}' return out return path_builder
Ancestors
Subclasses
Class variables
var required_dependencies
var optional_dependencies
Inherited members