Module refinery.units.formats.html
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from refinery.lib.xml import XMLNodeBase
from refinery.lib.meta import metavars
from refinery.units.formats import XMLToPathExtractorUnit, UnpackResult, Arg
from io import StringIO
from html.parser import HTMLParser
_HTML_DATA_ROOT_TAG = 'html'
class HTMLNode(XMLNodeBase):
    """
    A node in the tree that `HTMLTreeParser` builds from an HTML document. Plain text is stored in
    nodes whose tag is `None`; element nodes are created with the raw text of their opening tag as
    their content.
    """
    __slots__ = 'indent',

    indent: str

    @property
    def textual(self) -> bool:
        """
        Whether this node has no tag, i.e. it carries text rather than representing an element.
        """
        return self.tag is None

    @property
    def root(self) -> bool:
        """
        Whether the tag of this node is the one reserved for the root of the document tree.
        """
        return self.tag == _HTML_DATA_ROOT_TAG

    def recover(self, inner=True) -> str:
        """
        Reassemble markup from this node and its descendants. With `inner` set (the default), only
        the recovered children are returned. Otherwise, the content of this node is emitted first
        and, for a tagged node that is not marked empty, a closing tag is appended at the end.
        Children are always recovered with their own content and closing tag included.
        """
        pieces = []
        if not inner:
            pieces.append(self.content)
        pieces.extend(node.recover(False) for node in self.children)
        if not inner and self.tag and not self.empty:
            pieces.append(F'</{self.tag}>')
        return ''.join(pieces)
class HTMLTreeParser(HTMLParser):
    """
    An `html.parser.HTMLParser` that assembles the document into a tree of `HTMLNode` objects.
    The attribute `root` is the synthetic root node and `tos` ("top of stack") is the node that
    is currently open, i.e. the one that receives new children.
    """

    # Void elements: these never have a closing tag and are not added to the tree.
    _SELF_CLOSING_TAGS = {
        'area',
        'base',
        'br',
        'col',
        'embed',
        'hr',
        'img',
        'input',
        'link',
        'meta',
        'param',
        'source',
        'track',
        'wbr',
    }

    def __init__(self) -> None:
        # Character references are not converted so that they are reported to handle_entityref
        # and handle_charref, which store them verbatim.
        super().__init__(convert_charrefs=False)
        self.root = self.tos = HTMLNode(_HTML_DATA_ROOT_TAG)

    def handle_starttag(self, tag: str, attributes):
        """
        Open a new element node below the current top of stack and make it the new top of stack.
        Void elements are ignored. Attributes without a name or without a value are dropped.
        """
        if tag in self._SELF_CLOSING_TAGS:
            return
        node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={
            key: value for key, value in attributes if key and value})
        children = self.tos.children
        previous = children[-1] if children else None
        self.tos = node
        children.append(node)
        # The remainder moves text that directly precedes the tag from the previous text node
        # into the content of the new node; it only applies when the previous sibling is text.
        if not previous or previous.tag is not None:
            return
        if self.getpos() == (1, len(previous.content)):
            # The tag starts on line 1 at an offset equal to the length of the preceding text;
            # presumably that text is everything in the document before this tag.
            node.content = previous.content + node.content
            previous.content = ''
            return
        lf = previous.content.rfind('\n') + 1
        if lf <= 0:
            # no line break in the preceding text
            return
        leading_space = previous.content[lf:]
        if not leading_space.isspace():
            return
        # Only whitespace separates the last line break from the tag: attach it to the tag.
        node.content = leading_space + node.content
        previous.content = previous.content[:lf]

    def handle_entityref(self, name: str) -> None:
        """
        Store the entity reference verbatim as text: it is appended to the last child of the top
        of stack if that child is a text node, and becomes a new text node otherwise.
        """
        ntt = F'&{name};'
        if self.tos.children:
            last = self.tos.children[-1]
            if last.textual:
                last.content += ntt
                return
        self.tos.children.append(HTMLNode(None, None, self.tos, ntt))

    def handle_charref(self, name: str) -> None:
        """
        Store a numeric character reference verbatim; it is treated like an entity reference
        whose name starts with a hash sign.
        """
        self.handle_entityref(F'#{name}')

    def handle_startendtag(self, tag: str, attributes) -> None:
        """
        Handle a tag of the form `<tag/>`: the element is added to the tree, marked as empty, and
        closed again immediately. Void elements are ignored, exactly as in `handle_starttag`.
        """
        if tag in self._SELF_CLOSING_TAGS:
            # handle_starttag does not push a node for void elements; without this guard, the
            # code below would mark the enclosing element as empty and pop it off the stack.
            return
        self.handle_starttag(tag, attributes)
        self.tos.empty = True
        self.tos = self.tos.parent

    def handle_endtag(self, tag: str):
        """
        Close the innermost open element with the given tag, implicitly closing every element
        that was opened inside it. A closing tag without a matching open element is ignored.
        """
        cursor = self.tos
        while cursor.parent and cursor.tag != tag:
            xthtml.log_info(F'skipping unclosed tag: {cursor.tag}')
            cursor = cursor.parent
        if not cursor.parent:
            # The walk ended at a node without parent: nothing on the stack matched the tag.
            xthtml.log_warn(F'ignoring closing tag that never opened: {tag}')
            return
        self.tos = cursor.parent

    def handle_data(self, data):
        """
        Add the given text as a new text node below the current top of stack.
        """
        self.tos.children.append(HTMLNode(None, None, self.tos, data))
class xthtml(XMLToPathExtractorUnit):
    """
    The unit processes an HTML document and extracts the contents of all elements in the DOM of the
    given tag. The main purpose is to extract scripts from HTML documents.
    """
    def __init__(
        self, *paths,
        outer: Arg.Switch('-o', help='Include the HTML tags for an extracted element.') = False,
        attributes: Arg.Switch('-a', help='Populate chunk metadata with HTML tag attributes.') = False,
        list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False,
        path=b'path'
    ):
        super().__init__(
            *paths,
            outer=outer,
            attributes=attributes,
            format='{tag}',
            path=path,
            list=list,
            join_path=join_path,
            drop_path=drop_path,
            fuzzy=fuzzy,
            exact=exact,
            regex=regex,
        )

    def unpack(self, data):
        """
        Parse the input as HTML and generate one `UnpackResult` for the root of the resulting
        tree and for every element below it. Text nodes do not produce results of their own.
        """
        html = HTMLTreeParser()
        html.feed(data.decode(self.codec))
        # feed() keeps an incomplete trailing construct buffered; close() forces the parser to
        # process it so that the end of a truncated document is not silently dropped.
        html.close()
        root = html.tos
        # NOTE(review): reindex and the path builder receive the node that was open when parsing
        # ended, which is not the tree root if tags were left unclosed - confirm this is intended.
        root.reindex()
        meta = metavars(data)
        path = self._make_path_builder(meta, root)
        # Elements that are still open at the end of the input were never closed: climb to the top.
        while root.parent:
            self.log_info(F'tag was not closed: {root.tag}')
            root = root.parent
        # Descend through only-children that carry the same tag as the current root.
        while len(root.children) == 1:
            child, = root.children
            if child.tag != root.tag:
                break
            root = child

        def tree(root: HTMLNode, *parts: str):
            # The node is bound as a default argument so that each callback keeps its own node.
            def outer(root: HTMLNode = root):
                return root.recover(inner=False).encode(self.codec)

            def inner(root: HTMLNode = root):
                return root.recover().encode(self.codec)

            tagpath = '/'.join(parts)
            meta = {}
            if self.args.attributes:
                meta.update(root.attributes)
            # A node with the root tag is always extracted without surrounding tags.
            extract = outer if self.args.outer and not root.root else inner
            yield UnpackResult(tagpath, extract, **meta)
            for child in root.children:
                if child.textual:
                    continue
                yield from tree(child, *parts, path(child))

        yield from tree(root, path(root))

    @classmethod
    def handles(self, data: bytearray):
        """
        Return whether the cached file magic information for the given data reports the
        extension `html` or a MIME type that ends in `html`.
        """
        from refinery.lib import mime
        info = mime.get_cached_file_magic_info(data)
        return info.extension == 'html' or info.mime.endswith('html')
Classes
class HTMLNode (tag, index=None, parent=None, content=None, empty=False, attributes=None)
-
Base class for parsed XML nodes. Although this is not currently implemented, it would allow different XML node classes to be introduced, e.g. to distinguish leaves / text nodes from other nodes.
Expand source code Browse git
class HTMLNode(XMLNodeBase): __slots__ = 'indent', indent: str @property def textual(self) -> bool: return self.tag is None @property def root(self) -> bool: return self.tag == _HTML_DATA_ROOT_TAG def recover(self, inner=True) -> str: with StringIO() as stream: if not inner: stream.write(self.content) for child in self.children: child: HTMLNode stream.write(child.recover(False)) if not inner and self.tag and not self.empty: stream.write(F'</{self.tag}>') return stream.getvalue()
Ancestors
Instance variables
var textual
-
Expand source code Browse git
@property def textual(self) -> bool: return self.tag is None
var root
-
Expand source code Browse git
@property def root(self) -> bool: return self.tag == _HTML_DATA_ROOT_TAG
var indent
-
Return an attribute of instance, which is of type owner.
Methods
def recover(self, inner=True)
-
Expand source code Browse git
def recover(self, inner=True) -> str: with StringIO() as stream: if not inner: stream.write(self.content) for child in self.children: child: HTMLNode stream.write(child.recover(False)) if not inner and self.tag and not self.empty: stream.write(F'</{self.tag}>') return stream.getvalue()
Inherited members
class HTMLTreeParser
-
Find tags and other markup and call handler functions.
Usage
p = HTMLParser() p.feed(data) … p.close()
Start tags are handled by calling self.handle_starttag() or self.handle_startendtag(); end tags by self.handle_endtag(). The data between tags is passed from the parser to the derived class by calling self.handle_data() with the data as argument (the data may be split up in arbitrary chunks). If convert_charrefs is True the character references are converted automatically to the corresponding Unicode character (and self.handle_data() is no longer split in chunks), otherwise they are passed by calling self.handle_entityref() or self.handle_charref() with the string containing respectively the named or numeric reference as the argument.
Initialize and reset this instance.
If convert_charrefs is True (the default), all character references are automatically converted to the corresponding Unicode characters.
Expand source code Browse git
class HTMLTreeParser(HTMLParser): _SELF_CLOSING_TAGS = { 'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input', 'link', 'meta', 'param', 'source', 'track', 'wb', } def __init__(self) -> None: super().__init__(convert_charrefs=False) self.root = self.tos = HTMLNode(_HTML_DATA_ROOT_TAG) def handle_starttag(self, tag: str, attributes): if tag in self._SELF_CLOSING_TAGS: return node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={ key: value for key, value in attributes if key and value}) children = self.tos.children previous = children[-1] if children else None self.tos = node children.append(node) if not previous or previous.tag is not None: return if self.getpos() == (1, len(previous.content)): node.content = previous.content + node.content previous.content = '' return lf = previous.content.rfind('\n') + 1 if lf <= 0: return leading_space = previous.content[lf:] if not leading_space.isspace(): return node.content = leading_space + node.content previous.content = previous.content[:lf] def handle_entityref(self, name: str) -> None: ntt = F'&{name};' if self.tos.children: last = self.tos.children[-1] if last.textual: last.content += ntt return self.tos.children.append(HTMLNode(None, None, self.tos, ntt)) def handle_charref(self, name: str) -> None: self.handle_entityref(F'#{name}') def handle_startendtag(self, tag: str, attributes) -> None: self.handle_starttag(tag, attributes) self.tos.empty = True self.tos = self.tos.parent def handle_endtag(self, tag: str): cursor = self.tos while cursor.parent and cursor.tag != tag: xthtml.log_info(F'skipping unclosed tag: {cursor.tag}') cursor = cursor.parent if not cursor.parent: xthtml.log_warn(F'ignoring closing tag that never opened: {tag}') return self.tos = cursor.parent def handle_data(self, data): self.tos.children.append(HTMLNode(None, None, self.tos, data))
Ancestors
- html.parser.HTMLParser
- _markupbase.ParserBase
Methods
def handle_starttag(self, tag, attributes)
-
Expand source code Browse git
def handle_starttag(self, tag: str, attributes): if tag in self._SELF_CLOSING_TAGS: return node = HTMLNode(tag, None, self.tos, self.get_starttag_text(), attributes={ key: value for key, value in attributes if key and value}) children = self.tos.children previous = children[-1] if children else None self.tos = node children.append(node) if not previous or previous.tag is not None: return if self.getpos() == (1, len(previous.content)): node.content = previous.content + node.content previous.content = '' return lf = previous.content.rfind('\n') + 1 if lf <= 0: return leading_space = previous.content[lf:] if not leading_space.isspace(): return node.content = leading_space + node.content previous.content = previous.content[:lf]
def handle_entityref(self, name)
-
Expand source code Browse git
def handle_entityref(self, name: str) -> None: ntt = F'&{name};' if self.tos.children: last = self.tos.children[-1] if last.textual: last.content += ntt return self.tos.children.append(HTMLNode(None, None, self.tos, ntt))
def handle_charref(self, name)
-
Expand source code Browse git
def handle_charref(self, name: str) -> None: self.handle_entityref(F'#{name}')
def handle_startendtag(self, tag, attributes)
-
Expand source code Browse git
def handle_startendtag(self, tag: str, attributes) -> None: self.handle_starttag(tag, attributes) self.tos.empty = True self.tos = self.tos.parent
def handle_endtag(self, tag)
-
Expand source code Browse git
def handle_endtag(self, tag: str): cursor = self.tos while cursor.parent and cursor.tag != tag: xthtml.log_info(F'skipping unclosed tag: {cursor.tag}') cursor = cursor.parent if not cursor.parent: xthtml.log_warn(F'ignoring closing tag that never opened: {tag}') return self.tos = cursor.parent
def handle_data(self, data)
-
Expand source code Browse git
def handle_data(self, data): self.tos.children.append(HTMLNode(None, None, self.tos, data))
class xthtml (*paths, outer=False, attributes=False, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path')
-
The unit processes an HTML document and extracts the contents of all elements in the DOM of the given tag. The main purpose is to extract scripts from HTML documents.
Expand source code Browse git
class xthtml(XMLToPathExtractorUnit): """ The unit processes an HTML document and extracts the contents of all elemnts in the DOM of the given tag. The main purpose is to extract scripts from HTML documents. """ def __init__( self, *paths, outer: Arg.Switch('-o', help='Include the HTML tags for an extracted element.') = False, attributes: Arg.Switch('-a', help='Populate chunk metadata with HTML tag attributes.') = False, list=False, join_path=False, drop_path=False, fuzzy=0, exact=False, regex=False, path=b'path' ): super().__init__( *paths, outer=outer, attributes=attributes, format='{tag}', path=path, list=list, join_path=join_path, drop_path=drop_path, fuzzy=fuzzy, exact=exact, regex=regex, ) def unpack(self, data): html = HTMLTreeParser() html.feed(data.decode(self.codec)) root = html.tos root.reindex() meta = metavars(data) path = self._make_path_builder(meta, root) while root.parent: self.log_info(F'tag was not closed: {root.tag}') root = root.parent while len(root.children) == 1: child, = root.children if child.tag != root.tag: break root = child def tree(root: HTMLNode, *parts: str): def outer(root: HTMLNode = root): return root.recover(inner=False).encode(self.codec) def inner(root: HTMLNode = root): return root.recover().encode(self.codec) tagpath = '/'.join(parts) meta = {} if self.args.attributes: meta.update(root.attributes) if root.root: yield UnpackResult(tagpath, inner, **meta) elif self.args.outer: yield UnpackResult(tagpath, outer, **meta) else: yield UnpackResult(tagpath, inner, **meta) for child in root.children: if child.textual: continue yield from tree(child, *parts, path(child)) yield from tree(root, path(root)) @classmethod def handles(self, data: bytearray): from refinery.lib import mime info = mime.get_cached_file_magic_info(data) if info.extension == 'html': return True if info.mime.endswith('html'): return True return False
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Methods
def unpack(self, data)
-
Expand source code Browse git
def unpack(self, data): html = HTMLTreeParser() html.feed(data.decode(self.codec)) root = html.tos root.reindex() meta = metavars(data) path = self._make_path_builder(meta, root) while root.parent: self.log_info(F'tag was not closed: {root.tag}') root = root.parent while len(root.children) == 1: child, = root.children if child.tag != root.tag: break root = child def tree(root: HTMLNode, *parts: str): def outer(root: HTMLNode = root): return root.recover(inner=False).encode(self.codec) def inner(root: HTMLNode = root): return root.recover().encode(self.codec) tagpath = '/'.join(parts) meta = {} if self.args.attributes: meta.update(root.attributes) if root.root: yield UnpackResult(tagpath, inner, **meta) elif self.args.outer: yield UnpackResult(tagpath, outer, **meta) else: yield UnpackResult(tagpath, inner, **meta) for child in root.children: if child.textual: continue yield from tree(child, *parts, path(child)) yield from tree(root, path(root))
Inherited members