Module refinery.units.formats.httprequest

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import Union, Dict, List
from email.parser import BytesParser
from enum import Enum
from urllib.parse import parse_qs

from refinery.units import Unit
from refinery.lib.tools import isbuffer


def _parseparam(parameter: str):
    """
    Split a semicolon-separated header parameter string into its fields.

    The input is expected to start with a semicolon; nothing is produced otherwise.
    Each field is yielded with surrounding whitespace removed. A semicolon is not
    treated as a separator while an odd number of unescaped double quotes precedes
    it, so quoted values may contain semicolons.
    """
    remaining = parameter
    while remaining.startswith(';'):
        remaining = remaining[1:]
        cut = remaining.find(';')
        # Skip past semicolons that sit inside an open quoted string.
        while cut > 0:
            open_quotes = remaining.count('"', 0, cut) - remaining.count('\\"', 0, cut)
            if open_quotes % 2 == 0:
                break
            cut = remaining.find(';', cut + 1)
        if cut < 0:
            # No further separator: the rest of the string is the final field.
            cut = len(remaining)
        yield remaining[:cut].strip()
        remaining = remaining[cut:]


def _parse_header(line: str):
    """
    Parse a header value such as a Content-Type into its main value and parameters.

    Returns a tuple of the leading value and a dictionary of the parameters that
    follow it. Parameter names are stripped and lower-cased; fields without an equals
    sign are ignored. Values enclosed in double quotes are unquoted and have their
    backslash escapes for backslash and double quote resolved.
    """
    fields = _parseparam(F';{line}')
    key = next(fields)
    params = {}
    for field in fields:
        name, equals, value = field.partition('=')
        if not equals:
            continue
        value = value.strip()
        if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
            value = value[1:-1].replace('\\\\', '\\').replace('\\"', '"')
        params[name.strip().lower()] = value
    return key, params


class _Fmt(str, Enum):
    """
    The ways in which the body of an HTTP request is interpreted by the httprequest
    unit. The value of each member is the content type string that selects it.
    """
    # The body is emitted unchanged; this is the mode used when nothing else is selected.
    RawBody = ''
    # The body (or the query string of a GET request) is parsed as URL-encoded form data.
    UrlEncode = 'application/x-www-form-urlencoded'
    # The request is parsed as a multipart message and each part is emitted separately.
    Multipart = 'multipart/form-data'


class httprequest(Unit):
    """
    Parses HTTP request data, as you would obtain from a packet dump. The unit extracts
    POST data in any format; each uploaded file is emitted as a separate chunk.
    """
    def process(self, data):
        """
        Split the request into head and body, determine how the body is encoded, and
        yield the extracted data. URL-encoded fields are labelled with their field name,
        multipart uploads with their file name, and any other body is emitted as is.
        """
        def header(line: bytes):
            # Yields at most one (name, value) pair; lines without a colon are skipped.
            name, colon, value = line.decode('utf8').partition(':')
            if colon:
                yield (name.strip().lower(), value.strip())

        head, _, body = data.partition(b'\r\n\r\n')
        request, *headers = head.splitlines(False)
        headers = dict(t for line in headers for t in header(line))
        method, path, _, *rest = request.split()

        mode = _Fmt.RawBody

        if rest:
            self.log_warn('unexpected rest data while parsing HTTP request:', rest)

        if method == b'GET' and not body:
            mode = _Fmt.UrlEncode
            # The query string is the part of the path after the first question mark;
            # index 1 of the partition would only be the separator itself.
            body = path.partition(B'?')[2]
        if method == b'POST' and (ct := headers.get('content-type', None)):
            ct, _ = _parse_header(ct)
            try:
                # Media types are compared case-insensitively.
                mode = _Fmt(ct.lower())
            except ValueError:
                # Any content type without a dedicated parser is emitted as a raw body.
                mode = _Fmt.RawBody

        def chunks(upload: Dict[Union[str, bytes], List[bytes]]):
            # Emit every value of every field, labelled with the (decoded) field name.
            for key, values in upload.items():
                if not isinstance(key, str):
                    key = key.decode('utf8')
                for value in values:
                    yield self.labelled(value, name=key)

        if mode is _Fmt.RawBody:
            yield body
            return
        if mode is _Fmt.Multipart:
            # Drop the request line; the remaining headers and body are parsed as a
            # MIME message.
            _, _, message_data = data.partition(b'\n')
            msg = BytesParser().parsebytes(message_data)
            for part in msg.walk():
                payloads = part.get_payload(decode=True)
                if not isinstance(payloads, list):
                    payloads = [payloads]
                for payload in payloads:
                    # Skip anything that is not binary data.
                    if not isbuffer(payload):
                        continue
                    if name := part.get_filename():
                        payload = self.labelled(payload, name=name)
                    yield payload

        if mode is _Fmt.UrlEncode:
            yield from chunks(parse_qs(body, keep_blank_values=1))

    @classmethod
    def handles(cls, data: bytearray) -> bool | None:
        """
        Return whether the data starts like a GET or POST request line.
        """
        return data.startswith((B'POST ', B'GET '))

Classes

class httprequest

Parses HTTP request data, as you would obtain from a packet dump. The unit extracts POST data in any format; each uploaded file is emitted as a separate chunk.

Expand source code Browse git
class httprequest(Unit):
    """
    Parses HTTP request data, as you would obtain from a packet dump. The unit extracts
    POST data in any format; each uploaded file is emitted as a separate chunk.
    """
    def process(self, data):
        """
        Split the request into head and body, determine how the body is encoded, and
        yield the extracted data. URL-encoded fields are labelled with their field name,
        multipart uploads with their file name, and any other body is emitted as is.
        """
        def header(line: bytes):
            # Yields at most one (name, value) pair; lines without a colon are skipped.
            name, colon, value = line.decode('utf8').partition(':')
            if colon:
                yield (name.strip().lower(), value.strip())

        head, _, body = data.partition(b'\r\n\r\n')
        request, *headers = head.splitlines(False)
        headers = dict(t for line in headers for t in header(line))
        method, path, _, *rest = request.split()

        mode = _Fmt.RawBody

        if rest:
            self.log_warn('unexpected rest data while parsing HTTP request:', rest)

        if method == b'GET' and not body:
            mode = _Fmt.UrlEncode
            # The query string is the part of the path after the first question mark;
            # index 1 of the partition would only be the separator itself.
            body = path.partition(B'?')[2]
        if method == b'POST' and (ct := headers.get('content-type', None)):
            ct, _ = _parse_header(ct)
            try:
                # Media types are compared case-insensitively.
                mode = _Fmt(ct.lower())
            except ValueError:
                # Any content type without a dedicated parser is emitted as a raw body.
                mode = _Fmt.RawBody

        def chunks(upload: Dict[Union[str, bytes], List[bytes]]):
            # Emit every value of every field, labelled with the (decoded) field name.
            for key, values in upload.items():
                if not isinstance(key, str):
                    key = key.decode('utf8')
                for value in values:
                    yield self.labelled(value, name=key)

        if mode is _Fmt.RawBody:
            yield body
            return
        if mode is _Fmt.Multipart:
            # Drop the request line; the remaining headers and body are parsed as a
            # MIME message.
            _, _, message_data = data.partition(b'\n')
            msg = BytesParser().parsebytes(message_data)
            for part in msg.walk():
                payloads = part.get_payload(decode=True)
                if not isinstance(payloads, list):
                    payloads = [payloads]
                for payload in payloads:
                    # Skip anything that is not binary data.
                    if not isbuffer(payload):
                        continue
                    if name := part.get_filename():
                        payload = self.labelled(payload, name=name)
                    yield payload

        if mode is _Fmt.UrlEncode:
            yield from chunks(parse_qs(body, keep_blank_values=1))

    @classmethod
    def handles(cls, data: bytearray) -> bool | None:
        """
        Return whether the data starts like a GET or POST request line.
        """
        return data.startswith((B'POST ', B'GET '))

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members