Module refinery.units.misc.meow

Expand source code Browse git
from __future__ import annotations

import re
import zlib

from struct import unpack
from typing import TYPE_CHECKING, Generator

from refinery.units import Unit

if TYPE_CHECKING:
    from refinery.lib.ole.crypto import OleFile


class meow(Unit):
    """
    Extract password hashes from various file types in hashcat format.

    The following file types are supported:

    - PDF (hashcat modes 10400, 10500, 10600, 10700)
    - RAR3 (hashcat mode 12500)
    - RAR5 (hashcat mode 13000)
    - ZIP with WinZip AES encryption (hashcat mode 13600)
    - ZIP with PKZIP traditional encryption (hashcat modes 17200-17230)
    - 7-Zip (hashcat mode 11600)
    - Microsoft Office 2007/2010/2013 (hashcat modes 9400, 9500, 9600)
    - Microsoft Office 97-2003 (hashcat modes 9700, 9800)
    """

    def process(self, data: bytearray) -> bytes | Generator[bytes, None, None]:
        from refinery.lib.ole.crypto import is_ole_file
        from refinery.lib.un7z.headers import SIGNATURE as SZ_SIGNATURE
        from refinery.lib.unrar.headers import RAR_HEADER_V15, RAR_HEADER_V50, RarFormat

        view = memoryview(data)
        if view[:5] == B'%PDF-':
            return self._hash_pdf(data)
        if view[:7] == RAR_HEADER_V15:
            return self._hash_rar(data, RarFormat.RARFMT15)
        if view[:8] == RAR_HEADER_V50:
            return self._hash_rar(data, RarFormat.RARFMT50)
        if view[:2] == B'PK':
            return self._hash_zip(data)
        if view[:6] == SZ_SIGNATURE:
            return self._hash_7z(data)
        if is_ole_file(data):
            return self._hash_office(data)
        raise ValueError('unable to identify an encrypted file format')

    @Unit.Requires('pymupdf', 1)
    def _pymupdf():
        import os
        for setting in ('PYMUPDF_MESSAGE', 'PYMUPDF_LOG'):
            os.environ[setting] = F'path:{os.devnull}'
        import pymupdf
        return pymupdf

    def _hash_pdf(self, data: bytearray) -> bytes:
        doc = self._pymupdf.open(stream=bytes(data), filetype='pdf')
        if not doc.is_encrypted:
            raise ValueError('this PDF is not encrypted')
        trailer_str = doc.pdf_trailer()
        enc_ref_match = re.search(
            r'/Encrypt\s+(\d+)\s+\d+\s+R', trailer_str)
        if enc_ref_match is None:
            raise ValueError('PDF trailer does not contain /Encrypt')
        encrypt_xref = int(enc_ref_match.group(1))
        obj_str = doc.xref_object(encrypt_xref)

        def _int(key: str) -> int | None:
            kind, val = doc.xref_get_key(encrypt_xref, key)
            return int(val) if kind == 'int' else None

        def _hex(key: str) -> bytes | None:
            hit = re.search(F'/{key}\\s*<([0-9A-Fa-f]+)>', obj_str)
            return bytes.fromhex(hit.group(1)) if hit else None

        _V = _int('V')
        _R = _int('R')
        _P = _int('P')
        _O = _hex('O')
        _U = _hex('U')

        keylen = _int('Length') or 40

        if _V is None or _R is None or _P is None or _O is None or _U is None:
            raise ValueError('missing required PDF encryption fields')

        kind, id_val = doc.xref_get_key(-1, 'ID')
        id_hex_values = re.findall(r'<([0-9A-Fa-f]+)>', id_val)
        if not id_hex_values:
            raise ValueError('could not parse /ID array from trailer')
        doc_id = bytes.fromhex(id_hex_values[0])

        enc_meta = 1
        kind, val = doc.xref_get_key(encrypt_xref, 'EncryptMetadata')
        if kind == 'bool' and val.lower() == 'false':
            enc_meta = 0

        parts = [
            F'$pdf${_V}',
            str(_R),
            str(keylen),
            str(_P),
            str(enc_meta),
            str(len(doc_id)),
            doc_id.hex(),
            str(len(_U)),
            _U.hex(),
            str(len(_O)),
            _O.hex(),
        ]
        if _R >= 5:
            OE = _hex('OE')
            UE = _hex('UE')
            if OE is None or UE is None:
                raise ValueError('missing /OE or /UE for R>=5 encryption')
            parts.append(str(len(UE)))
            parts.append(UE.hex())
            parts.append(str(len(OE)))
            parts.append(OE.hex())
        return '*'.join(parts).encode(self.codec)

    def _hash_rar(self, data: bytearray, fmt) -> Generator[bytes, None, None]:
        from refinery.lib.unrar.headers import (
            CryptMethod,
            RarFormat,
            parse_headers,
        )

        view = memoryview(data)
        main, entries, _, crypt_header = parse_headers(view, fmt)

        if fmt == RarFormat.RARFMT15:
            if main is not None and main.is_encrypted and len(data) >= 24:
                tail = bytes(view[-24:])
                salt = tail[:8].hex()
                crypt_data = tail[8:].hex()
                yield F'$RAR3$*0*{salt}*{crypt_data}'.encode(self.codec)
            return

        if crypt_header is not None:
            if crypt_header.use_psw_check and crypt_header.psw_check:
                salt = bytes(crypt_header.salt).hex()
                lg2 = crypt_header.lg2_count
                if crypt_header.header_iv:
                    iv = bytes(crypt_header.header_iv).hex()
                else:
                    iv = (b'\0' * 16).hex()
                pswcheck = bytes(crypt_header.psw_check).hex()
                yield F'$rar5$16${salt}${lg2}${iv}$8${pswcheck}'.encode(self.codec)
                return

        for entry in entries:
            if not entry.is_encrypted:
                continue
            if entry.crypt_method == CryptMethod.CRYPT_RAR50:
                if entry.use_psw_check and entry.psw_check:
                    salt = bytes(entry.salt).hex()
                    lg2 = entry.lg2_count
                    iv = bytes(entry.init_v).hex()
                    pswcheck = bytes(entry.psw_check).hex()
                    yield F'$rar5$16${salt}${lg2}${iv}$8${pswcheck}'.encode(self.codec)
                    return

    def _hash_7z(self, data: bytearray) -> bytes:
        from refinery.lib.structures import StructReader
        from refinery.lib.un7z.coders import CODEC_AES256SHA256, decompress_folder
        from refinery.lib.un7z.headers import (
            SIGNATURE_HEADER_SIZE,
            ArchiveHeader,
            PropertyID,
            parse_encoded_header,
            parse_header,
            parse_signature_header,
        )

        view = memoryview(data)
        sh = parse_signature_header(view)
        header_offset = SIGNATURE_HEADER_SIZE + sh.next_header_offset
        header_end = header_offset + sh.next_header_size
        header_view = view[header_offset:header_end]
        crc = zlib.crc32(header_view) & 0xFFFFFFFF
        if crc != sh.next_header_crc:
            raise ValueError('7z header CRC mismatch')

        reader = StructReader(header_view)
        prop_id = reader.u8()

        if prop_id == PropertyID.ENCODED_HEADER:
            enc = parse_encoded_header(reader)
            if not enc.folders or enc.pack_info is None:
                raise ValueError('7z encoded header has no folders or pack info')
            folder = enc.folders[0]
            pack_offset = SIGNATURE_HEADER_SIZE + enc.pack_info.pack_pos
            if any(c.codec_id == CODEC_AES256SHA256 for c in folder.coders):
                packed = bytes(view[pack_offset:pack_offset + enc.pack_info.sizes[0]])
                return self._hash_7z_folder(folder, packed)
            packed_streams: list[memoryview] = []
            offset = pack_offset
            for size in enc.pack_info.sizes:
                packed_streams.append(view[offset:offset + size])
                offset += size
            header_data = decompress_folder(
                folder, packed_streams, folder.main_unpack_size)
            inner_reader = StructReader(memoryview(header_data))
            inner_prop = inner_reader.u8()
            if inner_prop != PropertyID.HEADER:
                raise ValueError('7z decoded header is not a plain header')
            header: ArchiveHeader = parse_header(inner_reader)
        elif prop_id == PropertyID.HEADER:
            header = parse_header(reader)
        else:
            raise ValueError('7z archive does not appear to be encrypted')

        if header.pack_info is None:
            raise ValueError('7z archive has no pack info')
        pack_offset = SIGNATURE_HEADER_SIZE + header.pack_info.pack_pos
        offset = pack_offset
        pack_starts: list[int] = []
        for size in header.pack_info.sizes:
            pack_starts.append(offset)
            offset += size
        for fi, folder in enumerate(header.folders):
            if any(c.codec_id == CODEC_AES256SHA256 for c in folder.coders):
                pi = sum(
                    len(header.folders[k].packed_indices)
                    for k in range(fi)
                )
                packed = bytes(view[pack_starts[pi]:pack_starts[pi] + header.pack_info.sizes[pi]])
                crc: int | None = folder.crc
                crc_len: int | None = None
                if crc is None and header.substreams and header.substreams.crcs:
                    for sc in header.substreams.crcs:
                        if sc is not None:
                            crc = sc
                            break
                    if header.substreams.unpack_sizes:
                        crc_len = header.substreams.unpack_sizes[0]
                return self._hash_7z_folder(folder, packed, crc, crc_len)

        raise ValueError('7z archive does not appear to be encrypted')

    def _hash_7z_folder(
        self,
        folder,
        packed_data: bytes,
        override_crc: int | None = None,
        override_crc_len: int | None = None,
    ) -> bytes:
        from refinery.lib.un7z.coders import (
            CODEC_AES256SHA256,
            CODEC_ARM,
            CODEC_ARMT,
            CODEC_BCJ_X86,
            CODEC_BZIP2,
            CODEC_COPY,
            CODEC_DEFLATE,
            CODEC_DELTA,
            CODEC_IA64,
            CODEC_LZMA,
            CODEC_LZMA2,
            CODEC_PPC,
            CODEC_PPMD,
            CODEC_SPARC,
        )
        compressor_type = {
            CODEC_COPY    : 0,
            CODEC_LZMA    : 1,
            CODEC_LZMA2   : 2,
            CODEC_PPMD    : 3,
            CODEC_BZIP2   : 6,
            CODEC_DEFLATE : 7,
        }
        filter_type = {
            CODEC_BCJ_X86 : 1,
            CODEC_PPC     : 3,
            CODEC_IA64    : 4,
            CODEC_ARM     : 5,
            CODEC_ARMT    : 6,
            CODEC_SPARC   : 7,
            CODEC_DELTA   : 9,
        }

        aes_props: bytes = b''
        data_type = 0
        coder_attrs = bytearray()

        for coder in folder.coders:
            if coder.codec_id == CODEC_AES256SHA256:
                aes_props = coder.properties
            elif coder.codec_id in compressor_type:
                data_type |= compressor_type[coder.codec_id]
                coder_attrs.extend(coder.properties)
            elif coder.codec_id in filter_type:
                data_type |= filter_type[coder.codec_id] << 4
                coder_attrs.extend(coder.properties)

        if len(aes_props) < 2:
            raise ValueError('7z AES coder has no properties')

        first_byte = aes_props[0]
        num_cycles_power = first_byte & 0x3F
        salt_size = ((first_byte >> 7) & 1) + (aes_props[1] >> 4)
        iv_size = ((first_byte >> 6) & 1) + (aes_props[1] & 0x0F)
        prop_data = aes_props[2:]
        salt = bytes(prop_data[:salt_size])
        iv = bytes(prop_data[salt_size:salt_size + iv_size])
        iv_padded = iv + b'\0' * (16 - len(iv))

        if override_crc is not None:
            crc = override_crc
        elif folder.crc is not None:
            crc = folder.crc
        else:
            crc = 0
        data_len = len(packed_data)
        unpack_size = folder.main_unpack_size

        parts = [
            F'$7z${data_type}',
            str(num_cycles_power),
            str(salt_size),
            salt.hex(),
            str(iv_size),
            iv_padded.hex(),
            str(crc),
            str(data_len),
            str(unpack_size),
            packed_data.hex(),
        ]
        result = '$'.join(parts)
        if data_type > 0:
            crc_len = override_crc_len if override_crc_len is not None else unpack_size
            result += F'${crc_len}${coder_attrs.hex()}'
        return result.encode(self.codec)

    def _hash_zip(self, data: bytearray) -> Generator[bytes, None, None]:
        from refinery.lib.zip import AExCrypto, Zip, ZipCrypto

        archive = Zip(data, read_records=True)
        pkzip_entries: list[tuple] = []

        for entry in archive.directory:
            try:
                record = archive.read(entry)
            except Exception:
                continue
            if not record.flags.Encrypted:
                continue
            enc = record.encryption
            if isinstance(enc, AExCrypto):
                yield self._hash_zip_aes(record, enc)
            elif isinstance(enc, ZipCrypto):
                pkzip_entries.append((record, enc))

        if pkzip_entries:
            yield self._hash_zip_pkzip(pkzip_entries)

    def _hash_zip_aes(self, record, enc) -> bytes:
        salt = bytes(enc.salt).hex()
        pvv = bytes(enc.pvv).hex()
        cdata = record.data
        if cdata is None:
            cdata = B''
        auth_len = enc.auth_size
        if len(cdata) >= auth_len:
            payload = bytes(cdata[:-auth_len])
            auth = bytes(cdata[-auth_len:])
        else:
            payload = bytes(cdata)
            auth = B''
        data_hex = payload.hex()
        auth_hex = auth.hex()
        mode = enc.strength
        return (
            F'$zip2$*0*{mode}*0*{salt}*{pvv}'
            F'*{len(payload):x}*{data_hex}*{auth_hex}*$/zip2$'
        ).encode(self.codec)

    def _hash_zip_pkzip(self, entries: list[tuple]) -> bytes:
        count = len(entries)
        check_bytes = 2
        for record, enc in entries:
            if record.version >= 20:
                check_bytes = 1
                break
        parts = [F'$pkzip2${count}*{check_bytes}']
        for record, enc in entries:
            enc_header = bytes(enc)
            cdata = record.data
            if cdata is None:
                cdata = B''
            full_data = enc_header + bytes(cdata)
            crc = record.crc32
            method = record.method_value
            data_len = len(full_data)
            offex = 30 + len(record.name_bytes) + len(record.xtra_data)
            if record.flags.DataDescriptor:
                cs = F'{(record.mtime >> 8) & 0xFF:02x}{record.mtime & 0xFF:02x}'
            else:
                cs = F'{(crc >> 24) & 0xFF:02x}{(crc >> 16) & 0xFF:02x}'
            tc = F'{(record.mtime >> 8) & 0xFF:02x}{record.mtime & 0xFF:02x}'
            entry_parts = [
                '2',
                '0',
                F'{record.csize:x}',
                F'{record.usize:x}',
                F'{crc:x}',
                '0',
                F'{offex:x}',
                str(method),
                F'{data_len:x}',
                cs,
                tc,
                full_data.hex(),
            ]
            parts.append('*'.join(entry_parts))
        parts.append('$/pkzip2$')
        return '*'.join(parts).encode(self.codec)

    def _hash_office(self, data: bytearray) -> bytes:
        from refinery.lib.ole.crypto import (
            AgileEncryptionInfo,
            EncryptionType,
            OleFile,
            StandardEncryptionInfo,
            _parseinfo,
        )

        ole = OleFile(data)

        if ole.exists('EncryptionInfo'):
            stream = ole.openstream('EncryptionInfo')
            enc_type, info = _parseinfo(stream)
            if enc_type == EncryptionType.STANDARD and isinstance(info, StandardEncryptionInfo):
                h = info.header
                v = info.verifier
                salt = v.salt.hex()
                enc_verifier = v.encrypted_verifier.hex()
                enc_verifier_hash = v.encrypted_verifier_hash.hex()[:64]
                return (
                    F'$office$*2007*{v.verifier_hash_size}*{h.key_size}'
                    F'*{v.salt_size}*{salt}*{enc_verifier}*{enc_verifier_hash}'
                ).encode(self.codec)
            if enc_type == EncryptionType.AGILE and isinstance(info, AgileEncryptionInfo):
                key_bits = info.password_key_bits
                year = 2013 if key_bits >= 256 else 2010
                salt = info.password_salt.hex()
                enc_verifier = info.encrypted_verifier_hash_input.hex()
                enc_verifier_hash = info.encrypted_verifier_hash_value.hex()[:64]
                return (
                    F'$office$*{year}*{info.spin_value}*{key_bits}'
                    F'*{len(info.password_salt)}*{salt}'
                    F'*{enc_verifier}*{enc_verifier_hash}'
                ).encode(self.codec)
            raise ValueError(F'unsupported OOXML encryption type: {enc_type.value}')

        if ole.exists('wordDocument'):
            return self._hash_office_doc97(ole)
        if ole.exists('Workbook'):
            return self._hash_office_xls97(ole)
        if ole.exists('PowerPoint Document'):
            return self._hash_office_ppt97(ole)
        raise ValueError('unable to identify an encrypted Office format')

    def _hash_office_doc97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _parse_header_rc4,
            _parse_header_rc4_cryptoapi,
        )
        from refinery.lib.structures import MemoryFile

        doc = ole.openstream('wordDocument')
        fib_raw = doc.read(32)
        bits = unpack('<H', fib_raw[10:12])[0]
        f_encrypted = (bits >> 8) & 1
        if not f_encrypted:
            raise ValueError('this Word document is not encrypted')
        f_which_tbl = (bits >> 9) & 1
        table_name = '1Table' if f_which_tbl else '0Table'
        with ole.openstream(table_name) as table:
            v_major, v_minor = unpack('<HH', table.read(4))
            if v_major == 1 and v_minor == 1:
                rc4_info = _parse_header_rc4(table)
                return (
                    F'$oldoffice$1*{rc4_info.salt.hex()}'
                    F'*{rc4_info.encrypted_verifier.hex()}'
                    F'*{rc4_info.encrypted_verifier_hash.hex()}'
                ).encode(self.codec)
            elif v_major in (2, 3, 4) and v_minor == 2:
                api_info = _parse_header_rc4_cryptoapi(MemoryFile(table.read()))
                typ = 3 if api_info.key_size <= 40 else 4
                return (
                    F'$oldoffice${typ}*{api_info.salt.hex()}'
                    F'*{api_info.encrypted_verifier.hex()}'
                    F'*{api_info.encrypted_verifier_hash.hex()}'
                ).encode(self.codec)
        raise ValueError('unsupported Word encryption version')

    def _hash_office_xls97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _parse_header_rc4,
            _parse_header_rc4_cryptoapi,
        )
        from refinery.lib.structures import MemoryFile

        with ole.openstream('Workbook') as wb:
            num = unpack('<H', wb.read(2))[0]
            if num != 2057:
                raise ValueError('invalid Workbook stream')
            size = unpack('<H', wb.read(2))[0]
            wb.read(size)
            while True:
                h = wb.read(4)
                if not h or len(h) < 4:
                    raise ValueError('FILEPASS record not found')
                rnum, rsize = unpack('<HH', h)
                if rnum == 47:
                    break
                wb.read(rsize)
            enc_type = unpack('<H', wb.read(2))[0]
            enc_data = MemoryFile(wb.read(rsize - 2))
            if enc_type == 0x0001:
                v_major, v_minor = unpack('<HH', enc_data.read(4))
                if v_major == 1 and v_minor == 1:
                    rc4_info = _parse_header_rc4(enc_data)
                    return (
                        F'$oldoffice$0*{rc4_info.salt.hex()}'
                        F'*{rc4_info.encrypted_verifier.hex()}'
                        F'*{rc4_info.encrypted_verifier_hash.hex()}'
                    ).encode(self.codec)
                elif v_major in (2, 3, 4) and v_minor == 2:
                    api_info = _parse_header_rc4_cryptoapi(enc_data)
                    typ = 3 if api_info.key_size <= 40 else 4
                    return (
                        F'$oldoffice${typ}*{api_info.salt.hex()}'
                        F'*{api_info.encrypted_verifier.hex()}'
                        F'*{api_info.encrypted_verifier_hash.hex()}'
                    ).encode(self.codec)
        raise ValueError('unsupported Excel encryption version')

    def _hash_office_ppt97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _construct_persist_object_directory,
            _parse_current_user_atom,
            _parse_record_header,
            _parse_user_edit_atom,
        )
        from refinery.lib.structures import MemoryFile

        cu_stream = ole.openstream('Current User')
        ppt_stream = ole.openstream('PowerPoint Document')
        pod = _construct_persist_object_directory(cu_stream, ppt_stream)
        cu_stream.seek(0)
        cu = _parse_current_user_atom(cu_stream)
        ppt_stream.seek(cu.offset_to_current_edit)
        uea = _parse_user_edit_atom(ppt_stream)
        if uea.encrypt_session_persist_id_ref is None:
            raise ValueError('this PowerPoint file is not encrypted')
        crypt_offset = pod[uea.encrypt_session_persist_id_ref]
        ppt_stream.seek(crypt_offset)
        rh = _parse_record_header(ppt_stream.read(8))
        crypt_data = ppt_stream.read(rh.rec_len)
        enc_info = MemoryFile(crypt_data)
        v_major, v_minor = unpack('<HH', enc_info.read(4))
        if v_major in (2, 3, 4) and v_minor == 2:
            from refinery.lib.ole.crypto import _parse_header_rc4_cryptoapi
            api_info = _parse_header_rc4_cryptoapi(enc_info)
            typ = 3 if api_info.key_size <= 40 else 4
            return (
                F'$oldoffice${typ}*{api_info.salt.hex()}'
                F'*{api_info.encrypted_verifier.hex()}'
                F'*{api_info.encrypted_verifier_hash.hex()}'
            ).encode(self.codec)
        raise ValueError('unsupported PowerPoint encryption version')

Classes

class meow

Extract password hashes from various file types in hashcat format.

The following file types are supported:

  • PDF (hashcat modes 10400, 10500, 10600, 10700)
  • RAR3 (hashcat mode 12500)
  • RAR5 (hashcat mode 13000)
  • ZIP with WinZip AES encryption (hashcat mode 13600)
  • ZIP with PKZIP traditional encryption (hashcat modes 17200-17230)
  • 7-Zip (hashcat mode 11600)
  • Microsoft Office 2007/2010/2013 (hashcat modes 9400, 9500, 9600)
  • Microsoft Office 97-2003 (hashcat modes 9700, 9800)
Expand source code Browse git
class meow(Unit):
    """
    Extract password hashes from various file types in hashcat format.

    The following file types are supported:

    - PDF (hashcat modes 10400, 10500, 10600, 10700)
    - RAR3 (hashcat mode 12500)
    - RAR5 (hashcat mode 13000)
    - ZIP with WinZip AES encryption (hashcat mode 13600)
    - ZIP with PKZIP traditional encryption (hashcat modes 17200-17230)
    - 7-Zip (hashcat mode 11600)
    - Microsoft Office 2007/2010/2013 (hashcat modes 9400, 9500, 9600)
    - Microsoft Office 97-2003 (hashcat modes 9700, 9800)
    """

    def process(self, data: bytearray) -> bytes | Generator[bytes, None, None]:
        from refinery.lib.ole.crypto import is_ole_file
        from refinery.lib.un7z.headers import SIGNATURE as SZ_SIGNATURE
        from refinery.lib.unrar.headers import RAR_HEADER_V15, RAR_HEADER_V50, RarFormat

        view = memoryview(data)
        if view[:5] == B'%PDF-':
            return self._hash_pdf(data)
        if view[:7] == RAR_HEADER_V15:
            return self._hash_rar(data, RarFormat.RARFMT15)
        if view[:8] == RAR_HEADER_V50:
            return self._hash_rar(data, RarFormat.RARFMT50)
        if view[:2] == B'PK':
            return self._hash_zip(data)
        if view[:6] == SZ_SIGNATURE:
            return self._hash_7z(data)
        if is_ole_file(data):
            return self._hash_office(data)
        raise ValueError('unable to identify an encrypted file format')

    @Unit.Requires('pymupdf', 1)
    def _pymupdf():
        import os
        for setting in ('PYMUPDF_MESSAGE', 'PYMUPDF_LOG'):
            os.environ[setting] = F'path:{os.devnull}'
        import pymupdf
        return pymupdf

    def _hash_pdf(self, data: bytearray) -> bytes:
        doc = self._pymupdf.open(stream=bytes(data), filetype='pdf')
        if not doc.is_encrypted:
            raise ValueError('this PDF is not encrypted')
        trailer_str = doc.pdf_trailer()
        enc_ref_match = re.search(
            r'/Encrypt\s+(\d+)\s+\d+\s+R', trailer_str)
        if enc_ref_match is None:
            raise ValueError('PDF trailer does not contain /Encrypt')
        encrypt_xref = int(enc_ref_match.group(1))
        obj_str = doc.xref_object(encrypt_xref)

        def _int(key: str) -> int | None:
            kind, val = doc.xref_get_key(encrypt_xref, key)
            return int(val) if kind == 'int' else None

        def _hex(key: str) -> bytes | None:
            hit = re.search(F'/{key}\\s*<([0-9A-Fa-f]+)>', obj_str)
            return bytes.fromhex(hit.group(1)) if hit else None

        _V = _int('V')
        _R = _int('R')
        _P = _int('P')
        _O = _hex('O')
        _U = _hex('U')

        keylen = _int('Length') or 40

        if _V is None or _R is None or _P is None or _O is None or _U is None:
            raise ValueError('missing required PDF encryption fields')

        kind, id_val = doc.xref_get_key(-1, 'ID')
        id_hex_values = re.findall(r'<([0-9A-Fa-f]+)>', id_val)
        if not id_hex_values:
            raise ValueError('could not parse /ID array from trailer')
        doc_id = bytes.fromhex(id_hex_values[0])

        enc_meta = 1
        kind, val = doc.xref_get_key(encrypt_xref, 'EncryptMetadata')
        if kind == 'bool' and val.lower() == 'false':
            enc_meta = 0

        parts = [
            F'$pdf${_V}',
            str(_R),
            str(keylen),
            str(_P),
            str(enc_meta),
            str(len(doc_id)),
            doc_id.hex(),
            str(len(_U)),
            _U.hex(),
            str(len(_O)),
            _O.hex(),
        ]
        if _R >= 5:
            OE = _hex('OE')
            UE = _hex('UE')
            if OE is None or UE is None:
                raise ValueError('missing /OE or /UE for R>=5 encryption')
            parts.append(str(len(UE)))
            parts.append(UE.hex())
            parts.append(str(len(OE)))
            parts.append(OE.hex())
        return '*'.join(parts).encode(self.codec)

    def _hash_rar(self, data: bytearray, fmt) -> Generator[bytes, None, None]:
        from refinery.lib.unrar.headers import (
            CryptMethod,
            RarFormat,
            parse_headers,
        )

        view = memoryview(data)
        main, entries, _, crypt_header = parse_headers(view, fmt)

        if fmt == RarFormat.RARFMT15:
            if main is not None and main.is_encrypted and len(data) >= 24:
                tail = bytes(view[-24:])
                salt = tail[:8].hex()
                crypt_data = tail[8:].hex()
                yield F'$RAR3$*0*{salt}*{crypt_data}'.encode(self.codec)
            return

        if crypt_header is not None:
            if crypt_header.use_psw_check and crypt_header.psw_check:
                salt = bytes(crypt_header.salt).hex()
                lg2 = crypt_header.lg2_count
                if crypt_header.header_iv:
                    iv = bytes(crypt_header.header_iv).hex()
                else:
                    iv = (b'\0' * 16).hex()
                pswcheck = bytes(crypt_header.psw_check).hex()
                yield F'$rar5$16${salt}${lg2}${iv}$8${pswcheck}'.encode(self.codec)
                return

        for entry in entries:
            if not entry.is_encrypted:
                continue
            if entry.crypt_method == CryptMethod.CRYPT_RAR50:
                if entry.use_psw_check and entry.psw_check:
                    salt = bytes(entry.salt).hex()
                    lg2 = entry.lg2_count
                    iv = bytes(entry.init_v).hex()
                    pswcheck = bytes(entry.psw_check).hex()
                    yield F'$rar5$16${salt}${lg2}${iv}$8${pswcheck}'.encode(self.codec)
                    return

    def _hash_7z(self, data: bytearray) -> bytes:
        from refinery.lib.structures import StructReader
        from refinery.lib.un7z.coders import CODEC_AES256SHA256, decompress_folder
        from refinery.lib.un7z.headers import (
            SIGNATURE_HEADER_SIZE,
            ArchiveHeader,
            PropertyID,
            parse_encoded_header,
            parse_header,
            parse_signature_header,
        )

        view = memoryview(data)
        sh = parse_signature_header(view)
        header_offset = SIGNATURE_HEADER_SIZE + sh.next_header_offset
        header_end = header_offset + sh.next_header_size
        header_view = view[header_offset:header_end]
        crc = zlib.crc32(header_view) & 0xFFFFFFFF
        if crc != sh.next_header_crc:
            raise ValueError('7z header CRC mismatch')

        reader = StructReader(header_view)
        prop_id = reader.u8()

        if prop_id == PropertyID.ENCODED_HEADER:
            enc = parse_encoded_header(reader)
            if not enc.folders or enc.pack_info is None:
                raise ValueError('7z encoded header has no folders or pack info')
            folder = enc.folders[0]
            pack_offset = SIGNATURE_HEADER_SIZE + enc.pack_info.pack_pos
            if any(c.codec_id == CODEC_AES256SHA256 for c in folder.coders):
                packed = bytes(view[pack_offset:pack_offset + enc.pack_info.sizes[0]])
                return self._hash_7z_folder(folder, packed)
            packed_streams: list[memoryview] = []
            offset = pack_offset
            for size in enc.pack_info.sizes:
                packed_streams.append(view[offset:offset + size])
                offset += size
            header_data = decompress_folder(
                folder, packed_streams, folder.main_unpack_size)
            inner_reader = StructReader(memoryview(header_data))
            inner_prop = inner_reader.u8()
            if inner_prop != PropertyID.HEADER:
                raise ValueError('7z decoded header is not a plain header')
            header: ArchiveHeader = parse_header(inner_reader)
        elif prop_id == PropertyID.HEADER:
            header = parse_header(reader)
        else:
            raise ValueError('7z archive does not appear to be encrypted')

        if header.pack_info is None:
            raise ValueError('7z archive has no pack info')
        pack_offset = SIGNATURE_HEADER_SIZE + header.pack_info.pack_pos
        offset = pack_offset
        pack_starts: list[int] = []
        for size in header.pack_info.sizes:
            pack_starts.append(offset)
            offset += size
        for fi, folder in enumerate(header.folders):
            if any(c.codec_id == CODEC_AES256SHA256 for c in folder.coders):
                pi = sum(
                    len(header.folders[k].packed_indices)
                    for k in range(fi)
                )
                packed = bytes(view[pack_starts[pi]:pack_starts[pi] + header.pack_info.sizes[pi]])
                crc: int | None = folder.crc
                crc_len: int | None = None
                if crc is None and header.substreams and header.substreams.crcs:
                    for sc in header.substreams.crcs:
                        if sc is not None:
                            crc = sc
                            break
                    if header.substreams.unpack_sizes:
                        crc_len = header.substreams.unpack_sizes[0]
                return self._hash_7z_folder(folder, packed, crc, crc_len)

        raise ValueError('7z archive does not appear to be encrypted')

    def _hash_7z_folder(
        self,
        folder,
        packed_data: bytes,
        override_crc: int | None = None,
        override_crc_len: int | None = None,
    ) -> bytes:
        from refinery.lib.un7z.coders import (
            CODEC_AES256SHA256,
            CODEC_ARM,
            CODEC_ARMT,
            CODEC_BCJ_X86,
            CODEC_BZIP2,
            CODEC_COPY,
            CODEC_DEFLATE,
            CODEC_DELTA,
            CODEC_IA64,
            CODEC_LZMA,
            CODEC_LZMA2,
            CODEC_PPC,
            CODEC_PPMD,
            CODEC_SPARC,
        )
        compressor_type = {
            CODEC_COPY    : 0,
            CODEC_LZMA    : 1,
            CODEC_LZMA2   : 2,
            CODEC_PPMD    : 3,
            CODEC_BZIP2   : 6,
            CODEC_DEFLATE : 7,
        }
        filter_type = {
            CODEC_BCJ_X86 : 1,
            CODEC_PPC     : 3,
            CODEC_IA64    : 4,
            CODEC_ARM     : 5,
            CODEC_ARMT    : 6,
            CODEC_SPARC   : 7,
            CODEC_DELTA   : 9,
        }

        aes_props: bytes = b''
        data_type = 0
        coder_attrs = bytearray()

        for coder in folder.coders:
            if coder.codec_id == CODEC_AES256SHA256:
                aes_props = coder.properties
            elif coder.codec_id in compressor_type:
                data_type |= compressor_type[coder.codec_id]
                coder_attrs.extend(coder.properties)
            elif coder.codec_id in filter_type:
                data_type |= filter_type[coder.codec_id] << 4
                coder_attrs.extend(coder.properties)

        if len(aes_props) < 2:
            raise ValueError('7z AES coder has no properties')

        first_byte = aes_props[0]
        num_cycles_power = first_byte & 0x3F
        salt_size = ((first_byte >> 7) & 1) + (aes_props[1] >> 4)
        iv_size = ((first_byte >> 6) & 1) + (aes_props[1] & 0x0F)
        prop_data = aes_props[2:]
        salt = bytes(prop_data[:salt_size])
        iv = bytes(prop_data[salt_size:salt_size + iv_size])
        iv_padded = iv + b'\0' * (16 - len(iv))

        if override_crc is not None:
            crc = override_crc
        elif folder.crc is not None:
            crc = folder.crc
        else:
            crc = 0
        data_len = len(packed_data)
        unpack_size = folder.main_unpack_size

        parts = [
            F'$7z${data_type}',
            str(num_cycles_power),
            str(salt_size),
            salt.hex(),
            str(iv_size),
            iv_padded.hex(),
            str(crc),
            str(data_len),
            str(unpack_size),
            packed_data.hex(),
        ]
        result = '$'.join(parts)
        if data_type > 0:
            crc_len = override_crc_len if override_crc_len is not None else unpack_size
            result += F'${crc_len}${coder_attrs.hex()}'
        return result.encode(self.codec)

    def _hash_zip(self, data: bytearray) -> Generator[bytes, None, None]:
        from refinery.lib.zip import AExCrypto, Zip, ZipCrypto

        archive = Zip(data, read_records=True)
        pkzip_entries: list[tuple] = []

        for entry in archive.directory:
            try:
                record = archive.read(entry)
            except Exception:
                continue
            if not record.flags.Encrypted:
                continue
            enc = record.encryption
            if isinstance(enc, AExCrypto):
                yield self._hash_zip_aes(record, enc)
            elif isinstance(enc, ZipCrypto):
                pkzip_entries.append((record, enc))

        if pkzip_entries:
            yield self._hash_zip_pkzip(pkzip_entries)

    def _hash_zip_aes(self, record, enc) -> bytes:
        salt = bytes(enc.salt).hex()
        pvv = bytes(enc.pvv).hex()
        cdata = record.data
        if cdata is None:
            cdata = B''
        auth_len = enc.auth_size
        if len(cdata) >= auth_len:
            payload = bytes(cdata[:-auth_len])
            auth = bytes(cdata[-auth_len:])
        else:
            payload = bytes(cdata)
            auth = B''
        data_hex = payload.hex()
        auth_hex = auth.hex()
        mode = enc.strength
        return (
            F'$zip2$*0*{mode}*0*{salt}*{pvv}'
            F'*{len(payload):x}*{data_hex}*{auth_hex}*$/zip2$'
        ).encode(self.codec)

    def _hash_zip_pkzip(self, entries: list[tuple]) -> bytes:
        count = len(entries)
        check_bytes = 2
        for record, enc in entries:
            if record.version >= 20:
                check_bytes = 1
                break
        parts = [F'$pkzip2${count}*{check_bytes}']
        for record, enc in entries:
            enc_header = bytes(enc)
            cdata = record.data
            if cdata is None:
                cdata = B''
            full_data = enc_header + bytes(cdata)
            crc = record.crc32
            method = record.method_value
            data_len = len(full_data)
            offex = 30 + len(record.name_bytes) + len(record.xtra_data)
            if record.flags.DataDescriptor:
                cs = F'{(record.mtime >> 8) & 0xFF:02x}{record.mtime & 0xFF:02x}'
            else:
                cs = F'{(crc >> 24) & 0xFF:02x}{(crc >> 16) & 0xFF:02x}'
            tc = F'{(record.mtime >> 8) & 0xFF:02x}{record.mtime & 0xFF:02x}'
            entry_parts = [
                '2',
                '0',
                F'{record.csize:x}',
                F'{record.usize:x}',
                F'{crc:x}',
                '0',
                F'{offex:x}',
                str(method),
                F'{data_len:x}',
                cs,
                tc,
                full_data.hex(),
            ]
            parts.append('*'.join(entry_parts))
        parts.append('$/pkzip2$')
        return '*'.join(parts).encode(self.codec)

    def _hash_office(self, data: bytearray) -> bytes:
        from refinery.lib.ole.crypto import (
            AgileEncryptionInfo,
            EncryptionType,
            OleFile,
            StandardEncryptionInfo,
            _parseinfo,
        )

        ole = OleFile(data)

        if ole.exists('EncryptionInfo'):
            stream = ole.openstream('EncryptionInfo')
            enc_type, info = _parseinfo(stream)
            if enc_type == EncryptionType.STANDARD and isinstance(info, StandardEncryptionInfo):
                h = info.header
                v = info.verifier
                salt = v.salt.hex()
                enc_verifier = v.encrypted_verifier.hex()
                enc_verifier_hash = v.encrypted_verifier_hash.hex()[:64]
                return (
                    F'$office$*2007*{v.verifier_hash_size}*{h.key_size}'
                    F'*{v.salt_size}*{salt}*{enc_verifier}*{enc_verifier_hash}'
                ).encode(self.codec)
            if enc_type == EncryptionType.AGILE and isinstance(info, AgileEncryptionInfo):
                key_bits = info.password_key_bits
                year = 2013 if key_bits >= 256 else 2010
                salt = info.password_salt.hex()
                enc_verifier = info.encrypted_verifier_hash_input.hex()
                enc_verifier_hash = info.encrypted_verifier_hash_value.hex()[:64]
                return (
                    F'$office$*{year}*{info.spin_value}*{key_bits}'
                    F'*{len(info.password_salt)}*{salt}'
                    F'*{enc_verifier}*{enc_verifier_hash}'
                ).encode(self.codec)
            raise ValueError(F'unsupported OOXML encryption type: {enc_type.value}')

        if ole.exists('wordDocument'):
            return self._hash_office_doc97(ole)
        if ole.exists('Workbook'):
            return self._hash_office_xls97(ole)
        if ole.exists('PowerPoint Document'):
            return self._hash_office_ppt97(ole)
        raise ValueError('unable to identify an encrypted Office format')

    def _hash_office_doc97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _parse_header_rc4,
            _parse_header_rc4_cryptoapi,
        )
        from refinery.lib.structures import MemoryFile

        doc = ole.openstream('wordDocument')
        fib_raw = doc.read(32)
        bits = unpack('<H', fib_raw[10:12])[0]
        f_encrypted = (bits >> 8) & 1
        if not f_encrypted:
            raise ValueError('this Word document is not encrypted')
        f_which_tbl = (bits >> 9) & 1
        table_name = '1Table' if f_which_tbl else '0Table'
        with ole.openstream(table_name) as table:
            v_major, v_minor = unpack('<HH', table.read(4))
            if v_major == 1 and v_minor == 1:
                rc4_info = _parse_header_rc4(table)
                return (
                    F'$oldoffice$1*{rc4_info.salt.hex()}'
                    F'*{rc4_info.encrypted_verifier.hex()}'
                    F'*{rc4_info.encrypted_verifier_hash.hex()}'
                ).encode(self.codec)
            elif v_major in (2, 3, 4) and v_minor == 2:
                api_info = _parse_header_rc4_cryptoapi(MemoryFile(table.read()))
                typ = 3 if api_info.key_size <= 40 else 4
                return (
                    F'$oldoffice${typ}*{api_info.salt.hex()}'
                    F'*{api_info.encrypted_verifier.hex()}'
                    F'*{api_info.encrypted_verifier_hash.hex()}'
                ).encode(self.codec)
        raise ValueError('unsupported Word encryption version')

    def _hash_office_xls97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _parse_header_rc4,
            _parse_header_rc4_cryptoapi,
        )
        from refinery.lib.structures import MemoryFile

        with ole.openstream('Workbook') as wb:
            num = unpack('<H', wb.read(2))[0]
            if num != 2057:
                raise ValueError('invalid Workbook stream')
            size = unpack('<H', wb.read(2))[0]
            wb.read(size)
            while True:
                h = wb.read(4)
                if not h or len(h) < 4:
                    raise ValueError('FILEPASS record not found')
                rnum, rsize = unpack('<HH', h)
                if rnum == 47:
                    break
                wb.read(rsize)
            enc_type = unpack('<H', wb.read(2))[0]
            enc_data = MemoryFile(wb.read(rsize - 2))
            if enc_type == 0x0001:
                v_major, v_minor = unpack('<HH', enc_data.read(4))
                if v_major == 1 and v_minor == 1:
                    rc4_info = _parse_header_rc4(enc_data)
                    return (
                        F'$oldoffice$0*{rc4_info.salt.hex()}'
                        F'*{rc4_info.encrypted_verifier.hex()}'
                        F'*{rc4_info.encrypted_verifier_hash.hex()}'
                    ).encode(self.codec)
                elif v_major in (2, 3, 4) and v_minor == 2:
                    api_info = _parse_header_rc4_cryptoapi(enc_data)
                    typ = 3 if api_info.key_size <= 40 else 4
                    return (
                        F'$oldoffice${typ}*{api_info.salt.hex()}'
                        F'*{api_info.encrypted_verifier.hex()}'
                        F'*{api_info.encrypted_verifier_hash.hex()}'
                    ).encode(self.codec)
        raise ValueError('unsupported Excel encryption version')

    def _hash_office_ppt97(self, ole: OleFile) -> bytes:
        from refinery.lib.ole.crypto import (
            _construct_persist_object_directory,
            _parse_current_user_atom,
            _parse_record_header,
            _parse_user_edit_atom,
        )
        from refinery.lib.structures import MemoryFile

        cu_stream = ole.openstream('Current User')
        ppt_stream = ole.openstream('PowerPoint Document')
        pod = _construct_persist_object_directory(cu_stream, ppt_stream)
        cu_stream.seek(0)
        cu = _parse_current_user_atom(cu_stream)
        ppt_stream.seek(cu.offset_to_current_edit)
        uea = _parse_user_edit_atom(ppt_stream)
        if uea.encrypt_session_persist_id_ref is None:
            raise ValueError('this PowerPoint file is not encrypted')
        crypt_offset = pod[uea.encrypt_session_persist_id_ref]
        ppt_stream.seek(crypt_offset)
        rh = _parse_record_header(ppt_stream.read(8))
        crypt_data = ppt_stream.read(rh.rec_len)
        enc_info = MemoryFile(crypt_data)
        v_major, v_minor = unpack('<HH', enc_info.read(4))
        if v_major in (2, 3, 4) and v_minor == 2:
            from refinery.lib.ole.crypto import _parse_header_rc4_cryptoapi
            api_info = _parse_header_rc4_cryptoapi(enc_info)
            typ = 3 if api_info.key_size <= 40 else 4
            return (
                F'$oldoffice${typ}*{api_info.salt.hex()}'
                F'*{api_info.encrypted_verifier.hex()}'
                F'*{api_info.encrypted_verifier_hash.hex()}'
            ).encode(self.codec)
        raise ValueError('unsupported PowerPoint encryption version')

Ancestors

Subclasses

Class variables

var reverse

The type of the None singleton.

Inherited members