Module refinery.units.pattern.defang
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import re
import itertools
from urllib.parse import urlparse, urlunparse
from refinery.units import Arg, Unit
from refinery.lib.patterns import defanged, indicators, tlds
class defang(Unit):
    """
    Defangs all URL, domain and IPv4 address indicators in the input data by replacing the last dot
    in the expression by `[.]`. For example, `127.0.0.1` will be replaced by `127.0.0[.]1`. For URL
    indicators, the colon after the protocol scheme is also wrapped in brackets.
    """

    # Lower-cased host names that the hostname replacement returns unchanged.
    _WHITELIST = [
        B'wscript.shell',
    ]

    # Lower-cased URL schemes and their escaped spelling; only consulted when url_protocol is set.
    _PROTOCOL_ESCAPES = {
        B'http': B'hxxp',
        B'https': B'hxxps',
        B'ftp': B'fxp',
        B'ftps': B'fxps',
    }

    def __init__(
        self,
        url_only: Arg.Switch('-u', help='Only defang URLs, do not look for domains or IPs.') = False,
        url_protocol: Arg.Switch('-p', help='Escape the protocol in URLs.') = False,
        dot_only: Arg.Switch('-d', help='Do not escape the protocol colon in URLs.') = False,
        quote_md: Arg.Switch('-q', help='Wrap all indicators in backticks for markdown code.') = False
    ):
        # All local names (including self) are forwarded to the base class initializer.
        self.superinit(super(), **vars())

    def _quote(self, word):
        """
        Return `word` wrapped in backticks when the quote_md switch is set, otherwise unchanged.
        """
        return word if not self.args.quote_md else B'`%s`' % word

    def reverse(self, data: bytearray):
        """
        Refang `data`: replace `[.]` by `.` inside everything matched by `defanged.hostname`,
        restore `[:]//` and `[://]` to `://`, and rewrite escaped protocol names.
        """
        def refang(hostname):
            return hostname[0].replace(B'[.]', B'.')
        data = defanged.hostname.sub(refang, data)
        data = data.replace(B'[:]//', B'://')
        data = data.replace(B'[://]', B'://')
        # Any "h" followed by three arbitrary bytes in front of "://" is rewritten to http(s).
        data = re.sub(B'h.{3}?(s?)://', B'http\\1://', data)
        data = re.sub(B'fxp(s?)://', B'ftp\\1://', data)
        return data

    def process(self, data):
        """
        Defang the indicators in `data` and return the joined result. The input is split along
        matches of `defanged.url`; matched URLs are rewritten by `replace_url` and, unless the
        url_only switch is set, host names in the text between URLs are rewritten as well.
        """
        def replace_hostname(hostname, match=True):
            # With match=True this is a substitution callback: hostname is a match object and
            # group 0 is the indicator. With match=False, hostname is the raw byte string.
            if match:
                return self._quote(replace_hostname(hostname[0], False))
            self.log_info('replace:', hostname)
            user, atsgn, host = hostname.rpartition(B'@')
            host, colon, port = host.rpartition(B':')
            host = host.lower()
            if not colon:
                # Without a colon, rpartition leaves the whole input in its last element.
                host = port
                port = B''
            if host in self._WHITELIST:
                return hostname
            host = re.split(R'(?:\[\.\]|\.)', host.decode('latin1'))
            if len(host) == 1:
                # There is no dot to defang.
                return hostname
            # Assemble the host from right to left: the last dot is always bracketed, and so is
            # every dot directly to the left of a component contained in tlds.
            components = iter(reversed(host))
            defanged_parts = [next(components)]
            separator = '[.]'
            for part in components:
                defanged_parts.append(separator)
                defanged_parts.append(part)
                separator = '[.]' if part in tlds else '.'
            defanged_host = ''.join(reversed(defanged_parts)).encode('latin1')
            return user + atsgn + defanged_host + colon + port

        def replace_url(url: bytes):
            if not url:
                return url
            self.log_info('replace:', url)
            # Normalize input that is already partially defanged before parsing it.
            url = url.replace(B'[:]//', B'://', 1)
            url = url.replace(B'[.]', B'.')
            # URLs that start with "://" or "//" receive a temporary "tcp" scheme so that urlparse
            # recognizes the network location; the scheme is removed again further below.
            prefix = B'tcp'
            if url.startswith(B'://'):
                scheme = 0
            elif url.startswith(B'//'):
                scheme = 1
                prefix = prefix + B':'
            else:
                scheme = 2
                prefix = B''
            parsed = urlparse(prefix + url)
            # Indicators nested in the remaining URL components are defanged recursively.
            operations = {
                name: self.process(getattr(parsed, name))
                for name in ('path', 'params', 'query', 'fragment')
            }
            if self.args.url_protocol and parsed.scheme:
                # Schemes without an escape entry are kept as they are. The fallback used to be
                # the integer marker `scheme`, which made urlunparse fail for such URLs.
                operations.update(
                    scheme=self._PROTOCOL_ESCAPES.get(parsed.scheme.lower(), parsed.scheme))
            if scheme < 2:
                # Drop the temporary scheme that was only added for parsing.
                operations.update(scheme=B'')
            operations.update(netloc=replace_hostname(parsed.netloc, False))
            url = urlunparse(parsed._replace(**operations))
            if scheme == 0:
                # urlunparse emits "//host..." for an empty scheme; restore the leading colon.
                url = B':' + url
            if not self.args.dot_only:
                url = url.replace(B'://', B'[:]//')
            return self._quote(url)

        # The split result repeats with period `step`: offset 0 holds the text between matches,
        # offset 1 the first capture group, which is treated as the URL.
        urlsplit = defanged.url.split(data)
        step = defanged.url.value.groups + 1
        urlsplit[1::step] = [replace_url(t) for t in itertools.islice(iter(urlsplit), 1, None, step)]

        if not self.args.url_only:
            urlsplit[0::step] = [
                indicators.hostname.sub(replace_hostname, t)
                for t in itertools.islice(iter(urlsplit), 0, None, step)
            ]

        def fuse(urlsplit):
            # Interleave text and URL pieces; any further capture groups are left out.
            txt = itertools.islice(iter(urlsplit), 0, None, step)
            url = itertools.islice(iter(urlsplit), 1, None, step)
            while True:
                try:
                    yield next(txt)
                    yield next(url)
                except StopIteration:
                    break

        return B''.join(fuse(urlsplit))
Classes
class defang (url_only=False, url_protocol=False, dot_only=False, quote_md=False)
-
Defangs all URL, domain and IPv4 address indicators in the input data by replacing the last dot in the expression by `[.]`. For example, `127.0.0.1` will be replaced by `127.0.0[.]1`. For URL indicators, the colon after the protocol scheme is also wrapped in brackets.
Expand source code Browse git
class defang(Unit):
    """
    Defangs all URL, domain and IPv4 address indicators in the input data by replacing the last dot
    in the expression by `[.]`. For example, `127.0.0.1` will be replaced by `127.0.0[.]1`. For URL
    indicators, the colon after the protocol scheme is also wrapped in brackets.
    """

    # Lower-cased host names that the hostname replacement returns unchanged.
    _WHITELIST = [
        B'wscript.shell',
    ]

    # Lower-cased URL schemes and their escaped spelling; only consulted when url_protocol is set.
    _PROTOCOL_ESCAPES = {
        B'http': B'hxxp',
        B'https': B'hxxps',
        B'ftp': B'fxp',
        B'ftps': B'fxps',
    }

    def __init__(
        self,
        url_only: Arg.Switch('-u', help='Only defang URLs, do not look for domains or IPs.') = False,
        url_protocol: Arg.Switch('-p', help='Escape the protocol in URLs.') = False,
        dot_only: Arg.Switch('-d', help='Do not escape the protocol colon in URLs.') = False,
        quote_md: Arg.Switch('-q', help='Wrap all indicators in backticks for markdown code.') = False
    ):
        # All local names (including self) are forwarded to the base class initializer.
        self.superinit(super(), **vars())

    def _quote(self, word):
        """
        Return `word` wrapped in backticks when the quote_md switch is set, otherwise unchanged.
        """
        return word if not self.args.quote_md else B'`%s`' % word

    def reverse(self, data: bytearray):
        """
        Refang `data`: replace `[.]` by `.` inside everything matched by `defanged.hostname`,
        restore `[:]//` and `[://]` to `://`, and rewrite escaped protocol names.
        """
        def refang(hostname):
            return hostname[0].replace(B'[.]', B'.')
        data = defanged.hostname.sub(refang, data)
        data = data.replace(B'[:]//', B'://')
        data = data.replace(B'[://]', B'://')
        # Any "h" followed by three arbitrary bytes in front of "://" is rewritten to http(s).
        data = re.sub(B'h.{3}?(s?)://', B'http\\1://', data)
        data = re.sub(B'fxp(s?)://', B'ftp\\1://', data)
        return data

    def process(self, data):
        """
        Defang the indicators in `data` and return the joined result. The input is split along
        matches of `defanged.url`; matched URLs are rewritten by `replace_url` and, unless the
        url_only switch is set, host names in the text between URLs are rewritten as well.
        """
        def replace_hostname(hostname, match=True):
            # With match=True this is a substitution callback: hostname is a match object and
            # group 0 is the indicator. With match=False, hostname is the raw byte string.
            if match:
                return self._quote(replace_hostname(hostname[0], False))
            self.log_info('replace:', hostname)
            user, atsgn, host = hostname.rpartition(B'@')
            host, colon, port = host.rpartition(B':')
            host = host.lower()
            if not colon:
                # Without a colon, rpartition leaves the whole input in its last element.
                host = port
                port = B''
            if host in self._WHITELIST:
                return hostname
            host = re.split(R'(?:\[\.\]|\.)', host.decode('latin1'))
            if len(host) == 1:
                # There is no dot to defang.
                return hostname
            # Assemble the host from right to left: the last dot is always bracketed, and so is
            # every dot directly to the left of a component contained in tlds.
            components = iter(reversed(host))
            defanged_parts = [next(components)]
            separator = '[.]'
            for part in components:
                defanged_parts.append(separator)
                defanged_parts.append(part)
                separator = '[.]' if part in tlds else '.'
            defanged_host = ''.join(reversed(defanged_parts)).encode('latin1')
            return user + atsgn + defanged_host + colon + port

        def replace_url(url: bytes):
            if not url:
                return url
            self.log_info('replace:', url)
            # Normalize input that is already partially defanged before parsing it.
            url = url.replace(B'[:]//', B'://', 1)
            url = url.replace(B'[.]', B'.')
            # URLs that start with "://" or "//" receive a temporary "tcp" scheme so that urlparse
            # recognizes the network location; the scheme is removed again further below.
            prefix = B'tcp'
            if url.startswith(B'://'):
                scheme = 0
            elif url.startswith(B'//'):
                scheme = 1
                prefix = prefix + B':'
            else:
                scheme = 2
                prefix = B''
            parsed = urlparse(prefix + url)
            # Indicators nested in the remaining URL components are defanged recursively.
            operations = {
                name: self.process(getattr(parsed, name))
                for name in ('path', 'params', 'query', 'fragment')
            }
            if self.args.url_protocol and parsed.scheme:
                # Schemes without an escape entry are kept as they are. The fallback used to be
                # the integer marker `scheme`, which made urlunparse fail for such URLs.
                operations.update(
                    scheme=self._PROTOCOL_ESCAPES.get(parsed.scheme.lower(), parsed.scheme))
            if scheme < 2:
                # Drop the temporary scheme that was only added for parsing.
                operations.update(scheme=B'')
            operations.update(netloc=replace_hostname(parsed.netloc, False))
            url = urlunparse(parsed._replace(**operations))
            if scheme == 0:
                # urlunparse emits "//host..." for an empty scheme; restore the leading colon.
                url = B':' + url
            if not self.args.dot_only:
                url = url.replace(B'://', B'[:]//')
            return self._quote(url)

        # The split result repeats with period `step`: offset 0 holds the text between matches,
        # offset 1 the first capture group, which is treated as the URL.
        urlsplit = defanged.url.split(data)
        step = defanged.url.value.groups + 1
        urlsplit[1::step] = [replace_url(t) for t in itertools.islice(iter(urlsplit), 1, None, step)]

        if not self.args.url_only:
            urlsplit[0::step] = [
                indicators.hostname.sub(replace_hostname, t)
                for t in itertools.islice(iter(urlsplit), 0, None, step)
            ]

        def fuse(urlsplit):
            # Interleave text and URL pieces; any further capture groups are left out.
            txt = itertools.islice(iter(urlsplit), 0, None, step)
            url = itertools.islice(iter(urlsplit), 1, None, step)
            while True:
                try:
                    yield next(txt)
                    yield next(url)
                except StopIteration:
                    break

        return B''.join(fuse(urlsplit))
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members