Module refinery.lib.access

Parser for Microsoft Access database files (.mdb / .accdb). Supported are:

Type Program
Jet3 Access 97
Jet4 Access 2000/2002/2003
ACE12 Access 2007
ACE14 Access 2010+
Expand source code Browse git
"""
Parser for Microsoft Access database files (.mdb / .accdb). Supported are:

|  Type | Program               |
|------:|:----------------------|
|  Jet3 | Access 97             |
|  Jet4 | Access 2000/2002/2003 |
| ACE12 | Access 2007           |
| ACE14 | Access 2010+          |
"""
from __future__ import annotations

import codecs
import enum
import math
import struct

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Literal, NamedTuple, overload
from uuid import UUID

from refinery.lib.structures import StructReader

# Day zero of the Access/OLE automation date format; DATETIME values count
# days (as a float) from this date.
_ACCESS_EPOCH = datetime(1899, 12, 30)

# Two-byte signatures used to classify pages within the database file.
_TABLE_MAGIC = b'\x02\x01'  # table definition (TDEF) page
_DATA_MAGIC = b'\x01\x01'   # data page

# Catalog `Flags` values that mark system objects; the set lists both the
# signed and the unsigned encoding of the same bit patterns.
_SYSTEM_FLAGS = {-0x80000000, -2, 0x80000000, 2}


class JetVersion(enum.IntEnum):
    """
    Database engine version code read from the file header; see the module
    docstring for the corresponding Access program versions.
    """
    V3 = 0x00
    V4 = 0x01
    V5 = 0x02
    V2010 = 0x03


class ColumnType(enum.IntEnum):
    """
    Field type codes as stored in table column definitions.
    """
    BOOLEAN = 1
    INT8 = 2
    INT16 = 3
    INT32 = 4
    MONEY = 5
    FLOAT32 = 6
    FLOAT64 = 7
    DATETIME = 8
    BINARY = 9
    TEXT = 10
    OLE = 11
    MEMO = 12
    GUID = 15
    NUMERIC = 16
    COMPLEX = 18


class _Column(NamedTuple):
    # Descriptor for a single table column, as read from a TDEF page.
    type: int                     # ColumnType code
    column_id: int                # column identifier; indexes the row null bitmap
    variable_column_number: int   # ordinal among variable-length columns (as stored)
    column_index: int             # stored column index (used for ordering)
    fixed_length: bool            # True when the value lives at fixed_offset in the row
    fixed_offset: int             # offset of fixed-length data within a row
    length: int                   # declared column length
    name: str = ''                # column name (filled in after descriptor parsing)
    collation: int | None = None  # collation id for text-like columns, when present
    code_page: int | None = None  # Jet3 text code page, when present
    precision: int | None = None  # NUMERIC precision
    scale: int | None = None      # NUMERIC scale (number of decimal digits)

class _VarLenMeta(NamedTuple):
    # Parsed trailer metadata describing a record's variable-length fields.
    # The trailer is read from the *reversed* record bytes.
    field_count: int          # number of variable-length fields recorded
    field_offsets: list[int]  # start offset of each variable-length field
    var_len_count: int        # end offset of the last field's data
    jump_table: list[int]     # Jet3 jump table entries (empty for Jet4)
    metadata_end: int         # reader position after the metadata

def _mdb_date(raw: int) -> datetime | None:
    try:
        value, = struct.unpack('<d', struct.pack('<Q', raw))
        frac, whole = math.modf(value)
        result = _ACCESS_EPOCH + timedelta(days=whole) + timedelta(days=frac)
        if result == _ACCESS_EPOCH:
            return None
        return result
    except (OverflowError, ValueError, struct.error):
        return None


def _numeric_to_string(data: bytes | memoryview, scale: int = 6) -> str:
    neg, n1, n2, n3, n4 = struct.unpack_from('<BIIII', data)
    full = (n1 << 96) + (n2 << 64) + (n3 << 32) + n4
    digits = str(full)
    if len(digits) > scale:
        dot = len(digits) - scale
        digits = F'{digits[:dot]}.{digits[dot:]}'
    return F'-{digits}' if neg else digits


def _decode_text(data: bytes | memoryview) -> str:
    try:
        return codecs.decode(data, 'utf-8')
    except (UnicodeDecodeError, ValueError):
        return codecs.decode(data, 'latin-1')


# Typed overloads for _parse_type: the runtime return type depends on the
# column type, so these give callers precise static types for the common
# ColumnType literals. The final catch-all overload covers everything else.
@overload
def _parse_type(column_type: Literal[
    ColumnType.INT8,
    ColumnType.INT16,
    ColumnType.INT32,
    ColumnType.MONEY,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> int:
    pass


@overload
def _parse_type(column_type: Literal[
    ColumnType.FLOAT32,
    ColumnType.FLOAT64,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> float:
    pass


@overload
def _parse_type(column_type: Literal[
    ColumnType.OLE,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> bytes:
    pass


@overload
def _parse_type(column_type: Literal[
    ColumnType.TEXT,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> str:
    pass


@overload
def _parse_type(column_type: Literal[
    ColumnType.DATETIME,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> datetime | None:
    pass


@overload
def _parse_type(column_type: Literal[
    ColumnType.GUID,
], data: bytes | memoryview, length: int | None = None, is_v3: bool = True) -> UUID:
    pass


@overload
def _parse_type(
    column_type: int, data: bytes | memoryview, length: int | None = None, is_v3: bool = True
) -> int | float | str | bytes | datetime | UUID | None:
    pass


def _parse_type(
    column_type: int,
    data: bytes | memoryview,
    length: int | None = None,
    is_v3: bool = True,
) -> int | float | str | bytes | datetime | UUID | None:
    """
    Decode a raw field value according to its column type. `length` limits
    BINARY fields when given; `is_v3` selects Jet3 vs. Jet4 text decoding.
    Unknown column types are returned as raw bytes.
    """
    scalar_formats = {
        ColumnType.INT8: 'b',
        ColumnType.INT16: '<h',
        ColumnType.INT32: '<i',
        ColumnType.COMPLEX: '<i',
        ColumnType.MONEY: '<q',
        ColumnType.FLOAT32: '<f',
        ColumnType.FLOAT64: '<d',
    }
    fmt = scalar_formats.get(column_type)
    if fmt is not None:
        return struct.unpack_from(fmt, data)[0]
    if column_type == ColumnType.DATETIME:
        return _mdb_date(struct.unpack_from('<Q', data)[0])
    if column_type == ColumnType.TEXT:
        if is_v3:
            text = _decode_text(data)
        elif data[:2] in (b'\xfe\xff', b'\xff\xfe'):
            # NOTE(review): a BOM-like prefix is treated as a marker for
            # single-byte (compressed) text in Jet4 - confirm against spec.
            text = _decode_text(data[2:])
        else:
            text = codecs.decode(data, 'utf-16-le', errors='ignore')
        return text.replace('\x00', '')
    if column_type == ColumnType.BINARY:
        return bytes(data) if length is None else bytes(data[:length])
    if column_type == ColumnType.OLE:
        return bytes(data)
    if column_type == ColumnType.GUID:
        return UUID(bytes_le=bytes(data[:16]))
    if column_type == ColumnType.NUMERIC:
        # NUMERIC is returned raw here; rendering happens in the row parser.
        return bytes(data[:17])
    return bytes(data)


def _parse_data_page_header(reader: StructReader[memoryview], is_v3: bool) -> tuple[int, list[int]]:
    """
    Read the header of a data page. Returns the owning table's page number
    and the list of record offsets stored on the page. Raises `ValueError`
    when the page does not start with the data page magic.
    """
    if bytes(reader.read(2)) != _DATA_MAGIC:
        raise ValueError('invalid data page magic')
    reader.u16()  # skipped 16-bit header field
    owner = reader.u32()
    if not is_v3:
        reader.u32()  # Jet4 pages carry an extra dword before the record count
    count = reader.u16()
    record_offsets = []
    for _ in range(count):
        record_offsets.append(reader.u16())
    return owner, record_offsets


def _parse_tdef_header(reader: StructReader[memoryview]) -> tuple[int, int]:
    """
    Read the common header of a table definition (TDEF) page. Returns the
    next TDEF page number and the reader position at the end of the header.
    Raises `ValueError` when the page magic does not match.
    """
    if bytes(reader.read(2)) != _TABLE_MAGIC:
        raise ValueError('invalid table definition magic')
    reader.u16()  # skipped 16-bit header field
    return reader.u32(), reader.tell()


class _TableHead(NamedTuple):
    # Fixed-size portion of a table definition.
    next_page: int         # page number of the continuation TDEF page (0 if none)
    row_count: int         # number of rows recorded in the header
    variable_columns: int  # count of variable-length columns
    column_count: int      # total column count
    index_count: int       # total index count
    real_index_count: int  # count of "real" indexes (with full descriptors)
    header_end: int        # stream offset where column descriptors begin


def _parse_table_head(reader: StructReader[memoryview], is_v3: bool) -> _TableHead:
    """
    Parse the fixed-size portion of a table definition and return the counts
    needed to parse its columns, along with the stream position where the
    column descriptors begin. Skip sizes differ between Jet3 and Jet4.
    """
    next_page, _ = _parse_tdef_header(reader)
    reader.skip(4 if is_v3 else 8)
    rows = reader.u32()
    reader.skip(7 if is_v3 else 23)
    var_cols = reader.u16()
    cols = reader.u16()
    indexes = reader.u32()
    real_indexes = reader.u32()
    reader.skip(8)
    return _TableHead(
        next_page=next_page,
        row_count=rows,
        variable_columns=var_cols,
        column_count=cols,
        index_count=indexes,
        real_index_count=real_indexes,
        header_end=reader.tell(),
    )


def _parse_columns(
    reader: StructReader[memoryview],
    column_count: int,
    real_index_count: int,
    index_count: int,
    is_v3: bool,
) -> list[_Column]:
    """
    Parse the column descriptors of a table definition. The reader must be
    positioned right after the fixed table header. Returns one `_Column` per
    column with names filled in; index descriptors surrounding the column
    block are skipped. All skip sizes are format-exact and differ between
    Jet3 and Jet4.
    """
    # Skip the per-index data that precedes the column descriptors.
    for _ in range(real_index_count):
        reader.skip(12 if not is_v3 else 8)

    raw_columns = []
    for _ in range(column_count):
        col_type = reader.u8()
        if not is_v3:
            reader.skip(4)
        col_id = reader.u16()
        var_col_num = reader.u16()
        col_index = reader.u16()

        # Type-specific descriptor bytes: text-like columns carry a collation
        # (plus, in Jet3, a code page); NUMERIC carries precision and scale.
        collation: int | None = None
        code_page: int | None = None
        precision: int | None = None
        scale: int | None = None
        if col_type in (
            ColumnType.BINARY,
            ColumnType.TEXT,
            ColumnType.OLE,
            ColumnType.MEMO,
        ):
            if is_v3:
                collation = reader.u16()
                code_page = reader.u16()
                reader.skip(2)
            else:
                collation = reader.u16()
                reader.skip(2)
        elif col_type == ColumnType.NUMERIC:
            precision = reader.u8()
            scale = reader.u8()
            reader.skip(4 if is_v3 else 2)
        elif col_type in (
            ColumnType.BOOLEAN,
            ColumnType.INT8,
            ColumnType.INT16,
            ColumnType.INT32,
            ColumnType.MONEY,
            ColumnType.FLOAT32,
            ColumnType.FLOAT64,
            ColumnType.DATETIME,
        ):
            reader.skip(6 if is_v3 else 4)
        else:
            # Unknown types consume the same room as the scalar types above.
            reader.skip(6 if is_v3 else 4)

        flags_byte = reader.u8()
        fixed_length = bool(flags_byte & 0x01)  # bit 0 marks a fixed-length column
        if not is_v3:
            reader.skip(5)

        fixed_offset = reader.u16()
        length = reader.u16()
        raw_columns.append(_Column(
            type=col_type,
            column_id=col_id,
            variable_column_number=var_col_num,
            column_index=col_index,
            fixed_length=fixed_length,
            fixed_offset=fixed_offset,
            length=length,
            collation=collation,
            code_page=code_page,
            precision=precision,
            scale=scale,
        ))

    # Column names follow the descriptors in the same order: Jet3 stores a
    # byte-length-prefixed single-byte string, Jet4 a word-length-prefixed
    # UTF-16 string.
    columns = []
    for col in raw_columns:
        if is_v3:
            name_len = reader.u8()
            name = codecs.decode(reader.read(name_len), 'utf-8', errors='replace')
        else:
            name_len = reader.u16()
            name = codecs.decode(reader.read(name_len), 'utf-16-le', errors='replace')
        columns.append(col._replace(name=name))

    # Skip the remaining index definition blocks; they are not needed here.
    for _ in range(real_index_count):
        reader.skip(52 if not is_v3 else 39)

    for _ in range(index_count):
        reader.skip(28 if not is_v3 else 20)

    return columns


def _parse_var_length_metadata(
    reverse_data: memoryview,
    is_v3: bool,
    jump_table_count: int = 0,
) -> _VarLenMeta | None:
    """
    Parse the variable-length field metadata from the *reversed* tail of a
    record. Because the input bytes are reversed, multi-byte values are read
    in big-endian order to recover their little-endian on-disk value.
    Returns `None` when the trailer cannot be read.
    """
    reader = StructReader[memoryview](reverse_data)
    reader.bigendian = True  # reversed little-endian data reads as big-endian
    try:
        if is_v3:
            field_count = reader.u8()
            jump_table = [reader.u8() for _ in range(jump_table_count)]
            offsets = [reader.u8() for _ in range(field_count)]
            var_len_count = reader.u8()
        else:
            field_count = reader.u16()
            jump_table = []
            # NOTE(review): only the low byte of the field count sizes the
            # offset list - confirm behavior for records with >255 fields.
            count = field_count & 0xFF
            offsets = [reader.u16() for _ in range(count)]
            var_len_count = reader.u16()
        return _VarLenMeta(
            field_count=field_count,
            field_offsets=offsets,
            var_len_count=var_len_count,
            jump_table=jump_table,
            metadata_end=reader.tell(),
        )
    except Exception:
        # Best effort: malformed trailers yield None instead of raising.
        return None


class _TableObj:

    __slots__ = 'offset', 'value', 'linked_pages'

    def __init__(self, offset: int, value: memoryview, /):
        self.offset = offset
        self.value = value
        self.linked_pages: list[memoryview] = []


class AccessDatabase:
    """
    Parser for Microsoft Access database files. Accepts raw bytes as input and
    provides a `catalog` mapping table names to IDs and a `parse_table` method
    that returns `dict[str, list]` (column name to list of row values).
    """

    def __init__(self, data: bytes | bytearray | memoryview):
        # All page handling below works on zero-copy views into this buffer.
        mv = memoryview(data)
        self._data = mv
        self._parse_header()
        self._table_defs, self._data_pages = self._categorize_pages()
        self._tables_with_data = self._link_tables_to_data()
        self.catalog: dict[str, int] = self._parse_catalog()

    def _parse_header(self):
        """
        Validate the database header and derive `_version`, `_is_v3`, and
        `_page_size`. Raises `ValueError` for files without the Access magic.
        """
        reader = StructReader[memoryview](self._data)
        magic = reader.read(4)
        if bytes(magic) != b'\x00\x01\x00\x00':
            raise ValueError('not a valid Access database file')
        # Skip the NUL-terminated format identifier string after the magic.
        while reader.u8():
            pass
        raw_version = reader.u32()
        try:
            version = JetVersion(raw_version)
        except ValueError:
            # Unknown version codes fall back to Jet3 as a best effort.
            version = JetVersion.V3
        self._version = version
        self._is_v3 = version == JetVersion.V3
        # Jet3 databases use 2KiB pages; later versions use 4KiB pages.
        self._page_size = 0x800 if self._is_v3 else 0x1000

    def _categorize_pages(self):
        """
        Split the file into fixed-size pages and bucket them by their two-byte
        signature into table definition pages and data pages, both keyed by
        file offset.
        """
        table_defs: dict[int, memoryview] = {}
        data_pages: dict[int, memoryview] = {}
        ps = self._page_size
        data = self._data
        for offset in range(0, len(data), ps):
            page = data[offset:offset + ps]
            if len(page) < 2:
                continue
            sig = bytes(page[:2])
            if sig == _TABLE_MAGIC:
                table_defs[offset] = page
            elif sig == _DATA_MAGIC:
                data_pages[offset] = page
        return table_defs, data_pages

    def _link_tables_to_data(self) -> dict[int, _TableObj]:
        """
        Attach each data page to the table definition page that owns it, using
        the owner page number from the data page header. Pages whose owner is
        not a known table definition are ignored.
        """
        tables: dict[int, _TableObj] = {}
        ps = self._page_size
        for offset, page in self._data_pages.items():
            try:
                reader = StructReader[memoryview](page)
                owner, _ = _parse_data_page_header(reader, self._is_v3)
            except Exception:
                continue
            page_offset = owner * ps
            if page_offset not in self._table_defs:
                continue
            if page_offset not in tables:
                tables[page_offset] = _TableObj(page_offset, self._table_defs[page_offset])
            tables[page_offset].linked_pages.append(page)
        return tables

    def _parse_catalog(self) -> dict[str, int]:
        """
        Parse the system catalog table on page 2 and build a mapping of user
        table names to their table IDs. System objects (identified by their
        `Flags` value) are skipped, except for MSysObjects itself which is
        always included when present.
        """
        catalog_offset = 2 * self._page_size
        if catalog_offset not in self._tables_with_data:
            return {}
        catalog_table = self._tables_with_data[catalog_offset]
        parsed = self._do_parse_table(catalog_table)
        if not parsed:
            return {}
        names = parsed.get('Name', [])
        ids = parsed.get('Id', [])
        types = parsed.get('Type', [])
        flags = parsed.get('Flags', [])
        mapping: dict[str, int] = {}
        for i, name in enumerate(names):
            if not isinstance(name, str):
                continue
            if name == 'MSysObjects':
                if i < len(ids):
                    mapping[name] = ids[i]
                continue
            # Type value 1 selects table objects; Flags filters out system
            # objects (see _SYSTEM_FLAGS).
            if i < len(types) and types[i] == 1:
                if i < len(flags) and flags[i] not in _SYSTEM_FLAGS:
                    if i < len(ids):
                        mapping[name] = ids[i]
        return mapping

    def parse_table(self, name: str) -> dict[str, list]:
        """
        Parse a table by name. Returns a dictionary mapping column names to
        lists of row values. Unknown names yield an empty dictionary.
        """
        table_id = self.catalog.get(name)
        if table_id is None:
            return {}
        # The table ID doubles as the page number of its definition page.
        table_offset = table_id * self._page_size
        table_obj = self._tables_with_data.get(table_offset)
        if table_obj is None:
            # A definition without linked data pages is an empty table.
            table_def = self._table_defs.get(table_offset)
            if table_def is not None:
                table_obj = _TableObj(table_offset, table_def)
            else:
                return {}
        return self._do_parse_table(table_obj)

    def _do_parse_table(self, table_obj: _TableObj) -> dict[str, list]:
        """
        Parse every row of `table_obj` into a column-name to values mapping.
        Malformed pages and records are skipped silently; a failure to read
        the table definition yields an empty result.
        """
        try:
            columns, variable_columns, column_count = self._get_table_columns(table_obj)
        except Exception:
            return {}
        if not columns:
            return {}

        parsed: dict[str, list] = defaultdict(list)

        if not table_obj.linked_pages:
            # No data pages: still report the (empty) columns.
            for col in columns.values():
                parsed[col.name] = []
            return dict(parsed)

        for data_chunk in table_obj.linked_pages:
            try:
                reader = StructReader[memoryview](data_chunk)
                _, record_offsets = _parse_data_page_header(reader, self._is_v3)
            except Exception:
                continue

            last_offset: int | None = None
            for rec_offset in record_offsets:
                if rec_offset & 0x8000:
                    # High flag bit set: the slot holds no live record here;
                    # its offset still bounds the next record.
                    last_offset = rec_offset & 0xFFF
                    continue
                if rec_offset & 0x4000:
                    # Pointer slot: a 4-byte pointer to an overflow record on
                    # another page.
                    ptr_offset = rec_offset & 0xFFF
                    last_offset = ptr_offset
                    if ptr_offset + 4 <= len(data_chunk):
                        overflow_ptr = struct.unpack_from('<I', data_chunk, ptr_offset)[0]
                        record = self._get_overflow_record(overflow_ptr)
                        if record:
                            self._parse_row(
                                record, columns, variable_columns, column_count, parsed)
                    continue

                # Records grow downward in the page, so the previous offset
                # marks the end of the current record.
                if last_offset is None:
                    record = data_chunk[rec_offset:]
                else:
                    record = data_chunk[rec_offset:last_offset]
                last_offset = rec_offset
                if record:
                    self._parse_row(
                        record, columns, variable_columns, column_count, parsed)

        return dict(parsed)

    def _get_table_columns(self, table_obj: _TableObj):
        """
        Read the table definition (merging any continuation TDEF pages) and
        return (column mapping, variable column count, total column count).
        Columns are keyed by zero-based column index, falling back to column
        ID when normalized indices collide.
        """
        reader = StructReader[memoryview](table_obj.value)
        head = _parse_table_head(reader, self._is_v3)

        merged = table_obj.value[head.header_end:]
        if head.next_page:
            # Long definitions continue on linked TDEF pages.
            extra = self._merge_tdef_pages(head.next_page)
            merged = memoryview(bytes(merged) + bytes(extra))

        col_reader = StructReader[memoryview](merged)
        columns_list = _parse_columns(
            col_reader, head.column_count, head.real_index_count,
            head.index_count, self._is_v3)

        # Normalize column indices to start at zero; on duplicates, fall back
        # to keying by column ID.
        offset = min(c.column_index for c in columns_list) if columns_list else 0
        column_dict = {c.column_index - offset: c for c in columns_list}
        if len(column_dict) != len(columns_list):
            column_dict = {c.column_id: c for c in columns_list}

        return column_dict, head.variable_columns, head.column_count

    def _merge_tdef_pages(self, first_page_num: int) -> memoryview:
        """
        Concatenate the payloads of a chain of continuation TDEF pages,
        starting at `first_page_num` and following each page's next-page
        pointer until it is zero or the page is missing.
        """
        parts = bytearray()
        page_data = self._table_defs.get(first_page_num * self._page_size)
        if page_data is None:
            return memoryview(parts)
        reader = StructReader[memoryview](page_data)
        next_page, header_end = _parse_tdef_header(reader)
        parts.extend(page_data[header_end:])
        while next_page:
            page_data = self._table_defs.get(next_page * self._page_size)
            if page_data is None:
                break
            reader = StructReader[memoryview](page_data)
            next_page, header_end = _parse_tdef_header(reader)
            parts.extend(page_data[header_end:])
        return memoryview(parts)

    def _parse_row(
        self,
        record: memoryview,
        columns: dict[int, _Column],
        variable_columns_count: int,
        column_count: int,
        parsed: dict[str, list],
    ):
        """
        Decode one record and append one value per column to `parsed`.
        Fixed-length columns are read at their stored offsets; variable-length
        columns are located via the offset metadata at the end of the record.
        Rows that cannot be decoded are dropped without raising.
        """
        if len(record) < 1:
            return
        # The record ends with a null bitmap: one bit per column, set when the
        # column holds a value.
        null_table_len = (column_count + 7) // 8
        if null_table_len >= len(record):
            return
        null_bytes = record[-null_table_len:]
        null_table = [
            ((null_bytes[i // 8]) & (1 << (i % 8))) != 0
            for i in range(null_table_len * 8)
        ]

        # The record starts with its column count: one byte on Jet3, two
        # bytes on Jet4; field offsets are relative to the remainder.
        if not self._is_v3:
            if len(record) < 2:
                return
            row_data = record[2:]
        else:
            row_data = record[1:]

        # First pass: emit fixed-length columns, collect variable-length ones.
        var_columns: dict[int, _Column] = {}
        for i, column in columns.items():
            if not column.fixed_length:
                var_columns[i] = column
                continue
            col_name = column.name
            has_value = True
            if column.column_id < len(null_table):
                has_value = null_table[column.column_id]
            if column.type == ColumnType.BOOLEAN:
                # Booleans are stored directly in the null bitmap bit.
                parsed[col_name].append(has_value)
                continue
            if not has_value:
                parsed[col_name].append(None)
                continue
            if column.fixed_offset >= len(row_data):
                parsed[col_name].append(None)
                continue
            field_data = row_data[column.fixed_offset:]
            value = _parse_type(column.type, field_data, column.length, self._is_v3)
            parsed[col_name].append(value)

        if not var_columns:
            return

        # Second pass: variable-length columns, addressed by the offset table
        # stored (reversed) at the end of the record before the null bitmap.
        var_columns = dict(sorted(var_columns.items()))
        reverse_record = record[::-1]
        reverse_after_null = reverse_record[null_table_len:]

        if self._is_v3:
            # Jet3 uses single-byte offsets plus a jump table to extend their
            # range beyond 255 bytes.
            jump_table_count = (len(record) - 1) // 256
        else:
            jump_table_count = 0

        metadata = _parse_var_length_metadata(
            memoryview(reverse_after_null), self._is_v3, jump_table_count)

        if metadata is None:
            return

        if self._is_v3 and metadata.field_count != variable_columns_count:
            # Recovery heuristic: scan the first few trailer bytes for the
            # expected field count and re-parse the metadata from there.
            search_byte = variable_columns_count & 0xFF
            pos = bytes(reverse_after_null).find(bytes([search_byte]))
            if pos != -1 and pos < 10:
                adjusted = memoryview(reverse_after_null[pos:])
                metadata = _parse_var_length_metadata(
                    adjusted, self._is_v3, jump_table_count)
                if metadata is not None:
                    metadata = metadata._replace(
                        metadata_end=metadata.metadata_end + pos)
            else:
                return

        if not metadata or not metadata.field_offsets:
            return

        offsets = metadata.field_offsets
        jump_addition = 0
        for i, col_index in enumerate(var_columns):
            column = var_columns[col_index]
            col_name = column.name
            has_value = True
            if column.column_id < len(null_table):
                has_value = null_table[column.column_id]
            if not has_value:
                parsed[col_name].append(None)
                continue

            # Each jump table hit shifts subsequent offsets by 256 bytes.
            if self._is_v3 and i in metadata.jump_table:
                jump_addition += 0x100

            if i >= len(offsets):
                parsed[col_name].append(None)
                continue

            rel_start = offsets[i]
            if i + 1 < len(offsets):
                rel_end = offsets[i + 1]
            else:
                rel_end = metadata.var_len_count

            if rel_start == rel_end:
                # Zero-length field: present but empty.
                parsed[col_name].append('')
                continue

            field_data = record[rel_start + jump_addition:rel_end + jump_addition]

            if column.type == ColumnType.MEMO:
                try:
                    value = self._parse_memo(field_data, raw=False)
                except Exception:
                    value = bytes(field_data)
            elif column.type == ColumnType.OLE:
                try:
                    value = self._parse_memo(field_data, raw=True)
                except Exception:
                    value = bytes(field_data)
            elif column.type == ColumnType.NUMERIC:
                if len(field_data) == 17:
                    scale = column.scale if column.scale is not None else 6
                    value = _numeric_to_string(field_data, scale)
                else:
                    value = bytes(field_data)
            else:
                value = _parse_type(column.type, field_data, len(field_data), self._is_v3)
            parsed[col_name].append(value)

    def _parse_memo(self, data: memoryview, raw: bool = False):
        """
        Resolve a MEMO/OLE field value. The 12-byte descriptor holds a length
        word (with flag bits) and a record pointer; the value is stored either
        inline after the descriptor, in a single overflow record, or spread
        over a chain of overflow pages. Returns raw bytes when `raw` is set,
        decoded text otherwise, or `None` when nothing could be resolved.
        """
        if len(data) < 12:
            return bytes(data)
        memo_length = struct.unpack_from('<I', data, 0)[0]
        record_pointer = struct.unpack_from('<I', data, 4)[0]
        memo_end = 12

        if memo_length & 0x80000000:
            # Inline: the value follows the descriptor within the field.
            inline_length = memo_length & 0x3FFFFFFF
            if len(data) < memo_end + inline_length:
                memo_data = data[memo_end:]
            else:
                memo_data = data[memo_end:memo_end + inline_length]
        elif memo_length & 0x40000000:
            # A single overflow record holds the whole value.
            result = self._get_overflow_record(record_pointer)
            if result is None:
                return None
            memo_data = result
        else:
            # Chained overflow pages: each record begins with the next page
            # number, followed by payload bytes.
            rec_data = self._get_overflow_record(record_pointer)
            if rec_data is None:
                return None
            next_page = struct.unpack_from('<I', rec_data, 0)[0]
            parts = bytearray()
            while next_page:
                parts.extend(rec_data[4:])
                rec_data = self._get_overflow_record(next_page)
                if rec_data is None:
                    break
                next_page = struct.unpack_from('<I', rec_data, 0)[0]
            if rec_data is not None:
                parts.extend(rec_data[4:])
            memo_data = memoryview(parts)

        if not memo_data:
            return None
        if raw:
            return bytes(memo_data)
        return _parse_type(ColumnType.TEXT, memo_data, len(memo_data), self._is_v3)

    def _get_overflow_record(self, record_pointer: int) -> memoryview | None:
        """
        Resolve an overflow record pointer (low byte: record slot number;
        remaining bytes: page number) to the record's bytes, or `None` when
        the page or slot cannot be found.
        """
        record_offset = record_pointer & 0xFF
        page_num = record_pointer >> 8
        page = self._data_pages.get(page_num * self._page_size)
        if page is None:
            return None
        try:
            reader = StructReader[memoryview](page)
            _, offsets = _parse_data_page_header(reader, self._is_v3)
        except Exception:
            return None
        if record_offset >= len(offsets):
            return None
        start = offsets[record_offset]
        if start & 0x8000:
            start = start & 0xFFF
        if record_offset == 0:
            # The first record extends to the end of the page.
            return page[start:]
        else:
            # The previous slot's offset bounds this record.
            end = offsets[record_offset - 1]
            if end & 0x8000 and (end & 0xFF != 0):
                end = end & 0xFFF
            return page[start:end]

Classes

class JetVersion (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class JetVersion(enum.IntEnum):
    V3 = 0x00
    V4 = 0x01
    V5 = 0x02
    V2010 = 0x03

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var V3

Jet3 format (Access 97).

var V4

Jet4 format (Access 2000/2002/2003).

var V5

ACE12 format (Access 2007).

var V2010

ACE14 format (Access 2010+).

class ColumnType (*args, **kwds)

Enum where members are also (and must be) ints

Expand source code Browse git
class ColumnType(enum.IntEnum):
    BOOLEAN = 1
    INT8 = 2
    INT16 = 3
    INT32 = 4
    MONEY = 5
    FLOAT32 = 6
    FLOAT64 = 7
    DATETIME = 8
    BINARY = 9
    TEXT = 10
    OLE = 11
    MEMO = 12
    GUID = 15
    NUMERIC = 16
    COMPLEX = 18

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var BOOLEAN

Boolean field; the value is encoded in the row's null bitmap.

var INT8

8-bit signed integer.

var INT16

16-bit signed integer.

var INT32

32-bit signed integer.

var MONEY

Currency value, stored as a 64-bit integer.

var FLOAT32

32-bit floating point number.

var FLOAT64

64-bit floating point number.

var DATETIME

Date and time, stored as days since 1899-12-30.

var BINARY

Fixed-length binary data.

var TEXT

Text data.

var OLE

OLE object / long binary data.

var MEMO

Long text data.

var GUID

16-byte GUID.

var NUMERIC

Fixed-precision decimal value (17-byte storage).

var COMPLEX

Complex column reference, stored as a 32-bit integer.

class AccessDatabase (data)

Parser for Microsoft Access database files. Accepts raw bytes as input and provides a catalog mapping table names to IDs and a parse_table method that returns dict[str, list] (column name to list of row values).

Expand source code Browse git
class AccessDatabase:
    """
    Parser for Microsoft Access database files. Accepts raw bytes as input and
    provides a `catalog` mapping table names to IDs and a `parse_table` method
    that returns `dict[str, list]` (column name to list of row values).
    """

    def __init__(self, data: bytes | bytearray | memoryview):
        """
        Parse the file header, index all pages, and build the table catalog.
        Raises `ValueError` when the magic bytes do not match.
        """
        # A memoryview keeps all subsequent page slicing zero-copy.
        mv = memoryview(data)
        self._data = mv
        self._parse_header()
        self._table_defs, self._data_pages = self._categorize_pages()
        self._tables_with_data = self._link_tables_to_data()
        # Maps table name to its id; the id times the page size is the byte
        # offset of the table's definition page (see parse_table).
        self.catalog: dict[str, int] = self._parse_catalog()

    def _parse_header(self) -> None:
        """
        Validate the file magic, read the version code, and derive the page
        size: 0x800 bytes for Jet3, 0x1000 for every newer format.
        """
        reader = StructReader[memoryview](self._data)
        magic = reader.read(4)
        if bytes(magic) != b'\x00\x01\x00\x00':
            raise ValueError('not a valid Access database file')
        # Skip the NUL-terminated string following the magic, stopping at the
        # first zero byte.
        while reader.u8():
            pass
        raw_version = reader.u32()
        try:
            version = JetVersion(raw_version)
        except ValueError:
            # NOTE(review): unknown version codes silently fall back to Jet3,
            # which also selects the smaller page size — confirm this is the
            # intended behavior for future/unknown format revisions.
            version = JetVersion.V3
        self._version = version
        self._is_v3 = version == JetVersion.V3
        self._page_size = 0x800 if self._is_v3 else 0x1000

    def _categorize_pages(self):
        """
        Split the file into fixed-size pages and bucket them by their two-byte
        signature. Returns `(table_defs, data_pages)`, both dicts keyed by the
        page's byte offset into the file.
        """
        table_defs: dict[int, memoryview] = {}
        data_pages: dict[int, memoryview] = {}
        ps = self._page_size
        data = self._data
        for offset in range(0, len(data), ps):
            page = data[offset:offset + ps]
            if len(page) < 2:
                continue
            sig = bytes(page[:2])
            if sig == _TABLE_MAGIC:
                table_defs[offset] = page
            elif sig == _DATA_MAGIC:
                data_pages[offset] = page
        return table_defs, data_pages

    def _link_tables_to_data(self) -> dict[int, _TableObj]:
        """
        Group data pages under the table definition page that owns them; the
        owning table's page number is stored in each data page header.
        Unparseable pages and pages pointing at unknown tables are skipped.
        """
        tables: dict[int, _TableObj] = {}
        ps = self._page_size
        for offset, page in self._data_pages.items():
            try:
                reader = StructReader[memoryview](page)
                owner, _ = _parse_data_page_header(reader, self._is_v3)
            except Exception:
                continue
            # The header stores a page number; convert it to a byte offset.
            page_offset = owner * ps
            if page_offset not in self._table_defs:
                continue
            if page_offset not in tables:
                tables[page_offset] = _TableObj(page_offset, self._table_defs[page_offset])
            tables[page_offset].linked_pages.append(page)
        return tables

    def _parse_catalog(self) -> dict[str, int]:
        """
        Parse the system catalog table, which always lives on page 2, and
        return a mapping of user table names to their ids. Objects whose
        flags are in `_SYSTEM_FLAGS` are excluded; the `MSysObjects` entry
        itself is always kept.
        """
        catalog_offset = 2 * self._page_size
        if catalog_offset not in self._tables_with_data:
            return {}
        catalog_table = self._tables_with_data[catalog_offset]
        parsed = self._do_parse_table(catalog_table)
        if not parsed:
            return {}
        names = parsed.get('Name', [])
        ids = parsed.get('Id', [])
        types = parsed.get('Type', [])
        flags = parsed.get('Flags', [])
        mapping: dict[str, int] = {}
        for i, name in enumerate(names):
            if not isinstance(name, str):
                continue
            if name == 'MSysObjects':
                if i < len(ids):
                    mapping[name] = ids[i]
                continue
            # Type 1 marks table objects; flag values in _SYSTEM_FLAGS mark
            # system tables, which are not exposed in the catalog.
            if i < len(types) and types[i] == 1:
                if i < len(flags) and flags[i] not in _SYSTEM_FLAGS:
                    if i < len(ids):
                        mapping[name] = ids[i]
        return mapping

    def parse_table(self, name: str) -> dict[str, list]:
        """
        Parse a table by name. Returns a dictionary mapping column names to
        lists of row values.
        """
        table_id = self.catalog.get(name)
        if table_id is None:
            return {}
        table_offset = table_id * self._page_size
        table_obj = self._tables_with_data.get(table_offset)
        if table_obj is None:
            # A table may have a definition page but no linked data pages;
            # parse it anyway so its (empty) columns are still returned.
            table_def = self._table_defs.get(table_offset)
            if table_def is not None:
                table_obj = _TableObj(table_offset, table_def)
            else:
                return {}
        return self._do_parse_table(table_obj)

    def _do_parse_table(self, table_obj: _TableObj) -> dict[str, list]:
        """
        Parse all rows of a table. Returns a dict mapping column names to
        lists of values; returns an empty dict when the table definition
        cannot be parsed.
        """
        try:
            columns, variable_columns, column_count = self._get_table_columns(table_obj)
        except Exception:
            return {}
        if not columns:
            return {}

        parsed: dict[str, list] = defaultdict(list)

        # A table with no data pages still reports its columns (empty lists).
        if not table_obj.linked_pages:
            for col in columns.values():
                parsed[col.name] = []
            return dict(parsed)

        for data_chunk in table_obj.linked_pages:
            try:
                reader = StructReader[memoryview](data_chunk)
                _, record_offsets = _parse_data_page_header(reader, self._is_v3)
            except Exception:
                continue

            # Records are packed from the end of the page towards the header,
            # so each record ends where the previously listed record starts.
            last_offset: int | None = None
            for rec_offset in record_offsets:
                if rec_offset & 0x8000:
                    # Presumably a deleted record: its content is skipped but
                    # the masked offset still bounds the next record's slice.
                    last_offset = rec_offset & 0xFFF
                    continue
                if rec_offset & 0x4000:
                    # Record relocated to an overflow page; the slot holds a
                    # 4-byte pointer to the actual record.
                    ptr_offset = rec_offset & 0xFFF
                    last_offset = ptr_offset
                    if ptr_offset + 4 <= len(data_chunk):
                        overflow_ptr = struct.unpack_from('<I', data_chunk, ptr_offset)[0]
                        record = self._get_overflow_record(overflow_ptr)
                        if record:
                            self._parse_row(
                                record, columns, variable_columns, column_count, parsed)
                    continue

                if last_offset is None:
                    record = data_chunk[rec_offset:]
                else:
                    record = data_chunk[rec_offset:last_offset]
                last_offset = rec_offset
                if record:
                    self._parse_row(
                        record, columns, variable_columns, column_count, parsed)

        return dict(parsed)

    def _get_table_columns(self, table_obj: _TableObj):
        """
        Parse the table definition and return `(columns, variable_columns,
        column_count)`, where `columns` maps a normalized column index to its
        `_Column` descriptor.
        """
        reader = StructReader[memoryview](table_obj.value)
        head = _parse_table_head(reader, self._is_v3)

        # Large definitions continue on additional TDEF pages; concatenate
        # their payloads before parsing the column descriptors.
        merged = table_obj.value[head.header_end:]
        if head.next_page:
            extra = self._merge_tdef_pages(head.next_page)
            merged = memoryview(bytes(merged) + bytes(extra))

        col_reader = StructReader[memoryview](merged)
        columns_list = _parse_columns(
            col_reader, head.column_count, head.real_index_count,
            head.index_count, self._is_v3)

        # Normalize column indices to start at zero; if that would produce
        # collisions, fall back to keying by column id instead.
        offset = min(c.column_index for c in columns_list) if columns_list else 0
        column_dict = {c.column_index - offset: c for c in columns_list}
        if len(column_dict) != len(columns_list):
            column_dict = {c.column_id: c for c in columns_list}

        return column_dict, head.variable_columns, head.column_count

    def _merge_tdef_pages(self, first_page_num: int) -> memoryview:
        """
        Follow the chain of table-definition continuation pages starting at
        `first_page_num` and return their concatenated payloads, with each
        page's TDEF header stripped.
        """
        parts = bytearray()
        page_data = self._table_defs.get(first_page_num * self._page_size)
        if page_data is None:
            return memoryview(parts)
        reader = StructReader[memoryview](page_data)
        next_page, header_end = _parse_tdef_header(reader)
        parts.extend(page_data[header_end:])
        while next_page:
            page_data = self._table_defs.get(next_page * self._page_size)
            if page_data is None:
                break
            reader = StructReader[memoryview](page_data)
            next_page, header_end = _parse_tdef_header(reader)
            parts.extend(page_data[header_end:])
        return memoryview(parts)

    def _parse_row(
        self,
        record: memoryview,
        columns: dict[int, _Column],
        variable_columns_count: int,
        column_count: int,
        parsed: dict[str, list],
    ):
        """
        Parse a single record, appending one value per column to `parsed`.
        Fixed-length fields are read at their stored offsets; variable-length
        fields are located via the offset metadata at the end of the record.
        """
        if len(record) < 1:
            return
        # The record ends with a presence bitmap: one bit per column, set
        # when the column holds a value (for BOOLEAN the bit IS the value).
        null_table_len = (column_count + 7) // 8
        if null_table_len >= len(record):
            return
        null_bytes = record[-null_table_len:]
        null_table = [
            ((null_bytes[i // 8]) & (1 << (i % 8))) != 0
            for i in range(null_table_len * 8)
        ]

        # Skip the leading field count: two bytes on Jet4+, one byte on Jet3.
        if not self._is_v3:
            if len(record) < 2:
                return
            row_data = record[2:]
        else:
            row_data = record[1:]

        # First pass: fixed-length columns; variable-length ones are collected
        # for the second pass below.
        var_columns: dict[int, _Column] = {}
        for i, column in columns.items():
            if not column.fixed_length:
                var_columns[i] = column
                continue
            col_name = column.name
            has_value = True
            if column.column_id < len(null_table):
                has_value = null_table[column.column_id]
            if column.type == ColumnType.BOOLEAN:
                # Booleans are stored entirely in the presence bitmap.
                parsed[col_name].append(has_value)
                continue
            if not has_value:
                parsed[col_name].append(None)
                continue
            if column.fixed_offset >= len(row_data):
                parsed[col_name].append(None)
                continue
            field_data = row_data[column.fixed_offset:]
            value = _parse_type(column.type, field_data, column.length, self._is_v3)
            parsed[col_name].append(value)

        if not var_columns:
            return

        # Second pass: variable-length columns in column order. Their offset
        # metadata sits at the tail of the record (just before the bitmap)
        # and is parsed from the reversed byte sequence.
        var_columns = dict(sorted(var_columns.items()))
        reverse_record = record[::-1]
        reverse_after_null = reverse_record[null_table_len:]

        # Jet3 stores 8-bit field offsets, so a jump-table entry exists for
        # every 256 bytes of record data; Jet4+ uses wider offsets and none.
        if self._is_v3:
            jump_table_count = (len(record) - 1) // 256
        else:
            jump_table_count = 0

        metadata = _parse_var_length_metadata(
            memoryview(reverse_after_null), self._is_v3, jump_table_count)

        if metadata is None:
            return

        if self._is_v3 and metadata.field_count != variable_columns_count:
            # Jet3 recovery: when the parsed field count disagrees with the
            # table definition, search the first few tail bytes for the
            # expected count and re-parse from there.
            search_byte = variable_columns_count & 0xFF
            pos = bytes(reverse_after_null).find(bytes([search_byte]))
            if pos != -1 and pos < 10:
                adjusted = memoryview(reverse_after_null[pos:])
                metadata = _parse_var_length_metadata(
                    adjusted, self._is_v3, jump_table_count)
                if metadata is not None:
                    metadata = metadata._replace(
                        metadata_end=metadata.metadata_end + pos)
            else:
                return

        if not metadata or not metadata.field_offsets:
            return

        offsets = metadata.field_offsets
        jump_addition = 0
        for i, col_index in enumerate(var_columns):
            column = var_columns[col_index]
            col_name = column.name
            has_value = True
            if column.column_id < len(null_table):
                has_value = null_table[column.column_id]
            if not has_value:
                parsed[col_name].append(None)
                continue

            # Jet3: crossing a jump-table boundary adds 0x100 to the stored
            # 8-bit offsets.
            if self._is_v3 and i in metadata.jump_table:
                jump_addition += 0x100

            if i >= len(offsets):
                parsed[col_name].append(None)
                continue

            # A field runs from its offset to the next field's offset; the
            # last field ends at the total variable-data length.
            rel_start = offsets[i]
            if i + 1 < len(offsets):
                rel_end = offsets[i + 1]
            else:
                rel_end = metadata.var_len_count

            if rel_start == rel_end:
                # Zero-length field: present but empty.
                parsed[col_name].append('')
                continue

            field_data = record[rel_start + jump_addition:rel_end + jump_addition]

            if column.type == ColumnType.MEMO:
                # MEMO/OLE fields hold a 12-byte LVAL descriptor that may
                # point into overflow pages; fall back to raw bytes on error.
                try:
                    value = self._parse_memo(field_data, raw=False)
                except Exception:
                    value = bytes(field_data)
            elif column.type == ColumnType.OLE:
                try:
                    value = self._parse_memo(field_data, raw=True)
                except Exception:
                    value = bytes(field_data)
            elif column.type == ColumnType.NUMERIC:
                # Fixed-precision numerics are 17-byte blobs; anything else
                # is returned raw.
                if len(field_data) == 17:
                    scale = column.scale if column.scale is not None else 6
                    value = _numeric_to_string(field_data, scale)
                else:
                    value = bytes(field_data)
            else:
                value = _parse_type(column.type, field_data, len(field_data), self._is_v3)
            parsed[col_name].append(value)

    def _parse_memo(self, data: memoryview, raw: bool = False):
        """
        Resolve a MEMO/OLE (LVAL) field. The 12-byte descriptor holds the
        memo length (with flag bits) followed by a record pointer. Bit 31 of
        the length marks inline data, bit 30 a single overflow record, and
        otherwise the data spans a chain of overflow pages. Returns decoded
        text, raw bytes when `raw` is set, or None when unresolvable.
        """
        if len(data) < 12:
            return bytes(data)
        memo_length = struct.unpack_from('<I', data, 0)[0]
        record_pointer = struct.unpack_from('<I', data, 4)[0]
        memo_end = 12

        if memo_length & 0x80000000:
            # Inline memo: payload follows the descriptor in the field data.
            inline_length = memo_length & 0x3FFFFFFF
            if len(data) < memo_end + inline_length:
                memo_data = data[memo_end:]
            else:
                memo_data = data[memo_end:memo_end + inline_length]
        elif memo_length & 0x40000000:
            # Single-record memo stored on one overflow page.
            result = self._get_overflow_record(record_pointer)
            if result is None:
                return None
            memo_data = result
        else:
            # Chained memo: each overflow record begins with a 4-byte pointer
            # to the next page; the payload follows the pointer.
            rec_data = self._get_overflow_record(record_pointer)
            if rec_data is None:
                return None
            next_page = struct.unpack_from('<I', rec_data, 0)[0]
            parts = bytearray()
            while next_page:
                parts.extend(rec_data[4:])
                rec_data = self._get_overflow_record(next_page)
                if rec_data is None:
                    break
                next_page = struct.unpack_from('<I', rec_data, 0)[0]
            if rec_data is not None:
                parts.extend(rec_data[4:])
            memo_data = memoryview(parts)

        if not memo_data:
            return None
        if raw:
            return bytes(memo_data)
        return _parse_type(ColumnType.TEXT, memo_data, len(memo_data), self._is_v3)

    def _get_overflow_record(self, record_pointer: int) -> memoryview | None:
        """
        Resolve an overflow record pointer: the low byte is the row index in
        the target page's offset table and the remaining bits are the page
        number. Returns the record bytes or None when unresolvable.
        """
        record_offset = record_pointer & 0xFF
        page_num = record_pointer >> 8
        page = self._data_pages.get(page_num * self._page_size)
        if page is None:
            return None
        try:
            reader = StructReader[memoryview](page)
            _, offsets = _parse_data_page_header(reader, self._is_v3)
        except Exception:
            return None
        if record_offset >= len(offsets):
            return None
        start = offsets[record_offset]
        if start & 0x8000:
            start = start & 0xFFF
        # Records are packed back-to-front: the first listed record extends
        # to the end of the page, later ones end at the previous offset.
        if record_offset == 0:
            return page[start:]
        else:
            end = offsets[record_offset - 1]
            # NOTE(review): the start branch above masks any flagged offset,
            # while this end branch additionally requires end & 0xFF != 0 —
            # verify the asymmetry is intentional.
            if end & 0x8000 and (end & 0xFF != 0):
                end = end & 0xFFF
            return page[start:end]

Methods

def parse_table(self, name)

Parse a table by name. Returns a dictionary mapping column names to lists of row values.

Expand source code Browse git
def parse_table(self, name: str) -> dict[str, list]:
    """
    Parse a table by name. Returns a dictionary mapping column names to
    lists of row values.
    """
    # Unknown table names simply yield an empty result.
    tid = self.catalog.get(name)
    if tid is None:
        return {}
    # The catalog id is a page number; scale it to a byte offset.
    page_offset = tid * self._page_size
    table = self._tables_with_data.get(page_offset)
    if table is None:
        # No data pages were linked to this table; if a definition page
        # exists, parse it anyway so the (empty) columns are reported.
        definition = self._table_defs.get(page_offset)
        if definition is None:
            return {}
        table = _TableObj(page_offset, definition)
    return self._do_parse_table(table)