Module refinery.lib.vfs
Certain libraries insist on reading data from a file on disk which has to be specified by passing
a file path, and sometimes they will not accept a stream object or any other way to input data to
them. This module implements a virtual file system which allows us to pass in-memory data to
these libraries without having to actually write anything to disk. It works by hooking the
standard library functions builtins.open, os.stat, and mmap.mmap.
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Certain libraries insist on reading data from a file on disk which has to be specified by passing
a file path, and sometimes they will not accept a stream object or any other way to input data to
them. This module implements a **virtual file system** which allows us to pass in-memory data to
these libraries without having to actually write anything to disk. It works by hooking the
standard library functions `builtins.open`, `os.stat`, and `mmap.mmap`.
"""
from __future__ import annotations
import builtins
import os
import os.path
import io
import stat
import uuid
import threading
import mmap
from typing import Dict, Optional
from refinery.lib.types import ByteStr
from refinery.lib.structures import MemoryFile, MemoryFileMethods
class VirtualFile:
    """
    Represents a file in the virtual file system. It is linked to a `refinery.lib.vfs.VirtualFileSystem`
    and may be initialized with a chunk of binary data. It is possible to leave `data` as `None`,
    specifically to create a virtual file that would be written to, rather than read from. In that
    case, the data written to the virtual file can be extracted as a binary string later.
    Additionally, it is possible to specify a file extension that the virtual file should use in its
    randomly generated path; this is useful in case the reader insists on a specific file extension
    or uses the file extension to deduce a mode of operation.
    """
    name: str
    path: str
    node: int
    data: Optional[ByteStr]

    def __init__(self, fs: VirtualFileSystem, data: Optional[ByteStr] = None, extension: Optional[str] = None):
        """
        Generate a random UUID-based name, store `data`, and register the new file with `fs` by
        calling `fs.install`. A non-empty `extension` is appended to the name after a dot.
        """
        suffix = F'.{extension}' if extension else ''
        self.uuid = uuid.uuid4()
        self.name = F'{self.uuid!s}{suffix}'
        self.data = data
        fs.install(self)

    def __repr__(self):
        return F'<VirtualFile/{self.name}>'

    @property
    def node(self) -> int:
        """
        The integer that this file passes as `fileno` when it is opened or mapped. It is the
        second entry of the `fields` tuple of the file's UUID.
        """
        return self.uuid.fields[1]

    def mmap(self, length: int = 0, offset: int = 0) -> MemoryFile:
        """
        Emulate the result of an `mmap` call to the virtual file. The returned object holds a copy
        of the selected bytes. As with `mmap.mmap`, the window starts at `offset`, and a `length`
        of zero selects everything from there to the end of the data.
        """
        view = memoryview(self.data)
        node = self.node
        # The offset has to be honored even when no explicit length was given.
        if offset:
            view = view[offset:]
        if length:
            view = view[:length]

        class _MappedView(bytearray, MemoryFileMethods):
            def __init__(self):
                # NOTE(review): the arguments presumably mean (data, read_as_bytes, fileno),
                # mirroring the MemoryFile call in open(); confirm against MemoryFileMethods.
                MemoryFileMethods.__init__(self, self, True, node)

        mapped = _MappedView()
        mapped[:] = view
        return mapped

    def open(self, mode: str) -> MemoryFile:
        """
        Open the virtual file. When the file has no data yet and `mode` contains `w`, an empty
        buffer is created first. The returned stream carries this file's path as its `name`.
        """
        if self.data is None and 'w' in mode:
            self.data = bytearray()
        fd = MemoryFile(self.data, read_as_bytes=True, fileno=self.node)
        fd.name = self.path
        return fd

    def __len__(self):
        """
        Return the size of the data; raises `FileNotFoundError` if no data has been set.
        """
        if self.data is None:
            raise FileNotFoundError
        return len(self.data)

    def __fspath__(self):
        return self.path

    @property
    def path(self):
        """
        Returns the absolute path to this virtual file. The virtual file is given a randomly
        generated, uuid-formatted file name in the current working directory.
        """
        return os.path.join(os.getcwd(), self.name)

    def stat(self):
        """
        Return a stat result for this virtual file. It has all permission bits set and accurately
        reports the size of the file. Raises `FileNotFoundError` if no data has been set.
        """
        if self.data is None:
            raise FileNotFoundError(F'virtual file does not exist: {self.name}')
        M = stat.S_IMODE(0xFFFF) | stat.S_IFREG
        S = len(self.data)
        return os.stat_result((
            M,  # ST_MODE
            0,  # ST_INO
            0,  # ST_DEV
            1,  # ST_NLINK
            0,  # ST_UID
            0,  # ST_GID
            S,  # ST_SIZE
            0,  # ST_ATIME
            0,  # ST_MTIME
            0,  # ST_CTIME
        ))
class VirtualFileSystem:
    """
    The main class and context handler for a virtual file system. It implements the hooking of
    system library functions and maps any number of `refinery.lib.vfs.VirtualFile` instances to
    randomly generated file paths. It is used as follows:

        with VirtualFileSystem() as vfs:
            vf = VirtualFile(vfs, data, 'exe')
            parsed = external_library.open(vf.path)

    A `refinery.lib.vfs.VirtualFile` is given a UUID-based random name, and any query to the
    virtual file system that does not correspond to a known `refinery.lib.vfs.VirtualFile` will
    be forwarded to the original `builtins.open`, `io.open`, `os.stat`, `mmap.mmap`,
    `os.path.isfile`, or `os.path.exists` function.
    Nevertheless, units should attempt to spend **as little time as possible** inside the context
    handler; hooking builtin, file system related functions is a dangerous business.
    """
    # Shared by all instances: it is held from acquire() until release(), so only one virtual
    # file system can have its hooks installed at any given time.
    _VFS_LOCK = threading.Lock()

    def __init__(self):
        self._lock: threading.RLock = threading.RLock()
        self._by_name: Dict[str, VirtualFile] = {}
        self._by_node: Dict[int, VirtualFile] = {}

    def new(self, data: Optional[ByteStr] = None, extension: Optional[str] = None):
        """
        Create a new `refinery.lib.vfs.VirtualFile` that is linked to this file system.
        """
        return VirtualFile(self, data, extension)

    def install(self, file: VirtualFile):
        """
        Add a new virtual file into the file system. This function is called by the constructor
        of `refinery.lib.vfs.VirtualFile` and usually does not have to be called manually.
        """
        with self._lock:
            self._by_name[file.name] = file
            self._by_node[file.node] = file

    def acquire(self):
        """
        Take the class-wide lock, remember the original library functions, and replace them with
        hooks that serve virtual files and forward every other request. Returns this instance.
        """
        self._VFS_LOCK.acquire()

        def find_by_path(path):
            # Virtual files are matched by file name only. Anything that cannot be turned into a
            # file name (for example an integer file descriptor) is simply not a virtual file.
            try:
                key = os.path.basename(path)
                with self._lock:
                    return self._by_name.get(key)
            except Exception:
                return None

        def hook_isfile(path):
            if find_by_path(path) is not None:
                return True
            return self._isfile(path)

        def hook_open(file, *args, **kwargs):
            vf = find_by_path(file)
            if vf is None:
                return self._builtins_open(file, *args, **kwargs)
            # The mode may be given positionally, by keyword, or not at all.
            mode = args[0] if args else kwargs.get('mode', 'r')
            return vf.open(mode)

        def hook_stat(file, *args, **kwargs):
            vf = find_by_path(file)
            if vf is None:
                # Forward optional arguments such as dir_fd and follow_symlinks unchanged.
                return self._os_stat(file, *args, **kwargs)
            return vf.stat()

        def hook_mmap(fileno, length: int, *args, **kwargs):
            try:
                with self._lock:
                    vf = self._by_node.get(fileno)
            except Exception:
                vf = None
            if vf is None:
                return self._mmap_mmap(fileno, length, *args, **kwargs)
            return vf.mmap(length, kwargs.get('offset', 0))

        def hook_exists(path):
            if find_by_path(path) is not None:
                return True
            # Paths that are not virtual must still be answered by the real file system.
            return self._exists(path)

        self._builtins_open = builtins.open
        self._os_stat = os.stat
        self._mmap_mmap = mmap.mmap
        self._io_open = io.open
        self._isfile = os.path.isfile
        self._exists = os.path.exists
        builtins.open = hook_open
        io.open = hook_open
        os.stat = hook_stat
        mmap.mmap = hook_mmap
        os.path.isfile = hook_isfile
        os.path.exists = hook_exists
        return self

    def release(self):
        """
        Restore the original library functions that were saved by `acquire` and give up the
        class-wide lock.
        """
        os.path.isfile = self._isfile
        os.path.exists = self._exists
        builtins.open = self._builtins_open
        os.stat = self._os_stat
        mmap.mmap = self._mmap_mmap
        io.open = self._io_open
        self._VFS_LOCK.release()

    def __enter__(self):
        """
        The context handler for the virtual file system initializes all hooks and releases them
        when the context is left.
        """
        return self.acquire()

    def __exit__(self, *args):
        self.release()
        return False
Classes
class VirtualFile (fs, data=None, extension=None)
-
Represents a file in the virtual file system. It is linked to a
VirtualFileSystem
and may be initialized with a chunk of binary data. It is possible to leave data
as None
, specifically to create a virtual file that would be written to, rather than read from. In that case, the data written to the virtual file can be extracted as a binary string later. Additionally, it is possible to specify a file extension that the virtual file should use in its randomly generated path; this is useful in case the reader insists on a specific file extension or uses the file extension to deduce a mode of operation.
Expand source code Browse git
class VirtualFile:
    """
    Represents a file in the virtual file system. It is linked to a `refinery.lib.vfs.VirtualFileSystem`
    and may be initialized with a chunk of binary data. It is possible to leave `data` as `None`,
    specifically to create a virtual file that would be written to, rather than read from. In that
    case, the data written to the virtual file can be extracted as a binary string later.
    Additionally, it is possible to specify a file extension that the virtual file should use in its
    randomly generated path; this is useful in case the reader insists on a specific file extension
    or uses the file extension to deduce a mode of operation.
    """
    name: str
    path: str
    node: int
    data: Optional[ByteStr]

    def __init__(self, fs: VirtualFileSystem, data: Optional[ByteStr] = None, extension: Optional[str] = None):
        """
        Generate a random UUID-based name, store `data`, and register the new file with `fs` by
        calling `fs.install`. A non-empty `extension` is appended to the name after a dot.
        """
        suffix = F'.{extension}' if extension else ''
        self.uuid = uuid.uuid4()
        self.name = F'{self.uuid!s}{suffix}'
        self.data = data
        fs.install(self)

    def __repr__(self):
        return F'<VirtualFile/{self.name}>'

    @property
    def node(self) -> int:
        """
        The integer that this file passes as `fileno` when it is opened or mapped. It is the
        second entry of the `fields` tuple of the file's UUID.
        """
        return self.uuid.fields[1]

    def mmap(self, length: int = 0, offset: int = 0) -> MemoryFile:
        """
        Emulate the result of an `mmap` call to the virtual file. The returned object holds a copy
        of the selected bytes. As with `mmap.mmap`, the window starts at `offset`, and a `length`
        of zero selects everything from there to the end of the data.
        """
        view = memoryview(self.data)
        node = self.node
        # The offset has to be honored even when no explicit length was given.
        if offset:
            view = view[offset:]
        if length:
            view = view[:length]

        class _MappedView(bytearray, MemoryFileMethods):
            def __init__(self):
                # NOTE(review): the arguments presumably mean (data, read_as_bytes, fileno),
                # mirroring the MemoryFile call in open(); confirm against MemoryFileMethods.
                MemoryFileMethods.__init__(self, self, True, node)

        mapped = _MappedView()
        mapped[:] = view
        return mapped

    def open(self, mode: str) -> MemoryFile:
        """
        Open the virtual file. When the file has no data yet and `mode` contains `w`, an empty
        buffer is created first. The returned stream carries this file's path as its `name`.
        """
        if self.data is None and 'w' in mode:
            self.data = bytearray()
        fd = MemoryFile(self.data, read_as_bytes=True, fileno=self.node)
        fd.name = self.path
        return fd

    def __len__(self):
        """
        Return the size of the data; raises `FileNotFoundError` if no data has been set.
        """
        if self.data is None:
            raise FileNotFoundError
        return len(self.data)

    def __fspath__(self):
        return self.path

    @property
    def path(self):
        """
        Returns the absolute path to this virtual file. The virtual file is given a randomly
        generated, uuid-formatted file name in the current working directory.
        """
        return os.path.join(os.getcwd(), self.name)

    def stat(self):
        """
        Return a stat result for this virtual file. It has all permission bits set and accurately
        reports the size of the file. Raises `FileNotFoundError` if no data has been set.
        """
        if self.data is None:
            raise FileNotFoundError(F'virtual file does not exist: {self.name}')
        M = stat.S_IMODE(0xFFFF) | stat.S_IFREG
        S = len(self.data)
        return os.stat_result((
            M,  # ST_MODE
            0,  # ST_INO
            0,  # ST_DEV
            1,  # ST_NLINK
            0,  # ST_UID
            0,  # ST_GID
            S,  # ST_SIZE
            0,  # ST_ATIME
            0,  # ST_MTIME
            0,  # ST_CTIME
        ))
Class variables
var name
var data
Instance variables
var node
-
Expand source code Browse git
@property def node(self) -> int: return self.uuid.fields[1]
var path
-
Returns the absolute path to this virtual file. The virtual file is given a randomly generated, uuid-formatted file name in the current working directory.
Expand source code Browse git
@property def path(self): """ Returns the absolute path to this virtual file. The virtual file is given a randomly generated, uuid-formatted file name in the current working directory. """ return os.path.join(os.getcwd(), self.name)
Methods
def mmap(self, length=0, offset=0)
-
Emulate the result of an
mmap
call to the virtual file.
Expand source code Browse git
def mmap(self, length: int = 0, offset: int = 0) -> MemoryFile:
    """
    Emulate the result of an `mmap` call to the virtual file. The returned object holds a copy
    of the selected bytes. As with `mmap.mmap`, the window starts at `offset`, and a `length`
    of zero selects everything from there to the end of the data.
    """
    view = memoryview(self.data)
    node = self.node
    # The offset has to be honored even when no explicit length was given.
    if offset:
        view = view[offset:]
    if length:
        view = view[:length]

    class _MappedView(bytearray, MemoryFileMethods):
        def __init__(self):
            # NOTE(review): the arguments presumably mean (data, read_as_bytes, fileno),
            # mirroring the MemoryFile call in open(); confirm against MemoryFileMethods.
            MemoryFileMethods.__init__(self, self, True, node)

    mapped = _MappedView()
    mapped[:] = view
    return mapped
def open(self, mode)
-
Open the virtual file.
Expand source code Browse git
def open(self, mode: str) -> MemoryFile:
    """
    Open the virtual file and return an in-memory stream over its data. A file without data
    receives an empty buffer first when `mode` contains `w`. The stream's `name` is set to
    this file's path.
    """
    if self.data is None:
        if 'w' in mode:
            # Writers get a fresh buffer that collects whatever is written.
            self.data = bytearray()
    stream = MemoryFile(self.data, read_as_bytes=True, fileno=self.node)
    stream.name = self.path
    return stream
def stat(self)
-
Return a stat result for this virtual file. It has all permission bits set and accurately reports the size of the file.
Expand source code Browse git
def stat(self): """ Return a stat result for this virtual file. It has all permission bits set and accurately reports the size of the file. """ if self.data is None: raise FileNotFoundError(F'virtual file does not exist: {self.name}') M = stat.S_IMODE(0xFFFF) | stat.S_IFREG S = len(self.data) return os.stat_result(( M, # ST_MODE 0, # ST_INO 0, # ST_DEV 1, # ST_NLINK 0, # ST_UID 0, # ST_GID S, # ST_SIZE 0, # ST_ATIME 0, # ST_MTIME 0, # ST_CTIME ))
class VirtualFileSystem
-
The main class and context handler for a virtual file system. It implements the hooking of system library functions and maps any number of
VirtualFile
instances to randomly generated file paths. It is used as follows:
with VirtualFileSystem() as vfs:
    vf = VirtualFile(vfs, data, 'exe')
    parsed = external_library.open(vf.path)
A
VirtualFile
is given a UUID-based random name, and any query to the virtual file system that does not correspond to a known VirtualFile
will be forwarded to the original builtins.open
, os.stat
, or mmap.mmap
function. Nevertheless, units should attempt to spend as little time as possible inside the context handler; hooking builtin, file system related functions is a dangerous business.
Expand source code Browse git
class VirtualFileSystem:
    """
    The main class and context handler for a virtual file system. It implements the hooking of
    system library functions and maps any number of `refinery.lib.vfs.VirtualFile` instances to
    randomly generated file paths. It is used as follows:

        with VirtualFileSystem() as vfs:
            vf = VirtualFile(vfs, data, 'exe')
            parsed = external_library.open(vf.path)

    A `refinery.lib.vfs.VirtualFile` is given a UUID-based random name, and any query to the
    virtual file system that does not correspond to a known `refinery.lib.vfs.VirtualFile` will
    be forwarded to the original `builtins.open`, `io.open`, `os.stat`, `mmap.mmap`,
    `os.path.isfile`, or `os.path.exists` function.
    Nevertheless, units should attempt to spend **as little time as possible** inside the context
    handler; hooking builtin, file system related functions is a dangerous business.
    """
    # Shared by all instances: it is held from acquire() until release(), so only one virtual
    # file system can have its hooks installed at any given time.
    _VFS_LOCK = threading.Lock()

    def __init__(self):
        self._lock: threading.RLock = threading.RLock()
        self._by_name: Dict[str, VirtualFile] = {}
        self._by_node: Dict[int, VirtualFile] = {}

    def new(self, data: Optional[ByteStr] = None, extension: Optional[str] = None):
        """
        Create a new `refinery.lib.vfs.VirtualFile` that is linked to this file system.
        """
        return VirtualFile(self, data, extension)

    def install(self, file: VirtualFile):
        """
        Add a new virtual file into the file system. This function is called by the constructor
        of `refinery.lib.vfs.VirtualFile` and usually does not have to be called manually.
        """
        with self._lock:
            self._by_name[file.name] = file
            self._by_node[file.node] = file

    def acquire(self):
        """
        Take the class-wide lock, remember the original library functions, and replace them with
        hooks that serve virtual files and forward every other request. Returns this instance.
        """
        self._VFS_LOCK.acquire()

        def find_by_path(path):
            # Virtual files are matched by file name only. Anything that cannot be turned into a
            # file name (for example an integer file descriptor) is simply not a virtual file.
            try:
                key = os.path.basename(path)
                with self._lock:
                    return self._by_name.get(key)
            except Exception:
                return None

        def hook_isfile(path):
            if find_by_path(path) is not None:
                return True
            return self._isfile(path)

        def hook_open(file, *args, **kwargs):
            vf = find_by_path(file)
            if vf is None:
                return self._builtins_open(file, *args, **kwargs)
            # The mode may be given positionally, by keyword, or not at all.
            mode = args[0] if args else kwargs.get('mode', 'r')
            return vf.open(mode)

        def hook_stat(file, *args, **kwargs):
            vf = find_by_path(file)
            if vf is None:
                # Forward optional arguments such as dir_fd and follow_symlinks unchanged.
                return self._os_stat(file, *args, **kwargs)
            return vf.stat()

        def hook_mmap(fileno, length: int, *args, **kwargs):
            try:
                with self._lock:
                    vf = self._by_node.get(fileno)
            except Exception:
                vf = None
            if vf is None:
                return self._mmap_mmap(fileno, length, *args, **kwargs)
            return vf.mmap(length, kwargs.get('offset', 0))

        def hook_exists(path):
            if find_by_path(path) is not None:
                return True
            # Paths that are not virtual must still be answered by the real file system.
            return self._exists(path)

        self._builtins_open = builtins.open
        self._os_stat = os.stat
        self._mmap_mmap = mmap.mmap
        self._io_open = io.open
        self._isfile = os.path.isfile
        self._exists = os.path.exists
        builtins.open = hook_open
        io.open = hook_open
        os.stat = hook_stat
        mmap.mmap = hook_mmap
        os.path.isfile = hook_isfile
        os.path.exists = hook_exists
        return self

    def release(self):
        """
        Restore the original library functions that were saved by `acquire` and give up the
        class-wide lock.
        """
        os.path.isfile = self._isfile
        os.path.exists = self._exists
        builtins.open = self._builtins_open
        os.stat = self._os_stat
        mmap.mmap = self._mmap_mmap
        io.open = self._io_open
        self._VFS_LOCK.release()

    def __enter__(self):
        """
        The context handler for the virtual file system initializes all hooks and releases them
        when the context is left.
        """
        return self.acquire()

    def __exit__(self, *args):
        self.release()
        return False
Methods
def new(self, data=None, extension=None)
-
Expand source code Browse git
def new(self, data: Optional[ByteStr] = None, extension: Optional[str] = None):
    """
    Create a virtual file that is linked to this file system and return it. Both arguments are
    passed on unchanged to the `VirtualFile` constructor.
    """
    created = VirtualFile(self, data, extension)
    return created
def install(self, file)
-
Add a new virtual file into the file system. This function is called by the constructor of
VirtualFile
and usually does not have to be called manually.
Expand source code Browse git
def install(self, file: VirtualFile): """ Add a new virtual file into the file system. This function is called by the constructor of `refinery.lib.vfs.VirtualFile` and usually does not have to be called manually. """ with self._lock: self._by_name[file.name] = file self._by_node[file.node] = file
def acquire(self)
-
Expand source code Browse git
def acquire(self): self._VFS_LOCK.acquire() def hook_isfile(path): try: with self._lock: self._by_name[os.path.basename(path)] except BaseException: return self._isfile(path) else: return True def hook_open(file, *args, **kwargs): try: with self._lock: vf = self._by_name[os.path.basename(file)] except BaseException: return self._builtins_open(file, *args, **kwargs) else: return vf.open(args[0]) def hook_stat(file): try: with self._lock: vf = self._by_name[os.path.basename(file)] except BaseException: return self._os_stat(file) else: return vf.stat() def hook_mmap(fileno, length: int, *args, **kwargs): try: with self._lock: vf = self._by_node[fileno] except BaseException: return self._mmap_mmap(fileno, length, *args, **kwargs) else: return vf.mmap(length, kwargs.get('offset', 0)) def hook_exists(path): try: with self._lock: return os.path.basename(path) in self._by_name except BaseException: return self._exists(path) self._builtins_open = builtins.open self._os_stat = os.stat self._mmap_mmap = mmap.mmap self._io_open = io.open self._isfile = os.path.isfile self._exists = os.path.exists builtins.open = hook_open io.open = hook_open os.stat = hook_stat mmap.mmap = hook_mmap os.path.isfile = hook_isfile os.path.exists = hook_exists return self
def release(self)
-
Expand source code Browse git
def release(self): os.path.isfile = self._isfile os.path.exists = self._exists builtins.open = self._builtins_open os.stat = self._os_stat mmap.mmap = self._mmap_mmap io.open = self._io_open self._VFS_LOCK.release()