reassembly

package
v0.6.11
Published: Jan 30, 2022 License: BSD-3-Clause, GPL-3.0 Imports: 10 Imported by: 0

Documentation

Overview

Package reassembly provides TCP stream re-assembly.

The reassembly package implements uni-directional TCP reassembly, for use in packet-sniffing applications. The caller reads packets off the wire, then presents them to an Assembler in the form of gopacket layers.TCP packets (github.com/dreadl0ck/gopacket, github.com/dreadl0ck/gopacket/layers).

The Assembler uses a user-supplied streamFactory to create a user-defined Stream interface, then passes packet data in stream order to that object. A concurrency-safe StreamPool keeps track of all current Streams being reassembled, so multiple Assemblers may run at once to assemble packets while taking advantage of multiple cores.

TODO: Add simplest example

Constants

const (
	TCPStateClosed      = 0
	TCPStateSynSent     = 1
	TCPStateEstablished = 2
	TCPStateCloseWait   = 3
	TCPStateLastAck     = 4
	TCPStateReset       = 5
)

Internal values of the TCP state machine.

Variables

var Debug = false

Debug controls verbose logging.

Functions

This section is empty.

Types

type Assembler

type Assembler struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Assembler handles reassembling TCP streams. It is not safe for concurrent use: after passing a packet in via an assemble call, the caller must wait for that call to return before assembling another packet. Callers can get around this by creating multiple Assemblers that share a StreamPool. In that case, each individual stream is still handled serially (each stream has its own mutex), but multiple Assemblers can assemble different connections concurrently.

The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing applications written in Go. It uses the following techniques to keep packet processing fast:

Avoids Lock Contention

Assemblers lock connections, but each connection has an individual lock, and rarely will two Assemblers be looking at the same connection. Assemblers lock the StreamPool when looking up connections, but they use read locks initially, and only take a write lock if they need to create a new connection or close one down. These events happen much less frequently than individual packet handling.

Each assembler runs in its own goroutine, and the only state shared between goroutines is through the StreamPool. Thus all internal Assembler state can be handled without any locking.
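The locking scheme described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the pool, conn, get and record names are hypothetical stand-ins, and connections are keyed by a plain string for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a tracked connection; each one
// carries its own mutex so different connections never contend.
type conn struct {
	mu    sync.Mutex
	bytes int
}

// pool is a hypothetical stand-in for a shared stream pool.
type pool struct {
	mu    sync.RWMutex
	conns map[string]*conn
}

func newPool() *pool { return &pool{conns: make(map[string]*conn)} }

// get returns the connection for key, creating it if needed.
// The common case (the connection exists) only takes the read lock.
func (p *pool) get(key string) *conn {
	p.mu.RLock()
	c := p.conns[key]
	p.mu.RUnlock()
	if c != nil {
		return c
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	// Re-check: another goroutine may have created it between the two locks.
	if c = p.conns[key]; c == nil {
		c = &conn{}
		p.conns[key] = c
	}
	return c
}

// record adds n bytes to the connection for key under that connection's own lock.
func (p *pool) record(key string, n int) {
	c := p.get(key)
	c.mu.Lock()
	c.bytes += n
	c.mu.Unlock()
}

func main() {
	p := newPool()
	key := "10.0.0.1:1234->10.0.0.2:80"
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				p.record(key, 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(p.get(key).bytes)
}
```

The write lock is taken only on the first packet of a connection, so steady-state traffic on existing connections contends only on the per-connection mutex.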

NOTE: If you can guarantee that packets going to a set of Assemblers will contain information on different connections per Assembler (for example, they're already hashed by PF_RING hashing or some other hashing mechanism), then we recommend you use a separate StreamPool per Assembler, thus avoiding all lock contention. Only when different Assemblers could receive packets for the same Stream should a StreamPool be shared between them.
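One way to obtain that guarantee in user code is to hash each connection to a worker before handing packets to an Assembler. The sketch below is an assumption about how a caller might do this, not part of the package: the endpoint type and shard function are hypothetical, and FNV-1a is chosen only because it is in the standard library.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// endpoint is a hypothetical "ip:port" string for one side of a connection.
type endpoint string

// shard returns a worker index in [0, workers) for a connection.
// Ordering the endpoints first makes the hash symmetric, so both
// directions of the same connection land on the same worker.
// workers must be greater than zero.
func shard(a, b endpoint, workers int) int {
	if b < a {
		a, b = b, a
	}
	h := fnv.New32a()
	h.Write([]byte(a))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") hash differently
	h.Write([]byte(b))
	return int(h.Sum32() % uint32(workers))
}

func main() {
	client, server := endpoint("10.0.0.1:51000"), endpoint("10.0.0.2:443")
	// Both directions map to the same worker.
	fmt.Println(shard(client, server, 4) == shard(server, client, 4))
}
```

With such a dispatcher, each worker can own one Assembler and one StreamPool, since no two workers ever see packets of the same connection.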

Avoids Memory Copying

In the common case, handling of a single TCP packet should result in zero memory allocations. The Assembler will look up the connection, figure out that the packet has arrived in order, and immediately pass that packet on to the appropriate connection's handling code. Only if a packet arrives out of order are its contents copied and stored in memory for later.

Avoids Memory Allocation

Assemblers try very hard to not use memory allocation unless absolutely necessary. Packet data for sequential packets is passed directly to streams with no copying or allocation. Packet data for out-of-order packets is copied into reusable pages, and new pages are only allocated rarely when the page cache runs out. Page caches are Assembler-specific, thus not used concurrently and requiring no locking.

Internal representations for connection objects are also reused over time. Because of this, the most common memory allocation done by the Assembler is generally what's done by the caller in streamFactory.New. If no allocation is done there, then very little allocation is done ever, mostly to handle large increases in bandwidth or numbers of connections.

TODO: The page caches used by an Assembler will grow to the size necessary to handle a workload, and currently will never shrink. This means that traffic spikes can result in large memory usage which isn't garbage collected when typical traffic levels return.
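The page reuse described above, including the grow-only behaviour noted in the TODO, can be illustrated with a small free list. This is a sketch under stated assumptions: the pageCache type, its methods and the 1900-byte page size are hypothetical and chosen for illustration; the package's internal page cache is not shown in this documentation.

```go
package main

import "fmt"

const pageBytes = 1900 // hypothetical page size, chosen for illustration

// page holds a copy of out-of-order payload bytes.
type page struct {
	buf [pageBytes]byte
	n   int
}

// pageCache is a free list used by a single goroutine, so it needs no locking.
// It only ever grows: released pages are kept for reuse, never freed.
type pageCache struct {
	free      []*page
	allocated int // total pages ever allocated
}

// next returns a recycled page if one is available, otherwise allocates one.
func (c *pageCache) next() *page {
	if n := len(c.free); n > 0 {
		p := c.free[n-1]
		c.free = c.free[:n-1]
		return p
	}
	c.allocated++
	return &page{}
}

// release returns a page to the free list for later reuse.
func (c *pageCache) release(p *page) {
	p.n = 0
	c.free = append(c.free, p)
}

// store copies payload (truncated to one page) into a page from the cache.
func (c *pageCache) store(payload []byte) *page {
	p := c.next()
	p.n = copy(p.buf[:], payload)
	return p
}

func main() {
	var c pageCache
	for i := 0; i < 3; i++ {
		p := c.store([]byte("out-of-order data"))
		c.release(p)
	}
	// Three stores were served by a single allocation.
	fmt.Println(c.allocated, len(c.free))
}
```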

func NewAssembler

func NewAssembler(pool *StreamPool) *Assembler

NewAssembler creates a new assembler. Pass in the StreamPool to use; it may be shared across assemblers.

This sets some sane defaults for the assembler options, see defaultAssemblerOptions for details.

func (*Assembler) AssembleWithContext

func (a *Assembler) AssembleWithContext(netFlow gopacket.Flow, t *layers.TCP, ac AssemblerContext)

AssembleWithContext reassembles the given TCP packet into its appropriate stream.

The packet timestamp, which this signature takes from the AssemblerContext (ac.GetCaptureInfo()) rather than from a separate argument, must be the time the packet was seen. For packets read off the wire, time.Now() should be fine. For packets read from PCAP files, CaptureInfo.Timestamp should be used. This timestamp affects which streams are flushed by a later flush call (flushCloseOlderThan).

Each AssembleWithContext call results in, in order:

zero or one call to streamFactory.New, creating a stream
zero or one call to ReassembledSG on a single stream
zero or one call to ReassemblyComplete on the same stream

func (*Assembler) Dump

func (a *Assembler) Dump() string

Dump returns a short string describing the page usage of the Assembler.

func (*Assembler) FlushAll

func (a *Assembler) FlushAll() (closed int)

FlushAll flushes all remaining data into all remaining connections and closes those connections. It returns the total number of connections flushed/closed by the call.

func (*Assembler) FlushAllProgress

func (a *Assembler) FlushAllProgress() (closed int)

FlushAllProgress behaves like FlushAll, but additionally displays a progress bar.

func (*Assembler) FlushWithOptions

func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int)

FlushWithOptions finds any streams waiting for packets older than the given time T and pushes through the data they have (i.e., tells them to stop waiting and skip the data they are waiting for).

It also closes streams older than TC (which can be set to zero to keep long-lived streams alive while still flushing their data).

Each Stream maintains a list of zero or more sets of bytes it has received out of order. For example, if it has processed up through sequence number 10, it might have bytes [15-20), [20-25), [30-50) in its list. Each set of bytes also carries the timestamp at which it was originally seen. A flush call looks at the smallest subsequent set of bytes, in this case [15-20), and if its timestamp is older than the passed-in time, pushes it and all contiguous byte sets out to the Stream's ReassembledSG method. In this case, it pushes [15-20), but also [20-25), since that is contiguous. It only pushes [30-50) if its timestamp is also older than the passed-in time; otherwise it waits until the next flush to see whether bytes [25-30) come in.

Returns the number of connections flushed, and of those, the number closed because of the flush.
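The byte-range example above can be traced with a small model of the flush decision. This is a sketch of the rule as documented, not the package's code: the chunk type and flush function are hypothetical, sequence numbers are plain ints, and wrap-around is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// chunk is a hypothetical queued byte range [start, end) and the time it was seen.
type chunk struct {
	start, end int
	seen       time.Time
}

// flush returns the chunks that would be pushed to the stream and the ones
// that stay queued. queued must be sorted by start and non-overlapping.
func flush(queued []chunk, cutoff time.Time) (pushed, remaining []chunk) {
	i := 0
	for i < len(queued) && queued[i].seen.Before(cutoff) {
		// The gap before this chunk is given up on: push the chunk...
		pushed = append(pushed, queued[i])
		i++
		// ...and everything contiguous with it, regardless of timestamp.
		for i < len(queued) && queued[i].start == queued[i-1].end {
			pushed = append(pushed, queued[i])
			i++
		}
	}
	return pushed, queued[i:]
}

func main() {
	t0 := time.Date(2022, 1, 30, 12, 0, 0, 0, time.UTC)
	queued := []chunk{
		{15, 20, t0},
		{20, 25, t0.Add(9 * time.Second)},
		{30, 50, t0.Add(10 * time.Second)},
	}
	// Only [15-20) is older than the cutoff; [20-25) rides along because
	// it is contiguous, while [30-50) keeps waiting for [25-30).
	pushed, remaining := flush(queued, t0.Add(5*time.Second))
	fmt.Println(len(pushed), len(remaining))
}
```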

type AssemblerContext

type AssemblerContext interface {
	GetCaptureInfo() gopacket.CaptureInfo
}

AssemblerContext provides a method to get packet metadata.

type FlushOptions

type FlushOptions struct {
	T  time.Time // If nonzero, only connections with data older than T are flushed
	TC time.Time // If nonzero, only connections with data older than TC are closed (if no FIN/RST received)
}

FlushOptions provides options for flushing connections.
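A caller typically derives both thresholds from one reference time. The sketch below assumes that usage; flushOptions is a local stand-in that mirrors the two documented fields, and the cutoffs helper and its parameters are hypothetical. When replaying a PCAP file, the reference time would presumably be the latest packet timestamp rather than the wall clock.

```go
package main

import (
	"fmt"
	"time"
)

// flushOptions mirrors the two documented fields; it is a local stand-in,
// not the package's own type.
type flushOptions struct {
	T  time.Time
	TC time.Time
}

// cutoffs derives flush and close thresholds from a reference time.
// A closeAfter of zero leaves TC at its zero value, so data is still
// flushed but long-lived streams are not closed.
func cutoffs(now time.Time, flushAfter, closeAfter time.Duration) flushOptions {
	opts := flushOptions{T: now.Add(-flushAfter)}
	if closeAfter > 0 {
		opts.TC = now.Add(-closeAfter)
	}
	return opts
}

func main() {
	now := time.Date(2022, 1, 30, 12, 0, 0, 0, time.UTC)
	opts := cutoffs(now, 30*time.Second, 0)
	fmt.Println(opts.T.Format(time.RFC3339), opts.TC.IsZero())
}
```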

type ScatterGather

type ScatterGather interface {
	// Lengths returns the length of available bytes and saved bytes
	Lengths() (int, int)

	// Fetch returns the bytes up to length (shall be <= available bytes)
	Fetch(length int) []byte

	// KeepFrom tells the assembler to keep the bytes starting at offset
	KeepFrom(offset int)

	// CaptureInfo returns the CaptureInfo of packet corresponding to given offset
	CaptureInfo(offset int) gopacket.CaptureInfo

	// Info returns some info about the reassembled chunks
	Info() (direction TCPFlowDirection, start bool, end bool, skip int)

	// Stats returns some stats regarding the state of the stream
	Stats() TCPAssemblyStats
}

ScatterGather is used to pass reassembled data and metadata of reassembled packets to a Stream via ReassembledSG.

type Sequence

type Sequence int64

Sequence is a TCP sequence number. It provides a few convenience functions for handling TCP wrap-around. The sequence should always be in the range [0,0xFFFFFFFF]; its other bits are used only in wrap-around calculations and should never be set.

type Stream

type Stream interface {
	// Accept tells whether the TCP packet should be accepted; start could be modified to force a start even if no SYN has been seen
	Accept(tcp *layers.TCP, dir TCPFlowDirection, nextSeq Sequence) bool

	// ReassembledSG is called zero or more times.
	// ScatterGather is reused after each ReassembledSG call,
	// so it's important to copy anything you need out of it,
	// especially bytes (or use KeepFrom())
	ReassembledSG(sg ScatterGather, ac AssemblerContext)

	// ReassemblyComplete is called when assembly decides there is
	// no more data for this Stream, either because a FIN or RST packet
	// was seen, or because the stream has timed out without any new
	// packet data (due to a call to FlushCloseOlderThan).
	// It should return true if the connection should be removed from the pool.
	// It can return false if it wants to see subsequent packets with Accept(), e.g. to
	// see FIN-ACK, for deeper state-machine analysis.
	ReassemblyComplete(ac AssemblerContext, firstFlow gopacket.Flow, reason string) bool
}

Stream is implemented by the caller to handle incoming reassembled TCP data. Callers create a streamFactory, then StreamPool uses it to create a new Stream for every TCP stream.

assembly will, in order:

  1. Create the stream via streamFactory.New
  2. Call ReassembledSG 0 or more times, passing in reassembled TCP data in order
  3. Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
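The lifecycle above, together with the warning that the data handed to ReassembledSG is reused between calls, can be illustrated with a simplified model. Everything here is a stand-in: the sink interface, collector type and deliver function are hypothetical and use plain byte slices instead of ScatterGather.

```go
package main

import "fmt"

// sink is a hypothetical, simplified stand-in for a user-defined stream:
// it receives reassembled bytes in order and is told when the stream ends.
type sink interface {
	reassembled(data []byte)
	complete()
}

// collector keeps its own copy of every byte it is given.
type collector struct {
	data []byte
	done bool
}

func (c *collector) reassembled(data []byte) {
	// append copies the bytes, so later reuse of data by the caller is harmless.
	c.data = append(c.data, data...)
}

func (c *collector) complete() { c.done = true }

// deliver mimics a caller that reuses one scratch buffer for every segment,
// then signals completion exactly once.
func deliver(s sink, segments ...string) {
	scratch := make([]byte, 0, 64)
	for _, seg := range segments {
		scratch = append(scratch[:0], seg...)
		s.reassembled(scratch)
	}
	s.complete()
}

func main() {
	c := &collector{}
	deliver(c, "GET / ", "HTTP/1.1\r\n")
	fmt.Printf("%q %v\n", c.data, c.done)
}
```

Had collector stored the slice itself instead of copying, the first segment would have been overwritten by the second.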

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool stores all streams created by Assemblers, allowing multiple assemblers to work together on stream processing while enforcing the fact that a single stream receives its data serially. It is safe for concurrent use by multiple Assemblers at once.

StreamPool handles the creation and storage of Stream objects used by one or more Assembler objects. When a new TCP stream is found by an Assembler, it creates an associated Stream by calling its streamFactory's New method. Thereafter (until the stream is closed), that Stream object will receive assembled TCP data via the Assembler's calls to the stream's ReassembledSG method.

Like the Assembler, StreamPool attempts to minimize allocation. Unlike the Assembler, though, it does have to do some locking to make sure that the connection objects it stores are accessible to multiple Assemblers.

func NewStreamPool

func NewStreamPool(factory streamFactory) *StreamPool

NewStreamPool creates a new connection pool. Streams will be created as necessary using the passed-in streamFactory.

func (*StreamPool) DumpString

func (p *StreamPool) DumpString() string

DumpString logs all connections and returns a string.

type TCPAssemblyStats

type TCPAssemblyStats struct {
	// For this ScatterGather
	Chunks  int
	Packets int
	// For the half connection, since last call to ReassembledSG()
	QueuedBytes    int
	QueuedPackets  int
	OverlapBytes   int
	OverlapPackets int
}

TCPAssemblyStats provides some figures for a ScatterGather.

type TCPFlowDirection

type TCPFlowDirection bool

TCPFlowDirection distinguishes the two half-connection directions.

TCPDirClientToServer is assigned to the half-connection of the first received packet, and hence might be wrong if packets are not received in order. It is up to the caller (e.g. in Accept()) to decide whether the direction should be interpreted differently.

const (
	TCPDirClientToServer TCPFlowDirection = false
	TCPDirServerToClient TCPFlowDirection = true
)

The values themselves are not really useful.

func (TCPFlowDirection) String

func (dir TCPFlowDirection) String() string

type TCPOptionCheck

type TCPOptionCheck struct {
	// contains filtered or unexported fields
}

TCPOptionCheck contains options for the two directions.

func NewTCPOptionCheck

func NewTCPOptionCheck() TCPOptionCheck

NewTCPOptionCheck creates default options.

func (*TCPOptionCheck) Accept

func (t *TCPOptionCheck) Accept(tcp *layers.TCP, dir TCPFlowDirection, nextSeq Sequence) error

Accept checks whether the packet should be accepted by checking TCP options.

type TCPSimpleFSM

type TCPSimpleFSM struct {
	// contains filtered or unexported fields
}

TCPSimpleFSM implements a very simple TCP state machine.

Usage: when implementing the Stream interface, Accept() can call TCPSimpleFSM.CheckState() to avoid processing packets that the client's or server's TCP stack would reject.

Limitations:

  - packets should be received in order
  - no check on sequence numbers is performed
  - no RST
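To make the idea of a handshake-tracking state machine concrete, here is a deliberately reduced sketch. It is not TCPSimpleFSM: the fsm and flags types, the check method and the allowMissingStart field are hypothetical, only the three-way handshake is modelled, and connection teardown is left out.

```go
package main

import "fmt"

type state int

const (
	closed state = iota
	synSent
	established
)

// flags is a hypothetical stand-in for the TCP header bits of one packet.
type flags struct{ syn, ack bool }

// fsm tracks the three-way handshake only.
type fsm struct {
	st                state
	allowMissingStart bool // accept mid-stream traffic when no SYN was seen
}

// check reports whether a packet is plausible in the current state and,
// if so, advances the state. fromClient is true for client-to-server packets.
func (f *fsm) check(p flags, fromClient bool) bool {
	switch f.st {
	case closed:
		if p.syn && !p.ack && fromClient {
			f.st = synSent
			return true
		}
		if f.allowMissingStart && !p.syn {
			f.st = established
			return true
		}
		return false
	case synSent:
		if p.syn && p.ack && !fromClient {
			f.st = established
			return true
		}
		return false
	default: // established: anything but a new SYN is accepted
		return !p.syn
	}
}

func main() {
	f := &fsm{}
	fmt.Println(f.check(flags{ack: true}, true))             // data before any SYN
	fmt.Println(f.check(flags{syn: true}, true))             // SYN
	fmt.Println(f.check(flags{syn: true, ack: true}, false)) // SYN+ACK
	fmt.Println(f.check(flags{ack: true}, true))             // ACK
}
```

A stream's Accept method could consult such a machine and drop packets for which check returns false; the allowMissingStart field plays a role comparable to the documented SupportMissingEstablishment option.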

func NewTCPSimpleFSM

func NewTCPSimpleFSM(options TCPSimpleFSMOptions) *TCPSimpleFSM

NewTCPSimpleFSM creates a new TCPSimpleFSM.

func (*TCPSimpleFSM) CheckState

func (t *TCPSimpleFSM) CheckState(tcp *layers.TCP, dir TCPFlowDirection) bool

CheckState returns false if tcp is invalid with respect to the current state; otherwise it updates the state machine's state.

func (*TCPSimpleFSM) String

func (t *TCPSimpleFSM) String() string

type TCPSimpleFSMOptions

type TCPSimpleFSMOptions struct {
	SupportMissingEstablishment bool // Allow missing SYN, SYN+ACK, ACK
}

TCPSimpleFSMOptions holds options for TCPSimpleFSM.
