Module refinery.units.formats.email
Expand source code Browse git
from __future__ import annotations
import email.utils
import re
from email.parser import Parser
from typing import Iterable
from refinery.lib import json
from refinery.lib.id import is_likely_email
from refinery.lib.mime import file_extension
from refinery.lib.types import asbuffer, isbuffer
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.units.pattern.mimewords import mimewords
CDFv2_MARKER = B'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'
class xtmail(PathExtractorUnit):
"""
Extract files and body from EMail messages. The unit supports both the Outlook message format
and regular MIME documents.
"""
def _get_headparts(self, head: Iterable[tuple[str, str]]):
def normalize_spaces(value: str):
return ''.join(re.sub(R'\A\s+', '\x20', t) for t in value.splitlines(False))
_headers: dict[str, list[str]] = {}
for key, value in head:
_headers.setdefault(key, []).append(mimewords.convert(normalize_spaces(value)))
headers = {
key: value[0] if len(value) == 1 else [t for t in value if t]
for key, value in _headers.items()}
yield UnpackResult('headers.txt',
lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec))
received = []
for recv in headers.get('Received', []):
if not recv.startswith('from '):
received = None
break
recv = recv[5:]
src, _, rest = recv.partition(' by ')
dst, _, rest = rest.partition(' with ')
received.append({
'Source': src.partition('\x20')[0],
'Target': dst.partition('\x20')[0],
})
if received:
received.reverse()
headers['ReceivedTrace'] = received
yield UnpackResult('headers.json', lambda jsn=headers: json.dumps(jsn))
def _get_parts_outlook(self, data):
from refinery.lib.outlook import MsgFile
def make_message(name, msg: MsgFile):
if msg.html_body is not None:
yield UnpackResult(F'{name}.htm', msg.html_body)
if msg.body is not None:
yield UnpackResult(F'{name}.txt', msg.body.encode(self.codec))
if msg.rtf_body is not None:
yield UnpackResult(F'{name}.rtf', msg.rtf_body)
msg = MsgFile(data)
header = dict(msg.header.items()) if msg.header else {}
mc = msg.message_class
if mc:
header['X-Message-Class'] = mc
if x := msg.date:
header['Date'] = email.utils.format_datetime(x)
if x := msg.sender:
header['From'] = x
if x := msg.to:
header['To'] = x
if x := msg.cc:
header['Cc'] = x
if x := msg.bcc:
header['Bcc'] = x
if x := msg.message_id:
header['Message-Id'] = x
if x := msg.subject:
header['Subject'] = x
for key, val in list(header.items()):
if val := val.strip().replace('\0', ''):
header[key] = val
else:
del header[key]
yield from self._get_headparts(header.items())
yield from make_message('body', msg)
item = {}
mc = msg.message_class or ''
if 'CONTACT' in mc:
item['name'] = msg.display_name
item['company'] = msg.company
item['title'] = msg.job_title
phone = {}
if hp := msg.home_phone:
phone.update(home=hp)
if bp := msg.business_phone:
phone.update(business=bp)
if mp := msg.mobile_phone:
phone.update(mobile=mp)
if phone:
item['phone'] = phone
elif 'APPOINTMENT' in mc or 'MEETING' in mc or 'SCHEDULE' in mc:
if x := msg.start_time:
item['start'] = x.isoformat(' ', 'seconds')
if x := msg.end_time:
item['end'] = x.isoformat(' ', 'seconds')
if x := msg.location:
item['location'] = x
if item:
yield UnpackResult(F'{mc.lower()}.json', json.dumps(item))
msgcount = 0
def walk_attachments(msg: MsgFile):
for attachment in msg.attachments:
yield attachment
if isinstance(ad := attachment.data, MsgFile):
yield from walk_attachments(ad)
for attachment in walk_attachments(msg):
if isinstance(ad := attachment.data, MsgFile):
msgcount += 1
yield from make_message(
F'attachments/msg_{msgcount:d}', ad)
continue
if not isbuffer(attachment.data):
self.log_warn('unknown attachment type, please report this!')
continue
path = attachment.long_filename or attachment.short_filename or 'unnamed'
path = path.rstrip('\0')
yield UnpackResult(F'attachments/{path}', ad)
@PathExtractorUnit.Requires('chardet', ['default', 'extended'])
def _chardet():
import chardet
return chardet
def _get_parts_regular(self, data: bytes):
for codec in ('ascii', 'utf8', ...):
try:
if codec is ...:
info = self._chardet.detect(data)
codec = str(info['encoding'])
msg = data.decode(codec)
break
except UnicodeDecodeError:
continue
else:
raise ValueError('This is not a plaintext email message.')
msg = Parser().parsestr(msg)
yield from self._get_headparts(msg.items())
for k, part in enumerate(msg.walk()):
path = part.get_filename()
error_message = None
result = None
if path is None:
extension = file_extension(part.get_content_type(), 'txt')
path = F'body.{extension}'
else:
path = path | mimewords | str
path = F'attachments/{path}'
try:
payload = part.get_payload(decode=True)
if payload is None or isinstance(payload, bytes):
result = payload
else:
raise TypeError
except Exception as E:
try:
payload = part.get_payload(decode=False)
except Exception as E:
error_message = str(E)
else:
from refinery.units.pattern.carve import carve
self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}')
if isinstance(payload, str):
payload = payload.encode('latin1')
if payload := asbuffer(payload):
result = next(payload | carve('b64', stripspace=True, single=True, decode=True))
else:
error_message = str(E)
result = None
if not result:
if error_message is not None:
self.log_warn(F'could not get content of message part {k}: {error_message!s}')
continue
yield UnpackResult(path, result)
def unpack(self, data):
if data[:len(CDFv2_MARKER)] == CDFv2_MARKER:
yield from self._get_parts_outlook(data)
else:
yield from self._get_parts_regular(data)
@classmethod
def handles(cls, data) -> bool:
return is_likely_email(data)
Classes
class xtmail (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')-
Extract files and body from EMail messages. The unit supports both the Outlook message format and regular MIME documents.
Expand source code Browse git
class xtmail(PathExtractorUnit): """ Extract files and body from EMail messages. The unit supports both the Outlook message format and regular MIME documents. """ def _get_headparts(self, head: Iterable[tuple[str, str]]): def normalize_spaces(value: str): return ''.join(re.sub(R'\A\s+', '\x20', t) for t in value.splitlines(False)) _headers: dict[str, list[str]] = {} for key, value in head: _headers.setdefault(key, []).append(mimewords.convert(normalize_spaces(value))) headers = { key: value[0] if len(value) == 1 else [t for t in value if t] for key, value in _headers.items()} yield UnpackResult('headers.txt', lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec)) received = [] for recv in headers.get('Received', []): if not recv.startswith('from '): received = None break recv = recv[5:] src, _, rest = recv.partition(' by ') dst, _, rest = rest.partition(' with ') received.append({ 'Source': src.partition('\x20')[0], 'Target': dst.partition('\x20')[0], }) if received: received.reverse() headers['ReceivedTrace'] = received yield UnpackResult('headers.json', lambda jsn=headers: json.dumps(jsn)) def _get_parts_outlook(self, data): from refinery.lib.outlook import MsgFile def make_message(name, msg: MsgFile): if msg.html_body is not None: yield UnpackResult(F'{name}.htm', msg.html_body) if msg.body is not None: yield UnpackResult(F'{name}.txt', msg.body.encode(self.codec)) if msg.rtf_body is not None: yield UnpackResult(F'{name}.rtf', msg.rtf_body) msg = MsgFile(data) header = dict(msg.header.items()) if msg.header else {} mc = msg.message_class if mc: header['X-Message-Class'] = mc if x := msg.date: header['Date'] = email.utils.format_datetime(x) if x := msg.sender: header['From'] = x if x := msg.to: header['To'] = x if x := msg.cc: header['Cc'] = x if x := msg.bcc: header['Bcc'] = x if x := msg.message_id: header['Message-Id'] = x if x := msg.subject: header['Subject'] = x for key, val in list(header.items()): if val := val.strip().replace('\0', ''): header[key] = val else: del header[key] yield from self._get_headparts(header.items()) yield from make_message('body', msg) item = {} mc = msg.message_class or '' if 'CONTACT' in mc: item['name'] = msg.display_name item['company'] = msg.company item['title'] = msg.job_title phone = {} if hp := msg.home_phone: phone.update(home=hp) if bp := msg.business_phone: phone.update(business=bp) if mp := msg.mobile_phone: phone.update(mobile=mp) if phone: item['phone'] = phone elif 'APPOINTMENT' in mc or 'MEETING' in mc or 'SCHEDULE' in mc: if x := msg.start_time: item['start'] = x.isoformat(' ', 'seconds') if x := msg.end_time: item['end'] = x.isoformat(' ', 'seconds') if x := msg.location: item['location'] = x if item: yield UnpackResult(F'{mc.lower()}.json', json.dumps(item)) msgcount = 0 def walk_attachments(msg: MsgFile): for attachment in msg.attachments: yield attachment if isinstance(ad := attachment.data, MsgFile): yield from walk_attachments(ad) for attachment in walk_attachments(msg): if isinstance(ad := attachment.data, MsgFile): msgcount += 1 yield from make_message( F'attachments/msg_{msgcount:d}', ad) continue if not isbuffer(attachment.data): self.log_warn('unknown attachment type, please report this!') continue path = attachment.long_filename or attachment.short_filename or 'unnamed' path = path.rstrip('\0') yield UnpackResult(F'attachments/{path}', ad) @PathExtractorUnit.Requires('chardet', ['default', 'extended']) def _chardet(): import chardet return chardet def _get_parts_regular(self, data: bytes): for codec in ('ascii', 'utf8', ...): try: if codec is ...: info = self._chardet.detect(data) codec = str(info['encoding']) msg = data.decode(codec) break except UnicodeDecodeError: continue else: raise ValueError('This is not a plaintext email message.') msg = Parser().parsestr(msg) yield from self._get_headparts(msg.items()) for k, part in enumerate(msg.walk()): path = part.get_filename() error_message = None result = None if path is None: extension = file_extension(part.get_content_type(), 'txt') path = F'body.{extension}' else: path = path | mimewords | str path = F'attachments/{path}' try: payload = part.get_payload(decode=True) if payload is None or isinstance(payload, bytes): result = payload else: raise TypeError except Exception as E: try: payload = part.get_payload(decode=False) except Exception as E: error_message = str(E) else: from refinery.units.pattern.carve import carve self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}') if isinstance(payload, str): payload = payload.encode('latin1') if payload := asbuffer(payload): result = next(payload | carve('b64', stripspace=True, single=True, decode=True)) else: error_message = str(E) result = None if not result: if error_message is not None: self.log_warn(F'could not get content of message part {k}: {error_message!s}') continue yield UnpackResult(path, result) def unpack(self, data): if data[:len(CDFv2_MARKER)] == CDFv2_MARKER: yield from self._get_parts_outlook(data) else: yield from self._get_parts_regular(data) @classmethod def handles(cls, data) -> bool: return is_likely_email(data)Ancestors
Subclasses
Class variables
var reverse-
The type of the None singleton.
Methods
def unpack(self, data)-
Expand source code Browse git
def unpack(self, data): if data[:len(CDFv2_MARKER)] == CDFv2_MARKER: yield from self._get_parts_outlook(data) else: yield from self._get_parts_regular(data)
Inherited members
PathExtractorUnit:CustomJoinBehaviourCustomPathSeparatorFilterEverythingRequiresactassemblecodecconsolefilterfinishhandlesis_quietis_reversibleisattylabelledleniencylog_alwayslog_debuglog_detachlog_faillog_infolog_levellog_warnloggernamenozzleoptional_dependenciesreadread1required_dependenciesresetrunsourcesuperinit
UnitBase: