Module refinery.units.formats.pcap
Expand source code Browse git
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import List, Union, TYPE_CHECKING
from refinery.units import Arg, Unit
from refinery.lib.vfs import VirtualFileSystem, VirtualFile
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import NoLogging
import dataclasses
if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv6Address
from pcapkit.foundation.reassembly.data.tcp import Datagram, DatagramID
from pcapkit.foundation.extraction import Packet
TIPAddr = Union[IPv4Address, IPv6Address]
@dataclasses.dataclass
class Conversation:
    """
    Describes a TCP conversation by the socket addresses of its two endpoints, together with
    the `ack` value of the datagram identifier it was created from. Equality and hashing are
    based only on the unordered pair of endpoints: a conversation compares equal to its
    `swapped` counterpart, and `ack` is not taken into account.
    """
    src_addr: str
    dst_addr: str
    src_port: int
    dst_port: int
    ack: int

    @classmethod
    def FromID(cls, stream_id: 'DatagramID'):
        """
        Build a conversation from a datagram identifier. The `src` and `dst` attributes of the
        identifier are unpacked as (address, port) pairs; addresses are converted to strings.
        """
        src, sp = stream_id.src
        dst, dp = stream_id.dst
        return cls(str(src), str(dst), sp, dp, stream_id.ack)

    @property
    def src(self):
        """Source socket address formatted as `address:port`."""
        return F'{self.src_addr}:{self.src_port}'

    @property
    def dst(self):
        """Destination socket address formatted as `address:port`."""
        return F'{self.dst_addr}:{self.dst_port}'

    def _endpoints(self):
        # Direction-agnostic key shared by __hash__ and __eq__, so that both directions of
        # the same conversation are identified with each other.
        return frozenset((
            (self.src_addr, self.src_port),
            (self.dst_addr, self.dst_port)))

    def __hash__(self):
        return hash(self._endpoints())

    def swapped(self):
        """Return the same conversation with source and destination exchanged."""
        cls = self.__class__
        # All five fields are required by the generated constructor; ack is carried over.
        return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port, self.ack)

    def __eq__(self, other):
        # Compare the endpoint sets themselves rather than their hashes: hash collisions must
        # not make unrelated conversations equal, and unhashable operands must not raise.
        if not isinstance(other, Conversation):
            return NotImplemented
        return self._endpoints() == other._endpoints()

    def __str__(self):
        return F'[{self.src}] --> [{self.dst}]'

    def src_to_dst(self):
        """Return `src` and `dst` labels for data sent from source to destination."""
        return {'src': self.src, 'dst': self.dst}

    def dst_to_src(self):
        """Return `src` and `dst` labels for data sent from destination to source."""
        return {'src': self.dst, 'dst': self.src}
class pcap(Unit):
    """
    Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of
    each TCP conversation, attaching several pieces of metadata to each such output: Included are the source
    and destination socket address as well as the variable `stream` which identifies the conversation which
    it was part of. The chunks are returned in the order that the bytes were exchanged between source and
    destination. When the `--merge` parameter is specified, the unit instead collects all bytes going forward
    and backwards, respectively, and emits these as two chunks, for each TCP conversation that took place.
    """

    def __init__(
        self,
        merge: Arg.Switch('-m', help='Merge both parts of each TCP conversation into one chunk.') = False,
        client: Arg.Switch('-c', group='D', help='Show only the client part of each conversation.') = False,
        server: Arg.Switch('-s', group='D', help='Show only the server part of each conversation.') = False,
    ):
        # The three switches are forwarded unchanged; process() reads them back from self.args.
        super().__init__(merge=merge, client=client, server=server)

    # NOTE(review): process() reads self._pcapkit as an attribute, so Unit.Requires presumably
    # turns this function into a lazily evaluated dependency accessor - confirm.
    @Unit.Requires('pypcapkit[scapy]>=1.3', 'all')
    def _pcapkit():
        # Both imports happen inside NoLogging. The scapy TLS session module is imported but
        # its name is never used; presumably it is loaded for its side effects - confirm.
        with NoLogging():
            import scapy.layers.tls.session # noqa
            import pcapkit
            return pcapkit

    @Unit.Requires('scapy', 'all')
    def _scapy():
        # scapy.packet is imported explicitly because process() accesses it as an attribute
        # of the returned scapy module.
        import scapy
        import scapy.packet
        return scapy

    def process(self, data):
        """
        Reassemble the TCP datagrams contained in the input and yield their payloads as labelled
        chunks. Without the merge option, one chunk is emitted per non-empty datagram, labelled
        with `src`, `dst` and `stream`. With the merge option, payloads are accumulated in one
        buffer per direction and emitted whenever the conversation changes, and once more after
        the last datagram.
        """
        pcapkit = self._pcapkit
        merge = self.args.merge

        # The input is exposed to pcapkit through a virtual file path.
        with NoLogging(), VirtualFileSystem() as fs:
            vf = VirtualFile(fs, data, 'pcap')
            pcap = pcapkit.extract(
                fin=vf.path,
                engine=pcapkit.Scapy,
                store=True,
                nofile=True,
                extension=False,
                ip=True,
                tcp=True,
                reassembly=True,
                reasm_strict=True,
            )

        tcp: List[Datagram] = list(pcap.reassembly.tcp)
        # Order datagrams by the smallest frame index they contain; datagrams without any
        # frame index sort first.
        tcp.sort(key=lambda p: min(p.index, default=0))

        # count numbers the conversations seen so far, convo is the one currently processed.
        count, convo = 0, None
        src_buffer = MemoryFile()
        dst_buffer = MemoryFile()

        self.log_debug(F'extracted {len(pcap.frame)} packets, assembled {len(tcp)} datagrams')

        PT = self._scapy.packet

        def payload(packet: Packet):
            # Descend through the chain of .payload attributes and return the .original data
            # of the first layer that is one of the accepted types (ok) and not one of the
            # rejected types (no). An empty byte string is returned when a layer without a
            # .payload attribute is reached or when a payload is encountered a second time.
            ok = (bytes, bytearray, PT.Raw)
            no = (PT.NoPayload, PT.Padding)
            circle = set()
            while True:
                try:
                    inner = packet.payload
                except AttributeError:
                    break
                if isinstance(packet, ok) and not isinstance(packet, no):
                    return packet.original
                if id(inner) in circle:
                    break
                packet = inner
                circle.add(id(inner))
            return B''

        def sequence(i: int):
            # Sort key for frame indices: the .seq attribute of the outermost layer that has
            # one, or 0 when no layer provides it.
            # NOTE(review): the i - 1 suggests that datagram indices are one-based - confirm.
            packet = pcap.frame[i - 1]
            while len(packet):
                try:
                    return packet.seq
                except AttributeError:
                    pass
                try:
                    packet = packet.payload
                except AttributeError:
                    break
            return 0

        client = self.args.client
        server = self.args.server

        def commit():
            # Emit and reset the two direction buffers of the current conversation. A buffer
            # is always reset when it holds data, even if the client/server filter suppresses
            # its output.
            # NOTE(review): the buffers are only filled in merge mode, so the client and
            # server switches have no effect without merge; merged chunks also carry no
            # stream label - confirm that both are intended.
            if src_buffer.tell():
                if not server:
                    yield self.labelled(src_buffer.getvalue(), **convo.src_to_dst())
                src_buffer.truncate(0)
            if dst_buffer.tell():
                if not client:
                    yield self.labelled(dst_buffer.getvalue(), **convo.dst_to_src())
                dst_buffer.truncate(0)

        for datagram in tcp:
            this_convo = Conversation.FromID(datagram.id)
            # Conversation equality ignores direction, so convo keeps the orientation of the
            # first datagram seen for each conversation.
            if this_convo != convo:
                if count and merge:
                    # Flush the previous conversation before convo is replaced.
                    yield from commit()
                count = count + 1
                convo = this_convo

            data = bytearray()
            for index in sorted(datagram.index, key=sequence):
                data.extend(payload(pcap.frame[index - 1]))
            if not data:
                continue
            if not merge:
                yield self.labelled(data, **this_convo.src_to_dst(), stream=count)
            elif this_convo.src == convo.src:
                src_buffer.write(data)
            elif this_convo.dst == convo.src:
                dst_buffer.write(data)
            else:
                raise RuntimeError(F'direction of packet {convo!s} in conversation {count} is unknown')

        # Flush whatever is left of the last conversation.
        yield from commit()
Classes
class Conversation (src_addr, dst_addr, src_port, dst_port, ack)
-
Conversation(src_addr: 'str', dst_addr: 'str', src_port: 'int', dst_port: 'int', ack: 'int')
Expand source code Browse git
class Conversation:
    """
    Describes a TCP conversation by the socket addresses of its two endpoints, together with
    the `ack` value of the datagram identifier it was created from. Equality and hashing are
    based only on the unordered pair of endpoints: a conversation compares equal to its
    `swapped` counterpart, and `ack` is not taken into account.
    """
    src_addr: str
    dst_addr: str
    src_port: int
    dst_port: int
    ack: int

    @classmethod
    def FromID(cls, stream_id: 'DatagramID'):
        """
        Build a conversation from a datagram identifier. The `src` and `dst` attributes of the
        identifier are unpacked as (address, port) pairs; addresses are converted to strings.
        """
        src, sp = stream_id.src
        dst, dp = stream_id.dst
        return cls(str(src), str(dst), sp, dp, stream_id.ack)

    @property
    def src(self):
        """Source socket address formatted as `address:port`."""
        return F'{self.src_addr}:{self.src_port}'

    @property
    def dst(self):
        """Destination socket address formatted as `address:port`."""
        return F'{self.dst_addr}:{self.dst_port}'

    def _endpoints(self):
        # Direction-agnostic key shared by __hash__ and __eq__, so that both directions of
        # the same conversation are identified with each other.
        return frozenset((
            (self.src_addr, self.src_port),
            (self.dst_addr, self.dst_port)))

    def __hash__(self):
        return hash(self._endpoints())

    def swapped(self):
        """Return the same conversation with source and destination exchanged."""
        cls = self.__class__
        # All five fields are passed on, including ack.
        return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port, self.ack)

    def __eq__(self, other):
        # Compare the endpoint sets themselves rather than their hashes: hash collisions must
        # not make unrelated conversations equal, and unhashable operands must not raise.
        if not isinstance(other, Conversation):
            return NotImplemented
        return self._endpoints() == other._endpoints()

    def __str__(self):
        return F'[{self.src}] --> [{self.dst}]'

    def src_to_dst(self):
        """Return `src` and `dst` labels for data sent from source to destination."""
        return {'src': self.src, 'dst': self.dst}

    def dst_to_src(self):
        """Return `src` and `dst` labels for data sent from destination to source."""
        return {'src': self.dst, 'dst': self.src}
Class variables
var src_addr
var dst_addr
var src_port
var dst_port
var ack
Static methods
def FromID(stream_id)
-
Expand source code Browse git
@classmethod
def FromID(cls, stream_id: DatagramID):
    """
    Build a conversation from a datagram identifier. The `src` and `dst` attributes of the
    identifier are unpacked as (address, port) pairs; addresses are converted to strings and
    the identifier's `ack` value is passed on as the last field.
    """
    src, sp = stream_id.src
    dst, dp = stream_id.dst
    return cls(str(src), str(dst), sp, dp, stream_id.ack)
Instance variables
var src
-
Expand source code Browse git
@property
def src(self):
    """Source socket address formatted as `address:port`."""
    return F'{self.src_addr}:{self.src_port}'
var dst
-
Expand source code Browse git
@property
def dst(self):
    """Destination socket address formatted as `address:port`."""
    return F'{self.dst_addr}:{self.dst_port}'
Methods
def swapped(self)
-
Expand source code Browse git
def swapped(self):
    """Return the same conversation with source and destination exchanged."""
    cls = self.__class__
    # The constructor takes five fields; ack is carried over unchanged. Omitting it made
    # every call fail with a TypeError.
    return cls(self.dst_addr, self.src_addr, self.dst_port, self.src_port, self.ack)
def src_to_dst(self)
-
Expand source code Browse git
def src_to_dst(self):
    """Return `src` and `dst` labels for data sent from source to destination."""
    return {'src': self.src, 'dst': self.dst}
def dst_to_src(self)
-
Expand source code Browse git
def dst_to_src(self):
    """Return `src` and `dst` labels for data sent from destination to source."""
    return {'src': self.dst, 'dst': self.src}
class pcap (merge=False, client=False, server=False)
-
Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of each TCP conversation, attaching several pieces of metadata to each such output: Included are the source and destination socket address as well as the variable `stream` which identifies the conversation which it was part of. The chunks are returned in the order that the bytes were exchanged between source and destination. When the `--merge` parameter is specified, the unit instead collects all bytes going forward and backwards, respectively, and emits these as two chunks, for each TCP conversation that took place.
Expand source code Browse git
class pcap(Unit):
    """
    Performs TCP stream reassembly from packet capture (PCAP) files. By default, the unit emits the parts of
    each TCP conversation, attaching several pieces of metadata to each such output: Included are the source
    and destination socket address as well as the variable `stream` which identifies the conversation which
    it was part of. The chunks are returned in the order that the bytes were exchanged between source and
    destination. When the `--merge` parameter is specified, the unit instead collects all bytes going forward
    and backwards, respectively, and emits these as two chunks, for each TCP conversation that took place.
    """

    def __init__(
        self,
        merge: Arg.Switch('-m', help='Merge both parts of each TCP conversation into one chunk.') = False,
        client: Arg.Switch('-c', group='D', help='Show only the client part of each conversation.') = False,
        server: Arg.Switch('-s', group='D', help='Show only the server part of each conversation.') = False,
    ):
        # The three switches are forwarded unchanged; process() reads them back from self.args.
        super().__init__(merge=merge, client=client, server=server)

    # NOTE(review): process() reads self._pcapkit as an attribute, so Unit.Requires presumably
    # turns this function into a lazily evaluated dependency accessor - confirm.
    @Unit.Requires('pypcapkit[scapy]>=1.3', 'all')
    def _pcapkit():
        # Both imports happen inside NoLogging. The scapy TLS session module is imported but
        # its name is never used; presumably it is loaded for its side effects - confirm.
        with NoLogging():
            import scapy.layers.tls.session # noqa
            import pcapkit
            return pcapkit

    @Unit.Requires('scapy', 'all')
    def _scapy():
        # scapy.packet is imported explicitly because process() accesses it as an attribute
        # of the returned scapy module.
        import scapy
        import scapy.packet
        return scapy

    def process(self, data):
        """
        Reassemble the TCP datagrams contained in the input and yield their payloads as labelled
        chunks. Without the merge option, one chunk is emitted per non-empty datagram, labelled
        with `src`, `dst` and `stream`. With the merge option, payloads are accumulated in one
        buffer per direction and emitted whenever the conversation changes, and once more after
        the last datagram.
        """
        pcapkit = self._pcapkit
        merge = self.args.merge

        # The input is exposed to pcapkit through a virtual file path.
        with NoLogging(), VirtualFileSystem() as fs:
            vf = VirtualFile(fs, data, 'pcap')
            pcap = pcapkit.extract(
                fin=vf.path,
                engine=pcapkit.Scapy,
                store=True,
                nofile=True,
                extension=False,
                ip=True,
                tcp=True,
                reassembly=True,
                reasm_strict=True,
            )

        tcp: List[Datagram] = list(pcap.reassembly.tcp)
        # Order datagrams by the smallest frame index they contain; datagrams without any
        # frame index sort first.
        tcp.sort(key=lambda p: min(p.index, default=0))

        # count numbers the conversations seen so far, convo is the one currently processed.
        count, convo = 0, None
        src_buffer = MemoryFile()
        dst_buffer = MemoryFile()

        self.log_debug(F'extracted {len(pcap.frame)} packets, assembled {len(tcp)} datagrams')

        PT = self._scapy.packet

        def payload(packet: Packet):
            # Descend through the chain of .payload attributes and return the .original data
            # of the first layer that is one of the accepted types (ok) and not one of the
            # rejected types (no). An empty byte string is returned when a layer without a
            # .payload attribute is reached or when a payload is encountered a second time.
            ok = (bytes, bytearray, PT.Raw)
            no = (PT.NoPayload, PT.Padding)
            circle = set()
            while True:
                try:
                    inner = packet.payload
                except AttributeError:
                    break
                if isinstance(packet, ok) and not isinstance(packet, no):
                    return packet.original
                if id(inner) in circle:
                    break
                packet = inner
                circle.add(id(inner))
            return B''

        def sequence(i: int):
            # Sort key for frame indices: the .seq attribute of the outermost layer that has
            # one, or 0 when no layer provides it.
            # NOTE(review): the i - 1 suggests that datagram indices are one-based - confirm.
            packet = pcap.frame[i - 1]
            while len(packet):
                try:
                    return packet.seq
                except AttributeError:
                    pass
                try:
                    packet = packet.payload
                except AttributeError:
                    break
            return 0

        client = self.args.client
        server = self.args.server

        def commit():
            # Emit and reset the two direction buffers of the current conversation. A buffer
            # is always reset when it holds data, even if the client/server filter suppresses
            # its output.
            # NOTE(review): the buffers are only filled in merge mode, so the client and
            # server switches have no effect without merge; merged chunks also carry no
            # stream label - confirm that both are intended.
            if src_buffer.tell():
                if not server:
                    yield self.labelled(src_buffer.getvalue(), **convo.src_to_dst())
                src_buffer.truncate(0)
            if dst_buffer.tell():
                if not client:
                    yield self.labelled(dst_buffer.getvalue(), **convo.dst_to_src())
                dst_buffer.truncate(0)

        for datagram in tcp:
            this_convo = Conversation.FromID(datagram.id)
            # Conversation equality ignores direction, so convo keeps the orientation of the
            # first datagram seen for each conversation.
            if this_convo != convo:
                if count and merge:
                    # Flush the previous conversation before convo is replaced.
                    yield from commit()
                count = count + 1
                convo = this_convo

            data = bytearray()
            for index in sorted(datagram.index, key=sequence):
                data.extend(payload(pcap.frame[index - 1]))
            if not data:
                continue
            if not merge:
                yield self.labelled(data, **this_convo.src_to_dst(), stream=count)
            elif this_convo.src == convo.src:
                src_buffer.write(data)
            elif this_convo.dst == convo.src:
                dst_buffer.write(data)
            else:
                raise RuntimeError(F'direction of packet {convo!s} in conversation {count} is unknown')

        # Flush whatever is left of the last conversation.
        yield from commit()
Ancestors
Class variables
var required_dependencies
var optional_dependencies
Inherited members