Module refinery.lib.unrar.crc

CRC32, BLAKE2sp, and hash verification for RAR archives.

Expand source code Browse git
"""
CRC32, BLAKE2sp, and hash verification for RAR archives.
"""
from __future__ import annotations

import hashlib
import hmac
import struct
import zlib

from refinery.lib.types import buf
from refinery.lib.unrar.headers import HashType


def crc32(data: bytes | memoryview, init: int = 0) -> int:
    """
    Compute CRC32 consistent with unrar conventions.
    """
    return zlib.crc32(data, init) & 0xFFFFFFFF


def checksum14(data: bytes | memoryview, init: int = 0) -> int:
    """
    RAR 1.4 16-bit checksum.
    """
    crc = init & 0xFFFF
    for b in data:
        crc = (crc + b) & 0xFFFF
        crc = ((crc << 1) | (crc >> 15)) & 0xFFFF
    return crc


_crc_table: list[int] | None = None


def _init_crc_table() -> list[int]:
    global _crc_table
    if _crc_table is not None:
        return _crc_table
    table = [0] * 256
    for i in range(256):
        c = i
        for _ in range(8):
            if c & 1:
                c = (c >> 1) ^ 0xEDB88320
            else:
                c >>= 1
        table[i] = c & 0xFFFFFFFF
    _crc_table = table
    return table


def crc_table() -> list[int]:
    """
    Return the standard CRC32 lookup table used by legacy encryption.
    """
    return _init_crc_table()


BLAKE2S_BLOCKBYTES = 64
BLAKE2S_OUTBYTES = 32
PARALLELISM_DEGREE = 8

BLAKE2S_IV = (
    0x6A09E667, 0xBB67AE85, 0x3C6EF372, 0xA54FF53A,
    0x510E527F, 0x9B05688C, 0x1F83D9AB, 0x5BE0CD19,
)

BLAKE2S_SIGMA = (
    (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
    (14, 10, 4, 8, 9, 15, 13, 6, 1, 12, 0, 2, 11, 7, 5, 3),
    (11, 8, 12, 0, 5, 2, 15, 13, 10, 14, 3, 6, 7, 1, 9, 4),
    (7, 9, 3, 1, 13, 12, 11, 14, 2, 6, 5, 10, 4, 0, 15, 8),
    (9, 0, 5, 7, 2, 4, 10, 15, 14, 1, 11, 12, 6, 8, 3, 13),
    (2, 12, 6, 10, 0, 11, 8, 3, 4, 13, 7, 5, 15, 14, 1, 9),
    (12, 5, 1, 15, 14, 13, 4, 10, 0, 7, 6, 3, 9, 2, 8, 11),
    (13, 11, 7, 14, 12, 1, 3, 9, 5, 0, 15, 4, 8, 6, 2, 10),
    (6, 15, 14, 9, 11, 3, 0, 8, 12, 2, 13, 7, 1, 4, 10, 5),
    (10, 2, 8, 4, 7, 6, 1, 5, 15, 11, 9, 14, 3, 12, 13, 0),
)

_M32 = 0xFFFFFFFF


def _rotr32(x: int, n: int) -> int:
    return ((x >> n) | (x << (32 - n))) & _M32


class _Blake2sState:
    t: list[int]
    h: list[int]
    f: list[int]

    def __init__(self):
        self.h = list(BLAKE2S_IV)
        self.t = [0, 0]
        self.f = [0, 0]
        self.buf = bytearray(2 * BLAKE2S_BLOCKBYTES)
        self.buflen = 0
        self.last_node = 0

    def init_param(self, node_offset: int, node_depth: int):
        self.h = list(BLAKE2S_IV)
        self.t = [0, 0]
        self.f = [0, 0]
        self.buf = bytearray(2 * BLAKE2S_BLOCKBYTES)
        self.buflen = 0
        self.h[0] ^= 0x02080020
        self.h[2] ^= node_offset & _M32
        self.h[3] ^= ((node_depth << 16) | 0x20000000) & _M32

    def _compress(self, block: bytes | bytearray | memoryview):
        m = list(struct.unpack_from('<16I', block))
        v = list(self.h) + list(BLAKE2S_IV)
        v[12] ^= self.t[0]
        v[13] ^= self.t[1]
        v[14] ^= self.f[0]
        v[15] ^= self.f[1]

        for r in range(10):
            s = BLAKE2S_SIGMA[r]
            v[0] = (v[0] + v[4] + m[s[0]]) & _M32
            v[12] = _rotr32(v[12] ^ v[0], 16)
            v[8] = (v[8] + v[12]) & _M32
            v[4] = _rotr32(v[4] ^ v[8], 12)
            v[0] = (v[0] + v[4] + m[s[1]]) & _M32
            v[12] = _rotr32(v[12] ^ v[0], 8)
            v[8] = (v[8] + v[12]) & _M32
            v[4] = _rotr32(v[4] ^ v[8], 7)

            v[1] = (v[1] + v[5] + m[s[2]]) & _M32
            v[13] = _rotr32(v[13] ^ v[1], 16)
            v[9] = (v[9] + v[13]) & _M32
            v[5] = _rotr32(v[5] ^ v[9], 12)
            v[1] = (v[1] + v[5] + m[s[3]]) & _M32
            v[13] = _rotr32(v[13] ^ v[1], 8)
            v[9] = (v[9] + v[13]) & _M32
            v[5] = _rotr32(v[5] ^ v[9], 7)

            v[2] = (v[2] + v[6] + m[s[4]]) & _M32
            v[14] = _rotr32(v[14] ^ v[2], 16)
            v[10] = (v[10] + v[14]) & _M32
            v[6] = _rotr32(v[6] ^ v[10], 12)
            v[2] = (v[2] + v[6] + m[s[5]]) & _M32
            v[14] = _rotr32(v[14] ^ v[2], 8)
            v[10] = (v[10] + v[14]) & _M32
            v[6] = _rotr32(v[6] ^ v[10], 7)

            v[3] = (v[3] + v[7] + m[s[6]]) & _M32
            v[15] = _rotr32(v[15] ^ v[3], 16)
            v[11] = (v[11] + v[15]) & _M32
            v[7] = _rotr32(v[7] ^ v[11], 12)
            v[3] = (v[3] + v[7] + m[s[7]]) & _M32
            v[15] = _rotr32(v[15] ^ v[3], 8)
            v[11] = (v[11] + v[15]) & _M32
            v[7] = _rotr32(v[7] ^ v[11], 7)

            v[0] = (v[0] + v[5] + m[s[8]]) & _M32
            v[15] = _rotr32(v[15] ^ v[0], 16)
            v[10] = (v[10] + v[15]) & _M32
            v[5] = _rotr32(v[5] ^ v[10], 12)
            v[0] = (v[0] + v[5] + m[s[9]]) & _M32
            v[15] = _rotr32(v[15] ^ v[0], 8)
            v[10] = (v[10] + v[15]) & _M32
            v[5] = _rotr32(v[5] ^ v[10], 7)

            v[1] = (v[1] + v[6] + m[s[10]]) & _M32
            v[12] = _rotr32(v[12] ^ v[1], 16)
            v[11] = (v[11] + v[12]) & _M32
            v[6] = _rotr32(v[6] ^ v[11], 12)
            v[1] = (v[1] + v[6] + m[s[11]]) & _M32
            v[12] = _rotr32(v[12] ^ v[1], 8)
            v[11] = (v[11] + v[12]) & _M32
            v[6] = _rotr32(v[6] ^ v[11], 7)

            v[2] = (v[2] + v[7] + m[s[12]]) & _M32
            v[13] = _rotr32(v[13] ^ v[2], 16)
            v[8] = (v[8] + v[13]) & _M32
            v[7] = _rotr32(v[7] ^ v[8], 12)
            v[2] = (v[2] + v[7] + m[s[13]]) & _M32
            v[13] = _rotr32(v[13] ^ v[2], 8)
            v[8] = (v[8] + v[13]) & _M32
            v[7] = _rotr32(v[7] ^ v[8], 7)

            v[3] = (v[3] + v[4] + m[s[14]]) & _M32
            v[14] = _rotr32(v[14] ^ v[3], 16)
            v[9] = (v[9] + v[14]) & _M32
            v[4] = _rotr32(v[4] ^ v[9], 12)
            v[3] = (v[3] + v[4] + m[s[15]]) & _M32
            v[14] = _rotr32(v[14] ^ v[3], 8)
            v[9] = (v[9] + v[14]) & _M32
            v[4] = _rotr32(v[4] ^ v[9], 7)

        for i in range(8):
            self.h[i] = (self.h[i] ^ v[i] ^ v[i + 8]) & _M32

    def _increment_counter(self, inc: int):
        self.t[0] = (self.t[0] + inc) & _M32
        if self.t[0] < inc:
            self.t[1] = (self.t[1] + 1) & _M32

    def _set_lastblock(self):
        if self.last_node:
            self.f[1] = _M32
        self.f[0] = _M32

    def update(self, data: bytes | memoryview):
        offset = 0
        inlen = len(data)
        while inlen > 0:
            left = self.buflen
            fill = 2 * BLAKE2S_BLOCKBYTES - left
            if inlen > fill:
                self.buf[left:left + fill] = data[offset:offset + fill]
                self.buflen += fill
                self._increment_counter(BLAKE2S_BLOCKBYTES)
                self._compress(self.buf[:BLAKE2S_BLOCKBYTES])
                self.buf[:BLAKE2S_BLOCKBYTES] = self.buf[BLAKE2S_BLOCKBYTES:2 * BLAKE2S_BLOCKBYTES]
                self.buflen -= BLAKE2S_BLOCKBYTES
                offset += fill
                inlen -= fill
            else:
                self.buf[left:left + inlen] = data[offset:offset + inlen]
                self.buflen += inlen
                break

    def final(self) -> bytes:
        if self.buflen > BLAKE2S_BLOCKBYTES:
            self._increment_counter(BLAKE2S_BLOCKBYTES)
            self._compress(self.buf[:BLAKE2S_BLOCKBYTES])
            self.buflen -= BLAKE2S_BLOCKBYTES
            self.buf[:self.buflen] = self.buf[BLAKE2S_BLOCKBYTES:BLAKE2S_BLOCKBYTES + self.buflen]

        self._increment_counter(self.buflen)
        self._set_lastblock()
        pad_len = 2 * BLAKE2S_BLOCKBYTES - self.buflen
        self.buf[self.buflen:self.buflen + pad_len] = b'\x00' * pad_len
        self._compress(self.buf[:BLAKE2S_BLOCKBYTES])

        return struct.pack('<8I', *self.h)


class Blake2sp:
    """
    BLAKE2sp: 8-way parallel BLAKE2s tree hash.
    """
    def __init__(self):
        self._root = _Blake2sState()
        self._root.init_param(0, 1)  # root node
        self._leaves = []
        for i in range(PARALLELISM_DEGREE):
            s = _Blake2sState()
            s.init_param(i, 0)  # leaf node
            self._leaves.append(s)
        self._root.last_node = 1
        self._leaves[PARALLELISM_DEGREE - 1].last_node = 1
        self._buf = bytearray(PARALLELISM_DEGREE * BLAKE2S_BLOCKBYTES)
        self._buflen = 0

    def update(self, data: bytes | memoryview):
        offset = 0
        inlen = len(data)
        left = self._buflen
        fill = len(self._buf) - left

        if left and inlen >= fill:
            self._buf[left:left + fill] = data[offset:offset + fill]
            for i in range(PARALLELISM_DEGREE):
                start = i * BLAKE2S_BLOCKBYTES
                self._leaves[i].update(self._buf[start:start + BLAKE2S_BLOCKBYTES])
            offset += fill
            inlen -= fill
            left = 0

        block_set = PARALLELISM_DEGREE * BLAKE2S_BLOCKBYTES
        while inlen >= block_set:
            for i in range(PARALLELISM_DEGREE):
                start = offset + i * BLAKE2S_BLOCKBYTES
                self._leaves[i].update(data[start:start + BLAKE2S_BLOCKBYTES])
            offset += block_set
            inlen -= block_set

        if inlen > 0:
            self._buf[left:left + inlen] = data[offset:offset + inlen]
        self._buflen = left + inlen

    def digest(self) -> bytes:
        hashes = []
        for i in range(PARALLELISM_DEGREE):
            leaf = _Blake2sState()
            leaf.h = list(self._leaves[i].h)
            leaf.t = list(self._leaves[i].t)
            leaf.f = list(self._leaves[i].f)
            leaf.buf = bytearray(self._leaves[i].buf)
            leaf.buflen = self._leaves[i].buflen
            leaf.last_node = self._leaves[i].last_node

            if self._buflen > i * BLAKE2S_BLOCKBYTES:
                remaining = self._buflen - i * BLAKE2S_BLOCKBYTES
                if remaining > BLAKE2S_BLOCKBYTES:
                    remaining = BLAKE2S_BLOCKBYTES
                start = i * BLAKE2S_BLOCKBYTES
                leaf.update(self._buf[start:start + remaining])
            hashes.append(leaf.final())

        root = _Blake2sState()
        root.init_param(0, 1)
        root.last_node = 1
        for h in hashes:
            root.update(h)
        return root.final()


def blake2sp_hash(data: buf) -> bytes:
    """
    Compute BLAKE2sp hash of data (32 bytes output).
    """
    h = Blake2sp()
    h.update(data)
    return h.digest()


def convert_hash_to_mac(
    hash_type: int,
    hash_key: bytes,
    crc_value: int = 0,
    digest: bytes = b''
) -> tuple[int, bytes]:
    """
    Convert a hash result to HMAC-based MAC for encrypted RAR5 file verification.
    """
    if hash_type == HashType.HASH_CRC32:
        data = struct.pack('<I', crc_value)
        mac = hmac.new(hash_key, data, hashlib.sha256).digest()
        result = 0
        for i in range(32):
            result ^= mac[i] << ((i & 3) * 8)
        return result & 0xFFFFFFFF, b''

    if hash_type == HashType.HASH_BLAKE2:
        mac = hmac.new(hash_key, digest, hashlib.sha256).digest()
        return 0, mac

    return 0, b''

Functions

def crc32(data, init=0)

Compute CRC32 consistent with unrar conventions.

Expand source code Browse git
def crc32(data: bytes | memoryview, init: int = 0) -> int:
    """
    Compute CRC32 consistent with unrar conventions.
    """
    return zlib.crc32(data, init) & 0xFFFFFFFF
def checksum14(data, init=0)

RAR 1.4 16-bit checksum.

Expand source code Browse git
def checksum14(data: bytes | memoryview, init: int = 0) -> int:
    """
    RAR 1.4 16-bit checksum.
    """
    crc = init & 0xFFFF
    for b in data:
        crc = (crc + b) & 0xFFFF
        crc = ((crc << 1) | (crc >> 15)) & 0xFFFF
    return crc
def crc_table()

Return the standard CRC32 lookup table used by legacy encryption.

Expand source code Browse git
def crc_table() -> list[int]:
    """
    Return the standard CRC32 lookup table used by legacy encryption.
    """
    return _init_crc_table()
def blake2sp_hash(data)

Compute BLAKE2sp hash of data (32 bytes output).

Expand source code Browse git
def blake2sp_hash(data: buf) -> bytes:
    """
    Compute BLAKE2sp hash of data (32 bytes output).
    """
    h = Blake2sp()
    h.update(data)
    return h.digest()
def convert_hash_to_mac(hash_type, hash_key, crc_value=0, digest=b'')

Convert a hash result to HMAC-based MAC for encrypted RAR5 file verification.

Expand source code Browse git
def convert_hash_to_mac(
    hash_type: int,
    hash_key: bytes,
    crc_value: int = 0,
    digest: bytes = b''
) -> tuple[int, bytes]:
    """
    Convert a hash result to HMAC-based MAC for encrypted RAR5 file verification.
    """
    if hash_type == HashType.HASH_CRC32:
        data = struct.pack('<I', crc_value)
        mac = hmac.new(hash_key, data, hashlib.sha256).digest()
        result = 0
        for i in range(32):
            result ^= mac[i] << ((i & 3) * 8)
        return result & 0xFFFFFFFF, b''

    if hash_type == HashType.HASH_BLAKE2:
        mac = hmac.new(hash_key, digest, hashlib.sha256).digest()
        return 0, mac

    return 0, b''

Classes

class Blake2sp

BLAKE2sp: 8-way parallel BLAKE2s tree hash.

Expand source code Browse git
class Blake2sp:
    """
    BLAKE2sp: 8-way parallel BLAKE2s tree hash.
    """
    def __init__(self):
        self._root = _Blake2sState()
        self._root.init_param(0, 1)  # root node
        self._leaves = []
        for i in range(PARALLELISM_DEGREE):
            s = _Blake2sState()
            s.init_param(i, 0)  # leaf node
            self._leaves.append(s)
        self._root.last_node = 1
        self._leaves[PARALLELISM_DEGREE - 1].last_node = 1
        self._buf = bytearray(PARALLELISM_DEGREE * BLAKE2S_BLOCKBYTES)
        self._buflen = 0

    def update(self, data: bytes | memoryview):
        offset = 0
        inlen = len(data)
        left = self._buflen
        fill = len(self._buf) - left

        if left and inlen >= fill:
            self._buf[left:left + fill] = data[offset:offset + fill]
            for i in range(PARALLELISM_DEGREE):
                start = i * BLAKE2S_BLOCKBYTES
                self._leaves[i].update(self._buf[start:start + BLAKE2S_BLOCKBYTES])
            offset += fill
            inlen -= fill
            left = 0

        block_set = PARALLELISM_DEGREE * BLAKE2S_BLOCKBYTES
        while inlen >= block_set:
            for i in range(PARALLELISM_DEGREE):
                start = offset + i * BLAKE2S_BLOCKBYTES
                self._leaves[i].update(data[start:start + BLAKE2S_BLOCKBYTES])
            offset += block_set
            inlen -= block_set

        if inlen > 0:
            self._buf[left:left + inlen] = data[offset:offset + inlen]
        self._buflen = left + inlen

    def digest(self) -> bytes:
        hashes = []
        for i in range(PARALLELISM_DEGREE):
            leaf = _Blake2sState()
            leaf.h = list(self._leaves[i].h)
            leaf.t = list(self._leaves[i].t)
            leaf.f = list(self._leaves[i].f)
            leaf.buf = bytearray(self._leaves[i].buf)
            leaf.buflen = self._leaves[i].buflen
            leaf.last_node = self._leaves[i].last_node

            if self._buflen > i * BLAKE2S_BLOCKBYTES:
                remaining = self._buflen - i * BLAKE2S_BLOCKBYTES
                if remaining > BLAKE2S_BLOCKBYTES:
                    remaining = BLAKE2S_BLOCKBYTES
                start = i * BLAKE2S_BLOCKBYTES
                leaf.update(self._buf[start:start + remaining])
            hashes.append(leaf.final())

        root = _Blake2sState()
        root.init_param(0, 1)
        root.last_node = 1
        for h in hashes:
            root.update(h)
        return root.final()

Methods

def update(self, data)
Expand source code Browse git
def update(self, data: bytes | memoryview):
    offset = 0
    inlen = len(data)
    left = self._buflen
    fill = len(self._buf) - left

    if left and inlen >= fill:
        self._buf[left:left + fill] = data[offset:offset + fill]
        for i in range(PARALLELISM_DEGREE):
            start = i * BLAKE2S_BLOCKBYTES
            self._leaves[i].update(self._buf[start:start + BLAKE2S_BLOCKBYTES])
        offset += fill
        inlen -= fill
        left = 0

    block_set = PARALLELISM_DEGREE * BLAKE2S_BLOCKBYTES
    while inlen >= block_set:
        for i in range(PARALLELISM_DEGREE):
            start = offset + i * BLAKE2S_BLOCKBYTES
            self._leaves[i].update(data[start:start + BLAKE2S_BLOCKBYTES])
        offset += block_set
        inlen -= block_set

    if inlen > 0:
        self._buf[left:left + inlen] = data[offset:offset + inlen]
    self._buflen = left + inlen
def digest(self)
Expand source code Browse git
def digest(self) -> bytes:
    hashes = []
    for i in range(PARALLELISM_DEGREE):
        leaf = _Blake2sState()
        leaf.h = list(self._leaves[i].h)
        leaf.t = list(self._leaves[i].t)
        leaf.f = list(self._leaves[i].f)
        leaf.buf = bytearray(self._leaves[i].buf)
        leaf.buflen = self._leaves[i].buflen
        leaf.last_node = self._leaves[i].last_node

        if self._buflen > i * BLAKE2S_BLOCKBYTES:
            remaining = self._buflen - i * BLAKE2S_BLOCKBYTES
            if remaining > BLAKE2S_BLOCKBYTES:
                remaining = BLAKE2S_BLOCKBYTES
            start = i * BLAKE2S_BLOCKBYTES
            leaf.update(self._buf[start:start + remaining])
        hashes.append(leaf.final())

    root = _Blake2sState()
    root.init_param(0, 1)
    root.last_node = 1
    for h in hashes:
        root.update(h)
    return root.final()