coord

package
v2.0.0-...-03adce6
Warning

This package is not in the latest version of its module.

Published: Sep 28, 2023 License: MIT Imports: 24 Imported by: 0

Documentation


Constants

const (
	// IncludeQueryID is the id for connectivity checks performed by the include state machine.
	// This identifier is used for routing network responses to the state machine.
	IncludeQueryID = coordt.QueryID("include")

	// ProbeQueryID is the id for connectivity checks performed by the probe state machine.
	// This identifier is used for routing network responses to the state machine.
	ProbeQueryID = coordt.QueryID("probe")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Behaviour

type Behaviour[I BehaviourEvent, O BehaviourEvent] interface {
	// Ready returns a channel that signals when the behaviour is ready to perform work.
	Ready() <-chan struct{}

	// Notify informs the behaviour of an event. The behaviour may perform the event
	// immediately and queue the result, causing the behaviour to become ready.
	// It is safe to call Notify from the Perform method.
	Notify(ctx context.Context, ev I)

	// Perform gives the behaviour the opportunity to perform work or to return a queued
	// result as an event.
	Perform(ctx context.Context) (O, bool)
}

type BehaviourEvent

type BehaviourEvent interface {
	// contains filtered or unexported methods
}

type BrdcstCommand

type BrdcstCommand interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

BrdcstCommand is a type of BehaviourEvent that instructs a [BrdcstBehaviour] to perform an action.

type BufferedRoutingNotifier

type BufferedRoutingNotifier struct {
	// contains filtered or unexported fields
}

A BufferedRoutingNotifier is a RoutingNotifier that buffers RoutingNotification events and provides methods to expect occurrences of specific events. It is designed for use in a test environment.

func NewBufferedRoutingNotifier

func NewBufferedRoutingNotifier() *BufferedRoutingNotifier

func (*BufferedRoutingNotifier) Expect

func (*BufferedRoutingNotifier) ExpectRoutingRemoved

func (w *BufferedRoutingNotifier) ExpectRoutingRemoved(ctx context.Context, id kadt.PeerID) (*EventRoutingRemoved, error)

ExpectRoutingRemoved blocks until an EventRoutingRemoved event is seen for the specified peer id.

func (*BufferedRoutingNotifier) ExpectRoutingUpdated

func (w *BufferedRoutingNotifier) ExpectRoutingUpdated(ctx context.Context, id kadt.PeerID) (*EventRoutingUpdated, error)

ExpectRoutingUpdated blocks until an EventRoutingUpdated event is seen for the specified peer id.

func (*BufferedRoutingNotifier) Notify

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

A Coordinator coordinates the state machines that comprise a Kademlia DHT.

func (*Coordinator) AddNodes

func (c *Coordinator) AddNodes(ctx context.Context, ids []kadt.PeerID) error

AddNodes suggests new DHT nodes to be added to the routing table. If the routing table is updated as a result of this operation an EventRoutingUpdated notification is emitted on the routing notification channel.

func (*Coordinator) Bootstrap

func (c *Coordinator) Bootstrap(ctx context.Context, seeds []kadt.PeerID) error

Bootstrap instructs the DHT to begin bootstrapping the routing table.

func (*Coordinator) BroadcastRecord

func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) error

func (*Coordinator) BroadcastStatic

func (c *Coordinator) BroadcastStatic(ctx context.Context, msg *pb.Message, seeds []kadt.PeerID) error

func (*Coordinator) Close

func (c *Coordinator) Close() error

Close cleans up all resources associated with this Coordinator.

func (*Coordinator) GetClosestNodes

func (c *Coordinator) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error)

GetClosestNodes requests the n closest nodes to the key from the node's local routing table.

func (*Coordinator) GetNode

func (c *Coordinator) GetNode(ctx context.Context, id kadt.PeerID) (coordt.Node, error)

GetNode retrieves the node associated with the given node id from the DHT's local routing table. If the node isn't found in the table, it returns ErrNodeNotFound.

func (*Coordinator) GetValue

func (c *Coordinator) GetValue(ctx context.Context, k kadt.Key) (coordt.Value, error)

GetValue requests that the node return any value associated with the supplied key. If the node does not have a value for the key it returns ErrValueNotFound.

func (*Coordinator) ID

func (c *Coordinator) ID() kadt.PeerID

func (*Coordinator) NotifyConnectivity

func (c *Coordinator) NotifyConnectivity(ctx context.Context, id kadt.PeerID)

NotifyConnectivity notifies the coordinator that a peer has passed a connectivity check, which means it is connected and supports finding closer nodes.

func (*Coordinator) NotifyNonConnectivity

func (c *Coordinator) NotifyNonConnectivity(ctx context.Context, id kadt.PeerID)

NotifyNonConnectivity notifies the coordinator that a peer has failed a connectivity check, which means it is not connected and/or does not support finding closer nodes.

func (*Coordinator) PutValue

func (c *Coordinator) PutValue(ctx context.Context, r coordt.Value, q int) error

PutValue requests that the node stores a value to be associated with the supplied key. If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.

func (*Coordinator) QueryClosest

func (c *Coordinator) QueryClosest(ctx context.Context, target kadt.Key, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error)

QueryClosest starts a query that attempts to find the closest nodes to the target key. It returns the closest nodes found to the target key and statistics on the actions of the query.

The supplied [QueryFunc] is called after each successful request to a node with the ID of the node, the response received from the find nodes request made to the node, and the current query stats. The query terminates when [QueryFunc] returns an error or when the query has visited the configured minimum number of closest nodes (default 20).

numResults specifies the minimum number of nodes to successfully contact before considering iteration complete. The query is considered to be exhausted when it has received responses from at least this number of nodes and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1.

func (*Coordinator) QueryMessage

func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coordt.QueryFunc, numResults int) ([]kadt.PeerID, coordt.QueryStats, error)

QueryMessage starts a query that iterates over the closest nodes to the target key in the supplied message. The message is sent to each node that is visited.

The supplied [QueryFunc] is called after each successful request to a node with the ID of the node, the response received from the find nodes request made to the node, and the current query stats. The query terminates when [QueryFunc] returns an error or when the query has visited the configured minimum number of closest nodes (default 20).

numResults specifies the minimum number of nodes to successfully contact before considering iteration complete. The query is considered to be exhausted when it has received responses from at least this number of nodes and there are no closer nodes remaining to be contacted. A default of 20 is used if this value is less than 1.

func (*Coordinator) SetRoutingNotifier

func (c *Coordinator) SetRoutingNotifier(rn RoutingNotifier)

type CoordinatorConfig

type CoordinatorConfig struct {
	// Clock is a clock that may be replaced by a mock when testing.
	Clock clock.Clock

	// Logger is a structured logger that will be used when logging.
	Logger *slog.Logger

	// MeterProvider is the meter provider to use when initialising metric instruments.
	MeterProvider metric.MeterProvider

	// TracerProvider is the tracer provider to use when initialising tracing.
	TracerProvider trace.TracerProvider

	// Routing is the configuration used for the [RoutingBehaviour] which maintains the health of the routing table.
	Routing RoutingConfig

	// Query is the configuration used for the [PooledQueryBehaviour] which manages the execution of user queries.
	Query PooledQueryConfig
}

func DefaultCoordinatorConfig

func DefaultCoordinatorConfig() *CoordinatorConfig

func (*CoordinatorConfig) Validate

func (cfg *CoordinatorConfig) Validate() error

Validate checks the configuration options and returns an error if any have invalid values.

type EventAddNode

type EventAddNode struct {
	NodeID kadt.PeerID
}

EventAddNode notifies the routing behaviour of a potential new peer.

type EventBootstrapFinished

type EventBootstrapFinished struct {
	Stats query.QueryStats
}

EventBootstrapFinished is emitted by the coordinator when a bootstrap has finished, either through running to completion or by being canceled.

type EventBroadcastFinished

type EventBroadcastFinished struct {
	QueryID   coordt.QueryID
	Contacted []kadt.PeerID
	Errors    map[string]struct {
		Node kadt.PeerID
		Err  error
	}
}

EventBroadcastFinished is emitted by the coordinator when broadcasting a record to the network has finished, either through running to completion or by being canceled.

type EventGetCloserNodesFailure

type EventGetCloserNodesFailure struct {
	QueryID coordt.QueryID
	To      kadt.PeerID // To is the peer that the GetCloserNodes request was sent to.
	Target  kadt.Key
	Err     error
}

EventGetCloserNodesFailure notifies a behaviour that a GetCloserNodes request, initiated by an EventOutboundGetCloserNodes event, has failed to produce a valid response.

type EventGetCloserNodesSuccess

type EventGetCloserNodesSuccess struct {
	QueryID     coordt.QueryID
	To          kadt.PeerID // To is the peer that the GetCloserNodes request was sent to.
	Target      kadt.Key
	CloserNodes []kadt.PeerID
}

EventGetCloserNodesSuccess notifies a behaviour that a GetCloserNodes request, initiated by an EventOutboundGetCloserNodes event, has produced a successful response.

type EventNotifyConnectivity

type EventNotifyConnectivity struct {
	NodeID kadt.PeerID
}

EventNotifyConnectivity notifies a behaviour that a peer's connectivity and support for finding closer nodes has been confirmed, such as from a successful query response or an inbound query. This should not be used for general connections to the host, only when it is confirmed that the peer responds to requests for closer nodes.

type EventNotifyNonConnectivity

type EventNotifyNonConnectivity struct {
	NodeID kadt.PeerID
}

EventNotifyNonConnectivity notifies a behaviour that a peer is known not to have connectivity and/or not to support finding closer nodes.

type EventOutboundGetCloserNodes

type EventOutboundGetCloserNodes struct {
	QueryID coordt.QueryID
	To      kadt.PeerID
	Target  kadt.Key
	Notify  Notify[BehaviourEvent]
}

type EventOutboundSendMessage

type EventOutboundSendMessage struct {
	QueryID coordt.QueryID
	To      kadt.PeerID
	Message *pb.Message
	Notify  Notify[BehaviourEvent]
}

type EventQueryFinished

type EventQueryFinished struct {
	QueryID      coordt.QueryID
	Stats        query.QueryStats
	ClosestNodes []kadt.PeerID
}

EventQueryFinished is emitted by the coordinator when a query has finished, either through running to completion or by being canceled.

type EventQueryProgressed

type EventQueryProgressed struct {
	QueryID  coordt.QueryID
	NodeID   kadt.PeerID
	Response *pb.Message
	Stats    query.QueryStats
}

EventQueryProgressed is emitted by the coordinator when a query has received a response from a node.

type EventRoutingPoll

type EventRoutingPoll struct{}

EventRoutingPoll notifies a routing behaviour that it may proceed with any pending work.

type EventRoutingRemoved

type EventRoutingRemoved struct {
	NodeID kadt.PeerID
}

EventRoutingRemoved is emitted by the coordinator when a node has been removed from the routing table.

type EventRoutingUpdated

type EventRoutingUpdated struct {
	NodeID kadt.PeerID
}

EventRoutingUpdated is emitted by the coordinator when a new node has been verified and added to the routing table.

type EventSendMessageFailure

type EventSendMessageFailure struct {
	QueryID coordt.QueryID
	Request *pb.Message
	To      kadt.PeerID // To is the peer that the SendMessage request was sent to.
	Target  kadt.Key
	Err     error
}

EventSendMessageFailure notifies a behaviour that a SendMessage request, initiated by an EventOutboundSendMessage event, has failed to produce a valid response.

type EventSendMessageSuccess

type EventSendMessageSuccess struct {
	QueryID     coordt.QueryID
	Request     *pb.Message
	To          kadt.PeerID // To is the peer that the SendMessage request was sent to.
	Response    *pb.Message
	CloserNodes []kadt.PeerID
}

EventSendMessageSuccess notifies a behaviour that a SendMessage request, initiated by an EventOutboundSendMessage event, has produced a successful response.

type EventStartBootstrap

type EventStartBootstrap struct {
	SeedNodes []kadt.PeerID
}

type EventStartBroadcast

type EventStartBroadcast struct {
	QueryID coordt.QueryID
	Target  kadt.Key
	Message *pb.Message
	Seed    []kadt.PeerID
	Config  brdcst.Config
	Notify  NotifyCloser[BehaviourEvent]
}

EventStartBroadcast starts a new broadcast.

type EventStartFindCloserQuery

type EventStartFindCloserQuery struct {
	QueryID           coordt.QueryID
	Target            kadt.Key
	KnownClosestNodes []kadt.PeerID
	Notify            NotifyCloser[BehaviourEvent]
	NumResults        int // the minimum number of nodes to successfully contact before considering iteration complete
}

type EventStartMessageQuery

type EventStartMessageQuery struct {
	QueryID           coordt.QueryID
	Target            kadt.Key
	Message           *pb.Message
	KnownClosestNodes []kadt.PeerID
	Notify            NotifyCloser[BehaviourEvent]
	NumResults        int // the minimum number of nodes to successfully contact before considering iteration complete
}

type EventStopQuery

type EventStopQuery struct {
	QueryID coordt.QueryID
}

type NetworkBehaviour

type NetworkBehaviour struct {
	// contains filtered or unexported fields
}

func NewNetworkBehaviour

func NewNetworkBehaviour(rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NetworkBehaviour

func (*NetworkBehaviour) Notify

func (b *NetworkBehaviour) Notify(ctx context.Context, ev BehaviourEvent)

func (*NetworkBehaviour) Perform

func (b *NetworkBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool)

func (*NetworkBehaviour) Ready

func (b *NetworkBehaviour) Ready() <-chan struct{}

type NetworkCommand

type NetworkCommand interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

NetworkCommand is a type of BehaviourEvent that instructs a NetworkBehaviour to perform an action.

type NodeHandler

type NodeHandler struct {
	// contains filtered or unexported fields
}

func NewNodeHandler

func NewNodeHandler(self kadt.PeerID, rtr coordt.Router[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *NodeHandler

func (*NodeHandler) GetClosestNodes

func (h *NodeHandler) GetClosestNodes(ctx context.Context, k kadt.Key, n int) ([]coordt.Node, error)

GetClosestNodes requests the n closest nodes to the key from the node's local routing table. The node may return fewer nodes than requested.

func (*NodeHandler) GetValue

func (h *NodeHandler) GetValue(ctx context.Context, key kadt.Key) (coordt.Value, error)

GetValue requests that the node return any value associated with the supplied key. If the node does not have a value for the key it returns ErrValueNotFound.

func (*NodeHandler) ID

func (h *NodeHandler) ID() kadt.PeerID

func (*NodeHandler) Notify

func (h *NodeHandler) Notify(ctx context.Context, ev NodeHandlerRequest)

func (*NodeHandler) PutValue

func (h *NodeHandler) PutValue(ctx context.Context, r coordt.Value, q int) error

PutValue requests that the node stores a value to be associated with the supplied key. If the node cannot or chooses not to store the value for the key it returns ErrValueNotAccepted.

type NodeHandlerRequest

type NodeHandlerRequest interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

type NodeHandlerResponse

type NodeHandlerResponse interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

type Notify

type Notify[E BehaviourEvent] interface {
	Notify(ctx context.Context, ev E)
}

Notify is the interface that components implement to be notified of BehaviourEvents.

type NotifyCloser

type NotifyCloser[E BehaviourEvent] interface {
	Notify[E]
	Close()
}

type NotifyFunc

type NotifyFunc[E BehaviourEvent] func(ctx context.Context, ev E)

func (NotifyFunc[E]) Notify

func (f NotifyFunc[E]) Notify(ctx context.Context, ev E)
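
NotifyFunc follows the same function-adapter pattern as `http.HandlerFunc`: a plain function gains a Notify method and so satisfies the Notify interface. The sketch below re-declares both types locally with an `any` constraint instead of BehaviourEvent. The method body (calling the function itself) is an assumption, since the page shows only the signature, and `deliver` and `collect` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
)

// Notify mirrors the generic interface documented above.
type Notify[E any] interface {
	Notify(ctx context.Context, ev E)
}

// NotifyFunc adapts an ordinary function to the Notify interface.
type NotifyFunc[E any] func(ctx context.Context, ev E)

// Notify calls the underlying function (assumed behaviour).
func (f NotifyFunc[E]) Notify(ctx context.Context, ev E) { f(ctx, ev) }

// deliver sends each event to any Notify implementation.
func deliver[E any](ctx context.Context, n Notify[E], evs ...E) {
	for _, ev := range evs {
		n.Notify(ctx, ev)
	}
}

// collect records every delivered event using a closure wrapped in NotifyFunc.
func collect(events ...string) []string {
	var seen []string
	fn := NotifyFunc[string](func(ctx context.Context, ev string) {
		seen = append(seen, ev)
	})
	deliver[string](context.Background(), fn, events...)
	return seen
}

func main() {
	fmt.Println(collect("routing updated", "query finished"))
}
```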

type PooledBroadcastBehaviour

type PooledBroadcastBehaviour struct {
	// contains filtered or unexported fields
}

func NewPooledBroadcastBehaviour

func NewPooledBroadcastBehaviour(brdcstPool *brdcst.Pool[kadt.Key, kadt.PeerID, *pb.Message], logger *slog.Logger, tracer trace.Tracer) *PooledBroadcastBehaviour

func (*PooledBroadcastBehaviour) Notify

func (*PooledBroadcastBehaviour) Perform

func (*PooledBroadcastBehaviour) Ready

func (b *PooledBroadcastBehaviour) Ready() <-chan struct{}

type PooledQueryBehaviour

type PooledQueryBehaviour struct {
	// contains filtered or unexported fields
}

func NewPooledQueryBehaviour

func NewPooledQueryBehaviour(self kadt.PeerID, cfg *PooledQueryConfig) (*PooledQueryBehaviour, error)

func (*PooledQueryBehaviour) Notify

func (*PooledQueryBehaviour) Perform

func (*PooledQueryBehaviour) Ready

func (p *PooledQueryBehaviour) Ready() <-chan struct{}

type PooledQueryConfig

type PooledQueryConfig struct {
	// Clock is a clock that may be replaced by a mock when testing.
	Clock clock.Clock

	// Logger is a structured logger that will be used when logging.
	Logger *slog.Logger

	// Tracer is the tracer that should be used to trace execution.
	Tracer trace.Tracer

	// Concurrency is the maximum number of queries that may be waiting for message responses at any one time.
	Concurrency int

	// Timeout is the time to wait before terminating a query that is not making progress.
	Timeout time.Duration

	// RequestConcurrency is the maximum number of concurrent requests that each query may have in flight.
	RequestConcurrency int

	// RequestTimeout is the timeout queries should use for contacting a single node.
	RequestTimeout time.Duration
}

func DefaultPooledQueryConfig

func DefaultPooledQueryConfig() *PooledQueryConfig

func (*PooledQueryConfig) Validate

func (cfg *PooledQueryConfig) Validate() error

Validate checks the configuration options and returns an error if any have invalid values.

type QueryCommand

type QueryCommand interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

QueryCommand is a type of BehaviourEvent that instructs a [QueryBehaviour] to perform an action.

type RoutingBehaviour

type RoutingBehaviour struct {
	// contains filtered or unexported fields
}

A RoutingBehaviour provides the behaviours for bootstrapping and maintaining a DHT's routing table.

func ComposeRoutingBehaviour

ComposeRoutingBehaviour creates a RoutingBehaviour composed of the supplied state machines. The state machines are assumed to be pre-configured, so any RoutingConfig values relating to the state machines will not be applied.

func (*RoutingBehaviour) Notify

func (r *RoutingBehaviour) Notify(ctx context.Context, ev BehaviourEvent)

func (*RoutingBehaviour) Perform

func (r *RoutingBehaviour) Perform(ctx context.Context) (BehaviourEvent, bool)

func (*RoutingBehaviour) Ready

func (r *RoutingBehaviour) Ready() <-chan struct{}

type RoutingCommand

type RoutingCommand interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

RoutingCommand is a type of BehaviourEvent that instructs a RoutingBehaviour to perform an action.

type RoutingConfig

type RoutingConfig struct {
	// Clock is a clock that may be replaced by a mock when testing.
	Clock clock.Clock

	// Logger is a structured logger that will be used when logging.
	Logger *slog.Logger

	// Tracer is the tracer that should be used to trace execution.
	Tracer trace.Tracer

	// Meter is the meter that should be used to record metrics.
	Meter metric.Meter

	// BootstrapTimeout is the time the behaviour should wait before terminating a bootstrap if it is not making progress.
	BootstrapTimeout time.Duration

	// BootstrapRequestConcurrency is the maximum number of concurrent requests that the behaviour may have in flight during bootstrap.
	BootstrapRequestConcurrency int

	// BootstrapRequestTimeout is the timeout the behaviour should use when attempting to contact a node during bootstrap.
	BootstrapRequestTimeout time.Duration

	// ConnectivityCheckTimeout is the timeout the behaviour should use when performing a connectivity check.
	ConnectivityCheckTimeout time.Duration

	// ProbeRequestConcurrency is the maximum number of concurrent requests that the behaviour may have in flight while performing
	// connectivity checks for nodes in the routing table.
	ProbeRequestConcurrency int

	// ProbeCheckInterval is the time interval the behaviour should use between connectivity checks for the same node in the routing table.
	ProbeCheckInterval time.Duration

	// IncludeQueueCapacity is the maximum number of nodes the behaviour should keep queued as candidates for inclusion in the routing table.
	IncludeQueueCapacity int

	// IncludeRequestConcurrency is the maximum number of concurrent requests that the behaviour may have in flight while performing
	// connectivity checks for nodes in the inclusion candidate queue.
	IncludeRequestConcurrency int

	// ExploreTimeout is the time the behaviour should wait before terminating an exploration of a routing table bucket if it is not making progress.
	ExploreTimeout time.Duration

	// ExploreRequestConcurrency is the maximum number of concurrent requests that the behaviour may have in flight while exploring the
	// network to increase routing table occupancy.
	ExploreRequestConcurrency int

	// ExploreRequestTimeout is the timeout the behaviour should use when attempting to contact a node while exploring the
	// network to increase routing table occupancy.
	ExploreRequestTimeout time.Duration

	// ExploreMaximumCpl is the maximum CPL (common prefix length) the behaviour should explore to increase routing table occupancy.
	// All CPLs from this value to zero will be explored on a repeating schedule.
	ExploreMaximumCpl int

	// ExploreInterval is the base time interval the behaviour should leave between explorations of the same CPL.
	// See the documentation for [routing.DynamicExploreSchedule] for the precise formula used to calculate explore intervals.
	ExploreInterval time.Duration

	// ExploreIntervalMultiplier is a factor that is applied to the base time interval for CPLs lower than the maximum to increase the delay between
	// explorations for lower CPLs.
	// See the documentation for [routing.DynamicExploreSchedule] for the precise formula used to calculate explore intervals.
	ExploreIntervalMultiplier float64

	// ExploreIntervalJitter is a factor that is used to increase the calculated interval for an exploration by a small random amount.
	// It must be between 0 and 0.05. When zero, no jitter is applied.
	// See the documentation for [routing.DynamicExploreSchedule] for the precise formula used to calculate explore intervals.
	ExploreIntervalJitter float64
}

func DefaultRoutingConfig

func DefaultRoutingConfig() *RoutingConfig

func (*RoutingConfig) Validate

func (cfg *RoutingConfig) Validate() error

Validate checks the configuration options and returns an error if any have invalid values.

type RoutingNotification

type RoutingNotification interface {
	BehaviourEvent
	// contains filtered or unexported methods
}

type RoutingNotifier

type RoutingNotifier interface {
	Notify(context.Context, RoutingNotification)
}

type Telemetry

type Telemetry struct {
	Tracer trace.Tracer
}

Telemetry is the struct that holds a reference to all metrics and the tracer used by the coordinator and its components. Make sure to also register the [MeterProviderOpts] with your custom or the global metric.MeterProvider.

func NewTelemetry

func NewTelemetry(meterProvider metric.MeterProvider, tracerProvider trace.TracerProvider) (*Telemetry, error)

NewTelemetry initializes a Telemetry struct with the given meter and tracer providers.

type Waiter

type Waiter[E BehaviourEvent] struct {
	// contains filtered or unexported fields
}

A Waiter is a Notifiee whose Notify method forwards the notified event to a channel which a client can wait on.

func NewWaiter

func NewWaiter[E BehaviourEvent]() *Waiter[E]

func (*Waiter[E]) Chan

func (w *Waiter[E]) Chan() <-chan WaiterEvent[E]

func (*Waiter[E]) Close

func (w *Waiter[E]) Close()

Close signals that the waiter should not forward any further calls to Notify. It closes the waiter channel so a client selecting on it will receive the close operation.

func (*Waiter[E]) Notify

func (w *Waiter[E]) Notify(ctx context.Context, ev E)

type WaiterEvent

type WaiterEvent[E BehaviourEvent] struct {
	Ctx   context.Context
	Event E
}

type WorkQueue

type WorkQueue[E BehaviourEvent] struct {
	// contains filtered or unexported fields
}

WorkQueue is a buffered queue of work to be performed. The queue drains automatically and sequentially by calling a WorkQueueFunc for each work item, passing the original context and event.

func NewWorkQueue

func NewWorkQueue[E BehaviourEvent](fn WorkQueueFunc[E]) *WorkQueue[E]

func (*WorkQueue[E]) Enqueue

func (w *WorkQueue[E]) Enqueue(ctx context.Context, cmd E) error

Enqueue queues work to be performed. It will block if the queue has reached its maximum capacity for pending work. While blocking it will return a context cancellation error if the work item's context is cancelled.

type WorkQueueFunc

type WorkQueueFunc[E BehaviourEvent] func(context.Context, E) bool
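
A buffered queue that drains sequentially and whose Enqueue respects context cancellation can be sketched as below. This is not the package's implementation. The names are hypothetical, the capacity is passed explicitly for the sake of the example, and the sketch assumes that a `true` return from the work function means "stop processing", which the documentation above does not state. There is no shutdown method, so the worker goroutine lives until the function stops it or the program exits.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errQueueStopped = errors.New("work queue stopped")

// workFunc processes one item; returning true stops the queue (assumption).
type workFunc[E any] func(ctx context.Context, ev E) bool

type workItem[E any] struct {
	ctx context.Context
	ev  E
}

type workQueue[E any] struct {
	pending chan workItem[E]
	done    chan struct{}
	fn      workFunc[E]
	start   sync.Once
}

func newWorkQueue[E any](fn workFunc[E], capacity int) *workQueue[E] {
	return &workQueue[E]{
		pending: make(chan workItem[E], capacity),
		done:    make(chan struct{}),
		fn:      fn,
	}
}

// Enqueue blocks while the queue is full and returns the context's error if
// the item's context is cancelled while waiting.
func (w *workQueue[E]) Enqueue(ctx context.Context, ev E) error {
	w.start.Do(func() { go w.run() }) // start the worker lazily
	select {
	case w.pending <- workItem[E]{ctx: ctx, ev: ev}:
		return nil
	case <-w.done:
		return errQueueStopped
	case <-ctx.Done():
		return ctx.Err()
	}
}

// run processes items one at a time, in the order they were enqueued.
func (w *workQueue[E]) run() {
	defer close(w.done)
	for item := range w.pending {
		if stop := w.fn(item.ctx, item.ev); stop {
			return
		}
	}
}

// processInOrder enqueues the events and returns them as the worker saw them.
func processInOrder(events ...int) []int {
	results := make(chan int, len(events))
	q := newWorkQueue[int](func(ctx context.Context, ev int) bool {
		results <- ev
		return false
	}, 1)
	for _, ev := range events {
		if err := q.Enqueue(context.Background(), ev); err != nil {
			panic(err)
		}
	}
	out := make([]int, 0, len(events))
	for range events {
		out = append(out, <-results)
	}
	return out
}

func main() {
	fmt.Println(processInOrder(1, 2, 3))
}
```

Because a single goroutine drains the channel, items are handled strictly in enqueue order, which matches the sequential draining described above.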

Directories

Path Synopsis
Package brdcst contains state machines that implement algorithms for broadcasting records into the DHT network.
internal
tiny
Package tiny implements Kademlia types suitable for tiny test networks
