Module refinery.lib.unrar.filters

RAR5 data filters (decompression post-processing) and RAR3 standard filter identification and execution.

Expand source code Browse git
"""
RAR5 data filters (decompression post-processing) and
RAR3 standard filter identification and execution.
"""
from __future__ import annotations

import enum
import struct

from dataclasses import dataclass


class FilterType(enum.IntEnum):
    FILTER_DELTA = 0
    FILTER_E8 = 1
    FILTER_E8E9 = 2
    FILTER_ARM = 3
    FILTER_AUDIO = 4
    FILTER_RGB = 5
    FILTER_ITANIUM = 6
    FILTER_PPM = 7
    FILTER_NONE = 8


@dataclass
class UnpackFilter:
    """
    A pending filter to be applied during decompression.
    """
    type: int = FilterType.FILTER_NONE
    block_start: int = 0
    block_length: int = 0
    channels: int = 0
    next_window: bool = False


def apply_filter(
    data: bytearray,
    filter_type: int,
    channels: int = 0,
    file_offset: int = 0,
) -> bytearray:
    """
    Apply a RAR5 filter to decompressed data.
    """
    if filter_type in (FilterType.FILTER_E8, FilterType.FILTER_E8E9):
        return _filter_e8(data, filter_type == FilterType.FILTER_E8E9, file_offset)
    if filter_type == FilterType.FILTER_ARM:
        return _filter_arm(data, file_offset)
    if filter_type == FilterType.FILTER_DELTA:
        return _filter_delta(data, channels)
    return data


def _filter_e8(data: bytearray, include_e9: bool, file_offset: int) -> bytearray:
    """
    x86 E8 (and optionally E9) call/jump address conversion filter: Converts relative call
    addresses to absolute addresses.
    """
    file_size = 0x1000000
    cur_pos = 0
    data_size = len(data)

    while cur_pos + 4 < data_size:
        b = data[cur_pos]
        cur_pos += 1
        if b == 0xE8 or (include_e9 and b == 0xE9):
            offset = (cur_pos + file_offset) % file_size
            addr = struct.unpack_from('<I', data, cur_pos)[0]
            if addr & 0x80000000:
                if (addr + offset) & 0x80000000 == 0:
                    struct.pack_into('<I', data, cur_pos, (addr + file_size) & 0xFFFFFFFF)
            else:
                if (addr - file_size) & 0x80000000:
                    struct.pack_into('<I', data, cur_pos, (addr - offset) & 0xFFFFFFFF)
            cur_pos += 4
    return data


def _filter_arm(data: bytearray, file_offset: int) -> bytearray:
    """
    ARM BL branch address conversion filter.
    """
    data_size = len(data)
    for cur_pos in range(0, data_size - 3, 4):
        if data[cur_pos + 3] == 0xEB:  # BL with Always condition
            offset = data[cur_pos] + data[cur_pos + 1] * 0x100 + data[cur_pos + 2] * 0x10000
            offset -= (file_offset + cur_pos) // 4
            data[cur_pos] = offset & 0xFF
            data[cur_pos + 1] = (offset >> 8) & 0xFF
            data[cur_pos + 2] = (offset >> 16) & 0xFF
    return data


def _filter_delta(data: bytearray, channels: int) -> bytearray:
    """
    Delta filter: channels bytes are grouped, then delta-decoded.
    """
    data_size = len(data)
    if channels < 1:
        return data
    dst = bytearray(data_size)
    src_pos = 0
    for cur_channel in range(channels):
        prev_byte = 0
        dest_pos = cur_channel
        while dest_pos < data_size:
            prev_byte = (prev_byte - data[src_pos]) & 0xFF
            dst[dest_pos] = prev_byte
            src_pos += 1
            dest_pos += channels
    return dst


class V3FilterType(enum.IntEnum):
    VMSF_NONE = 0
    VMSF_E8 = 1
    VMSF_E8E9 = 2
    VMSF_ITANIUM = 3
    VMSF_RGB = 4
    VMSF_AUDIO = 5
    VMSF_DELTA = 6


# RAR3 standard filter CRC32 fingerprints for identification.
# Instead of running the full VM, we identify standard filters by their
# bytecode CRC32 and execute native implementations.
_V3_FILTER_CRC = {
    0xAD576887: V3FilterType.VMSF_E8,
    0x3CD7E57E: V3FilterType.VMSF_E8E9,
    0x3769893F: V3FilterType.VMSF_ITANIUM,
    0x0E06077D: V3FilterType.VMSF_DELTA,
    0x1C2C5DC8: V3FilterType.VMSF_RGB,
    0xBC85E701: V3FilterType.VMSF_AUDIO,
}


def identify_v3_filter(code_crc: int):
    """
    Identify a RAR3 VM filter by its bytecode CRC32.
    """
    return _V3_FILTER_CRC.get(code_crc, V3FilterType.VMSF_NONE)


def execute_v3_filter(
    filter_type: int,
    data: bytearray,
    block_length: int,
    initial_register_values: list[int] | None = None
) -> bytearray:
    """
    Execute a RAR3 standard filter.
    """
    ir = initial_register_values
    if filter_type == V3FilterType.VMSF_E8:
        return _v3_filter_e8(data, block_length, False, ir)
    elif filter_type == V3FilterType.VMSF_E8E9:
        return _v3_filter_e8(data, block_length, True, ir)
    elif filter_type == V3FilterType.VMSF_ITANIUM:
        return _v3_filter_itanium(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_DELTA:
        return _v3_filter_delta(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_RGB:
        return _v3_filter_rgb(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_AUDIO:
        return _v3_filter_audio(data, block_length, ir)
    raise NotImplementedError(F'Non-standard RAR3 VM filter type: {filter_type}')


def _v3_filter_e8(
    data: bytearray,
    data_size: int,
    include_e9: bool,
    initial_register_values: list[int] | None = None
) -> bytearray:
    """
    RAR3 E8/E8E9 filter (same logic as RAR5 version).
    """
    file_size = 0x1000000
    file_offset = initial_register_values[6] if initial_register_values else 0
    cur_pos = 0
    while cur_pos + 4 < data_size:
        b = data[cur_pos]
        cur_pos += 1
        if b == 0xE8 or (include_e9 and b == 0xE9):
            offset = (cur_pos + file_offset) & 0xFFFFFFFF
            addr = struct.unpack_from('<I', data, cur_pos)[0]
            if addr & 0x80000000:
                if (addr + offset) & 0x80000000 == 0:
                    struct.pack_into('<I', data, cur_pos, (addr + file_size) & 0xFFFFFFFF)
            else:
                if (addr - file_size) & 0x80000000:
                    struct.pack_into('<I', data, cur_pos, (addr - offset) & 0xFFFFFFFF)
            cur_pos += 4
    return data


def _v3_filter_itanium(data: bytearray, data_size: int, init_r: list[int] | None) -> bytearray:
    """
    RAR3 Itanium filter: IA-64 branch address conversion.
    """
    file_offset = init_r[6] if init_r else 0
    aligned_size = data_size & ~0xF
    byte_masks = (4, 4, 6, 6, 0, 0, 7, 7, 4, 4, 0, 0, 4, 4, 0, 0)

    for i in range(0, aligned_size, 16):
        mask_index = data[i] & 0x1F
        if mask_index >= 16:
            continue
        cmd_mask = byte_masks[mask_index]
        if cmd_mask == 0:
            continue
        for j in range(3):
            if not (cmd_mask & (1 << j)):
                continue
            start_pos = i + 5 * j + 5
            if start_pos + 4 > data_size:
                break
            bit_pos = (start_pos & 0xF) * 8
            # Extract bits from the 128-bit bundle
            idx = start_pos & ~0xF
            if idx + 16 > len(data):
                break
            val = int.from_bytes(data[idx:idx + 16], 'little')
            op_code = (val >> bit_pos) & 0xFFFFFFFFFF  # 41-bit instruction
            if ((op_code >> 37) & 0xF) == 5:
                addr = (((op_code >> 13) & 0xFFFFF) | ((op_code >> 36) & 1) << 20) << 4
                addr -= file_offset + i
                addr = (addr >> 4) & 0x1FFFFF
                raw = op_code & ~(0x1FFFFF << 13)
                raw |= ((addr & 0xFFFFF) << 13) | ((addr >> 20) << 36)
                # Write back
                mask = ((1 << 41) - 1) << bit_pos
                val = (val & ~mask) | ((raw & ((1 << 41) - 1)) << bit_pos)
                data[idx:idx + 16] = val.to_bytes(16, 'little')
    return data


def _v3_filter_delta(data: bytearray, data_size: int, init_r: list[int] | None) -> bytearray:
    """
    RAR3 delta filter.
    """
    channels = init_r[0] if init_r else 1
    return _filter_delta(data[:data_size], channels)


def _v3_filter_rgb(data: bytearray, data_size: int, init_r: list[int] | None) -> bytearray:
    """
    RAR3 RGB delta filter.
    """
    width = (init_r[0] - 3) if init_r else 3
    pos_r = init_r[1] if init_r else 0
    channels = 3
    dst = bytearray(data_size)
    src_pos = 0
    for cur_channel in range(channels):
        prev_byte = 0
        for i in range(cur_channel, data_size, channels):
            predicted = prev_byte
            upper_pos = i - width
            if upper_pos >= channels:
                upper_left = dst[upper_pos - channels] if upper_pos >= channels else 0
                upper = dst[upper_pos]
                predicted = prev_byte + upper - upper_left
                pa = abs(predicted - prev_byte)
                pb = abs(predicted - upper)
                pc = abs(predicted - upper_left)
                if pa <= pb and pa <= pc:
                    predicted = prev_byte
                elif pb <= pc:
                    predicted = upper
                else:
                    predicted = upper_left
            if src_pos < data_size:
                prev_byte = (predicted - data[src_pos]) & 0xFF
                dst[i] = prev_byte
                src_pos += 1

    # Green-channel post-processing: add green to R and B
    if data_size >= 3:
        border = data_size - 2
        i = pos_r
        while i < border:
            g = dst[i + 1]
            dst[i + 0] = (dst[i + 0] + g) & 0xFF
            dst[i + 2] = (dst[i + 2] + g) & 0xFF
            i += 3

    return dst


def _v3_filter_audio(data: bytearray, data_size: int, init_r: list[int] | None) -> bytearray:
    """
    RAR3 audio delta filter with adaptive prediction.
    """
    channels = init_r[0] if init_r else 1
    if channels == 0:
        channels = 1
    dst = bytearray(data_size)
    src_pos = 0
    _U32 = 0xFFFFFFFF

    def _s32(v):
        v &= _U32
        return v - 0x100000000 if v >= 0x80000000 else v

    for cur_channel in range(channels):
        prev_byte = 0    # uint
        prev_delta = 0   # uint
        d1 = 0           # int
        d2 = 0           # int
        k1 = 0           # int
        k2 = 0           # int
        k3 = 0           # int
        dif = [0] * 7    # uint[7]
        byte_count = 0

        i = cur_channel
        while i < data_size:
            if src_pos >= data_size:
                break
            d3 = d2
            # C++: D2=PrevDelta-D1 (uint - int -> uint, stored in int)
            d2 = _s32(prev_delta - d1)
            d1 = _s32(prev_delta)

            # C++: uint Predicted=8*PrevByte+K1*D1+K2*D2+K3*D3;
            predicted = (8 * prev_byte + k1 * d1 + k2 * d2 + k3 * d3) & _U32
            # C++: Predicted=(Predicted>>3) & 0xff;  (unsigned right shift)
            predicted = (predicted >> 3) & 0xFF

            cur_byte = data[src_pos]
            src_pos += 1

            # C++: Predicted-=CurByte;  (uint -= uint, wraps)
            predicted = (predicted - cur_byte) & 0xFF
            dst[i] = predicted

            # C++: PrevDelta=(signed char)(Predicted-PrevByte);
            # (signed char) truncates to 8-bit signed, then assigned to uint
            delta = (predicted - prev_byte) & 0xFF
            if delta >= 128:
                prev_delta = (delta - 256) & _U32
            else:
                prev_delta = delta
            prev_byte = predicted

            # C++: int D=(signed char)CurByte; D=(uint)D<<3;
            d = cur_byte if cur_byte < 128 else cur_byte - 256
            d = _s32(d << 3)

            dif[0] = (dif[0] + abs(d)) & _U32
            dif[1] = (dif[1] + abs(d - d1)) & _U32
            dif[2] = (dif[2] + abs(d + d1)) & _U32
            dif[3] = (dif[3] + abs(d - d2)) & _U32
            dif[4] = (dif[4] + abs(d + d2)) & _U32
            dif[5] = (dif[5] + abs(d - d3)) & _U32
            dif[6] = (dif[6] + abs(d + d3)) & _U32

            if (byte_count & 0x1F) == 0:
                min_dif = dif[0]
                num_min_dif = 0
                dif[0] = 0
                for j in range(1, 7):
                    if dif[j] < min_dif:
                        min_dif = dif[j]
                        num_min_dif = j
                    dif[j] = 0
                if num_min_dif == 1:
                    if k1 >= -16:
                        k1 -= 1
                elif num_min_dif == 2:
                    if k1 < 16:
                        k1 += 1
                elif num_min_dif == 3:
                    if k2 >= -16:
                        k2 -= 1
                elif num_min_dif == 4:
                    if k2 < 16:
                        k2 += 1
                elif num_min_dif == 5:
                    if k3 >= -16:
                        k3 -= 1
                elif num_min_dif == 6:
                    if k3 < 16:
                        k3 += 1

            byte_count += 1
            i += channels
    return dst

Functions

def apply_filter(data, filter_type, channels=0, file_offset=0)

Apply a RAR5 filter to decompressed data.

Expand source code Browse git
def apply_filter(
    data: bytearray,
    filter_type: int,
    channels: int = 0,
    file_offset: int = 0,
) -> bytearray:
    """
    Apply a RAR5 filter to decompressed data.
    """
    if filter_type in (FilterType.FILTER_E8, FilterType.FILTER_E8E9):
        return _filter_e8(data, filter_type == FilterType.FILTER_E8E9, file_offset)
    if filter_type == FilterType.FILTER_ARM:
        return _filter_arm(data, file_offset)
    if filter_type == FilterType.FILTER_DELTA:
        return _filter_delta(data, channels)
    return data
def identify_v3_filter(code_crc)

Identify a RAR3 VM filter by its bytecode CRC32.

Expand source code Browse git
def identify_v3_filter(code_crc: int):
    """
    Identify a RAR3 VM filter by its bytecode CRC32.
    """
    return _V3_FILTER_CRC.get(code_crc, V3FilterType.VMSF_NONE)
def execute_v3_filter(filter_type, data, block_length, initial_register_values=None)

Execute a RAR3 standard filter.

Expand source code Browse git
def execute_v3_filter(
    filter_type: int,
    data: bytearray,
    block_length: int,
    initial_register_values: list[int] | None = None
) -> bytearray:
    """
    Execute a RAR3 standard filter.
    """
    ir = initial_register_values
    if filter_type == V3FilterType.VMSF_E8:
        return _v3_filter_e8(data, block_length, False, ir)
    elif filter_type == V3FilterType.VMSF_E8E9:
        return _v3_filter_e8(data, block_length, True, ir)
    elif filter_type == V3FilterType.VMSF_ITANIUM:
        return _v3_filter_itanium(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_DELTA:
        return _v3_filter_delta(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_RGB:
        return _v3_filter_rgb(data, block_length, ir)
    elif filter_type == V3FilterType.VMSF_AUDIO:
        return _v3_filter_audio(data, block_length, ir)
    raise NotImplementedError(F'Non-standard RAR3 VM filter type: {filter_type}')

Classes

class FilterType (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class FilterType(enum.IntEnum):
    FILTER_DELTA = 0
    FILTER_E8 = 1
    FILTER_E8E9 = 2
    FILTER_ARM = 3
    FILTER_AUDIO = 4
    FILTER_RGB = 5
    FILTER_ITANIUM = 6
    FILTER_PPM = 7
    FILTER_NONE = 8

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var FILTER_DELTA

The type of the None singleton.

var FILTER_E8

The type of the None singleton.

var FILTER_E8E9

The type of the None singleton.

var FILTER_ARM

The type of the None singleton.

var FILTER_AUDIO

The type of the None singleton.

var FILTER_RGB

The type of the None singleton.

var FILTER_ITANIUM

The type of the None singleton.

var FILTER_PPM

The type of the None singleton.

var FILTER_NONE

The type of the None singleton.

class UnpackFilter (type=8, block_start=0, block_length=0, channels=0, next_window=False)

A pending filter to be applied during decompression.

Expand source code Browse git
@dataclass
class UnpackFilter:
    """
    A pending filter to be applied during decompression.
    """
    type: int = FilterType.FILTER_NONE
    block_start: int = 0
    block_length: int = 0
    channels: int = 0
    next_window: bool = False

Instance variables

var type

The type of the None singleton.

var block_start

The type of the None singleton.

var block_length

The type of the None singleton.

var channels

The type of the None singleton.

var next_window

The type of the None singleton.

class V3FilterType (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class V3FilterType(enum.IntEnum):
    VMSF_NONE = 0
    VMSF_E8 = 1
    VMSF_E8E9 = 2
    VMSF_ITANIUM = 3
    VMSF_RGB = 4
    VMSF_AUDIO = 5
    VMSF_DELTA = 6

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var VMSF_NONE

The type of the None singleton.

var VMSF_E8

The type of the None singleton.

var VMSF_E8E9

The type of the None singleton.

var VMSF_ITANIUM

The type of the None singleton.

var VMSF_RGB

The type of the None singleton.

var VMSF_AUDIO

The type of the None singleton.

var VMSF_DELTA

The type of the None singleton.