Module refinery.units.compression.qlz

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from refinery.units import Unit, RefineryPartialResult
from refinery.lib.meta import SizeInt


_HASH_VALUES             = 0x1000  # noqa
_HASH_MASK               = 0x0FFF  # noqa
_UNCONDITIONAL_MATCHLEN  = 6       # noqa
_UNCOMPRESSED_END        = 4       # noqa


class qlz(Unit):
    """
    This unit implements QuickLZ decompression levels 1 and 3.
    """

    def process(self, data):
        source = memoryview(data)
        head = source[0]
        clvl = (head >> 2) & 0x3

        if head & 2:
            self.log_info('long header detected')
            size = int.from_bytes(source[5:9], 'little')
            source = source[9:]
        else:
            self.log_info('short header detected')
            size = source[3]
            source = source[3:]
        if head & 1 != 1:
            self.log_warn('header indicates that data is uncompressed, returning remaining data')
            return source
        else:
            self.log_info(F'compression level {clvl}, decompressed size {SizeInt(size)!r}')

        def fetchhash():
            return int.from_bytes(destination[hashvalue + 1:hashvalue + 4], byteorder='little')

        codeword = 1
        destination = bytearray()
        hashtable = [0] * _HASH_VALUES
        hashvalue = -1
        last_matchstart = size - _UNCONDITIONAL_MATCHLEN - _UNCOMPRESSED_END - 1
        fetch = 0

        if clvl == 2:
            raise ValueError("This version only supports level 1 and 3")
        while source:
            if codeword == 1:
                codeword = int.from_bytes(source[:4], byteorder='little')
                source = source[4:]
                if len(destination) <= last_matchstart:
                    c = 3 if clvl == 1 else 4
                    fetch = int.from_bytes(source[:c], byteorder='little')
            if codeword & 1:
                codeword = codeword >> 1
                if clvl == 1:
                    hash = (fetch >> 4) & 0xFFF
                    offset = hashtable[hash]
                    if fetch & 0xF:
                        matchlen = (fetch & 0xF) + 2
                        source = source[2:]
                    else:
                        matchlen = source[2]
                        source = source[3:]
                else:
                    if (fetch & 3) == 0:
                        delta = (fetch & 0xFF) >> 2
                        matchlen = 3
                        source = source[1:]
                    elif (fetch & 2) == 0:
                        delta = (fetch & 0xFFFF) >> 2
                        matchlen = 3
                        source = source[2:]
                    elif (fetch & 1) == 0:
                        delta = (fetch & 0xFFFF) >> 6
                        matchlen = ((fetch >> 2) & 15) + 3
                        source = source[2:]
                    elif (fetch & 127) != 3:
                        delta = (fetch >> 7) & 0x1FFFF
                        matchlen = ((fetch >> 2) & 0x1F) + 2
                        source = source[3:]
                    else:
                        delta = fetch >> 15
                        matchlen = ((fetch >> 7) & 255) + 3
                        source = source[4:]
                    offset = (len(destination) - delta) & 0xFFFFFFFF

                for i in range(offset, offset + matchlen):
                    destination.append(destination[i])

                if clvl == 1:
                    fetch = fetchhash()
                    while hashvalue < len(destination) - matchlen:
                        hashvalue += 1
                        hash = ((fetch >> 12) ^ fetch) & _HASH_MASK
                        hashtable[hash] = hashvalue
                        fetch = fetch >> 8 & 0xFFFF
                        try:
                            fetch |= destination[hashvalue + 3] << 16
                        except IndexError:
                            pass
                    fetch = int.from_bytes(source[:3], byteorder='little')
                else:
                    fetch = int.from_bytes(source[:4], byteorder='little')
                hashvalue = len(destination) - 1
            else:
                if len(destination) <= last_matchstart:
                    destination.append(source[0])
                    source = source[1:]
                    codeword = codeword >> 1
                    if clvl == 1:
                        while hashvalue < len(destination) - 3:
                            fetch2 = fetchhash()
                            hashvalue += 1
                            hash = ((fetch2 >> 12) ^ fetch2) & _HASH_MASK
                            hashtable[hash] = hashvalue
                        fetch = fetch >> 8 & 0xFFFF | source[2] << 16
                    else:
                        fetch = fetch >> 8 & 0xFFFF
                        fetch |= source[2] << 16
                        fetch |= source[3] << 24
                else:
                    while len(destination) <= size - 1:
                        if codeword == 1:
                            source = source[4:]
                            codeword = 0x80000000
                        destination.append(source[0])
                        source = source[1:]
                        codeword = codeword >> 1
                    break
        if len(destination) != size:
            raise RefineryPartialResult(
                F'Header indicates decompressed size 0x{size:X}, but 0x{len(destination):X} bytes '
                F'were decompressed.', destination)
        return destination

Classes

class qlz

This unit implements QuickLZ decompression levels 1 and 3.

Expand source code Browse git
class qlz(Unit):
    """
    This unit implements QuickLZ decompression levels 1 and 3.
    """

    def process(self, data):
        source = memoryview(data)
        head = source[0]
        clvl = (head >> 2) & 0x3

        if head & 2:
            self.log_info('long header detected')
            size = int.from_bytes(source[5:9], 'little')
            source = source[9:]
        else:
            self.log_info('short header detected')
            size = source[3]
            source = source[3:]
        if head & 1 != 1:
            self.log_warn('header indicates that data is uncompressed, returning remaining data')
            return source
        else:
            self.log_info(F'compression level {clvl}, decompressed size {SizeInt(size)!r}')

        def fetchhash():
            return int.from_bytes(destination[hashvalue + 1:hashvalue + 4], byteorder='little')

        codeword = 1
        destination = bytearray()
        hashtable = [0] * _HASH_VALUES
        hashvalue = -1
        last_matchstart = size - _UNCONDITIONAL_MATCHLEN - _UNCOMPRESSED_END - 1
        fetch = 0

        if clvl == 2:
            raise ValueError("This version only supports level 1 and 3")
        while source:
            if codeword == 1:
                codeword = int.from_bytes(source[:4], byteorder='little')
                source = source[4:]
                if len(destination) <= last_matchstart:
                    c = 3 if clvl == 1 else 4
                    fetch = int.from_bytes(source[:c], byteorder='little')
            if codeword & 1:
                codeword = codeword >> 1
                if clvl == 1:
                    hash = (fetch >> 4) & 0xFFF
                    offset = hashtable[hash]
                    if fetch & 0xF:
                        matchlen = (fetch & 0xF) + 2
                        source = source[2:]
                    else:
                        matchlen = source[2]
                        source = source[3:]
                else:
                    if (fetch & 3) == 0:
                        delta = (fetch & 0xFF) >> 2
                        matchlen = 3
                        source = source[1:]
                    elif (fetch & 2) == 0:
                        delta = (fetch & 0xFFFF) >> 2
                        matchlen = 3
                        source = source[2:]
                    elif (fetch & 1) == 0:
                        delta = (fetch & 0xFFFF) >> 6
                        matchlen = ((fetch >> 2) & 15) + 3
                        source = source[2:]
                    elif (fetch & 127) != 3:
                        delta = (fetch >> 7) & 0x1FFFF
                        matchlen = ((fetch >> 2) & 0x1F) + 2
                        source = source[3:]
                    else:
                        delta = fetch >> 15
                        matchlen = ((fetch >> 7) & 255) + 3
                        source = source[4:]
                    offset = (len(destination) - delta) & 0xFFFFFFFF

                for i in range(offset, offset + matchlen):
                    destination.append(destination[i])

                if clvl == 1:
                    fetch = fetchhash()
                    while hashvalue < len(destination) - matchlen:
                        hashvalue += 1
                        hash = ((fetch >> 12) ^ fetch) & _HASH_MASK
                        hashtable[hash] = hashvalue
                        fetch = fetch >> 8 & 0xFFFF
                        try:
                            fetch |= destination[hashvalue + 3] << 16
                        except IndexError:
                            pass
                    fetch = int.from_bytes(source[:3], byteorder='little')
                else:
                    fetch = int.from_bytes(source[:4], byteorder='little')
                hashvalue = len(destination) - 1
            else:
                if len(destination) <= last_matchstart:
                    destination.append(source[0])
                    source = source[1:]
                    codeword = codeword >> 1
                    if clvl == 1:
                        while hashvalue < len(destination) - 3:
                            fetch2 = fetchhash()
                            hashvalue += 1
                            hash = ((fetch2 >> 12) ^ fetch2) & _HASH_MASK
                            hashtable[hash] = hashvalue
                        fetch = fetch >> 8 & 0xFFFF | source[2] << 16
                    else:
                        fetch = fetch >> 8 & 0xFFFF
                        fetch |= source[2] << 16
                        fetch |= source[3] << 24
                else:
                    while len(destination) <= size - 1:
                        if codeword == 1:
                            source = source[4:]
                            codeword = 0x80000000
                        destination.append(source[0])
                        source = source[1:]
                        codeword = codeword >> 1
                    break
        if len(destination) != size:
            raise RefineryPartialResult(
                F'Header indicates decompressed size 0x{size:X}, but 0x{len(destination):X} bytes '
                F'were decompressed.', destination)
        return destination

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members