dispatcher

package
v0.1.20 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 19, 2023 License: Unlicense Imports: 22 Imported by: 0

Documentation

Overview

Package dispatcher is a network packet send/receive handler for peer to peer connections between relays.

Messages between peers are usually fairly large, multi-layered onion messages that contain forwarding instructions. Before sending, the dispatcher breaks them down into uniformly sized segments and randomises their order.
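
As a rough illustration of the splitting and reordering step, the following sketch cuts a message into equally sized, zero-padded segments, shuffles them, and restores the message from the segment indices. The segment type and the split and join helpers are hypothetical names for this example and are not part of the package API; the real packet format is not shown here.

```go
package main

import (
	"fmt"
	"math/rand"
)

// segment is a hypothetical type: one fixed-size piece of a message plus its
// position, so the receiver can restore the original order.
type segment struct {
	Index int
	Data  []byte
}

// split cuts msg into segments of exactly size bytes, zero-padding the last
// one so that every segment has the same length.
func split(msg []byte, size int) []segment {
	n := (len(msg) + size - 1) / size
	segs := make([]segment, n)
	for i := range segs {
		buf := make([]byte, size)
		copy(buf, msg[i*size:]) // copies at most size bytes
		segs[i] = segment{Index: i, Data: buf}
	}
	return segs
}

// join restores the message from segments in any order. msgLen is needed to
// trim the padding added by split.
func join(segs []segment, size, msgLen int) []byte {
	out := make([]byte, len(segs)*size)
	for _, s := range segs {
		copy(out[s.Index*size:], s.Data)
	}
	return out[:msgLen]
}

func main() {
	msg := []byte("a multi-layered onion message")
	segs := split(msg, 8)
	// Randomise the order in which the segments would be sent.
	rand.Shuffle(len(segs), func(i, j int) { segs[i], segs[j] = segs[j], segs[i] })
	fmt.Printf("%d segments, restored: %q\n", len(segs), join(segs, 8, len(msg)))
}
```

Carrying the index with each segment is what makes the random ordering harmless to the receiver.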

On the receiving side, there is a buffer for incoming message segments, and when sufficient segments are received to enable reconstruction, reconstruction is attempted.

Messages are broken up into pieces with additional segments added to ensure the receiver gets enough pieces to decode the message without a message retransmit request, using Reed Solomon encoding (accelerated by AVX).
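
Reed Solomon coding is too involved for a short sketch, so the example below swaps in a much simpler erasure code: a single XOR parity segment, which can rebuild exactly one missing segment. It is only meant to show why extra segments let the receiver decode without a retransmit request; it is not the encoding the package uses, and the function names are hypothetical.

```go
package main

import "fmt"

// xorParity returns one parity segment: the byte-wise XOR of all segments.
// All segments must have the same length.
func xorParity(segs [][]byte) []byte {
	p := make([]byte, len(segs[0]))
	for _, s := range segs {
		for i, b := range s {
			p[i] ^= b
		}
	}
	return p
}

// recoverMissing rebuilds the single missing segment (marked nil) by XORing
// the parity segment with every segment that did arrive.
func recoverMissing(segs [][]byte, parity []byte) []byte {
	out := append([]byte(nil), parity...)
	for _, s := range segs {
		if s == nil {
			continue
		}
		for i, b := range s {
			out[i] ^= b
		}
	}
	return out
}

func main() {
	segs := [][]byte{[]byte("onio"), []byte("n pa"), []byte("yloa")}
	parity := xorParity(segs)
	segs[1] = nil // pretend this segment never arrived
	fmt.Printf("%q\n", recoverMissing(segs, parity))
}
```

Reed Solomon generalises this idea so that any sufficiently large subset of the data and parity segments is enough to reconstruct the message.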

The dispatcher operates over reliable TCP connections and does not directly influence retransmits. Instead, it monitors message latency, identifies when a retransmit has occurred, and increases the redundant data added to the stream of message packet segments.

In this way, the dispatcher aims to always have sufficient data arrive in one message cycle, so as to minimise the latency of connections between peers and, in turn, the latency of clients' routed packets.

Another feature of the dispatcher is key change processing. This is implemented as a concurrent update from the receiver specifying a new public key to use. The receiver keeps the old key for a time to handle in-transit messages that were encrypted with it, and the peer should use the new key for all messages sent after it has received and applied the update.
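
The retention of old keys can be pictured as a small bounded ring of keys. The sketch below is a simplified model under stated assumptions: keys are plain strings rather than real private keys, and keyRing, rotate, and holds are hypothetical names that do not appear in the package.

```go
package main

import (
	"fmt"
	"sync"
)

// keyRing is a hypothetical stand-in for a list of private keys, newest
// first. Keys are plain strings here to keep the example self-contained.
type keyRing struct {
	mu   sync.Mutex
	keys []string
	max  int // how many keys to retain, including the current one
}

// rotate makes k the current key and drops the oldest keys beyond max.
func (r *keyRing) rotate(k string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.keys = append([]string{k}, r.keys...)
	if len(r.keys) > r.max {
		r.keys = r.keys[:r.max]
	}
}

// holds reports whether k is still retained, meaning a delayed message
// encrypted to that key could still be decrypted.
func (r *keyRing) holds(k string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, have := range r.keys {
		if have == k {
			return true
		}
	}
	return false
}

func main() {
	r := &keyRing{max: 2}
	for _, k := range []string{"k1", "k2", "k3"} {
		r.rotate(k)
	}
	fmt.Println(r.holds("k1"), r.holds("k2"), r.holds("k3"))
}
```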

Constants

const (
	// DefaultStartingParity is set to 64, or 25%
	DefaultStartingParity = 64

	// DefaultDispatcherRekey is the threshold that triggers a rekey: 1 << 20,
	// which is 1 MiB if counted in bytes.
	DefaultDispatcherRekey = 1 << 20

	// TimeoutPingCount is the number of failed pings required before the peer
	// is considered offline.
	TimeoutPingCount = 10
)
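
The arithmetic behind these values can be checked directly. The sketch below assumes, based on the "64, or 25%" comment, that a parity value is a fraction of 256; parityPercent is a hypothetical helper and the constants are redeclared locally so the example stands alone.

```go
package main

import "fmt"

// Local copies of the package constants, so this example is self-contained.
const (
	DefaultStartingParity  = 64
	DefaultDispatcherRekey = 1 << 20
)

// parityPercent converts a parity value to a percentage, assuming the value
// is expressed as a fraction of 256.
func parityPercent(parity int) float64 {
	return float64(parity) / 256 * 100
}

func main() {
	fmt.Printf("parity %d = %.0f%%\n", DefaultStartingParity,
		parityPercent(DefaultStartingParity))
	fmt.Printf("rekey threshold = %d (%d KiB)\n", DefaultDispatcherRekey,
		DefaultDispatcherRekey>>10)
}
```

Note that 1 << 20 evaluates to 1,048,576.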
const (
	NewKeyMagic      = "newK"
	AcknowledgeMagic = "ackn"
	OnionMagic       = "onio"
)

Magic bytes message prefixes for Dispatcher messages.

Variables

This section is empty.

Functions

func AcknowledgeGen

func AcknowledgeGen() codec.Codec

AcknowledgeGen is a factory function that will be added to the registry for recognition and generation.

func InitRekeyGen

func InitRekeyGen() codec.Codec

InitRekeyGen is a factory function to generate a NewKey.

func OnionGen

func OnionGen() codec.Codec

OnionGen is a factory function for creating a new Onion.
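
One common way to use factory functions like these is a registry keyed by the magic prefix. The sketch below is a guess at the general shape, not the actual registry in this module: the Codec interface is cut down to a single method, and register and recognise are hypothetical names.

```go
package main

import "fmt"

// Codec is a cut-down, hypothetical version of a codec interface; only the
// Magic method is modelled here.
type Codec interface{ Magic() string }

type ack struct{}

func (ack) Magic() string { return "ackn" }

type newKey struct{}

func (newKey) Magic() string { return "newK" }

// registry maps a 4-byte magic prefix to the factory for that message type.
var registry = map[string]func() Codec{}

// register stores a factory under the magic of the value it produces.
func register(gen func() Codec) { registry[gen().Magic()] = gen }

// recognise inspects the first four bytes of msg and returns a fresh, empty
// Codec of the matching type, or nil if the prefix is unknown or too short.
func recognise(msg []byte) Codec {
	if len(msg) < 4 {
		return nil
	}
	if gen, ok := registry[string(msg[:4])]; ok {
		return gen()
	}
	return nil
}

func main() {
	register(func() Codec { return ack{} })
	register(func() Codec { return newKey{} })
	fmt.Printf("%T\n", recognise([]byte("ackn....")))
}
```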

Types

type Acknowledge

type Acknowledge struct {
	*RxRecord
}

Acknowledge wraps up an RxRecord to tell the other side how a message transmission went.

func (*Acknowledge) Decode

func (a *Acknowledge) Decode(s *splice.Splice) (e error)

Decode a splice with the cursor at the first byte after the magic.

func (*Acknowledge) Encode

func (a *Acknowledge) Encode(s *splice.Splice) (e error)

Encode an Acknowledge message to a splice.

func (*Acknowledge) Len

func (a *Acknowledge) Len() int

Len returns the length of an Acknowledge message in bytes.

func (*Acknowledge) Magic

func (a *Acknowledge) Magic() string

Magic is the identifying 4-byte prefix of an Acknowledge in binary form.

func (*Acknowledge) Unwrap added in v0.1.19

func (a *Acknowledge) Unwrap() interface{}

Unwrap returns nil because there is no onion inside an Acknowledge.

type Completion

type Completion struct {
	ID   nonce.ID
	Time time.Time
}

Completion is a record of a completed transmission, identified by its nonce.ID.

type Dispatcher

type Dispatcher struct {

	// Parity is the parity parameter to use in packet segments, this value
	// should be adjusted up and down in proportion with collected data on how
	// many packets led to receipt against the ratio of DataSent / TotalSent *
	// PingDivergence.
	Parity atomic.Uint32

	// DataSent is the amount of actual data sent in messages.
	DataSent *big.Int

	// DataReceived is the amount of actual data received in messages.
	DataReceived *big.Int

	// TotalSent is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were sent.
	TotalSent *big.Int

	// TotalReceived is the amount of bytes that were needed to successfully
	// transmit, including packet overhead. This is the raw size of the
	// segmented packets that were received.
	TotalReceived *big.Int

	// ErrorEWMA is the exponential weighted moving average of the error rate in
	// transmissions to a peer.
	ErrorEWMA ewma.MovingAverage

	// Ping records the exponential weighted moving average of the round trip time to a peer.
	Ping ewma.MovingAverage

	// PingDivergence represents the proportion of time between start of send
	// and receiving acknowledgement, versus the ping RTT being actively
	// measured concurrently. Shorter/equal time means it can reduce redundancy,
	// longer time needs to increase it.
	//
	// Combined with DataSent / TotalSent this guides the error correction
	// parameter for a given transmission that minimises latency. Onion routing
	// necessarily amplifies any latency so making a transmission get across
	// before/without retransmits is as good as the path can provide.
	PingDivergence ewma.MovingAverage

	// Duplex is the in-memory atomic FIFO that is used in process to read and write
	// to the network data processing pipeline.
	Duplex *transport.DuplexByteChan

	// Done stores the completed transmissions and their completion time. Used to
	// evaluate the quality of the connection via relative time delays.
	Done []Completion

	// PendingInbound stores records for transmissions that are in process but have
	// neither failed nor had all pieces received yet.
	PendingInbound []*RxRecord

	// PendingOutbound keeps track of all the messages dispatched to the peer but not
	// yet acknowledged or timed out.
	PendingOutbound []*TxRecord

	// Partials stores the received message segments identified by the transmission
	// nonce.ID.
	Partials map[nonce.ID]packet.Packets

	// Prv is the list of keys used in this connection, so that a transmission
	// that is delayed for an extraordinarily long time can still be decrypted.
	// GC should be done on these to keep no more than a dozen or so past keys.
	Prv []*crypto.Prv

	// KeyLock is a mutex specifically for accessing the Prv field above.
	KeyLock sync.Mutex

	// Conn is the transport connection that messages to this peer are sent to and
	// received from.
	Conn *transport.Conn

	// Mutex lock for (todo: maybe we don't need so many of these?)
	Mutex sync.Mutex

	// Ready is a signal channel that is closed when the dispatcher is operational.
	Ready qu.C
	// contains filtered or unexported fields
}

Dispatcher is a message splitter/joiner and error correction adjustment system that aims to minimise message latency by spending extra bandwidth, especially to cope with radio connections.

Each connection has a dispatcher for handling messages, all relevant values in the structure relate to the connection to a single peer.

In its initial implementation, reliable network transports are used by necessity. Packet retransmits therefore increase the message transit time, so a transit time longer than the ping indicates packet transmit failures.

PingDivergence is adjusted with each acknowledgement by comparing the message transit time to the current ping. If the transit time is within range of the ping RTT, it does not affect the adjustment.

DataSent / TotalSent provides the ratio of redundancy the channel is using. TotalSent is not from the parameters at send but from acknowledgements of how much data was received before a message was reconstructed. Thus, it is used in combination with the PingDivergence to recompute the Parity parameter used for adjusting error correction redundancy as each message is decoded.
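
To make the feedback loop more concrete, here is a minimal sketch of how a divergence figure could drive the parity value. The tolerance, step size, and bounds are invented for illustration, and divergence and adjustParity are hypothetical helpers; the package's actual adjustment rule is not documented here and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// divergence is the ratio of message transit time to the measured ping RTT.
// A value near 1 suggests no retransmit; larger values suggest retransmits.
func divergence(transit, rtt time.Duration) float64 {
	if rtt <= 0 {
		return 1
	}
	return float64(transit) / float64(rtt)
}

// adjustParity raises parity quickly when the divergence exceeds a tolerance
// and lowers it slowly otherwise. All constants here are assumptions.
func adjustParity(parity uint32, div float64) uint32 {
	const (
		tolerance = 1.25
		step      = 8
		minParity = 1
		maxParity = 255
	)
	if div > tolerance {
		if parity+step > maxParity {
			return maxParity
		}
		return parity + step
	}
	if parity > minParity {
		return parity - 1
	}
	return parity
}

func main() {
	d := divergence(300*time.Millisecond, 100*time.Millisecond)
	fmt.Println(d, adjustParity(64, d))
}
```

Raising redundancy faster than lowering it is one plausible design choice, since a retransmit costs more latency than a few surplus segments cost bandwidth.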

func NewDispatcher

func NewDispatcher(l *transport.Conn, ctx context.Context,
	ks *crypto.KeySet) (d *Dispatcher)

NewDispatcher initialises and starts up a Dispatcher with the provided connection, which is acquired either by dialing a peer or by accepting an inbound connection from one.

func (*Dispatcher) GetRxRecordAndPartials

func (d *Dispatcher) GetRxRecordAndPartials(id nonce.ID) (rxr *RxRecord,
	packets packet.Packets)

GetRxRecordAndPartials returns the receive record and packets received so far for a message with a given nonce.ID.

func (*Dispatcher) Handle

func (d *Dispatcher) Handle(m slice.Bytes, rxr *RxRecord)

Handle the message. This is expected to be called with the mutex locked, so nothing in it should be trying to lock it.

func (*Dispatcher) HandlePing

func (d *Dispatcher) HandlePing(p ping.Result)

HandlePing adds a current ping result and combines it into the running exponential weighted moving average.
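
For readers unfamiliar with the technique, an exponential weighted moving average blends each new sample into a running value. The sketch below is a textbook version with a hypothetical movingAverage type and a chosen smoothing factor; it is not the implementation behind the ewma.MovingAverage type used by the Dispatcher.

```go
package main

import "fmt"

// movingAverage is a hypothetical, minimal EWMA. alpha is the weight given
// to each new sample, between 0 and 1.
type movingAverage struct {
	alpha  float64
	value  float64
	primed bool
}

// Add folds a new sample into the average. The first sample seeds the value.
func (m *movingAverage) Add(sample float64) {
	if !m.primed {
		m.value = sample
		m.primed = true
		return
	}
	m.value = m.alpha*sample + (1-m.alpha)*m.value
}

// Value returns the current average.
func (m *movingAverage) Value() float64 { return m.value }

func main() {
	rtt := &movingAverage{alpha: 0.5}
	for _, ms := range []float64{100, 200, 50} {
		rtt.Add(ms)
		fmt.Println(rtt.Value())
	}
}
```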

func (*Dispatcher) Mx

func (d *Dispatcher) Mx(fn func() bool) bool

Mx runs a closure with the dispatcher mutex locked and returns the closure's bool result. Don't call anything that touches the dispatcher's Mutex in this closure.
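
The pattern can be sketched as follows, using a hypothetical guarded type in place of the Dispatcher. Because sync.Mutex is not reentrant, a closure that tried to take the same lock again would deadlock, which is the reason for the warning above.

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical type standing in for a struct whose fields are
// protected by a single mutex.
type guarded struct {
	mu      sync.Mutex
	pending int
}

// Mx runs fn while holding the lock and passes its result through. fn must
// not lock mu itself, because sync.Mutex is not reentrant.
func (g *guarded) Mx(fn func() bool) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return fn()
}

func main() {
	g := &guarded{}
	wasEmpty := g.Mx(func() bool {
		empty := g.pending == 0
		g.pending++
		return empty
	})
	fmt.Println(wasEmpty, g.pending)
}
```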

func (*Dispatcher) ReKey

func (d *Dispatcher) ReKey()

ReKey sends a new key for the other party to use for sending future messages.

func (*Dispatcher) RecvFromConn

func (d *Dispatcher) RecvFromConn(m slice.Bytes)

RecvFromConn receives a new message from the connection, checks if it can be reassembled and if it can, dispatches it to the receiver channel.

func (*Dispatcher) RunGC

func (d *Dispatcher) RunGC()

RunGC runs the garbage collection for the dispatcher. Stale data and completed transmissions are purged from memory.

func (*Dispatcher) SendAck

func (d *Dispatcher) SendAck(rxr *RxRecord)

SendAck sends an acknowledgement record for a successful transmission of a message.

func (*Dispatcher) SendToConn

func (d *Dispatcher) SendToConn(m slice.Bytes) (pieces int)

SendToConn delivers a buffer to be sent over the connection, and returns the number of packets that were sent.

type NewKey

type NewKey struct {
	NewPubkey *crypto.Pub
}

NewKey delivers a new public key for the other side to use to encrypt messages.

func (*NewKey) Decode

func (k *NewKey) Decode(s *splice.Splice) (e error)

Decode a NewKey out of a splice with cursor pointing to the first byte after the magic.

func (*NewKey) Encode

func (k *NewKey) Encode(s *splice.Splice) (e error)

Encode a NewKey into the provided splice.

func (*NewKey) Len

func (k *NewKey) Len() int

Len returns the length of a NewKey message in bytes.

func (*NewKey) Magic

func (k *NewKey) Magic() string

Magic is the identifying 4-byte prefix of a NewKey in binary form.

func (*NewKey) Unwrap added in v0.1.19

func (k *NewKey) Unwrap() interface{}

Unwrap returns nil because there is no onion inside a NewKey.

type Onion

type Onion struct {
	slice.Bytes // contains an encoded Onion.
}

Onion is an onion, intended to be processed by the recipient, its layer decoded and the enclosed message received and processed appropriately.

func (*Onion) Decode

func (o *Onion) Decode(s *splice.Splice) (e error)

Decode an Onion out of a splice with cursor pointing to the first byte after the magic.

func (*Onion) Encode

func (o *Onion) Encode(s *splice.Splice) (e error)

Encode an Onion into the provided splice.

func (*Onion) Len

func (o *Onion) Len() int

func (*Onion) Magic

func (o *Onion) Magic() string

func (Onion) Unpack

func (o Onion) Unpack() (mu ont.Onion)

func (*Onion) Unwrap added in v0.1.19

func (o *Onion) Unwrap() interface{}

Unwrap invokes Unpack, which returns the onion.

type RxRecord

type RxRecord struct {
	ID nonce.ID
	// Hash is the hash of the reconstructed message received.
	Hash sha256.Hash
	// First is when the first packet was received.
	First time.Time
	// Last is when the last packet was received. A longer time than the current
	// ping RTT after First indicates retransmits.
	Last time.Time
	// Size of the message as found in the packet headers.
	Size uint64
	// Received is the number of bytes received upon reconstruction, including
	// packet overhead.
	Received uint64
	// Ping is the average ping RTT on the connection calculated at each packet
	// receive, used with the total message transmit time to estimate an
	// adjustment in the parity shards to be used in sending on this connection.
	Ping time.Duration
}

RxRecord holds the details of a message reception. It forms most of the data sent in the acknowledgement message for a received message.
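
The comment on the Last field suggests a simple check for retransmits: compare the time between the first and last packet with the ping RTT. The sketch below shows that comparison on a cut-down, hypothetical copy of the record; record and likelyRetransmit are illustrative helpers, not package functions.

```go
package main

import (
	"fmt"
	"time"
)

// rxRecord is a cut-down, hypothetical copy of the timing fields of a
// receive record.
type rxRecord struct {
	First time.Time
	Last  time.Time
	Ping  time.Duration
}

// record builds an rxRecord whose last packet arrived elapsed after the
// first, with the given average ping RTT.
func record(elapsed, ping time.Duration) rxRecord {
	first := time.Unix(0, 0)
	return rxRecord{First: first, Last: first.Add(elapsed), Ping: ping}
}

// likelyRetransmit reports whether the reception took longer than the ping
// RTT, which hints that at least one packet was retransmitted.
func (r rxRecord) likelyRetransmit() bool {
	return r.Last.Sub(r.First) > r.Ping
}

func main() {
	slow := record(300*time.Millisecond, 100*time.Millisecond)
	fast := record(80*time.Millisecond, 100*time.Millisecond)
	fmt.Println(slow.likelyRetransmit(), fast.likelyRetransmit())
}
```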

type TxRecord

type TxRecord struct {
	ID nonce.ID
	// Hash is the record of the hash of the original message.
	sha256.Hash
	// First is the time the first piece was sent.
	First time.Time
	// Last is the time the last piece was sent.
	Last time.Time
	// Size is the number of bytes in the message payload.
	Size int
	// Ping is the recorded average current round trip time at send.
	Ping time.Duration
}

TxRecord holds the details of a send operation in progress. It is used together with the data received in the acknowledgement, which is a completed RxRecord.
