Module refinery.units.formats.archive.xtnode

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Iterable, Optional, Dict

import re
import json

from refinery.units.formats.archive import Arg, ArchiveUnit, UnpackResult
from refinery.units.encoding.esc import esc
from refinery.lib.structures import StructReader
from refinery.lib.patterns import formats
from refinery.lib.types import ByteStr, JSON
from refinery.units.pattern.carve_json import JSONCarver


class JSONReader(StructReader):
    """
    A `StructReader` with helpers for pulling quoted string literals and JSON fragments out of a
    surrounding byte stream, such as JavaScript source code.
    """

    def read_string(self) -> Optional[str]:
        """
        Read a quoted string literal at the cursor and return its unescaped text.

        The byte at the cursor must be a single or a double quote. Bytes are consumed up to and
        including the matching closing quote that is not preceded by a backslash escape. The raw
        bytes between the quotes are piped through the `esc` unit and converted to `str`. Raises a
        `RuntimeError` when the cursor is not positioned on a quote character.
        """
        quote = self.u8()
        if quote not in B'\"\'':
            raise RuntimeError('trying to read a string, but no quote character was found')
        # The values read via u8() are used as integers here (they are appended to a bytearray), so
        # the backslash has to be matched by its byte value. Comparing against a bytes object is
        # never equal and would terminate the literal at the first escaped quote.
        backslash = 0x5C
        value = bytearray()
        escaped = False
        while True:
            char = self.u8()
            if escaped:
                # The byte after a backslash is always part of the literal, even if it is a quote.
                escaped = False
            elif char == backslash:
                escaped = True
            elif char == quote:
                break
            value.append(char)
        return value | esc | str

    def read_json(self) -> Optional[JSON]:
        """
        Skip forward to the next opening bracket or brace and parse the JSON value starting there.

        Returns `None`, leaving the cursor on the opening character, when `JSONCarver.find_end`
        reports no end offset. Otherwise the cursor is moved to that end offset and the parsed
        value is returned.
        """
        while self.u8() not in b'[{':
            pass
        # Step back so that the cursor points at the opening character again.
        self.seekrel(-1)
        end = JSONCarver.find_end(self._data, self._cursor)
        if end is None:
            return None
        data = self._data[self._cursor:end]
        self._cursor = end
        if isinstance(data, memoryview):
            # json.loads does not accept memoryview objects.
            data = bytes(data)
        return json.loads(data)

    def skip_comma(self):
        """
        Advance the cursor past any run of whitespace and comma bytes; returns the reader itself so
        that calls can be chained.
        """
        while self.u8() in b'\n\t\r\f\v\x20,':
            pass
        # The loop consumed one byte too many; put it back.
        self.seekrel(-1)
        return self


class xtnode(ArchiveUnit):
    """
    Extracts and decompiles files from compiled Node.js applications. Supports both nexe and pkg, two
    utilities that are commonly used to generate stand-alone executables.
    """

    # Byte markers whose presence is used to recognize the two supported packers.
    _NEXE_SENTINEL = B'<nexe~~sentinel>'
    _PKG_PAYLOAD_P = B'PAYLOAD_POSITION'
    _PKG_PAYLOAD_S = B'PAYLOAD_SIZE'
    _PKG_PRELUDE_P = B'PRELUDE_POSITION'
    _PKG_PRELUDE_S = B'PRELUDE_SIZE'
    _PKG_COMMON_JS = B'sourceMappingURL=common.js.map'

    def __init__(
        self, *paths, entry: Arg.Switch('-u', help='Only extract the entry point.') = False,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
    ):
        super().__init__(*paths, entry=entry,
            list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex,
            path=path, date=date)

    def unpack(self, data: ByteStr) -> Iterable[UnpackResult]:
        """
        Yield the embedded files of the input. The nexe format is tried first, then pkg; nothing is
        yielded when neither format is recognized.
        """
        if self._is_nexe(data):
            self.log_info('unpacking as nexe')
            yield from self._unpack_nexe(data)
            return
        if self._is_pkg(data):
            self.log_info('unpacking as pkg')
            yield from self._unpack_pkg(data)
            return

    def _unpack_nexe(self, data: ByteStr):
        """
        Yield the resources of a nexe executable.

        Every occurrence of the sentinel is expected to be followed by two 64-bit floats holding
        the code size and the blob size; code and blob are located directly in front of the
        sentinel. The resource table is read from the JSON value assigned to `process.__nexe`
        inside the code section and maps paths to (offset, length) pairs into the blob. Sentinel
        occurrences that do not describe a readable layout are skipped.
        """
        try:
            ep = re.compile(
                RB"entry\s*=\s*path\.resolve\(path\.dirname\(process\.execPath\),\s*(%s)\)" % formats.string)
            # Exactly one match is required; zero or several matches raise and land below.
            ep, = ep.finditer(data)
        except Exception:
            ep = None
            self.log_info('could not identify entry point')
        else:
            ep = ep.group(1) | esc(quoted=True) | str
            self.log_info(F'entry point: {ep}')
        view = memoryview(data)
        for marker in re.finditer(re.escape(self._NEXE_SENTINEL), data):
            end = marker.end() + 16
            sizes = data[marker.end():end]
            if sizes.startswith(b"')"):
                # The sentinel occurs here as a string literal inside code, not as a trailer.
                continue
            try:
                reader = StructReader(sizes)
                code_size = int(reader.f64())
                blob_size = int(reader.f64())
            except (EOFError, ValueError, OverflowError):
                # Truncated size fields, or floats that are NaN or infinite.
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            start = marker.start() - code_size - blob_size
            if code_size < 0 or blob_size < 0 or start < 0:
                # A negative offset would silently slice from the end of the buffer.
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            try:
                reader = StructReader(view[start:end])
                code = reader.read_exactly(code_size)
                blob = reader.read_exactly(blob_size)
            except EOFError:
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            else:
                self.log_debug(F'found marker at 0x{marker.start():X}, data start at {start:X}')
            for assignment in re.finditer(RB'process\.__nexe\s*=', code):
                try:
                    rsrc = JSONReader(code[assignment.end():]).read_json()
                except (EOFError, ValueError):
                    # No JSON value follows the assignment, or it could not be parsed.
                    self.log_debug(F'failed to parse resource table at 0x{assignment.end():X}')
                    continue
                if not isinstance(rsrc, dict):
                    continue
                if len(rsrc) == 1:
                    # A single top-level key wraps the actual table. Only unwrap when the value is
                    # itself a mapping, so a table with exactly one file is not destroyed.
                    inner, = rsrc.values()
                    if isinstance(inner, dict):
                        rsrc = inner
                for path, (offset, length) in rsrc.items():
                    if ep and self.args.entry and path != ep:
                        continue
                    yield UnpackResult(path, blob[offset:offset + length])

    def _unpack_pkg(self, data: ByteStr):
        """
        Yield the files of a pkg executable.

        The payload and prelude regions are located via the quoted position and size values that
        follow the corresponding marker names. After the common.js source map marker, the prelude
        is expected to contain a JSON file table, the quoted entry point path, and a JSON mapping
        of links. Raises a `ValueError` when payload, prelude, the common.js marker, or the file
        table cannot be obtained.
        """
        def _extract_coordinates(*v: bytes):
            for name in v:
                pattern = name + BR'''\s{0,3}=\s{0,3}(['"])([\s\d]+)\1'''
                # Each marker has to be assigned exactly once, otherwise unpacking raises.
                value, = re.finditer(pattern, data)
                yield int(value.group(2).decode('utf8').strip(), 0)

        def _extract_data(*v: bytes):
            try:
                offset, length = _extract_coordinates(*v)
            except Exception:
                # Best effort: any failure is reported by the caller as a missing region.
                return None
            return data[offset:offset + length]

        payload = _extract_data(self._PKG_PAYLOAD_P, self._PKG_PAYLOAD_S)
        if not payload:
            raise ValueError('unable to extract payload')
        prelude = _extract_data(self._PKG_PRELUDE_P, self._PKG_PRELUDE_S)
        if not prelude:
            raise ValueError('unable to extract prelude')
        mapping = re.search(re.escape(self._PKG_COMMON_JS) + BR'\s*\},\s*\{', prelude)
        if not mapping:
            raise ValueError('unable to find common.js mapping')

        # Start reading at the opening brace that terminates the matched pattern.
        reader = JSONReader(prelude[mapping.end() - 1:])

        files: Dict[str, dict] = reader.read_json()

        if files is None:
            raise ValueError('failed to read file list')
        if not files:
            # Nothing to extract; also avoids a StopIteration escaping from this generator below.
            self.log_info('the file list is empty')
            return

        entry = reader.skip_comma().read_string()
        links = reader.skip_comma().read_json()
        if links is None:
            # Continue without synthesizing linked files when the mapping cannot be read.
            self.log_info('failed to read link list')
            links = {}

        # _unknown1 = reader.skip_comma().read_json()
        # _unknown2 = reader.skip_comma().read_terminated_array(B')').strip()

        root = next(iter(files))
        skip = 0
        view = memoryview(payload)

        # Grow a prefix of the first path until some file no longer starts with it; the number of
        # characters in front of that point is stripped from every path that is reported.
        for k in range(len(root) + 1):
            test = root[:k].rstrip('/').rstrip('\\')
            if not all(path.startswith(test) for path in files):
                root = test[:-1]
                skip = k - 1
                break

        entry = entry[skip:]
        self.log_info(F'detected root directory {root}, entry point is {entry}')

        for src, dst in links.items():
            new_files = {}
            self.log_info('link src:', src[skip:])
            self.log_info('link dst:', dst[skip:])
            for path, location in files.items():
                if not path.startswith(src):
                    continue
                new_path = dst + path[len(src):]
                new_files[new_path] = location
                self.log_debug('synthesizing linked file:', new_path)
            # Updated only after the inner loop so the dictionary is not modified while iterating.
            files.update(new_files)

        for path, location in files.items():
            path = path[skip:]
            if entry and self.args.entry and path != entry:
                continue
            chunk = None
            for kind, (offset, length) in location.items():
                # Kind '3' is metadata and kind '2' is unknown; only '0' and '1' carry file data.
                if kind in ('0', '1'):
                    chunk = view[offset:offset + length]
            if chunk is not None:
                yield UnpackResult(path, chunk)

    @classmethod
    def _is_nexe(cls, data: ByteStr) -> bool:
        """
        Return whether the nexe sentinel occurs in the data.
        """
        return cls._NEXE_SENTINEL in data

    @classmethod
    def _is_pkg(cls, data: ByteStr) -> bool:
        """
        Return whether all five pkg markers occur in the data.
        """
        markers = (
            cls._PKG_PAYLOAD_P,
            cls._PKG_PAYLOAD_S,
            cls._PKG_PRELUDE_P,
            cls._PKG_PRELUDE_S,
            cls._PKG_COMMON_JS,
        )
        return all(marker in data for marker in markers)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        """
        Return whether the data looks like a nexe or a pkg executable.
        """
        return cls._is_nexe(data) or cls._is_pkg(data)

Classes

class JSONReader (data, bigendian=False)

An extension of a MemoryFile which provides methods to read structured data.

Expand source code Browse git
class JSONReader(StructReader):
    """
    A `StructReader` with helpers for pulling quoted string literals and JSON fragments out of a
    surrounding byte stream, such as JavaScript source code.
    """

    def read_string(self) -> Optional[str]:
        """
        Read a quoted string literal at the cursor and return its unescaped text.

        The byte at the cursor must be a single or a double quote. Bytes are consumed up to and
        including the matching closing quote that is not preceded by a backslash escape. The raw
        bytes between the quotes are piped through the `esc` unit and converted to `str`. Raises a
        `RuntimeError` when the cursor is not positioned on a quote character.
        """
        quote = self.u8()
        if quote not in B'\"\'':
            raise RuntimeError('trying to read a string, but no quote character was found')
        # The values read via u8() are used as integers here (they are appended to a bytearray), so
        # the backslash has to be matched by its byte value. Comparing against a bytes object is
        # never equal and would terminate the literal at the first escaped quote.
        backslash = 0x5C
        value = bytearray()
        escaped = False
        while True:
            char = self.u8()
            if escaped:
                # The byte after a backslash is always part of the literal, even if it is a quote.
                escaped = False
            elif char == backslash:
                escaped = True
            elif char == quote:
                break
            value.append(char)
        return value | esc | str

    def read_json(self) -> Optional[JSON]:
        """
        Skip forward to the next opening bracket or brace and parse the JSON value starting there.

        Returns `None`, leaving the cursor on the opening character, when `JSONCarver.find_end`
        reports no end offset. Otherwise the cursor is moved to that end offset and the parsed
        value is returned.
        """
        while self.u8() not in b'[{':
            pass
        # Step back so that the cursor points at the opening character again.
        self.seekrel(-1)
        end = JSONCarver.find_end(self._data, self._cursor)
        if end is None:
            return None
        data = self._data[self._cursor:end]
        self._cursor = end
        if isinstance(data, memoryview):
            # json.loads does not accept memoryview objects.
            data = bytes(data)
        return json.loads(data)

    def skip_comma(self):
        """
        Advance the cursor past any run of whitespace and comma bytes; returns the reader itself so
        that calls can be chained.
        """
        while self.u8() in b'\n\t\r\f\v\x20,':
            pass
        # The loop consumed one byte too many; put it back.
        self.seekrel(-1)
        return self

Ancestors

Class variables

var read_as_bytes

Instance variables

var closed
Expand source code Browse git
@property
def closed(self) -> bool:
    """
    Report the current value of the private `_closed` attribute.
    """
    state = self._closed
    return state

Methods

def read_string(self)
Expand source code Browse git
def read_string(self) -> Optional[str]:
    """
    Read a quoted string literal at the cursor and return its unescaped text.

    The byte at the cursor must be a single or a double quote. Bytes are consumed up to and
    including the matching closing quote that is not preceded by a backslash escape. The raw bytes
    between the quotes are piped through the `esc` unit and converted to `str`. Raises a
    `RuntimeError` when the cursor is not positioned on a quote character.
    """
    quote = self.u8()
    if quote not in B'\"\'':
        raise RuntimeError('trying to read a string, but no quote character was found')
    # The values read via u8() are used as integers here (they are appended to a bytearray), so the
    # backslash has to be matched by its byte value. Comparing against a bytes object is never
    # equal and would terminate the literal at the first escaped quote.
    backslash = 0x5C
    value = bytearray()
    escaped = False
    while True:
        char = self.u8()
        if escaped:
            # The byte after a backslash is always part of the literal, even if it is a quote.
            escaped = False
        elif char == backslash:
            escaped = True
        elif char == quote:
            break
        value.append(char)
    return value | esc | str
def read_json(self)
Expand source code Browse git
def read_json(self) -> Optional[JSON]:
    """
    Skip forward to the next opening bracket or brace and parse the JSON value starting there.

    Returns `None`, leaving the cursor on the opening character, when `JSONCarver.find_end` reports
    no end offset. Otherwise the cursor is moved to that end offset and the parsed value is
    returned.
    """
    openers = b'[{'
    while True:
        if self.u8() in openers:
            break
    # Step back so that the cursor points at the opening character again.
    self.seekrel(-1)
    stop = JSONCarver.find_end(self._data, self._cursor)
    if stop is None:
        return None
    raw = self._data[self._cursor:stop]
    self._cursor = stop
    # json.loads does not accept memoryview objects, so those are copied into bytes first.
    return json.loads(bytes(raw) if isinstance(raw, memoryview) else raw)
def skip_comma(self)
Expand source code Browse git
def skip_comma(self):
    """
    Advance the cursor past any run of whitespace and comma bytes; returns the reader itself so that
    calls can be chained.
    """
    skippable = b'\n\t\r\f\v\x20,'
    byte = self.u8()
    while byte in skippable:
        byte = self.u8()
    # The last byte read was not skippable; put it back.
    self.seekrel(-1)
    return self

Inherited members

class xtnode (*paths, entry=False, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path', date=b'date')

Extracts and decompiles files from compiled Node.js applications. Supports both nexe and pkg, two utilities that are commonly used to generate stand-alone executables.

Expand source code Browse git
class xtnode(ArchiveUnit):
    """
    Extracts and decompiles files from compiled Node.js applications. Supports both nexe and pkg, two
    utilities that are commonly used to generate stand-alone executables.
    """

    # Byte markers whose presence is used to recognize the two supported packers.
    _NEXE_SENTINEL = B'<nexe~~sentinel>'
    _PKG_PAYLOAD_P = B'PAYLOAD_POSITION'
    _PKG_PAYLOAD_S = B'PAYLOAD_SIZE'
    _PKG_PRELUDE_P = B'PRELUDE_POSITION'
    _PKG_PRELUDE_S = B'PRELUDE_SIZE'
    _PKG_COMMON_JS = B'sourceMappingURL=common.js.map'

    def __init__(
        self, *paths, entry: Arg.Switch('-u', help='Only extract the entry point.') = False,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path', date=b'date',
    ):
        super().__init__(*paths, entry=entry,
            list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex,
            path=path, date=date)

    def unpack(self, data: ByteStr) -> Iterable[UnpackResult]:
        """
        Yield the embedded files of the input. The nexe format is tried first, then pkg; nothing is
        yielded when neither format is recognized.
        """
        if self._is_nexe(data):
            self.log_info('unpacking as nexe')
            yield from self._unpack_nexe(data)
            return
        if self._is_pkg(data):
            self.log_info('unpacking as pkg')
            yield from self._unpack_pkg(data)
            return

    def _unpack_nexe(self, data: ByteStr):
        """
        Yield the resources of a nexe executable.

        Every occurrence of the sentinel is expected to be followed by two 64-bit floats holding
        the code size and the blob size; code and blob are located directly in front of the
        sentinel. The resource table is read from the JSON value assigned to `process.__nexe`
        inside the code section and maps paths to (offset, length) pairs into the blob. Sentinel
        occurrences that do not describe a readable layout are skipped.
        """
        try:
            ep = re.compile(
                RB"entry\s*=\s*path\.resolve\(path\.dirname\(process\.execPath\),\s*(%s)\)" % formats.string)
            # Exactly one match is required; zero or several matches raise and land below.
            ep, = ep.finditer(data)
        except Exception:
            ep = None
            self.log_info('could not identify entry point')
        else:
            ep = ep.group(1) | esc(quoted=True) | str
            self.log_info(F'entry point: {ep}')
        view = memoryview(data)
        for marker in re.finditer(re.escape(self._NEXE_SENTINEL), data):
            end = marker.end() + 16
            sizes = data[marker.end():end]
            if sizes.startswith(b"')"):
                # The sentinel occurs here as a string literal inside code, not as a trailer.
                continue
            try:
                reader = StructReader(sizes)
                code_size = int(reader.f64())
                blob_size = int(reader.f64())
            except (EOFError, ValueError, OverflowError):
                # Truncated size fields, or floats that are NaN or infinite.
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            start = marker.start() - code_size - blob_size
            if code_size < 0 or blob_size < 0 or start < 0:
                # A negative offset would silently slice from the end of the buffer.
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            try:
                reader = StructReader(view[start:end])
                code = reader.read_exactly(code_size)
                blob = reader.read_exactly(blob_size)
            except EOFError:
                self.log_debug(F'found marker at 0x{marker.start():X}, but failed to read data')
                continue
            else:
                self.log_debug(F'found marker at 0x{marker.start():X}, data start at {start:X}')
            for assignment in re.finditer(RB'process\.__nexe\s*=', code):
                try:
                    rsrc = JSONReader(code[assignment.end():]).read_json()
                except (EOFError, ValueError):
                    # No JSON value follows the assignment, or it could not be parsed.
                    self.log_debug(F'failed to parse resource table at 0x{assignment.end():X}')
                    continue
                if not isinstance(rsrc, dict):
                    continue
                if len(rsrc) == 1:
                    # A single top-level key wraps the actual table. Only unwrap when the value is
                    # itself a mapping, so a table with exactly one file is not destroyed.
                    inner, = rsrc.values()
                    if isinstance(inner, dict):
                        rsrc = inner
                for path, (offset, length) in rsrc.items():
                    if ep and self.args.entry and path != ep:
                        continue
                    yield UnpackResult(path, blob[offset:offset + length])

    def _unpack_pkg(self, data: ByteStr):
        """
        Yield the files of a pkg executable.

        The payload and prelude regions are located via the quoted position and size values that
        follow the corresponding marker names. After the common.js source map marker, the prelude
        is expected to contain a JSON file table, the quoted entry point path, and a JSON mapping
        of links. Raises a `ValueError` when payload, prelude, the common.js marker, or the file
        table cannot be obtained.
        """
        def _extract_coordinates(*v: bytes):
            for name in v:
                pattern = name + BR'''\s{0,3}=\s{0,3}(['"])([\s\d]+)\1'''
                # Each marker has to be assigned exactly once, otherwise unpacking raises.
                value, = re.finditer(pattern, data)
                yield int(value.group(2).decode('utf8').strip(), 0)

        def _extract_data(*v: bytes):
            try:
                offset, length = _extract_coordinates(*v)
            except Exception:
                # Best effort: any failure is reported by the caller as a missing region.
                return None
            return data[offset:offset + length]

        payload = _extract_data(self._PKG_PAYLOAD_P, self._PKG_PAYLOAD_S)
        if not payload:
            raise ValueError('unable to extract payload')
        prelude = _extract_data(self._PKG_PRELUDE_P, self._PKG_PRELUDE_S)
        if not prelude:
            raise ValueError('unable to extract prelude')
        mapping = re.search(re.escape(self._PKG_COMMON_JS) + BR'\s*\},\s*\{', prelude)
        if not mapping:
            raise ValueError('unable to find common.js mapping')

        # Start reading at the opening brace that terminates the matched pattern.
        reader = JSONReader(prelude[mapping.end() - 1:])

        files: Dict[str, dict] = reader.read_json()

        if files is None:
            raise ValueError('failed to read file list')
        if not files:
            # Nothing to extract; also avoids a StopIteration escaping from this generator below.
            self.log_info('the file list is empty')
            return

        entry = reader.skip_comma().read_string()
        links = reader.skip_comma().read_json()
        if links is None:
            # Continue without synthesizing linked files when the mapping cannot be read.
            self.log_info('failed to read link list')
            links = {}

        # _unknown1 = reader.skip_comma().read_json()
        # _unknown2 = reader.skip_comma().read_terminated_array(B')').strip()

        root = next(iter(files))
        skip = 0
        view = memoryview(payload)

        # Grow a prefix of the first path until some file no longer starts with it; the number of
        # characters in front of that point is stripped from every path that is reported.
        for k in range(len(root) + 1):
            test = root[:k].rstrip('/').rstrip('\\')
            if not all(path.startswith(test) for path in files):
                root = test[:-1]
                skip = k - 1
                break

        entry = entry[skip:]
        self.log_info(F'detected root directory {root}, entry point is {entry}')

        for src, dst in links.items():
            new_files = {}
            self.log_info('link src:', src[skip:])
            self.log_info('link dst:', dst[skip:])
            for path, location in files.items():
                if not path.startswith(src):
                    continue
                new_path = dst + path[len(src):]
                new_files[new_path] = location
                self.log_debug('synthesizing linked file:', new_path)
            # Updated only after the inner loop so the dictionary is not modified while iterating.
            files.update(new_files)

        for path, location in files.items():
            path = path[skip:]
            if entry and self.args.entry and path != entry:
                continue
            chunk = None
            for kind, (offset, length) in location.items():
                # Kind '3' is metadata and kind '2' is unknown; only '0' and '1' carry file data.
                if kind in ('0', '1'):
                    chunk = view[offset:offset + length]
            if chunk is not None:
                yield UnpackResult(path, chunk)

    @classmethod
    def _is_nexe(cls, data: ByteStr) -> bool:
        """
        Return whether the nexe sentinel occurs in the data.
        """
        return cls._NEXE_SENTINEL in data

    @classmethod
    def _is_pkg(cls, data: ByteStr) -> bool:
        """
        Return whether all five pkg markers occur in the data.
        """
        markers = (
            cls._PKG_PAYLOAD_P,
            cls._PKG_PAYLOAD_S,
            cls._PKG_PRELUDE_P,
            cls._PKG_PRELUDE_S,
            cls._PKG_COMMON_JS,
        )
        return all(marker in data for marker in markers)

    @classmethod
    def handles(cls, data: ByteStr) -> Optional[bool]:
        """
        Return whether the data looks like a nexe or a pkg executable.
        """
        return cls._is_nexe(data) or cls._is_pkg(data)

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data: ByteStr) -> Iterable[UnpackResult]:
    """
    Yield the embedded files of the input. The nexe format is tried first, then pkg; nothing is
    yielded when neither format is recognized.
    """
    if self._is_nexe(data):
        self.log_info('unpacking as nexe')
        extracted = self._unpack_nexe(data)
    elif self._is_pkg(data):
        self.log_info('unpacking as pkg')
        extracted = self._unpack_pkg(data)
    else:
        extracted = ()
    yield from extracted

Inherited members