Module refinery.units.formats.office.xlxtr
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import TYPE_CHECKING
from fnmatch import fnmatch
import re
import io
import enum
import defusedxml
import functools
from datetime import datetime
from refinery.units import Arg, Unit
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import NoLogging
if TYPE_CHECKING:
from pyxlsb2.records import SheetRecord
defusedxml.defuse_stdlib()
def _ref2rc(ref: str):
match = re.match(R'^([A-Z]+)(\d+)$', ref)
if not match:
raise ValueError
col = functools.reduce(lambda acc, c: (acc * 26) + c, (ord(c) - 0x40 for c in match[1]), 0)
row = int(match[2], 10)
return row, col
def _rc2ref(row: int, col: int):
if row <= 0:
raise ValueError
if col <= 0:
raise ValueError
alphabetic = ''
while col:
col, letter = divmod(col - 1, 26)
alphabetic = chr(0x41 + letter) + alphabetic
return F'{alphabetic}{row}'
class SheetReference:
def _parse_sheet(self, token: str):
try:
sheet, token = token.rsplit('#', 1)
except ValueError:
sheet = None
else:
try:
sheet = int(sheet, 0) - 1
except (TypeError, ValueError):
if sheet[0] in ('"', "'") and sheet[~0] == sheet[0] and len(sheet) > 2:
sheet = sheet[1:~1]
return sheet, token
def _parse_range(self, token):
try:
start, end = token.split(':')
return start, end
except ValueError:
return token, token
@staticmethod
def _parse_token(token):
try:
row, col = _ref2rc(token)
except ValueError:
row, col = (int(x, 0) for x in token.split('.'))
if row <= 0:
raise ValueError(F'row must be positive, {row} is an invalid value')
if col <= 0:
raise ValueError(F'col must be positive, {col} is an invalid value')
return row, col
def __init__(self, sheet_reference=None):
self.lbound = 1, 1
self.ubound = None
if sheet_reference is None:
self.sheet = None
return
self.sheet, token = self._parse_sheet(sheet_reference)
if not token:
return
try:
start, stop = (self._parse_token(x) for x in self._parse_range(token))
except Exception:
self.sheet = sheet_reference
else:
row_min = min(start[0], stop[0])
col_min = min(start[1], stop[1])
row_max = max(start[0], stop[0])
col_max = max(start[1], stop[1])
self.lbound = (row_min, col_min)
self.ubound = (row_max, col_max)
def match(self, index: int, name: str):
if self.sheet is None:
return True
if isinstance(self.sheet, int):
return self.sheet == index
return self.sheet == name or fnmatch(name, self.sheet)
def cells(self, row_max, col_max):
if self.ubound is not None:
row_max, col_max = self.ubound
row, col = self.lbound
colstart = col
while True:
yield row, col
if col < col_max:
col += 1
elif row < row_max:
row, col = row + 1, colstart
else:
break
def __contains__(self, ref):
if self.ubound is None:
return True
if not isinstance(ref, tuple):
ref = self._parse_token(ref)
row, col = ref
if row not in range(self.lbound[0], self.ubound[0] + 1):
return False
if col not in range(self.lbound[1], self.ubound[1] + 1):
return False
return True
class Workbook:
class _xlmode(enum.IntEnum):
openpyxl = 1
xlrd = 2
pyxlsb2 = 3
def __init__(self, data, unit: _ExcelUnit):
def openpyxl():
return unit._openpyxl.load_workbook(MemoryFile(data), read_only=True)
def pyxlsb2():
return unit._pyxlsb2.open_workbook(MemoryFile(data))
def xlrd():
with io.StringIO() as logfile:
vb = max(unit.log_level.verbosity - 1, 0)
wb = unit._xlrd.open_workbook(file_contents=data, logfile=logfile, verbosity=vb, on_demand=True)
logfile.seek(0)
for entry in logfile:
entry = entry.strip()
if re.search(R'^[A-Z]+:', entry) or '***' in entry:
unit.log_info(entry)
return wb
exception = None
for mode, loader in [
(self._xlmode.openpyxl, openpyxl),
(self._xlmode.xlrd, xlrd),
(self._xlmode.pyxlsb2, pyxlsb2)
]:
try:
self.workbook = loader()
except Exception as e:
exception = e
else:
self.mode = mode
exception = None
break
if exception:
raise exception
def sheets(self):
if self.mode is self._xlmode.openpyxl:
yield from self.workbook.sheetnames
return
if self.mode is self._xlmode.xlrd:
yield from self.workbook.sheet_names()
return
assert self.mode is self._xlmode.pyxlsb2
for rec in self.workbook.sheets:
rec: SheetRecord
yield rec.name
def get_sheet_data(self, name: str):
def _sanitize(value):
if value is None:
return None
if isinstance(value, str):
return value
try:
it = iter(value)
except Exception:
pass
else:
return [_sanitize(v) for v in it]
if isinstance(value, float):
if float(int(value)) == value:
return int(value)
if isinstance(value, datetime):
return value.isoformat(' ', 'seconds')
return str(value)
def _padded(data):
ncols = max((len(row) for row in data), default=0)
for row in data:
row.extend([None] * (ncols - len(row)))
return data
if self.mode is self._xlmode.openpyxl:
sheet = self.workbook[name]
with NoLogging():
data = _padded(_sanitize(sheet.iter_rows(values_only=True)))
elif self.mode is self._xlmode.pyxlsb2:
sheet = self.workbook.get_sheet_by_name(name)
data = _padded(_sanitize(sheet.rows()))
elif self.mode is self._xlmode.xlrd:
sheet = self.workbook.sheet_by_name(name)
data = []
for r in range(sheet.nrows):
row = []
for c in range(sheet.ncols):
try:
row.append(_sanitize(sheet.cell_value(r, c)))
except IndexError:
row.append(None)
data.append(row)
else:
raise RuntimeError(F'Invalid mode {self.mode!r}.')
return data
class _ExcelUnit(Unit, abstract=True):
@Unit.Requires('xlrd2', 'formats', 'office', 'extended')
def _xlrd():
import xlrd2
return xlrd2
@Unit.Requires('openpyxl', 'formats', 'office', 'extended')
def _openpyxl():
import openpyxl
return openpyxl
@Unit.Requires('pyxlsb2', 'formats', 'office', 'extended')
def _pyxlsb2():
import pyxlsb2
return pyxlsb2
class xlxtr(_ExcelUnit):
"""
Extract data from Microsoft Excel documents, both Legacy and new XML type documents. A sheet
reference is of the form `B1` or `1.2`, both specifying the first cell of the second column.
A cell range can be specified as `B1:C12`, or `1.2:C12`, or `1.2:12.3`. Finally, the unit will
always refer to the first sheet in the document and to change this, specify the sheet name or
index separated by a hashtag, i.e. `sheet#B1:C12` or `1#B1:C12`. Note that indices are
1-based. To get all elements of one sheet, use `sheet#`. The unit If parsing a sheet reference
fails, the script will assume that the given reference specifies a sheet.
"""
def __init__(self, *references: Arg(metavar='reference', type=SheetReference, help=(
'A sheet reference to be extracted. '
'If no sheet references are given, the unit lists all sheet names.'
))):
if not references:
references = [SheetReference('*')]
super().__init__(references=references)
def process(self, data):
wb = Workbook(data, self)
for ref in self.args.references:
ref: SheetReference
for k, name in enumerate(wb.sheets()):
if not ref.match(k, name):
continue
for r, row in enumerate(wb.get_sheet_data(name), 1):
for c, value in enumerate(row, 1):
if (r, c) not in ref:
continue
if value is None:
continue
yield self.labelled(
str(value).encode(self.codec),
row=r,
col=c,
ref=_rc2ref(r, c),
sheet=name
)
Classes
class SheetReference (sheet_reference=None)
-
Expand source code Browse git
class SheetReference: def _parse_sheet(self, token: str): try: sheet, token = token.rsplit('#', 1) except ValueError: sheet = None else: try: sheet = int(sheet, 0) - 1 except (TypeError, ValueError): if sheet[0] in ('"', "'") and sheet[~0] == sheet[0] and len(sheet) > 2: sheet = sheet[1:~1] return sheet, token def _parse_range(self, token): try: start, end = token.split(':') return start, end except ValueError: return token, token @staticmethod def _parse_token(token): try: row, col = _ref2rc(token) except ValueError: row, col = (int(x, 0) for x in token.split('.')) if row <= 0: raise ValueError(F'row must be positive, {row} is an invalid value') if col <= 0: raise ValueError(F'col must be positive, {col} is an invalid value') return row, col def __init__(self, sheet_reference=None): self.lbound = 1, 1 self.ubound = None if sheet_reference is None: self.sheet = None return self.sheet, token = self._parse_sheet(sheet_reference) if not token: return try: start, stop = (self._parse_token(x) for x in self._parse_range(token)) except Exception: self.sheet = sheet_reference else: row_min = min(start[0], stop[0]) col_min = min(start[1], stop[1]) row_max = max(start[0], stop[0]) col_max = max(start[1], stop[1]) self.lbound = (row_min, col_min) self.ubound = (row_max, col_max) def match(self, index: int, name: str): if self.sheet is None: return True if isinstance(self.sheet, int): return self.sheet == index return self.sheet == name or fnmatch(name, self.sheet) def cells(self, row_max, col_max): if self.ubound is not None: row_max, col_max = self.ubound row, col = self.lbound colstart = col while True: yield row, col if col < col_max: col += 1 elif row < row_max: row, col = row + 1, colstart else: break def __contains__(self, ref): if self.ubound is None: return True if not isinstance(ref, tuple): ref = self._parse_token(ref) row, col = ref if row not in range(self.lbound[0], self.ubound[0] + 1): return False if col not in range(self.lbound[1], self.ubound[1] + 1): return False return True
Methods
def match(self, index, name)
-
Expand source code Browse git
def match(self, index: int, name: str): if self.sheet is None: return True if isinstance(self.sheet, int): return self.sheet == index return self.sheet == name or fnmatch(name, self.sheet)
def cells(self, row_max, col_max)
-
Expand source code Browse git
def cells(self, row_max, col_max): if self.ubound is not None: row_max, col_max = self.ubound row, col = self.lbound colstart = col while True: yield row, col if col < col_max: col += 1 elif row < row_max: row, col = row + 1, colstart else: break
class Workbook (data, unit)
-
Expand source code Browse git
class Workbook: class _xlmode(enum.IntEnum): openpyxl = 1 xlrd = 2 pyxlsb2 = 3 def __init__(self, data, unit: _ExcelUnit): def openpyxl(): return unit._openpyxl.load_workbook(MemoryFile(data), read_only=True) def pyxlsb2(): return unit._pyxlsb2.open_workbook(MemoryFile(data)) def xlrd(): with io.StringIO() as logfile: vb = max(unit.log_level.verbosity - 1, 0) wb = unit._xlrd.open_workbook(file_contents=data, logfile=logfile, verbosity=vb, on_demand=True) logfile.seek(0) for entry in logfile: entry = entry.strip() if re.search(R'^[A-Z]+:', entry) or '***' in entry: unit.log_info(entry) return wb exception = None for mode, loader in [ (self._xlmode.openpyxl, openpyxl), (self._xlmode.xlrd, xlrd), (self._xlmode.pyxlsb2, pyxlsb2) ]: try: self.workbook = loader() except Exception as e: exception = e else: self.mode = mode exception = None break if exception: raise exception def sheets(self): if self.mode is self._xlmode.openpyxl: yield from self.workbook.sheetnames return if self.mode is self._xlmode.xlrd: yield from self.workbook.sheet_names() return assert self.mode is self._xlmode.pyxlsb2 for rec in self.workbook.sheets: rec: SheetRecord yield rec.name def get_sheet_data(self, name: str): def _sanitize(value): if value is None: return None if isinstance(value, str): return value try: it = iter(value) except Exception: pass else: return [_sanitize(v) for v in it] if isinstance(value, float): if float(int(value)) == value: return int(value) if isinstance(value, datetime): return value.isoformat(' ', 'seconds') return str(value) def _padded(data): ncols = max((len(row) for row in data), default=0) for row in data: row.extend([None] * (ncols - len(row))) return data if self.mode is self._xlmode.openpyxl: sheet = self.workbook[name] with NoLogging(): data = _padded(_sanitize(sheet.iter_rows(values_only=True))) elif self.mode is self._xlmode.pyxlsb2: sheet = self.workbook.get_sheet_by_name(name) data = _padded(_sanitize(sheet.rows())) elif self.mode is self._xlmode.xlrd: sheet = self.workbook.sheet_by_name(name) data = [] for r in range(sheet.nrows): row = [] for c in range(sheet.ncols): try: row.append(_sanitize(sheet.cell_value(r, c))) except IndexError: row.append(None) data.append(row) else: raise RuntimeError(F'Invalid mode {self.mode!r}.') return data
Methods
def sheets(self)
-
Expand source code Browse git
def sheets(self): if self.mode is self._xlmode.openpyxl: yield from self.workbook.sheetnames return if self.mode is self._xlmode.xlrd: yield from self.workbook.sheet_names() return assert self.mode is self._xlmode.pyxlsb2 for rec in self.workbook.sheets: rec: SheetRecord yield rec.name
def get_sheet_data(self, name)
-
Expand source code Browse git
def get_sheet_data(self, name: str): def _sanitize(value): if value is None: return None if isinstance(value, str): return value try: it = iter(value) except Exception: pass else: return [_sanitize(v) for v in it] if isinstance(value, float): if float(int(value)) == value: return int(value) if isinstance(value, datetime): return value.isoformat(' ', 'seconds') return str(value) def _padded(data): ncols = max((len(row) for row in data), default=0) for row in data: row.extend([None] * (ncols - len(row))) return data if self.mode is self._xlmode.openpyxl: sheet = self.workbook[name] with NoLogging(): data = _padded(_sanitize(sheet.iter_rows(values_only=True))) elif self.mode is self._xlmode.pyxlsb2: sheet = self.workbook.get_sheet_by_name(name) data = _padded(_sanitize(sheet.rows())) elif self.mode is self._xlmode.xlrd: sheet = self.workbook.sheet_by_name(name) data = [] for r in range(sheet.nrows): row = [] for c in range(sheet.ncols): try: row.append(_sanitize(sheet.cell_value(r, c))) except IndexError: row.append(None) data.append(row) else: raise RuntimeError(F'Invalid mode {self.mode!r}.') return data
class xlxtr (*references)
-
Extract data from Microsoft Excel documents, both Legacy and new XML type documents. A sheet reference is of the form
B1
or1.2
, both specifying the first cell of the second column. A cell range can be specified asB1:C12
, or1.2:C12
, or1.2:12.3
. Finally, the unit will always refer to the first sheet in the document and to change this, specify the sheet name or index separated by a hashtag, i.e.sheet#B1:C12
or1#B1:C12
. Note that indices are 1-based. To get all elements of one sheet, usesheet#
. The unit If parsing a sheet reference fails, the script will assume that the given reference specifies a sheet.Expand source code Browse git
class xlxtr(_ExcelUnit): """ Extract data from Microsoft Excel documents, both Legacy and new XML type documents. A sheet reference is of the form `B1` or `1.2`, both specifying the first cell of the second column. A cell range can be specified as `B1:C12`, or `1.2:C12`, or `1.2:12.3`. Finally, the unit will always refer to the first sheet in the document and to change this, specify the sheet name or index separated by a hashtag, i.e. `sheet#B1:C12` or `1#B1:C12`. Note that indices are 1-based. To get all elements of one sheet, use `sheet#`. The unit If parsing a sheet reference fails, the script will assume that the given reference specifies a sheet. """ def __init__(self, *references: Arg(metavar='reference', type=SheetReference, help=( 'A sheet reference to be extracted. ' 'If no sheet references are given, the unit lists all sheet names.' ))): if not references: references = [SheetReference('*')] super().__init__(references=references) def process(self, data): wb = Workbook(data, self) for ref in self.args.references: ref: SheetReference for k, name in enumerate(wb.sheets()): if not ref.match(k, name): continue for r, row in enumerate(wb.get_sheet_data(name), 1): for c, value in enumerate(row, 1): if (r, c) not in ref: continue if value is None: continue yield self.labelled( str(value).encode(self.codec), row=r, col=c, ref=_rc2ref(r, c), sheet=name )
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members