Module refinery.units.formats.pe.pedebloat
Expand source code Browse git
from __future__ import annotations
from typing import Generator, Iterable, Optional, TYPE_CHECKING
from refinery.units.formats.pe import OverlayUnit, Arg
from refinery.units.formats.pe.perc import RSRC
from refinery.lib.executable import Executable
from refinery.lib.meta import TerseSizeInt as TI, SizeInt
import zlib
from fnmatch import fnmatch
if TYPE_CHECKING:
from pefile import PE, Structure, SectionStructure
# Size units are decimal: one kilobyte is 1000 bytes, one megabyte is 1000 * 1000 bytes.
_KB = 1000
# Also the default block size at which the binary search in _right_strip_data stops.
_MB = _KB * _KB
# Default value for the size_limit argument of pedebloat (10 MB).
_STRIP = TI(10 * _MB)
# Shorthand for Executable.ascii; it is applied to section names before they are matched or logged.
_ASCII = Executable.ascii
class BrokenLink(Exception):
    """
    Raised while adjusting PE structures when a field would point into the removed region.
    """
class pedebloat(OverlayUnit):
    """
    Removes junk or excess data from PE files and returns the stripped executable. By default, only
    the PE overlay is considered; use the flags `-r` and `-s` to also consider resources and entire
    sections. Any buffer is only considered for removal if it exceeds a certain size. If this
    condition is met, a binary search is performed to determine the offset inside the buffer up to
    which the compression ratio is above a certain threshold; everything beyond that point is then
    removed. By setting the threshold compression ratio to 1, each large buffer is removed entirely.
    Resources and sections are always shortened by a multiple of the PE file alignment.
    """
def __init__(
    self,
    *names: Arg.String(),
    certificate=False,
    directories=False,
    memdump=False,
    resources: Arg.Switch('-r', help='Strip large resources.') = False,
    sections : Arg.Switch('-s', help='Strip large sections.') = False,
    trim_code: Arg.Switch('-X', help='Lift the exception on code sections for stripping.') = False,
    trim_rsrc: Arg.Switch('-Y', help='Lift the exception on rsrc sections for stripping.') = False,
    threshold: Arg.Double('-t', metavar='T', help=(
        'Trailing data from resources and sections is stripped until the compression ratio '
        'of the remaining data rises above this threshold. The default value is {default}. '
        'Set this to 1 to ignore the limit entirely and trim every structure as much as '
        'possible without violating alignment. Setting this value to 0 will only strip repeated '
        'occurrences of the last byte.')) = 0.05,
    size_limit: Arg.Number('-l', help=(
        'Structures below this size are not stripped. Default is {default!r}.')) = _STRIP,
    keep_limit: Arg.Switch('-k', help=(
        'Do not strip structures to below the above size limit.')) = False,
    aggressive: Arg.Switch('-a', help=(
        'Equivalent to -srt1: Strip large sections and resources aggressively.')) = False,
):
    # The aggressive switch is a shorthand: it enables section and resource stripping and
    # forces the threshold to 1, overriding whatever was passed for these three options.
    if aggressive:
        sections, resources, threshold = True, True, 1
    # The keyword options are forwarded to the base class in their original order.
    options = dict(
        sections=sections,
        resources=resources,
        size_limit=size_limit,
        keep_limit=keep_limit,
        threshold=threshold,
        trim_rsrc=trim_rsrc,
        trim_code=trim_code,
        names=names,
    )
    super().__init__(certificate, directories, memdump, **options)
@OverlayUnit.Requires('pefile', ['default', 'extended'])
def _pefile():
    # Imports the pefile package when this body runs and returns the module. The rest of the
    # class reaches pefile through self._pefile (self._pefile.PE, self._pefile.SectionStructure,
    # self._pefile.DIRECTORY_ENTRY).
    # NOTE(review): the Requires decorator is defined outside this file; presumably it declares
    # pefile as an optional dependency and exposes the returned module as an attribute - confirm.
    import pefile
    return pefile
def _right_strip_data(self, data: memoryview, alignment=1, block_size=_MB) -> int:
    """
    Computes the number of leading bytes of `data` that should be kept; everything behind that
    offset is considered junk. The returned size is always congruent to the length of `data`
    modulo `alignment`, so the number of stripped bytes is a multiple of `alignment`.

    The behavior depends on the threshold argument of the unit:

    - For a threshold of 0, the whole buffer is the initial candidate.
    - For a threshold strictly between 0 and 1, the buffer is only shortened if it compresses to
      a ratio at or below the threshold as a whole. In this case, a binary search narrows down
      the prefix length until the search window is at most `block_size` bytes wide.
    - For any other value (including 1), the initial candidate is as small as alignment permits.

    Afterwards, repetitions of the last byte of the candidate are stripped, the size limit is
    enforced if the keep_limit option is set, and the result is aligned.
    """
    if not data:
        return 0

    threshold = self.args.threshold
    size = len(data)
    overhang = size % alignment
    keep = overhang

    if threshold == 0:
        keep = size
    elif threshold == 1:
        keep = 0
    elif 0 < threshold < 1:
        def compression_ratio(offset: int):
            # Level 1 is sufficient here: only a rough estimate of compressibility is required.
            ratio = len(zlib.compress(data[:offset], level=1)) / offset
            self.log_debug(F'compressing {SizeInt(offset)!r} ratio={ratio:6.4f}')
            return ratio

        hi = size
        lo = keep
        if compression_ratio(hi) <= threshold:
            while block_size < hi - lo:
                middle = (lo + hi) // 2
                ratio = compression_ratio(middle)
                if ratio > threshold:
                    # This prefix is still dense enough: the cutoff lies further right.
                    lo = middle + 1
                else:
                    hi = middle
                    if abs(ratio - threshold) < 1e-10:
                        break
        keep = hi

    # Strip repetitions of the final byte, but always keep one instance of it.
    while keep > 1 and data[keep - 2] == data[keep - 1]:
        keep -= 1

    keep = max(keep, overhang)

    if self.args.keep_limit:
        keep = max(keep, self.args.size_limit)

    # Round up to the next size that has the same remainder as the input length.
    keep = keep + (overhang - keep) % alignment

    if keep > size:
        surplus = keep - size
        surplus = surplus + (-surplus % alignment)
        keep = keep - surplus

    return keep
def _adjust_offsets(self, pe: PE, gap_offset: int, gap_size: int):
    """
    Updates the parsed structures of `pe` to reflect that `gap_size` bytes starting at the file
    offset `gap_offset` are removed from the file. The raw data itself is not touched here.

    - Structures that start inside the removed region are dropped from the PE.
    - Structures that start behind the removed region are moved towards the start of the file
      by `gap_size` bytes.
    - Known RVA, virtual address, and file offset fields that point behind the gap are reduced
      by `gap_size`. RVA and virtual address fields are only adjusted when they point into the
      section that contains the gap. A structure with a field that would end up pointing into
      the removed region is dropped.
    - The raw size of the section containing the gap is reduced by `gap_size`.

    Raises a RuntimeError if the new section size would violate the file alignment or if the
    gap does not fit into the section.
    """
    base = pe.OPTIONAL_HEADER.ImageBase
    alignment = pe.OPTIONAL_HEADER.FileAlignment
    rva_offset = pe.get_rva_from_offset(gap_offset)
    tva_offset = rva_offset + base

    section = pe.get_section_by_offset(gap_offset)
    new_section_size = section.SizeOfRawData - gap_size
    if new_section_size % alignment != 0:
        raise RuntimeError(
            F'trimming 0x{gap_size:X} bytes from section {_ASCII(section.Name)} of size 0x{section.SizeOfRawData:X} '
            F'violates required section alignment of 0x{alignment:X} bytes')
    inside_section_offset = gap_offset - section.PointerToRawData
    if inside_section_offset > new_section_size:
        overlap = inside_section_offset - new_section_size
        raise RuntimeError(F'trimming from section {_ASCII(section.Name)}; data extends {overlap} beyond section')

    # Address range covered by the trimmed section, as RVA and as absolute virtual address.
    rva_lbound = section.VirtualAddress
    rva_ubound = section.VirtualAddress + section.Misc_VirtualSize - 1
    tva_lbound = rva_lbound + base
    tva_ubound = rva_ubound + base

    def adjust_attributes_of_structure(
        structure: Structure,
        gap_start: int,
        valid_values_lower_bound: Optional[int],
        valid_values_upper_bound: Optional[int],
        attributes: Iterable[str]
    ):
        # gap_start is the position of the gap in the same address space as the attribute
        # values (RVA, virtual address, or file offset). Missing attributes read as 0 and are
        # therefore skipped by the first check.
        for attribute in attributes:
            old_value = getattr(structure, attribute, 0)
            if old_value <= gap_start:
                continue
            if valid_values_lower_bound is not None and old_value < valid_values_lower_bound:
                continue
            if valid_values_upper_bound is not None and old_value > valid_values_upper_bound:
                continue
            new_value = old_value - gap_size
            if new_value < gap_start:
                raise BrokenLink(F'attribute {attribute} points into removed region')
            self.log_debug(F'adjusting field in {structure.name}: {attribute}')
            setattr(structure, attribute, new_value)

    structure_class = self._pefile.SectionStructure
    remove = []

    for index, structure in enumerate(pe.__structures__):
        old_offset = structure.get_file_offset()
        if old_offset > gap_offset:
            if old_offset < gap_offset + gap_size:
                self.log_debug(F'removing structure {structure.name}; starts inside removed region')
                remove.append(index)
                continue
            # Removing the gap shifts everything behind it towards the start of the file by
            # the size of the gap; the position of the gap itself is irrelevant for the shift.
            new_offset = old_offset - gap_size
            if isinstance(structure, structure_class) and new_offset % alignment != 0:
                raise RuntimeError(
                    F'structure {structure.name} would be moved to offset 0x{new_offset:X}, '
                    F'violating section alignment value 0x{alignment:X}.')
            structure.set_file_offset(new_offset)

        try:
            adjust_attributes_of_structure(structure, rva_offset, rva_lbound, rva_ubound, (
                'OffsetToData',
                'AddressOfData',
                'VirtualAddress',
                'AddressOfNames',
                'AddressOfNameOrdinals',
                'AddressOfFunctions',
                'AddressOfEntryPoint',
                'AddressOfRawData',
                'BaseOfCode',
                'BaseOfData',
            ))
            adjust_attributes_of_structure(structure, tva_offset, tva_lbound, tva_ubound, (
                'StartAddressOfRawData',
                'EndAddressOfRawData',
                'AddressOfIndex',
                'AddressOfCallBacks',
            ))
            adjust_attributes_of_structure(structure, gap_offset, None, None, (
                'OffsetModuleName',
                'PointerToRawData',
            ))
        except BrokenLink as error:
            self.log_debug(F'removing structure {structure.name}; {error!s}')
            remove.append(index)
            continue

        # These fields may also hold offsets, but they are not adjusted; warn about them.
        for attribute in (
            'CvHeaderOffset',
            'OffsetIn2Qwords',
            'OffsetInQwords',
            'Offset',
            'OffsetLow',
            'OffsetHigh'
        ):
            if not hasattr(structure, attribute):
                continue
            self.log_warn(F'potential offset in structure {structure.name} ignored: {attribute}')

    # Indices were collected in ascending order; delete from the back so they remain valid.
    for index in reversed(remove):
        del pe.__structures__[index]

    section.SizeOfRawData = new_section_size
def _trim_sections(self, pe: PE, data: bytearray) -> int:
    """
    Strips trailing junk from the sections of `pe` and removes the corresponding bytes from
    `data` in place. Returns the total number of removed bytes.

    Code sections (.text and .code) and the .rsrc section are skipped unless the trim_code and
    trim_rsrc options are set, respectively. Any other section is processed when its raw size
    exceeds the size limit or when its name matches one of the given name patterns.
    """
    size_limit = self.args.size_limit
    patterns = self.args.names
    total = 0

    for section in pe.sections:
        start = section.PointerToRawData
        name = _ASCII(section.Name)
        lowered = name.lower()

        if lowered in ('.text', '.code') and not self.args.trim_code:
            self.log_debug(F'skipping code section {name}; specify --trim-code to override.')
            continue
        if lowered == '.rsrc' and not self.args.trim_rsrc:
            self.log_debug(F'skipping rsrc section {name}; specify --trim-rsrc to override.')
            continue

        old_size = section.SizeOfRawData
        selected = old_size > size_limit or any(fnmatch(name, p) for p in patterns)
        if not selected:
            self.log_debug(F'criteria not satisfied for section: {SizeInt(old_size)!r} {name}')
            continue

        raw = memoryview(data)[start:start + old_size]
        new_size = self._right_strip_data(raw, pe.OPTIONAL_HEADER.FileAlignment)
        # The view has to be gone before the buffer is resized below.
        del raw
        if new_size == old_size:
            continue
        self.log_info(F'stripping section {name} from {TI(old_size)!r} to {TI(new_size)!r}')

        gap_size = old_size - new_size
        if gap_size <= 0:
            continue
        gap_offset = start + new_size
        # The structures are updated first, then the bytes are removed from the buffer.
        self._adjust_offsets(pe, gap_offset, gap_size)
        total += gap_size
        data[gap_offset:gap_offset + gap_size] = []

    return total
def _trim_pe_resources(self, pe: PE, data: bytearray) -> int:
    """
    Strips trailing junk from the resources of `pe` and removes the corresponding bytes from
    `data` in place. Returns the total number of removed bytes; the size of the resource data
    directory is reduced by the same amount.

    A resource is processed when its size exceeds the size limit or when its path matches one
    of the given name patterns. The path consists of the names of all directory levels joined
    by forward slashes. Messages about stripping are only logged when data is actually removed.
    """
    S = self.args.size_limit
    P = self.args.names
    trimmed = 0

    def find_bloated_resources(
        pe: PE, directory, level: int = 0, *path
    ) -> Generator[tuple[str, Structure], None, None]:
        # Walks the resource tree depth-first and yields (path, data entry) for every leaf
        # that satisfies the stripping criteria.
        for entry in directory.entries:
            name = entry.name
            numeric = entry.id
            if not name:
                # Unnamed entries are labelled by resource type on the top level and by
                # their numeric identifier otherwise.
                if level == 0 and numeric in iter(RSRC):
                    name = RSRC(entry.id)
                elif numeric is not None:
                    name = str(numeric)
            label = str(name) if name else ''
            name = label or '?'
            if entry.struct.DataIsDirectory:
                yield from find_bloated_resources(pe, entry.directory, level + 1, *path, name)
                continue
            struct: Structure = entry.data.struct
            name = '/'.join((*path, name))
            if struct.Size <= S and not any(fnmatch(name, p) for p in P):
                self.log_debug(F'criteria not satisfied for resource: {SizeInt(struct.Size)!r} {name}')
                continue
            yield name, struct

    RSRC_INDEX = self._pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
    pe.parse_data_directories(directories=[RSRC_INDEX])

    try:
        resources = pe.DIRECTORY_ENTRY_RESOURCE
    except AttributeError:
        # The attribute is absent when no resource directory was parsed.
        return 0

    for name, resource in find_bloated_resources(pe, resources):
        offset = pe.get_offset_from_rva(resource.OffsetToData)
        old_size = resource.Size
        new_size = self._right_strip_data(
            memoryview(data)[offset:offset + old_size],
            pe.OPTIONAL_HEADER.FileAlignment)
        gap_size = old_size - new_size
        if gap_size <= 0:
            # Nothing is removed from this resource, so nothing is reported either.
            continue
        self.log_info(F'stripping resource {name} from {old_size} to {new_size}')
        gap_offset = offset + new_size
        resource.Size = new_size
        self._adjust_offsets(pe, gap_offset, gap_size)
        trimmed += gap_size
        data[gap_offset:gap_offset + gap_size] = []

    if trimmed:
        pe.OPTIONAL_HEADER.DATA_DIRECTORY[RSRC_INDEX].Size -= trimmed
        self.log_info(F'trimming size of resource data directory by {TI(trimmed)!r}')

    return trimmed
def process(self, data: bytearray) -> bytearray:
    """
    Strips the given PE file and returns the result.

    The overlay is stripped first if it is at least as large as the size limit. A mutable input
    buffer is truncated in place for this; an immutable one is replaced by a shortened copy.
    When neither resources nor sections are to be stripped, the result is returned right away.
    Otherwise, the file is parsed with pefile, resources and sections are trimmed as requested,
    and the rebuilt file is returned. A warning is logged when the rebuilt file is longer than
    the number of removed bytes accounts for.
    """
    overlay_offset = self._get_size(data)
    if len(data) - overlay_offset >= self.args.size_limit:
        # The view is deliberately not bound to a local name: a bytearray cannot be resized
        # while a memoryview of it exists, which would force the fallback copy below.
        overlay_length = self._right_strip_data(memoryview(data)[overlay_offset:])
        body_size = overlay_offset + overlay_length
        try:
            del data[body_size:]
        except (TypeError, ValueError, BufferError):
            # The input is immutable or its buffer is still exported; work on a copy instead.
            data = data[:body_size]
    if not self.args.resources and not self.args.sections:
        return data
    pe = self._pefile.PE(data=data, fast_load=True)
    total = len(data)
    trimmed = 0
    view = pe.__data__
    # The trimming methods delete slices from the buffer, which only a bytearray supports.
    copy = not isinstance(view, bytearray)
    if copy:
        view = bytearray(view)
    if self.args.resources:
        trimmed += self._trim_pe_resources(pe, view)
    if self.args.sections:
        trimmed += self._trim_sections(pe, view)
    if copy:
        # Hand the trimmed copy back to pefile so that it is used as the basis for writing.
        pe.__data__ = view
    data = pe.write()
    end = total - trimmed
    if end < len(data):
        self.log_warn(F'output contains {len(data) - end} trailing bytes')
    return data
Classes
class BrokenLink (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code Browse git
class BrokenLink(Exception):
    """
    Raised while adjusting PE structures when a field would point into the removed region.
    """
Ancestors
- builtins.Exception
- builtins.BaseException
class pedebloat (*names, certificate=False, directories=False, memdump=False, resources=False, sections=False, trim_code=False, trim_rsrc=False, threshold=0.05, size_limit=10.0 MB, keep_limit=False, aggressive=False)
-
Removes junk or excess data from PE files and returns the stripped executable. By default, only the PE overlay is considered; use the flags `-r` and `-s` to also consider resources and entire sections. Any buffer is only considered for removal if it exceeds a certain size. If this condition is met, a binary search is performed to determine the offset inside the buffer up to which the compression ratio is above a certain threshold; everything beyond that point is then removed. By setting the threshold compression ratio to 1, each large buffer is removed entirely.
Expand source code Browse git
class pedebloat(OverlayUnit):
    """
    Removes junk or excess data from PE files and returns the stripped executable. By default, only
    the PE overlay is considered; use the flags `-r` and `-s` to also consider resources and entire
    sections. Any buffer is only considered for removal if it exceeds a certain size. If this
    condition is met, a binary search is performed to determine the offset inside the buffer up to
    which the compression ratio is above a certain threshold; everything beyond that point is then
    removed. By setting the threshold compression ratio to 1, each large buffer is removed entirely.
    """
    def __init__(
        self,
        *names: Arg.String(),
        certificate=False,
        directories=False,
        memdump=False,
        resources: Arg.Switch('-r', help='Strip large resources.') = False,
        sections : Arg.Switch('-s', help='Strip large sections.') = False,
        trim_code: Arg.Switch('-X', help='Lift the exception on code sections for stripping.') = False,
        trim_rsrc: Arg.Switch('-Y', help='Lift the exception on rsrc sections for stripping.') = False,
        threshold: Arg.Double('-t', metavar='T', help=(
            'Trailing data from resources and sections is stripped until the compression ratio '
            'of the remaining data rises above this threshold. The default value is {default}. '
            'Set this to 1 to ignore the limit entirely and trim every structure as much as '
            'possible without violating alignment. Setting this value to 0 will only strip repeated '
            'occurrences of the last byte.')) = 0.05,
        size_limit: Arg.Number('-l', help=(
            'Structures below this size are not stripped. Default is {default!r}.')) = _STRIP,
        keep_limit: Arg.Switch('-k', help=(
            'Do not strip structures to below the above size limit.')) = False,
        aggressive: Arg.Switch('-a', help=(
            'Equivalent to -srt1: Strip large sections and resources aggressively.')) = False,
    ):
        # The aggressive switch is a shorthand: it enables section and resource stripping and
        # forces the threshold to 1, overriding whatever was passed for these three options.
        if aggressive:
            sections = True
            resources = True
            threshold = 1
        super().__init__(
            certificate,
            directories,
            memdump,
            sections=sections,
            resources=resources,
            size_limit=size_limit,
            keep_limit=keep_limit,
            threshold=threshold,
            trim_rsrc=trim_rsrc,
            trim_code=trim_code,
            names=names,
        )

    @OverlayUnit.Requires('pefile', ['default', 'extended'])
    def _pefile():
        # Imports the pefile package when this body runs and returns the module; the class
        # reaches pefile through self._pefile.
        import pefile
        return pefile

    def _right_strip_data(self, data: memoryview, alignment=1, block_size=_MB) -> int:
        """
        Computes the number of leading bytes of `data` that should be kept. The returned size is
        always congruent to the length of `data` modulo `alignment`. For a threshold strictly
        between 0 and 1, a binary search over the compression ratio of prefixes is performed
        until the search window is at most `block_size` bytes wide. Repetitions of the last byte
        are stripped in any case, and the size limit is enforced if keep_limit is set.
        """
        if not data:
            return 0
        threshold = self.args.threshold
        data_overhang = len(data) % alignment
        result = data_overhang

        if 0 < threshold < 1:
            def compression_ratio(offset: int):
                # Level 1 is sufficient: only a rough estimate of compressibility is required.
                ratio = len(zlib.compress(data[:offset], level=1)) / offset
                self.log_debug(F'compressing {SizeInt(offset)!r} ratio={ratio:6.4f}')
                return ratio
            upper = len(data)
            lower = result
            if compression_ratio(upper) <= threshold:
                while block_size < upper - lower:
                    pivot = (lower + upper) // 2
                    ratio = compression_ratio(pivot)
                    if ratio > threshold:
                        # This prefix is still dense enough: the cutoff lies further right.
                        lower = pivot + 1
                        continue
                    upper = pivot
                    if abs(ratio - threshold) < 1e-10:
                        break
            result = upper
        elif threshold == 0:
            result = len(data)
        elif threshold == 1:
            result = 0

        # Strip repetitions of the final byte, but always keep one instance of it.
        while result > 1 and data[result - 2] == data[result - 1]:
            result -= 1

        result = max(result, data_overhang)

        if self.args.keep_limit:
            result = max(result, self.args.size_limit)

        # Round up to the next size that has the same remainder as the input length.
        result = result + (data_overhang - result) % alignment

        if result > len(data):
            excess = result - len(data)
            excess = excess + (-excess % alignment)
            result = result - excess

        return result

    def _adjust_offsets(self, pe: PE, gap_offset: int, gap_size: int):
        """
        Updates the parsed structures of `pe` to reflect that `gap_size` bytes starting at the
        file offset `gap_offset` are removed from the file. Structures that start inside the
        removed region are dropped, structures behind it are moved towards the start of the file
        by `gap_size` bytes, and known address and offset fields pointing behind the gap are
        reduced by `gap_size`. The raw size of the section containing the gap is reduced as
        well. Raises a RuntimeError if the new section size would violate the file alignment or
        if the gap does not fit into the section.
        """
        base = pe.OPTIONAL_HEADER.ImageBase
        alignment = pe.OPTIONAL_HEADER.FileAlignment
        rva_offset = pe.get_rva_from_offset(gap_offset)
        tva_offset = rva_offset + base

        section = pe.get_section_by_offset(gap_offset)
        new_section_size = section.SizeOfRawData - gap_size
        if new_section_size % alignment != 0:
            raise RuntimeError(
                F'trimming 0x{gap_size:X} bytes from section {_ASCII(section.Name)} of size 0x{section.SizeOfRawData:X} '
                F'violates required section alignment of 0x{alignment:X} bytes')
        inside_section_offset = gap_offset - section.PointerToRawData
        if inside_section_offset > new_section_size:
            overlap = inside_section_offset - new_section_size
            raise RuntimeError(F'trimming from section {_ASCII(section.Name)}; data extends {overlap} beyond section')

        # Address range covered by the trimmed section, as RVA and as absolute virtual address.
        rva_lbound = section.VirtualAddress
        rva_ubound = section.VirtualAddress + section.Misc_VirtualSize - 1
        tva_lbound = rva_lbound + base
        tva_ubound = rva_ubound + base

        def adjust_attributes_of_structure(
            structure: Structure,
            gap_start: int,
            valid_values_lower_bound: Optional[int],
            valid_values_upper_bound: Optional[int],
            attributes: Iterable[str]
        ):
            # gap_start is the position of the gap in the same address space as the attribute
            # values. Missing attributes read as 0 and are skipped by the first check.
            for attribute in attributes:
                old_value = getattr(structure, attribute, 0)
                if old_value <= gap_start:
                    continue
                if valid_values_lower_bound is not None and old_value < valid_values_lower_bound:
                    continue
                if valid_values_upper_bound is not None and old_value > valid_values_upper_bound:
                    continue
                new_value = old_value - gap_size
                if new_value < gap_start:
                    raise BrokenLink(F'attribute {attribute} points into removed region')
                self.log_debug(F'adjusting field in {structure.name}: {attribute}')
                setattr(structure, attribute, new_value)

        structure_class = self._pefile.SectionStructure
        remove = []

        for index, structure in enumerate(pe.__structures__):
            old_offset = structure.get_file_offset()
            if old_offset > gap_offset:
                if old_offset < gap_offset + gap_size:
                    self.log_debug(F'removing structure {structure.name}; starts inside removed region')
                    remove.append(index)
                    continue
                # Removing the gap shifts everything behind it towards the start of the file
                # by the size of the gap; the position of the gap is irrelevant for the shift.
                new_offset = old_offset - gap_size
                if isinstance(structure, structure_class) and new_offset % alignment != 0:
                    raise RuntimeError(
                        F'structure {structure.name} would be moved to offset 0x{new_offset:X}, '
                        F'violating section alignment value 0x{alignment:X}.')
                structure.set_file_offset(new_offset)

            try:
                adjust_attributes_of_structure(structure, rva_offset, rva_lbound, rva_ubound, (
                    'OffsetToData',
                    'AddressOfData',
                    'VirtualAddress',
                    'AddressOfNames',
                    'AddressOfNameOrdinals',
                    'AddressOfFunctions',
                    'AddressOfEntryPoint',
                    'AddressOfRawData',
                    'BaseOfCode',
                    'BaseOfData',
                ))
                adjust_attributes_of_structure(structure, tva_offset, tva_lbound, tva_ubound, (
                    'StartAddressOfRawData',
                    'EndAddressOfRawData',
                    'AddressOfIndex',
                    'AddressOfCallBacks',
                ))
                adjust_attributes_of_structure(structure, gap_offset, None, None, (
                    'OffsetModuleName',
                    'PointerToRawData',
                ))
            except BrokenLink as error:
                self.log_debug(F'removing structure {structure.name}; {error!s}')
                remove.append(index)
                continue

            # These fields may also hold offsets, but they are not adjusted; warn about them.
            for attribute in (
                'CvHeaderOffset',
                'OffsetIn2Qwords',
                'OffsetInQwords',
                'Offset',
                'OffsetLow',
                'OffsetHigh'
            ):
                if not hasattr(structure, attribute):
                    continue
                self.log_warn(F'potential offset in structure {structure.name} ignored: {attribute}')

        # Indices were collected in ascending order; delete from the back so they stay valid.
        for index in reversed(remove):
            del pe.__structures__[index]

        section.SizeOfRawData = new_section_size

    def _trim_sections(self, pe: PE, data: bytearray) -> int:
        """
        Strips trailing junk from the sections of `pe` and removes the corresponding bytes from
        `data` in place. Returns the total number of removed bytes. Code sections and the .rsrc
        section are skipped unless the trim_code and trim_rsrc options are set, respectively.
        """
        S = self.args.size_limit
        P = self.args.names
        trimmed = 0
        for section in pe.sections:
            section: SectionStructure
            offset = section.PointerToRawData
            name = _ASCII(section.Name)
            if not self.args.trim_code and name.lower() in ('.text', '.code'):
                self.log_debug(F'skipping code section {name}; specify --trim-code to override.')
                continue
            if not self.args.trim_rsrc and name.lower() == '.rsrc':
                self.log_debug(F'skipping rsrc section {name}; specify --trim-rsrc to override.')
                continue
            old_size = section.SizeOfRawData
            if old_size <= S and not any(fnmatch(name, p) for p in P):
                self.log_debug(F'criteria not satisfied for section: {SizeInt(old_size)!r} {name}')
                continue
            new_size = self._right_strip_data(
                memoryview(data)[offset:offset + old_size],
                pe.OPTIONAL_HEADER.FileAlignment)
            if new_size == old_size:
                continue
            self.log_info(F'stripping section {name} from {TI(old_size)!r} to {TI(new_size)!r}')
            gap_size = old_size - new_size
            gap_offset = offset + new_size
            if gap_size <= 0:
                continue
            # The structures are updated first, then the bytes are removed from the buffer.
            self._adjust_offsets(pe, gap_offset, gap_size)
            trimmed += gap_size
            data[gap_offset:gap_offset + gap_size] = []
        return trimmed

    def _trim_pe_resources(self, pe: PE, data: bytearray) -> int:
        """
        Strips trailing junk from the resources of `pe` and removes the corresponding bytes from
        `data` in place. Returns the total number of removed bytes; the size of the resource
        data directory is reduced by the same amount. Messages about stripping are only logged
        when data is actually removed.
        """
        S = self.args.size_limit
        P = self.args.names
        trimmed = 0

        def find_bloated_resources(
            pe: PE, directory, level: int = 0, *path
        ) -> Generator[tuple[str, Structure], None, None]:
            # Walks the resource tree depth-first and yields (path, data entry) for every leaf
            # that satisfies the stripping criteria.
            for entry in directory.entries:
                name = entry.name
                numeric = entry.id
                if not name:
                    if level == 0 and numeric in iter(RSRC):
                        name = RSRC(entry.id)
                    elif numeric is not None:
                        name = str(numeric)
                label = str(name) if name else ''
                name = label or '?'
                if entry.struct.DataIsDirectory:
                    yield from find_bloated_resources(pe, entry.directory, level + 1, *path, name)
                    continue
                struct: Structure = entry.data.struct
                name = '/'.join((*path, name))
                if struct.Size <= S and not any(fnmatch(name, p) for p in P):
                    self.log_debug(F'criteria not satisfied for resource: {SizeInt(struct.Size)!r} {name}')
                    continue
                yield name, struct

        RSRC_INDEX = self._pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_RESOURCE']
        pe.parse_data_directories(directories=[RSRC_INDEX])

        try:
            resources = pe.DIRECTORY_ENTRY_RESOURCE
        except AttributeError:
            # The attribute is absent when no resource directory was parsed.
            return 0

        for name, resource in find_bloated_resources(pe, resources):
            offset = pe.get_offset_from_rva(resource.OffsetToData)
            old_size = resource.Size
            new_size = self._right_strip_data(
                memoryview(data)[offset:offset + old_size],
                pe.OPTIONAL_HEADER.FileAlignment)
            gap_size = old_size - new_size
            if gap_size <= 0:
                # Nothing is removed from this resource, so nothing is reported either.
                continue
            self.log_info(F'stripping resource {name} from {old_size} to {new_size}')
            gap_offset = offset + new_size
            resource.Size = new_size
            self._adjust_offsets(pe, gap_offset, gap_size)
            trimmed += gap_size
            data[gap_offset:gap_offset + gap_size] = []

        if trimmed:
            pe.OPTIONAL_HEADER.DATA_DIRECTORY[RSRC_INDEX].Size -= trimmed
            self.log_info(F'trimming size of resource data directory by {TI(trimmed)!r}')

        return trimmed

    def process(self, data: bytearray) -> bytearray:
        """
        Strips the given PE file and returns the result. The overlay is stripped first if it is
        at least as large as the size limit; a mutable input buffer is truncated in place for
        this. If requested, resources and sections are trimmed afterwards and the file is
        rebuilt with pefile.
        """
        overlay_offset = self._get_size(data)
        if len(data) - overlay_offset >= self.args.size_limit:
            # The view is deliberately not bound to a local name: a bytearray cannot be resized
            # while a memoryview of it exists, which would force the fallback copy below.
            overlay_length = self._right_strip_data(memoryview(data)[overlay_offset:])
            body_size = overlay_offset + overlay_length
            try:
                del data[body_size:]
            except (TypeError, ValueError, BufferError):
                # The input is immutable or its buffer is still exported; work on a copy.
                data = data[:body_size]
        if not self.args.resources and not self.args.sections:
            return data
        pe = self._pefile.PE(data=data, fast_load=True)
        total = len(data)
        trimmed = 0
        view = pe.__data__
        # The trimming methods delete slices from the buffer, which only a bytearray supports.
        copy = not isinstance(view, bytearray)
        if copy:
            view = bytearray(view)
        if self.args.resources:
            trimmed += self._trim_pe_resources(pe, view)
        if self.args.sections:
            trimmed += self._trim_sections(pe, view)
        if copy:
            # Hand the trimmed copy back to pefile so that it is used as the basis for writing.
            pe.__data__ = view
        data = pe.write()
        end = total - trimmed
        if end < len(data):
            self.log_warn(F'output contains {len(data) - end} trailing bytes')
        return data
Ancestors
Subclasses
Class variables
var required_dependencies
var console
var reverse
var optional_dependencies
Inherited members