Module refinery.lib.ole.rtf

Port of oletools rtfobj.py and oleobj.py for RTF embedded object extraction. Parses RTF documents and extracts OLE 1.0 objects and OLE Package streams without requiring oletools as a dependency.

Expand source code Browse git
"""
Port of oletools rtfobj.py and oleobj.py for RTF embedded object extraction. Parses RTF documents
and extracts OLE 1.0 objects and OLE Package streams without requiring oletools as a dependency.
"""
from __future__ import annotations

import enum
import re

from dataclasses import dataclass, field

from refinery.lib.structures import EOF, Struct, StructReader

KNOWN_CLSIDS: dict[str, str] = {
    '00000300-0000-0000-C000-000000000046': 'StdOleLink (CVE-2017-0199, CVE-2017-8570, CVE-2017-8759, CVE-2018-8174)',
    '00000303-0000-0000-C000-000000000046': 'File Moniker (CVE-2017-0199, CVE-2017-8570)',
    '00000309-0000-0000-C000-000000000046': 'Composite Moniker (CVE-2017-8570)',
    '00020C01-0000-0000-C000-000000000046': 'OLE Package Object',
    '00021700-0000-0000-C000-000000000046': 'Microsoft Equation 2.0 (CVE-2017-11882, CVE-2018-0802)',
    '00022601-0000-0000-C000-000000000046': 'OLE Package Object',
    '0002CE02-0000-0000-C000-000000000046': 'Microsoft Equation 3.0 (CVE-2017-11882, CVE-2018-0802)',
    '0003000B-0000-0000-C000-000000000046': 'Microsoft Equation (CVE-2017-11882, CVE-2018-0802)',
    '0003000C-0000-0000-C000-000000000046': 'OLE Package Object',
    '0003000E-0000-0000-C000-000000000046': 'OLE Package Object',
    '0004A6B0-0000-0000-C000-000000000046': 'Microsoft Equation 2.0 (CVE-2017-11882, CVE-2018-0802)',
    '3050F4D8-98B5-11CF-BB82-00AA00BDCE0B': 'HTML Application (CVE-2017-0199)',
    '79EAC9E0-BAF9-11CE-8C82-00AA004BA90B': 'URL Moniker (CVE-2017-0199, CVE-2017-8570, CVE-2018-8174)',
    'D27CDB6E-AE6D-11CF-96B8-444553540000': 'Shockwave Flash Object',
    'ECABB0C7-7F19-11D2-978E-0000F8757E2A': 'SOAP Moniker (CVE-2017-8759)',
    'F20DA720-C02F-11CE-927B-0800095AE340': 'OLE Package Object',
    'F4754C9B-64F5-4B40-8AF4-679732AC0607': 'Microsoft Word Document (Word.Document.12)',
}

DESTINATION_CONTROL_WORDS: frozenset[bytes] = frozenset((
    b"aftncn",
    b"aftnsep",
    b"aftnsepc",
    b"annotation",
    b"atnauthor",
    b"atndate",
    b"atnid",
    b"atnparent",
    b"atnref",
    b"atrfend",
    b"atrfstart",
    b"author",
    b"background",
    b"bkmkend",
    b"bkmkstart",
    b"blipuid",
    b"buptim",
    b"category",
    b"colorschememapping",
    b"colortbl",
    b"comment",
    b"company",
    b"creatim",
    b"datafield",
    b"datastore",
    b"defchp",
    b"defpap",
    b"do",
    b"doccomm",
    b"docvar",
    b"dptxbxtext",
    b"ebcend",
    b"ebcstart",
    b"factoidname",
    b"falt",
    b"fchars",
    b"ffdeftext",
    b"ffentrymcr",
    b"ffexitmcr",
    b"ffformat",
    b"ffhelptext",
    b"ffl",
    b"ffname",
    b"ffstattext",
    b"field",
    b"file",
    b"filetbl",
    b"fldinst",
    b"fldrslt",
    b"fldtype",
    b"fontemb",
    b"fonttbl",
    b"footer",
    b"footerf",
    b"footerl",
    b"footerr",
    b"footnote",
    b"formfield",
    b"ftncn",
    b"ftnsep",
    b"ftnsepc",
    b"g",
    b"generator",
    b"gridtbl",
    b"header",
    b"headerf",
    b"headerl",
    b"headerr",
    b"hl",
    b"hlfr",
    b"hlinkbase",
    b"hlloc",
    b"hlsrc",
    b"hsv",
    b"info",
    b"keywords",
    b"latentstyles",
    b"lchars",
    b"levelnumbers",
    b"leveltext",
    b"lfolevel",
    b"linkval",
    b"list",
    b"listlevel",
    b"listname",
    b"listoverride",
    b"listoverridetable",
    b"listpicture",
    b"liststylename",
    b"listtable",
    b"listtext",
    b"lsdlockedexcept",
    b"macc",
    b"maccPr",
    b"mailmerge",
    b"malnScr",
    b"manager",
    b"margPr",
    b"mbar",
    b"mbarPr",
    b"mbaseJc",
    b"mbegChr",
    b"mborderBox",
    b"mborderBoxPr",
    b"mbox",
    b"mboxPr",
    b"mchr",
    b"mcount",
    b"mctrlPr",
    b"md",
    b"mdeg",
    b"mdegHide",
    b"mden",
    b"mdiff",
    b"mdPr",
    b"me",
    b"mendChr",
    b"meqArr",
    b"meqArrPr",
    b"mf",
    b"mfName",
    b"mfPr",
    b"mfunc",
    b"mfuncPr",
    b"mgroupChr",
    b"mgroupChrPr",
    b"mgrow",
    b"mhideBot",
    b"mhideLeft",
    b"mhideRight",
    b"mhideTop",
    b"mlim",
    b"mlimLoc",
    b"mlimLow",
    b"mlimLowPr",
    b"mlimUpp",
    b"mlimUppPr",
    b"mm",
    b"mmaddfieldname",
    b"mmathPict",
    b"mmaxDist",
    b"mmc",
    b"mmcJc",
    b"mmconnectstr",
    b"mmconnectstrdata",
    b"mmcPr",
    b"mmcs",
    b"mmdatasource",
    b"mmheadersource",
    b"mmmailsubject",
    b"mmodso",
    b"mmodsofilter",
    b"mmodsofldmpdata",
    b"mmodsomappedname",
    b"mmodsoname",
    b"mmodsorecipdata",
    b"mmodsosort",
    b"mmodsosrc",
    b"mmodsotable",
    b"mmodsoudl",
    b"mmodsoudldata",
    b"mmodsouniquetag",
    b"mmPr",
    b"mmquery",
    b"mmr",
    b"mnary",
    b"mnaryPr",
    b"mnoBreak",
    b"mnum",
    b"mobjDist",
    b"moMath",
    b"moMathPara",
    b"moMathParaPr",
    b"mopEmu",
    b"mphant",
    b"mphantPr",
    b"mplcHide",
    b"mpos",
    b"mr",
    b"mrad",
    b"mradPr",
    b"mrPr",
    b"msepChr",
    b"mshow",
    b"mshp",
    b"msPre",
    b"msPrePr",
    b"msSub",
    b"msSubPr",
    b"msSubSup",
    b"msSubSupPr",
    b"msSup",
    b"msSupPr",
    b"mstrikeBLTR",
    b"mstrikeH",
    b"mstrikeTLBR",
    b"mstrikeV",
    b"msub",
    b"msubHide",
    b"msup",
    b"msupHide",
    b"mtransp",
    b"mtype",
    b"mvertJc",
    b"mvfmf",
    b"mvfml",
    b"mvtof",
    b"mvtol",
    b"mzeroAsc",
    b"mzeroDesc",
    b"mzeroWid",
    b"nesttableprops",
    b"nonesttables",
    b"objalias",
    b"objclass",
    b"objdata",
    b"object",
    b"objname",
    b"objsect",
    b"oldcprops",
    b"oldpprops",
    b"oldsprops",
    b"oldtprops",
    b"oleclsid",
    b"operator",
    b"panose",
    b"password",
    b"passwordhash",
    b"pgp",
    b"pgptbl",
    b"picprop",
    b"pict",
    b"pn",
    b"pnseclvl",
    b"pntext",
    b"pntxta",
    b"pntxtb",
    b"printim",
    b"propname",
    b"protend",
    b"protstart",
    b"protusertbl",
    b"result",
    b"revtbl",
    b"revtim",
    b"rxe",
    b"shp",
    b"shpgrp",
    b"shpinst",
    b"shppict",
    b"shprslt",
    b"shptxt",
    b"sn",
    b"sp",
    b"staticval",
    b"stylesheet",
    b"subject",
    b"sv",
    b"svb",
    b"tc",
    b"template",
    b"themedata",
    b"title",
    b"txe",
    b"ud",
    b"upr",
    b"userprops",
    b"wgrffmtfilter",
    b"windowcaption",
    b"writereservation",
    b"writereservhash",
    b"xe",
    b"xform",
    b"xmlattrname",
    b"xmlattrvalue",
    b"xmlclose",
    b"xmlname",
    b"xmlnstbl",
    b"xmlopen",
    b"margSz",
    b"pnaiu",
    b"pnaiud",
))

_BACKSLASH = ord('\\')
_BRACE_OPEN = ord('{')
_BRACE_CLOSE = ord('}')
_SPACE = ord(' ')

_RE_CONTROL_WORD = re.compile(
    b'\\\\([a-zA-Z]{1,250})'
    b'(?:(-?\\d+)(?=[^0-9])|(?=[^a-zA-Z0-9])|$)'
)
_RE_CONTROL_SYMBOL = re.compile(b'\\\\[^a-zA-Z]')
_RE_TEXT = re.compile(b'[^{}\\\\]+')
_RE_NON_HEX = re.compile(b'[^a-fA-F0-9]')


class TYPE(enum.IntEnum):
    LINKED = 0x01
    EMBEDDED = 0x02


class OleObject(Struct[memoryview]):
    """
    Represents an OLE 1.0 Object parsed from binary data according to MS-OLEDS 2.2 OLE1.0 Format
    Structures.
    """
    def __init__(self, reader: StructReader[memoryview]):
        self.ole_version = reader.u32()
        format_id = reader.u32()
        try:
            self.format_id: TYPE | None = TYPE(format_id)
        except Exception:
            raise ValueError(F'Unknown OLE format ID {format_id:#x}.')
        self.class_name = reader.read_length_prefixed()[:-1]
        self.topic_name = reader.read_length_prefixed()[:-1]
        self.item_name = reader.read_length_prefixed()[:-1]
        if self.format_id == TYPE.EMBEDDED:
            self.data_size: int | None = reader.u32()
            self.data: memoryview = reader.read(self.data_size)
            self.extra_data: memoryview = reader.read()
        else:
            self.data_size = None
            self.data = memoryview(B'')
            self.extra_data = memoryview(B'')


class OleNativeStream(Struct):
    """
    Parses an OLE Package / OLE Native Stream structure to extract the embedded filename, paths,
    and payload data.
    """

    def __init__(self, reader: StructReader, package: bool = False):
        if not package:
            reader.u32()
        reader.u16()
        self.filename = reader.read_c_string('latin1')
        self.src_path = reader.read_c_string('latin1')
        reader.u32()
        reader.u32()
        self.temp_path = reader.read_c_string('latin1')
        try:
            self.actual_size = reader.u32()
            self.data: bytes | None = reader.read(self.actual_size)
            self.is_link = False
        except (EOF, IndexError):
            self.is_link = True
            self.actual_size = 0
            self.data = None


@dataclass
class RtfObject:
    """
    Represents an object embedded in an RTF document, with parsed OLE metadata when available.
    """
    start: int = 0
    end: int = 0
    hexdata: bytes = b''
    rawdata: bytes = b''
    is_ole: bool = False
    oledata: bytes | None = None
    oledata_size: int | None = None
    format_id: TYPE | None = None
    class_name: bytes | None = None
    is_package: bool = False
    olepkgdata: bytes | None = None
    filename: str | None = None
    src_path: str | None = None
    temp_path: str | None = None
    clsid: str | None = None
    clsid_desc: str | None = None


@dataclass
class _Destination:
    """
    Internal tracking state for an RTF destination (a named group that accumulates data).
    """
    cword: bytes | None = None
    data: bytearray = field(default_factory=bytearray)
    start: int = 0
    end: int = 0
    group_level: int = 0


class RtfParser:
    """
    Generic RTF parser implementing a state machine that tracks groups, destinations, control
    words, control symbols, binary data, and text. Designed to handle malformed RTF as MS Word
    does.
    """
    def __init__(self, data: bytes | bytearray | memoryview):
        self.data: bytes | bytearray | memoryview = data
        self.index: int = 0
        self.group_level: int = 0
        self.current_destination = document_destination = _Destination()
        self.destinations: list[_Destination] = [document_destination]
        n = len(data)
        while self.index < n:
            byte = data[self.index]
            if byte == _BRACE_OPEN:
                self._open_group()
                self.index += 1
                continue
            if byte == _BRACE_CLOSE:
                self._close_group()
                self.index += 1
                continue
            if byte == _BACKSLASH:
                m = _RE_CONTROL_WORD.match(data, self.index)
                if m:
                    cword = m.group(1)
                    param = m.group(2)
                    self._control_word(m, cword, param)
                    self.index += len(m.group())
                    if cword == b'bin':
                        self._bin(param)
                    continue
                m = _RE_CONTROL_SYMBOL.match(data, self.index)
                if m:
                    self.control_symbol(m)
                    self.index += len(m.group())
                    continue
            m = _RE_TEXT.match(data, self.index)
            if m:
                self._text(m)
                self.index += len(m.group())
                continue
            self.index += 1
        self._end_of_file()

    def _open_group(self) -> None:
        self.group_level += 1
        self.open_group()

    def open_group(self) -> None:
        pass

    def _close_group(self) -> None:
        self.close_group()
        if self.group_level == self.current_destination.group_level:
            self._close_destination()
        self.group_level -= 1

    def close_group(self) -> None:
        pass

    def _open_destination(
        self, matchobject: re.Match, cword: bytes
    ) -> None:
        if self.current_destination.group_level == self.group_level:
            self._close_destination()
        dest = _Destination(cword)
        dest.group_level = self.group_level
        dest.start = self.index + len(matchobject.group())
        self.destinations.append(dest)
        self.current_destination = dest
        self.open_destination(dest)

    def open_destination(self, destination: _Destination) -> None:
        pass

    def _close_destination(self) -> None:
        self.current_destination.end = self.index
        self.close_destination(self.current_destination)
        if self.destinations:
            self.destinations.pop()
        if self.destinations:
            self.current_destination = self.destinations[-1]

    def close_destination(self, destination: _Destination) -> None:
        pass

    def _control_word(
        self,
        matchobject: re.Match,
        cword: bytes,
        param: bytes | None,
    ) -> None:
        if cword in DESTINATION_CONTROL_WORDS:
            self._open_destination(matchobject, cword)
        self.control_word(matchobject, cword, param)

    def control_word(
        self,
        matchobject: re.Match,
        cword: bytes,
        param: bytes | None,
    ) -> None:
        pass

    def control_symbol(self, matchobject: re.Match) -> None:
        pass

    def _text(self, matchobject: re.Match) -> None:
        text = matchobject.group()
        self.current_destination.data += text
        self.text(matchobject, text)

    def text(
        self, matchobject: re.Match, text: bytes
    ) -> None:
        pass

    def _bin(self, param: bytes | None) -> None:
        if param is None:
            binlen = 0
        else:
            binlen = int(param)
        if binlen < 0:
            binlen = 0
        index = self.index
        try:
            if self.data[index] == _SPACE:
                index += 1
        except IndexError:
            pass
        bindata = bytes(self.data[index:(index := index + binlen)])
        self.index = index
        self.bin_data(bindata)

    def bin_data(self, bindata: bytes) -> None:
        pass

    def _end_of_file(self) -> None:
        while self.group_level > 0:
            self._close_group()
        self.end_of_file()

    def end_of_file(self) -> None:
        pass


def _extract_clsid(oledata: bytes) -> tuple[str | None, str | None]:
    """
    Attempt to read the root entry CLSID from OLE2 data using refinery's internal OLE parser.
    Returns (clsid, description) or (None, None).
    """
    from refinery.lib.ole.file import OleFile, is_ole_file
    try:
        if not is_ole_file(oledata):
            return None, None
        ole = OleFile(oledata)
        raw_clsid = ole.getclsid('/')
        if not raw_clsid:
            return None, None
        desc = KNOWN_CLSIDS.get(raw_clsid.upper())
        return raw_clsid, desc
    except Exception:
        return None, None


class RtfObjParser(RtfParser):
    """
    Specialized RTF parser that extracts embedded OLE objects from RTF documents. After calling
    parse(), the extracted objects are available in the `objects` list.
    """

    def __init__(self, data: bytes | bytearray | memoryview):
        self.objects: list[RtfObject] = []
        super().__init__(data)

    def open_destination(self, destination: _Destination) -> None:
        pass

    def close_destination(self, destination: _Destination) -> None:
        if destination.cword != b'objdata':
            return
        rtfobj = RtfObject()
        self.objects.append(rtfobj)
        rtfobj.start = destination.start
        rtfobj.end = destination.end
        hexdata = destination.data.translate(None, b' \t\r\n\f\v')
        hexdata = _RE_NON_HEX.sub(b'', hexdata)
        if len(hexdata) & 1:
            hexdata = hexdata[:-1]
        rtfobj.hexdata = hexdata
        try:
            object_data = bytes.fromhex(hexdata.decode('ascii'))
        except (ValueError, UnicodeDecodeError):
            return
        rtfobj.rawdata = object_data
        try:
            obj = OleObject.Parse(memoryview(object_data))
            rtfobj.format_id = obj.format_id
            rtfobj.class_name = obj.class_name
            rtfobj.oledata_size = obj.data_size
            rtfobj.oledata = obj.data
            rtfobj.is_ole = True
            if obj.class_name and bytes(obj.class_name).lower() == b'package':
                try:
                    opkg = OleNativeStream.Parse(obj.data, package=True)
                    rtfobj.filename = opkg.filename
                    rtfobj.src_path = opkg.src_path
                    rtfobj.temp_path = opkg.temp_path
                    rtfobj.olepkgdata = opkg.data
                    rtfobj.is_package = True
                except Exception:
                    pass
            elif obj.data:
                clsid, clsid_desc = _extract_clsid(obj.data)
                if clsid is not None:
                    rtfobj.clsid = clsid
                    rtfobj.clsid_desc = clsid_desc
        except Exception:
            pass

    def bin_data(self, bindata: bytes) -> None:
        if self.current_destination.cword == b'objdata':
            import binascii
            self.current_destination.data.extend(binascii.hexlify(bindata))

    def control_symbol(self, matchobject: re.Match) -> None:
        symbol = matchobject.group()[1:2]
        if symbol != b"'":
            return
        # MS Word hex escape: \'hh
        # Read two bytes following the escape (any characters, not
        # just hex) and advance the index by 2.
        self.index += 2
        if self.current_destination.cword == b'objdata':
            # Emulate the MS Word RTF parser bug: if the number of
            # hex digits accumulated so far is odd, the last digit
            # is silently dropped before the \' escape is processed.
            self.current_destination.data = bytearray(_RE_NON_HEX.sub(b'', self.current_destination.data))
            if len(self.current_destination.data) & 1:
                del self.current_destination.data[-1:]

Classes

class TYPE (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class TYPE(enum.IntEnum):
    LINKED = 0x01
    EMBEDDED = 0x02

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var LINKED

The type of the None singleton.

var EMBEDDED

The type of the None singleton.

class OleObject (reader)

Represents an OLE 1.0 Object parsed from binary data according to MS-OLEDS 2.2 OLE1.0 Format Structures.

Expand source code Browse git
class OleObject(Struct[memoryview]):
    """
    Represents an OLE 1.0 Object parsed from binary data according to MS-OLEDS 2.2 OLE1.0 Format
    Structures.
    """
    def __init__(self, reader: StructReader[memoryview]):
        self.ole_version = reader.u32()
        format_id = reader.u32()
        try:
            self.format_id: TYPE | None = TYPE(format_id)
        except Exception:
            raise ValueError(F'Unknown OLE format ID {format_id:#x}.')
        self.class_name = reader.read_length_prefixed()[:-1]
        self.topic_name = reader.read_length_prefixed()[:-1]
        self.item_name = reader.read_length_prefixed()[:-1]
        if self.format_id == TYPE.EMBEDDED:
            self.data_size: int | None = reader.u32()
            self.data: memoryview = reader.read(self.data_size)
            self.extra_data: memoryview = reader.read()
        else:
            self.data_size = None
            self.data = memoryview(B'')
            self.extra_data = memoryview(B'')

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class OleNativeStream (reader, package=False)

Parses an OLE Package / OLE Native Stream structure to extract the embedded filename, paths, and payload data.

Expand source code Browse git
class OleNativeStream(Struct):
    """
    Parses an OLE Package / OLE Native Stream structure to extract the embedded filename, paths,
    and payload data.
    """

    def __init__(self, reader: StructReader, package: bool = False):
        if not package:
            reader.u32()
        reader.u16()
        self.filename = reader.read_c_string('latin1')
        self.src_path = reader.read_c_string('latin1')
        reader.u32()
        reader.u32()
        self.temp_path = reader.read_c_string('latin1')
        try:
            self.actual_size = reader.u32()
            self.data: bytes | None = reader.read(self.actual_size)
            self.is_link = False
        except (EOF, IndexError):
            self.is_link = True
            self.actual_size = 0
            self.data = None

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class RtfObject (start=0, end=0, hexdata=b'', rawdata=b'', is_ole=False, oledata=None, oledata_size=None, format_id=None, class_name=None, is_package=False, olepkgdata=None, filename=None, src_path=None, temp_path=None, clsid=None, clsid_desc=None)

Represents an object embedded in an RTF document, with parsed OLE metadata when available.

Expand source code Browse git
@dataclass
class RtfObject:
    """
    Represents an object embedded in an RTF document, with parsed OLE metadata when available.
    """
    start: int = 0
    end: int = 0
    hexdata: bytes = b''
    rawdata: bytes = b''
    is_ole: bool = False
    oledata: bytes | None = None
    oledata_size: int | None = None
    format_id: TYPE | None = None
    class_name: bytes | None = None
    is_package: bool = False
    olepkgdata: bytes | None = None
    filename: str | None = None
    src_path: str | None = None
    temp_path: str | None = None
    clsid: str | None = None
    clsid_desc: str | None = None

Instance variables

var start

The type of the None singleton.

var end

The type of the None singleton.

var hexdata

The type of the None singleton.

var rawdata

The type of the None singleton.

var is_ole

The type of the None singleton.

var oledata

The type of the None singleton.

var oledata_size

The type of the None singleton.

var format_id

The type of the None singleton.

var class_name

The type of the None singleton.

var is_package

The type of the None singleton.

var olepkgdata

The type of the None singleton.

var filename

The type of the None singleton.

var src_path

The type of the None singleton.

var temp_path

The type of the None singleton.

var clsid

The type of the None singleton.

var clsid_desc

The type of the None singleton.

class RtfParser (data)

Generic RTF parser implementing a state machine that tracks groups, destinations, control words, control symbols, binary data, and text. Designed to handle malformed RTF as MS Word does.

Expand source code Browse git
class RtfParser:
    """
    Generic RTF parser implementing a state machine that tracks groups, destinations, control
    words, control symbols, binary data, and text. Designed to handle malformed RTF as MS Word
    does.
    """
    def __init__(self, data: bytes | bytearray | memoryview):
        self.data: bytes | bytearray | memoryview = data
        self.index: int = 0
        self.group_level: int = 0
        self.current_destination = document_destination = _Destination()
        self.destinations: list[_Destination] = [document_destination]
        n = len(data)
        while self.index < n:
            byte = data[self.index]
            if byte == _BRACE_OPEN:
                self._open_group()
                self.index += 1
                continue
            if byte == _BRACE_CLOSE:
                self._close_group()
                self.index += 1
                continue
            if byte == _BACKSLASH:
                m = _RE_CONTROL_WORD.match(data, self.index)
                if m:
                    cword = m.group(1)
                    param = m.group(2)
                    self._control_word(m, cword, param)
                    self.index += len(m.group())
                    if cword == b'bin':
                        self._bin(param)
                    continue
                m = _RE_CONTROL_SYMBOL.match(data, self.index)
                if m:
                    self.control_symbol(m)
                    self.index += len(m.group())
                    continue
            m = _RE_TEXT.match(data, self.index)
            if m:
                self._text(m)
                self.index += len(m.group())
                continue
            self.index += 1
        self._end_of_file()

    def _open_group(self) -> None:
        self.group_level += 1
        self.open_group()

    def open_group(self) -> None:
        pass

    def _close_group(self) -> None:
        self.close_group()
        if self.group_level == self.current_destination.group_level:
            self._close_destination()
        self.group_level -= 1

    def close_group(self) -> None:
        pass

    def _open_destination(
        self, matchobject: re.Match, cword: bytes
    ) -> None:
        if self.current_destination.group_level == self.group_level:
            self._close_destination()
        dest = _Destination(cword)
        dest.group_level = self.group_level
        dest.start = self.index + len(matchobject.group())
        self.destinations.append(dest)
        self.current_destination = dest
        self.open_destination(dest)

    def open_destination(self, destination: _Destination) -> None:
        pass

    def _close_destination(self) -> None:
        self.current_destination.end = self.index
        self.close_destination(self.current_destination)
        if self.destinations:
            self.destinations.pop()
        if self.destinations:
            self.current_destination = self.destinations[-1]

    def close_destination(self, destination: _Destination) -> None:
        pass

    def _control_word(
        self,
        matchobject: re.Match,
        cword: bytes,
        param: bytes | None,
    ) -> None:
        if cword in DESTINATION_CONTROL_WORDS:
            self._open_destination(matchobject, cword)
        self.control_word(matchobject, cword, param)

    def control_word(
        self,
        matchobject: re.Match,
        cword: bytes,
        param: bytes | None,
    ) -> None:
        pass

    def control_symbol(self, matchobject: re.Match) -> None:
        pass

    def _text(self, matchobject: re.Match) -> None:
        text = matchobject.group()
        self.current_destination.data += text
        self.text(matchobject, text)

    def text(
        self, matchobject: re.Match, text: bytes
    ) -> None:
        pass

    def _bin(self, param: bytes | None) -> None:
        if param is None:
            binlen = 0
        else:
            binlen = int(param)
        if binlen < 0:
            binlen = 0
        index = self.index
        try:
            if self.data[index] == _SPACE:
                index += 1
        except IndexError:
            pass
        bindata = bytes(self.data[index:(index := index + binlen)])
        self.index = index
        self.bin_data(bindata)

    def bin_data(self, bindata: bytes) -> None:
        pass

    def _end_of_file(self) -> None:
        while self.group_level > 0:
            self._close_group()
        self.end_of_file()

    def end_of_file(self) -> None:
        pass

Subclasses

Methods

def open_group(self)
Expand source code Browse git
def open_group(self) -> None:
    pass
def close_group(self)
Expand source code Browse git
def close_group(self) -> None:
    pass
def open_destination(self, destination)
Expand source code Browse git
def open_destination(self, destination: _Destination) -> None:
    pass
def close_destination(self, destination)
Expand source code Browse git
def close_destination(self, destination: _Destination) -> None:
    pass
def control_word(self, matchobject, cword, param)
Expand source code Browse git
def control_word(
    self,
    matchobject: re.Match,
    cword: bytes,
    param: bytes | None,
) -> None:
    pass
def control_symbol(self, matchobject)
Expand source code Browse git
def control_symbol(self, matchobject: re.Match) -> None:
    pass
def text(self, matchobject, text)
Expand source code Browse git
def text(
    self, matchobject: re.Match, text: bytes
) -> None:
    pass
def bin_data(self, bindata)
Expand source code Browse git
def bin_data(self, bindata: bytes) -> None:
    pass
def end_of_file(self)
Expand source code Browse git
def end_of_file(self) -> None:
    pass
class RtfObjParser (data)

Specialized RTF parser that extracts embedded OLE objects from RTF documents. After calling parse(), the extracted objects are available in the objects list.

Expand source code Browse git
class RtfObjParser(RtfParser):
    """
    Specialized RTF parser that extracts embedded OLE objects from RTF documents. After calling
    parse(), the extracted objects are available in the `objects` list.
    """

    def __init__(self, data: bytes | bytearray | memoryview):
        self.objects: list[RtfObject] = []
        super().__init__(data)

    def open_destination(self, destination: _Destination) -> None:
        pass

    def close_destination(self, destination: _Destination) -> None:
        if destination.cword != b'objdata':
            return
        rtfobj = RtfObject()
        self.objects.append(rtfobj)
        rtfobj.start = destination.start
        rtfobj.end = destination.end
        hexdata = destination.data.translate(None, b' \t\r\n\f\v')
        hexdata = _RE_NON_HEX.sub(b'', hexdata)
        if len(hexdata) & 1:
            hexdata = hexdata[:-1]
        rtfobj.hexdata = hexdata
        try:
            object_data = bytes.fromhex(hexdata.decode('ascii'))
        except (ValueError, UnicodeDecodeError):
            return
        rtfobj.rawdata = object_data
        try:
            obj = OleObject.Parse(memoryview(object_data))
            rtfobj.format_id = obj.format_id
            rtfobj.class_name = obj.class_name
            rtfobj.oledata_size = obj.data_size
            rtfobj.oledata = obj.data
            rtfobj.is_ole = True
            if obj.class_name and bytes(obj.class_name).lower() == b'package':
                try:
                    opkg = OleNativeStream.Parse(obj.data, package=True)
                    rtfobj.filename = opkg.filename
                    rtfobj.src_path = opkg.src_path
                    rtfobj.temp_path = opkg.temp_path
                    rtfobj.olepkgdata = opkg.data
                    rtfobj.is_package = True
                except Exception:
                    pass
            elif obj.data:
                clsid, clsid_desc = _extract_clsid(obj.data)
                if clsid is not None:
                    rtfobj.clsid = clsid
                    rtfobj.clsid_desc = clsid_desc
        except Exception:
            pass

    def bin_data(self, bindata: bytes) -> None:
        if self.current_destination.cword == b'objdata':
            import binascii
            self.current_destination.data.extend(binascii.hexlify(bindata))

    def control_symbol(self, matchobject: re.Match) -> None:
        symbol = matchobject.group()[1:2]
        if symbol != b"'":
            return
        # MS Word hex escape: \'hh
        # Read two bytes following the escape (any characters, not
        # just hex) and advance the index by 2.
        self.index += 2
        if self.current_destination.cword == b'objdata':
            # Emulate the MS Word RTF parser bug: if the number of
            # hex digits accumulated so far is odd, the last digit
            # is silently dropped before the \' escape is processed.
            self.current_destination.data = bytearray(_RE_NON_HEX.sub(b'', self.current_destination.data))
            if len(self.current_destination.data) & 1:
                del self.current_destination.data[-1:]

Ancestors

Methods

def open_destination(self, destination)
Expand source code Browse git
def open_destination(self, destination: _Destination) -> None:
    pass
def close_destination(self, destination)
Expand source code Browse git
def close_destination(self, destination: _Destination) -> None:
    if destination.cword != b'objdata':
        return
    rtfobj = RtfObject()
    self.objects.append(rtfobj)
    rtfobj.start = destination.start
    rtfobj.end = destination.end
    hexdata = destination.data.translate(None, b' \t\r\n\f\v')
    hexdata = _RE_NON_HEX.sub(b'', hexdata)
    if len(hexdata) & 1:
        hexdata = hexdata[:-1]
    rtfobj.hexdata = hexdata
    try:
        object_data = bytes.fromhex(hexdata.decode('ascii'))
    except (ValueError, UnicodeDecodeError):
        return
    rtfobj.rawdata = object_data
    try:
        obj = OleObject.Parse(memoryview(object_data))
        rtfobj.format_id = obj.format_id
        rtfobj.class_name = obj.class_name
        rtfobj.oledata_size = obj.data_size
        rtfobj.oledata = obj.data
        rtfobj.is_ole = True
        if obj.class_name and bytes(obj.class_name).lower() == b'package':
            try:
                opkg = OleNativeStream.Parse(obj.data, package=True)
                rtfobj.filename = opkg.filename
                rtfobj.src_path = opkg.src_path
                rtfobj.temp_path = opkg.temp_path
                rtfobj.olepkgdata = opkg.data
                rtfobj.is_package = True
            except Exception:
                pass
        elif obj.data:
            clsid, clsid_desc = _extract_clsid(obj.data)
            if clsid is not None:
                rtfobj.clsid = clsid
                rtfobj.clsid_desc = clsid_desc
    except Exception:
        pass
def bin_data(self, bindata)
Expand source code Browse git
def bin_data(self, bindata: bytes) -> None:
    if self.current_destination.cword == b'objdata':
        import binascii
        self.current_destination.data.extend(binascii.hexlify(bindata))
def control_symbol(self, matchobject)
Expand source code Browse git
def control_symbol(self, matchobject: re.Match) -> None:
    symbol = matchobject.group()[1:2]
    if symbol != b"'":
        return
    # MS Word hex escape: \'hh
    # Read two bytes following the escape (any characters, not
    # just hex) and advance the index by 2.
    self.index += 2
    if self.current_destination.cword == b'objdata':
        # Emulate the MS Word RTF parser bug: if the number of
        # hex digits accumulated so far is odd, the last digit
        # is silently dropped before the \' escape is processed.
        self.current_destination.data = bytearray(_RE_NON_HEX.sub(b'', self.current_destination.data))
        if len(self.current_destination.data) & 1:
            del self.current_destination.data[-1:]