core

package
v0.0.0-...-add6624 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2020 License: MIT Imports: 22 Imported by: 3

Documentation

Index

Constants

View Source
const (
	InternalCmdHelp = "help"
	InternalCmdEnv  = "env"
	InternalCmdList = "list"
	InternalCmdVer  = "ver"
	InternalDevice  = "dev"
)
View Source
const InternalCmdPrefix = "--"

Variables

View Source
var DefaultAssemblerOptions = AssemblerOptions{
	MaxBufferedPagesPerConnection: 0,
	MaxBufferedPagesTotal:         0,
}

DefaultAssemblerOptions provides default options for an assembler. These options are used by default when calling NewAssembler, so if modified before a NewAssembler call they'll affect the resulting Assembler.

Note that the default options can result in ever-increasing memory usage unless one of the Flush* methods is called on a regular basis.

Functions

This section is empty.

Types

type Assembler

type Assembler struct {
	AssemblerOptions
	// contains filtered or unexported fields
}

Assembler handles reassembling TCP streams. It is not safe for concurrency... after passing a packet in via the Assemble call, the caller must wait for that call to return before calling Assemble again. Callers can get around this by creating multiple assemblers that share a StreamPool. In that case, each individual stream will still be handled serially (each stream has an individual mutex associated with it), however multiple assemblers can assemble different connections concurrently.

The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing applications written in Go. The Assembler uses the following methods to be as fast as possible, to keep packet processing speedy:

Avoids Lock Contention

Assemblers locks connections, but each connection has an individual lock, and rarely will two Assemblers be looking at the same connection. Assemblers lock the StreamPool when looking up connections, but they use Reader locks initially, and only force a write lock if they need to create a new connection or close one down. These happen much less frequently than individual packet handling.

Each assembler runs in its own goroutine, and the only state shared between goroutines is through the StreamPool. Thus all internal Assembler state can be handled without any locking.

NOTE: If you can guarantee that packets going to a set of Assemblers will contain information on different connections per Assembler (for example, they're already hashed by PF_RING hashing or some other hashing mechanism), then we recommend you use a seperate StreamPool per Assembler, thus avoiding all lock contention. Only when different Assemblers could receive packets for the same Stream should a StreamPool be shared between them.

Avoids Memory Copying

In the common case, handling of a single TCP packet should result in zero memory allocations. The Assembler will look up the connection, figure out that the packet has arrived in order, and immediately pass that packet on to the appropriate connection's handling code. Only if a packet arrives out of order is its contents copied and stored in memory for later.

Avoids Memory Allocation

Assemblers try very hard to not use memory allocation unless absolutely necessary. Packet data for sequential packets is passed directly to streams with no copying or allocation. Packet data for out-of-order packets is copied into reusable pages, and new pages are only allocated rarely when the page cache runs out. Page caches are Assembler-specific, thus not used concurrently and requiring no locking.

Internal representations for connection objects are also reused over time. Because of this, the most common memory allocation done by the Assembler is generally what's done by the caller in StreamFactory.New. If no allocation is done there, then very little allocation is done ever, mostly to handle large increases in bandwidth or numbers of connections.

TODO: The page caches used by an Assembler will grow to the size necessary to handle a workload, and currently will never shrink. This means that traffic spikes can result in large memory usage which isn't garbage collected when typical traffic levels return.

func NewAssembler

func NewAssembler(pool *StreamPool) *Assembler

NewAssembler creates a new assembler. Pass in the StreamPool to use, may be shared across assemblers.

This sets some sane defaults for the assembler options, see DefaultAssemblerOptions for details.

func (*Assembler) Assemble

func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP)

Assemble calls AssembleWithTimestamp with the current timestamp, useful for packets being read directly off the wire.

func (*Assembler) AssembleWithTimestamp

func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time)

AssembleWithTimestamp reassembles the given TCP packet into its appropriate stream.

The timestamp passed in must be the timestamp the packet was seen. For packets read off the wire, time.Now() should be fine. For packets read from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp will affect which streams are flushed by a call to FlushOlderThan.

Each Assemble call results in, in order:

zero or one calls to StreamFactory.New, creating a stream
zero or one calls to Reassembled on a single stream
zero or one calls to ReassemblyComplete on the same stream

func (*Assembler) FlushAll

func (a *Assembler) FlushAll() (closed int)

FlushAll flushes all remaining data into all remaining connections, closing those connections. It returns the total number of connections flushed/closed by the call.

func (*Assembler) FlushOlderThan

func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int)

FlushOlderThan finds any streams waiting for packets older than the given time, and pushes through the data they have (IE: tells them to stop waiting and skip the data they're waiting for).

Each Stream maintains a list of zero or more sets of bytes it has received out-of-order. For example, if it has processed up through sequence number 10, it might have bytes [15-20), [20-25), [30,50) in its list. Each set of bytes also has the timestamp it was originally viewed. A flush call will look at the smallest subsequent set of bytes, in this case [15-20), and if its timestamp is older than the passed-in time, it will push it and all contiguous byte-sets out to the Stream's Reassembled function. In this case, it will push [15-20), but also [20-25), since that's contiguous. It will only push [30-50) if its timestamp is also older than the passed-in time, otherwise it will wait until the next FlushOlderThan to see if bytes [25-30) come in.

If it pushes all bytes (or there were no sets of bytes to begin with) AND the connection has not received any bytes since the passed-in time, the connection will be closed.

Returns the number of connections flushed, and of those, the number closed because of the flush.

type AssemblerOptions

type AssemblerOptions struct {
	// MaxBufferedPagesTotal is an upper limit on the total number of pages to
	// buffer while waiting for out-of-order packets.  Once this limit is
	// reached, the assembler will degrade to flushing every connection it gets
	// a packet for.  If <= 0, this is ignored.
	MaxBufferedPagesTotal int
	// MaxBufferedPagesPerConnection is an upper limit on the number of pages
	// buffered for a single connection.  Should this limit be reached for a
	// particular connection, the smallest sequence number will be flushed,
	// along with any contiguous data.  If <= 0, this is ignored.
	MaxBufferedPagesPerConnection int
}

AssemblerOptions controls the behavior of each assembler. Modify the options of each assembler you create to change their behavior.

type Cmd

type Cmd struct {
	Device string
	// contains filtered or unexported fields
}

func NewCmd

func NewCmd(p *Plug) *Cmd

func (*Cmd) Run

func (cm *Cmd) Run()

start

type Core

type Core struct {
	Version string
}

func New

func New() Core

func (*Core) Run

func (c *Core) Run()

type Dispatch

type Dispatch struct {
	Plug *Plug
	// contains filtered or unexported fields
}

func NewDispatch

func NewDispatch(plug *Plug, cmd *Cmd) *Dispatch

func (*Dispatch) Capture

func (d *Dispatch) Capture()

type ExternalPlug

type ExternalPlug struct {
	Name          string
	Version       string
	ResolvePacket func(net gopacket.Flow, transport gopacket.Flow, r io.Reader)
	BPFFilter     func() string
	SetFlag       func([]string)
}

type Plug

type Plug struct {
	ResolveStream func(net gopacket.Flow, transport gopacket.Flow, r io.Reader)
	BPF           string

	InternalPlugList map[string]PlugInterface
	ExternalPlugList map[string]ExternalPlug
	// contains filtered or unexported fields
}

func NewPlug

func NewPlug() *Plug

func (*Plug) ChangePath

func (p *Plug) ChangePath(dir string)

func (*Plug) LoadExternalPlugList

func (p *Plug) LoadExternalPlugList()

func (*Plug) LoadInternalPlugList

func (p *Plug) LoadInternalPlugList()

func (*Plug) PrintList

func (p *Plug) PrintList()

func (*Plug) SetOption

func (p *Plug) SetOption(plugName string, plugParams []string)

type PlugInterface

type PlugInterface interface {
	ResolveStream(net gopacket.Flow, transport gopacket.Flow, r io.Reader)
	BPFFilter() string
	SetFlag([]string)
	Version() string
}

All internal plug-ins must implement this interface ResolvePacket - entry BPFFilter - set BPF, like: mysql(tcp and port 3306) SetFlag - plug-in params Version - plug-in version

type ProtocolStream

type ProtocolStream struct {
	// contains filtered or unexported fields
}

type ProtocolStreamFactory

type ProtocolStreamFactory struct {
	// contains filtered or unexported fields
}

func (*ProtocolStreamFactory) New

func (m *ProtocolStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream

type Reassembly

type Reassembly struct {
	// Bytes is the next set of bytes in the stream.  May be empty.
	Bytes []byte
	// Skip is set to non-zero if bytes were skipped between this and the last
	// Reassembly.  If this is the first packet in a connection and we didn't
	// see the start, we have no idea how many bytes we skipped, so we set it to
	// -1.  Otherwise, it's set to the number of bytes skipped.
	Skip int
	// Start is set if this set of bytes has a TCP SYN accompanying it.
	Start bool
	// End is set if this set of bytes has a TCP FIN or RST accompanying it.
	End bool
	// Seen is the timestamp this set of bytes was pulled off the wire.
	Seen time.Time
}

Reassembly objects are passed by an Assembler into Streams using the Reassembled call. Callers should not need to create these structs themselves except for testing.

type Sequence

type Sequence int64

Sequence is a TCP sequence number. It provides a few convenience functions for handling TCP wrap-around. The sequence should always be in the range [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations and should never be set.

func (Sequence) Add

func (s Sequence) Add(t int) Sequence

Add adds an integer to a sequence and returns the resulting sequence.

func (Sequence) Difference

func (s Sequence) Difference(t Sequence) int

Difference defines an ordering for comparing TCP sequences that's safe for roll-overs. It returns:

> 0 : if t comes after s
< 0 : if t comes before s
  0 : if t == s

The number returned is the sequence difference, so 4.Difference(8) will return 4.

It handles rollovers by considering any sequence in the first quarter of the uint32 space to be after any sequence in the last quarter of that space, thus wrapping the uint32 space.

type Stream

type Stream interface {
	// Reassembled is called zero or more times. Assembly guarantees that the
	// set of all Reassembly objects passed in during all calls are presented in
	// the order they appear in the TCP stream. Reassembly objects are reused
	// after each Reassembled call, so it's important to copy anything you need
	// out of them (specifically out of Reassembly.Bytes) that you need to stay
	// around after you return from the Reassembled call.
	Reassembled([]Reassembly)
	// ReassemblyComplete is called when assembly decides there is no more data
	// for this Stream, either because a FIN or RST packet was seen, or because
	// the stream has timed out without any new packet data (due to a call to
	// FlushOlderThan).
	ReassemblyComplete()
}

Stream is implemented by the caller to handle incoming reassembled TCP data. Callers create a StreamFactory, then StreamPool uses it to create a new Stream for every TCP stream.

assembly will, in order:

  1. Create the stream via StreamFactory.New
  2. Call Reassembled 0 or more times, passing in reassembled TCP data in order
  3. Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.

type StreamFactory

type StreamFactory interface {
	// New should return a new stream for the given TCP key.
	New(netFlow, tcpFlow gopacket.Flow) tcpassembly.Stream
}

StreamFactory is used by assembly to create a new stream for each new TCP session.

type StreamPool

type StreamPool struct {
	// contains filtered or unexported fields
}

StreamPool stores all streams created by Assemblers, allowing multiple assemblers to work together on stream processing while enforcing the fact that a single stream receives its data serially. It is safe for concurrency, usable by multiple Assemblers at once.

StreamPool handles the creation and storage of Stream objects used by one or more Assembler objects. When a new TCP stream is found by an Assembler, it creates an associated Stream by calling its StreamFactory's New method. Thereafter (until the stream is closed), that Stream object will receive assembled TCP data via Assembler's calls to the stream's Reassembled function.

Like the Assembler, StreamPool attempts to minimize allocation. Unlike the Assembler, though, it does have to do some locking to make sure that the connection objects it stores are accessible to multiple Assemblers.

func NewStreamPool

func NewStreamPool(factory StreamFactory) *StreamPool

NewStreamPool creates a new connection pool. Streams will be created as necessary using the passed-in StreamFactory.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL