Module refinery.units.formats.httprequest

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import Union, Dict, List

from cgi import parse_header, FieldStorage
from email.message import Message
from enum import Enum
from urllib.parse import parse_qs

from refinery.units import Unit
from refinery.lib.structures import MemoryFile


class _Fmt(str, Enum):
    RawBody = ''
    UrlEncode = 'application/x-www-form-urlencoded'
    Multipart = 'multipart/form-data'


class httprequest(Unit):
    """
    Parses HTTP request data, as you would obtain from a packet dump. The unit extracts
    POST data in any format; each uploaded file is emitted as a separate chunk.
    """
    def process(self, data):
        def header(line: bytes):
            name, colon, data = line.decode('utf8').partition(':')
            if colon:
                yield (name.strip().lower(), data.strip())

        head, _, body = data.partition(b'\r\n\r\n')
        request, *headers = head.splitlines(False)
        headers = dict(t for line in headers for t in header(line))
        method, path, _, *rest = request.split()

        info = {}
        mode = _Fmt.RawBody

        if rest:
            self.log_warn('unexpected rest data while parsing HTTP request:', rest)

        if method == b'GET' and not body:
            mode = _Fmt.UrlEncode
            body = path.partition(B'?')[1]
        if method == b'POST' and (ct := headers.get('content-type', None)):
            ct, info = parse_header(ct)
            mode = _Fmt(ct)

        def chunks(upload: Dict[Union[str, bytes], List[bytes]]):
            for key, values in upload.items():
                if not isinstance(key, str):
                    key = key.decode('utf8')
                for value in values:
                    yield self.labelled(value, name=key)

        if mode is _Fmt.RawBody:
            yield body
            return
        if mode is _Fmt.Multipart:
            boundary = info['boundary']
            headers = Message()
            headers.set_type(F'{_Fmt.Multipart.value}; boundary={boundary}')
            try:
                headers['Content-Length'] = info['CONTENT-LENGTH']
            except KeyError:
                pass
            fs = FieldStorage(MemoryFile(body, read_as_bytes=True),
                headers=headers, environ={'REQUEST_METHOD': method.decode()})
            for name in fs:
                fields = fs[name]
                if not isinstance(fields, list):
                    fields = [fields]
                for field in fields:
                    field: FieldStorage
                    chunk = self.labelled(field.value)
                    if fn := field.filename:
                        chunk.meta['name'] = fn
                    yield chunk
        if mode is _Fmt.UrlEncode:
            yield from chunks(parse_qs(body, keep_blank_values=1))

    @classmethod
    def handles(self, data: bytearray) -> bool | None:
        return data.startswith(B'POST ') or data.startswith(B'GET ')

Classes

class httprequest

Parses HTTP request data, as you would obtain from a packet dump. The unit extracts POST data in any format; each uploaded file is emitted as a separate chunk.

Expand source code Browse git
class httprequest(Unit):
    """
    Parses HTTP request data, as you would obtain from a packet dump. The unit extracts
    POST data in any format; each uploaded file is emitted as a separate chunk.
    """
    def process(self, data):
        def header(line: bytes):
            name, colon, data = line.decode('utf8').partition(':')
            if colon:
                yield (name.strip().lower(), data.strip())

        head, _, body = data.partition(b'\r\n\r\n')
        request, *headers = head.splitlines(False)
        headers = dict(t for line in headers for t in header(line))
        method, path, _, *rest = request.split()

        info = {}
        mode = _Fmt.RawBody

        if rest:
            self.log_warn('unexpected rest data while parsing HTTP request:', rest)

        if method == b'GET' and not body:
            mode = _Fmt.UrlEncode
            body = path.partition(B'?')[1]
        if method == b'POST' and (ct := headers.get('content-type', None)):
            ct, info = parse_header(ct)
            mode = _Fmt(ct)

        def chunks(upload: Dict[Union[str, bytes], List[bytes]]):
            for key, values in upload.items():
                if not isinstance(key, str):
                    key = key.decode('utf8')
                for value in values:
                    yield self.labelled(value, name=key)

        if mode is _Fmt.RawBody:
            yield body
            return
        if mode is _Fmt.Multipart:
            boundary = info['boundary']
            headers = Message()
            headers.set_type(F'{_Fmt.Multipart.value}; boundary={boundary}')
            try:
                headers['Content-Length'] = info['CONTENT-LENGTH']
            except KeyError:
                pass
            fs = FieldStorage(MemoryFile(body, read_as_bytes=True),
                headers=headers, environ={'REQUEST_METHOD': method.decode()})
            for name in fs:
                fields = fs[name]
                if not isinstance(fields, list):
                    fields = [fields]
                for field in fields:
                    field: FieldStorage
                    chunk = self.labelled(field.value)
                    if fn := field.filename:
                        chunk.meta['name'] = fn
                    yield chunk
        if mode is _Fmt.UrlEncode:
            yield from chunks(parse_qs(body, keep_blank_values=1))

    @classmethod
    def handles(self, data: bytearray) -> bool | None:
        return data.startswith(B'POST ') or data.startswith(B'GET ')

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Inherited members