Module refinery.lib.tools
Miscellaneous helper functions.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Miscellaneous helper functions.
"""
import datetime
import functools
import inspect
import itertools
import logging
import os
import sys
import io
import warnings
import re
from typing import ByteString, Callable, Generator, Iterable, Optional, Tuple, TypeVar
from math import log
from enum import IntFlag
_T = TypeVar('_T')
def lookahead(iterator: Iterable[_T]) -> Generator[Tuple[bool, _T], None, None]:
    """
    Wraps an iterator to yield pairs `(last, item)`, where `item` is an element of the
    original iterator and `last` is a boolean indicating whether it is the final one.
    An empty input produces an empty generator.
    """
    stream = iter(iterator)
    try:
        pending = next(stream)
    except StopIteration:
        return
    for upcoming in stream:
        yield False, pending
        pending = upcoming
    yield True, pending
def get_terminal_size(default=0):
    """
    Returns the width of the currently attached terminal. If the environment variable
    `REFINERY_TERM_SIZE` is set to a positive integer value, it takes precedence. If the
    width of the terminal cannot be determined, or if it is less than 2 characters, the
    function returns `default`.
    """
    from refinery.lib.environment import environment
    # A user-configured terminal size always wins over the detected one.
    ev_terminal_size = environment.term_size.value
    if ev_terminal_size > 0:
        return ev_terminal_size
    width = default
    for stream in (sys.stderr, sys.stdout):
        if stream.isatty():
            try:
                width = os.get_terminal_size(stream.fileno()).columns
            except Exception:
                # the stream may not be backed by a real file descriptor
                width = default
        else:
            # NOTE(review): when the first stream (stderr) is not a tty, stdout is never
            # consulted — presumably intentional for redirected output; confirm.
            break
    # reserve one column; widths below 2 are treated as undetectable
    return default if width < 2 else width - 1
def terminalfit(text: str, delta: int = 0, width: int = 0, parsep: str = '\n\n', **kw) -> str:
    """
    Reformats text to fit the given width while not mangling bullet point lists.

    The target width is `width` (or the detected terminal width when zero), reduced by
    `delta`. Paragraphs are joined with `parsep`; any additional keyword arguments are
    forwarded to `textwrap.wrap`.
    """
    import textwrap
    import re
    width = width or get_terminal_size()
    width = width - delta

    # predicates: ordered list item ("(1)", "2."), unordered item ("-", "*"),
    # and indented continuation line
    def isol(t): return re.match(R'^\(\d+\)|\d+[.:;]', t)
    def isul(t): return t.startswith('-') or t.startswith('*')
    def issp(t): return t.startswith(' ')

    text = text.replace('\r', '')

    def bulletpoint(line):
        # Wrap a single bullet point; continuation lines are indented so that they
        # align under the bullet text rather than its marker.
        wrapped = textwrap.wrap(line, width - 2, **kw)
        indent = ' ' if isul(line) else ' '
        wrapped[1:] = ['{}{}'.format(indent, line) for line in wrapped[1:]]
        return '\n'.join(wrapped)

    def fitted(paragraphs):
        for k, p in enumerate(paragraphs):
            if p.startswith(' '):
                # indented paragraphs (e.g. code samples) are passed through verbatim
                yield p
                continue
            ol, ul = isol(p), isul(p)
            if ol or ul:
                input_lines = p.splitlines(keepends=False)
                unwrapped_line = input_lines[0].rstrip()
                lines = []
                # only treat the paragraph as a list when every line is either a list
                # item or an indented continuation of one
                if (ol and all(isol(t) or issp(t) for t in input_lines) or ul and all(isul(t) or issp(t) for t in input_lines)):
                    for line in input_lines[1:]:
                        if not (ol and isol(line) or ul and isul(line)):
                            # continuation line: merge it into the current bullet
                            unwrapped_line += ' ' + line.strip()
                            continue
                        # a new bullet starts: flush the previous one
                        lines.append(bulletpoint(unwrapped_line))
                        unwrapped_line = line.rstrip()
                    lines.append(bulletpoint(unwrapped_line))
                    yield '\n'.join(lines)
                    continue
            # plain paragraph: re-wrap to the target width
            yield '\n'.join(textwrap.wrap(p, width, **kw))
    return parsep.join(fitted(text.split('\n\n')))
def documentation(unit):
    """
    Return the documentation string of a given unit as it should be displayed on the
    command line; pdoc3-specific cross references are stripped from the text.
    """
    import re
    text = inspect.getdoc(unit)
    # collapse `refinery.x.y.name` references to just `name`, then drop backticks
    text = re.sub(R'`refinery\.(?:\w+\.)*(\w+)`', R'\1', text)
    return text.replace('`', '')
def begin(iterable: Iterable[_T]) -> Optional[Tuple[_T, Iterable[_T]]]:
    """
    Reads the first element from the input and returns `None` when there is none.
    Otherwise, the result is a pair of that first element and an iterable which
    re-produces the complete input sequence, including the consumed element.
    """
    rest = iter(iterable)
    try:
        first = next(rest)
    except StopIteration:
        return None

    def _replay():
        # yield the consumed element before draining the remainder
        yield first
        yield from rest

    return first, _replay()
def skipfirst(iterable: Iterable[_T]) -> Generator[_T, None, None]:
    """
    Returns an iterable where the first element of the input iterable was skipped.
    An empty input yields an empty generator; previously a bare `next` call escaped
    the generator and was converted to a `RuntimeError` by PEP 479.
    """
    it = iter(iterable)
    try:
        next(it)  # discard the first element
    except StopIteration:
        # empty input: produce nothing instead of raising
        return
    yield from it
def autoinvoke(method: Callable[..., _T], keywords: dict) -> _T:
    """
    For each parameter that `method` expects, this function looks for an entry in `keywords` which
    has the same name as that parameter. `autoinvoke` then calls `method` with all matching
    parameters forwarded in the appropriate manner.

    Note that matching entries are removed from `keywords` via `pop`. When `method` accepts a
    `**kwargs` parameter, all remaining entries of `keywords` are forwarded through it.
    Raises `ValueError` when a parameter without a default is missing from `keywords`.
    """
    kwdargs = {}     # arguments passed by keyword
    posargs = []     # positional arguments, collected in signature order
    varargs = []     # arguments forwarded through a *args parameter, if any
    kwdjoin = False  # whether method accepts a **kwargs parameter
    for p in inspect.signature(method).parameters.values():
        if p.kind is p.VAR_KEYWORD:
            kwdjoin = True
        try:
            value = keywords.pop(p.name)
        except KeyError:
            if p.kind is p.VAR_KEYWORD:
                # the **kwargs parameter itself never matches a single entry
                continue
            value = p.default
            if value is p.empty:
                raise ValueError(F'missing required parameter {p.name}')
        if p.kind is p.POSITIONAL_OR_KEYWORD or p.kind is p.POSITIONAL_ONLY:
            if value == p.default:
                # when equality holds, we force identity
                value = p.default
            posargs.append(value)
        elif p.kind is p.VAR_POSITIONAL:
            varargs = value
        elif p.kind is p.KEYWORD_ONLY:
            kwdargs[p.name] = value
    if kwdjoin:
        # forward everything that was not consumed above through **kwargs
        kwdargs.update(keywords)
    return method(*posargs, *varargs, **kwdargs)
def entropy_fallback(data: ByteString) -> float:
    """
    This method is called by `refinery.lib.tools.entropy` when the `numpy` module is not
    available. It computes the shannon entropy of the input byte string and is written in
    pure Python. The result is normalized to the interval [0, 1] by dividing by 8, the
    number of bits in a byte. Empty input yields 0.0, mirroring `entropy`; previously it
    raised a `ZeroDivisionError`.
    """
    if not data:
        # guard: the division by len(data) below would otherwise raise
        return 0.0
    if isinstance(data, memoryview):
        # this copy is better than re-implementing count in Python for memory views
        data = bytes(data)
    total = len(data)
    probabilities = (data.count(value) / total for value in range(0x100))
    # 0.0 - ... avoids returning a negative zero for single-symbol inputs
    return 0.0 - sum(p * log(p, 2) for p in probabilities if p) / 8.0
def entropy(data: ByteString) -> float:
    """
    Computes the entropy of `data` over the alphabet of all bytes, normalized to the
    interval [0, 1]. Uses `numpy` when it is available and a pure Python implementation
    otherwise.
    """
    if not data:
        return 0.0
    try:
        import numpy
    except ImportError:
        return entropy_fallback(data)
    counts = numpy.unique(memoryview(data), return_counts=True)[1]
    frequencies = counts / len(data)
    # 8 bits are the maximum number of bits of information in a byte
    return 0.0 - (numpy.log2(frequencies) * frequencies).sum() / 8.0
def index_of_coincidence(data: bytearray) -> float:
    """
    Computes the index of coincidence of `data` over the alphabet of all bytes.
    Inputs with fewer than two bytes yield 0.0, as the statistic requires at least
    one pair of positions.
    """
    N = len(data)
    if N < 2:
        # covers the empty input as well
        return 0.0
    try:
        import numpy
    except ImportError:
        C = [data.count(b) for b in range(0x100)]
    else:
        # numpy.histogram with numpy.arange(0x100) as bin *edges* produced only 0xFF
        # bins and merged the counts of byte values 0xFE and 0xFF, inflating the
        # result; bincount yields one exact count per byte value.
        C = numpy.bincount(
            numpy.frombuffer(data, dtype=numpy.uint8), minlength=0x100)
    d = 1 / N / (N - 1)
    return float(sum(x * (x - 1) * d for x in C))
def isstream(obj) -> bool:
    """
    Heuristically tests whether `obj` is a stream, which is currently done by simply
    checking whether the object exposes an attribute named `read`.
    """
    return hasattr(obj, 'read')
def isbuffer(obj) -> bool:
    """
    Test whether `obj` supports the buffer API, like a bytes or bytearray object.
    """
    try:
        view = memoryview(obj)
    except TypeError:
        return False
    # release the view immediately; we only needed to know it could be created
    with view:
        return True
def splitchunks(
    data: ByteString,
    size: int,
    step: Optional[int] = None,
    truncate: bool = False
) -> Iterable[ByteString]:
    """
    Split `data` into chunks of size `size`. After each chunk the cursor advances by
    `step` bytes, which defaults to `size`. When `truncate` is set, chunks shorter
    than `size` are suppressed and iteration stops at the last complete chunk.
    """
    if step is None:
        step = size
    total = len(data)
    if total <= size:
        # the whole input fits into one chunk; emit it unless truncation forbids it
        if total == size or not truncate:
            yield data
        return
    for offset in range(0, total, step):
        piece = data[offset:offset + size]
        if not piece:
            break
        if truncate and len(piece) < size:
            break
        yield piece
def make_buffer_mutable(data: ByteString):
    """
    Returns a mutable version of the input data. Inputs that are already mutable are
    returned as themselves, i.e. no copy operation occurs in these cases.
    """
    already_mutable = (
        isinstance(data, bytearray)
        or isinstance(data, memoryview) and not data.readonly
    )
    return data if already_mutable else bytearray(data)
def infinitize(it):
    """
    Turns the argument into an infinite iterable: the already-infinite `itertools`
    iterators (`cycle`, `repeat`, `count`) are returned unchanged, any other iterable
    is cycled, and a non-iterable value is repeated indefinitely.
    """
    if isinstance(it, (itertools.cycle, itertools.repeat, itertools.count)):
        return it
    if not (hasattr(it, '__iter__') or hasattr(it, '__next__')):
        # a scalar value: wrap it so it can be cycled
        it = (it, )
    return itertools.cycle(it)
# Use the standard library implementation of cached_property when it is available
# (Python 3.8+); otherwise, emulate it by combining a property with an LRU cache of
# size one. NOTE(review): the fallback caches per function object, not per instance,
# so switching between two instances re-evaluates the property — confirm this is
# acceptable for the classes using it.
try:
    cached_property = functools.cached_property
except AttributeError:
    def cached_property(p):
        return property(functools.lru_cache(maxsize=1)(p))
class NoLogging:
    """
    A context manager to prevent various unwanted kinds of logging messages to appear.
    The class is initialized with a given mode that encodes the logging channels to be
    suppressed. After the context is exited, the original logging behavior is restored.
    """

    class Mode(IntFlag):
        """
        A set of flags for different logging mechanisms to be suppressed.
        """
        STD_OUT = 0b0001
        """Silence the standard output channel."""
        STD_ERR = 0b0010
        """Silence the standard error channel."""
        WARNING = 0b0100
        """Silence the Python warning module."""
        LOGGING = 0b1000
        """Silence the Python logging module."""
        ALL = 0b1111 # noqa
        """Silence all known logging mechanisms."""

    def __init__(self, mode: Mode = Mode.WARNING | Mode.LOGGING):
        # By default only warnings and the logging module are silenced; the standard
        # streams remain untouched.
        self.mode = mode

    def __enter__(self):
        if self.mode & NoLogging.Mode.LOGGING:
            # raise the logging threshold above every standard level
            logging.disable(logging.CRITICAL)
        if self.mode & NoLogging.Mode.WARNING:
            # remember the current warning filters so they can be restored on exit
            self._warning_filters = list(warnings.filters)
            warnings.filterwarnings('ignore')
        if self.mode & NoLogging.Mode.STD_ERR:
            # swap stderr for a text wrapper around the null device
            self._stderr = sys.stderr
            sys.stderr = io.TextIOWrapper(open(os.devnull, 'wb'), encoding='latin1')
        if self.mode & NoLogging.Mode.STD_OUT:
            # same for stdout
            self._stdout = sys.stdout
            sys.stdout = io.TextIOWrapper(open(os.devnull, 'wb'), encoding='latin1')
        return self

    def __exit__(self, *_):
        if self.mode & NoLogging.Mode.LOGGING:
            # re-enable all logging levels
            logging.disable(logging.NOTSET)
        if self.mode & NoLogging.Mode.WARNING:
            # restore the exact filter list captured in __enter__
            warnings.resetwarnings()
            warnings.filters[:] = self._warning_filters
        if self.mode & NoLogging.Mode.STD_ERR:
            # close the null-device wrapper before restoring the original stream
            sys.stderr.close()
            sys.stderr = self._stderr
        if self.mode & NoLogging.Mode.STD_OUT:
            sys.stdout.close()
            sys.stdout = self._stdout
class NotOne(LookupError):
    """
    A custom exception raised by `refinery.lib.tools.one` if the input iterator does not
    yield exactly one element. The property `empty` indicates whether the iterator was
    empty; if it is false, then the exception was raised because the iterator contained
    more than one element.
    """
    def __init__(self, empty: bool):
        # the message depends on whether there were zero elements or too many
        quantifier = 'none' if empty else 'more'
        super().__init__(F'Expected a single item, but the iterator was {quantifier}')
        self.empty = empty
def one(iterable: Iterable[_T]) -> _T:
    """
    The function expects the input `iterable` to yield exactly one element and returns
    that element. Raises `refinery.lib.tools.NotOne` for invalid inputs.
    """
    stream = iter(iterable)
    try:
        only = next(stream)
    except StopIteration:
        raise NotOne(True)
    for _ in stream:
        # a second element exists, so the input is invalid
        raise NotOne(False)
    return only
def isodate(iso: str) -> Optional[datetime.datetime]:
    """
    Convert an input date string in ISO format to a `datetime` object. Contains fallbacks
    for early Python versions. Returns `None` when the input cannot be parsed.
    """
    if not 16 <= len(iso) <= 24:
        return None
    # normalize "date time" to "dateTtime" and drop any sub-second suffix
    iso = iso[:19].replace(' ', 'T', 1)
    try:
        try:
            return datetime.datetime.fromisoformat(iso)
        except AttributeError:
            # fromisoformat is unavailable before Python 3.7
            return datetime.datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        return None
def date_from_timestamp(ts: int):
    """
    Convert a UTC timestamp to a naive datetime object.
    """
    if sys.version_info < (3, 12):
        # utcfromtimestamp is deprecated as of Python 3.12
        result = datetime.datetime.utcfromtimestamp(ts)
    else:
        result = datetime.datetime.fromtimestamp(ts, datetime.UTC)
    # strip the timezone so both branches return a naive datetime
    return result.replace(tzinfo=None)
def integers_of_slice(s: slice) -> Iterable[int]:
    """
    Returns an iterable that iterates the integers in the range given by the input
    slice. A slice without a stop value produces an unbounded `itertools.count`.
    """
    start = s.start or 0
    step = s.step or 1
    if s.stop is None:
        return itertools.count(start, step)
    return range(start, s.stop, step)
def normalize_word_separators(words: str, unified_separator: str, strip: bool = True):
    """
    For a sequence of words separated by whitespace, punctuation, slashes, dashes or
    underscores, normalize all occurrences of one or more of these separators to the
    given symbol. When `strip` is set, leading and trailing separators are removed.
    """
    result = re.sub('[-\\s_.,;:/\\\\]+', unified_separator, words)
    return result.strip(unified_separator) if strip else result
def normalize_to_display(words: str, strip: bool = True):
    """
    Normalizes all word separators in the input to dashes, for display purposes.
    """
    return normalize_word_separators(words, '-', strip)
def normalize_to_identifier(words: str, strip: bool = True):
    """
    Normalizes all word separators in the input to underscores, producing a string
    suitable as an identifier.
    """
    return normalize_word_separators(words, '_', strip)
def typename(thing):
    """
    Determines a display name for the type of an object: the most fundamental base
    class other than `object` in the type's MRO; falls back to `repr` when the type
    has no `__name__` attribute.
    """
    cls = thing if isinstance(thing, type) else type(thing)
    bases = [c for c in cls.__mro__ if c is not object]
    if bases:
        # the last entry is the most basic class before object
        cls = bases[-1]
    try:
        return cls.__name__
    except AttributeError:
        return repr(cls)
def exception_to_string(exception: BaseException, default=None) -> str:
    """
    Attempts to convert a given exception to a good description that can be exposed to
    the user. The longest string argument of the exception is used; when the exception
    has no arguments at all, its class name is returned, and when none of the arguments
    are strings, `default` (or the stringified exception) is returned.
    """
    if not exception.args:
        return exception.__class__.__name__
    if default is None:
        default = str(exception)
    strings = (argument for argument in exception.args if isinstance(argument, str))
    return max(strings, key=len, default=default).strip()
def nopdoc(obj: object):
    """
    This decorator can be applied to any object to exclude it from the automatically
    generated documentation, by registering it as hidden in the `__pdoc__` dictionary
    of the module that defines it.
    """
    module = sys.modules[obj.__module__]
    registry: dict = module.__dict__.setdefault('__pdoc__', {})
    registry[obj.__qualname__] = False
    return obj
Functions
def lookahead(iterator)
-
Implements a new iterator from a given one which returns elements
(last, item)
where eachitem
is taken from the original iterator andlast
is a boolean indicating whether this is the last item.Expand source code Browse git
def lookahead(iterator: Iterable[_T]) -> Generator[Tuple[bool, _T], None, None]: """ Implements a new iterator from a given one which returns elements `(last, item)` where each `item` is taken from the original iterator and `last` is a boolean indicating whether this is the last item. """ last = False it = iter(iterator) try: peek = next(it) except StopIteration: return while not last: item = peek try: peek = next(it) except StopIteration: last = True yield last, item
def get_terminal_size(default=0)
-
Returns the size of the currently attached terminal. If the environment variable
REFINERY_TERM_SIZE
is set to a positive integer value, it takes precedence. If the width of the terminal cannot be determined or if the width is less than 2 characters, the function returns the given default.Expand source code Browse git
def get_terminal_size(default=0): """ Returns the size of the currently attached terminal. If the environment variable `REFINERY_TERM_SIZE` is set to an integer value, it takes prescedence. If the width of the terminal cannot be determined or if the width is less than 8 characters, the function returns zero. """ from refinery.lib.environment import environment ev_terminal_size = environment.term_size.value if ev_terminal_size > 0: return ev_terminal_size width = default for stream in (sys.stderr, sys.stdout): if stream.isatty(): try: width = os.get_terminal_size(stream.fileno()).columns except Exception: width = default else: break return default if width < 2 else width - 1
def terminalfit(text, delta=0, width=0, parsep='\n\n', **kw)
-
Reformats text to fit the given width while not mangling bullet point lists.
Expand source code Browse git
def terminalfit(text: str, delta: int = 0, width: int = 0, parsep: str = '\n\n', **kw) -> str: """ Reformats text to fit the given width while not mangling bullet point lists. """ import textwrap import re width = width or get_terminal_size() width = width - delta def isol(t): return re.match(R'^\(\d+\)|\d+[.:;]', t) def isul(t): return t.startswith('-') or t.startswith('*') def issp(t): return t.startswith(' ') text = text.replace('\r', '') def bulletpoint(line): wrapped = textwrap.wrap(line, width - 2, **kw) indent = ' ' if isul(line) else ' ' wrapped[1:] = ['{}{}'.format(indent, line) for line in wrapped[1:]] return '\n'.join(wrapped) def fitted(paragraphs): for k, p in enumerate(paragraphs): if p.startswith(' '): yield p continue ol, ul = isol(p), isul(p) if ol or ul: input_lines = p.splitlines(keepends=False) unwrapped_line = input_lines[0].rstrip() lines = [] if (ol and all(isol(t) or issp(t) for t in input_lines) or ul and all(isul(t) or issp(t) for t in input_lines)): for line in input_lines[1:]: if not (ol and isol(line) or ul and isul(line)): unwrapped_line += ' ' + line.strip() continue lines.append(bulletpoint(unwrapped_line)) unwrapped_line = line.rstrip() lines.append(bulletpoint(unwrapped_line)) yield '\n'.join(lines) continue yield '\n'.join(textwrap.wrap(p, width, **kw)) return parsep.join(fitted(text.split('\n\n')))
def documentation(unit)
-
Return the documentation string of a given unit as it should be displayed on the command line. Certain pdoc3-specific reference strings are removed.
Expand source code Browse git
def documentation(unit): """ Return the documentation string of a given unit as it should be displayed on the command line. Certain pdoc3-specific reference strings are removed. """ import re docs = inspect.getdoc(unit) docs = re.sub(R'`refinery\.(?:\w+\.)*(\w+)`', R'\1', docs) return docs.replace('`', '')
def begin(iterable)
-
Iterates the first element of an iterator and returns None if this fails. Otherwise, it returns both the first element and a new iterable which will return the same elements as the input.
Expand source code Browse git
def begin(iterable: Iterable[_T]) -> Optional[Tuple[_T, Iterable[_T]]]: """ Iterates the first element of an iterator and returns None if this fails. Otherwise, it returns both the first element and a new iterable which will return the same elements as the input. """ try: body = iter(iterable) head = next(body) except StopIteration: return None else: def _fused(): yield head yield from body return head, _fused()
def skipfirst(iterable)
-
Returns an iterable where the first element of the input iterable was skipped.
Expand source code Browse git
def skipfirst(iterable: Iterable[_T]) -> Generator[_T, None, None]: """ Returns an interable where the first element of the input iterable was skipped. """ it = iter(iterable) next(it) yield from it
def autoinvoke(method, keywords)
-
For each parameter that
method
expects, this function looks for an entry inkeywords
which has the same name as that parameter.autoinvoke()
then callsmethod
with all matching parameters forwarded in the appropriate manner.Expand source code Browse git
def autoinvoke(method: Callable[..., _T], keywords: dict) -> _T: """ For each parameter that `method` expects, this function looks for an entry in `keywords` which has the same name as that parameter. `autoinvoke` then calls `method` with all matching parameters forwarded in the appropriate manner. """ kwdargs = {} posargs = [] varargs = [] kwdjoin = False for p in inspect.signature(method).parameters.values(): if p.kind is p.VAR_KEYWORD: kwdjoin = True try: value = keywords.pop(p.name) except KeyError: if p.kind is p.VAR_KEYWORD: continue value = p.default if value is p.empty: raise ValueError(F'missing required parameter {p.name}') if p.kind is p.POSITIONAL_OR_KEYWORD or p.kind is p.POSITIONAL_ONLY: if value == p.default: # when equality holds, we force identity value = p.default posargs.append(value) elif p.kind is p.VAR_POSITIONAL: varargs = value elif p.kind is p.KEYWORD_ONLY: kwdargs[p.name] = value if kwdjoin: kwdargs.update(keywords) return method(*posargs, *varargs, **kwdargs)
def entropy_fallback(data)
-
This method is called by
entropy()
when thenumpy
module is not available. It computes the shannon entropy of the input byte string and is written in pure Python.Expand source code Browse git
def entropy_fallback(data: ByteString) -> float: """ This method is called by `refinery.lib.tools.entropy` when the `numpy` module is not available. It computes the shannon entropy of the input byte string and is written in pure Python. """ if isinstance(data, memoryview): # this copy is better than re-implementing count in Python for memory views data = bytes(data) histogram = {b: data.count(b) for b in range(0x100)} S = [histogram[b] / len(data) for b in histogram] return 0.0 + -sum(p * log(p, 2) for p in S if p) / 8.0
def entropy(data)
-
Computes the entropy of
data
over the alphabet of all bytes.Expand source code Browse git
def entropy(data: ByteString) -> float: """ Computes the entropy of `data` over the alphabet of all bytes. """ if not data: return 0.0 try: import numpy except ImportError: return entropy_fallback(data) hist = numpy.unique(memoryview(data), return_counts=True)[1] prob = hist / len(data) # 8 bits are the maximum number of bits of information in a byte return 0.0 - (numpy.log2(prob) * prob).sum() / 8.0
def index_of_coincidence(data)
-
Computes the index of coincidence of
data
over the alphabet of all bytes.Expand source code Browse git
def index_of_coincidence(data: bytearray) -> float: """ Computes the index of coincidence of `data` over the alphabet of all bytes. """ if not data: return 0.0 N = len(data) if N < 2: return 0.0 try: import numpy except ImportError: C = [data.count(b) for b in range(0x100)] else: C = numpy.histogram( numpy.frombuffer(data, dtype=numpy.uint8), numpy.arange(0x100))[0] d = 1 / N / (N - 1) return float(sum(x * (x - 1) * d for x in C))
def isstream(obj)
-
Tests whether
obj
is a stream. This is currently done by simply testing whether the object has an attribute calledread
.Expand source code Browse git
def isstream(obj) -> bool: """ Tests whether `obj` is a stream. This is currently done by simply testing whether the object has an attribute called `read`. """ return hasattr(obj, 'read')
def isbuffer(obj)
-
Test whether
obj
is an object that supports the buffer API, like a bytes or bytearray object.Expand source code Browse git
def isbuffer(obj) -> bool: """ Test whether `obj` is an object that supports the buffer API, like a bytes or bytearray object. """ try: with memoryview(obj): return True except TypeError: return False
def splitchunks(data, size, step=None, truncate=False)
-
Split
data
into chunks of sizesize
. The cursor advances bystep
bytes after extracting a block, the default value forstep
is equal tosize
. The boolean parametertruncate
specifies whether any chunks of size smaller thansize
are generated or whether to abort as soon as the last complete chunk of the given size is extracted.Expand source code Browse git
def splitchunks( data: ByteString, size: int, step: Optional[int] = None, truncate: bool = False ) -> Iterable[ByteString]: """ Split `data` into chunks of size `size`. The cursor advances by `step` bytes after extracting a block, the default value for `step` is equal to `size`. The boolean parameter `truncate` specifies whether any chunks of size smaller than `size` are generated or whether to abort as soon as the last complete chunk of the given size is extracted. """ if step is None: step = size if len(data) <= size: if not truncate or len(data) == size: yield data return for k in range(0, len(data), step): chunk = data[k:k + size] if not chunk: break if len(chunk) < size and truncate: break yield chunk
def make_buffer_mutable(data)
-
Returns a mutable version of the input data. Already mutable inputs are returned as themselves, i.e. no copy operation occurs in these cases.
Expand source code Browse git
def make_buffer_mutable(data: ByteString): """ Returns a mutable version of the input data. Already mutable inputs are returned as themselves, i.e. no copy operation occurs in these cases. """ if isinstance(data, bytearray): return data if isinstance(data, memoryview) and not data.readonly: return data return bytearray(data)
def infinitize(it)
-
Expand source code Browse git
def infinitize(it): if not isinstance(it, ( itertools.cycle, itertools.repeat, itertools.count )): if not hasattr(it, '__iter__') and not hasattr(it, '__next__'): it = (it, ) return itertools.cycle(it) return it
def one(iterable)
-
The function expects the input
iterable
to be an iterable that yields exactly one element and returns that element. RaisesNotOne
for invalid inputs.Expand source code Browse git
def one(iterable: Iterable[_T]) -> _T: """ The function expects the input `iterable` to be an iterable that yields exactly one element and returns that element. Raises `refinery.lib.tools.NotOne` for invalid inputs. """ it = iter(iterable) try: top = next(it) except StopIteration: raise NotOne(True) try: next(it) except StopIteration: return top else: raise NotOne(False)
def isodate(iso)
-
Convert an input date string in ISO format to a
datetime
object. Contains fallbacks for early Python versions.Expand source code Browse git
def isodate(iso: str) -> Optional[datetime.datetime]: """ Convert an input date string in ISO format to a `datetime` object. Contains fallbacks for early Python versions. """ if len(iso) not in range(16, 25): return None iso = iso[:19].replace(' ', 'T', 1) try: try: return datetime.datetime.fromisoformat(iso) except AttributeError: return datetime.datetime.strptime(iso, "%Y-%m-%dT%H:%M:%S") except ValueError: return None
def date_from_timestamp(ts)
-
Convert a UTC timestamp to a datetime object.
Expand source code Browse git
def date_from_timestamp(ts: int): """ Convert a UTC timestamp to a datetime object. """ if sys.version_info >= (3, 12): dt = datetime.datetime.fromtimestamp(ts, datetime.UTC) else: dt = datetime.datetime.utcfromtimestamp(ts) return dt.replace(tzinfo=None)
def integers_of_slice(s)
-
Returns an iterable that iterates the integers in the range given by the input slice.
Expand source code Browse git
def integers_of_slice(s: slice) -> Iterable[int]: """ Returns an iterable that iterates the integers in the range given by the input slice. """ if s.stop is None: return itertools.count(s.start or 0, s.step or 1) else: return range(s.start or 0, s.stop, s.step or 1)
def normalize_word_separators(words, unified_separator, strip=True)
-
For a sequence of words separated by whitespace, punctuation, slashes, dashes or underscores, normalize all occurrences of one or more of these separators to one given symbol. Leading and trailing occurrences of separators are removed.
Expand source code Browse git
def normalize_word_separators(words: str, unified_separator: str, strip: bool = True): """ For a sequence of words separated by whitespace, punctuation, slashes, dashes or underscores, normalize all occurrences of one or more of these separators to one given symbol. Leading and trailing occurrences of separators are removed. """ normalized = re.sub('[-\\s_.,;:/\\\\]+', unified_separator, words) if strip: normalized = normalized.strip(unified_separator) return normalized
def normalize_to_display(words, strip=True)
-
Normalizes all separators to dashes.
Expand source code Browse git
def normalize_to_display(words: str, strip: bool = True): """ Normalizes all separators to dashes. """ return normalize_word_separators(words, '-', strip)
def normalize_to_identifier(words, strip=True)
-
Normalizes all separators to underscores.
Expand source code Browse git
def normalize_to_identifier(words: str, strip: bool = True): """ Normalizes all separators to underscores. """ return normalize_word_separators(words, '_', strip)
def typename(thing)
-
Determines the name of the type of an object.
Expand source code Browse git
def typename(thing): """ Determines the name of the type of an object. """ if not isinstance(thing, type): thing = type(thing) mro = [c for c in thing.__mro__ if c is not object] if mro: thing = mro[~0] try: return thing.__name__ except AttributeError: return repr(thing)
def exception_to_string(exception, default=None)
-
Attempts to convert a given exception to a good description that can be exposed to the user.
Expand source code Browse git
def exception_to_string(exception: BaseException, default=None) -> str: """ Attempts to convert a given exception to a good description that can be exposed to the user. """ if not exception.args: return exception.__class__.__name__ it = (a for a in exception.args if isinstance(a, str)) if default is None: default = str(exception) return max(it, key=len, default=default).strip()
def nopdoc(obj)
-
This decorator can be applied to any object to exclude it from the automatically generated documentation.
Expand source code Browse git
def nopdoc(obj: object): """ This decorator can be applied to any object to exclude it from the automatically generated documentation. """ pdoc: dict = sys.modules[obj.__module__].__dict__.setdefault('__pdoc__', {}) pdoc[obj.__qualname__] = False return obj
Classes
class NoLogging (mode=Mode.LOGGING|WARNING)
-
A context manager to prevent various unwanted kinds of logging messages to appear. The class is initialized with a given mode that encodes the logging channels to be suppressed. After the context is exited, the original logging behavior is restored.
Expand source code Browse git
class NoLogging: """ A context manager to prevent various unwanted kinds of logging messages to appear. The class is initialized with a given mode that encodes the logging channels to be suppressed. After the context is exited, the original logging behavior is restored. """ class Mode(IntFlag): """ A set of flags for different logging mechanisms to be suppressed. """ STD_OUT = 0b0001 """Silence the standard output channel.""" STD_ERR = 0b0010 """Silence the standard error channel.""" WARNING = 0b0100 """Silence the Python warning module.""" LOGGING = 0b1000 """Silence the Python logging module.""" ALL = 0b1111 # noqa """Silence all known logging mechanisms.""" def __init__(self, mode: Mode = Mode.WARNING | Mode.LOGGING): self.mode = mode def __enter__(self): if self.mode & NoLogging.Mode.LOGGING: logging.disable(logging.CRITICAL) if self.mode & NoLogging.Mode.WARNING: self._warning_filters = list(warnings.filters) warnings.filterwarnings('ignore') if self.mode & NoLogging.Mode.STD_ERR: self._stderr = sys.stderr sys.stderr = io.TextIOWrapper(open(os.devnull, 'wb'), encoding='latin1') if self.mode & NoLogging.Mode.STD_OUT: self._stdout = sys.stdout sys.stdout = io.TextIOWrapper(open(os.devnull, 'wb'), encoding='latin1') return self def __exit__(self, *_): if self.mode & NoLogging.Mode.LOGGING: logging.disable(logging.NOTSET) if self.mode & NoLogging.Mode.WARNING: warnings.resetwarnings() warnings.filters[:] = self._warning_filters if self.mode & NoLogging.Mode.STD_ERR: sys.stderr.close() sys.stderr = self._stderr if self.mode & NoLogging.Mode.STD_OUT: sys.stdout.close() sys.stdout = self._stdout
Class variables
var Mode
-
A set of flags for different logging mechanisms to be suppressed.
class NotOne (empty)
-
A custom exception raised by
one()
if the input iterator does not yield exactly one element. The propertyempty
indicates whether the iterator was empty; if it is false, then the exception was raised because the iterator contained more than one element.Expand source code Browse git
class NotOne(LookupError): """ A custom exception raised by `refinery.lib.tools.one` if the input iterator does not yield exactly one element. The property `empty` indicates whether the iterator was empty; if it is false, then the exception was raised because the iterator contained more than one element. """ def __init__(self, empty: bool): how = 'none' if empty else 'more' super().__init__(F'Expected a single item, but the iterator was {how}') self.empty = empty
Ancestors
- builtins.LookupError
- builtins.Exception
- builtins.BaseException