Module refinery.units.formats.pdf
Expand source code Browse git
from __future__ import annotations
from typing import Optional, Set, TYPE_CHECKING, cast
from itertools import islice
import re
if TYPE_CHECKING:
from pypdf.generic import EncodedStreamObject
from pymupdf import Page
from refinery.units.formats.archive import ArchiveUnit, UnpackResult
from refinery.lib.tools import NoLogging
from refinery.lib.mime import get_cached_file_magic_info
from refinery.lib.structures import MemoryFile
def isdict(object):
    """
    Return whether `object` is a `dict` instance or at least exposes the attributes `items`,
    `values`, and `keys`.
    """
    if isinstance(object, dict):
        return True
    # Duck-typed fallback; the attributes are probed in the order items, values, keys.
    required = ('items', 'values', 'keys')
    return all(hasattr(object, attribute) for attribute in required)
class xtpdf(ArchiveUnit):
    """
    Extract objects from PDF documents.
    """
    @ArchiveUnit.Requires('pypdf>=3.1.0', ['formats', 'default', 'extended'])
    def _pypdf2():
        # The imports live inside the function, so they only run when it is called.
        import pypdf
        import pypdf.generic
        return pypdf

    @ArchiveUnit.Requires('pymupdf', ['formats', 'default', 'extended'])
    def _mupdf():
        import os
        # Both variables are set before pymupdf is imported below; presumably pymupdf reads
        # them to redirect its diagnostics, here to the null device.
        for setting in ('PYMUPDF_MESSAGE', 'PYMUPDF_LOG'):
            os.environ[setting] = F'path:{os.devnull}'
        import pymupdf
        return pymupdf

    def _walk_raw(self, blob, memo: Optional[Set[int]] = None, *path):
        """
        Recursively walk the pypdf object `blob` and yield an `UnpackResult` for each leaf that
        carries data:

        - objects that expose `get_data` are yielded with kind `object` and a lazy extractor,
        - `ByteStringObject` instances are yielded with kind `bytes`,
        - `TextStringObject` instances are encoded with `self.codec` and yielded with kind `string`.

        The set `memo` contains the `id` of every object that was already visited, so that each
        object is walked at most once. The strings in `path` are the path segments that lead to
        `blob`; they are concatenated to form the path of a result.
        """
        generic = self._pypdf2.generic

        # Follow indirect references; a reference that cannot be resolved is kept as it is.
        while isinstance(blob, generic.IndirectObject):
            try:
                blob = blob.get_object()
            except Exception:
                break

        if memo is None:
            memo = {id(blob)}
        elif id(blob) in memo:
            return
        else:
            memo.add(id(blob))

        # An object with an /F entry and an /EF dictionary that has an /F entry (the layout
        # used by PDF file specifications) is replaced by the referenced object, and the
        # last path segment is replaced by the value of the outer /F entry.
        try:
            name = blob['/F']
            blob = blob['/EF']['/F']
        except Exception:
            pass
        else:
            def unhex(match):
                return bytes.fromhex(match[1]).decode('latin1')
            # The name may be a byte string; the string pattern below would raise a TypeError
            # for it and abort the entire walk.
            if isinstance(name, (bytes, bytearray)):
                name = name.decode('latin1')
            # Names of any other type are not usable as a path segment; the path is then kept.
            if isinstance(name, str):
                name = re.sub('#([0-9a-fA-F]{2})', unhex, name)
                path = *path[:-1], F'/{name}'

        try:
            if TYPE_CHECKING:
                blob = cast(EncodedStreamObject, blob)
            get_data = blob.get_data
        except AttributeError:
            pass
        else:
            def extract():
                # The data is only computed when the result is actually used.
                with NoLogging():
                    return get_data()
            yield UnpackResult(''.join(path), extract, kind='object')
            return

        if isinstance(blob, generic.ByteStringObject):
            yield UnpackResult(''.join(path), blob, kind='bytes')
            return
        if isinstance(blob, generic.TextStringObject):
            yield UnpackResult(''.join(path), blob.encode(self.codec), kind='string')
            return
        if isinstance(blob, (
            generic.BooleanObject,
            generic.FloatObject,
            generic.NameObject,
            generic.NullObject,
            generic.NumberObject,
            generic.RectangleObject,
        )):
            # unhandled PDF objects
            return

        if isinstance(blob, generic.TreeObject):
            blob = list(blob)

        pdf = generic.PdfObject

        if isinstance(blob, list):
            keys = blob[0::2]
            values = blob[1::2]
            if (
                len(blob) % 2 == 0
                and all(isinstance(key, str) for key in keys)
                and all(isinstance(value, pdf) for value in values)
            ):
                # A list that alternates between string keys and PDF objects is walked like
                # a dictionary.
                blob = dict(zip(keys, values))
            else:
                for key, value in enumerate(blob):
                    yield from self._walk_raw(value, memo, *path, F'/{key}')
                return

        # Anything that passes the duck-typed check is walked; no assertion on the concrete
        # type is made because it would contradict that check.
        if not isdict(blob):
            return
        if TYPE_CHECKING:
            blob = cast(dict, blob)
        for key, value in blob.items():
            if not isinstance(key, str):
                continue
            if not key.startswith('/'):
                key = F'/{key}'
            yield from self._walk_raw(value, memo, *path, key)

    def unpack(self, data):
        """
        Yield the objects that are reachable from the document catalog as parsed by pypdf; their
        paths start with `raw`. If pymupdf can open the document, the HTML, JSON, and text
        rendering of each page and the images of each page are yielded as well; their paths
        start with `parsed/`.

        Raises a `ValueError` if pymupdf reports an encrypted document and no password is given.
        """
        # The password is read first so that it is still passed to pypdf when pymupdf cannot
        # open the document.
        password = self.args.pwd or None
        try:
            mu = self._mupdf.open(stream=data, filetype='pdf')
        except Exception:
            mu = None
        try:
            if mu is not None:
                if password is not None:
                    if mu.is_encrypted:
                        mu.authenticate(password)
                    else:
                        self.log_warn('This PDF document is not protected; ignoring password argument.')
                        password = None
                elif mu.is_encrypted:
                    raise ValueError('This PDF is password protected.')

            with MemoryFile(data, output=bytes) as stream:
                with NoLogging():
                    pdf = self._pypdf2.PdfReader(stream, password=password)
                    catalog = pdf.trailer['/Root']
                    yield from self._walk_raw(catalog, None, 'raw')

            if mu is None:
                return

            for k in range(len(mu)):
                with NoLogging(NoLogging.Mode.ALL):
                    # Pages that cannot be loaded or rendered are skipped.
                    try:
                        page: Page = mu[k]
                        text = page.get_textpage()
                    except Exception:
                        continue
                yield UnpackResult(F'parsed/page{k}.html', text.extractHTML().encode(self.codec))
                yield UnpackResult(F'parsed/page{k}.json', text.extractJSON().encode(self.codec))
                yield UnpackResult(F'parsed/page{k}.txt', text.extractText().encode(self.codec))
                for j, image in enumerate(page.get_images(), 1):
                    xref = image[0]
                    base = mu.extract_image(xref)
                    # A separate name is used so that the input argument is not overwritten.
                    image_data = base['image']
                    info = get_cached_file_magic_info(image_data)
                    yield UnpackResult(F'parsed/page{k}/img{j}.{info.extension}', image_data)
        finally:
            # All results that depend on the pymupdf document are computed before they are
            # yielded, so the document can be released here.
            if mu is not None:
                mu.close()

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        """
        Return whether `data` starts with the signature `%PDF-`.
        """
        return data.startswith(B'%PDF-')
Functions
def isdict(object)
-
Expand source code Browse git
def isdict(object):
    """
    Return whether `object` is a `dict` instance or at least exposes the attributes `items`,
    `values`, and `keys`.
    """
    if isinstance(object, dict):
        return True
    # Duck-typed fallback; the attributes are probed in the order items, values, keys.
    required = ('items', 'values', 'keys')
    return all(hasattr(object, attribute) for attribute in required)
Classes
class xtpdf (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract objects from PDF documents.
Expand source code Browse git
class xtpdf(ArchiveUnit):
    """
    Extract objects from PDF documents.
    """
    @ArchiveUnit.Requires('pypdf>=3.1.0', ['formats', 'default', 'extended'])
    def _pypdf2():
        # The imports live inside the function, so they only run when it is called.
        import pypdf
        import pypdf.generic
        return pypdf

    @ArchiveUnit.Requires('pymupdf', ['formats', 'default', 'extended'])
    def _mupdf():
        import os
        # Both variables are set before pymupdf is imported below; presumably pymupdf reads
        # them to redirect its diagnostics, here to the null device.
        for setting in ('PYMUPDF_MESSAGE', 'PYMUPDF_LOG'):
            os.environ[setting] = F'path:{os.devnull}'
        import pymupdf
        return pymupdf

    def _walk_raw(self, blob, memo: Optional[Set[int]] = None, *path):
        """
        Recursively walk the pypdf object `blob` and yield an `UnpackResult` for each leaf that
        carries data:

        - objects that expose `get_data` are yielded with kind `object` and a lazy extractor,
        - `ByteStringObject` instances are yielded with kind `bytes`,
        - `TextStringObject` instances are encoded with `self.codec` and yielded with kind `string`.

        The set `memo` contains the `id` of every object that was already visited, so that each
        object is walked at most once. The strings in `path` are the path segments that lead to
        `blob`; they are concatenated to form the path of a result.
        """
        generic = self._pypdf2.generic

        # Follow indirect references; a reference that cannot be resolved is kept as it is.
        while isinstance(blob, generic.IndirectObject):
            try:
                blob = blob.get_object()
            except Exception:
                break

        if memo is None:
            memo = {id(blob)}
        elif id(blob) in memo:
            return
        else:
            memo.add(id(blob))

        # An object with an /F entry and an /EF dictionary that has an /F entry (the layout
        # used by PDF file specifications) is replaced by the referenced object, and the
        # last path segment is replaced by the value of the outer /F entry.
        try:
            name = blob['/F']
            blob = blob['/EF']['/F']
        except Exception:
            pass
        else:
            def unhex(match):
                return bytes.fromhex(match[1]).decode('latin1')
            # The name may be a byte string; the string pattern below would raise a TypeError
            # for it and abort the entire walk.
            if isinstance(name, (bytes, bytearray)):
                name = name.decode('latin1')
            # Names of any other type are not usable as a path segment; the path is then kept.
            if isinstance(name, str):
                name = re.sub('#([0-9a-fA-F]{2})', unhex, name)
                path = *path[:-1], F'/{name}'

        try:
            if TYPE_CHECKING:
                blob = cast(EncodedStreamObject, blob)
            get_data = blob.get_data
        except AttributeError:
            pass
        else:
            def extract():
                # The data is only computed when the result is actually used.
                with NoLogging():
                    return get_data()
            yield UnpackResult(''.join(path), extract, kind='object')
            return

        if isinstance(blob, generic.ByteStringObject):
            yield UnpackResult(''.join(path), blob, kind='bytes')
            return
        if isinstance(blob, generic.TextStringObject):
            yield UnpackResult(''.join(path), blob.encode(self.codec), kind='string')
            return
        if isinstance(blob, (
            generic.BooleanObject,
            generic.FloatObject,
            generic.NameObject,
            generic.NullObject,
            generic.NumberObject,
            generic.RectangleObject,
        )):
            # unhandled PDF objects
            return

        if isinstance(blob, generic.TreeObject):
            blob = list(blob)

        pdf = generic.PdfObject

        if isinstance(blob, list):
            keys = blob[0::2]
            values = blob[1::2]
            if (
                len(blob) % 2 == 0
                and all(isinstance(key, str) for key in keys)
                and all(isinstance(value, pdf) for value in values)
            ):
                # A list that alternates between string keys and PDF objects is walked like
                # a dictionary.
                blob = dict(zip(keys, values))
            else:
                for key, value in enumerate(blob):
                    yield from self._walk_raw(value, memo, *path, F'/{key}')
                return

        # Anything that passes the duck-typed check is walked; no assertion on the concrete
        # type is made because it would contradict that check.
        if not isdict(blob):
            return
        if TYPE_CHECKING:
            blob = cast(dict, blob)
        for key, value in blob.items():
            if not isinstance(key, str):
                continue
            if not key.startswith('/'):
                key = F'/{key}'
            yield from self._walk_raw(value, memo, *path, key)

    def unpack(self, data):
        """
        Yield the objects that are reachable from the document catalog as parsed by pypdf; their
        paths start with `raw`. If pymupdf can open the document, the HTML, JSON, and text
        rendering of each page and the images of each page are yielded as well; their paths
        start with `parsed/`.

        Raises a `ValueError` if pymupdf reports an encrypted document and no password is given.
        """
        # The password is read first so that it is still passed to pypdf when pymupdf cannot
        # open the document.
        password = self.args.pwd or None
        try:
            mu = self._mupdf.open(stream=data, filetype='pdf')
        except Exception:
            mu = None
        try:
            if mu is not None:
                if password is not None:
                    if mu.is_encrypted:
                        mu.authenticate(password)
                    else:
                        self.log_warn('This PDF document is not protected; ignoring password argument.')
                        password = None
                elif mu.is_encrypted:
                    raise ValueError('This PDF is password protected.')

            with MemoryFile(data, output=bytes) as stream:
                with NoLogging():
                    pdf = self._pypdf2.PdfReader(stream, password=password)
                    catalog = pdf.trailer['/Root']
                    yield from self._walk_raw(catalog, None, 'raw')

            if mu is None:
                return

            for k in range(len(mu)):
                with NoLogging(NoLogging.Mode.ALL):
                    # Pages that cannot be loaded or rendered are skipped.
                    try:
                        page: Page = mu[k]
                        text = page.get_textpage()
                    except Exception:
                        continue
                yield UnpackResult(F'parsed/page{k}.html', text.extractHTML().encode(self.codec))
                yield UnpackResult(F'parsed/page{k}.json', text.extractJSON().encode(self.codec))
                yield UnpackResult(F'parsed/page{k}.txt', text.extractText().encode(self.codec))
                for j, image in enumerate(page.get_images(), 1):
                    xref = image[0]
                    base = mu.extract_image(xref)
                    # A separate name is used so that the input argument is not overwritten.
                    image_data = base['image']
                    info = get_cached_file_magic_info(image_data)
                    yield UnpackResult(F'parsed/page{k}/img{j}.{info.extension}', image_data)
        finally:
            # All results that depend on the pymupdf document are computed before they are
            # yielded, so the document can be released here.
            if mu is not None:
                mu.close()

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        """
        Return whether `data` starts with the signature `%PDF-`.
        """
        return data.startswith(B'%PDF-')
Ancestors
Subclasses
Class variables
var required_dependencies
var console
var reverse
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data):
    """
    Yield the objects that are reachable from the document catalog as parsed by pypdf; their
    paths start with `raw`. If pymupdf can open the document, the HTML, JSON, and text
    rendering of each page and the images of each page are yielded as well; their paths
    start with `parsed/`.

    Raises a `ValueError` if pymupdf reports an encrypted document and no password is given.
    """
    # The password is read first so that it is still passed to pypdf when pymupdf cannot
    # open the document.
    password = self.args.pwd or None
    try:
        mu = self._mupdf.open(stream=data, filetype='pdf')
    except Exception:
        mu = None
    try:
        if mu is not None:
            if password is not None:
                if mu.is_encrypted:
                    mu.authenticate(password)
                else:
                    self.log_warn('This PDF document is not protected; ignoring password argument.')
                    password = None
            elif mu.is_encrypted:
                raise ValueError('This PDF is password protected.')

        with MemoryFile(data, output=bytes) as stream:
            with NoLogging():
                pdf = self._pypdf2.PdfReader(stream, password=password)
                catalog = pdf.trailer['/Root']
                yield from self._walk_raw(catalog, None, 'raw')

        if mu is None:
            return

        for k in range(len(mu)):
            with NoLogging(NoLogging.Mode.ALL):
                # Pages that cannot be loaded or rendered are skipped.
                try:
                    page: Page = mu[k]
                    text = page.get_textpage()
                except Exception:
                    continue
            yield UnpackResult(F'parsed/page{k}.html', text.extractHTML().encode(self.codec))
            yield UnpackResult(F'parsed/page{k}.json', text.extractJSON().encode(self.codec))
            yield UnpackResult(F'parsed/page{k}.txt', text.extractText().encode(self.codec))
            for j, image in enumerate(page.get_images(), 1):
                xref = image[0]
                base = mu.extract_image(xref)
                # A separate name is used so that the input argument is not overwritten.
                image_data = base['image']
                info = get_cached_file_magic_info(image_data)
                yield UnpackResult(F'parsed/page{k}/img{j}.{info.extension}', image_data)
    finally:
        # All results that depend on the pymupdf document are computed before they are
        # yielded, so the document can be released here.
        if mu is not None:
            mu.close()
Inherited members