iface

package
v0.0.0-...-1e60831
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 License: NIST-PD-fallback Imports: 30 Imported by: 2

README

ndn-dpdk/iface

This package implements the face system, which provides network interfaces (faces) that can send and receive NDN packets. Each face has an ID, a uint16 number that identifies the face.

There are two transport-specific implementations:

  • EthFace communicates on Ethernet via DPDK ethdev.
  • SocketFace communicates on Unix/TCP/UDP tunnels via Go sockets.

Face System API

In C, public APIs are defined in terms of FaceID. There are functions to query face status and to transmit a burst of packets. Notably, there isn't a function to receive packets; instead, the RxLoop type is used for receiving packets.

In Go, the Face type defines what methods a face must provide. The lower layer implementation invokes New to construct an object that satisfies this interface. The Get function retrieves an existing Face by ID; List returns a list of all faces.

All faces are assumed to be point-to-point. Locator type identifies the endpoints of a face. It has a Scheme field that indicates the underlying network protocol, as well as other fields added by each transport-specific implementation. This type can be marshaled as JSON.
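As a rough illustration of the JSON form, the sketch below marshals a hypothetical locator. The udpLocator type, its local/remote fields, and the "udp" scheme value are assumptions made for this example; only the presence of a scheme field comes from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// udpLocator is a hypothetical locator used only for this sketch.
// Real locators are defined by each transport-specific implementation.
type udpLocator struct {
	Scheme string `json:"scheme"`
	Local  string `json:"local"`
	Remote string `json:"remote"`
}

// locatorJSON renders the locator as a JSON string.
func locatorJSON(loc udpLocator) (string, error) {
	data, e := json.Marshal(loc)
	if e != nil {
		return "", e
	}
	return string(data), nil
}

func main() {
	s, e := locatorJSON(udpLocator{Scheme: "udp", Local: "192.0.2.1:6363", Remote: "192.0.2.2:6363"})
	if e != nil {
		panic(e)
	}
	fmt.Println(s)
}
```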

Receive Path

RxLoop type implements the receive path. Lower layer implementation places each face into one or more RxGroups, which are then added into RxLoops. RxLoop_Run function continually invokes RxGroup.rxBurst function to retrieve L2 frames arriving on these faces.

Each frame is decoded by FaceRx, which also performs NDNLPv2 reassembly on fragments using the Reassembler. Successfully decoded L3 Interest, Data, or Nack packets are passed to the upper layer (such as the forwarder's input thread) via an InputDemux of that packet type.

It's possible to receive packets arriving on one face in multiple RxLoop threads, by placing the face into multiple RxGroups. NDNLPv2 reassembly works only if all fragments of a network layer packet arrive at the same RxGroup.

Send Path

The send path starts from Face_TxBurst function. It enqueues a burst of L3 packets in Face.txQueue (the "before-Tx queue"). Face_TxBurst function is thread-safe.

TxLoop type implements the send path. It dequeues a burst of L3 packets from Face.txQueue and calls FaceTx_Output to encode them into L2 frames. It then passes a burst of L2 frames to the lower layer implementation via the Face_TxBurstFunc function.
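The two stages of the send path can be modeled in plain Go as follows. This is a simplified sketch, not the C implementation: the face struct, the string packets, the "L2[...]" encoding, and the drop-when-full behavior are all assumptions made for illustration. A buffered channel stands in for the before-Tx queue because channel sends are safe from multiple goroutines.

```go
package main

import "fmt"

const maxBurstSize = 64 // mirrors MaxBurstSize

// face is a hypothetical stand-in for the C Face structure.
type face struct {
	txQueue chan string // the "before-Tx queue" holding L3 packets
}

// txBurst enqueues L3 packets and reports how many were rejected
// because the queue was full (an assumed policy for this sketch).
func (f *face) txBurst(pkts []string) (nDropped int) {
	for _, pkt := range pkts {
		select {
		case f.txQueue <- pkt:
		default:
			nDropped++
		}
	}
	return nDropped
}

// txLoopOnce dequeues at most one burst, encodes each L3 packet into an
// L2 frame, and hands the frames to the lower layer callback.
func (f *face) txLoopOnce(lower func(frames []string)) int {
	frames := make([]string, 0, maxBurstSize)
loop:
	for len(frames) < maxBurstSize {
		select {
		case pkt := <-f.txQueue:
			frames = append(frames, "L2["+pkt+"]")
		default:
			break loop // queue is empty
		}
	}
	if len(frames) > 0 {
		lower(frames)
	}
	return len(frames)
}

func main() {
	f := &face{txQueue: make(chan string, 2)}
	fmt.Println("dropped:", f.txBurst([]string{"I1", "D1", "N1"}))
	f.txLoopOnce(func(frames []string) { fmt.Println("sent:", frames) })
}
```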

Packet Queue

PktQueue type implements a packet queue that can operate in one of three modes.

Plain mode: a simple drop-tail queue.

Delay mode: a drop-tail queue that enforces a minimum amount of delay. This is useful for simulating a processing delay.

CoDel mode: a queue that uses the CoDel algorithm. This CoDel implementation differs from a standard implementation in that it dequeues packets in bursts instead of one at a time. The last packet in each burst is used to calculate the sojourn time, and at most one packet can be dropped in each burst. The CoDel_* functions are adapted from the CoDel implementation in the Linux kernel, under the BSD license (see codel.LICENSE).
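The burst-based sojourn measurement can be sketched as below. This shows only how a per-burst sojourn time could be derived from the last packet and compared against the target; it is not the full CoDel state machine. The packet type and function name are hypothetical, and the 5ms target is the default documented for PktQueueConfig.

```go
package main

import (
	"fmt"
	"time"
)

// target is the documented default of the CoDel TARGET parameter.
const target = 5 * time.Millisecond

// packet is a hypothetical queued packet carrying its enqueue timestamp,
// expressed as time elapsed since an arbitrary epoch.
type packet struct {
	enqueued time.Duration
}

// burstSojourn returns the sojourn time of a dequeued burst.
// Only the last packet of the burst is consulted.
func burstSojourn(burst []packet, now time.Duration) time.Duration {
	if len(burst) == 0 {
		return 0
	}
	return now - burst[len(burst)-1].enqueued
}

func main() {
	burst := []packet{{enqueued: 1 * time.Millisecond}, {enqueued: 4 * time.Millisecond}}
	sojourn := burstSojourn(burst, 12*time.Millisecond)
	fmt.Println("sojourn:", sojourn, "above target:", sojourn > target)
}
```

Measuring once per burst keeps the per-packet cost low, at the price of a coarser estimate than the per-packet measurement in a standard CoDel queue.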

Documentation

Overview

Package iface implements basics of the face system.

Constants

const (
	// MaxBurstSize is the maximum and default burst size.
	MaxBurstSize = 64

	// MaxInputDemuxDest is the maximum number of destinations in InputDemux.
	MaxInputDemuxDest = 128

	// MaxFaceRxThreads is the maximum number of RX threads in a face.
	MaxFaceRxThreads = 8

	// MaxFaceTxThreads is the maximum number of TX threads in a face.
	MaxFaceTxThreads = 1

	// MinReassemblerCapacity is the minimum partial message store capacity in the reassembler.
	MinReassemblerCapacity = 4

	// MaxReassemblerCapacity is the maximum partial message store capacity in the reassembler.
	MaxReassemblerCapacity = 8192

	// DefaultReassemblerCapacity is the default partial message store capacity in the reassembler.
	DefaultReassemblerCapacity = 64

	// MinOutputQueueSize is the minimum packet queue capacity before the output thread.
	MinOutputQueueSize = 256

	// DefaultOutputQueueSize is the default packet queue capacity before the output thread.
	DefaultOutputQueueSize = 1024

	// MinMTU is the minimum value of Maximum Transmission Unit (MTU).
	MinMTU = 960

	// MaxMTU is the maximum value of Maximum Transmission Unit (MTU).
	MaxMTU = 65000
)
const (
	StateUnused = iota
	StateUp
	StateDown
	StateRemoved
)

State values.

const (
	MinID = 0x1000
	MaxID = 0xEFFF
)

ID limits.
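A range check against these limits might look like the sketch below. It assumes that both limits are inclusive and that validity means falling within the range; the lowercase names are local to this example and are not the package's own.

```go
package main

import "fmt"

// id mirrors the uint16 face ID for this sketch.
type id uint16

const (
	minID id = 0x1000
	maxID id = 0xEFFF
)

// valid reports whether the ID lies within the documented limits,
// treating both ends as inclusive (an assumption).
func (v id) valid() bool {
	return v >= minID && v <= maxID
}

func main() {
	for _, v := range []id{0, 0x1000, 0xEFFF, 0xF000} {
		fmt.Printf("0x%04X valid=%v\n", uint16(v), v.valid())
	}
}
```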

const RoleRx = "RX"

RoleRx is the thread role for RxLoop.

const RoleTx = "TX"

RoleTx is the thread role for TxLoop.

Variables

var (
	GqlPktQueueInput    *graphql.InputObject
	GqlFaceType         *gqlserver.NodeType[Face]
	GqlRxCountersType   *graphql.Object
	GqlTxCountersType   *graphql.Object
	GqlCountersType     *graphql.Object
	GqlRxGroupInterface *gqlserver.Interface
)

GraphQL types.

var (
	// ChooseRxLoop customizes RxLoop selection in ActivateRxGroup.
	// Return nil to use default algorithm.
	ChooseRxLoop = func(rxg RxGroup) RxLoop { return nil }
)
var (
	// ChooseTxLoop customizes TxLoop selection in ActivateTxFace.
	// Return nil to use default algorithm.
	ChooseTxLoop = func(face Face) TxLoop { return nil }
)
var (
	// GqlCreateFaceAllowed indicates whether face creation via GraphQL is allowed.
	GqlCreateFaceAllowed bool
)
var RxParseFor = ndni.ParseForAny

RxParseFor indicates decoder parsing purpose for newly created faces.

Functions

func CloseAll

func CloseAll() error

CloseAll closes all faces, RxLoops, and TxLoops.

func DeactivateRxGroup

func DeactivateRxGroup(rxg RxGroup)

DeactivateRxGroup removes the RxGroup from the owning RxLoop.

func DeactivateTxFace

func DeactivateTxFace(face Face)

DeactivateTxFace removes the Face from the owning TxLoop.

func IsDown

func IsDown(id ID) bool

IsDown returns true if the face does not exist or is down.

func LocatorString

func LocatorString(loc Locator) string

LocatorString converts a locator to JSON string.

func OnCloseAll

func OnCloseAll(cb func()) (cancel func())

OnCloseAll registers a callback when CloseAll() is requested. Return a function that cancels the callback registration.

func OnFaceClosed

func OnFaceClosed(cb func(ID)) (cancel func())

OnFaceClosed registers a callback when a face is closed. Return a function that cancels the callback registration.

func OnFaceClosing

func OnFaceClosing(cb func(ID)) (cancel func())

OnFaceClosing registers a callback when a face is closing. Return a function that cancels the callback registration.

func OnFaceDown

func OnFaceDown(cb func(ID)) (cancel func())

OnFaceDown registers a callback when a face becomes DOWN. Return a function that cancels the callback registration.

func OnFaceNew

func OnFaceNew(cb func(ID)) (cancel func())

OnFaceNew registers a callback when a new face is created. Return a function that cancels the callback registration.

func OnFaceUp

func OnFaceUp(cb func(ID)) (cancel func())

OnFaceUp registers a callback when a face becomes UP. Return a function that cancels the callback registration.
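The On* functions above share one shape: register a callback, get back a cancel function. The sketch below shows one way such a registry could be built. The emitter type and its methods are hypothetical and are not how the package is implemented internally.

```go
package main

import (
	"fmt"
	"sync"
)

// emitter is a hypothetical callback registry for one event kind.
type emitter struct {
	mu      sync.Mutex
	nextKey int
	cbs     map[int]func(id uint16)
}

func newEmitter() *emitter {
	return &emitter{cbs: map[int]func(id uint16){}}
}

// on registers cb and returns a function that cancels the registration.
func (em *emitter) on(cb func(id uint16)) (cancel func()) {
	em.mu.Lock()
	defer em.mu.Unlock()
	key := em.nextKey
	em.nextKey++
	em.cbs[key] = cb
	return func() {
		em.mu.Lock()
		defer em.mu.Unlock()
		delete(em.cbs, key)
	}
}

// emit invokes every currently registered callback with the face ID.
// Callbacks run outside the lock so they may register or cancel safely.
func (em *emitter) emit(id uint16) {
	em.mu.Lock()
	cbs := make([]func(id uint16), 0, len(em.cbs))
	for _, cb := range em.cbs {
		cbs = append(cbs, cb)
	}
	em.mu.Unlock()
	for _, cb := range cbs {
		cb(id)
	}
}

func main() {
	faceUp := newEmitter()
	cancel := faceUp.on(func(id uint16) { fmt.Printf("face 0x%04X is up\n", id) })
	faceUp.emit(0x1000) // callback runs
	cancel()
	faceUp.emit(0x1001) // callback no longer registered
}
```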

func RegisterGqlRxGroupType

func RegisterGqlRxGroupType[T RxGroup](oc graphql.ObjectConfig) (object *graphql.Object)

RegisterGqlRxGroupType registers an implementation of GraphQL RxGroup interface.

func RegisterLocatorScheme

func RegisterLocatorScheme[T Locator](schemes ...string)

RegisterLocatorScheme registers Locator schemes.

func TxBurst

func TxBurst(id ID, pkts []*ndni.Packet)

TxBurst transmits a burst of L3 packets.

Types

type Config

type Config struct {
	// ReassemblerCapacity is the partial message store capacity in the reassembler.
	//
	// If this value is zero, it defaults to DefaultReassemblerCapacity.
	// Otherwise, it is clamped between MinReassemblerCapacity and MaxReassemblerCapacity.
	ReassemblerCapacity int `json:"reassemblerCapacity,omitempty"`

	// OutputQueueSize is the packet queue capacity before the output thread.
	//
	// The minimum is MinOutputQueueSize.
	// If this value is less than the minimum, it defaults to DefaultOutputQueueSize.
	// Otherwise, it is adjusted up to the next power of 2.
	OutputQueueSize int `json:"outputQueueSize,omitempty"`

	// MTU is the maximum size of outgoing NDNLP packets.
	// This excludes lower layer headers, such as Ethernet/VXLAN headers.
	//
	// Default is the lesser of MaxMTU and what's allowed by network interface and lower layer protocols.
	// If this is less than MinMTU or greater than the maximum, the face will fail to initialize.
	MTU int `json:"mtu,omitempty"`
	// contains filtered or unexported fields
}

Config contains face configuration.

func (*Config) ApplyDefaults

func (c *Config) ApplyDefaults()

ApplyDefaults applies defaults.
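The rules stated in the Config field comments can be sketched as two helper functions. This is an interpretation of those comments, not the package's code: the function names are hypothetical, and treating an exact power of 2 as already adjusted is an assumption.

```go
package main

import (
	"fmt"
	"math/bits"
)

const (
	minReassemblerCapacity     = 4
	maxReassemblerCapacity     = 8192
	defaultReassemblerCapacity = 64
	minOutputQueueSize         = 256
	defaultOutputQueueSize     = 1024
)

// reassemblerCapacity applies the default for zero, otherwise clamps to [min, max].
func reassemblerCapacity(v int) int {
	switch {
	case v == 0:
		return defaultReassemblerCapacity
	case v < minReassemblerCapacity:
		return minReassemblerCapacity
	case v > maxReassemblerCapacity:
		return maxReassemblerCapacity
	}
	return v
}

// outputQueueSize applies the default when below the minimum,
// otherwise rounds up to a power of 2 (exact powers are kept as-is).
func outputQueueSize(v int) int {
	if v < minOutputQueueSize {
		return defaultOutputQueueSize
	}
	return 1 << bits.Len(uint(v-1))
}

func main() {
	fmt.Println(reassemblerCapacity(0), reassemblerCapacity(2), reassemblerCapacity(100000))
	fmt.Println(outputQueueSize(0), outputQueueSize(256), outputQueueSize(300))
}
```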

func (Config) WithMaxMTU

func (c Config) WithMaxMTU(max int) Config

WithMaxMTU returns a copy of Config with consideration of device MTU.
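The MTU rules from the Config.MTU comment can be sketched as follows. The resolveMTU function and its deviceMax parameter are hypothetical; the sketch assumes that a zero MTU means "use the default" and that deviceMax already accounts for lower layer headers.

```go
package main

import "fmt"

const (
	minMTU = 960
	maxMTU = 65000
)

// resolveMTU picks the effective MTU from the configured value and the
// largest MTU permitted by the device and lower layer protocols.
func resolveMTU(configured, deviceMax int) (int, error) {
	limit := maxMTU
	if deviceMax < limit {
		limit = deviceMax
	}
	if configured == 0 {
		configured = limit // default: the lesser of MaxMTU and the device limit
	}
	if configured < minMTU || configured > limit {
		return 0, fmt.Errorf("MTU %d out of range [%d, %d]", configured, minMTU, limit)
	}
	return configured, nil
}

func main() {
	fmt.Println(resolveMTU(0, 1500))
	fmt.Println(resolveMTU(9000, 1500))
}
```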

type Counters

type Counters struct {
	RxCounters
	TxCounters

	RxThreads []RxCounters `json:"rxThreads"`
}

Counters contains face counters.

func (Counters) String

func (cnt Counters) String() string

type Face

type Face interface {
	eal.WithNumaSocket
	WithInputDemuxes
	io.Closer

	// Ptr returns *C.Face pointer.
	Ptr() unsafe.Pointer

	// ID returns face ID.
	ID() ID

	// Locator returns a Locator describing face endpoints.
	Locator() Locator

	// Counters returns basic face counters.
	Counters() Counters

	// ExCounters returns extended counters.
	ExCounters() any

	// TxAlign returns TX packet alignment requirement.
	TxAlign() ndni.PacketTxAlign

	// EnableInputDemuxes enables per-face InputDemuxes.
	// They can then be retrieved with DemuxOf() method.
	EnableInputDemuxes()

	// SetDown changes face UP/DOWN state.
	SetDown(isDown bool)
}

Face represents a network layer face.

func Get

func Get(id ID) Face

Get retrieves face by ID. Returns nil if id is invalid.

func List

func List() (list []Face)

List returns a list of existing faces.

func New

func New(p NewParams) (face Face, e error)

New creates a Face.

type ID

type ID uint16

ID identifies a face. Zero ID is invalid.

func AllocID

func AllocID() (id ID)

AllocID allocates a random ID. Warning: endless loop if all possible IDs are used up.

func (ID) Valid

func (id ID) Valid() bool

Valid determines whether id is valid.

func (ID) ZapField

func (id ID) ZapField(key string) zap.Field

ZapField returns a zap.Field for logging.

type InitResult

type InitResult struct {
	// Face is a Face interface implementation that would be returned via Get(id).
	// It must embed the base Face passed to NewParams.Init().
	Face Face

	// TxLinearize indicates whether TX mbufs must be direct mbufs in contiguous memory.
	// See C.PacketTxAlign.linearize field.
	TxLinearize bool

	// TxBurst is a C function of C.Face_TxBurstFunc type.
	TxBurst unsafe.Pointer
}

InitResult contains results of NewParams.Init callback.

type InputDemux

type InputDemux C.InputDemux

InputDemux is a demultiplexer for incoming packets of one L3 type.

The zero value drops all packets.

func InputDemuxFromPtr

func InputDemuxFromPtr(ptr unsafe.Pointer) *InputDemux

InputDemuxFromPtr converts *C.InputDemux pointer to InputDemux.

func (*InputDemux) Counters

func (demux *InputDemux) Counters() (cnt InputDemuxCounters)

Counters reads counters.

func (*InputDemux) DestCounters

func (demux *InputDemux) DestCounters(i int) (cnt InputDemuxDestCounters)

DestCounters returns counters of i-th destination.

func (*InputDemux) Dispatch

func (demux *InputDemux) Dispatch(pkt *ndni.Packet) bool

Dispatch submits a packet for dispatching. Returns true if accepted, false if rejected (in this case, pkt must be freed by caller).

func (*InputDemux) InitDrop

func (demux *InputDemux) InitDrop()

InitDrop configures to drop all packets.

func (*InputDemux) InitFirst

func (demux *InputDemux) InitFirst()

InitFirst configures to pass all packets to the first and only destination.

func (*InputDemux) InitGenericHash

func (demux *InputDemux) InitGenericHash(nDest int)

InitGenericHash configures to dispatch according to hash of GenericNameComponents.

func (*InputDemux) InitNdt

func (demux *InputDemux) InitNdt() *ndt.Querier

InitNdt configures to dispatch via NDT lookup.

Caller must Init() the returned NDT querier to link with a valid NDT table and arrange to Clear() the NDT querier before freeing the InputDemux or changing dispatch function.

func (*InputDemux) InitRoundrobin

func (demux *InputDemux) InitRoundrobin(nDest int)

InitRoundrobin configures to pass all packets to each destination in a round-robin fashion.

func (*InputDemux) InitToken

func (demux *InputDemux) InitToken(offset int)

InitToken configures to dispatch according to specified octet in the PIT token.

func (*InputDemux) SetDest

func (demux *InputDemux) SetDest(i int, q *PktQueue)

SetDest assigns i-th destination.
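To make the Dispatch contract concrete, the sketch below models a round-robin demultiplexer with bounded destination queues. It is a plain Go model rather than the C implementation: the roundRobin type, string packets, and channel-based queues are assumptions made for illustration.

```go
package main

import "fmt"

// roundRobin is a hypothetical demultiplexer with bounded destination queues.
type roundRobin struct {
	dests  []chan string
	next   int
	nDrops uint64
}

// newRoundRobin creates nDest destinations, each holding up to capacity packets.
func newRoundRobin(nDest, capacity int) *roundRobin {
	d := &roundRobin{dests: make([]chan string, nDest)}
	for i := range d.dests {
		d.dests[i] = make(chan string, capacity)
	}
	return d
}

// dispatch offers pkt to the next destination in turn.
// It returns true if accepted; on false the caller still owns the packet.
func (d *roundRobin) dispatch(pkt string) bool {
	if len(d.dests) == 0 {
		d.nDrops++
		return false
	}
	q := d.dests[d.next]
	d.next = (d.next + 1) % len(d.dests)
	select {
	case q <- pkt:
		return true
	default: // destination queue is full
		d.nDrops++
		return false
	}
}

func main() {
	d := newRoundRobin(2, 1)
	for _, pkt := range []string{"a", "b", "c"} {
		fmt.Println(pkt, "accepted:", d.dispatch(pkt))
	}
	fmt.Println("drops:", d.nDrops)
}
```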

type InputDemuxCounters

type InputDemuxCounters struct {
	NDrops uint64
}

InputDemuxCounters contains InputDemux counters.

type InputDemuxDestCounters

type InputDemuxDestCounters struct {
	NQueued  uint64
	NDropped uint64
}

InputDemuxDestCounters contains counters of an InputDemux destination.

type Locator

type Locator interface {
	// Scheme returns a string that identifies the type of this Locator.
	// Possible values must be registered through RegisterLocatorScheme().
	Scheme() string

	// Validate checks whether Locator fields are correct according to the chosen scheme.
	Validate() error

	// CreateFace creates a face from this Locator.
	CreateFace() (Face, error)
}

Locator identifies the endpoints of a face.

type LocatorWrapper

type LocatorWrapper struct {
	Locator
}

LocatorWrapper wraps Locator to facilitate JSON serialization.

func (LocatorWrapper) MarshalJSON

func (locw LocatorWrapper) MarshalJSON() (data []byte, e error)

MarshalJSON implements json.Marshaler interface.

func (*LocatorWrapper) UnmarshalJSON

func (locw *LocatorWrapper) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler interface.

type NewParams

type NewParams struct {
	Config

	// Socket indicates where to allocate memory.
	Socket eal.NumaSocket

	// SizeofPriv is the size of C.FaceImpl.priv struct.
	SizeofPriv uintptr

	// Init callback is invoked after allocating C.FaceImpl.
	// This is always invoked on the main thread.
	Init func(f Face) (InitResult, error)

	// Start callback is invoked after data structure initialization.
	// It should activate the face in RxLoop and TxLoop.
	// This is always invoked on the main thread.
	Start func() error

	// Locator callback returns a Locator describing the face.
	Locator func() Locator

	// Stop callback is invoked to stop the face.
	// It should deactivate the face in RxLoop and TxLoop.
	// This is always invoked on the main thread.
	Stop func() error

	// Close callback is invoked after the face has been removed.
	// This is optional.
	// This is always invoked on the main thread.
	Close func() error

	// ExCounters callback returns extended counters.
	// This is optional.
	ExCounters func() any
}

NewParams contains parameters to New().

type PktQueue

type PktQueue C.PktQueue

PktQueue is a packet queue with simplified CoDel algorithm.

func PktQueueFromPtr

func PktQueueFromPtr(ptr unsafe.Pointer) (q *PktQueue)

PktQueueFromPtr converts *C.PktQueue to PktQueue.

func (*PktQueue) Close

func (q *PktQueue) Close() error

Close drains and deallocates the PktQueue. It will not free *C.PktQueue itself.

func (*PktQueue) Counters

func (q *PktQueue) Counters() (cnt PktQueueCounters)

Counters reads counters.

func (*PktQueue) Init

func (q *PktQueue) Init(cfg PktQueueConfig, socket eal.NumaSocket) error

Init initializes PktQueue.

func (*PktQueue) Pop

func (q *PktQueue) Pop(vec pktmbuf.Vector, now eal.TscTime) (count int, drop bool)

Pop dequeues a slice of packets.

func (*PktQueue) Ptr

func (q *PktQueue) Ptr() unsafe.Pointer

Ptr returns *C.PktQueue pointer.

func (*PktQueue) Push

func (q *PktQueue) Push(vec pktmbuf.Vector) (nRej int)

Push enqueues a slice of packets. Timestamps must have been assigned on the packets. Caller must free rejected packets.

func (*PktQueue) Ring

func (q *PktQueue) Ring() *ringbuffer.Ring

Ring provides access to the internal ring.

type PktQueueConfig

type PktQueueConfig struct {
	// Ring capacity, must be power of 2, default 131072 with delay/CoDel or 4096 without
	Capacity int `json:"capacity,omitempty"`
	// dequeue burst size limit, default MaxBurstSize
	DequeueBurstSize int `json:"dequeueBurstSize,omitempty"`

	// if non-zero, enforce minimum delay, implies DisableCoDel
	Delay nnduration.Nanoseconds `json:"delay,omitempty"`
	// if true, disable CoDel algorithm
	DisableCoDel bool `json:"disableCoDel,omitempty"`
	// CoDel TARGET parameter, default 5ms
	Target nnduration.Nanoseconds `json:"target,omitempty"`
	// CoDel INTERVAL parameter, default 100ms
	Interval nnduration.Nanoseconds `json:"interval,omitempty"`
}

PktQueueConfig contains PktQueue configuration.
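Reading the field comments together, the operating mode and the default capacity follow from the configuration roughly as in the sketch below. The queueConfig type and its methods are hypothetical simplifications, and time.Duration stands in for nnduration.Nanoseconds.

```go
package main

import (
	"fmt"
	"time"
)

// queueConfig is a simplified stand-in for PktQueueConfig.
type queueConfig struct {
	Capacity     int
	Delay        time.Duration
	DisableCoDel bool
}

// mode returns which of the three modes the configuration selects.
func (cfg queueConfig) mode() string {
	switch {
	case cfg.Delay > 0:
		return "delay" // a non-zero delay implies DisableCoDel
	case cfg.DisableCoDel:
		return "plain"
	default:
		return "codel"
	}
}

// capacity returns the ring capacity, applying the documented defaults:
// 131072 with delay/CoDel, 4096 without.
func (cfg queueConfig) capacity() int {
	if cfg.Capacity != 0 {
		return cfg.Capacity
	}
	if cfg.mode() == "plain" {
		return 4096
	}
	return 131072
}

func main() {
	for _, cfg := range []queueConfig{
		{},
		{DisableCoDel: true},
		{Delay: 2 * time.Millisecond},
	} {
		fmt.Println(cfg.mode(), cfg.capacity())
	}
}
```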

type PktQueueCounters

type PktQueueCounters struct {
	NDrops uint64 `json:"nDrops"`
}

PktQueueCounters contains PktQueue counters.

type RxCounters

type RxCounters struct {
	RxFrames    uint64 `json:"rxFrames" gqldesc:"RX total frames."`
	RxOctets    uint64 `json:"rxOctets" gqldesc:"RX total bytes."`
	RxInterests uint64 `json:"rxInterests" gqldesc:"RX Interest packets."`
	RxData      uint64 `json:"rxData" gqldesc:"RX Data packets."`
	RxNacks     uint64 `json:"rxNacks" gqldesc:"RX Nack packets."`

	RxDecodeErrs   uint64 `json:"rxDecodeErrs" gqldesc:"RX decode errors."`
	RxReassPackets uint64 `json:"rxReassPackets" gqldesc:"RX packets that were reassembled."`
	RxReassDrops   uint64 `json:"rxReassDrops" gqldesc:"RX frames that were dropped by reassembler."`
}

RxCounters contains face/queue RX counters.

func (RxCounters) String

func (cnt RxCounters) String() string

type RxGroup

type RxGroup interface {
	eal.WithNumaSocket

	// RxGroup returns *C.RxGroup pointer and description.
	RxGroup() (ptr unsafe.Pointer, desc string)

	// Faces returns a list of faces contained in this RxGroup.
	Faces() []Face
}

RxGroup is a receive channel for faces. An RxGroup may serve multiple faces; a face may have multiple RxGroups.

type RxGroupSingleFace

type RxGroupSingleFace interface {
	RxGroup
	RxGroupIsSingleFace()
}

RxGroupSingleFace indicates this kind of RxGroup can serve at most one face.

type RxLoop

type RxLoop interface {
	eal.WithNumaSocket
	ealthread.ThreadWithRole
	ealthread.ThreadWithLoadStat
	WithInputDemuxes
	io.Closer

	CountRxGroups() int
	List() []RxGroup
	Add(rxg RxGroup)
	Remove(rxg RxGroup)
}

RxLoop is the input thread that processes incoming packets on a set of RxGroups. Functions are non-thread-safe.

func ActivateRxGroup

func ActivateRxGroup(rxg RxGroup) RxLoop

ActivateRxGroup selects an RxLoop and adds the RxGroup to it. Returns chosen RxLoop.

The default logic selects among existing RxLoops for the least loaded one, preferably on the same NUMA socket as the RxGroup. In case no RxLoop exists, one is created and launched automatically. This does not respect LCoreAlloc, and may panic if no LCore is available.

This logic may be overridden via ChooseRxLoop.
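The default selection described above could be modeled as in the sketch below. It is only an approximation of the stated policy: measuring load by the number of RxGroups, and always preferring a loop on the same NUMA socket over a less loaded loop elsewhere, are both assumptions. The loop type and choose function are hypothetical.

```go
package main

import "fmt"

// loop is a hypothetical summary of an existing RxLoop.
type loop struct {
	name    string
	socket  int // NUMA socket of the loop
	nGroups int // assumed load metric: number of RxGroups served
}

// choose returns the least loaded loop, preferring loops on the wanted socket.
// The boolean is false when no loop exists, in which case one would be created.
func choose(loops []loop, socket int) (loop, bool) {
	best := -1
	for i, l := range loops {
		if best < 0 {
			best = i
			continue
		}
		b := loops[best]
		lSame, bSame := l.socket == socket, b.socket == socket
		switch {
		case lSame && !bSame:
			best = i // same-socket loop beats a remote one
		case lSame == bSame && l.nGroups < b.nGroups:
			best = i // otherwise pick the lighter load
		}
	}
	if best < 0 {
		return loop{}, false
	}
	return loops[best], true
}

func main() {
	loops := []loop{{"A", 0, 3}, {"B", 1, 1}, {"C", 0, 2}}
	chosen, ok := choose(loops, 0)
	fmt.Println(chosen.name, ok)
}
```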

func ListRxLoops

func ListRxLoops() []RxLoop

ListRxLoops returns a list of RxLoops.

func NewRxLoop

func NewRxLoop(socket eal.NumaSocket) RxLoop

NewRxLoop creates an RxLoop.

type State

type State uint8

State indicates face state.

func (State) String

func (st State) String() string

type TxCounters

type TxCounters struct {
	TxFrames    uint64 `json:"txFrames" gqldesc:"TX total frames."`
	TxOctets    uint64 `json:"txOctets" gqldesc:"TX total bytes."`
	TxInterests uint64 `json:"txInterests" gqldesc:"TX Interest packets."`
	TxData      uint64 `json:"txData" gqldesc:"TX Data packets."`
	TxNacks     uint64 `json:"txNacks" gqldesc:"TX Nack packets."`

	TxFragGood  uint64 `json:"txFragGood" gqldesc:"TX fragmented L3 packets."`
	TxFragBad   uint64 `json:"txFragBad" gqldesc:"TX fragmentation failures."`
	TxAllocErrs uint64 `json:"txAllocErrs" gqldesc:"TX allocation errors."`
	TxDropped   uint64 `json:"txDropped" gqldesc:"TX dropped L2 frames due to full queue."`
}

TxCounters contains face/queue TX counters.

func (TxCounters) String

func (cnt TxCounters) String() string

type TxLoop

type TxLoop interface {
	eal.WithNumaSocket
	ealthread.ThreadWithRole
	ealthread.ThreadWithLoadStat
	io.Closer

	CountFaces() int
	Add(face Face)
	Remove(face Face)
}

TxLoop is the output thread that processes outgoing packets on a set of faces. Functions are non-thread-safe.

func ActivateTxFace

func ActivateTxFace(face Face) TxLoop

ActivateTxFace selects a TxLoop and adds the face to it. Returns chosen TxLoop.

The default logic selects among existing TxLoops for the least loaded one, preferably on the same NUMA socket as the face. In case no TxLoop exists, one is created and launched automatically. This does not respect LCoreAlloc, and may panic if no LCore is available.

This logic may be overridden via ChooseTxLoop.

func ListTxLoops

func ListTxLoops() (list []TxLoop)

ListTxLoops returns a list of TxLoops.

func NewTxLoop

func NewTxLoop(socket eal.NumaSocket) TxLoop

NewTxLoop creates a TxLoop.

type WithInputDemuxes

type WithInputDemuxes interface {
	// DemuxOf returns InputDemux of specified PktType.
	// t must be a valid L3 type (not ndni.PktFragment).
	DemuxOf(t ndni.PktType) *InputDemux
}

WithInputDemuxes is an object that contains per L3 type InputDemux.

Directories

Path Synopsis
ethface: Package ethface implements Ethernet-based faces.
ethport: Package ethport implements faces using DPDK Ethernet device as transport.
ifacetestenv: Package ifacetestenv provides a test fixture for a face type.
intface: Package intface implements an internal face for internal applications.
memifface: Package memifface implements memif faces.
socketface: Package socketface implements UDP/TCP socket faces using Go net.Conn type.
