Module refinery.units.formats.office.doctxt
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from collections import OrderedDict
from io import StringIO
from typing import Callable, Dict, TYPE_CHECKING
from defusedxml.ElementTree import XML
if TYPE_CHECKING:
from xml.etree.ElementTree import Element
from refinery.lib.frame import Chunk
from refinery.lib.structures import MemoryFile, StructReader
from refinery.units.formats import Unit
from refinery.units.formats.archive.xtzip import xtzip
class doctxt(Unit):
    """
    Extracts the text body from Word documents.

    Three extractors are tried in turn: legacy OLE documents (doc), Office Open XML documents
    (docx), and OpenDocument text documents (odt). The output is the text returned by the first
    extractor that does not raise, encoded with the codec of the unit.
    """

    @Unit.Requires('olefile', 'formats', 'office', 'extended')
    def _olefile():
        # The import happens inside the function body, i.e. only when the dependency is requested.
        import olefile
        return olefile

    def process(self, data: bytearray):
        """
        Run the extractors against the input until one of them succeeds.

        The default order is doc, docx, odt. When the input starts with the zip signature, the
        OLE extractor is moved to the end; when the zip member named "mimetype" additionally
        mentions "opendocument", the odt extractor is moved to the front.

        Raises:
            ImportError: propagated unchanged from an extractor (missing optional dependency).
            ValueError: when every extractor failed.
        """
        extractors: Dict[str, Callable[[bytearray], str]] = OrderedDict(
            doc=self._extract_ole,
            docx=self._extract_docx,
            odt=self._extract_odt,
        )
        if data.startswith(B'PK'):
            self.log_debug('document contains zip file signature, likely a odt or docx file')
            extractors.move_to_end('doc')
            if 'opendocument' in str(data | xtzip('mimetype')):
                self.log_debug('odt signature detected')
                extractors.move_to_end('odt', last=False)
        for filetype, extractor in extractors.items():
            self.log_debug(F'trying to extract as {filetype}')
            try:
                result = extractor(data)
            except ImportError:
                # A missing dependency is not a format mismatch; do not hide it.
                raise
            except Exception as error:
                # Best effort by design: a failing extractor only means "not this format".
                self.log_info(F'failed extracting as {filetype}: {error!s}')
            else:
                return result.encode(self.codec)
        raise ValueError('All extractors failed, the input data is not recognized as any known document format.')

    def _extract_docx(self, data: Chunk) -> str:
        """
        Extract text from the archive member word/document.xml.

        The text of all `t` elements below each `p` element (both in the wordprocessingml main
        namespace) is concatenated; paragraphs are separated by a line break.

        Raises:
            ValueError: when extracting word/document.xml yields no data.
        """
        NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
        PARAGRAPH = F'{NAMESPACE}p'
        TEXT = F'{NAMESPACE}t'
        chunk = data | xtzip('word/document.xml') | bytearray
        if not chunk:
            raise ValueError('No document.xml file found.')
        root: Element = XML(chunk)
        paragraphs = (
            ''.join(node.text for node in paragraph.iter(TEXT) if node.text)
            for paragraph in root.iter(PARAGRAPH)
        )
        return '\n'.join(paragraphs)

    def _extract_odt(self, data: bytes) -> str:
        """
        Extract text from the archive member content.xml of an OpenDocument file.

        The element office:body/office:text is walked recursively. Text of `p` and `span`
        elements is emitted, each `s` element becomes a single space, and a line break is
        written after every `p` element.

        Raises:
            ValueError: when the archive has no content.xml, or when content.xml lacks the
                office:body or office:text element.
        """
        TEXT_NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}'
        PARAGRAPH = F'{TEXT_NAMESPACE}p'
        SPAN = F'{TEXT_NAMESPACE}span'
        SPACE = F'{TEXT_NAMESPACE}s'
        OFFICE_NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}'
        BODY = F'{OFFICE_NAMESPACE}body'
        TEXT = F'{OFFICE_NAMESPACE}text'

        def _extract_text(node: Element) -> str:
            with StringIO() as res:
                for element in node:
                    tag = element.tag
                    if tag in (PARAGRAPH, SPAN):
                        res.write(element.text or '')
                    elif tag == SPACE:
                        res.write(' ')
                    else:
                        self.log_debug(F'unknown tag: {tag}')
                    # Children are visited for every tag, so text nested in unknown
                    # containers is still collected.
                    res.write(_extract_text(element))
                    res.write(element.tail or '')
                    if tag == PARAGRAPH:
                        res.write('\n')
                return res.getvalue()

        for part in xtzip().unpack(data):
            if part.path != 'content.xml':
                continue
            root: Element = XML(part.get_data())
            # Element.find returns None when there is no match; fail with a clear message
            # instead of an AttributeError on None.
            body = root.find(BODY)
            if body is None:
                raise ValueError('content.xml contains no office:body element')
            text = body.find(TEXT)
            if text is None:
                raise ValueError('content.xml contains no office:text element')
            return _extract_text(text)
        raise ValueError('found no text')

    def _extract_ole(self, data: bytearray) -> str:
        """
        Extract text from a legacy OLE Word document.

        Reads the WordDocument stream, selects the 0Table or 1Table stream based on bit 1 of
        byte 11, and reads two consecutive 32-bit values at offset 0x1A2 which are used as the
        offset and length of a region inside the table stream. That region is searched for the
        piece table, which is then used to collect the text from the WordDocument stream.
        NOTE(review): 0x1A2 is presumably the fcClx/lcbClx pair of the FIB; confirm against
        the MS-DOC specification.
        """
        stream = MemoryFile(data)
        with self._olefile.OleFileIO(stream) as ole:
            doc = ole.openstream('WordDocument').read()
            with StructReader(doc) as reader:
                table_name = F'{(doc[11] >> 1) & 1}Table'
                reader.seek(0x1A2)
                offset = reader.u32()
                length = reader.u32()
            with StructReader(ole.openstream(table_name).read()) as reader:
                reader.seek(offset)
                table = reader.read(length)
            piece_table = self._load_piece_table(table)
            return self._get_text(doc, piece_table)

    def _load_piece_table(self, table: bytes) -> bytes:
        """
        Locate the piece table inside the given table data and return its raw bytes.

        Entries of type 1 are skipped. The first entry of type 2 carries a 32-bit length
        followed by the piece table itself.

        Raises:
            NotImplementedError: for any entry type other than 1 or 2.
            ValueError: when the data ends before an entry of type 2 was found.
        """
        with StructReader(table) as reader:
            while not reader.eof:
                entry_type = reader.read_byte()
                if entry_type == 1:
                    # MS-DOC defines the size field of a Prc entry (cbGrpprl) as a 16-bit
                    # integer. Reading only one byte leaves the reader misaligned.
                    size = int.from_bytes(reader.read(2), 'little')
                    reader.seekrel(size)
                    continue
                if entry_type == 2:
                    length = reader.u32()
                    return reader.read(length)
                raise NotImplementedError(F'Unsupported table entry type value 0x{entry_type:X}.')
        # Previously this fell through and returned None, which only surfaced later as a
        # TypeError in _get_text.
        raise ValueError('The table data contains no piece table.')

    def _get_text(self, doc: bytes, piece_table: bytes) -> str:
        """
        Assemble the document text from the pieces described by the piece table.

        The table holds N 32-bit character positions followed by N-1 descriptors of 8 bytes
        each, hence len == 4 * N + 8 * (N - 1), which gives the piece count computed below.
        Bit 30 of the 32-bit value in each descriptor selects the encoding of the piece.
        Carriage returns are converted to line feeds.
        """
        piece_count: int = 1 + (len(piece_table) - 4) // 12
        with StringIO() as text:
            with StructReader(piece_table) as reader:
                character_positions = [reader.u32() for _ in range(piece_count)]
                for i in range(piece_count - 1):
                    cp_start = character_positions[i]
                    cp_end = character_positions[i + 1]
                    # Descriptor layout: 2 bytes skipped, 32-bit value, 2 bytes skipped.
                    fc_value = reader.read_struct('xxLxx', unwrap=True)
                    is_ansi = bool((fc_value >> 30) & 1)
                    fc = fc_value & 0xBFFFFFFF
                    cb = cp_end - cp_start
                    if is_ansi:
                        # One byte per character; the stored offset is twice the byte offset.
                        encoding = 'cp1252'
                        fc = fc // 2
                    else:
                        # Two bytes per character. The explicit little-endian codec is used
                        # because the generic utf16 codec would swallow a leading BOM-like
                        # code unit and otherwise falls back to the host byte order.
                        encoding = 'utf-16le'
                        cb *= 2
                    raw = doc[fc : fc + cb]
                    text.write(raw.decode(encoding).replace('\r', '\n'))
            return text.getvalue()
Classes
class doctxt
-
Extracts the text body from Word documents.
Expand source code Browse git
class doctxt(Unit):
    """
    Extracts the text body from Word documents.

    Three extractors are tried in turn: legacy OLE documents (doc), Office Open XML documents
    (docx), and OpenDocument text documents (odt). The output is the text returned by the first
    extractor that does not raise, encoded with the codec of the unit.
    """

    @Unit.Requires('olefile', 'formats', 'office', 'extended')
    def _olefile():
        # The import happens inside the function body, i.e. only when the dependency is requested.
        import olefile
        return olefile

    def process(self, data: bytearray):
        """
        Run the extractors against the input until one of them succeeds.

        The default order is doc, docx, odt. When the input starts with the zip signature, the
        OLE extractor is moved to the end; when the zip member named "mimetype" additionally
        mentions "opendocument", the odt extractor is moved to the front.

        Raises:
            ImportError: propagated unchanged from an extractor (missing optional dependency).
            ValueError: when every extractor failed.
        """
        extractors: Dict[str, Callable[[bytearray], str]] = OrderedDict(
            doc=self._extract_ole,
            docx=self._extract_docx,
            odt=self._extract_odt,
        )
        if data.startswith(B'PK'):
            self.log_debug('document contains zip file signature, likely a odt or docx file')
            extractors.move_to_end('doc')
            if 'opendocument' in str(data | xtzip('mimetype')):
                self.log_debug('odt signature detected')
                extractors.move_to_end('odt', last=False)
        for filetype, extractor in extractors.items():
            self.log_debug(F'trying to extract as {filetype}')
            try:
                result = extractor(data)
            except ImportError:
                # A missing dependency is not a format mismatch; do not hide it.
                raise
            except Exception as error:
                # Best effort by design: a failing extractor only means "not this format".
                self.log_info(F'failed extracting as {filetype}: {error!s}')
            else:
                return result.encode(self.codec)
        raise ValueError('All extractors failed, the input data is not recognized as any known document format.')

    def _extract_docx(self, data: Chunk) -> str:
        """
        Extract text from the archive member word/document.xml.

        The text of all `t` elements below each `p` element (both in the wordprocessingml main
        namespace) is concatenated; paragraphs are separated by a line break.

        Raises:
            ValueError: when extracting word/document.xml yields no data.
        """
        NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
        PARAGRAPH = F'{NAMESPACE}p'
        TEXT = F'{NAMESPACE}t'
        chunk = data | xtzip('word/document.xml') | bytearray
        if not chunk:
            raise ValueError('No document.xml file found.')
        root: Element = XML(chunk)
        paragraphs = (
            ''.join(node.text for node in paragraph.iter(TEXT) if node.text)
            for paragraph in root.iter(PARAGRAPH)
        )
        return '\n'.join(paragraphs)

    def _extract_odt(self, data: bytes) -> str:
        """
        Extract text from the archive member content.xml of an OpenDocument file.

        The element office:body/office:text is walked recursively. Text of `p` and `span`
        elements is emitted, each `s` element becomes a single space, and a line break is
        written after every `p` element.

        Raises:
            ValueError: when the archive has no content.xml, or when content.xml lacks the
                office:body or office:text element.
        """
        TEXT_NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:text:1.0}'
        PARAGRAPH = F'{TEXT_NAMESPACE}p'
        SPAN = F'{TEXT_NAMESPACE}span'
        SPACE = F'{TEXT_NAMESPACE}s'
        OFFICE_NAMESPACE = '{urn:oasis:names:tc:opendocument:xmlns:office:1.0}'
        BODY = F'{OFFICE_NAMESPACE}body'
        TEXT = F'{OFFICE_NAMESPACE}text'

        def _extract_text(node: Element) -> str:
            with StringIO() as res:
                for element in node:
                    tag = element.tag
                    if tag in (PARAGRAPH, SPAN):
                        res.write(element.text or '')
                    elif tag == SPACE:
                        res.write(' ')
                    else:
                        self.log_debug(F'unknown tag: {tag}')
                    # Children are visited for every tag, so text nested in unknown
                    # containers is still collected.
                    res.write(_extract_text(element))
                    res.write(element.tail or '')
                    if tag == PARAGRAPH:
                        res.write('\n')
                return res.getvalue()

        for part in xtzip().unpack(data):
            if part.path != 'content.xml':
                continue
            root: Element = XML(part.get_data())
            # Element.find returns None when there is no match; fail with a clear message
            # instead of an AttributeError on None.
            body = root.find(BODY)
            if body is None:
                raise ValueError('content.xml contains no office:body element')
            text = body.find(TEXT)
            if text is None:
                raise ValueError('content.xml contains no office:text element')
            return _extract_text(text)
        raise ValueError('found no text')

    def _extract_ole(self, data: bytearray) -> str:
        """
        Extract text from a legacy OLE Word document.

        Reads the WordDocument stream, selects the 0Table or 1Table stream based on bit 1 of
        byte 11, and reads two consecutive 32-bit values at offset 0x1A2 which are used as the
        offset and length of a region inside the table stream. That region is searched for the
        piece table, which is then used to collect the text from the WordDocument stream.
        NOTE(review): 0x1A2 is presumably the fcClx/lcbClx pair of the FIB; confirm against
        the MS-DOC specification.
        """
        stream = MemoryFile(data)
        with self._olefile.OleFileIO(stream) as ole:
            doc = ole.openstream('WordDocument').read()
            with StructReader(doc) as reader:
                table_name = F'{(doc[11] >> 1) & 1}Table'
                reader.seek(0x1A2)
                offset = reader.u32()
                length = reader.u32()
            with StructReader(ole.openstream(table_name).read()) as reader:
                reader.seek(offset)
                table = reader.read(length)
            piece_table = self._load_piece_table(table)
            return self._get_text(doc, piece_table)

    def _load_piece_table(self, table: bytes) -> bytes:
        """
        Locate the piece table inside the given table data and return its raw bytes.

        Entries of type 1 are skipped. The first entry of type 2 carries a 32-bit length
        followed by the piece table itself.

        Raises:
            NotImplementedError: for any entry type other than 1 or 2.
            ValueError: when the data ends before an entry of type 2 was found.
        """
        with StructReader(table) as reader:
            while not reader.eof:
                entry_type = reader.read_byte()
                if entry_type == 1:
                    # MS-DOC defines the size field of a Prc entry (cbGrpprl) as a 16-bit
                    # integer. Reading only one byte leaves the reader misaligned.
                    size = int.from_bytes(reader.read(2), 'little')
                    reader.seekrel(size)
                    continue
                if entry_type == 2:
                    length = reader.u32()
                    return reader.read(length)
                raise NotImplementedError(F'Unsupported table entry type value 0x{entry_type:X}.')
        # Previously this fell through and returned None, which only surfaced later as a
        # TypeError in _get_text.
        raise ValueError('The table data contains no piece table.')

    def _get_text(self, doc: bytes, piece_table: bytes) -> str:
        """
        Assemble the document text from the pieces described by the piece table.

        The table holds N 32-bit character positions followed by N-1 descriptors of 8 bytes
        each, hence len == 4 * N + 8 * (N - 1), which gives the piece count computed below.
        Bit 30 of the 32-bit value in each descriptor selects the encoding of the piece.
        Carriage returns are converted to line feeds.
        """
        piece_count: int = 1 + (len(piece_table) - 4) // 12
        with StringIO() as text:
            with StructReader(piece_table) as reader:
                character_positions = [reader.u32() for _ in range(piece_count)]
                for i in range(piece_count - 1):
                    cp_start = character_positions[i]
                    cp_end = character_positions[i + 1]
                    # Descriptor layout: 2 bytes skipped, 32-bit value, 2 bytes skipped.
                    fc_value = reader.read_struct('xxLxx', unwrap=True)
                    is_ansi = bool((fc_value >> 30) & 1)
                    fc = fc_value & 0xBFFFFFFF
                    cb = cp_end - cp_start
                    if is_ansi:
                        # One byte per character; the stored offset is twice the byte offset.
                        encoding = 'cp1252'
                        fc = fc // 2
                    else:
                        # Two bytes per character. The explicit little-endian codec is used
                        # because the generic utf16 codec would swallow a leading BOM-like
                        # code unit and otherwise falls back to the host byte order.
                        encoding = 'utf-16le'
                        cb *= 2
                    raw = doc[fc : fc + cb]
                    text.write(raw.decode(encoding).replace('\r', '\n'))
            return text.getvalue()
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members