Module refinery.units.formats.archive.xtasar

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Optional, Dict, Union, Type

from refinery.units.formats.archive import ArchiveUnit, UnpackResult
from refinery.lib.structures import StructReader, Struct

import json

JSONDict = Dict[str, Union[int, float, str, Type[None], 'JSONDict']]


class AsarHeader(Struct):
    """
    Parses the header of an ASAR archive and decodes its JSON directory.

    After construction, `directory` holds the decoded JSON object and `base` holds the reader
    position right after the directory data.
    """
    def __init__(self, reader: StructReader[bytearray]):
        # The first 32-bit field must hold the constant 4, otherwise the input is rejected.
        if reader.u32() != 4:
            raise ValueError('Not an ASAR file.')
        # The second 32-bit field is a size that includes the 8 bytes skipped below; the
        # remainder is the length of the directory data (presumably JSON plus padding).
        size = reader.u32() - 8
        reader.seekrel(8)
        directory = reader.read(size)
        end = directory.rfind(B'}')
        if end < 0:
            raise RuntimeError('Directory not terminated')
        try:
            # Everything after the final closing brace is not part of the JSON document. Parsing
            # the exact document first avoids miscounting braces that occur inside quoted names.
            parsed = json.loads(directory[:end + 1])
        except ValueError:
            # Fallback for directories that lack closing braces: cut off the final brace and
            # append as many closing braces as are missing. This is only a heuristic because
            # braces inside of strings are counted as well.
            directory[end:] = []
            bl = directory.count(B'{'[0])
            br = directory.count(B'}'[0])
            if br < bl:
                directory += (bl - br) * B'}'
            parsed = json.loads(directory)
        self.directory = parsed
        # Position immediately after the directory data.
        self.base = reader.tell()


class xtasar(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from an ASAR archive.
    """
    def unpack(self, data: bytearray):
        """
        Parse the ASAR header of `data` and yield one `UnpackResult` for each directory entry
        that specifies both an offset and a size.
        """
        def _unpack(entry: JSONDict, *path):
            # Descend into child entries first; `path` accumulates the names leading here.
            for name, listing in entry.get('files', {}).items():
                yield from _unpack(listing, *path, name)
            try:
                offset = entry['offset']
                size = entry['size']
            except KeyError:
                # Entries without both keys carry no extractable data.
                return
            try:
                # Offsets in the directory are relative to the end of the header.
                start = int(offset) + header.base
                end = int(size) + start
            except (TypeError, ValueError):
                # int() raises ValueError for non-numeric strings and TypeError for
                # unsupported types; both mean that the entry cannot be located.
                self.log_warn(F'unable to convert offset "{offset}" and size "{size}" to integers')
                return
            if not path:
                self.log_warn(F'not processing item at root with offset {start} and size {size}')
                return
            yield UnpackResult(
                '/'.join(path),
                # Default arguments bind the current bounds so extraction can happen lazily.
                lambda a=start, b=end: data[a:b],
                offset=start
            )

        header = AsarHeader(data)
        self.log_debug(F'header read successfully, base offset is {header.base}.')
        yield from _unpack(header.directory)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # Matches when the data starts with the little-endian 32-bit value 4 and the bytes at
        # offsets 0x10 through 0x17 are the beginning of a JSON object with a "files" key.
        return data.startswith(b'\04\0\0\0') and data[0x10:0x18] == B'{"files"'

Classes

class AsarHeader (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class AsarHeader(Struct):
    """
    Parses the header of an ASAR archive and decodes its JSON directory.

    After construction, `directory` holds the decoded JSON object and `base` holds the reader
    position right after the directory data.
    """
    def __init__(self, reader: StructReader[bytearray]):
        # The first 32-bit field must hold the constant 4, otherwise the input is rejected.
        if reader.u32() != 4:
            raise ValueError('Not an ASAR file.')
        # The second 32-bit field is a size that includes the 8 bytes skipped below; the
        # remainder is the length of the directory data (presumably JSON plus padding).
        size = reader.u32() - 8
        reader.seekrel(8)
        directory = reader.read(size)
        end = directory.rfind(B'}')
        if end < 0:
            raise RuntimeError('Directory not terminated')
        try:
            # Everything after the final closing brace is not part of the JSON document. Parsing
            # the exact document first avoids miscounting braces that occur inside quoted names.
            parsed = json.loads(directory[:end + 1])
        except ValueError:
            # Fallback for directories that lack closing braces: cut off the final brace and
            # append as many closing braces as are missing. This is only a heuristic because
            # braces inside of strings are counted as well.
            directory[end:] = []
            bl = directory.count(B'{'[0])
            br = directory.count(B'}'[0])
            if br < bl:
                directory += (bl - br) * B'}'
            parsed = json.loads(directory)
        self.directory = parsed
        # Position immediately after the directory data.
        self.base = reader.tell()

Ancestors

class xtasar (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date', pwd=b'')

Extract files from an ASAR archive. This unit is a path extractor which extracts data from a hierarchical structure. Each extracted item is emitted as a separate chunk and has attached to it a meta variable that contains its path within the source structure. The positional arguments to the command are patterns that can be used to filter the extracted items by their path. To view only the paths of all chunks, use the listing switch:

emit something | xtasar --list

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit something | xtasar [| dump {path} ]
Expand source code Browse git
class xtasar(ArchiveUnit, docs='{0}{s}{PathExtractorUnit}'):
    """
    Extract files from an ASAR archive.
    """
    def unpack(self, data: bytearray):
        """
        Parse the ASAR header of `data` and yield one `UnpackResult` for each directory entry
        that specifies both an offset and a size.
        """
        def _unpack(entry: JSONDict, *path):
            # Descend into child entries first; `path` accumulates the names leading here.
            for name, listing in entry.get('files', {}).items():
                yield from _unpack(listing, *path, name)
            try:
                offset = entry['offset']
                size = entry['size']
            except KeyError:
                # Entries without both keys carry no extractable data.
                return
            try:
                # Offsets in the directory are relative to the end of the header.
                start = int(offset) + header.base
                end = int(size) + start
            except (TypeError, ValueError):
                # int() raises ValueError for non-numeric strings and TypeError for
                # unsupported types; both mean that the entry cannot be located.
                self.log_warn(F'unable to convert offset "{offset}" and size "{size}" to integers')
                return
            if not path:
                self.log_warn(F'not processing item at root with offset {start} and size {size}')
                return
            yield UnpackResult(
                '/'.join(path),
                # Default arguments bind the current bounds so extraction can happen lazily.
                lambda a=start, b=end: data[a:b],
                offset=start
            )

        header = AsarHeader(data)
        self.log_debug(F'header read successfully, base offset is {header.base}.')
        yield from _unpack(header.directory)

    @classmethod
    def handles(cls, data: bytearray) -> Optional[bool]:
        # Matches when the data starts with the little-endian 32-bit value 4 and the bytes at
        # offsets 0x10 through 0x17 are the beginning of a JSON object with a "files" key.
        return data.startswith(b'\04\0\0\0') and data[0x10:0x18] == B'{"files"'

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Parse the ASAR header of `data` and yield one `UnpackResult` for each directory entry
    that specifies both an offset and a size.
    """
    def _unpack(entry: JSONDict, *path):
        # Descend into child entries first; `path` accumulates the names leading here.
        for name, listing in entry.get('files', {}).items():
            yield from _unpack(listing, *path, name)
        try:
            offset = entry['offset']
            size = entry['size']
        except KeyError:
            # Entries without both keys carry no extractable data.
            return
        try:
            # Offsets in the directory are relative to the end of the header.
            start = int(offset) + header.base
            end = int(size) + start
        except (TypeError, ValueError):
            # int() raises ValueError for non-numeric strings and TypeError for
            # unsupported types; both mean that the entry cannot be located.
            self.log_warn(F'unable to convert offset "{offset}" and size "{size}" to integers')
            return
        if not path:
            self.log_warn(F'not processing item at root with offset {start} and size {size}')
            return
        yield UnpackResult(
            '/'.join(path),
            # Default arguments bind the current bounds so extraction can happen lazily.
            lambda a=start, b=end: data[a:b],
            offset=start
        )

    header = AsarHeader(data)
    self.log_debug(F'header read successfully, base offset is {header.base}.')
    yield from _unpack(header.directory)

Inherited members