Module refinery.units.formats.pdf
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Optional, Set, TYPE_CHECKING, cast
from itertools import islice
if TYPE_CHECKING:
from pypdf.generic import EncodedStreamObject
from refinery.units.formats import PathExtractorUnit, UnpackResult
from refinery.lib.tools import NoLogging
from refinery.lib.structures import MemoryFile
def isdict(object):
    """
    Return whether the given object is a dictionary, or at least exposes the three attributes
    `items`, `values`, and `keys`.
    """
    if isinstance(object, dict):
        return True
    # Anything that is not a real dictionary has to offer all three accessors.
    for attribute in ('items', 'values', 'keys'):
        if not hasattr(object, attribute):
            return False
    return True
class xtpdf(PathExtractorUnit):
    """
    Extract objects from PDF documents. The traversal starts at the `/Root` entry of the document
    trailer; every extracted item is emitted under a path that is assembled from the dictionary keys
    and array indices that lead to it.
    """
    @PathExtractorUnit.Requires('pypdf>=3.1.0', 'formats', 'default', 'extended')
    def _pypdf2():
        # The generic submodule is imported explicitly because _walk accesses it as an attribute
        # of the returned package.
        import pypdf
        import pypdf.generic
        return pypdf

    def _walk(self, blob, memo: Optional[Set[int]] = None, *path):
        """
        Recursively traverse the PDF object `blob` and generate one `UnpackResult` for each object
        that provides a `get_data` method, each byte string, and each text string that is found.
        The set `memo` collects the ids of all objects that were already visited; it is created on
        the first call and shared with all recursive calls. The remaining positional arguments are
        the path components that lead to `blob`; each component starts with a forward slash.
        """
        generic = self._pypdf2.generic

        # Follow indirect references until a direct object is reached. A reference that cannot be
        # resolved is kept as it is and falls through the checks below.
        while isinstance(blob, generic.IndirectObject):
            try:
                blob = blob.get_object()
            except Exception:
                break

        # Objects can be reachable along more than one path and may reference each other; each
        # object is processed only once to guarantee that the recursion terminates.
        if memo is None:
            memo = {id(blob)}
        elif id(blob) in memo:
            return
        else:
            memo.add(id(blob))

        # If the object has an /F entry and an /EF entry that in turn has an /F entry (presumably a
        # file specification with an embedded file), then continue with the inner object and use
        # the value of the outer /F entry as the last path component. This is a best-effort probe:
        # any failure simply means that the object is processed as it is.
        try:
            name = blob['/F']
            blob = blob['/EF']['/F']
        except Exception:
            pass
        else:
            path = *path[:-1], F'/{name}'

        if TYPE_CHECKING:
            blob = cast(EncodedStreamObject, blob)

        # Only the attribute lookup is guarded: objects without a get_data method are not handled
        # here and are classified by the type checks further down.
        try:
            get_data = blob.get_data
        except AttributeError:
            pass
        else:
            def extract():
                # The data is only retrieved when the result is actually requested.
                with NoLogging():
                    return get_data()
            yield UnpackResult(''.join(path), extract, kind='object')
            return

        if isinstance(blob, generic.ByteStringObject):
            yield UnpackResult(''.join(path), blob, kind='bytes')
            return
        if isinstance(blob, generic.TextStringObject):
            yield UnpackResult(''.join(path), blob.encode(self.codec), kind='string')
            return
        if isinstance(blob, (
            generic.BooleanObject,
            generic.FloatObject,
            generic.NameObject,
            generic.NullObject,
            generic.NumberObject,
            generic.RectangleObject,
        )):
            # unhandled PDF objects; byte strings are not listed because they were already
            # handled and returned above
            return

        if isinstance(blob, generic.TreeObject):
            blob = list(blob)

        pdf = generic.PdfObject

        if isinstance(blob, list):
            if (
                len(blob) % 2 == 0
                and all(isinstance(key, str) for key in islice(iter(blob), 0, None, 2))
                and all(isinstance(value, pdf) for value in islice(iter(blob), 1, None, 2))
            ):
                # A list that alternates between strings and PDF objects is interpreted as a
                # sequence of key/value pairs and handled like a dictionary below.
                blob = dict(zip(*([iter(blob)] * 2)))
            else:
                # Every other list is traversed by index.
                for key, value in enumerate(blob):
                    yield from self._walk(value, memo, *path, F'/{key}')
                return

        if not isdict(blob):
            return

        for key, value in blob.items():
            if not isinstance(key, str):
                continue
            if not key.startswith('/'):
                # Normalize keys so that every path component starts with a forward slash.
                key = F'/{key}'
            yield from self._walk(value, memo, *path, key)

    def unpack(self, data):
        """
        Parse the input data as a PDF document and generate all results that are found by walking
        the object referenced by the `/Root` entry of the document trailer.
        """
        with MemoryFile(data, read_as_bytes=True) as stream:
            with NoLogging():
                pdf = self._pypdf2.PdfReader(stream)
                catalog = pdf.trailer['/Root']
                # The traversal happens while the stream is still open.
                yield from self._walk(catalog)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        """
        Return whether the input data starts with the signature `%PDF-`.
        """
        return data.startswith(B'%PDF-')
Functions
def isdict(object)
-
Expand source code Browse git
def isdict(object):
    """
    Return whether the given object is a dictionary, or at least exposes the three attributes
    `items`, `values`, and `keys`.
    """
    if isinstance(object, dict):
        return True
    # Anything that is not a real dictionary has to offer all three accessors.
    for attribute in ('items', 'values', 'keys'):
        if not hasattr(object, attribute):
            return False
    return True
Classes
class xtpdf (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
Extract objects from PDF documents.
Expand source code Browse git
class xtpdf(PathExtractorUnit):
    """
    Extract objects from PDF documents. The traversal starts at the `/Root` entry of the document
    trailer; every extracted item is emitted under a path that is assembled from the dictionary keys
    and array indices that lead to it.
    """
    @PathExtractorUnit.Requires('pypdf>=3.1.0', 'formats', 'default', 'extended')
    def _pypdf2():
        # The generic submodule is imported explicitly because _walk accesses it as an attribute
        # of the returned package.
        import pypdf
        import pypdf.generic
        return pypdf

    def _walk(self, blob, memo: Optional[Set[int]] = None, *path):
        """
        Recursively traverse the PDF object `blob` and generate one `UnpackResult` for each object
        that provides a `get_data` method, each byte string, and each text string that is found.
        The set `memo` collects the ids of all objects that were already visited; it is created on
        the first call and shared with all recursive calls. The remaining positional arguments are
        the path components that lead to `blob`; each component starts with a forward slash.
        """
        generic = self._pypdf2.generic

        # Follow indirect references until a direct object is reached. A reference that cannot be
        # resolved is kept as it is and falls through the checks below.
        while isinstance(blob, generic.IndirectObject):
            try:
                blob = blob.get_object()
            except Exception:
                break

        # Objects can be reachable along more than one path and may reference each other; each
        # object is processed only once to guarantee that the recursion terminates.
        if memo is None:
            memo = {id(blob)}
        elif id(blob) in memo:
            return
        else:
            memo.add(id(blob))

        # If the object has an /F entry and an /EF entry that in turn has an /F entry (presumably a
        # file specification with an embedded file), then continue with the inner object and use
        # the value of the outer /F entry as the last path component. This is a best-effort probe:
        # any failure simply means that the object is processed as it is.
        try:
            name = blob['/F']
            blob = blob['/EF']['/F']
        except Exception:
            pass
        else:
            path = *path[:-1], F'/{name}'

        if TYPE_CHECKING:
            blob = cast(EncodedStreamObject, blob)

        # Only the attribute lookup is guarded: objects without a get_data method are not handled
        # here and are classified by the type checks further down.
        try:
            get_data = blob.get_data
        except AttributeError:
            pass
        else:
            def extract():
                # The data is only retrieved when the result is actually requested.
                with NoLogging():
                    return get_data()
            yield UnpackResult(''.join(path), extract, kind='object')
            return

        if isinstance(blob, generic.ByteStringObject):
            yield UnpackResult(''.join(path), blob, kind='bytes')
            return
        if isinstance(blob, generic.TextStringObject):
            yield UnpackResult(''.join(path), blob.encode(self.codec), kind='string')
            return
        if isinstance(blob, (
            generic.BooleanObject,
            generic.FloatObject,
            generic.NameObject,
            generic.NullObject,
            generic.NumberObject,
            generic.RectangleObject,
        )):
            # unhandled PDF objects; byte strings are not listed because they were already
            # handled and returned above
            return

        if isinstance(blob, generic.TreeObject):
            blob = list(blob)

        pdf = generic.PdfObject

        if isinstance(blob, list):
            if (
                len(blob) % 2 == 0
                and all(isinstance(key, str) for key in islice(iter(blob), 0, None, 2))
                and all(isinstance(value, pdf) for value in islice(iter(blob), 1, None, 2))
            ):
                # A list that alternates between strings and PDF objects is interpreted as a
                # sequence of key/value pairs and handled like a dictionary below.
                blob = dict(zip(*([iter(blob)] * 2)))
            else:
                # Every other list is traversed by index.
                for key, value in enumerate(blob):
                    yield from self._walk(value, memo, *path, F'/{key}')
                return

        if not isdict(blob):
            return

        for key, value in blob.items():
            if not isinstance(key, str):
                continue
            if not key.startswith('/'):
                # Normalize keys so that every path component starts with a forward slash.
                key = F'/{key}'
            yield from self._walk(value, memo, *path, key)

    def unpack(self, data):
        """
        Parse the input data as a PDF document and generate all results that are found by walking
        the object referenced by the `/Root` entry of the document trailer.
        """
        with MemoryFile(data, read_as_bytes=True) as stream:
            with NoLogging():
                pdf = self._pypdf2.PdfReader(stream)
                catalog = pdf.trailer['/Root']
                # The traversal happens while the stream is still open.
                yield from self._walk(catalog)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        """
        Return whether the input data starts with the signature `%PDF-`.
        """
        return data.startswith(B'%PDF-')
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data):
    """
    Parse the input data as a PDF document and generate all results that are found by walking the
    object referenced by the `/Root` entry of the document trailer.
    """
    # Both context managers remain active for the entire traversal.
    with MemoryFile(data, read_as_bytes=True) as stream, NoLogging():
        reader = self._pypdf2.PdfReader(stream)
        root = reader.trailer['/Root']
        yield from self._walk(root)
Inherited members