Module refinery.lib.outlook
Lightweight parser for Outlook MSG files based on olefile.
Expand source code Browse git
"""
Lightweight parser for Outlook MSG files based on olefile.
"""
from __future__ import annotations
import codecs
import re
import struct
from datetime import datetime, timedelta, timezone
from email.parser import HeaderParser
from functools import cached_property
_PSETID_APPOINTMENT = bytes.fromhex('0220060000000000c000000000000046')
_LID_LOCATION = 0x8208
_FILETIME_EPOCH = datetime(1601, 1, 1, tzinfo=timezone.utc)
_CODEPAGE_OVERRIDES = {
20127: 'ascii',
20932: 'euc-jp',
28591: 'iso-8859-1',
28592: 'iso-8859-2',
28593: 'iso-8859-3',
28594: 'iso-8859-4',
28595: 'iso-8859-5',
28596: 'iso-8859-6',
28597: 'iso-8859-7',
28598: 'iso-8859-8',
28599: 'iso-8859-9',
28603: 'iso-8859-13',
28605: 'iso-8859-15',
50220: 'iso-2022-jp',
50221: 'iso-2022-jp',
50225: 'iso-2022-kr',
51932: 'euc-jp',
51949: 'euc-kr',
}
def _codepage_to_codec(cp: int) -> str:
if name := _CODEPAGE_OVERRIDES.get(cp):
return name
try:
return codecs.lookup(F'cp{cp}').name
except LookupError:
return 'cp1252'
def _filetime_to_datetime(filetime: int) -> datetime | None:
if filetime <= 0:
return None
return _FILETIME_EPOCH + timedelta(microseconds=filetime // 10)
class MsgAttachment:
def __init__(self, ole, prefix: str, parent: MsgFile):
self._ole = ole
self._prefix = prefix
self._parent = parent
props_path = F'{prefix}/__properties_version1.0'
self._props = parent._parse_properties(props_path, header_size=8)
def _stream(self, path: str) -> bytes | memoryview | None:
full = F'{self._prefix}/{path}'
if self._ole.exists(full):
return self._ole.openstream(full).read()
return None
def _read_string(self, prop_id: str) -> str | None:
for suffix, codec in (('001F', 'utf-16-le'), ('001E', self._parent._ansi_codec)):
data = self._stream(F'__substg1.0_{prop_id}{suffix}')
if data is not None:
return codecs.decode(data, codec).rstrip('\0')
return None
@cached_property
def attach_method(self) -> int:
return self._props.get('37050003', 1)
@cached_property
def content_id(self) -> str | None:
return self._read_string('3712')
@cached_property
def mime_type(self) -> str | None:
return self._read_string('370E')
@cached_property
def is_embedded_msg(self) -> bool:
return self.attach_method == 5
@cached_property
def is_ole_storage(self) -> bool:
return self.attach_method == 6
@cached_property
def data(self) -> bytes | memoryview | MsgFile:
method = self.attach_method
if method == 5:
prefix = F'{self._prefix}/__substg1.0_3701000D'
if self._ole.exists(prefix):
return MsgFile._from_ole(self._ole, prefix)
if method == 6:
binary = self._stream('__substg1.0_37010102')
if binary is not None:
return binary
if method in (2, 3, 4, 7):
ref = self._read_string('3701') or self.long_filename
if ref:
return ref.encode('utf-8')
return b''
raw = self._stream('__substg1.0_37010102')
return raw if raw is not None else b''
@cached_property
def long_filename(self) -> str | None:
return self._read_string('3707')
@cached_property
def short_filename(self) -> str | None:
return self._read_string('3704')
class MsgFile:
def __init__(self, data: bytes | bytearray | memoryview):
from refinery.lib.ole.file import OleFile
from refinery.lib.structures import MemoryFile
self._ole = OleFile(MemoryFile(data))
self._prefix = ''
self._init()
@classmethod
def _from_ole(cls, ole, prefix: str) -> MsgFile:
obj = object.__new__(cls)
obj._ole = ole
obj._prefix = prefix
obj._init()
return obj
def _init(self):
self._unicode = False
p = self._prefix
for entry in self._ole.listdir():
path = '/'.join(entry)
if path.startswith(p) and path.endswith('001F'):
self._unicode = True
break
props_path = F'{p}/__properties_version1.0' if p else '__properties_version1.0'
header_size = 32 if not p else 24
self._props = self._parse_properties(props_path, header_size)
cp = self._props.get('3FFD0003') or self._props.get('3FDE0003')
self._ansi_codec = _codepage_to_codec(cp) if cp else 'cp1252'
self._named_props = self._parse_named_properties()
def _read_stream(self, path: str) -> bytes | memoryview | None:
if self._ole.exists(path):
return self._ole.openstream(path).read()
return None
def _read_string(self, prop_id: str) -> str | None:
p = self._prefix
base = F'{p}/__substg1.0_{prop_id}' if p else F'__substg1.0_{prop_id}'
for suffix, codec in (('001F', 'utf-16-le'), ('001E', self._ansi_codec)):
data = self._read_stream(F'{base}{suffix}')
if data is not None:
try:
return codecs.decode(data, codec).rstrip('\0')
except (UnicodeDecodeError, LookupError):
continue
return None
def _read_binary(self, prop_id: str) -> bytes | memoryview | None:
p = self._prefix
path = F'{p}/__substg1.0_{prop_id}0102' if p else F'__substg1.0_{prop_id}0102'
return self._read_stream(path)
def _parse_named_properties(self) -> dict[tuple[bytes, int], int]:
"""
Parse the __nameid_version1.0 streams to build a mapping from (GUID, LID) to the runtime
property ID in the 0x8000+ range.
"""
result: dict[tuple[bytes, int], int] = {}
guid_stream = self._read_stream('__nameid_version1.0/__substg1.0_00020102')
data_stream = self._read_stream('__nameid_version1.0/__substg1.0_00030102')
if not guid_stream or not data_stream:
return result
guids: list[bytes] = []
for i in range(0, len(guid_stream), 16):
guids.append(bytes(guid_stream[i:i + 16]))
for i in range(0, len(data_stream) - 7, 8):
lid, info, _idx = struct.unpack_from('<IHH', data_stream, i)
is_string = info & 1
if is_string:
continue
guid_idx = info >> 1
if guid_idx < 3 or guid_idx - 3 >= len(guids):
continue
guid = guids[guid_idx - 3]
result[(guid, lid)] = 0x8000 + i // 8
return result
def _resolve_named_prop(self, guid: bytes, lid: int) -> str | None:
prop_id = self._named_props.get((guid, lid))
if prop_id is not None:
return F'{prop_id:04X}'
return None
def _parse_properties(self, stream_path: str, header_size: int = 32) -> dict:
data = self._read_stream(stream_path)
if not data or len(data) < header_size + 16:
return {}
props = {}
mv = memoryview(data)
for offset in range(header_size, len(data) - 15, 16):
record = mv[offset:offset + 16]
prop_type = int.from_bytes(record[0:2], 'little')
prop_id = int.from_bytes(record[2:4], 'little')
tag = F'{prop_id:04X}{prop_type:04X}'
if prop_type == 0x0003:
props[tag] = int.from_bytes(record[8:12], 'little')
elif prop_type == 0x0040:
ft = int.from_bytes(record[8:16], 'little')
props[tag] = _filetime_to_datetime(ft)
return props
@cached_property
def subject(self) -> str | None:
return self._read_string('0037')
@cached_property
def message_class(self) -> str:
return (self._read_string('001A') or 'IPM.Note').upper()
@cached_property
def sender(self) -> str | None:
return self._read_string('0C1A') or self._read_string('0065')
@cached_property
def to(self) -> str | None:
return self._read_string('0E04')
@cached_property
def cc(self) -> str | None:
return self._read_string('0E03')
@cached_property
def bcc(self) -> str | None:
return self._read_string('0E02')
@cached_property
def message_id(self) -> str | None:
return self._read_string('1035')
@cached_property
def date(self) -> datetime | None:
return self._props.get('0E060040') or self._props.get('00390040')
@cached_property
def header(self) -> dict[str, str]:
raw = self._read_string('007D')
if not raw:
return {}
parsed = HeaderParser().parsestr(raw)
return dict(parsed.items())
@cached_property
def body(self) -> str | None:
return self._read_string('1000')
@cached_property
def html_body(self) -> bytes | None:
return self._read_binary('1013')
@cached_property
def rtf_body(self) -> bytes | None:
data = self._read_binary('1009')
if data is None:
return None
from refinery.lib.rtfc import decompress
return decompress(data)
@cached_property
def attachments(self) -> list[MsgAttachment]:
result = []
p = self._prefix
prefix_parts = p.split('/') if p else []
depth = len(prefix_parts)
seen = set()
for entry in self._ole.listdir():
if entry[:depth] != prefix_parts:
continue
if len(entry) <= depth:
continue
dirname = entry[depth]
if not re.match(
r'__attach_version1\.0_#[0-9A-F]{8}\Z', dirname, re.IGNORECASE
):
continue
full = '/'.join(prefix_parts + [dirname]) if p else dirname
if full not in seen:
seen.add(full)
result.append(MsgAttachment(self._ole, full, self))
return result
# Contact properties (IPM.Contact)
@cached_property
def display_name(self) -> str | None:
return self._read_string('3001')
@cached_property
def company(self) -> str | None:
return self._read_string('3A16')
@cached_property
def job_title(self) -> str | None:
return self._read_string('3A17')
@cached_property
def business_phone(self) -> str | None:
return self._read_string('3A08')
@cached_property
def home_phone(self) -> str | None:
return self._read_string('3A09')
@cached_property
def mobile_phone(self) -> str | None:
return self._read_string('3A1C')
@cached_property
def start_time(self) -> datetime | None:
return self._props.get('00600040')
@cached_property
def end_time(self) -> datetime | None:
return self._props.get('00610040')
@cached_property
def location(self) -> str | None:
prop_id = self._resolve_named_prop(_PSETID_APPOINTMENT, _LID_LOCATION)
if prop_id:
return self._read_string(prop_id)
return None
Classes
class MsgAttachment (ole, prefix, parent)-
Expand source code Browse git
class MsgAttachment: def __init__(self, ole, prefix: str, parent: MsgFile): self._ole = ole self._prefix = prefix self._parent = parent props_path = F'{prefix}/__properties_version1.0' self._props = parent._parse_properties(props_path, header_size=8) def _stream(self, path: str) -> bytes | memoryview | None: full = F'{self._prefix}/{path}' if self._ole.exists(full): return self._ole.openstream(full).read() return None def _read_string(self, prop_id: str) -> str | None: for suffix, codec in (('001F', 'utf-16-le'), ('001E', self._parent._ansi_codec)): data = self._stream(F'__substg1.0_{prop_id}{suffix}') if data is not None: return codecs.decode(data, codec).rstrip('\0') return None @cached_property def attach_method(self) -> int: return self._props.get('37050003', 1) @cached_property def content_id(self) -> str | None: return self._read_string('3712') @cached_property def mime_type(self) -> str | None: return self._read_string('370E') @cached_property def is_embedded_msg(self) -> bool: return self.attach_method == 5 @cached_property def is_ole_storage(self) -> bool: return self.attach_method == 6 @cached_property def data(self) -> bytes | memoryview | MsgFile: method = self.attach_method if method == 5: prefix = F'{self._prefix}/__substg1.0_3701000D' if self._ole.exists(prefix): return MsgFile._from_ole(self._ole, prefix) if method == 6: binary = self._stream('__substg1.0_37010102') if binary is not None: return binary if method in (2, 3, 4, 7): ref = self._read_string('3701') or self.long_filename if ref: return ref.encode('utf-8') return b'' raw = self._stream('__substg1.0_37010102') return raw if raw is not None else b'' @cached_property def long_filename(self) -> str | None: return self._read_string('3707') @cached_property def short_filename(self) -> str | None: return self._read_string('3704')Instance variables
var attach_method-
Expand source code Browse git
@cached_property def attach_method(self) -> int: return self._props.get('37050003', 1) var content_id-
Expand source code Browse git
@cached_property def content_id(self) -> str | None: return self._read_string('3712') var mime_type-
Expand source code Browse git
@cached_property def mime_type(self) -> str | None: return self._read_string('370E') var is_embedded_msg-
Expand source code Browse git
@cached_property def is_embedded_msg(self) -> bool: return self.attach_method == 5 var is_ole_storage-
Expand source code Browse git
@cached_property def is_ole_storage(self) -> bool: return self.attach_method == 6 var data-
Expand source code Browse git
@cached_property def data(self) -> bytes | memoryview | MsgFile: method = self.attach_method if method == 5: prefix = F'{self._prefix}/__substg1.0_3701000D' if self._ole.exists(prefix): return MsgFile._from_ole(self._ole, prefix) if method == 6: binary = self._stream('__substg1.0_37010102') if binary is not None: return binary if method in (2, 3, 4, 7): ref = self._read_string('3701') or self.long_filename if ref: return ref.encode('utf-8') return b'' raw = self._stream('__substg1.0_37010102') return raw if raw is not None else b'' var long_filename-
Expand source code Browse git
@cached_property def long_filename(self) -> str | None: return self._read_string('3707') var short_filename-
Expand source code Browse git
@cached_property def short_filename(self) -> str | None: return self._read_string('3704')
class MsgFile (data)-
Expand source code Browse git
class MsgFile: def __init__(self, data: bytes | bytearray | memoryview): from refinery.lib.ole.file import OleFile from refinery.lib.structures import MemoryFile self._ole = OleFile(MemoryFile(data)) self._prefix = '' self._init() @classmethod def _from_ole(cls, ole, prefix: str) -> MsgFile: obj = object.__new__(cls) obj._ole = ole obj._prefix = prefix obj._init() return obj def _init(self): self._unicode = False p = self._prefix for entry in self._ole.listdir(): path = '/'.join(entry) if path.startswith(p) and path.endswith('001F'): self._unicode = True break props_path = F'{p}/__properties_version1.0' if p else '__properties_version1.0' header_size = 32 if not p else 24 self._props = self._parse_properties(props_path, header_size) cp = self._props.get('3FFD0003') or self._props.get('3FDE0003') self._ansi_codec = _codepage_to_codec(cp) if cp else 'cp1252' self._named_props = self._parse_named_properties() def _read_stream(self, path: str) -> bytes | memoryview | None: if self._ole.exists(path): return self._ole.openstream(path).read() return None def _read_string(self, prop_id: str) -> str | None: p = self._prefix base = F'{p}/__substg1.0_{prop_id}' if p else F'__substg1.0_{prop_id}' for suffix, codec in (('001F', 'utf-16-le'), ('001E', self._ansi_codec)): data = self._read_stream(F'{base}{suffix}') if data is not None: try: return codecs.decode(data, codec).rstrip('\0') except (UnicodeDecodeError, LookupError): continue return None def _read_binary(self, prop_id: str) -> bytes | memoryview | None: p = self._prefix path = F'{p}/__substg1.0_{prop_id}0102' if p else F'__substg1.0_{prop_id}0102' return self._read_stream(path) def _parse_named_properties(self) -> dict[tuple[bytes, int], int]: """ Parse the __nameid_version1.0 streams to build a mapping from (GUID, LID) to the runtime property ID in the 0x8000+ range. """ result: dict[tuple[bytes, int], int] = {} guid_stream = self._read_stream('__nameid_version1.0/__substg1.0_00020102') data_stream = self._read_stream('__nameid_version1.0/__substg1.0_00030102') if not guid_stream or not data_stream: return result guids: list[bytes] = [] for i in range(0, len(guid_stream), 16): guids.append(bytes(guid_stream[i:i + 16])) for i in range(0, len(data_stream) - 7, 8): lid, info, _idx = struct.unpack_from('<IHH', data_stream, i) is_string = info & 1 if is_string: continue guid_idx = info >> 1 if guid_idx < 3 or guid_idx - 3 >= len(guids): continue guid = guids[guid_idx - 3] result[(guid, lid)] = 0x8000 + i // 8 return result def _resolve_named_prop(self, guid: bytes, lid: int) -> str | None: prop_id = self._named_props.get((guid, lid)) if prop_id is not None: return F'{prop_id:04X}' return None def _parse_properties(self, stream_path: str, header_size: int = 32) -> dict: data = self._read_stream(stream_path) if not data or len(data) < header_size + 16: return {} props = {} mv = memoryview(data) for offset in range(header_size, len(data) - 15, 16): record = mv[offset:offset + 16] prop_type = int.from_bytes(record[0:2], 'little') prop_id = int.from_bytes(record[2:4], 'little') tag = F'{prop_id:04X}{prop_type:04X}' if prop_type == 0x0003: props[tag] = int.from_bytes(record[8:12], 'little') elif prop_type == 0x0040: ft = int.from_bytes(record[8:16], 'little') props[tag] = _filetime_to_datetime(ft) return props @cached_property def subject(self) -> str | None: return self._read_string('0037') @cached_property def message_class(self) -> str: return (self._read_string('001A') or 'IPM.Note').upper() @cached_property def sender(self) -> str | None: return self._read_string('0C1A') or self._read_string('0065') @cached_property def to(self) -> str | None: return self._read_string('0E04') @cached_property def cc(self) -> str | None: return self._read_string('0E03') @cached_property def bcc(self) -> str | None: return self._read_string('0E02') @cached_property def message_id(self) -> str | None: return self._read_string('1035') @cached_property def date(self) -> datetime | None: return self._props.get('0E060040') or self._props.get('00390040') @cached_property def header(self) -> dict[str, str]: raw = self._read_string('007D') if not raw: return {} parsed = HeaderParser().parsestr(raw) return dict(parsed.items()) @cached_property def body(self) -> str | None: return self._read_string('1000') @cached_property def html_body(self) -> bytes | None: return self._read_binary('1013') @cached_property def rtf_body(self) -> bytes | None: data = self._read_binary('1009') if data is None: return None from refinery.lib.rtfc import decompress return decompress(data) @cached_property def attachments(self) -> list[MsgAttachment]: result = [] p = self._prefix prefix_parts = p.split('/') if p else [] depth = len(prefix_parts) seen = set() for entry in self._ole.listdir(): if entry[:depth] != prefix_parts: continue if len(entry) <= depth: continue dirname = entry[depth] if not re.match( r'__attach_version1\.0_#[0-9A-F]{8}\Z', dirname, re.IGNORECASE ): continue full = '/'.join(prefix_parts + [dirname]) if p else dirname if full not in seen: seen.add(full) result.append(MsgAttachment(self._ole, full, self)) return result # Contact properties (IPM.Contact) @cached_property def display_name(self) -> str | None: return self._read_string('3001') @cached_property def company(self) -> str | None: return self._read_string('3A16') @cached_property def job_title(self) -> str | None: return self._read_string('3A17') @cached_property def business_phone(self) -> str | None: return self._read_string('3A08') @cached_property def home_phone(self) -> str | None: return self._read_string('3A09') @cached_property def mobile_phone(self) -> str | None: return self._read_string('3A1C') @cached_property def start_time(self) -> datetime | None: return self._props.get('00600040') @cached_property def end_time(self) -> datetime | None: return self._props.get('00610040') @cached_property def location(self) -> str | None: prop_id = self._resolve_named_prop(_PSETID_APPOINTMENT, _LID_LOCATION) if prop_id: return self._read_string(prop_id) return NoneInstance variables
var subject-
Expand source code Browse git
@cached_property def subject(self) -> str | None: return self._read_string('0037') var message_class-
Expand source code Browse git
@cached_property def message_class(self) -> str: return (self._read_string('001A') or 'IPM.Note').upper() var sender-
Expand source code Browse git
@cached_property def sender(self) -> str | None: return self._read_string('0C1A') or self._read_string('0065') var to-
Expand source code Browse git
@cached_property def to(self) -> str | None: return self._read_string('0E04') var cc-
Expand source code Browse git
@cached_property def cc(self) -> str | None: return self._read_string('0E03') var bcc-
Expand source code Browse git
@cached_property def bcc(self) -> str | None: return self._read_string('0E02') var message_id-
Expand source code Browse git
@cached_property def message_id(self) -> str | None: return self._read_string('1035') var date-
Expand source code Browse git
@cached_property def date(self) -> datetime | None: return self._props.get('0E060040') or self._props.get('00390040') var header-
Expand source code Browse git
@cached_property def header(self) -> dict[str, str]: raw = self._read_string('007D') if not raw: return {} parsed = HeaderParser().parsestr(raw) return dict(parsed.items()) var body-
Expand source code Browse git
@cached_property def body(self) -> str | None: return self._read_string('1000') var html_body-
Expand source code Browse git
@cached_property def html_body(self) -> bytes | None: return self._read_binary('1013') var rtf_body-
Expand source code Browse git
@cached_property def rtf_body(self) -> bytes | None: data = self._read_binary('1009') if data is None: return None from refinery.lib.rtfc import decompress return decompress(data) var attachments-
Expand source code Browse git
@cached_property def attachments(self) -> list[MsgAttachment]: result = [] p = self._prefix prefix_parts = p.split('/') if p else [] depth = len(prefix_parts) seen = set() for entry in self._ole.listdir(): if entry[:depth] != prefix_parts: continue if len(entry) <= depth: continue dirname = entry[depth] if not re.match( r'__attach_version1\.0_#[0-9A-F]{8}\Z', dirname, re.IGNORECASE ): continue full = '/'.join(prefix_parts + [dirname]) if p else dirname if full not in seen: seen.add(full) result.append(MsgAttachment(self._ole, full, self)) return result var display_name-
Expand source code Browse git
@cached_property def display_name(self) -> str | None: return self._read_string('3001') var company-
Expand source code Browse git
@cached_property def company(self) -> str | None: return self._read_string('3A16') var job_title-
Expand source code Browse git
@cached_property def job_title(self) -> str | None: return self._read_string('3A17') var business_phone-
Expand source code Browse git
@cached_property def business_phone(self) -> str | None: return self._read_string('3A08') var home_phone-
Expand source code Browse git
@cached_property def home_phone(self) -> str | None: return self._read_string('3A09') var mobile_phone-
Expand source code Browse git
@cached_property def mobile_phone(self) -> str | None: return self._read_string('3A1C') var start_time-
Expand source code Browse git
@cached_property def start_time(self) -> datetime | None: return self._props.get('00600040') var end_time-
Expand source code Browse git
@cached_property def end_time(self) -> datetime | None: return self._props.get('00610040') var location-
Expand source code Browse git
@cached_property def location(self) -> str | None: prop_id = self._resolve_named_prop(_PSETID_APPOINTMENT, _LID_LOCATION) if prop_id: return self._read_string(prop_id) return None