Module refinery.units.formats.archive.xtasar
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, Dict, Union, Type

from refinery.units.formats.archive import ArchiveUnit, UnpackResult
from refinery.lib.structures import StructReader, Struct

import json

JSONDict = Dict[str, Union[int, float, str, Type[None], 'JSONDict']]


class AsarHeader(Struct):

    def __init__(self, reader: StructReader[bytearray]):
        # the first u32 of an ASAR header is always 4
        if reader.u32() != 4:
            raise ValueError('Not an ASAR file.')
        # the second u32 is the size of the pickled header; subtracting
        # the two u32 length fields that follow leaves the size of the
        # JSON directory, which starts at offset 0x10
        size = reader.u32() - 8
        reader.seekrel(8)
        directory = reader.read(size)
        # trim trailing padding: cut at the last closing brace, then
        # re-append as many closing braces as are needed to balance
        # the opening ones
        end = directory.rfind(B'}')
        if end < 0:
            raise RuntimeError('Directory not terminated')
        directory[end:] = []
        bl = directory.count(B'{'[0])
        br = directory.count(B'}'[0])
        if br < bl:
            directory += (bl - br) * B'}'
        self.directory = json.loads(directory)
        self.base = reader.tell()


class xtasar(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from an ASAR archive.
    """
    def unpack(self, data: bytearray):
        def _unpack(dir: JSONDict, *path):
            # directory nodes carry a "files" dictionary; recurse into
            # them before looking for a file record at this node
            for name, listing in dir.get('files', {}).items():
                yield from _unpack(listing, *path, name)
            try:
                offset = dir['offset']
                size = dir['size']
            except KeyError:
                return
            try:
                # ASAR stores offsets as strings, relative to the end
                # of the header
                offset = int(offset) + header.base
                end = int(size) + offset
            except TypeError:
                self.log_warn(F'unable to convert offset "{offset}" and size "{size}" to integers')
                return
            if not path:
                self.log_warn(F'not processing item at root with offset {offset} and size {size}')
                return
            yield UnpackResult(
                '/'.join(path),
                lambda a=offset, b=end: data[a:b],
                offset=offset
            )

        header = AsarHeader(data)
        self.log_debug(F'header read successfully, base offset is {header.base}.')
        yield from _unpack(header.directory)
    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        return data.startswith(b'\04\0\0\0') and data[0x10:0x18] == B'{"files"'
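For context, the ASAR header is a Chromium Pickle that frames a JSON directory: a u32 that is always 4, a u32 with the pickled header size, two more u32 length fields, and then the JSON text at offset 0x10. The following standalone sketch parses the same fields with only the standard library; read_asar_directory is a hypothetical helper, and its trailing-padding handling is simplified compared to the unit above:

import json
import struct

def read_asar_directory(data: bytes):
    # u32 at offset 0 is always 4; u32 at offset 4 is the size of
    # the pickled header that follows it
    magic, header_size = struct.unpack_from('<II', data, 0)
    if magic != 4:
        raise ValueError('Not an ASAR file.')
    # two more u32 length fields precede the JSON text, which
    # therefore spans header_size - 8 bytes starting at offset 0x10
    json_size = header_size - 8
    blob = data[16:16 + json_size]
    blob = blob[:blob.rfind(b'}') + 1]  # drop any trailing padding
    # file offsets in the directory are relative to this base offset
    return json.loads(blob), 16 + json_size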
Classes
class AsarHeader (reader)
-
A class to parse structured data. A Struct class can be instantiated as follows:
foo = Struct(data, bar=29)
The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.
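As an illustration of this pattern, a minimal Struct subclass could look as follows; the Example class, its fields, and the sample data are made up for this sketch:

from refinery.lib.structures import Struct, StructReader

class Example(Struct):
    def __init__(self, reader: StructReader[bytearray], flip=False):
        # reader is supplied by Struct; extra keywords such as flip
        # are passed through from the constructor call
        self.magic = reader.u32()
        self.flip = flip

example = Example(bytearray(b'\x04\x00\x00\x00'), flip=True)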
Ancestors
refinery.lib.structures.Struct
class xtasar (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')
-
Extract files from an ASAR archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has a meta variable attached to it that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:
emit something | xtasar --list
Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:
emit something | xtasar [| dump {path} ]
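Assuming the patterns follow the usual wildcard syntax of path extractor units, a pipeline along these lines would restrict extraction to JavaScript files; the file name app.asar is only an example:
emit app.asar | xtasar *.js [| dump {path} ]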
Ancestors
refinery.units.formats.archive.ArchiveUnit
refinery.units.formats.PathExtractorUnit
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Recursively walks the JSON directory that AsarHeader parsed from the archive and yields an UnpackResult for each file record. File data is sliced lazily from the input, offsets are taken relative to the end of the header, and a record at the root that carries an offset but no path is skipped with a warning.
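For reference, the directory that unpack traverses nests file records under "files" keys; ASAR stores offsets as decimal strings, which is why the code converts them with int. The structure below is an illustrative sample, not taken from a real archive:

# illustrative shape of AsarHeader.directory
directory = {
    'files': {
        'index.js': {'size': 1024, 'offset': '0'},
        'lib': {
            'files': {
                'util.js': {'size': 300, 'offset': '1024'},
            },
        },
    },
}
# _unpack would yield the paths 'index.js' and 'lib/util.js'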
Inherited members