Module refinery.units.formats.pcap
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf - 8 -* -
from typing import NamedTuple, Union, TYPE_CHECKING
from refinery.units import Arg, Unit
from refinery.lib.vfs import VirtualFileSystem, VirtualFile
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import NoLogging
import logging
if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv6Address
TIPAddr = Union[IPv4Address, IPv6Address]
class Conversation(NamedTuple):
src_addr: str
dst_addr: str
src_port: int
dst_port: int
@classmethod
def FromID(cls, stream_id):
src, sp = stream_id.src
dst, dp = stream_id.dst
return cls(str(src), str(dst), sp, dp)
@property
def src(self):
return F'{self.src_addr}:{self.src_port}'
@property
def dst(self):
return F'{self.dst_addr}:{self.dst_port}'
def __hash__(self):
return hash(frozenset((self.src, self.dst)))
def swapped(self):
cls = self.__class__
return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port)
def __eq__(self, other):
return hash(self) == hash(other)
def __str__(self):
return F'[{self.src}] --> [{self.dst}]'
def src_to_dst(self):
return {'src': self.src, 'dst': self.dst}
def dst_to_src(self):
return {'src': self.dst, 'dst': self.src}
class pcap(Unit):
"""
Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of
each TCP conversation, attaching several pieces of metadata to each such output: Included are the source
and destination socket address as well as the variable `stream` which identifies the conversation which
it was part of. The chunks are returned in the order that the bytes were exchanged between source and
destination. When the `--merge` parameter is specified, the unit instead collects all bytes going forward
and backwards, respectively, and emitting these as two chunks, for each TCP conversation that took place.
"""
def __init__(self, merge: Arg.Switch('-m', help='Merge both parts of each TCP conversation into one chunk.') = False):
super().__init__(merge=merge)
@Unit.Requires('pypcapkit[scapy]<0.16.0', 'all')
def _pcapkit():
import pcapkit
return pcapkit
def process(self, data):
with NoLogging():
pcapkit = self._pcapkit
logging.getLogger('pcapkit').disabled = True
merge = self.args.merge
with VirtualFileSystem() as fs:
vf = VirtualFile(fs, data, 'pcap')
extraction = pcapkit.extract(
fin=vf.path, engine='scapy', store=False, nofile=True, extension=False, tcp=True, strict=True)
tcp: list = list(extraction.reassembly.tcp)
tcp.sort(key=lambda p: min(p.index, default=0))
count, convo = 0, None
src_buffer = MemoryFile()
dst_buffer = MemoryFile()
for stream in tcp:
this_convo = Conversation.FromID(stream.id)
if this_convo != convo:
if count and merge:
if src_buffer.tell():
yield self.labelled(src_buffer.getvalue(), **convo.src_to_dst())
src_buffer.truncate(0)
if dst_buffer.tell():
yield self.labelled(dst_buffer.getvalue(), **convo.dst_to_src())
dst_buffer.truncate(0)
count = count + 1
convo = this_convo
for packet in stream.packets:
if not merge:
yield self.labelled(packet.data, **this_convo.src_to_dst(), stream=count)
elif this_convo.src == convo.src:
src_buffer.write(packet.data)
elif this_convo.dst == convo.src:
dst_buffer.write(packet.data)
else:
raise RuntimeError(F'direction of packet {convo!s} in conversation {count} is unknown')
Classes
class Conversation (src_addr, dst_addr, src_port, dst_port)
-
Conversation(src_addr, dst_addr, src_port, dst_port)
Expand source code Browse git
class Conversation(NamedTuple): src_addr: str dst_addr: str src_port: int dst_port: int @classmethod def FromID(cls, stream_id): src, sp = stream_id.src dst, dp = stream_id.dst return cls(str(src), str(dst), sp, dp) @property def src(self): return F'{self.src_addr}:{self.src_port}' @property def dst(self): return F'{self.dst_addr}:{self.dst_port}' def __hash__(self): return hash(frozenset((self.src, self.dst))) def swapped(self): cls = self.__class__ return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port) def __eq__(self, other): return hash(self) == hash(other) def __str__(self): return F'[{self.src}] --> [{self.dst}]' def src_to_dst(self): return {'src': self.src, 'dst': self.dst} def dst_to_src(self): return {'src': self.dst, 'dst': self.src}
Ancestors
- builtins.tuple
Static methods
def FromID(stream_id)
-
Expand source code Browse git
@classmethod def FromID(cls, stream_id): src, sp = stream_id.src dst, dp = stream_id.dst return cls(str(src), str(dst), sp, dp)
Instance variables
var src_addr
-
Alias for field number 0
var dst_addr
-
Alias for field number 1
var src_port
-
Alias for field number 2
var dst_port
-
Alias for field number 3
var src
-
Expand source code Browse git
@property def src(self): return F'{self.src_addr}:{self.src_port}'
var dst
-
Expand source code Browse git
@property def dst(self): return F'{self.dst_addr}:{self.dst_port}'
Methods
def swapped(self)
-
Expand source code Browse git
def swapped(self): cls = self.__class__ return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port)
def src_to_dst(self)
-
Expand source code Browse git
def src_to_dst(self): return {'src': self.src, 'dst': self.dst}
def dst_to_src(self)
-
Expand source code Browse git
def dst_to_src(self): return {'src': self.dst, 'dst': self.src}
class pcap (merge=False)
-
Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of each TCP conversation, attaching several pieces of metadata to each such output: Included are the source and destination socket address as well as the variable
stream
which identifies the conversation which it was part of. The chunks are returned in the order that the bytes were exchanged between source and destination. When the--merge
parameter is specified, the unit instead collects all bytes going forward and backwards, respectively, and emitting these as two chunks, for each TCP conversation that took place.Expand source code Browse git
class pcap(Unit): """ Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of each TCP conversation, attaching several pieces of metadata to each such output: Included are the source and destination socket address as well as the variable `stream` which identifies the conversation which it was part of. The chunks are returned in the order that the bytes were exchanged between source and destination. When the `--merge` parameter is specified, the unit instead collects all bytes going forward and backwards, respectively, and emitting these as two chunks, for each TCP conversation that took place. """ def __init__(self, merge: Arg.Switch('-m', help='Merge both parts of each TCP conversation into one chunk.') = False): super().__init__(merge=merge) @Unit.Requires('pypcapkit[scapy]<0.16.0', 'all') def _pcapkit(): import pcapkit return pcapkit def process(self, data): with NoLogging(): pcapkit = self._pcapkit logging.getLogger('pcapkit').disabled = True merge = self.args.merge with VirtualFileSystem() as fs: vf = VirtualFile(fs, data, 'pcap') extraction = pcapkit.extract( fin=vf.path, engine='scapy', store=False, nofile=True, extension=False, tcp=True, strict=True) tcp: list = list(extraction.reassembly.tcp) tcp.sort(key=lambda p: min(p.index, default=0)) count, convo = 0, None src_buffer = MemoryFile() dst_buffer = MemoryFile() for stream in tcp: this_convo = Conversation.FromID(stream.id) if this_convo != convo: if count and merge: if src_buffer.tell(): yield self.labelled(src_buffer.getvalue(), **convo.src_to_dst()) src_buffer.truncate(0) if dst_buffer.tell(): yield self.labelled(dst_buffer.getvalue(), **convo.dst_to_src()) dst_buffer.truncate(0) count = count + 1 convo = this_convo for packet in stream.packets: if not merge: yield self.labelled(packet.data, **this_convo.src_to_dst(), stream=count) elif this_convo.src == convo.src: src_buffer.write(packet.data) elif this_convo.dst == convo.src: dst_buffer.write(packet.data) else: raise RuntimeError(F'direction of packet {convo!s} in conversation {count} is unknown')
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members