Module refinery.units.formats.archive.xtmagtape
Expand source code Browse git
from __future__ import annotations
import itertools
from refinery.lib.structures import MemoryFile, StructReader
from refinery.units import Unit
class xtmagtape(Unit):
    """
    Extract files from SIMH magtape files.
    """
    def process(self, data: bytearray):
        reader = StructReader(data)
        for r in itertools.count():
            buffer = MemoryFile()
            for k in itertools.count():
                try:
                    head = reader.peek(4)
                    size = reader.read_integer(24)
                    mark = reader.read_byte()
                except EOFError:
                    self.log_info('end of file while reading chunk header, terminating')
                    return
                if not any(head):
                    if k == 0:
                        return
                    break
                if mark != 0:
                    self.log_warn(F'error code 0x{mark:02X} in record {r}.{k}')
                buffer.write(reader.read(size))
                if reader.peek(4) != head:
                    if reader.tell() % 2 and reader.peek(5)[1:] == head:
                        padding = reader.read_byte()
                        if padding != 0:
                            self.log_info(F'nonzero padding byte in record {r}.{k}')
                    else:
                        raise ValueError('Invalid footer, data is corrupted.')
                reader.seekrel(4)
            yield buffer.getvalue()
Classes
class xtmagtape- 
Extract files from SIMH magtape files.
Expand source code Browse git
class xtmagtape(Unit): """ Extract files from SIMH magtape files. """ def process(self, data: bytearray): reader = StructReader(data) for r in itertools.count(): buffer = MemoryFile() for k in itertools.count(): try: head = reader.peek(4) size = reader.read_integer(24) mark = reader.read_byte() except EOFError: self.log_info('end of file while reading chunk header, terminating') return if not any(head): if k == 0: return break if mark != 0: self.log_warn(F'error code 0x{mark:02X} in record {r}.{k}') buffer.write(reader.read(size)) if reader.peek(4) != head: if reader.tell() % 2 and reader.peek(5)[1:] == head: padding = reader.read_byte() if padding != 0: self.log_info(F'nonzero padding byte in record {r}.{k}') else: raise ValueError('Invalid footer, data is corrupted.') reader.seekrel(4) yield buffer.getvalue()Ancestors
Subclasses
Class variables
var required_dependenciesvar optional_dependenciesvar consolevar reverse
Inherited members