Module refinery.units.compression.lz
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import ByteString, List
from enum import IntFlag
from itertools import repeat
import lzma as std_lzma
from refinery.units import Arg, Unit, RefineryPartialResult
from refinery.lib.argformats import OptionFactory, extract_options
from refinery.lib.structures import MemoryFile
__all__ = ['lzma']
class F(IntFlag):
DEFAULT = 0
INJECT = 1
STEPWISE = 2
class lzma(Unit):
"""
LZMA compression and decompression.
"""
_LZMA_FILTER = extract_options(std_lzma, 'FILTER_', 'DELTA')
_LZMA_PARSER = OptionFactory(_LZMA_FILTER)
def __init__(
self, filter: Arg.Choice(choices=list(_LZMA_FILTER), metavar='FILTER', help=(
'Specifies a bcj filter to be applied. Possible values are: {choices}')) = None,
raw : Arg.Switch('-r', group='MODE', help='Use raw (no container) format.') = False,
alone : Arg.Switch('-a', group='MODE', help='Use the lzma container format.') = False,
xz : Arg.Switch('-x', group='MODE', help='Use the default xz format.') = False,
level : Arg.Number('-l', bound=(0, 9), help='The compression level preset; between 0 and 9.') = 9,
delta : Arg.Number('-d', help='Add a delta filter when compressing.') = None,
):
filter = filter and self._LZMA_PARSER(filter)
if (raw, alone, xz).count(True) > 1:
raise ValueError('Only one container format can be enabled.')
if level not in range(10):
raise ValueError('Compression level must be a number between 0 and 9.')
super().__init__(filter=filter, raw=raw, alone=alone, xz=xz, delta=delta,
level=level | std_lzma.PRESET_EXTREME)
def _get_lz_mode_and_filters(self, reverse=False):
mode = std_lzma.FORMAT_AUTO
filters = []
if self.args.filter is not None:
filters.append({'id': self.args.filter.value})
if self.args.delta is not None:
self.log_debug('adding delta filter')
filters.append({
'id': std_lzma.FILTER_DELTA,
'dist': self.args.delta
})
if self.args.alone:
self.log_debug('setting alone format')
mode = std_lzma.FORMAT_ALONE
filters.append({
'id': std_lzma.FILTER_LZMA1,
'preset': self.args.level
})
elif self.args.raw:
self.log_debug('setting raw format')
mode = std_lzma.FORMAT_RAW
filters.append({
'id': std_lzma.FILTER_LZMA2,
'preset': self.args.level
})
elif self.args.xz or reverse:
if reverse and not self.log_debug('setting xz container format'):
self.log_info('choosing default .xz container format for compression.')
mode = std_lzma.FORMAT_XZ
filters.append({
'id': std_lzma.FILTER_LZMA2,
'preset': self.args.level
})
return mode, filters
def reverse(self, data):
mode, filters = self._get_lz_mode_and_filters(True)
lz = std_lzma.LZMACompressor(mode, filters=filters)
output = lz.compress(data)
output += lz.flush()
return output
def _process_stream(self, data: ByteString, strategy: F, keywords):
if strategy & F.STEPWISE:
sizes = repeat(1)
else:
sizes = [len(data)]
lz = std_lzma.LZMADecompressor(**keywords)
with MemoryFile() as output:
with MemoryFile(data) as stream:
if strategy & F.INJECT:
output.write(lz.decompress(stream.read(5)))
output.write(lz.decompress(B'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF'))
for size in sizes:
if stream.eof or stream.closed:
break
try:
position = stream.tell()
output.write(lz.decompress(stream.read(size)))
except (EOFError, std_lzma.LZMAError) as error:
msg = error.args[0] if len(error.args) == 1 else error.__class__.__name__
raise RefineryPartialResult(F'compression failed at offset {position}: {msg!s}',
output.getvalue())
return output.getvalue()
def process(self, data: bytearray):
errors: List[RefineryPartialResult] = []
view = memoryview(data)
keywords = {}
keywords['format'], filters = self._get_lz_mode_and_filters(False)
if self.args.raw:
keywords['filters'] = filters
for strategy in (F.DEFAULT, F.INJECT, F.STEPWISE, F.INJECT | F.STEPWISE):
try:
return self._process_stream(view, strategy, keywords)
except RefineryPartialResult as p:
self.log_info(F'decompression failed with strategy {strategy}: {p.message}')
errors.append(p)
raise max(errors, key=lambda e: len(e.partial))
@classmethod
def handles(self, data: bytearray):
if data[:4] == B'\x5D\0\0\0':
return True
if data[:5] == B'\xFD7zXZ':
return True
Classes
class lzma (filter=None, raw=False, alone=False, xz=False, level=9, delta=None)
-
LZMA compression and decompression.
Expand source code Browse git
class lzma(Unit): """ LZMA compression and decompression. """ _LZMA_FILTER = extract_options(std_lzma, 'FILTER_', 'DELTA') _LZMA_PARSER = OptionFactory(_LZMA_FILTER) def __init__( self, filter: Arg.Choice(choices=list(_LZMA_FILTER), metavar='FILTER', help=( 'Specifies a bcj filter to be applied. Possible values are: {choices}')) = None, raw : Arg.Switch('-r', group='MODE', help='Use raw (no container) format.') = False, alone : Arg.Switch('-a', group='MODE', help='Use the lzma container format.') = False, xz : Arg.Switch('-x', group='MODE', help='Use the default xz format.') = False, level : Arg.Number('-l', bound=(0, 9), help='The compression level preset; between 0 and 9.') = 9, delta : Arg.Number('-d', help='Add a delta filter when compressing.') = None, ): filter = filter and self._LZMA_PARSER(filter) if (raw, alone, xz).count(True) > 1: raise ValueError('Only one container format can be enabled.') if level not in range(10): raise ValueError('Compression level must be a number between 0 and 9.') super().__init__(filter=filter, raw=raw, alone=alone, xz=xz, delta=delta, level=level | std_lzma.PRESET_EXTREME) def _get_lz_mode_and_filters(self, reverse=False): mode = std_lzma.FORMAT_AUTO filters = [] if self.args.filter is not None: filters.append({'id': self.args.filter.value}) if self.args.delta is not None: self.log_debug('adding delta filter') filters.append({ 'id': std_lzma.FILTER_DELTA, 'dist': self.args.delta }) if self.args.alone: self.log_debug('setting alone format') mode = std_lzma.FORMAT_ALONE filters.append({ 'id': std_lzma.FILTER_LZMA1, 'preset': self.args.level }) elif self.args.raw: self.log_debug('setting raw format') mode = std_lzma.FORMAT_RAW filters.append({ 'id': std_lzma.FILTER_LZMA2, 'preset': self.args.level }) elif self.args.xz or reverse: if reverse and not self.log_debug('setting xz container format'): self.log_info('choosing default .xz container format for compression.') mode = std_lzma.FORMAT_XZ filters.append({ 'id': std_lzma.FILTER_LZMA2, 'preset': self.args.level }) return mode, filters def reverse(self, data): mode, filters = self._get_lz_mode_and_filters(True) lz = std_lzma.LZMACompressor(mode, filters=filters) output = lz.compress(data) output += lz.flush() return output def _process_stream(self, data: ByteString, strategy: F, keywords): if strategy & F.STEPWISE: sizes = repeat(1) else: sizes = [len(data)] lz = std_lzma.LZMADecompressor(**keywords) with MemoryFile() as output: with MemoryFile(data) as stream: if strategy & F.INJECT: output.write(lz.decompress(stream.read(5))) output.write(lz.decompress(B'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')) for size in sizes: if stream.eof or stream.closed: break try: position = stream.tell() output.write(lz.decompress(stream.read(size))) except (EOFError, std_lzma.LZMAError) as error: msg = error.args[0] if len(error.args) == 1 else error.__class__.__name__ raise RefineryPartialResult(F'compression failed at offset {position}: {msg!s}', output.getvalue()) return output.getvalue() def process(self, data: bytearray): errors: List[RefineryPartialResult] = [] view = memoryview(data) keywords = {} keywords['format'], filters = self._get_lz_mode_and_filters(False) if self.args.raw: keywords['filters'] = filters for strategy in (F.DEFAULT, F.INJECT, F.STEPWISE, F.INJECT | F.STEPWISE): try: return self._process_stream(view, strategy, keywords) except RefineryPartialResult as p: self.log_info(F'decompression failed with strategy {strategy}: {p.message}') errors.append(p) raise max(errors, key=lambda e: len(e.partial)) @classmethod def handles(self, data: bytearray): if data[:4] == B'\x5D\0\0\0': return True if data[:5] == B'\xFD7zXZ': return True
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members