Version: v1.1.19 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2020 License: BSD-3-Clause Imports: 9 Imported by: 69



Package reassembly provides TCP stream re-assembly.

The reassembly package implements uni-directional TCP reassembly, for use in packet-sniffing applications. The caller reads packets off the wire, then presents them to an Assembler in the form of gopacket layers.TCP packets (,

The Assembler uses a user-supplied StreamFactory to create a user-defined Stream interface, then passes packet data in stream order to that object. A concurrency-safe StreamPool keeps track of all current Streams being reassembled, so multiple Assemblers may run at once to assemble packets while taking advantage of multiple cores.

TODO: Add simplest example



View Source
const (
	TCPStateClosed      = 0
	TCPStateSynSent     = 1
	TCPStateEstablished = 2
	TCPStateCloseWait   = 3
	TCPStateLastAck     = 4
	TCPStateReset       = 5

Internal values of state machine


View Source
var DefaultAssemblerOptions = AssemblerOptions{
	MaxBufferedPagesPerConnection: 0,
	MaxBufferedPagesTotal:         0,

DefaultAssemblerOptions provides default options for an assembler. These options are used by default when calling NewAssembler, so if modified before a NewAssembler call they'll affect the resulting Assembler.

Note that the default options can result in ever-increasing memory usage unless one of the Flush* methods is called on a regular basis.


This section is empty.


type Assembler

type Assembler struct {
	// contains filtered or unexported fields

Assembler handles reassembling TCP streams. It is not safe for concurrency... after passing a packet in via the Assemble call, the caller must wait for that call to return before calling Assemble again. Callers can get around this by creating multiple assemblers that share a StreamPool. In that case, each individual stream will still be handled serially (each stream has an individual mutex associated with it), however multiple assemblers can assemble different connections concurrently.

The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing applications written in Go. The Assembler uses the following methods to be as fast as possible, to keep packet processing speedy:

Avoids Lock Contention

Assemblers locks connections, but each connection has an individual lock, and rarely will two Assemblers be looking at the same connection. Assemblers lock the StreamPool when looking up connections, but they use Reader locks initially, and only force a write lock if they need to create a new connection or close one down. These happen much less frequently than individual packet handling.

Each assembler runs in its own goroutine, and the only state shared between goroutines is through the StreamPool. Thus all internal Assembler state can be handled without any locking.

NOTE: If you can guarantee that packets going to a set of Assemblers will contain information on different connections per Assembler (for example, they're already hashed by PF_RING hashing or some other hashing mechanism), then we recommend you use a seperate StreamPool per Assembler, thus avoiding all lock contention. Only when different Assemblers could receive packets for the same Stream should a StreamPool be shared between them.

Avoids Memory Copying

In the common case, handling of a single TCP packet should result in zero memory allocations. The Assembler will look up the connection, figure out that the packet has arrived in order, and immediately pass that packet on to the appropriate connection's handling code. Only if a packet arrives out of order is its contents copied and stored in memory for later.

Avoids Memory Allocation

Assemblers try very hard to not use memory allocation unless absolutely necessary. Packet data for sequential packets is passed directly to streams with no copying or allocation. Packet data for out-of-order packets is copied into reusable pages, and new pages are only allocated rarely when the page cache runs out. Page caches are Assembler-specific, thus not used concurrently and requiring no locking.

Internal representations for connection objects are also reused over time. Because of this, the most common memory allocation done by the Assembler is generally what's done by the caller in StreamFactory.New. If no allocation is done there, then very little allocation is done ever, mostly to handle large increases in bandwidth or numbers of connections.

TODO: The page caches used by an Assembler will grow to the size necessary to handle a workload, and currently will never shrink. This means that traffic spikes can result in large memory usage which isn't garbage collected when typical traffic levels return.

func NewAssembler

func NewAssembler(pool *StreamPool) *Assembler

NewAssembler creates a new assembler. Pass in the StreamPool to use, may be shared across assemblers.

This sets some sane defaults for the assembler options, see DefaultAssemblerOptions for details.

func (*Assembler) Assemble

func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP)

Assemble calls AssembleWithContext with the current timestamp, useful for packets being read directly off the wire.

func (*Assembler) AssembleWithContext

func (a *Assembler) AssembleWithContext(netFlow gopacket.Flow, t *layers.TCP, ac AssemblerContext)

AssembleWithContext reassembles the given TCP packet into its appropriate stream.

The timestamp passed in must be the timestamp the packet was seen. For packets read off the wire, time.Now() should be fine. For packets read from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp will affect which streams are flushed by a call to FlushCloseOlderThan.

Each AssembleWithContext call results in, in order:

zero or one call to StreamFactory.New, creating a stream
zero or one call to ReassembledSG on a single stream
zero or one call to ReassemblyComplete on the same stream

func (*Assembler) Dump

func (a *Assembler) Dump() string

Dump returns a short string describing the page usage of the Assembler

func (*Assembler) FlushAll

func (a *Assembler) FlushAll() (closed int)

FlushAll flushes all remaining data into all remaining connections and closes those connections. It returns the total number of connections flushed/closed by the call.

func (*Assembler) FlushCloseOlderThan

func (a *Assembler) FlushCloseOlderThan(t time.Time) (flushed, closed int)

FlushCloseOlderThan flushes and closes streams older than given time

func (*Assembler) FlushWithOptions

func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int)

FlushWithOptions finds any streams waiting for packets older than the given time T, and pushes through the data they have (IE: tells them to stop waiting and skip the data they're waiting for).

It also closes streams older than TC (that can be set to zero, to keep long-lived stream alive, but to flush data anyway).

Each Stream maintains a list of zero or more sets of bytes it has received out-of-order. For example, if it has processed up through sequence number 10, it might have bytes [15-20), [20-25), [30,50) in its list. Each set of bytes also has the timestamp it was originally viewed. A flush call will look at the smallest subsequent set of bytes, in this case [15-20), and if its timestamp is older than the passed-in time, it will push it and all contiguous byte-sets out to the Stream's Reassembled function. In this case, it will push [15-20), but also [20-25), since that's contiguous. It will only push [30-50) if its timestamp is also older than the passed-in time, otherwise it will wait until the next FlushCloseOlderThan to see if bytes [25-30) come in.

Returns the number of connections flushed, and of those, the number closed because of the flush.

type AssemblerContext

type AssemblerContext interface {
	GetCaptureInfo() gopacket.CaptureInfo

AssemblerContext provides method to get metadata

type AssemblerOptions

type AssemblerOptions struct {
	// MaxBufferedPagesTotal is an upper limit on the total number of pages to
	// buffer while waiting for out-of-order packets.  Once this limit is
	// reached, the assembler will degrade to flushing every connection it
	// gets a packet for.  If <= 0, this is ignored.
	MaxBufferedPagesTotal int
	// MaxBufferedPagesPerConnection is an upper limit on the number of pages
	// buffered for a single connection.  Should this limit be reached for a
	// particular connection, the smallest sequence number will be flushed, along
	// with any contiguous data.  If <= 0, this is ignored.
	MaxBufferedPagesPerConnection int

AssemblerOptions controls the behavior of each assembler. Modify the options of each assembler you create to change their behavior.

type FlushOptions

type FlushOptions struct {
	T  time.Time // If nonzero, only connections with data older than T are flushed
	TC time.Time // If nonzero, only connections with data older than TC are closed (if no FIN/RST received)

FlushOptions provide options for flushing connections.

type ScatterGather

type ScatterGather interface {
	// Returns the length of available bytes and saved bytes
	Lengths() (int, int)
	// Returns the bytes up to length (shall be <= available bytes)
	Fetch(length int) []byte
	// Tell to keep from offset
	KeepFrom(offset int)
	// Return CaptureInfo of packet corresponding to given offset
	CaptureInfo(offset int) gopacket.CaptureInfo
	// Return some info about the reassembled chunks
	Info() (direction TCPFlowDirection, start bool, end bool, skip int)
	// Return some stats regarding the state of the stream
	Stats() TCPAssemblyStats

ScatterGather is used to pass reassembled data and metadata of reassembled packets to a Stream via ReassembledSG

type Sequence

type Sequence int64

Sequence is a TCP sequence number. It provides a few convenience functions for handling TCP wrap-around. The sequence should always be in the range [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations and should never be set.

func (Sequence) Add

func (s Sequence) Add(t int) Sequence

Add adds an integer to a sequence and returns the resulting sequence.

func (Sequence) Difference

func (s Sequence) Difference(t Sequence) int

Difference defines an ordering for comparing TCP sequences that's safe for roll-overs. It returns:

> 0 : if t comes after s
< 0 : if t comes before s
  0 : if t == s

The number returned is the sequence difference, so 4.Difference(8) will return 4.

It handles rollovers by considering any sequence in the first quarter of the uint32 space to be after any sequence in the last quarter of that space, thus wrapping the uint32 space.

type Stream

type Stream interface {
	// Tell whether the TCP packet should be accepted, start could be modified to force a start even if no SYN have been seen
	Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool, ac AssemblerContext) bool

	// ReassembledSG is called zero or more times.
	// ScatterGather is reused after each Reassembled call,
	// so it's important to copy anything you need out of it,
	// especially bytes (or use KeepFrom())
	ReassembledSG(sg ScatterGather, ac AssemblerContext)

	// ReassemblyComplete is called when assembly decides there is
	// no more data for this Stream, either because a FIN or RST packet
	// was seen, or because the stream has timed out without any new
	// packet data (due to a call to FlushCloseOlderThan).
	// It should return true if the connection should be removed from the pool
	// It can return false if it want to see subsequent packets with Accept(), e.g. to
	// see FIN-ACK, for deeper state-machine analysis.
	ReassemblyComplete(ac AssemblerContext) bool

Stream is implemented by the caller to handle incoming reassembled TCP data. Callers create a StreamFactory, then StreamPool uses it to create a new Stream for every TCP stream.

assembly will, in order:

  1. Create the stream via StreamFactory.New
  2. Call ReassembledSG 0 or more times, passing in reassembled TCP data in order
  3. Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.

type StreamFactory

type StreamFactory interface {
	// New should return a new stream for the given TCP key.
	New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac AssemblerContext) Stream

StreamFactory is used by assembly to create a new stream for each new TCP session.

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields

StreamPool stores all streams created by Assemblers, allowing multiple assemblers to work together on stream processing while enforcing the fact that a single stream receives its data serially. It is safe for concurrency, usable by multiple Assemblers at once.

StreamPool handles the creation and storage of Stream objects used by one or more Assembler objects. When a new TCP stream is found by an Assembler, it creates an associated Stream by calling its StreamFactory's New method. Thereafter (until the stream is closed), that Stream object will receive assembled TCP data via Assembler's calls to the stream's Reassembled function.

Like the Assembler, StreamPool attempts to minimize allocation. Unlike the Assembler, though, it does have to do some locking to make sure that the connection objects it stores are accessible to multiple Assemblers.

func NewStreamPool

func NewStreamPool(factory StreamFactory) *StreamPool

NewStreamPool creates a new connection pool. Streams will be created as necessary using the passed-in StreamFactory.

func (*StreamPool) Dump

func (p *StreamPool) Dump()

Dump logs all connections

type TCPAssemblyStats

type TCPAssemblyStats struct {
	// For this ScatterGather
	Chunks  int
	Packets int
	// For the half connection, since last call to ReassembledSG()
	QueuedBytes    int
	QueuedPackets  int
	OverlapBytes   int
	OverlapPackets int

TCPAssemblyStats provides some figures for a ScatterGather

type TCPFlowDirection

type TCPFlowDirection bool

TCPFlowDirection distinguish the two half-connections directions.

TCPDirClientToServer is assigned to half-connection for the first received packet, hence might be wrong if packets are not received in order. It's up to the caller (e.g. in Accept()) to decide if the direction should be interpretted differently.

const (
	TCPDirClientToServer TCPFlowDirection = false
	TCPDirServerToClient TCPFlowDirection = true

Value are not really useful

func (TCPFlowDirection) Reverse

func (dir TCPFlowDirection) Reverse() TCPFlowDirection

Reverse returns the reversed direction

func (TCPFlowDirection) String

func (dir TCPFlowDirection) String() string

type TCPOptionCheck

type TCPOptionCheck struct {
	// contains filtered or unexported fields

TCPOptionCheck contains options for the two directions

func NewTCPOptionCheck

func NewTCPOptionCheck() TCPOptionCheck

NewTCPOptionCheck creates default options

func (*TCPOptionCheck) Accept

func (t *TCPOptionCheck) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool) error

Accept checks whether the packet should be accepted by checking TCP options

type TCPSimpleFSM

type TCPSimpleFSM struct {
	// contains filtered or unexported fields

TCPSimpleFSM implements a very simple TCP state machine

Usage: When implementing a Stream interface and to avoid to consider packets that would be rejected due to client/server's TCP stack, the Accept() can call TCPSimpleFSM.CheckState().

Limitations: - packet should be received in-order. - no check on sequence number is performed - no RST

func NewTCPSimpleFSM

func NewTCPSimpleFSM(options TCPSimpleFSMOptions) *TCPSimpleFSM

NewTCPSimpleFSM creates a new TCPSimpleFSM

func (*TCPSimpleFSM) CheckState

func (t *TCPSimpleFSM) CheckState(tcp *layers.TCP, dir TCPFlowDirection) bool

CheckState returns false if tcp is invalid wrt current state or update the state machine's state

func (*TCPSimpleFSM) String

func (t *TCPSimpleFSM) String() string

type TCPSimpleFSMOptions

type TCPSimpleFSMOptions struct {
	SupportMissingEstablishment bool // Allow missing SYN, SYN+ACK, ACK

TCPSimpleFSMOptions holds options for TCPSimpleFSM

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL