Module refinery.units.formats.html

Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

from refinery.lib.xml import XMLNodeBase
from refinery.lib.meta import metavars
from refinery.units.formats import XMLToPathExtractorUnit, UnpackResult, Arg

from io import StringIO
from html.parser import HTMLParser

_HTML_DATA_ROOT_TAG = 'html'


class HTMLNode(XMLNodeBase):
    __slots__ = 'indent',
    indent: str

    @property
    def textual(self) -> bool:
        return self.tag is None

    @property
    def root(self) -> bool:
        return self.tag == _HTML_DATA_ROOT_TAG

    def recover(self, inner=True) -> str:
        with StringIO() as stream:
            if not inner:
                stream.write(self.content)
            for child in self.children:
                child: HTMLNode
                stream.write(child.recover(False))
            if not inner and self.tag and not self.empty:
                stream.write(F'</{self.tag}>')
            return stream.getvalue()


class HTMLTreeParser(HTMLParser):

    _SELF_CLOSING_TAGS = {
        'area',
        'base',
        'br',
        'col',
        'embed',
        'hr',
        'img',
        'input',
        'link',
        'meta',
        'param',
        'source',
        'track',
        'wb',
    }

    def __init__(self) -> None:
        super().__init__(convert_charrefs=False)
        self.root = self.tos = HTMLNode(_HTML_DATA_ROOT_TAG)

    def handle_starttag(self, tag: str, attributes):
        if tag in self._SELF_CLOSING_TAGS:
            return
        node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={
            key: value for key, value in attributes if key and value})
        children = self.tos.children
        previous = children[-1] if children else None
        self.tos = node
        children.append(node)
        if not previous or previous.tag is not None:
            return
        if self.getpos() == (1, len(previous.content)):
            node.content = previous.content + node.content
            previous.content = ''
            return
        lf = previous.content.rfind('\n') + 1
        if lf <= 0:
            return
        leading_space = previous.content[lf:]
        if not leading_space.isspace():
            return
        node.content = leading_space + node.content
        previous.content = previous.content[:lf]

    def handle_entityref(self, name: str) -> None:
        ntt = F'&{name};'
        if self.tos.children:
            last = self.tos.children[-1]
            if last.textual:
                last.content += ntt
                return
        self.tos.children.append(HTMLNode(None, None, self.tos, ntt))

    def handle_charref(self, name: str) -> None:
        self.handle_entityref(F'#{name}')

    def handle_startendtag(self, tag: str, attributes) -> None:
        self.handle_starttag(tag, attributes)
        self.tos.empty = True
        self.tos = self.tos.parent

    def handle_endtag(self, tag: str):
        cursor = self.tos
        while cursor.parent and cursor.tag != tag:
            xthtml.log_info(F'skipping unclosed tag: {cursor.tag}')
            cursor = cursor.parent
        if not cursor.parent:
            xthtml.log_warn(F'ignoring closing tag that never opened: {tag}')
            return
        self.tos = cursor.parent

    def handle_data(self, data):
        self.tos.children.append(HTMLNode(None, None, self.tos, data))


class xthtml(XMLToPathExtractorUnit):
    """
    The unit processes an HTML document and extracts the contents of all elemnts in the DOM of the
    given tag. The main purpose is to extract scripts from HTML documents.
    """
    def __init__(
        self, *paths,
        outer: Arg.Switch('-o', help='Include the HTML tags for an extracted element.') = False,
        attributes: Arg.Switch('-a', help='Populate chunk metadata with HTML tag attributes.') = False,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path'
    ):
        super().__init__(
            *paths,
            outer=outer,
            attributes=attributes,
            format='{tag}',
            path=path,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
        )

    def unpack(self, data):
        html = HTMLTreeParser()
        html.feed(data.decode(self.codec))
        root = html.tos
        root.reindex()

        meta = metavars(data)
        path = self._make_path_builder(meta, root)

        while root.parent:
            self.log_info(F'tag was not closed: {root.tag}')
            root = root.parent

        while len(root.children) == 1:
            child, = root.children
            if child.tag != root.tag:
                break
            root = child

        def tree(root: HTMLNode, *parts: str):

            def outer(root: HTMLNode = root):
                return root.recover(inner=False).encode(self.codec)

            def inner(root: HTMLNode = root):
                return root.recover().encode(self.codec)

            tagpath = '/'.join(parts)
            meta = {}

            if self.args.attributes:
                meta.update(root.attributes)

            if root.root:
                yield UnpackResult(tagpath, inner, **meta)
            elif self.args.outer:
                yield UnpackResult(tagpath, outer, **meta)
            else:
                yield UnpackResult(tagpath, inner, **meta)

            for child in root.children:
                if child.textual:
                    continue
                yield from tree(child, *parts, path(child))

        yield from tree(root, path(root))

    @classmethod
    def handles(self, data: bytearray):
        from refinery.lib import mime
        info = mime.get_cached_file_magic_info(data)
        if info.extension == 'html':
            return True
        if info.mime.endswith('html'):
            return True
        return False

Classes

class HTMLNode (tag, index=None, parent=None, content=None, empty=False, attributes=None)

Base class for parsed XML nodes. While this is not currently implemented, this would allow for different types of XML node classes to represent e.g. leaves / text nodes from others.

Expand source code Browse git
class HTMLNode(XMLNodeBase):
    __slots__ = 'indent',
    indent: str

    @property
    def textual(self) -> bool:
        return self.tag is None

    @property
    def root(self) -> bool:
        return self.tag == _HTML_DATA_ROOT_TAG

    def recover(self, inner=True) -> str:
        with StringIO() as stream:
            if not inner:
                stream.write(self.content)
            for child in self.children:
                child: HTMLNode
                stream.write(child.recover(False))
            if not inner and self.tag and not self.empty:
                stream.write(F'</{self.tag}>')
            return stream.getvalue()

Ancestors

Instance variables

var textual
Expand source code Browse git
@property
def textual(self) -> bool:
    return self.tag is None
var root
Expand source code Browse git
@property
def root(self) -> bool:
    return self.tag == _HTML_DATA_ROOT_TAG
var indent

Return an attribute of instance, which is of type owner.

Methods

def recover(self, inner=True)
Expand source code Browse git
def recover(self, inner=True) -> str:
    with StringIO() as stream:
        if not inner:
            stream.write(self.content)
        for child in self.children:
            child: HTMLNode
            stream.write(child.recover(False))
        if not inner and self.tag and not self.empty:
            stream.write(F'</{self.tag}>')
        return stream.getvalue()

Inherited members

class HTMLTreeParser

Find tags and other markup and call handler functions.

Usage

p = HTMLParser() p.feed(data) … p.close()

Start tags are handled by calling self.handle_starttag() or self.handle_startendtag(); end tags by self.handle_endtag(). The data between tags is passed from the parser to the derived class by calling self.handle_data() with the data as argument (the data may be split up in arbitrary chunks). If convert_charrefs is True the character references are converted automatically to the corresponding Unicode character (and self.handle_data() is no longer split in chunks), otherwise they are passed by calling self.handle_entityref() or self.handle_charref() with the string containing respectively the named or numeric reference as the argument.

Initialize and reset this instance.

If convert_charrefs is True (the default), all character references are automatically converted to the corresponding Unicode characters.

Expand source code Browse git
class HTMLTreeParser(HTMLParser):

    _SELF_CLOSING_TAGS = {
        'area',
        'base',
        'br',
        'col',
        'embed',
        'hr',
        'img',
        'input',
        'link',
        'meta',
        'param',
        'source',
        'track',
        'wb',
    }

    def __init__(self) -> None:
        super().__init__(convert_charrefs=False)
        self.root = self.tos = HTMLNode(_HTML_DATA_ROOT_TAG)

    def handle_starttag(self, tag: str, attributes):
        if tag in self._SELF_CLOSING_TAGS:
            return
        node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={
            key: value for key, value in attributes if key and value})
        children = self.tos.children
        previous = children[-1] if children else None
        self.tos = node
        children.append(node)
        if not previous or previous.tag is not None:
            return
        if self.getpos() == (1, len(previous.content)):
            node.content = previous.content + node.content
            previous.content = ''
            return
        lf = previous.content.rfind('\n') + 1
        if lf <= 0:
            return
        leading_space = previous.content[lf:]
        if not leading_space.isspace():
            return
        node.content = leading_space + node.content
        previous.content = previous.content[:lf]

    def handle_entityref(self, name: str) -> None:
        ntt = F'&{name};'
        if self.tos.children:
            last = self.tos.children[-1]
            if last.textual:
                last.content += ntt
                return
        self.tos.children.append(HTMLNode(None, None, self.tos, ntt))

    def handle_charref(self, name: str) -> None:
        self.handle_entityref(F'#{name}')

    def handle_startendtag(self, tag: str, attributes) -> None:
        self.handle_starttag(tag, attributes)
        self.tos.empty = True
        self.tos = self.tos.parent

    def handle_endtag(self, tag: str):
        cursor = self.tos
        while cursor.parent and cursor.tag != tag:
            xthtml.log_info(F'skipping unclosed tag: {cursor.tag}')
            cursor = cursor.parent
        if not cursor.parent:
            xthtml.log_warn(F'ignoring closing tag that never opened: {tag}')
            return
        self.tos = cursor.parent

    def handle_data(self, data):
        self.tos.children.append(HTMLNode(None, None, self.tos, data))

Ancestors

  • html.parser.HTMLParser
  • _markupbase.ParserBase

Methods

def handle_starttag(self, tag, attributes)
Expand source code Browse git
def handle_starttag(self, tag: str, attributes):
    if tag in self._SELF_CLOSING_TAGS:
        return
    node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={
        key: value for key, value in attributes if key and value})
    children = self.tos.children
    previous = children[-1] if children else None
    self.tos = node
    children.append(node)
    if not previous or previous.tag is not None:
        return
    if self.getpos() == (1, len(previous.content)):
        node.content = previous.content + node.content
        previous.content = ''
        return
    lf = previous.content.rfind('\n') + 1
    if lf <= 0:
        return
    leading_space = previous.content[lf:]
    if not leading_space.isspace():
        return
    node.content = leading_space + node.content
    previous.content = previous.content[:lf]
def handle_entityref(self, name)
Expand source code Browse git
def handle_entityref(self, name: str) -> None:
    ntt = F'&{name};'
    if self.tos.children:
        last = self.tos.children[-1]
        if last.textual:
            last.content += ntt
            return
    self.tos.children.append(HTMLNode(None, None, self.tos, ntt))
def handle_charref(self, name)
Expand source code Browse git
def handle_charref(self, name: str) -> None:
    self.handle_entityref(F'#{name}')
def handle_startendtag(self, tag, attributes)
Expand source code Browse git
def handle_startendtag(self, tag: str, attributes) -> None:
    self.handle_starttag(tag, attributes)
    self.tos.empty = True
    self.tos = self.tos.parent
def handle_endtag(self, tag)
Expand source code Browse git
def handle_endtag(self, tag: str):
    cursor = self.tos
    while cursor.parent and cursor.tag != tag:
        xthtml.log_info(F'skipping unclosed tag: {cursor.tag}')
        cursor = cursor.parent
    if not cursor.parent:
        xthtml.log_warn(F'ignoring closing tag that never opened: {tag}')
        return
    self.tos = cursor.parent
def handle_data(self, data)
Expand source code Browse git
def handle_data(self, data):
    self.tos.children.append(HTMLNode(None, None, self.tos, data))
class xthtml (*paths, outer=False, attributes=False, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')

The unit processes an HTML document and extracts the contents of all elemnts in the DOM of the given tag. The main purpose is to extract scripts from HTML documents.

Expand source code Browse git
class xthtml(XMLToPathExtractorUnit):
    """
    The unit processes an HTML document and extracts the contents of all elemnts in the DOM of the
    given tag. The main purpose is to extract scripts from HTML documents.
    """
    def __init__(
        self, *paths,
        outer: Arg.Switch('-o', help='Include the HTML tags for an extracted element.') = False,
        attributes: Arg.Switch('-a', help='Populate chunk metadata with HTML tag attributes.') = False,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path'
    ):
        super().__init__(
            *paths,
            outer=outer,
            attributes=attributes,
            format='{tag}',
            path=path,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
        )

    def unpack(self, data):
        html = HTMLTreeParser()
        html.feed(data.decode(self.codec))
        root = html.tos
        root.reindex()

        meta = metavars(data)
        path = self._make_path_builder(meta, root)

        while root.parent:
            self.log_info(F'tag was not closed: {root.tag}')
            root = root.parent

        while len(root.children) == 1:
            child, = root.children
            if child.tag != root.tag:
                break
            root = child

        def tree(root: HTMLNode, *parts: str):

            def outer(root: HTMLNode = root):
                return root.recover(inner=False).encode(self.codec)

            def inner(root: HTMLNode = root):
                return root.recover().encode(self.codec)

            tagpath = '/'.join(parts)
            meta = {}

            if self.args.attributes:
                meta.update(root.attributes)

            if root.root:
                yield UnpackResult(tagpath, inner, **meta)
            elif self.args.outer:
                yield UnpackResult(tagpath, outer, **meta)
            else:
                yield UnpackResult(tagpath, inner, **meta)

            for child in root.children:
                if child.textual:
                    continue
                yield from tree(child, *parts, path(child))

        yield from tree(root, path(root))

    @classmethod
    def handles(self, data: bytearray):
        from refinery.lib import mime
        info = mime.get_cached_file_magic_info(data)
        if info.extension == 'html':
            return True
        if info.mime.endswith('html'):
            return True
        return False

Ancestors

Class variables

var required_dependencies
var optional_dependencies

Methods

def unpack(self, data)
Expand source code Browse git
def unpack(self, data):
    html = HTMLTreeParser()
    html.feed(data.decode(self.codec))
    root = html.tos
    root.reindex()

    meta = metavars(data)
    path = self._make_path_builder(meta, root)

    while root.parent:
        self.log_info(F'tag was not closed: {root.tag}')
        root = root.parent

    while len(root.children) == 1:
        child, = root.children
        if child.tag != root.tag:
            break
        root = child

    def tree(root: HTMLNode, *parts: str):

        def outer(root: HTMLNode = root):
            return root.recover(inner=False).encode(self.codec)

        def inner(root: HTMLNode = root):
            return root.recover().encode(self.codec)

        tagpath = '/'.join(parts)
        meta = {}

        if self.args.attributes:
            meta.update(root.attributes)

        if root.root:
            yield UnpackResult(tagpath, inner, **meta)
        elif self.args.outer:
            yield UnpackResult(tagpath, outer, **meta)
        else:
            yield UnpackResult(tagpath, inner, **meta)

        for child in root.children:
            if child.textual:
                continue
            yield from tree(child, *parts, path(child))

    yield from tree(root, path(root))

Inherited members