Module refinery.units.formats.email
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from email.parser import Parser
from functools import partial
from collections import defaultdict
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.units.pattern.mimewords import mimewords
from refinery.lib.mime import file_extension
from refinery.lib.tools import NoLogging, isbuffer
class xtmail(PathExtractorUnit):
"""
Extract files and body from EMail messages. The unit supports both the Outlook message format
and regular MIME documents.
"""
def _get_headparts(self, head):
mw = mimewords()
mw = partial(mw.process.__wrapped__.__wrapped__, mw)
jh = defaultdict(list)
for key, value in head:
jh[key].append(mw(''.join(t.lstrip() for t in value.splitlines(False))))
jh = {k: v[0] if len(v) == 1 else [t for t in v if t] for k, v in jh.items()}
yield UnpackResult('headers.txt',
lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec))
yield UnpackResult('headers.json',
lambda jsn=jh: json.dumps(jsn, indent=4).encode(self.codec))
@PathExtractorUnit.Requires('extract-msg<=0.41.0', 'formats', 'office', 'default', 'extended')
def _extract_msg():
import extract_msg.message
import extract_msg.enums
return extract_msg
def _get_parts_outlook(self, data):
def ensure_bytes(data):
return data if isinstance(data, bytes) else data.encode(self.codec)
def make_message(name, msg):
with NoLogging():
try:
htm = msg.htmlBody
except Exception:
htm = None
try:
txt = msg.body
except Exception:
txt = None
if txt:
yield UnpackResult(F'{name}.txt', ensure_bytes(txt))
if htm:
yield UnpackResult(F'{name}.htm', ensure_bytes(htm))
msgcount = 0
with NoLogging():
class ForgivingMessage(self._extract_msg.message.Message):
"""
If parsing the input bytes fails early, the "__open" private attribute may not
yet exist. This hack prevents an exception to occur in the destructor.
"""
def __getattr__(self, key: str):
if key.endswith('_open'):
return False
raise AttributeError(key)
msg = ForgivingMessage(bytes(data))
yield from self._get_headparts(msg.header.items())
yield from make_message('body', msg)
def attachments(msg):
for attachment in getattr(msg, 'attachments', ()):
yield attachment
if attachment.type == 'data':
continue
yield from attachments(attachment.data)
for attachment in attachments(msg):
at = attachment.type
if at is self._extract_msg.enums.AttachmentType.MSG:
msgcount += 1
yield from make_message(F'attachments/msg_{msgcount:d}', attachment.data)
continue
if not isbuffer(attachment.data):
self.log_warn(F'unknown attachment of type {at}, please report this!')
continue
path = attachment.longFilename or attachment.shortFilename
yield UnpackResult(F'attachments/{path}', attachment.data)
@PathExtractorUnit.Requires('chardet', 'default', 'extended')
def _chardet():
import chardet
return chardet
def _get_parts_regular(self, data: bytes):
try:
info = self._chardet.detect(data)
msg = data.decode(info['encoding'])
except UnicodeDecodeError:
raise ValueError('This is not a plaintext email message.')
else:
msg = Parser().parsestr(msg)
yield from self._get_headparts(msg.items())
for k, part in enumerate(msg.walk()):
path = part.get_filename()
elog = None
if path is None:
extension = file_extension(part.get_content_type(), 'txt')
path = F'body.{extension}'
else:
path = path | mimewords | str
path = F'attachments/{path}'
try:
data = part.get_payload(decode=True)
except Exception as E:
try:
data = part.get_payload(decode=False)
except Exception as E:
elog = str(E)
data = None
else:
from refinery import carve
self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}')
if isinstance(data, str):
data = data.encode('latin1')
if isbuffer(data):
data = next(data | carve('b64', stripspace=True, single=True, decode=True))
else:
elog = str(E)
data = None
if not data:
if elog is not None:
self.log_warn(F'could not get content of message part {k}: {elog!s}')
continue
yield UnpackResult(path, data)
def unpack(self, data):
try:
yield from self._get_parts_outlook(data)
except Exception:
self.log_debug('failed parsing input as Outlook message')
yield from self._get_parts_regular(data)
@classmethod
def handles(cls, data: bytearray) -> bool:
markers = [
b'\nReceived:\x20from'
b'\nSubject:\x20',
b'\nTo:\x20',
b'\nBcc:\x20',
b'\nContent-Transfer-Encoding:\x20',
b'\nContent-Type:\x20',
b'\nReturn-Path:\x20',
]
if data.startswith(B'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'):
markers = [marker.decode('latin1').encode('utf-16le') for marker in markers]
return sum(1 for marker in markers if marker in data) >= 3
Classes
class xtmail (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
Extract files and body from EMail messages. The unit supports both the Outlook message format and regular MIME documents.
Expand source code Browse git
class xtmail(PathExtractorUnit): """ Extract files and body from EMail messages. The unit supports both the Outlook message format and regular MIME documents. """ def _get_headparts(self, head): mw = mimewords() mw = partial(mw.process.__wrapped__.__wrapped__, mw) jh = defaultdict(list) for key, value in head: jh[key].append(mw(''.join(t.lstrip() for t in value.splitlines(False)))) jh = {k: v[0] if len(v) == 1 else [t for t in v if t] for k, v in jh.items()} yield UnpackResult('headers.txt', lambda h=head: '\n'.join(F'{k}: {v}' for k, v in h).encode(self.codec)) yield UnpackResult('headers.json', lambda jsn=jh: json.dumps(jsn, indent=4).encode(self.codec)) @PathExtractorUnit.Requires('extract-msg<=0.41.0', 'formats', 'office', 'default', 'extended') def _extract_msg(): import extract_msg.message import extract_msg.enums return extract_msg def _get_parts_outlook(self, data): def ensure_bytes(data): return data if isinstance(data, bytes) else data.encode(self.codec) def make_message(name, msg): with NoLogging(): try: htm = msg.htmlBody except Exception: htm = None try: txt = msg.body except Exception: txt = None if txt: yield UnpackResult(F'{name}.txt', ensure_bytes(txt)) if htm: yield UnpackResult(F'{name}.htm', ensure_bytes(htm)) msgcount = 0 with NoLogging(): class ForgivingMessage(self._extract_msg.message.Message): """ If parsing the input bytes fails early, the "__open" private attribute may not yet exist. This hack prevents an exception to occur in the destructor. """ def __getattr__(self, key: str): if key.endswith('_open'): return False raise AttributeError(key) msg = ForgivingMessage(bytes(data)) yield from self._get_headparts(msg.header.items()) yield from make_message('body', msg) def attachments(msg): for attachment in getattr(msg, 'attachments', ()): yield attachment if attachment.type == 'data': continue yield from attachments(attachment.data) for attachment in attachments(msg): at = attachment.type if at is self._extract_msg.enums.AttachmentType.MSG: msgcount += 1 yield from make_message(F'attachments/msg_{msgcount:d}', attachment.data) continue if not isbuffer(attachment.data): self.log_warn(F'unknown attachment of type {at}, please report this!') continue path = attachment.longFilename or attachment.shortFilename yield UnpackResult(F'attachments/{path}', attachment.data) @PathExtractorUnit.Requires('chardet', 'default', 'extended') def _chardet(): import chardet return chardet def _get_parts_regular(self, data: bytes): try: info = self._chardet.detect(data) msg = data.decode(info['encoding']) except UnicodeDecodeError: raise ValueError('This is not a plaintext email message.') else: msg = Parser().parsestr(msg) yield from self._get_headparts(msg.items()) for k, part in enumerate(msg.walk()): path = part.get_filename() elog = None if path is None: extension = file_extension(part.get_content_type(), 'txt') path = F'body.{extension}' else: path = path | mimewords | str path = F'attachments/{path}' try: data = part.get_payload(decode=True) except Exception as E: try: data = part.get_payload(decode=False) except Exception as E: elog = str(E) data = None else: from refinery import carve self.log_warn(F'manually decoding part {k}, data might be corrupted: {path}') if isinstance(data, str): data = data.encode('latin1') if isbuffer(data): data = next(data | carve('b64', stripspace=True, single=True, decode=True)) else: elog = str(E) data = None if not data: if elog is not None: self.log_warn(F'could not get content of message part {k}: {elog!s}') continue yield UnpackResult(path, data) def unpack(self, data): try: yield from self._get_parts_outlook(data) except Exception: self.log_debug('failed parsing input as Outlook message') yield from self._get_parts_regular(data) @classmethod def handles(cls, data: bytearray) -> bool: markers = [ b'\nReceived:\x20from' b'\nSubject:\x20', b'\nTo:\x20', b'\nBcc:\x20', b'\nContent-Transfer-Encoding:\x20', b'\nContent-Type:\x20', b'\nReturn-Path:\x20', ] if data.startswith(B'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1'): markers = [marker.decode('latin1').encode('utf-16le') for marker in markers] return sum(1 for marker in markers if marker in data) >= 3
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data): try: yield from self._get_parts_outlook(data) except Exception: self.log_debug('failed parsing input as Outlook message') yield from self._get_parts_regular(data)
Inherited members