Module refinery.units.formats.archive.xtasar

Expand source code Browse git
from __future__ import annotations

import json

from typing import TYPE_CHECKING

from refinery.lib.structures import Struct, StructReader
from refinery.units.formats.archive import ArchiveUnit, UnpackResult

if TYPE_CHECKING:
    JSONDict = dict[str, int | float | str | type[None] | JSONDict]


class AsarHeader(Struct):
    """
    Parses the header at the start of an ASAR archive. After construction, `directory` holds the
    decoded JSON directory and `base` holds the reader position immediately after the directory
    data; `xtasar.unpack` adds the offsets of directory entries to this base.
    """
    def __init__(self, reader: StructReader[bytearray]):
        """
        Read the header from `reader`.

        Raises a `ValueError` if the first 32-bit value is not 4 or if the directory cannot be
        decoded as JSON, and a `RuntimeError` if the directory contains no closing brace.
        """
        if reader.u32() != 4:
            raise ValueError('Not an ASAR file.')
        # The second 32-bit value, reduced by 8, is used as the number of directory bytes; the
        # directory itself begins after a further 8 bytes which are skipped.
        size = reader.u32() - 8
        reader.seekrel(8)
        directory = reader.read(size)
        self.base = reader.tell()
        end = directory.rfind(B'}')
        if end < 0:
            raise RuntimeError('Directory not terminated')
        try:
            # Anything after the final closing brace is discarded (presumably padding). Parsing
            # the remainder as-is keeps well-formed directories intact even when file names
            # contain brace characters, which would confuse the brace counting below.
            self.directory = json.loads(directory[:end + 1])
        except ValueError:
            # Best-effort repair: drop the final brace and append as many closing braces as are
            # needed to match the number of opening braces.
            repaired = directory[:end]
            missing = repaired.count(B'{') - repaired.count(B'}')
            if missing > 0:
                repaired += missing * B'}'
            self.directory = json.loads(repaired)


class xtasar(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from Atom Shell Archives (ASAR). These are often used to bundle Electron application
    data and resources.
    """
    def unpack(self, data: bytearray):
        """
        Parse the ASAR header of `data` and yield one `UnpackResult` for every directory entry
        that has both an offset and a size. Entries whose offset or size cannot be converted to
        an integer are skipped with a warning.
        """
        def _unpack(node: JSONDict, *path):
            # Descend into nested listings first; every key below "files" extends the path.
            for name, listing in node.get('files', {}).items():
                yield from _unpack(listing, *path, name)
            # Entries lacking either key carry no data to extract and are skipped silently.
            try:
                raw_offset = node['offset']
                raw_size = node['size']
            except KeyError:
                return
            try:
                # Offsets in the directory are relative to the end of the header.
                offset = int(raw_offset) + header.base
                end = int(raw_size) + offset
            except (TypeError, ValueError, OverflowError):
                # int() raises ValueError for non-numeric strings and NaN, and OverflowError for
                # infinite floats; neither should abort the extraction of the other entries.
                self.log_warn(F'unable to convert offset "{raw_offset}" and size "{raw_size}" to integers')
                return
            if not path:
                self.log_warn(F'not processing item at root with offset {offset} and size {raw_size}')
                return
            yield UnpackResult(
                '/'.join(path),
                # Bind the bounds as defaults so that each result keeps its own slice.
                lambda a=offset, b=end: data[a:b],
                offset=offset
            )

        header = AsarHeader.Parse(data)
        self.log_debug(F'header read successfully, base offset is {header.base}.')
        yield from _unpack(header.directory)

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return whether `data` begins with the 32-bit little-endian value 4 and has the text
        `{"files"` at offset 0x10.
        """
        return data[:4] == b'\04\0\0\0' and data[0x10:0x18] == B'{"files"'

Classes

class AsarHeader (reader)

A class to parse structured data. A Struct class can be instantiated as follows:

foo = Struct(data, bar=29)

The initialization routine of the structure will be called with a single argument reader. If the object data is already a StructReader, then it will be passed as reader. Otherwise, the argument will be wrapped in a StructReader. Additional arguments to the struct are passed through.

Expand source code Browse git
class AsarHeader(Struct):
    """
    Parses the header at the start of an ASAR archive. After construction, `directory` holds the
    decoded JSON directory and `base` holds the reader position immediately after the directory
    data; `xtasar.unpack` adds the offsets of directory entries to this base.
    """
    def __init__(self, reader: StructReader[bytearray]):
        """
        Read the header from `reader`.

        Raises a `ValueError` if the first 32-bit value is not 4 or if the directory cannot be
        decoded as JSON, and a `RuntimeError` if the directory contains no closing brace.
        """
        if reader.u32() != 4:
            raise ValueError('Not an ASAR file.')
        # The second 32-bit value, reduced by 8, is used as the number of directory bytes; the
        # directory itself begins after a further 8 bytes which are skipped.
        size = reader.u32() - 8
        reader.seekrel(8)
        directory = reader.read(size)
        self.base = reader.tell()
        end = directory.rfind(B'}')
        if end < 0:
            raise RuntimeError('Directory not terminated')
        try:
            # Anything after the final closing brace is discarded (presumably padding). Parsing
            # the remainder as-is keeps well-formed directories intact even when file names
            # contain brace characters, which would confuse the brace counting below.
            self.directory = json.loads(directory[:end + 1])
        except ValueError:
            # Best-effort repair: drop the final brace and append as many closing braces as are
            # needed to match the number of opening braces.
            repaired = directory[:end]
            missing = repaired.count(B'{') - repaired.count(B'}')
            if missing > 0:
                repaired += missing * B'}'
            self.directory = json.loads(repaired)

Ancestors

  • Struct
  • typing.Generic
  • collections.abc.Buffer

Static methods

def Parse(reader, *args, **kwargs)
class xtasar (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', exclude=None, date=b'date', pwd=b'')

Extract files from Atom Shell Archives (ASAR). These are often used to bundle Electron application data and resources.

This unit extracts items with an associated virtual path from a container; each extracted item is emitted as a separate chunk with a corresponding meta variable named "path".

Positional arguments to xtasar are patterns to filter the extracted items. Use the -x flag to add an exclusion pattern. To extract all files with a foo or bar extension, but none that has the word "temp" in its path:

xtasar .foo .bar -x temp

To view only the paths of all chunks, use the listing switch:

emit data | ... | xtasar -l

Otherwise, extracted items are written to the standard output port and usually require a frame to properly process. In order to dump all extracted data to disk, the following pipeline can be used:

emit data | ... | xtasar [| dump extracted/{path} ]

The value {path} is a placeholder which is substituted by the virtual path of the extracted item. When using xtasar to unpack a file on disk, the following pattern can be useful:

ef pack.bin [| xtasar -j | d2p ]

The unit ef is also a path extractor. By specifying -j (or --join), the paths of extracted items are combined. Here, d2p is a shortcut for dump {path}. It deconflicts the joined paths with the local file system: If pack.bin contains items one.txt and two.txt, the following local file tree would be the result:

pack.bin
pack/one.txt
pack/two.txt

Finally, the -d (or --drop) switch can be used to not create (or alter) the path metadata at all, which is useful in cases where path metadata from a previous unit should be preserved.

Expand source code Browse git
class xtasar(ArchiveUnit, docs='{0}{p}{PathExtractorUnit}'):
    """
    Extract files from Atom Shell Archives (ASAR). These are often used to bundle Electron application
    data and resources.
    """
    def unpack(self, data: bytearray):
        """
        Parse the ASAR header of `data` and yield one `UnpackResult` for every directory entry
        that has both an offset and a size. Entries whose offset or size cannot be converted to
        an integer are skipped with a warning.
        """
        def _unpack(node: JSONDict, *path):
            # Descend into nested listings first; every key below "files" extends the path.
            for name, listing in node.get('files', {}).items():
                yield from _unpack(listing, *path, name)
            # Entries lacking either key carry no data to extract and are skipped silently.
            try:
                raw_offset = node['offset']
                raw_size = node['size']
            except KeyError:
                return
            try:
                # Offsets in the directory are relative to the end of the header.
                offset = int(raw_offset) + header.base
                end = int(raw_size) + offset
            except (TypeError, ValueError, OverflowError):
                # int() raises ValueError for non-numeric strings and NaN, and OverflowError for
                # infinite floats; neither should abort the extraction of the other entries.
                self.log_warn(F'unable to convert offset "{raw_offset}" and size "{raw_size}" to integers')
                return
            if not path:
                self.log_warn(F'not processing item at root with offset {offset} and size {raw_size}')
                return
            yield UnpackResult(
                '/'.join(path),
                # Bind the bounds as defaults so that each result keeps its own slice.
                lambda a=offset, b=end: data[a:b],
                offset=offset
            )

        header = AsarHeader.Parse(data)
        self.log_debug(F'header read successfully, base offset is {header.base}.')
        yield from _unpack(header.directory)

    @classmethod
    def handles(cls, data) -> bool | None:
        """
        Return whether `data` begins with the 32-bit little-endian value 4 and has the text
        `{"files"` at offset 0x10.
        """
        return data[:4] == b'\04\0\0\0' and data[0x10:0x18] == B'{"files"'

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: bytearray):
    """
    Parse the ASAR header of `data` and yield one `UnpackResult` for every directory entry
    that has both an offset and a size. Entries whose offset or size cannot be converted to
    an integer are skipped with a warning.
    """
    def _unpack(node: JSONDict, *path):
        # Descend into nested listings first; every key below "files" extends the path.
        for name, listing in node.get('files', {}).items():
            yield from _unpack(listing, *path, name)
        # Entries lacking either key carry no data to extract and are skipped silently.
        try:
            raw_offset = node['offset']
            raw_size = node['size']
        except KeyError:
            return
        try:
            # Offsets in the directory are relative to the end of the header.
            offset = int(raw_offset) + header.base
            end = int(raw_size) + offset
        except (TypeError, ValueError, OverflowError):
            # int() raises ValueError for non-numeric strings and NaN, and OverflowError for
            # infinite floats; neither should abort the extraction of the other entries.
            self.log_warn(F'unable to convert offset "{raw_offset}" and size "{raw_size}" to integers')
            return
        if not path:
            self.log_warn(F'not processing item at root with offset {offset} and size {raw_size}')
            return
        yield UnpackResult(
            '/'.join(path),
            # Bind the bounds as defaults so that each result keeps its own slice.
            lambda a=offset, b=end: data[a:b],
            offset=offset
        )

    header = AsarHeader.Parse(data)
    self.log_debug(F'header read successfully, base offset is {header.base}.')
    yield from _unpack(header.directory)

Inherited members