Module refinery.units.formats.msi

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import List, Dict, NamedTuple, Union, Optional

import codecs
import collections
import enum
import json
import re
import struct

from refinery.lib.structures import StructReader
from refinery.units.formats.office.xtdoc import xtdoc, UnpackResult
from refinery.lib import chunks
from refinery.lib.types import ByteStr
from refinery.lib.mime import FileMagicInfo
from refinery.lib.tools import cached_property


class MsiType(enum.IntEnum):
    """
    Enumeration of the cell data types that this module distinguishes in MSI tables. The
    numeric value of each member is the masked column attribute word that identifies it.
    """

    Long            = 0x0104 # noqa
    Short           = 0x0502 # noqa
    Binary          = 0x0900 # noqa
    String          = 0x0D00 # noqa
    StringLocalized = 0x0F00 # noqa
    Unknown         = 0x0000 # noqa

    def __str__(self):
        # Render only the member name, without the class name prefix.
        return self._name_


class MSITableColumnInfo(NamedTuple):
    """
    Describes a single MSI table column by its column number and its raw attribute word. All
    other properties are derived from the attribute word. See also:
    https://doxygen.reactos.org/db/de4/msipriv_8h.html
    """
    number: int
    attributes: int

    @property
    def type(self) -> MsiType:
        # Integer columns are matched on the low 12 bits of the attribute word, all other
        # columns on bits 8 to 11 only. Unrecognized values map to MsiType.Unknown.
        try:
            mask = 0xFFF if self.is_integer else 0xF00
            return MsiType(self.attributes & mask)
        except Exception:
            return MsiType.Unknown

    @property
    def is_integer(self) -> bool:
        type_bits = self.attributes & 0x0F00
        return type_bits < 0x800

    @property
    def is_key(self) -> bool:
        return bool(self.attributes & 0x2000)

    @property
    def is_nullable(self) -> bool:
        return bool(self.attributes & 0x1000)

    @property
    def length(self) -> int:
        # Long and Short columns have a fixed width; for every other type, the low byte of
        # the attribute word is returned.
        kind = self.type
        if kind is MsiType.Short:
            return 2
        if kind is MsiType.Long:
            return 4
        return self.attributes & 0xFF

    @property
    def struct_format(self) -> str:
        # Only Long cells occupy 32 bits in a table stream; everything else is a 16 bit word.
        return 'I' if self.type is MsiType.Long else 'H'


class MSIStringData:
    """
    Holds the strings of an MSI string table together with their reference counts. Strings are
    addressed by one-based indices; zero is never a valid string reference.
    """

    def __init__(self, string_data: ByteStr, string_pool: ByteStr):
        """
        Parse the string table. The pool starts with two 16 bit values (the code page and one
        value that is stored but not interpreted), followed by one (size, reference count) pair
        per string. The string bodies are read consecutively from the string data.
        """
        data = StructReader(string_data)
        pool = StructReader(string_pool)
        self.strings: List[bytes] = []
        self.provided_ref_count: List[int] = []
        self.computed_ref_count: List[int] = []
        self.codepage = pool.u16()
        self._unknown = pool.u16()
        while not pool.eof:
            size, rc = pool.read_struct('<HH')
            string = data.read_bytes(size)
            self.strings.append(string)
            self.provided_ref_count.append(rc)
            # Incremented by ref() so that it can later be compared to the provided count.
            self.computed_ref_count.append(0)

    @cached_property
    def codec(self):
        """
        The name of the Python codec for the code page of the string table. If no such codec
        exists, the event is logged and latin1 is used instead.
        """
        try:
            return codecs.lookup(F'cp{self.codepage}').name
        except Exception:
            xtmsi.log_info('failed looking up codec', self.codepage)
            return 'latin1'

    def __len__(self):
        return len(self.strings)

    def __iter__(self):
        # Iterates over all valid one-based string indices.
        yield from range(1, len(self) + 1)

    def __contains__(self, index):
        return 0 < index <= len(self)

    def ref(self, index: int, increment=True) -> str:
        """
        Return the decoded string with the given one-based index. Unless `increment` is false,
        the computed reference count of that string is increased by one. An `IndexError` is
        raised when the index does not refer to a string in the table.
        """
        # An explicit check is used because assert statements are removed under -O; a zero
        # index would then silently address the last string via the negative index -1.
        if index <= 0:
            raise IndexError(F'invalid string table index: {index}')
        index -= 1
        if increment:
            self.computed_ref_count[index] += 1
        return self.strings[index].decode(self.codec)


class xtmsi(xtdoc):
    """
    Extract files and metadata from Microsoft Installer (MSI) archives. The synthetic file {FN} contains
    parsed MSI table information, similar to the output of the Orca tool. Binary streams are placed in a
    virtual folder called "Binary", and extracted scripts from custom actions are separately extracted in
    a virtual folder named "Action".
    """

    _SYNTHETIC_STREAMS_FILENAME = 'MsiTables.json'

    # https://learn.microsoft.com/en-us/windows/win32/msi/summary-list-of-all-custom-action-types
    _CUSTOM_ACTION_TYPES = {
        0x01: 'DLL file stored in a Binary table stream.',
        0x02: 'EXE file stored in a Binary table stream.',
        0x05: 'JScript file stored in a Binary table stream.',
        0x06: 'VBScript file stored in a Binary table stream.',
        0x11: 'DLL file that is installed with a product.',
        0x12: 'EXE file that is installed with a product.',
        0x13: 'Displays a specified error message and returns failure, terminating the installation.',
        0x15: 'JScript file that is installed with a product.',
        0x16: 'VBScript file that is installed with a product.',
        0x22: 'EXE file having a path referencing a directory.',
        0x23: 'Directory set with formatted text.',
        0x25: 'JScript text stored in this sequence table.',
        0x26: 'VBScript text stored in this sequence table.',
        0x32: 'EXE file having a path specified by a property value.',
        0x33: 'Property set with formatted text.',
        0x35: 'JScript text specified by a property value.',
        0x36: 'VBScript text specified by a property value.',
    }

    def unpack(self, data):
        """
        Decode the MSI table streams and yield, sorted by path, every stream that was not consumed
        as a table, every script recovered from the CustomAction table, and one synthetic JSON
        document that contains all decoded table rows.
        """
        streams = {result.path: result for result in super().unpack(data)}

        def stream(name: str):
            # Streams that are consumed here are removed and therefore not emitted at the end.
            return streams.pop(name).get_data()

        def column_formats(table: Dict[str, MSITableColumnInfo]) -> str:
            return ''.join(v.struct_format for v in table.values())

        def stream_to_rows(data: ByteStr, row_format: str):
            # The stream is read column by column: all values of the first column come first,
            # then all values of the second column, and so on.
            row_size = struct.calcsize(F'<{row_format}')
            row_count = len(data) // row_size
            reader = StructReader(data)
            columns = [reader.read_struct(F'<{sc * row_count}') for sc in row_format]
            for i in range(row_count):
                yield [c[i] for c in columns]

        tables: Dict[str, Dict[str, MSITableColumnInfo]] = collections.defaultdict(collections.OrderedDict)
        strings = MSIStringData(stream('!_StringData'), stream('!_StringPool'))

        for tbl_name_id, col_number, col_name_id, col_attributes in stream_to_rows(stream('!_Columns'), 'HHHH'):
            tbl_name = strings.ref(tbl_name_id)
            col_name = strings.ref(col_name_id)
            tables[tbl_name][col_name] = MSITableColumnInfo(col_number, col_attributes)

        table_names_given = {strings.ref(k) for k in chunks.unpack(stream('!_Tables'), 2, False)}
        table_names_known = set(tables)

        for name in table_names_known - table_names_given:
            self.log_warn(F'table name known but not given: {name}')
        for name in table_names_given - table_names_known:
            self.log_warn(F'table name given but not known: {name}')

        class ScriptItem(NamedTuple):
            row_index: int
            extension: Optional[str]

        processed_table_data: Dict[str, List[Dict[str, str]]] = {}
        # Lookup tables for the expansion of formatted strings, filled while decoding the tables.
        tbl_properties: Dict[str, str] = {}
        tbl_files: Dict[str, str] = {}
        tbl_components: Dict[str, str] = {}
        postprocessing: List[ScriptItem] = []

        reference_pattern = re.compile(R'''(?x)
                    \[             # open square brackent
                      (?![~\\])    # not followed by escapes
                      ([%$!#]?)    # any of the valid prefix characters
                      ([^[\]{}]+)  # no brackets or braces
                    \]''')

        def format_string(string: str):
            # https://learn.microsoft.com/en-us/windows/win32/msi/formatted
            def _replace(match: re.Match[str]):
                _replace.done = False
                prefix, name = match.groups()
                if not prefix:
                    tbl = tbl_properties
                elif prefix in '%':
                    name = name.rstrip('%').upper()
                    return F'%{name}%'
                elif prefix in '!#':
                    tbl = tbl_files
                elif prefix in '$':
                    tbl = tbl_components
                else:
                    raise ValueError
                return tbl.get(name, '')
            # Substituted values can contain references themselves, hence the repeated passes.
            # The number of passes is bounded and a pass without any effect ends the expansion,
            # because a property that refers to itself would otherwise never terminate.
            for _ in range(16):
                _replace.done = True
                expanded = reference_pattern.sub(_replace, string)
                unchanged = expanded == string
                string = expanded
                if _replace.done or unchanged:
                    break
            string = re.sub(r'\[\\(.)\]', r'\1', string)
            string = string.replace('[~]', '\0')
            return string

        # Custom action types whose target contains script text, mapped to the file extension
        # of the extracted script; None marks targets that need formatted string expansion.
        script_kinds = {0x25: 'js', 0x26: 'vbs', 0x33: None}

        for table_name, table in tables.items():
            stream_name = F'!{table_name}'
            if stream_name not in streams:
                continue
            processed = []
            info = list(table.values())
            target_column = table.get('Target')
            for r, row in enumerate(stream_to_rows(stream(stream_name), column_formats(table))):
                values = []
                for column, value in zip(info, row):
                    vt = column.type
                    if vt is MsiType.Long:
                        # Nonzero integers are stored with an offset; zero is kept as is.
                        if value != 0:
                            value -= 0x80000000
                    elif vt is MsiType.Short:
                        if value != 0:
                            value -= 0x8000
                    elif value in strings:
                        value = strings.ref(value)
                    elif not column.is_integer:
                        value = ''
                    values.append(value)
                if table_name == 'Property':
                    tbl_properties[values[0]] = values[1]
                elif table_name == 'File':
                    tbl_files[values[0]] = values[2]
                elif table_name == 'Component':
                    tbl_components[values[0]] = F'%{values[2]}%'
                entry = dict(zip(table, values))
                if table_name == 'MsiFileHash':
                    entry['Hash'] = struct.pack(
                        '<IIII',
                        row[2] ^ 0x80000000,
                        row[3] ^ 0x80000000,
                        row[4] ^ 0x80000000,
                        row[5] ^ 0x80000000,
                    ).hex()
                if table_name == 'CustomAction':
                    code = row[1] & 0x3F
                    comment = self._CUSTOM_ACTION_TYPES.get(code)
                    if comment is not None:
                        entry['Comment'] = comment
                    if code in script_kinds and target_column and not target_column.is_integer:
                        postprocessing.append(ScriptItem(r, script_kinds[code]))
                processed.append(entry)
            if processed:
                processed_table_data[table_name] = processed

        # Items are only queued while decoding CustomAction rows, so the table exists if needed.
        ca = processed_table_data.get('CustomAction', [])
        for item in postprocessing:
            entry = ca[item.row_index]
            try:
                path: str = entry['Action']
                target: str = entry['Target']
            except KeyError:
                continue
            root = F'Action/{path}'
            if item.extension:
                path = F'{root}.{item.extension}'
                streams[path] = UnpackResult(path, target.encode(self.codec))
                continue
            # The expanded target is only used when every part, split at \x01, is a pair that
            # is separated by \x02; only parts whose name starts with "script" are extracted.
            target = format_string(target)
            parts = [part.partition('\x02') for part in target.split('\x01')]
            if not all(part[1] == '\x02' for part in parts):
                continue
            for name, _, script in parts:
                if not name.lower().startswith('script'):
                    continue
                if not script:
                    continue
                path = F'{root}.{name}'
                streams[path] = UnpackResult(path, script.encode(self.codec))

        for ignored_stream in [
            '[5]SummaryInformation',
            '[5]DocumentSummaryInformation',
            '[5]DigitalSignature',
            '[5]MsiDigitalSignatureEx'
        ]:
            streams.pop(ignored_stream, None)

        inconsistencies = 0
        ref_counts = zip(strings.computed_ref_count, strings.provided_ref_count)
        for k, (computed, provided) in enumerate(ref_counts, 1):
            # NOTE(review): presumably log_debug returns a true value when the message was
            # emitted, so that mismatches are only tallied when not listed - confirm.
            if computed != provided and not self.log_debug(
                F'string reference count computed={computed} provided={provided}:', strings.ref(k, False)
            ):
                inconsistencies += 1
        if inconsistencies:
            self.log_info(F'found {inconsistencies} incorrect string reference counts')

        def fix_msi_path(path: str):
            # Turns "Binary.Name" into "Binary/Name" to place such streams in a virtual folder.
            prefix, dot, name = path.partition('.')
            if dot == '.' and prefix.lower() == 'binary':
                path = F'{prefix}/{name}'
            return path

        streams = {fix_msi_path(path): item for path, item in streams.items()}
        ds = UnpackResult(self._SYNTHETIC_STREAMS_FILENAME,
                json.dumps(processed_table_data, indent=4).encode(self.codec))
        streams[ds.path] = ds

        for path in sorted(streams):
            streams[path].path = path
            yield streams[path]

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return whether the data starts with the compound file signature and is identified as an
        MSI file by its magic information.
        """
        if not data.startswith(B'\xD0\xCF\x11\xE0'):
            return False
        return FileMagicInfo(data).extension == 'msi'


# Docstrings are removed when Python runs with -OO; there is nothing to format in that case.
if xtmsi.__doc__:
    xtmsi.__doc__ = xtmsi.__doc__.format(FN=xtmsi._SYNTHETIC_STREAMS_FILENAME)

Classes

class MsiType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Known data types for MSI table cell entries.

Expand source code Browse git
class MsiType(enum.IntEnum):
    """
    Enumeration of the cell data types that this module distinguishes in MSI tables. The
    numeric value of each member is the masked column attribute word that identifies it.
    """

    Long            = 0x0104 # noqa
    Short           = 0x0502 # noqa
    Binary          = 0x0900 # noqa
    String          = 0x0D00 # noqa
    StringLocalized = 0x0F00 # noqa
    Unknown         = 0x0000 # noqa

    def __str__(self):
        # Render only the member name, without the class name prefix.
        return self._name_

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var Long
var Short
var Binary
var String
var StringLocalized
var Unknown
class MSITableColumnInfo (number, attributes)

Represents information about an MSI table column. See also: https://doxygen.reactos.org/db/de4/msipriv_8h.html

Expand source code Browse git
class MSITableColumnInfo(NamedTuple):
    """
    Describes a single MSI table column by its column number and its raw attribute word. All
    other properties are derived from the attribute word. See also:
    https://doxygen.reactos.org/db/de4/msipriv_8h.html
    """
    number: int
    attributes: int

    @property
    def type(self) -> MsiType:
        # Integer columns are matched on the low 12 bits of the attribute word, all other
        # columns on bits 8 to 11 only. Unrecognized values map to MsiType.Unknown.
        try:
            mask = 0xFFF if self.is_integer else 0xF00
            return MsiType(self.attributes & mask)
        except Exception:
            return MsiType.Unknown

    @property
    def is_integer(self) -> bool:
        type_bits = self.attributes & 0x0F00
        return type_bits < 0x800

    @property
    def is_key(self) -> bool:
        return bool(self.attributes & 0x2000)

    @property
    def is_nullable(self) -> bool:
        return bool(self.attributes & 0x1000)

    @property
    def length(self) -> int:
        # Long and Short columns have a fixed width; for every other type, the low byte of
        # the attribute word is returned.
        kind = self.type
        if kind is MsiType.Short:
            return 2
        if kind is MsiType.Long:
            return 4
        return self.attributes & 0xFF

    @property
    def struct_format(self) -> str:
        # Only Long cells occupy 32 bits in a table stream; everything else is a 16 bit word.
        return 'I' if self.type is MsiType.Long else 'H'

Ancestors

  • builtins.tuple

Instance variables

var number

Alias for field number 0

var attributes

Alias for field number 1

var type
Expand source code Browse git
@property
def type(self) -> MsiType:
    # Integer columns are matched on the low 12 bits of the attribute word, all other
    # columns on bits 8 to 11 only. Unrecognized values map to MsiType.Unknown.
    try:
        mask = 0xFFF if self.is_integer else 0xF00
        return MsiType(self.attributes & mask)
    except Exception:
        return MsiType.Unknown
var is_integer
Expand source code Browse git
@property
def is_integer(self) -> bool:
    # The column is an integer column when its masked type bits are below 0x800.
    type_bits = self.attributes & 0x0F00
    return type_bits < 0x800
var is_key
Expand source code Browse git
@property
def is_key(self) -> bool:
    # Tests the bit 0x2000 of the attribute word.
    return bool(self.attributes & 0x2000)
var is_nullable
Expand source code Browse git
@property
def is_nullable(self) -> bool:
    # Tests the bit 0x1000 of the attribute word.
    return bool(self.attributes & 0x1000)
var length
Expand source code Browse git
@property
def length(self) -> int:
    # Long and Short columns have a fixed width; for every other type, the low byte of
    # the attribute word is returned.
    kind = self.type
    if kind is MsiType.Short:
        return 2
    if kind is MsiType.Long:
        return 4
    return self.attributes & 0xFF
var struct_format
Expand source code Browse git
@property
def struct_format(self) -> str:
    # Only Long cells occupy 32 bits in a table stream; everything else is a 16 bit word.
    return 'I' if self.type is MsiType.Long else 'H'
class MSIStringData (string_data, string_pool)
Expand source code Browse git
class MSIStringData:
    """
    Holds the strings of an MSI string table together with their reference counts. Strings are
    addressed by one-based indices; zero is never a valid string reference.
    """

    def __init__(self, string_data: ByteStr, string_pool: ByteStr):
        """
        Parse the string table. The pool starts with two 16 bit values (the code page and one
        value that is stored but not interpreted), followed by one (size, reference count) pair
        per string. The string bodies are read consecutively from the string data.
        """
        data = StructReader(string_data)
        pool = StructReader(string_pool)
        self.strings: List[bytes] = []
        self.provided_ref_count: List[int] = []
        self.computed_ref_count: List[int] = []
        self.codepage = pool.u16()
        self._unknown = pool.u16()
        while not pool.eof:
            size, rc = pool.read_struct('<HH')
            string = data.read_bytes(size)
            self.strings.append(string)
            self.provided_ref_count.append(rc)
            # Incremented by ref() so that it can later be compared to the provided count.
            self.computed_ref_count.append(0)

    @cached_property
    def codec(self):
        """
        The name of the Python codec for the code page of the string table. If no such codec
        exists, the event is logged and latin1 is used instead.
        """
        try:
            return codecs.lookup(F'cp{self.codepage}').name
        except Exception:
            xtmsi.log_info('failed looking up codec', self.codepage)
            return 'latin1'

    def __len__(self):
        return len(self.strings)

    def __iter__(self):
        # Iterates over all valid one-based string indices.
        yield from range(1, len(self) + 1)

    def __contains__(self, index):
        return 0 < index <= len(self)

    def ref(self, index: int, increment=True) -> str:
        """
        Return the decoded string with the given one-based index. Unless `increment` is false,
        the computed reference count of that string is increased by one. An `IndexError` is
        raised when the index does not refer to a string in the table.
        """
        # An explicit check is used because assert statements are removed under -O; a zero
        # index would then silently address the last string via the negative index -1.
        if index <= 0:
            raise IndexError(F'invalid string table index: {index}')
        index -= 1
        if increment:
            self.computed_ref_count[index] += 1
        return self.strings[index].decode(self.codec)

Instance variables

var codec
Expand source code Browse git
@cached_property
def codec(self):
    # Resolves the code page to the name of a Python codec; if the lookup fails, the event
    # is logged and latin1 is used instead.
    codec_name = F'cp{self.codepage}'
    try:
        info = codecs.lookup(codec_name)
        return info.name
    except Exception:
        xtmsi.log_info('failed looking up codec', self.codepage)
        return 'latin1'

Methods

def ref(self, index, increment=True)
Expand source code Browse git
def ref(self, index: int, increment=True) -> str:
    """
    Return the decoded string with the given one-based index. Unless `increment` is false,
    the computed reference count of that string is increased by one. An `IndexError` is
    raised when the index does not refer to a string in the table.
    """
    # An explicit check is used because assert statements are removed under -O; a zero
    # index would then silently address the last string via the negative index -1.
    if index <= 0:
        raise IndexError(F'invalid string table index: {index}')
    index -= 1
    if increment:
        self.computed_ref_count[index] += 1
    return self.strings[index].decode(self.codec)
class xtmsi (*paths, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')

Extract files and metadata from Microsoft Installer (MSI) archives. The synthetic file MsiTables.json contains parsed MSI table information, similar to the output of the Orca tool. Binary streams are placed in a virtual folder called "Binary", and extracted scripts from custom actions are separately extracted in a virtual folder named "Action".

Expand source code Browse git
class xtmsi(xtdoc):
    """
    Extract files and metadata from Microsoft Installer (MSI) archives. The synthetic file {FN} contains
    parsed MSI table information, similar to the output of the Orca tool. Binary streams are placed in a
    virtual folder called "Binary", and extracted scripts from custom actions are separately extracted in
    a virtual folder named "Action".
    """

    _SYNTHETIC_STREAMS_FILENAME = 'MsiTables.json'

    # https://learn.microsoft.com/en-us/windows/win32/msi/summary-list-of-all-custom-action-types
    _CUSTOM_ACTION_TYPES = {
        0x01: 'DLL file stored in a Binary table stream.',
        0x02: 'EXE file stored in a Binary table stream.',
        0x05: 'JScript file stored in a Binary table stream.',
        0x06: 'VBScript file stored in a Binary table stream.',
        0x11: 'DLL file that is installed with a product.',
        0x12: 'EXE file that is installed with a product.',
        0x13: 'Displays a specified error message and returns failure, terminating the installation.',
        0x15: 'JScript file that is installed with a product.',
        0x16: 'VBScript file that is installed with a product.',
        0x22: 'EXE file having a path referencing a directory.',
        0x23: 'Directory set with formatted text.',
        0x25: 'JScript text stored in this sequence table.',
        0x26: 'VBScript text stored in this sequence table.',
        0x32: 'EXE file having a path specified by a property value.',
        0x33: 'Property set with formatted text.',
        0x35: 'JScript text specified by a property value.',
        0x36: 'VBScript text specified by a property value.',
    }

    def unpack(self, data):
        """
        Decode the MSI table streams and yield, sorted by path, every stream that was not consumed
        as a table, every script recovered from the CustomAction table, and one synthetic JSON
        document that contains all decoded table rows.
        """
        streams = {result.path: result for result in super().unpack(data)}

        def stream(name: str):
            # Streams that are consumed here are removed and therefore not emitted at the end.
            return streams.pop(name).get_data()

        def column_formats(table: Dict[str, MSITableColumnInfo]) -> str:
            return ''.join(v.struct_format for v in table.values())

        def stream_to_rows(data: ByteStr, row_format: str):
            # The stream is read column by column: all values of the first column come first,
            # then all values of the second column, and so on.
            row_size = struct.calcsize(F'<{row_format}')
            row_count = len(data) // row_size
            reader = StructReader(data)
            columns = [reader.read_struct(F'<{sc * row_count}') for sc in row_format]
            for i in range(row_count):
                yield [c[i] for c in columns]

        tables: Dict[str, Dict[str, MSITableColumnInfo]] = collections.defaultdict(collections.OrderedDict)
        strings = MSIStringData(stream('!_StringData'), stream('!_StringPool'))

        for tbl_name_id, col_number, col_name_id, col_attributes in stream_to_rows(stream('!_Columns'), 'HHHH'):
            tbl_name = strings.ref(tbl_name_id)
            col_name = strings.ref(col_name_id)
            tables[tbl_name][col_name] = MSITableColumnInfo(col_number, col_attributes)

        table_names_given = {strings.ref(k) for k in chunks.unpack(stream('!_Tables'), 2, False)}
        table_names_known = set(tables)

        for name in table_names_known - table_names_given:
            self.log_warn(F'table name known but not given: {name}')
        for name in table_names_given - table_names_known:
            self.log_warn(F'table name given but not known: {name}')

        class ScriptItem(NamedTuple):
            row_index: int
            extension: Optional[str]

        processed_table_data: Dict[str, List[Dict[str, str]]] = {}
        # Lookup tables for the expansion of formatted strings, filled while decoding the tables.
        tbl_properties: Dict[str, str] = {}
        tbl_files: Dict[str, str] = {}
        tbl_components: Dict[str, str] = {}
        postprocessing: List[ScriptItem] = []

        reference_pattern = re.compile(R'''(?x)
                    \[             # open square brackent
                      (?![~\\])    # not followed by escapes
                      ([%$!#]?)    # any of the valid prefix characters
                      ([^[\]{}]+)  # no brackets or braces
                    \]''')

        def format_string(string: str):
            # https://learn.microsoft.com/en-us/windows/win32/msi/formatted
            def _replace(match: re.Match[str]):
                _replace.done = False
                prefix, name = match.groups()
                if not prefix:
                    tbl = tbl_properties
                elif prefix in '%':
                    name = name.rstrip('%').upper()
                    return F'%{name}%'
                elif prefix in '!#':
                    tbl = tbl_files
                elif prefix in '$':
                    tbl = tbl_components
                else:
                    raise ValueError
                return tbl.get(name, '')
            # Substituted values can contain references themselves, hence the repeated passes.
            # The number of passes is bounded and a pass without any effect ends the expansion,
            # because a property that refers to itself would otherwise never terminate.
            for _ in range(16):
                _replace.done = True
                expanded = reference_pattern.sub(_replace, string)
                unchanged = expanded == string
                string = expanded
                if _replace.done or unchanged:
                    break
            string = re.sub(r'\[\\(.)\]', r'\1', string)
            string = string.replace('[~]', '\0')
            return string

        # Custom action types whose target contains script text, mapped to the file extension
        # of the extracted script; None marks targets that need formatted string expansion.
        script_kinds = {0x25: 'js', 0x26: 'vbs', 0x33: None}

        for table_name, table in tables.items():
            stream_name = F'!{table_name}'
            if stream_name not in streams:
                continue
            processed = []
            info = list(table.values())
            target_column = table.get('Target')
            for r, row in enumerate(stream_to_rows(stream(stream_name), column_formats(table))):
                values = []
                for column, value in zip(info, row):
                    vt = column.type
                    if vt is MsiType.Long:
                        # Nonzero integers are stored with an offset; zero is kept as is.
                        if value != 0:
                            value -= 0x80000000
                    elif vt is MsiType.Short:
                        if value != 0:
                            value -= 0x8000
                    elif value in strings:
                        value = strings.ref(value)
                    elif not column.is_integer:
                        value = ''
                    values.append(value)
                if table_name == 'Property':
                    tbl_properties[values[0]] = values[1]
                elif table_name == 'File':
                    tbl_files[values[0]] = values[2]
                elif table_name == 'Component':
                    tbl_components[values[0]] = F'%{values[2]}%'
                entry = dict(zip(table, values))
                if table_name == 'MsiFileHash':
                    entry['Hash'] = struct.pack(
                        '<IIII',
                        row[2] ^ 0x80000000,
                        row[3] ^ 0x80000000,
                        row[4] ^ 0x80000000,
                        row[5] ^ 0x80000000,
                    ).hex()
                if table_name == 'CustomAction':
                    code = row[1] & 0x3F
                    comment = self._CUSTOM_ACTION_TYPES.get(code)
                    if comment is not None:
                        entry['Comment'] = comment
                    if code in script_kinds and target_column and not target_column.is_integer:
                        postprocessing.append(ScriptItem(r, script_kinds[code]))
                processed.append(entry)
            if processed:
                processed_table_data[table_name] = processed

        # Items are only queued while decoding CustomAction rows, so the table exists if needed.
        ca = processed_table_data.get('CustomAction', [])
        for item in postprocessing:
            entry = ca[item.row_index]
            try:
                path: str = entry['Action']
                target: str = entry['Target']
            except KeyError:
                continue
            root = F'Action/{path}'
            if item.extension:
                path = F'{root}.{item.extension}'
                streams[path] = UnpackResult(path, target.encode(self.codec))
                continue
            # The expanded target is only used when every part, split at \x01, is a pair that
            # is separated by \x02; only parts whose name starts with "script" are extracted.
            target = format_string(target)
            parts = [part.partition('\x02') for part in target.split('\x01')]
            if not all(part[1] == '\x02' for part in parts):
                continue
            for name, _, script in parts:
                if not name.lower().startswith('script'):
                    continue
                if not script:
                    continue
                path = F'{root}.{name}'
                streams[path] = UnpackResult(path, script.encode(self.codec))

        for ignored_stream in [
            '[5]SummaryInformation',
            '[5]DocumentSummaryInformation',
            '[5]DigitalSignature',
            '[5]MsiDigitalSignatureEx'
        ]:
            streams.pop(ignored_stream, None)

        inconsistencies = 0
        ref_counts = zip(strings.computed_ref_count, strings.provided_ref_count)
        for k, (computed, provided) in enumerate(ref_counts, 1):
            # NOTE(review): presumably log_debug returns a true value when the message was
            # emitted, so that mismatches are only tallied when not listed - confirm.
            if computed != provided and not self.log_debug(
                F'string reference count computed={computed} provided={provided}:', strings.ref(k, False)
            ):
                inconsistencies += 1
        if inconsistencies:
            self.log_info(F'found {inconsistencies} incorrect string reference counts')

        def fix_msi_path(path: str):
            # Turns "Binary.Name" into "Binary/Name" to place such streams in a virtual folder.
            prefix, dot, name = path.partition('.')
            if dot == '.' and prefix.lower() == 'binary':
                path = F'{prefix}/{name}'
            return path

        streams = {fix_msi_path(path): item for path, item in streams.items()}
        ds = UnpackResult(self._SYNTHETIC_STREAMS_FILENAME,
                json.dumps(processed_table_data, indent=4).encode(self.codec))
        streams[ds.path] = ds

        for path in sorted(streams):
            streams[path].path = path
            yield streams[path]

    @classmethod
    def handles(cls, data: bytearray):
        """
        Return whether the data starts with the compound file signature and is identified as an
        MSI file by its magic information.
        """
        if not data.startswith(B'\xD0\xCF\x11\xE0'):
            return False
        return FileMagicInfo(data).extension == 'msi'

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    """
    Decode the MSI table streams and yield, sorted by path, every stream that was not consumed
    as a table, every script recovered from the CustomAction table, and one synthetic JSON
    document that contains all decoded table rows.
    """
    streams = {result.path: result for result in super().unpack(data)}

    def stream(name: str):
        # Streams that are consumed here are removed and therefore not emitted at the end.
        return streams.pop(name).get_data()

    def column_formats(table: Dict[str, MSITableColumnInfo]) -> str:
        return ''.join(v.struct_format for v in table.values())

    def stream_to_rows(data: ByteStr, row_format: str):
        # The stream is read column by column: all values of the first column come first,
        # then all values of the second column, and so on.
        row_size = struct.calcsize(F'<{row_format}')
        row_count = len(data) // row_size
        reader = StructReader(data)
        columns = [reader.read_struct(F'<{sc * row_count}') for sc in row_format]
        for i in range(row_count):
            yield [c[i] for c in columns]

    tables: Dict[str, Dict[str, MSITableColumnInfo]] = collections.defaultdict(collections.OrderedDict)
    strings = MSIStringData(stream('!_StringData'), stream('!_StringPool'))

    for tbl_name_id, col_number, col_name_id, col_attributes in stream_to_rows(stream('!_Columns'), 'HHHH'):
        tbl_name = strings.ref(tbl_name_id)
        col_name = strings.ref(col_name_id)
        tables[tbl_name][col_name] = MSITableColumnInfo(col_number, col_attributes)

    table_names_given = {strings.ref(k) for k in chunks.unpack(stream('!_Tables'), 2, False)}
    table_names_known = set(tables)

    for name in table_names_known - table_names_given:
        self.log_warn(F'table name known but not given: {name}')
    for name in table_names_given - table_names_known:
        self.log_warn(F'table name given but not known: {name}')

    class ScriptItem(NamedTuple):
        row_index: int
        extension: Optional[str]

    processed_table_data: Dict[str, List[Dict[str, str]]] = {}
    # Lookup tables for the expansion of formatted strings, filled while decoding the tables.
    tbl_properties: Dict[str, str] = {}
    tbl_files: Dict[str, str] = {}
    tbl_components: Dict[str, str] = {}
    postprocessing: List[ScriptItem] = []

    reference_pattern = re.compile(R'''(?x)
                \[             # open square brackent
                  (?![~\\])    # not followed by escapes
                  ([%$!#]?)    # any of the valid prefix characters
                  ([^[\]{}]+)  # no brackets or braces
                \]''')

    def format_string(string: str):
        # https://learn.microsoft.com/en-us/windows/win32/msi/formatted
        def _replace(match: re.Match[str]):
            _replace.done = False
            prefix, name = match.groups()
            if not prefix:
                tbl = tbl_properties
            elif prefix in '%':
                name = name.rstrip('%').upper()
                return F'%{name}%'
            elif prefix in '!#':
                tbl = tbl_files
            elif prefix in '$':
                tbl = tbl_components
            else:
                raise ValueError
            return tbl.get(name, '')
        # Substituted values can contain references themselves, hence the repeated passes.
        # The number of passes is bounded and a pass without any effect ends the expansion,
        # because a property that refers to itself would otherwise never terminate.
        for _ in range(16):
            _replace.done = True
            expanded = reference_pattern.sub(_replace, string)
            unchanged = expanded == string
            string = expanded
            if _replace.done or unchanged:
                break
        string = re.sub(r'\[\\(.)\]', r'\1', string)
        string = string.replace('[~]', '\0')
        return string

    # Custom action types whose target contains script text, mapped to the file extension
    # of the extracted script; None marks targets that need formatted string expansion.
    script_kinds = {0x25: 'js', 0x26: 'vbs', 0x33: None}

    for table_name, table in tables.items():
        stream_name = F'!{table_name}'
        if stream_name not in streams:
            continue
        processed = []
        info = list(table.values())
        target_column = table.get('Target')
        for r, row in enumerate(stream_to_rows(stream(stream_name), column_formats(table))):
            values = []
            for column, value in zip(info, row):
                vt = column.type
                if vt is MsiType.Long:
                    # Nonzero integers are stored with an offset; zero is kept as is.
                    if value != 0:
                        value -= 0x80000000
                elif vt is MsiType.Short:
                    if value != 0:
                        value -= 0x8000
                elif value in strings:
                    value = strings.ref(value)
                elif not column.is_integer:
                    value = ''
                values.append(value)
            if table_name == 'Property':
                tbl_properties[values[0]] = values[1]
            elif table_name == 'File':
                tbl_files[values[0]] = values[2]
            elif table_name == 'Component':
                tbl_components[values[0]] = F'%{values[2]}%'
            entry = dict(zip(table, values))
            if table_name == 'MsiFileHash':
                entry['Hash'] = struct.pack(
                    '<IIII',
                    row[2] ^ 0x80000000,
                    row[3] ^ 0x80000000,
                    row[4] ^ 0x80000000,
                    row[5] ^ 0x80000000,
                ).hex()
            if table_name == 'CustomAction':
                code = row[1] & 0x3F
                comment = self._CUSTOM_ACTION_TYPES.get(code)
                if comment is not None:
                    entry['Comment'] = comment
                if code in script_kinds and target_column and not target_column.is_integer:
                    postprocessing.append(ScriptItem(r, script_kinds[code]))
            processed.append(entry)
        if processed:
            processed_table_data[table_name] = processed

    # Items are only queued while decoding CustomAction rows, so the table exists if needed.
    ca = processed_table_data.get('CustomAction', [])
    for item in postprocessing:
        entry = ca[item.row_index]
        try:
            path: str = entry['Action']
            target: str = entry['Target']
        except KeyError:
            continue
        root = F'Action/{path}'
        if item.extension:
            path = F'{root}.{item.extension}'
            streams[path] = UnpackResult(path, target.encode(self.codec))
            continue
        # The expanded target is only used when every part, split at \x01, is a pair that
        # is separated by \x02; only parts whose name starts with "script" are extracted.
        target = format_string(target)
        parts = [part.partition('\x02') for part in target.split('\x01')]
        if not all(part[1] == '\x02' for part in parts):
            continue
        for name, _, script in parts:
            if not name.lower().startswith('script'):
                continue
            if not script:
                continue
            path = F'{root}.{name}'
            streams[path] = UnpackResult(path, script.encode(self.codec))

    for ignored_stream in [
        '[5]SummaryInformation',
        '[5]DocumentSummaryInformation',
        '[5]DigitalSignature',
        '[5]MsiDigitalSignatureEx'
    ]:
        streams.pop(ignored_stream, None)

    inconsistencies = 0
    ref_counts = zip(strings.computed_ref_count, strings.provided_ref_count)
    for k, (computed, provided) in enumerate(ref_counts, 1):
        # NOTE(review): presumably log_debug returns a true value when the message was
        # emitted, so that mismatches are only tallied when not listed - confirm.
        if computed != provided and not self.log_debug(
            F'string reference count computed={computed} provided={provided}:', strings.ref(k, False)
        ):
            inconsistencies += 1
    if inconsistencies:
        self.log_info(F'found {inconsistencies} incorrect string reference counts')

    def fix_msi_path(path: str):
        # Turns "Binary.Name" into "Binary/Name" to place such streams in a virtual folder.
        prefix, dot, name = path.partition('.')
        if dot == '.' and prefix.lower() == 'binary':
            path = F'{prefix}/{name}'
        return path

    streams = {fix_msi_path(path): item for path, item in streams.items()}
    ds = UnpackResult(self._SYNTHETIC_STREAMS_FILENAME,
            json.dumps(processed_table_data, indent=4).encode(self.codec))
    streams[ds.path] = ds

    for path in sorted(streams):
        streams[path].path = path
        yield streams[path]

Inherited members