Documentation

Overview

    Package reassembly provides TCP stream re-assembly.

    The reassembly package implements uni-directional TCP reassembly, for use in packet-sniffing applications. The caller reads packets off the wire, then presents them to an Assembler in the form of gopacket layers.TCP packets (github.com/google/gopacket, github.com/google/gopacket/layers).

    The Assembler uses a user-supplied StreamFactory to create a user-defined Stream interface, then passes packet data in stream order to that object. A concurrency-safe StreamPool keeps track of all current Streams being reassembled, so multiple Assemblers may run at once to assemble packets while taking advantage of multiple cores.

    TODO: Add simplest example

    Constants

    const (
    	TCPStateClosed      = 0
    	TCPStateSynSent     = 1
    	TCPStateEstablished = 2
    	TCPStateCloseWait   = 3
    	TCPStateLastAck     = 4
    	TCPStateReset       = 5
    )

      Internal values of state machine

      Variables

      var DefaultAssemblerOptions = AssemblerOptions{
      	MaxBufferedPagesPerConnection: 0,
      	MaxBufferedPagesTotal:         0,
      }

        DefaultAssemblerOptions provides default options for an assembler. These options are used by default when calling NewAssembler, so if modified before a NewAssembler call they'll affect the resulting Assembler.

        Note that the default options can result in ever-increasing memory usage unless one of the Flush* methods is called on a regular basis.

        Functions

        This section is empty.

        Types

        type Assembler

        type Assembler struct {
        	AssemblerOptions
        	// contains filtered or unexported fields
        }

        Assembler handles reassembling TCP streams. It is not safe for concurrent use: after passing a packet in via the Assemble call, the caller must wait for that call to return before calling Assemble again. Callers can get around this by creating multiple assemblers that share a StreamPool. In that case, each individual stream is still handled serially (each stream has its own mutex), but multiple assemblers can assemble different connections concurrently.

        The Assembler aims to provide fast TCP stream re-assembly for sniffing applications written in Go. It keeps packet processing fast in the following ways:

        Avoids Lock Contention

        Assemblers lock connections, but each connection has an individual lock, and two Assemblers will rarely be looking at the same connection. Assemblers lock the StreamPool when looking up connections, but they take a read lock initially and only take the write lock if they need to create a new connection or close one down. These events happen much less frequently than individual packet handling.
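The locking scheme described above can be sketched as follows. This is an illustration of the general read-lock-then-write-lock pattern, not the package's source: `pool`, `conn`, `get`, and the string key are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical per-connection record with its own lock.
type conn struct {
	mu    sync.Mutex
	bytes int
}

// pool is a hypothetical stand-in for a shared connection table.
type pool struct {
	mu      sync.RWMutex
	conns   map[string]*conn
	created int // number of times the slow path created a connection
}

func newPool() *pool { return &pool{conns: make(map[string]*conn)} }

// get returns the connection for key, creating it on first use.
func (p *pool) get(key string) *conn {
	// Fast path: most packets belong to a known connection, so a read lock suffices.
	p.mu.RLock()
	c := p.conns[key]
	p.mu.RUnlock()
	if c != nil {
		return c
	}
	// Slow path: take the write lock and re-check, since another goroutine
	// may have created the entry between RUnlock and Lock.
	p.mu.Lock()
	defer p.mu.Unlock()
	if c = p.conns[key]; c == nil {
		c = &conn{}
		p.conns[key] = c
		p.created++
	}
	return c
}

func main() {
	p := newPool()
	const key = "10.0.0.1:51234->192.0.2.7:443"
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c := p.get(key)
			c.mu.Lock() // the per-connection lock serializes work on one stream
			c.bytes += 100
			c.mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(p.created, p.get(key).bytes) // prints "1 800"
}
```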

        Each assembler runs in its own goroutine, and the only state shared between goroutines is through the StreamPool. Thus all internal Assembler state can be handled without any locking.

        NOTE: If you can guarantee that packets going to a set of Assemblers will contain information on different connections per Assembler (for example, they're already hashed by PF_RING hashing or some other hashing mechanism), then we recommend you use a separate StreamPool per Assembler, thus avoiding all lock contention. Only when different Assemblers could receive packets for the same Stream should a StreamPool be shared between them.
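When no capture-level hashing is available, the same guarantee can be approximated in user code. The sketch below is one possible approach under stated assumptions: `flowWorker` and the "ip:port" string endpoints are hypothetical, and the only property that matters is that both directions of a connection land on the same worker.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// flowWorker picks a worker index for a connection so that both directions
// of the same connection map to the same worker. Endpoints are hypothetical
// "ip:port" strings.
func flowWorker(src, dst string, workers int) int {
	// Order the endpoints so A->B and B->A hash identically.
	lo, hi := src, dst
	if hi < lo {
		lo, hi = hi, lo
	}
	h := fnv.New32a()
	h.Write([]byte(lo))
	h.Write([]byte{0}) // separator so "ab"+"c" differs from "a"+"bc"
	h.Write([]byte(hi))
	return int(h.Sum32() % uint32(workers))
}

func main() {
	const workers = 4
	a, b := "10.0.0.1:51234", "192.0.2.7:443"
	fmt.Println(flowWorker(a, b, workers) == flowWorker(b, a, workers)) // prints "true"
}
```

Each worker index would then own one Assembler and one StreamPool, so no pool is ever touched by two goroutines.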

        Avoids Memory Copying

        In the common case, handling of a single TCP packet should result in zero memory allocations. The Assembler will look up the connection, figure out that the packet has arrived in order, and immediately pass that packet on to the appropriate connection's handling code. Only if a packet arrives out of order is its contents copied and stored in memory for later.
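The in-order fast path can be illustrated with a small model. This is a simplified sketch, not the package's implementation: `halfStream` and its fields are hypothetical, and sequence wrap-around, overlaps, and retransmissions are ignored for brevity.

```go
package main

import "fmt"

// halfStream is a hypothetical, simplified one-direction reassembler.
type halfStream struct {
	next    uint32            // next expected sequence number
	pending map[uint32][]byte // out-of-order segments keyed by start sequence
	deliver func([]byte)      // receives bytes in stream order
}

// add handles one segment.
func (s *halfStream) add(seq uint32, data []byte) {
	if seq != s.next {
		// Out of order: the caller may reuse data, so keep a private copy.
		s.pending[seq] = append([]byte(nil), data...)
		return
	}
	// In order: hand the caller's slice straight through, with no copy.
	s.deliver(data)
	s.next += uint32(len(data))
	// Drain any buffered segments that are now contiguous.
	for {
		buf, ok := s.pending[s.next]
		if !ok {
			return
		}
		delete(s.pending, s.next)
		s.deliver(buf)
		s.next += uint32(len(buf))
	}
}

func main() {
	var out []byte
	s := &halfStream{
		next:    1000,
		pending: map[uint32][]byte{},
		deliver: func(b []byte) { out = append(out, b...) },
	}
	s.add(1005, []byte("world")) // arrives early and is buffered
	s.add(1000, []byte("hello")) // fills the gap, so both are delivered
	fmt.Println(string(out))     // prints "helloworld"
}
```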

        Avoids Memory Allocation

        Assemblers try very hard to not use memory allocation unless absolutely necessary. Packet data for sequential packets is passed directly to streams with no copying or allocation. Packet data for out-of-order packets is copied into reusable pages, and new pages are only allocated rarely when the page cache runs out. Page caches are Assembler-specific, thus not used concurrently and requiring no locking.
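A page cache of the kind described above can be sketched as a free list. The types and the page size here are hypothetical and chosen only for illustration. The point is that pages are recycled rather than reallocated, and that a cache owned by a single goroutine needs no locking.

```go
package main

import "fmt"

const pageBytes = 1900 // hypothetical page size, chosen for illustration

// page is one reusable buffer for out-of-order packet data.
type page struct {
	buf [pageBytes]byte
	n   int // bytes of buf in use
}

// pageCache is a hypothetical free list owned by one goroutine.
type pageCache struct {
	free      []*page
	allocated int // total pages ever allocated
}

// get returns a recycled page when one is available and allocates otherwise.
func (c *pageCache) get() *page {
	if n := len(c.free); n > 0 {
		p := c.free[n-1]
		c.free = c.free[:n-1]
		return p
	}
	c.allocated++
	return &page{}
}

// put returns a page to the free list for reuse. The list never shrinks.
func (c *pageCache) put(p *page) {
	p.n = 0
	c.free = append(c.free, p)
}

func main() {
	var c pageCache
	p := c.get()                    // allocates, because the cache is empty
	p.n = copy(p.buf[:], "payload") // store out-of-order bytes
	c.put(p)                        // data flushed, page recycled
	q := c.get()                    // reuses the same page
	fmt.Println(p == q, c.allocated) // prints "true 1"
}
```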

        Internal representations for connection objects are also reused over time. Because of this, the most common memory allocation done by the Assembler is generally what's done by the caller in StreamFactory.New. If no allocation is done there, then very little allocation is done ever, mostly to handle large increases in bandwidth or numbers of connections.

        TODO: The page caches used by an Assembler will grow to the size necessary to handle a workload, and currently will never shrink. This means that traffic spikes can result in large memory usage which isn't garbage collected when typical traffic levels return.

        func NewAssembler

        func NewAssembler(pool *StreamPool) *Assembler

          NewAssembler creates a new assembler. Pass in the StreamPool to use; it may be shared across assemblers.

          This sets some sane defaults for the assembler options. See DefaultAssemblerOptions for details.

          func (*Assembler) Assemble

          func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP)

            Assemble calls AssembleWithContext with the current timestamp, which is useful for packets being read directly off the wire.

            func (*Assembler) AssembleWithContext

            func (a *Assembler) AssembleWithContext(netFlow gopacket.Flow, t *layers.TCP, ac AssemblerContext)

              AssembleWithContext reassembles the given TCP packet into its appropriate stream.

              The timestamp, taken from the CaptureInfo returned by the AssemblerContext, must be the time the packet was seen. For packets read off the wire, time.Now() should be fine. For packets read from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp affects which streams are flushed by a call to FlushCloseOlderThan.

              Each AssembleWithContext call results in, in order:

              zero or one call to StreamFactory.New, creating a stream
              zero or one call to ReassembledSG on a single stream
              zero or one call to ReassemblyComplete on the same stream
              

              func (*Assembler) Dump

              func (a *Assembler) Dump() string

                Dump returns a short string describing the page usage of the Assembler.

                func (*Assembler) FlushAll

                func (a *Assembler) FlushAll() (closed int)

                  FlushAll flushes all remaining data into all remaining connections and closes those connections. It returns the total number of connections flushed/closed by the call.

                  func (*Assembler) FlushCloseOlderThan

                  func (a *Assembler) FlushCloseOlderThan(t time.Time) (flushed, closed int)

                    FlushCloseOlderThan flushes and closes streams older than the given time.

                    func (*Assembler) FlushWithOptions

                    func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int)

                      FlushWithOptions finds any streams waiting for packets older than the given time T and pushes through the data they have (i.e., it tells them to stop waiting and skip the data they're waiting for).

                      It also closes streams older than TC. TC can be left as the zero time to keep long-lived streams alive while still flushing their data.

                      Each Stream maintains a list of zero or more sets of bytes it has received out of order. For example, if it has processed up through sequence number 10, it might have bytes [15,20), [20,25), [30,50) in its list. Each set of bytes also carries the timestamp at which it was originally seen. A flush call looks at the smallest subsequent set of bytes, in this case [15,20), and if its timestamp is older than the passed-in time, it pushes it and all contiguous byte sets out to the Stream's ReassembledSG function. In this case, it pushes [15,20), but also [20,25), since that's contiguous. It only pushes [30,50) if its timestamp is also older than the passed-in time; otherwise it waits until the next flush call to see if bytes [25,30) come in.

                      Returns the number of connections flushed, and of those, the number closed because of the flush.

                      type AssemblerContext

                      type AssemblerContext interface {
                      	GetCaptureInfo() gopacket.CaptureInfo
                      }

                        AssemblerContext provides a method to get packet metadata.

                        type AssemblerOptions

                        type AssemblerOptions struct {
                        	// MaxBufferedPagesTotal is an upper limit on the total number of pages to
                        	// buffer while waiting for out-of-order packets.  Once this limit is
                        	// reached, the assembler will degrade to flushing every connection it
                        	// gets a packet for.  If <= 0, this is ignored.
                        	MaxBufferedPagesTotal int
                        	// MaxBufferedPagesPerConnection is an upper limit on the number of pages
                        	// buffered for a single connection.  Should this limit be reached for a
                        	// particular connection, the smallest sequence number will be flushed, along
                        	// with any contiguous data.  If <= 0, this is ignored.
                        	MaxBufferedPagesPerConnection int
                        }

                          AssemblerOptions controls the behavior of each assembler. Modify the options of each assembler you create to change its behavior.

                          type FlushOptions

                          type FlushOptions struct {
                          	T  time.Time // If nonzero, only connections with data older than T are flushed
                          	TC time.Time // If nonzero, only connections with data older than TC are closed (if no FIN/RST received)
                          }

                            FlushOptions provides options for flushing connections.

                            type ScatterGather

                            type ScatterGather interface {
                            	// Returns the length of available bytes and saved bytes
                            	Lengths() (int, int)
                            	// Returns the bytes up to length (shall be <= available bytes)
                            	Fetch(length int) []byte
                            	// Tell to keep from offset
                            	KeepFrom(offset int)
                            	// Return CaptureInfo of packet corresponding to given offset
                            	CaptureInfo(offset int) gopacket.CaptureInfo
                            	// Return some info about the reassembled chunks
                            	Info() (direction TCPFlowDirection, start bool, end bool, skip int)
                            	// Return some stats regarding the state of the stream
                            	Stats() TCPAssemblyStats
                            }

                              ScatterGather is used to pass reassembled data and metadata of reassembled packets to a Stream via ReassembledSG.

                              type Sequence

                              type Sequence int64

                                Sequence is a TCP sequence number. It provides a few convenience functions for handling TCP wrap-around. The sequence should always be in the range [0,0xFFFFFFFF]; its other bits are used only in wrap-around calculations and should never be set.

                                func (Sequence) Add

                                func (s Sequence) Add(t int) Sequence

                                  Add adds an integer to a sequence and returns the resulting sequence.

                                  func (Sequence) Difference

                                  func (s Sequence) Difference(t Sequence) int

                                    Difference defines an ordering for comparing TCP sequences that's safe for roll-overs. It returns:

                                    > 0 : if t comes after s
                                    < 0 : if t comes before s
                                      0 : if t == s
                                    

                                    The number returned is the sequence difference, so Sequence(4).Difference(8) returns 4.

                                    It handles rollovers by considering any sequence in the first quarter of the uint32 space to be after any sequence in the last quarter of that space, thus wrapping the uint32 space.
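The wrap-around rule can be worked through with a stand-alone model. The `seq` type below is a hypothetical re-implementation written for this illustration, not the package's source, and it may differ from the real Sequence in boundary details.

```go
package main

import "fmt"

// seq is a hypothetical stand-in for a TCP sequence number held in an int64.
type seq int64

const seqMax = 0xFFFFFFFF

// add advances s by n bytes and wraps the result back into the uint32 range.
func (s seq) add(n int) seq {
	return (s + seq(n)) & seqMax
}

// difference returns how far t is ahead of s (negative if behind), treating
// the first quarter of the space as coming after the last quarter.
func (s seq) difference(t seq) int {
	const quarter = seqMax / 4
	switch {
	case s > seqMax-quarter && t < quarter:
		t += seqMax + 1 // t has wrapped past zero; lift it above s
	case t > seqMax-quarter && s < quarter:
		s += seqMax + 1 // s has wrapped past zero; lift it above t
	}
	return int(t - s)
}

func main() {
	fmt.Println(seq(4).difference(8)) // prints "4"

	last := seq(0xFFFFFFFE)
	wrapped := last.add(5) // crosses the 32-bit boundary
	fmt.Println(wrapped, last.difference(wrapped), wrapped.difference(last)) // prints "3 5 -5"
}
```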

                                    type Stream

                                    type Stream interface {
                                    	// Tell whether the TCP packet should be accepted; start can be modified to force a start even if no SYN has been seen
                                    	Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool, ac AssemblerContext) bool
                                    
                                    	// ReassembledSG is called zero or more times.
                                    	// ScatterGather is reused after each Reassembled call,
                                    	// so it's important to copy anything you need out of it,
                                    	// especially bytes (or use KeepFrom())
                                    	ReassembledSG(sg ScatterGather, ac AssemblerContext)
                                    
                                    	// ReassemblyComplete is called when assembly decides there is
                                    	// no more data for this Stream, either because a FIN or RST packet
                                    	// was seen, or because the stream has timed out without any new
                                    	// packet data (due to a call to FlushCloseOlderThan).
                                    	// It should return true if the connection should be removed from the pool.
                                    	// It can return false if it wants to see subsequent packets with Accept(), e.g. to
                                    	// see FIN-ACK, for deeper state-machine analysis.
                                    	ReassemblyComplete(ac AssemblerContext) bool
                                    }

                                      Stream is implemented by the caller to handle incoming reassembled TCP data. Callers create a StreamFactory, then StreamPool uses it to create a new Stream for every TCP stream.

                                      Assembly will, in order:

                                      1) Create the stream via StreamFactory.New
                                      2) Call ReassembledSG 0 or more times, passing in reassembled TCP data in order
                                      3) Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
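The comment on ReassembledSG warns that the ScatterGather is reused between calls. The hazard can be shown with a plain byte slice. This is a stdlib-only illustration in which `collector` and `onData` are hypothetical and stand in for a Stream implementation and its callback.

```go
package main

import "fmt"

// collector is a hypothetical consumer of reassembled chunks.
type collector struct {
	aliased [][]byte // slices retained without copying (the bug)
	copied  [][]byte // private copies (the fix)
}

// onData plays the role of a reassembly callback whose argument is only
// valid until the callback returns.
func (c *collector) onData(chunk []byte) {
	c.aliased = append(c.aliased, chunk)
	c.copied = append(c.copied, append([]byte(nil), chunk...))
}

func main() {
	c := &collector{}
	buf := make([]byte, 5) // one buffer reused for every delivery
	for _, s := range []string{"hello", "world"} {
		copy(buf, s)
		c.onData(buf)
	}
	// The retained alias now shows the second delivery; the copy is intact.
	fmt.Printf("%s %s\n", c.aliased[0], c.copied[0]) // prints "world hello"
}
```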
                                      

                                      type StreamFactory

                                      type StreamFactory interface {
                                      	// New should return a new stream for the given TCP key.
                                      	New(netFlow, tcpFlow gopacket.Flow, tcp *layers.TCP, ac AssemblerContext) Stream
                                      }

                                        StreamFactory is used by assembly to create a new stream for each new TCP session.

                                        type StreamPool

                                        type StreamPool struct {
                                        	// contains filtered or unexported fields
                                        }

                                          StreamPool stores all streams created by Assemblers, allowing multiple assemblers to work together on stream processing while enforcing the fact that a single stream receives its data serially. It is safe for concurrency, usable by multiple Assemblers at once.

                                          StreamPool handles the creation and storage of Stream objects used by one or more Assembler objects. When a new TCP stream is found by an Assembler, it creates an associated Stream by calling its StreamFactory's New method. Thereafter (until the stream is closed), that Stream object will receive assembled TCP data via the Assembler's calls to the stream's ReassembledSG method.

                                          Like the Assembler, StreamPool attempts to minimize allocation. Unlike the Assembler, though, it does have to do some locking to make sure that the connection objects it stores are accessible to multiple Assemblers.

                                          func NewStreamPool

                                          func NewStreamPool(factory StreamFactory) *StreamPool

                                            NewStreamPool creates a new connection pool. Streams will be created as necessary using the passed-in StreamFactory.

                                            func (*StreamPool) Dump

                                            func (p *StreamPool) Dump()

                                              Dump logs all connections.

                                              type TCPAssemblyStats

                                              type TCPAssemblyStats struct {
                                              	// For this ScatterGather
                                              	Chunks  int
                                              	Packets int
                                              	// For the half connection, since last call to ReassembledSG()
                                              	QueuedBytes    int
                                              	QueuedPackets  int
                                              	OverlapBytes   int
                                              	OverlapPackets int
                                              }

                                                TCPAssemblyStats provides some figures for a ScatterGather.

                                                type TCPFlowDirection

                                                type TCPFlowDirection bool

                                                  TCPFlowDirection distinguishes the two half-connection directions.

                                                  TCPDirClientToServer is assigned to the half-connection of the first received packet, hence it might be wrong if packets are not received in order. It's up to the caller (e.g. in Accept()) to decide if the direction should be interpreted differently.

                                                  const (
                                                  	TCPDirClientToServer TCPFlowDirection = false
                                                  	TCPDirServerToClient TCPFlowDirection = true
                                                  )

                                                    The values themselves are not really useful.

                                                    func (TCPFlowDirection) Reverse

                                                    func (dir TCPFlowDirection) Reverse() TCPFlowDirection

                                                      Reverse returns the reversed direction.

                                                      func (TCPFlowDirection) String

                                                      func (dir TCPFlowDirection) String() string

                                                      type TCPOptionCheck

                                                      type TCPOptionCheck struct {
                                                      	// contains filtered or unexported fields
                                                      }

                                                        TCPOptionCheck contains options for the two directions.

                                                        func NewTCPOptionCheck

                                                        func NewTCPOptionCheck() TCPOptionCheck

                                                          NewTCPOptionCheck creates default options

                                                          func (*TCPOptionCheck) Accept

                                                          func (t *TCPOptionCheck) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir TCPFlowDirection, nextSeq Sequence, start *bool) error

                                                            Accept checks whether the packet should be accepted by checking TCP options.

                                                            type TCPSimpleFSM

                                                            type TCPSimpleFSM struct {
                                                            	// contains filtered or unexported fields
                                                            }

                                                              TCPSimpleFSM implements a very simple TCP state machine.

                                                              Usage: When implementing the Stream interface, Accept() can call TCPSimpleFSM.CheckState() to avoid considering packets that the client's or server's TCP stack would reject.

                                                              Limitations:

                                                              - packets should be received in order
                                                              - no check on sequence numbers is performed
                                                              - no RST
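To make the idea of a simple state check concrete, here is a deliberately reduced sketch that tracks only connection establishment. It is not the package's state machine: `fsm`, `flags`, the state names, and the boolean direction are hypothetical, and closing states are left out entirely.

```go
package main

import "fmt"

type state int

const (
	closed state = iota
	synSent
	established
)

// flags holds the TCP flags this sketch looks at.
type flags struct{ SYN, ACK bool }

// fsm is a hypothetical handshake-only state machine.
type fsm struct {
	st         state
	fromClient bool // direction of the initial SYN
}

// check reports whether a packet with flags f, travelling in the given
// direction, fits the current state, and advances the state if it does.
func (m *fsm) check(f flags, fromClient bool) bool {
	switch m.st {
	case closed:
		if f.SYN && !f.ACK {
			m.st, m.fromClient = synSent, fromClient
			return true
		}
	case synSent:
		if f.SYN && f.ACK && fromClient != m.fromClient {
			m.st = established
			return true
		}
		// A retransmitted SYN from the initiator is tolerated.
		return f.SYN && !f.ACK && fromClient == m.fromClient
	case established:
		return true
	}
	return false
}

func main() {
	m := &fsm{}
	fmt.Println(m.check(flags{ACK: true}, true))             // false: data before any SYN
	fmt.Println(m.check(flags{SYN: true}, true))             // true: client SYN
	fmt.Println(m.check(flags{SYN: true, ACK: true}, false)) // true: server SYN+ACK
	fmt.Println(m.check(flags{ACK: true}, true))             // true: established
}
```

A Stream's Accept method would return the result of such a check so that packets which do not fit the connection state never reach reassembly.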

                                                              func NewTCPSimpleFSM

                                                              func NewTCPSimpleFSM(options TCPSimpleFSMOptions) *TCPSimpleFSM

                                                                NewTCPSimpleFSM creates a new TCPSimpleFSM

                                                                func (*TCPSimpleFSM) CheckState

                                                                func (t *TCPSimpleFSM) CheckState(tcp *layers.TCP, dir TCPFlowDirection) bool

                                                                  CheckState returns false if tcp is invalid with respect to the current state; otherwise it updates the state machine's state and returns true.

                                                                  func (*TCPSimpleFSM) String

                                                                  func (t *TCPSimpleFSM) String() string

                                                                  type TCPSimpleFSMOptions

                                                                  type TCPSimpleFSMOptions struct {
                                                                  	SupportMissingEstablishment bool // Allow missing SYN, SYN+ACK, ACK
                                                                  }

                                                                    TCPSimpleFSMOptions holds options for TCPSimpleFSM.