Module refinery.units.formats.office.doctxt

Expand source code Browse git
from __future__ import annotations

import codecs

from collections import OrderedDict
from io import StringIO
from typing import TYPE_CHECKING, Callable

from defusedxml.ElementTree import XML

if TYPE_CHECKING:
    from xml.etree.ElementTree import Element

from refinery.lib.frame import Chunk
from refinery.lib.ole.file import OleFile
from refinery.lib.structures import StructReader
from refinery.units.formats import Unit
from refinery.units.formats.archive.xtzip import xtzip


class doctxt(Unit):
    """
    Extracts the text body from Word documents. Handles legacy .doc (OLE), modern .docx (OOXML),
    and OpenDocument .odt file formats.
    """
    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` when `refinery.lib.id.is_likely_doc` accepts the input and `None` otherwise.
        """
        from refinery.lib.id import is_likely_doc
        if is_likely_doc(data):
            return True
        return None

    def process(self, data: bytearray):
        """
        Try each known extractor in turn and return the text of the first one that succeeds,
        encoded with the codec of this unit. The default order is doc, docx, odt. When the input
        starts with a zip signature, the OLE extractor is moved to the end, and the ODT extractor
        is moved to the front if the archive's `mimetype` entry mentions `opendocument`.

        Raises `ValueError` when every extractor fails. An `ImportError` is never swallowed, it is
        always propagated to the caller.
        """
        extractors: OrderedDict[str, Callable[[bytearray], str]] = OrderedDict(
            doc=self._extract_ole,
            docx=self._extract_docx,
            odt=self._extract_odt,
        )
        if data.startswith(B'PK'):
            self.log_debug('document contains zip file signature, likely a odt or docx file')
            extractors.move_to_end('doc')
            if 'opendocument' in str(data | xtzip('mimetype')):
                self.log_debug('odt signature detected')
                extractors.move_to_end('odt', last=False)
        for filetype, extractor in extractors.items():
            self.log_debug(F'trying to extract as {filetype}')
            try:
                result = extractor(data)
            except ImportError:
                # a missing dependency is an environment problem and not a parsing failure
                raise
            except Exception as error:
                self.log_info(F'failed extracting as {filetype}: {error!s}')
            else:
                return result.encode(self.codec)
        raise ValueError('All extractors failed, the input data is not recognized as any known document format.')

    def _extract_docx(self, data: Chunk) -> str:
        """
        Extract text from the `word/document.xml` member of an OOXML archive. The contents of all
        `w:t` nodes below each `w:p` node are concatenated; paragraphs are separated by a single
        line break. Raises `ValueError` when the archive member is missing or empty.
        """
        NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
        PARAGRAPH = F'{NAMESPACE}p'
        TEXT = F'{NAMESPACE}t'
        chunk = data | xtzip('word/document.xml') | bytearray
        if not chunk:
            raise ValueError('No document.xml file found.')
        root: Element = XML(chunk)
        with StringIO() as output:
            for index, paragraph in enumerate(root.iter(PARAGRAPH)):
                if index > 0:
                    output.write('\n')
                for node in paragraph.iter(TEXT):
                    if node.text:
                        output.write(node.text)
            return output.getvalue()

    def _extract_odt(self, data: bytes) -> str:
        """
        Extract text from the `content.xml` member of an OpenDocument archive by walking the
        `office:text` element below `office:body`. Raises `ValueError` when the archive has no
        `content.xml` member or when that member lacks the expected body or text element.
        """
        def _extract_text(node: Element):
            """
            Recursively collect the text below the given node. Paragraphs and headings are each
            terminated with a line break, and `text:s` elements are expanded to spaces.
            """
            NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}'
            PARAGRAPH = F'{NAMESPACE}p'
            HEADING = F'{NAMESPACE}h'
            SPAN = F'{NAMESPACE}span'
            SPACE = F'{NAMESPACE}s'
            COUNT = F'{NAMESPACE}c'
            with StringIO() as res:
                for element in node:
                    tag = element.tag
                    text = element.text or ''
                    tail = element.tail or ''
                    if tag in (PARAGRAPH, HEADING, SPAN):
                        res.write(text)
                    elif tag == SPACE:
                        # text:s represents a run of spaces; the optional text:c attribute holds
                        # the number of repetitions and defaults to one
                        try:
                            count = int(element.get(COUNT, '1'))
                        except ValueError:
                            count = 1
                        res.write(' ' * count)
                    else:
                        self.log_debug(F'unknown tag: {tag}')
                    res.write(_extract_text(element))
                    res.write(tail)
                    if tag in (PARAGRAPH, HEADING):
                        res.write('\n')
                return res.getvalue()

        NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}'
        BODY = F'{NAMESPACE}body'
        TEXT = F'{NAMESPACE}text'
        for part in xtzip().unpack(data):
            if part.path != 'content.xml':
                continue
            xml_content: bytes = part.get_data()
            root: Element = XML(xml_content)
            body = root.find(BODY)
            if body is None:
                raise ValueError('found no office:body element in content.xml')
            text = body.find(TEXT)
            if text is None:
                raise ValueError('found no office:text element in content.xml')
            return _extract_text(text)
        raise ValueError('found no text')

    def _extract_ole(self, data: bytearray) -> str:
        """
        Extract text from a legacy OLE Word document. Reads the `WordDocument` stream, locates the
        table stream that it references, cuts the referenced table data out of that stream and
        decodes the document text via the piece table contained in it.
        """
        with OleFile(data) as ole:
            doc = ole.openstream('WordDocument').read()
            with StructReader(doc) as reader:
                # bit 1 of byte 11 selects between the streams named 0Table and 1Table
                table_name = F'{(doc[11] >> 1) & 1}Table'
                # two consecutive 32-bit values at 0x1A2 give the offset and the size of the table
                # data within the table stream (fcClx and lcbClx in terms of MS-DOC)
                reader.seek(0x1A2)
                offset = reader.u32()
                length = reader.u32()
            with StructReader(ole.openstream(table_name).read()) as reader:
                reader.seek(offset)
                table = reader.read(length)
            piece_table = self._load_piece_table(table)
            return self._get_text(doc, piece_table)

    def _load_piece_table(self, table: bytes) -> bytes:
        """
        Scan the given table data for the piece table and return its raw bytes. Entries of type 1
        are skipped; the first entry of type 2 holds the piece table, prefixed with its size as a
        32-bit integer. Raises `NotImplementedError` for any other entry type and `ValueError`
        when the data ends before a piece table was found.
        """
        with StructReader(table) as reader:
            while not reader.eof:
                entry_type = reader.read_byte()
                if entry_type == 1:
                    # MS-DOC specifies the size field of this entry (cbGrpprl) as two bytes wide;
                    # reading only one byte would misalign all subsequent entries
                    reader.seekrel(reader.u16())
                    continue
                if entry_type == 2:
                    length = reader.u32()
                    return reader.read(length)
                raise NotImplementedError(F'Unsupported table entry type value 0x{entry_type:X}.')
        raise ValueError('The table data does not contain a piece table.')

    def _get_text(self, doc: bytes, piece_table: bytes) -> str:
        """
        Decode the document text from the `WordDocument` stream contents by means of the piece
        table. The piece table consists of N+1 character positions of four bytes each, followed by
        N piece descriptors of eight bytes each. Carriage returns in the output are replaced by
        line feeds.
        """
        # total size is 4 * (N + 1) + 8 * N = 12 * N + 4, this is the number of positions N + 1
        piece_count: int = 1 + (len(piece_table) - 4) // 12
        with StringIO() as text:
            with StructReader(piece_table) as reader:
                character_positions = [reader.u32() for _ in range(piece_count)]
                for cp_start, cp_end in zip(character_positions, character_positions[1:]):
                    # only the 32-bit value in the middle of each descriptor is required
                    fc_value = reader.read_one_struct('xxLxx')
                    # bit 30 marks pieces that are stored with one byte per character
                    is_ansi = bool((fc_value >> 30) & 1)
                    fc = fc_value & 0xBFFFFFFF
                    cb = cp_end - cp_start
                    if is_ansi:
                        encoding = 'cp1252'
                        fc = fc // 2
                    else:
                        # The byte order has to be explicit: the plain utf16 codec would interpret
                        # and consume a leading byte order mark and otherwise use the byte order
                        # of the host system.
                        encoding = 'utf-16le'
                        cb *= 2
                    raw = doc[fc : fc + cb]
                    text.write(codecs.decode(raw, encoding).replace('\r', '\n'))
            return text.getvalue()

Classes

class doctxt

Extracts the text body from Word documents. Handles legacy .doc (OLE), modern .docx (OOXML), and OpenDocument .odt file formats.

Expand source code Browse git
class doctxt(Unit):
    """
    Extracts the text body from Word documents. Handles legacy .doc (OLE), modern .docx (OOXML),
    and OpenDocument .odt file formats.
    """
    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return `True` when `refinery.lib.id.is_likely_doc` accepts the input and `None` otherwise.
        """
        from refinery.lib.id import is_likely_doc
        if is_likely_doc(data):
            return True
        return None

    def process(self, data: bytearray):
        """
        Try each known extractor in turn and return the text of the first one that succeeds,
        encoded with the codec of this unit. The default order is doc, docx, odt. When the input
        starts with a zip signature, the OLE extractor is moved to the end, and the ODT extractor
        is moved to the front if the archive's `mimetype` entry mentions `opendocument`.

        Raises `ValueError` when every extractor fails. An `ImportError` is never swallowed, it is
        always propagated to the caller.
        """
        extractors: OrderedDict[str, Callable[[bytearray], str]] = OrderedDict(
            doc=self._extract_ole,
            docx=self._extract_docx,
            odt=self._extract_odt,
        )
        if data.startswith(B'PK'):
            self.log_debug('document contains zip file signature, likely a odt or docx file')
            extractors.move_to_end('doc')
            if 'opendocument' in str(data | xtzip('mimetype')):
                self.log_debug('odt signature detected')
                extractors.move_to_end('odt', last=False)
        for filetype, extractor in extractors.items():
            self.log_debug(F'trying to extract as {filetype}')
            try:
                result = extractor(data)
            except ImportError:
                # a missing dependency is an environment problem and not a parsing failure
                raise
            except Exception as error:
                self.log_info(F'failed extracting as {filetype}: {error!s}')
            else:
                return result.encode(self.codec)
        raise ValueError('All extractors failed, the input data is not recognized as any known document format.')

    def _extract_docx(self, data: Chunk) -> str:
        """
        Extract text from the `word/document.xml` member of an OOXML archive. The contents of all
        `w:t` nodes below each `w:p` node are concatenated; paragraphs are separated by a single
        line break. Raises `ValueError` when the archive member is missing or empty.
        """
        NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
        PARAGRAPH = F'{NAMESPACE}p'
        TEXT = F'{NAMESPACE}t'
        chunk = data | xtzip('word/document.xml') | bytearray
        if not chunk:
            raise ValueError('No document.xml file found.')
        root: Element = XML(chunk)
        with StringIO() as output:
            for index, paragraph in enumerate(root.iter(PARAGRAPH)):
                if index > 0:
                    output.write('\n')
                for node in paragraph.iter(TEXT):
                    if node.text:
                        output.write(node.text)
            return output.getvalue()

    def _extract_odt(self, data: bytes) -> str:
        """
        Extract text from the `content.xml` member of an OpenDocument archive by walking the
        `office:text` element below `office:body`. Raises `ValueError` when the archive has no
        `content.xml` member or when that member lacks the expected body or text element.
        """
        def _extract_text(node: Element):
            """
            Recursively collect the text below the given node. Paragraphs and headings are each
            terminated with a line break, and `text:s` elements are expanded to spaces.
            """
            NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}'
            PARAGRAPH = F'{NAMESPACE}p'
            HEADING = F'{NAMESPACE}h'
            SPAN = F'{NAMESPACE}span'
            SPACE = F'{NAMESPACE}s'
            COUNT = F'{NAMESPACE}c'
            with StringIO() as res:
                for element in node:
                    tag = element.tag
                    text = element.text or ''
                    tail = element.tail or ''
                    if tag in (PARAGRAPH, HEADING, SPAN):
                        res.write(text)
                    elif tag == SPACE:
                        # text:s represents a run of spaces; the optional text:c attribute holds
                        # the number of repetitions and defaults to one
                        try:
                            count = int(element.get(COUNT, '1'))
                        except ValueError:
                            count = 1
                        res.write(' ' * count)
                    else:
                        self.log_debug(F'unknown tag: {tag}')
                    res.write(_extract_text(element))
                    res.write(tail)
                    if tag in (PARAGRAPH, HEADING):
                        res.write('\n')
                return res.getvalue()

        NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}'
        BODY = F'{NAMESPACE}body'
        TEXT = F'{NAMESPACE}text'
        for part in xtzip().unpack(data):
            if part.path != 'content.xml':
                continue
            xml_content: bytes = part.get_data()
            root: Element = XML(xml_content)
            body = root.find(BODY)
            if body is None:
                raise ValueError('found no office:body element in content.xml')
            text = body.find(TEXT)
            if text is None:
                raise ValueError('found no office:text element in content.xml')
            return _extract_text(text)
        raise ValueError('found no text')

    def _extract_ole(self, data: bytearray) -> str:
        """
        Extract text from a legacy OLE Word document. Reads the `WordDocument` stream, locates the
        table stream that it references, cuts the referenced table data out of that stream and
        decodes the document text via the piece table contained in it.
        """
        with OleFile(data) as ole:
            doc = ole.openstream('WordDocument').read()
            with StructReader(doc) as reader:
                # bit 1 of byte 11 selects between the streams named 0Table and 1Table
                table_name = F'{(doc[11] >> 1) & 1}Table'
                # two consecutive 32-bit values at 0x1A2 give the offset and the size of the table
                # data within the table stream (fcClx and lcbClx in terms of MS-DOC)
                reader.seek(0x1A2)
                offset = reader.u32()
                length = reader.u32()
            with StructReader(ole.openstream(table_name).read()) as reader:
                reader.seek(offset)
                table = reader.read(length)
            piece_table = self._load_piece_table(table)
            return self._get_text(doc, piece_table)

    def _load_piece_table(self, table: bytes) -> bytes:
        """
        Scan the given table data for the piece table and return its raw bytes. Entries of type 1
        are skipped; the first entry of type 2 holds the piece table, prefixed with its size as a
        32-bit integer. Raises `NotImplementedError` for any other entry type and `ValueError`
        when the data ends before a piece table was found.
        """
        with StructReader(table) as reader:
            while not reader.eof:
                entry_type = reader.read_byte()
                if entry_type == 1:
                    # MS-DOC specifies the size field of this entry (cbGrpprl) as two bytes wide;
                    # reading only one byte would misalign all subsequent entries
                    reader.seekrel(reader.u16())
                    continue
                if entry_type == 2:
                    length = reader.u32()
                    return reader.read(length)
                raise NotImplementedError(F'Unsupported table entry type value 0x{entry_type:X}.')
        raise ValueError('The table data does not contain a piece table.')

    def _get_text(self, doc: bytes, piece_table: bytes) -> str:
        """
        Decode the document text from the `WordDocument` stream contents by means of the piece
        table. The piece table consists of N+1 character positions of four bytes each, followed by
        N piece descriptors of eight bytes each. Carriage returns in the output are replaced by
        line feeds.
        """
        # total size is 4 * (N + 1) + 8 * N = 12 * N + 4, this is the number of positions N + 1
        piece_count: int = 1 + (len(piece_table) - 4) // 12
        with StringIO() as text:
            with StructReader(piece_table) as reader:
                character_positions = [reader.u32() for _ in range(piece_count)]
                for cp_start, cp_end in zip(character_positions, character_positions[1:]):
                    # only the 32-bit value in the middle of each descriptor is required
                    fc_value = reader.read_one_struct('xxLxx')
                    # bit 30 marks pieces that are stored with one byte per character
                    is_ansi = bool((fc_value >> 30) & 1)
                    fc = fc_value & 0xBFFFFFFF
                    cb = cp_end - cp_start
                    if is_ansi:
                        encoding = 'cp1252'
                        fc = fc // 2
                    else:
                        # The byte order has to be explicit: the plain utf16 codec would interpret
                        # and consume a leading byte order mark and otherwise use the byte order
                        # of the host system.
                        encoding = 'utf-16le'
                        cb *= 2
                    raw = doc[fc : fc + cb]
                    text.write(codecs.decode(raw, encoding).replace('\r', '\n'))
            return text.getvalue()

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Inherited members