Module refinery.units.formats.email
Expand source code Browse git
from __future__ import annotations
import json
import re
import email.utils
from typing import TYPE_CHECKING, Iterable, Tuple
from email.parser import Parser
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.units.pattern.mimewords import mimewords
from refinery.lib.mime import file_extension
from refinery.lib.tools import NoLogging, isbuffer, asbuffer
if TYPE_CHECKING:
from extract_msg import Message
# Magic bytes at the start of a Compound File Binary (CDFv2 / OLE2) document;
# this is the container that Outlook message files are stored in.
CDFv2_MARKER = B'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'

# Header prefixes searched for in plain-text input. Every entry needs its own
# trailing comma: adjacent bytes literals are silently concatenated otherwise,
# which would fuse two markers into a single one that can never match.
_EMAIL_TXT_MARKERS = [
    b'\nReceived:\x20from',
    b'\nSubject:\x20',
    b'\nTo:\x20',
    b'\nFrom:\x20',
    B'\nMessage-ID:\x20',
    b'\nBcc:\x20',
    b'\nContent-Transfer-Encoding:\x20',
    b'\nContent-Type:\x20',
    b'\nReturn-Path:\x20',
]

# Names searched for in CDFv2 input, encoded as UTF-16LE. The names must be
# the elements of a real tuple: without the commas this would be one long
# string and the comprehension would iterate over its individual characters.
_EMAIL_BIN_MARKERS = [
    marker.encode('utf-16le') for marker in (
        "__nameid_version",      # root node
        "__recip_version",       # recipients
        "__properties_version",  # properties
        "__substg1.0_",          # strings
    )
]
class xtmail(PathExtractorUnit):
    """
    Extract files and body from EMail messages. The unit supports both the Outlook message format
    and regular MIME documents.
    """

    def _get_headparts(self, head: Iterable[Tuple[str, str]]):
        """
        Yield the two header items `headers.txt` and `headers.json` for the given sequence of
        `(name, value)` header pairs. The JSON variant groups repeated headers into lists and,
        when every `Received` header starts with `from `, adds a `ReceivedTrace` entry that
        lists the source and target of each hop in reverse header order.
        """
        def normalize_spaces(value: str):
            # Unfold multi-line header values: leading whitespace of each physical line is
            # collapsed to a single space before the lines are joined.
            return ''.join(re.sub(R'\A\s+', '\x20', t) for t in value.splitlines(False))

        # The pairs are consumed twice (once here, once lazily inside the lambda below), so a
        # one-shot iterable has to be materialized first.
        head = list(head)

        _headers: dict[str, list[str]] = {}
        for key, value in head:
            _headers.setdefault(key, []).append(mimewords.convert(normalize_spaces(value)))

        # Headers that occur once are stored as a plain string, repeated ones as a list.
        headers = {
            key: value[0] if len(value) == 1 else [t for t in value if t]
            for key, value in _headers.items()}

        yield UnpackResult('headers.txt',
            lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec))

        received = []
        hops = headers.get('Received', [])
        if isinstance(hops, str):
            # A single Received header was collapsed to a string above; iterating over it
            # directly would walk its characters and discard the trace.
            hops = [hops]
        for recv in hops:
            if not recv.startswith('from '):
                received = None
                break
            recv = recv[5:]
            src, _, rest = recv.partition(' by ')
            dst, _, rest = rest.partition(' with ')
            received.append({
                'Source': src.partition('\x20')[0],
                'Target': dst.partition('\x20')[0],
            })
        if received:
            received.reverse()
            headers['ReceivedTrace'] = received

        yield UnpackResult('headers.json',
            lambda jsn=headers: json.dumps(jsn, indent=4).encode(self.codec))

    @PathExtractorUnit.Requires('extract-msg', ['formats', 'office', 'default', 'extended'])
    def _extract_msg():
        # Imported lazily; importing the enums submodule makes `extract_msg.enums` available.
        import extract_msg.enums
        return extract_msg

    def _get_parts_outlook(self, data):
        """
        Yield headers, message bodies and attachments of an Outlook message. Messages that are
        attached to the input have their bodies emitted as `attachments/msg_N.*`.
        """
        def ensure_bytes(data: bytes | str | None):
            if data is None:
                return B''
            elif isinstance(data, str):
                return data.encode(self.codec)
            else:
                return data

        def make_message(name, msg: Message):
            # `detectedBodies` is tested as a bit mask against the BodyTypes flags.
            bodies = msg.detectedBodies
            BT = self._extract_msg.enums.BodyTypes

            def reader(attribute: str):
                # Build a lazy, best-effort extractor: any failure to read the body from the
                # message results in empty output rather than an exception.
                def read():
                    with NoLogging():
                        try:
                            return ensure_bytes(getattr(msg, attribute))
                        except Exception:
                            return B''
                return read

            for flag, extension, attribute in (
                (BT.HTML, 'htm', 'htmlBody'),
                (BT.PLAIN, 'txt', 'body'),
                (BT.RTF, 'rtf', 'rtfBody'),
            ):
                if bodies & flag:
                    yield UnpackResult(F'{name}.{extension}', reader(attribute))

        msgcount = 0

        # NOTE(review): the extent of this block was reconstructed from an unindented listing;
        # it is assumed to cover only the construction of the message object. Confirm.
        with NoLogging():
            class ForgivingMessage(self._extract_msg.Message):
                """
                If parsing the input bytes fails early, the "__open" private attribute may not
                yet exist. This hack prevents an exception to occur in the destructor.
                """
                def __getattr__(self, key: str):
                    if key.endswith('_open'):
                        return False
                    raise AttributeError(key)

            msg = ForgivingMessage(bytes(data))

        # Start from the transport headers and overlay the values of the message properties.
        header = dict(msg.header)

        if x := msg.date:
            header['Date'] = email.utils.format_datetime(x)
        if x := msg.sender:
            header['From'] = x
        if x := msg.to:
            header['To'] = x
        if x := msg.cc:
            header['Cc'] = x
        if x := msg.bcc:
            header['Bcc'] = x
        if x := msg.messageId:
            header['Message-Id'] = x
        if x := msg.subject:
            header['Subject'] = x

        # Strip surrounding whitespace and null characters; drop headers that end up empty.
        # The item list is copied because the dictionary is modified during iteration.
        for key, val in list(header.items()):
            if val := val.strip().replace('\0', ''):
                header[key] = val
            else:
                del header[key]

        yield from self._get_headparts(header.items())
        yield from make_message('body', msg)

        def attachments(msg):
            # Walk attachments recursively; anything whose type does not compare equal to
            # 'data' is descended into via its `data` attribute.
            for attachment in getattr(msg, 'attachments', ()):
                yield attachment
                if attachment.type == 'data':
                    continue
                yield from attachments(attachment.data)

        for attachment in attachments(msg):
            at = attachment.type
            if at is self._extract_msg.enums.AttachmentType.MSG:
                msgcount += 1
                yield from make_message(F'attachments/msg_{msgcount:d}', attachment.data)
                continue
            if not isbuffer(attachment.data):
                self.log_warn(F'unknown attachment of type {at}, please report this!')
                continue
            path = attachment.longFilename or attachment.shortFilename
            path = path.rstrip('\0')
            yield UnpackResult(F'attachments/{path}', attachment.data)

    @PathExtractorUnit.Requires('chardet', ['default', 'extended'])
    def _chardet():
        import chardet
        return chardet

    def _get_parts_regular(self, data: bytes):
        """
        Yield headers, bodies and attachments of a plain-text (MIME) message.

        Raises `ValueError` when the input cannot be decoded as text using the encoding that
        chardet reports for it.
        """
        try:
            info = self._chardet.detect(data)
            msg = data.decode(str(info['encoding']))
        except (UnicodeDecodeError, LookupError) as error:
            # LookupError covers the case where no encoding was detected: the reported
            # encoding is then None, and 'None' is not a known codec name.
            raise ValueError('This is not a plaintext email message.') from error
        else:
            msg = Parser().parsestr(msg)

        yield from self._get_headparts(msg.items())

        for k, part in enumerate(msg.walk()):
            path = part.get_filename()
            error_message = None
            result = None

            if path is None:
                extension = file_extension(part.get_content_type(), 'txt')
                path = F'body.{extension}'
            else:
                path = path | mimewords | str
                path = F'attachments/{path}'

            try:
                payload = part.get_payload(decode=True)
                if payload is None or isinstance(payload, bytes):
                    result = payload
                else:
                    raise TypeError
            except Exception as decode_error:
                # Regular decoding failed; fall back to carving base64 from the raw payload.
                try:
                    payload = part.get_payload(decode=False)
                except Exception as raw_error:
                    error_message = str(raw_error)
                else:
                    from refinery.units.pattern.carve import carve
                    self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}')
                    if isinstance(payload, str):
                        payload = payload.encode('latin1')
                    if payload := asbuffer(payload):
                        # A default is required here: a bare next() would leak StopIteration
                        # into this generator, which Python turns into a RuntimeError.
                        result = next(
                            payload | carve('b64', stripspace=True, single=True, decode=True), None)
                        if result is None:
                            error_message = 'no base64 encoded data was found in the payload'
                    else:
                        error_message = str(decode_error)
                        result = None

            if not result:
                if error_message is not None:
                    self.log_warn(F'could not get content of message part {k}: {error_message!s}')
                continue

            yield UnpackResult(path, result)

    def unpack(self, data):
        """
        Dispatch to the Outlook extractor when the input starts with the CDFv2 magic bytes and
        to the plain-text extractor otherwise.
        """
        if data[:len(CDFv2_MARKER)] == CDFv2_MARKER:
            yield from self._get_parts_outlook(data)
        else:
            yield from self._get_parts_regular(data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Return whether the input looks like an email: CDFv2 input needs one match from the
        binary marker list, everything else needs two matches from the text marker list.
        Matching is case-insensitive.
        """
        counter = 0
        if data.startswith(CDFv2_MARKER):
            markers = _EMAIL_BIN_MARKERS
            threshold = 1
        else:
            markers = _EMAIL_TXT_MARKERS
            threshold = 2
        for marker in markers:
            if re.search(re.escape(marker), data, flags=re.IGNORECASE):
                counter += 1
            if counter >= threshold:
                return True
        return False
Classes
class xtmail (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
Extract files and body from EMail messages. The unit supports both the Outlook message format and regular MIME documents.
Expand source code Browse git
class xtmail(PathExtractorUnit):
    """
    Extract files and body from EMail messages. The unit supports both the Outlook message format
    and regular MIME documents.
    """

    def _get_headparts(self, head: Iterable[Tuple[str, str]]):
        """
        Yield the two header items `headers.txt` and `headers.json` for the given sequence of
        `(name, value)` header pairs. The JSON variant groups repeated headers into lists and,
        when every `Received` header starts with `from `, adds a `ReceivedTrace` entry that
        lists the source and target of each hop in reverse header order.
        """
        def normalize_spaces(value: str):
            # Unfold multi-line header values: leading whitespace of each physical line is
            # collapsed to a single space before the lines are joined.
            return ''.join(re.sub(R'\A\s+', '\x20', t) for t in value.splitlines(False))

        # The pairs are consumed twice (once here, once lazily inside the lambda below), so a
        # one-shot iterable has to be materialized first.
        head = list(head)

        _headers: dict[str, list[str]] = {}
        for key, value in head:
            _headers.setdefault(key, []).append(mimewords.convert(normalize_spaces(value)))

        # Headers that occur once are stored as a plain string, repeated ones as a list.
        headers = {
            key: value[0] if len(value) == 1 else [t for t in value if t]
            for key, value in _headers.items()}

        yield UnpackResult('headers.txt',
            lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec))

        received = []
        hops = headers.get('Received', [])
        if isinstance(hops, str):
            # A single Received header was collapsed to a string above; iterating over it
            # directly would walk its characters and discard the trace.
            hops = [hops]
        for recv in hops:
            if not recv.startswith('from '):
                received = None
                break
            recv = recv[5:]
            src, _, rest = recv.partition(' by ')
            dst, _, rest = rest.partition(' with ')
            received.append({
                'Source': src.partition('\x20')[0],
                'Target': dst.partition('\x20')[0],
            })
        if received:
            received.reverse()
            headers['ReceivedTrace'] = received

        yield UnpackResult('headers.json',
            lambda jsn=headers: json.dumps(jsn, indent=4).encode(self.codec))

    @PathExtractorUnit.Requires('extract-msg', ['formats', 'office', 'default', 'extended'])
    def _extract_msg():
        # Imported lazily; importing the enums submodule makes `extract_msg.enums` available.
        import extract_msg.enums
        return extract_msg

    def _get_parts_outlook(self, data):
        """
        Yield headers, message bodies and attachments of an Outlook message. Messages that are
        attached to the input have their bodies emitted as `attachments/msg_N.*`.
        """
        def ensure_bytes(data: bytes | str | None):
            if data is None:
                return B''
            elif isinstance(data, str):
                return data.encode(self.codec)
            else:
                return data

        def make_message(name, msg: Message):
            # `detectedBodies` is tested as a bit mask against the BodyTypes flags.
            bodies = msg.detectedBodies
            BT = self._extract_msg.enums.BodyTypes

            def reader(attribute: str):
                # Build a lazy, best-effort extractor: any failure to read the body from the
                # message results in empty output rather than an exception.
                def read():
                    with NoLogging():
                        try:
                            return ensure_bytes(getattr(msg, attribute))
                        except Exception:
                            return B''
                return read

            for flag, extension, attribute in (
                (BT.HTML, 'htm', 'htmlBody'),
                (BT.PLAIN, 'txt', 'body'),
                (BT.RTF, 'rtf', 'rtfBody'),
            ):
                if bodies & flag:
                    yield UnpackResult(F'{name}.{extension}', reader(attribute))

        msgcount = 0

        # NOTE(review): the extent of this block was reconstructed from an unindented listing;
        # it is assumed to cover only the construction of the message object. Confirm.
        with NoLogging():
            class ForgivingMessage(self._extract_msg.Message):
                """
                If parsing the input bytes fails early, the "__open" private attribute may not
                yet exist. This hack prevents an exception to occur in the destructor.
                """
                def __getattr__(self, key: str):
                    if key.endswith('_open'):
                        return False
                    raise AttributeError(key)

            msg = ForgivingMessage(bytes(data))

        # Start from the transport headers and overlay the values of the message properties.
        header = dict(msg.header)

        if x := msg.date:
            header['Date'] = email.utils.format_datetime(x)
        if x := msg.sender:
            header['From'] = x
        if x := msg.to:
            header['To'] = x
        if x := msg.cc:
            header['Cc'] = x
        if x := msg.bcc:
            header['Bcc'] = x
        if x := msg.messageId:
            header['Message-Id'] = x
        if x := msg.subject:
            header['Subject'] = x

        # Strip surrounding whitespace and null characters; drop headers that end up empty.
        # The item list is copied because the dictionary is modified during iteration.
        for key, val in list(header.items()):
            if val := val.strip().replace('\0', ''):
                header[key] = val
            else:
                del header[key]

        yield from self._get_headparts(header.items())
        yield from make_message('body', msg)

        def attachments(msg):
            # Walk attachments recursively; anything whose type does not compare equal to
            # 'data' is descended into via its `data` attribute.
            for attachment in getattr(msg, 'attachments', ()):
                yield attachment
                if attachment.type == 'data':
                    continue
                yield from attachments(attachment.data)

        for attachment in attachments(msg):
            at = attachment.type
            if at is self._extract_msg.enums.AttachmentType.MSG:
                msgcount += 1
                yield from make_message(F'attachments/msg_{msgcount:d}', attachment.data)
                continue
            if not isbuffer(attachment.data):
                self.log_warn(F'unknown attachment of type {at}, please report this!')
                continue
            path = attachment.longFilename or attachment.shortFilename
            path = path.rstrip('\0')
            yield UnpackResult(F'attachments/{path}', attachment.data)

    @PathExtractorUnit.Requires('chardet', ['default', 'extended'])
    def _chardet():
        import chardet
        return chardet

    def _get_parts_regular(self, data: bytes):
        """
        Yield headers, bodies and attachments of a plain-text (MIME) message.

        Raises `ValueError` when the input cannot be decoded as text using the encoding that
        chardet reports for it.
        """
        try:
            info = self._chardet.detect(data)
            msg = data.decode(str(info['encoding']))
        except (UnicodeDecodeError, LookupError) as error:
            # LookupError covers the case where no encoding was detected: the reported
            # encoding is then None, and 'None' is not a known codec name.
            raise ValueError('This is not a plaintext email message.') from error
        else:
            msg = Parser().parsestr(msg)

        yield from self._get_headparts(msg.items())

        for k, part in enumerate(msg.walk()):
            path = part.get_filename()
            error_message = None
            result = None

            if path is None:
                extension = file_extension(part.get_content_type(), 'txt')
                path = F'body.{extension}'
            else:
                path = path | mimewords | str
                path = F'attachments/{path}'

            try:
                payload = part.get_payload(decode=True)
                if payload is None or isinstance(payload, bytes):
                    result = payload
                else:
                    raise TypeError
            except Exception as decode_error:
                # Regular decoding failed; fall back to carving base64 from the raw payload.
                try:
                    payload = part.get_payload(decode=False)
                except Exception as raw_error:
                    error_message = str(raw_error)
                else:
                    from refinery.units.pattern.carve import carve
                    self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}')
                    if isinstance(payload, str):
                        payload = payload.encode('latin1')
                    if payload := asbuffer(payload):
                        # A default is required here: a bare next() would leak StopIteration
                        # into this generator, which Python turns into a RuntimeError.
                        result = next(
                            payload | carve('b64', stripspace=True, single=True, decode=True), None)
                        if result is None:
                            error_message = 'no base64 encoded data was found in the payload'
                    else:
                        error_message = str(decode_error)
                        result = None

            if not result:
                if error_message is not None:
                    self.log_warn(F'could not get content of message part {k}: {error_message!s}')
                continue

            yield UnpackResult(path, result)

    def unpack(self, data):
        """
        Dispatch to the Outlook extractor when the input starts with the CDFv2 magic bytes and
        to the plain-text extractor otherwise.
        """
        if data[:len(CDFv2_MARKER)] == CDFv2_MARKER:
            yield from self._get_parts_outlook(data)
        else:
            yield from self._get_parts_regular(data)

    @classmethod
    def handles(cls, data: bytearray) -> bool:
        """
        Return whether the input looks like an email: CDFv2 input needs one match from the
        binary marker list, everything else needs two matches from the text marker list.
        Matching is case-insensitive.
        """
        counter = 0
        if data.startswith(CDFv2_MARKER):
            markers = _EMAIL_BIN_MARKERS
            threshold = 1
        else:
            markers = _EMAIL_TXT_MARKERS
            threshold = 2
        for marker in markers:
            if re.search(re.escape(marker), data, flags=re.IGNORECASE):
                counter += 1
            if counter >= threshold:
                return True
        return False
Ancestors
Subclasses
Class variables
var required_dependencies
var console
var reverse
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data):
    """
    Yield the extracted items of the input: the Outlook extractor is used when the input
    starts with the CDFv2 magic bytes, the plain-text extractor is used otherwise.
    """
    prefix = data[:len(CDFv2_MARKER)]
    extractor = self._get_parts_outlook if prefix == CDFv2_MARKER else self._get_parts_regular
    yield from extractor(data)
Inherited members