Module refinery.units.pattern.xtp
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
from fnmatch import fnmatch
from ipaddress import ip_address
from typing import AnyStr, Union
from urllib.parse import urlparse
from string import ascii_letters
from pathlib import Path
from enum import Enum
from refinery.units.pattern import Arg, PatternExtractor
from refinery.units import RefineryCriticalException
from refinery.lib.patterns import indicators
class LetterWeight:
def __init__(self, weight):
self._weights = weight._weights
except AttributeError:
self._weights = {
letter: weight for letters, weight in weight.items() for letter in letters
for letter in range(0x100):
self._weights.setdefault(letter, 0)
def __call__(self, data: AnyStr) -> float:
if isinstance(data, str):
data = data.encode('latin1')
return sum(self._weights[c] for c in data) / len(data) / max(self._weights.values())
class LetterWeights(LetterWeight, Enum):
IOC = LetterWeight({
B'^`': 1,
B'!$%&()*+-<=>?[]{}~\t': 2,
B'.,:;#/\\|@_ ': 5,
B'0123456789abcdefghijklmnopqrstuvwxyz': 8,
Path = LetterWeight({
B'^`': 1,
B'$%&()*+-<=>?[]{}~\t': 2,
B'.,:;#/\\|@_ ': 4,
B'0123456789': 4,
B'abcdefghijklmnopqrstuvwxyz': 8,
class xtp(PatternExtractor):
Extract Patterns: Uses regular expressions to extract indicators from the input data and
optionally filters these results heuristically. The unit is designed to extract indicators
such as domain names and IP addresses, see below for a complete list. To extract data
formats such as hex-encoded data, use `refinery.carve`.
def __init__(
*pattern: Arg('pattern', type=str,
), help=(
'Choose the pattern to extract. The unit uses {{default}} by default. Use an '
'asterix character to select all available patterns. The available patterns '
'are: {}'.format(', '.join(p.display for p in indicators))
filter: Arg('-f', dest='filter', action='count',
'If this setting is enabled, the xtp unit will attempt to reduce the number '
'of false positives by certain crude heuristics. Specify multiple times to '
'make the filtering more aggressive.'
) = 0,
min=1, max=None, len=None, stripspace=False, duplicates=False, longest=False, take=None
self.superinit(super(), **vars(), ascii=True, utf16=True)
patterns = {
p for name in pattern for p in indicators if fnmatch(p.display, name)
# if indicators.hostname in patterns:
# patterns.remove(indicators.hostname)
# patterns.add(indicators.ipv4)
# patterns.add(indicators.domain)
patterns = [F'(?P<{}>{p.value})' for p in patterns]
if not patterns:
raise RefineryCriticalException('The given mask does not match any known indicator pattern.')
pattern = '|'.join(patterns)
self.args.pattern = re.compile(pattern.encode(self.codec), flags=re.DOTALL)
self.args.filter = filter
_ALPHABETIC = ascii_letters.encode('ASCII')
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 2,
'' : 1,
'' : 2,
'' : 2,
'' : 4,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 4,
'' : 1,
'' : 3,
'' : 1,
'' : 1,
'' : 1,
'' : 4,
'' : 5,
'' : 5,
'gov' : 2,
'' : 2,
'' : 1,
'' : 1,
'' : 1,
'' : 2,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 3, #
'' : 3,
'' : 1,
'' : 2,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 2,
'' : 1,
'' : 1,
'' : 4,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 3,
'' : 1,
'' : 2,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
'' : 1,
for _ext in [
B"'"[0]: B"'",
B'"'[0]: B'"',
B'('[0]: B')',
B'{'[0]: B'}',
B'['[0]: B']',
B'<'[0]: B'>',
def _check_match(self, data: Union[memoryview, bytearray], pos: int, name: str, value: bytes):
term = self._BRACKETING.get(data[pos - 1], None)
if term:
pos = value.find(term)
if pos > 0:
value = value[:pos]
if not self.args.filter:
return value
if name ==
if all(part.isdigit() for part in value.split(B'.')):
name =
elif B'.' not in value:
name =
name =
if name ==
ocets = [int(x) for x in value.split(B'.')]
if ocets.count(0) >= 3:
return None
if self.args.filter > 2 and sum(ocets) < 10:
return None
for area in (
bytes(data[pos - 20 : pos + 20]),
bytes(data[pos * 2 - 40 : pos * 2 + 40 : 2]),
bytes(data[pos * 2 - 41 : pos * 2 + 39 : 2]),
if B'version' in area.lower():
return None
ip = ip_address(value.decode(self.codec))
if not ip.is_global:
if self.args.filter >= 3 or not ip.is_private:
return None
elif name in {,,,,
if self.args.filter >= 2:
if LetterWeights.IOC(value) < 0.6:
self.log_info(F'excluding indicator because with low score: {value}', clip=True)
return None
if name != and len(value) > 0x100:
self.log_info(F'excluding indicator because it is too long: {value}', clip=True)
return None
ioc = value.decode(self.codec)
if '://' not in ioc: ioc = F'tcp://{ioc}'
parts = urlparse(ioc)
host, _, _ = parts.netloc.partition(':')
hl = host.lower()
for white, level in self._LEGITIMATE_HOSTS.items():
if self.args.filter >= level and (hl == white or hl.endswith(F'.{white}')):
self.log_info(F'excluding indicator because domain {hl} is whitelisted via {white}: {value}', clip=True)
self.log_debug(F'reduce level below {level} to allow, current level is {self.args.filter}')
return None
if name ==
scheme = parts.scheme.lower()
for p in ('http', 'https', 'ftp', 'file', 'mailto'):
if scheme.endswith(p):
pos = scheme.find(p)
value = value[pos:]
if any(hl == w for w in self._DOMAIN_WHITELIST):
self.log_info(F'excluding indicator because domain {hl} is whitelisted: {value}')
return None
if name in {,,
if data[pos - 1] in b'/\\' and self.args.filter >= 2:
return None
hostparts = host.split('.')
if self.args.filter >= 2:
if not all(p.isdigit() for p in hostparts) and all(len(p) < 4 for p in hostparts):
self.log_info(F'excluding host with too many short parts: {value}')
return None
if self.args.filter >= 3:
if len(hostparts) <= sum(3 for p in hostparts if p != p.lower() and p != p.upper()):
self.log_info(F'excluding host with too many mixed case parts: {value}')
return None
# These heuristics attempt to filter out member access to variables in
# scripts which can be mistaken for domains because of the TLD inflation
# we've had.
uppercase = sum(1 for c in host if c.isalpha() and c.upper() == c)
lowercase = sum(1 for c in host if c.isalpha() and c.lower() == c)
if lowercase and uppercase:
caseratio = uppercase / lowercase
if 0.1 < caseratio < 0.9:
self.log_info(F'excluding indicator with too much uppercase letters: {value}')
return None
if all(x.isidentifier() for x in hostparts):
if len(hostparts) == 2 and hostparts[0] in ('this', 'self'):
self.log_info(F'excluding host that looks like a code snippet: {value}')
return None
if len(hostparts[-2]) < 3:
self.log_info(F'excluding host with too short root domain name: {value}')
return None
if any(x.startswith('_') for x in hostparts):
self.log_info(F'excluding host with underscores: {value}')
return None
if len(hostparts[-1]) > 3:
prefix = '.'.join(hostparts[:-1])
seen_before = len(set(re.findall(
R'{}(?:\.\w+)+'.format(prefix).encode('ascii'), data)))
if seen_before > 2:
self.log_debug(F'excluding indicator that was already seen: {value}')
return None
elif name ==
at = value.find(B'@')
ix = 0
while value[ix] not in self._ALPHABETIC:
ix += 1
return None if at - ix < 3 else value[ix:]
elif name in (,,,
if len(value) < 8:
self.log_info(F'excluding path because it is too short: {value}')
return None
if len(value) > 16 and len(re.findall(RB'\\x\d\d', value)) > len(value) // 10:
self.log_info(F'excluding long path containign hex: {value}', clip=True)
return None
path_string = value.decode(self.codec)
except Exception:
self.log_debug(F'excluding path which did not decode: {value!r}', clip=True)
return None
path = Path(path_string)
except Exception as E:
self.log_debug(F'error parsing path "{path}": {E!s}')
return None
path_likeness = sum(v for v, x in [
(1, path.suffix),
(1, path_string.startswith('/')),
(2, path_string.startswith('%')),
(2, path_string.startswith('\\\\')),
(2, path_string[1:3] == ':\\'),
] if x)
if 2 + path_likeness < min(self.args.filter, 2):
self.log_info(F'excluding long path because it has no characteristic parts: {value}')
return None
bad_parts = 0
all_parts = len(
if self.args.filter >= 1:
date_likeness = sum(1
for t in ['yyyy', 'yy', 'mm', 'dd', 'hh', 'ss']
if t in or t.upper() in
if len(value) < 20 and date_likeness >= all_parts - 1:
self.log_info(F'excluding path that looks like a date format: {value}', clip=True)
return None
if self.args.filter >= 2:
for k, part in enumerate(
if not k:
drive, colon, slash = part.partition(':')
if colon and len(drive) == 1 and len(slash) <= 1:
if part[0] == part[~0] == '%':
if len(part) == 1:
if (
LetterWeights.Path(part) < 0.5 + (min(self.args.filter, 4) * 0.1)
or (self.args.filter >= 2 and LetterWeights.Path(part[:1]) < 0.5)
bad_parts += 1
self.log_debug(F'bad part {k + 1} in path: {part}')
for filter_limit in (2, 3, 4):
bad_ratio = 2 ** (filter_limit - 1)
if self.args.filter >= filter_limit and bad_parts * bad_ratio >= all_parts:
self.log_info(F'excluding path with bad parts: {value}', clip=True)
return None
return value
def process(self, data):
whitelist = set()
def check(match: re.Match):
for name, value in match.groupdict().items():
if value is not None:
raise RefineryCriticalException('Received empty match.')
if value in whitelist:
return None
result = self._check_match(match.string, match.start(), name, value)
if result is not None:
return self.labelled(result, pattern=name)
transforms = [check]
yield from self.matches_filtered(memoryview(data), self.args.pattern, *transforms)
class LetterWeight (weight)
Expand source code Browse git
class LetterWeight: def __init__(self, weight): try: self._weights = weight._weights except AttributeError: pass else: return self._weights = { letter: weight for letters, weight in weight.items() for letter in letters } for letter in range(0x100): self._weights.setdefault(letter, 0) def __call__(self, data: AnyStr) -> float: if isinstance(data, str): data = data.encode('latin1') return sum(self._weights[c] for c in data) / len(data) / max(self._weights.values())
class LetterWeights (weight)
An enumeration.
Expand source code Browse git
class LetterWeights(LetterWeight, Enum): IOC = LetterWeight({ B'^`': 1, B'!$%&()*+-<=>?[]{}~\t': 2, B'ABCDEFGHIJKLMNOPQRSTUVWXYZ': 4, B'.,:;#/\\|@_ ': 5, B'0123456789abcdefghijklmnopqrstuvwxyz': 8, }) Path = LetterWeight({ B'^`': 1, B'$%&()*+-<=>?[]{}~\t': 2, B'.,:;#/\\|@_ ': 4, B'0123456789': 4, B'ABCDEFGHIJKLMNOPQRSTUVWXYZ': 6, B'abcdefghijklmnopqrstuvwxyz': 8, })
- LetterWeight
- enum.Enum
Class variables
var IOC
var Path
class xtp (*pattern, filter=0, min=1, max=None, len=None, stripspace=False, duplicates=False, longest=False, take=None)
Extract Patterns: Uses regular expressions to extract indicators from the input data and optionally filters these results heuristically. The unit is designed to extract indicators such as domain names and IP addresses, see below for a complete list. To extract data formats such as hex-encoded data, use
.Expand source code Browse git
class xtp(PatternExtractor): """ Extract Patterns: Uses regular expressions to extract indicators from the input data and optionally filters these results heuristically. The unit is designed to extract indicators such as domain names and IP addresses, see below for a complete list. To extract data formats such as hex-encoded data, use `refinery.carve`. """ def __init__( self, *pattern: Arg('pattern', type=str, default=(,,, ), help=( 'Choose the pattern to extract. The unit uses {{default}} by default. Use an ' 'asterix character to select all available patterns. The available patterns ' 'are: {}'.format(', '.join(p.display for p in indicators)) ) ), filter: Arg('-f', dest='filter', action='count', help=( 'If this setting is enabled, the xtp unit will attempt to reduce the number ' 'of false positives by certain crude heuristics. Specify multiple times to ' 'make the filtering more aggressive.' ) ) = 0, min=1, max=None, len=None, stripspace=False, duplicates=False, longest=False, take=None ): self.superinit(super(), **vars(), ascii=True, utf16=True) patterns = { p for name in pattern for p in indicators if fnmatch(p.display, name) } # if indicators.hostname in patterns: # patterns.remove(indicators.hostname) # patterns.add(indicators.ipv4) # patterns.add(indicators.domain) patterns = [F'(?P<{}>{p.value})' for p in patterns] if not patterns: raise RefineryCriticalException('The given mask does not match any known indicator pattern.') pattern = '|'.join(patterns) self.args.pattern = re.compile(pattern.encode(self.codec), flags=re.DOTALL) self.args.filter = filter _ALPHABETIC = ascii_letters.encode('ASCII') _LEGITIMATE_HOSTS = { '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 2, '' : 1, '' : 2, '' : 2, '' : 4, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 4, '' : 1, '' : 3, '' : 1, '' : 1, '' : 1, '' : 4, '' : 5, '' : 5, 'gov' : 2, '' : 2, '' : 1, '' : 1, '' : 1, '' : 2, '' : 1, '' : 1, '' : 1, '' : 1, '' : 3, # '' : 3, '' : 1, '' : 2, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 2, '' : 1, '' : 1, '' : 4, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 3, '' : 1, '' : 2, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, '' : 1, } for _ext in [ 'build', 'data', 'do', 'help', 'java', 'md', 'mov', 'name', 'py', 'so', 'sys', 'zip', ]: _LEGITIMATE_HOSTS[_ext] = 4 _DOMAIN_WHITELIST = [ '', '', ] _BRACKETING = { B"'"[0]: B"'", B'"'[0]: B'"', B'('[0]: B')', B'{'[0]: B'}', B'['[0]: B']', B'<'[0]: B'>', } def _check_match(self, data: Union[memoryview, bytearray], pos: int, name: str, value: bytes): term = self._BRACKETING.get(data[pos - 1], None) if term: pos = value.find(term) if pos > 0: value = value[:pos] if not self.args.filter: return value if name == if all(part.isdigit() for part in value.split(B'.')): name = elif B'.' not in value: name = else: name = if name == ocets = [int(x) for x in value.split(B'.')] if ocets.count(0) >= 3: return None if self.args.filter > 2 and sum(ocets) < 10: return None for area in ( bytes(data[pos - 20 : pos + 20]), bytes(data[pos * 2 - 40 : pos * 2 + 40 : 2]), bytes(data[pos * 2 - 41 : pos * 2 + 39 : 2]), ): if B'version' in area.lower(): return None ip = ip_address(value.decode(self.codec)) if not ip.is_global: if self.args.filter >= 3 or not ip.is_private: return None elif name in {,,,, }: if self.args.filter >= 2: if LetterWeights.IOC(value) < 0.6: self.log_info(F'excluding indicator because with low score: {value}', clip=True) return None if name != and len(value) > 0x100: self.log_info(F'excluding indicator because it is too long: {value}', clip=True) return None ioc = value.decode(self.codec) if '://' not in ioc: ioc = F'tcp://{ioc}' parts = urlparse(ioc) host, _, _ = parts.netloc.partition(':') hl = host.lower() for white, level in self._LEGITIMATE_HOSTS.items(): if self.args.filter >= level and (hl == white or hl.endswith(F'.{white}')): self.log_info(F'excluding indicator because domain {hl} is whitelisted via {white}: {value}', clip=True) self.log_debug(F'reduce level below {level} to allow, current level is {self.args.filter}') return None if name == scheme = parts.scheme.lower() for p in ('http', 'https', 'ftp', 'file', 'mailto'): if scheme.endswith(p): pos = scheme.find(p) value = value[pos:] break if any(hl == w for w in self._DOMAIN_WHITELIST): self.log_info(F'excluding indicator because domain {hl} is whitelisted: {value}') return None if name in {,, }: if data[pos - 1] in b'/\\' and self.args.filter >= 2: return None hostparts = host.split('.') if self.args.filter >= 2: if not all(p.isdigit() for p in hostparts) and all(len(p) < 4 for p in hostparts): self.log_info(F'excluding host with too many short parts: {value}') return None if self.args.filter >= 3: if len(hostparts) <= sum(3 for p in hostparts if p != p.lower() and p != p.upper()): self.log_info(F'excluding host with too many mixed case parts: {value}') return None # These heuristics attempt to filter out member access to variables in # scripts which can be mistaken for domains because of the TLD inflation # we've had. uppercase = sum(1 for c in host if c.isalpha() and c.upper() == c) lowercase = sum(1 for c in host if c.isalpha() and c.lower() == c) if lowercase and uppercase: caseratio = uppercase / lowercase if 0.1 < caseratio < 0.9: self.log_info(F'excluding indicator with too much uppercase letters: {value}') return None if all(x.isidentifier() for x in hostparts): if len(hostparts) == 2 and hostparts[0] in ('this', 'self'): self.log_info(F'excluding host that looks like a code snippet: {value}') return None if len(hostparts[-2]) < 3: self.log_info(F'excluding host with too short root domain name: {value}') return None if any(x.startswith('_') for x in hostparts): self.log_info(F'excluding host with underscores: {value}') return None if len(hostparts[-1]) > 3: prefix = '.'.join(hostparts[:-1]) seen_before = len(set(re.findall( R'{}(?:\.\w+)+'.format(prefix).encode('ascii'), data))) if seen_before > 2: self.log_debug(F'excluding indicator that was already seen: {value}') return None elif name == at = value.find(B'@') ix = 0 while value[ix] not in self._ALPHABETIC: ix += 1 return None if at - ix < 3 else value[ix:] elif name in (,,, ): if len(value) < 8: self.log_info(F'excluding path because it is too short: {value}') return None if len(value) > 16 and len(re.findall(RB'\\x\d\d', value)) > len(value) // 10: self.log_info(F'excluding long path containign hex: {value}', clip=True) return None try: path_string = value.decode(self.codec) except Exception: self.log_debug(F'excluding path which did not decode: {value!r}', clip=True) return None try: path = Path(path_string) except Exception as E: self.log_debug(F'error parsing path "{path}": {E!s}') return None path_likeness = sum(v for v, x in [ (1, path.suffix), (1, path_string.startswith('/')), (2, path_string.startswith('%')), (2, path_string.startswith('\\\\')), (2, path_string[1:3] == ':\\'), ] if x) if 2 + path_likeness < min(self.args.filter, 2): self.log_info(F'excluding long path because it has no characteristic parts: {value}') return None bad_parts = 0 all_parts = len( if self.args.filter >= 1: date_likeness = sum(1 for t in ['yyyy', 'yy', 'mm', 'dd', 'hh', 'ss'] if t in or t.upper() in if len(value) < 20 and date_likeness >= all_parts - 1: self.log_info(F'excluding path that looks like a date format: {value}', clip=True) return None if self.args.filter >= 2: for k, part in enumerate( if not k: drive, colon, slash = part.partition(':') if colon and len(drive) == 1 and len(slash) <= 1: continue if part[0] == part[~0] == '%': continue if len(part) == 1: continue if ( LetterWeights.Path(part) < 0.5 + (min(self.args.filter, 4) * 0.1) or (self.args.filter >= 2 and LetterWeights.Path(part[:1]) < 0.5) ): bad_parts += 1 self.log_debug(F'bad part {k + 1} in path: {part}') for filter_limit in (2, 3, 4): bad_ratio = 2 ** (filter_limit - 1) if self.args.filter >= filter_limit and bad_parts * bad_ratio >= all_parts: self.log_info(F'excluding path with bad parts: {value}', clip=True) return None return value def process(self, data): whitelist = set() def check(match: re.Match): for name, value in match.groupdict().items(): if value is not None: break else: raise RefineryCriticalException('Received empty match.') if value in whitelist: return None result = self._check_match(match.string, match.start(), name, value) if result is not None: return self.labelled(result, pattern=name) whitelist.add(value) transforms = [check] yield from self.matches_filtered(memoryview(data), self.args.pattern, *transforms)
Class variables
var required_dependencies
var optional_dependencies
Inherited members