narwhal

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

README

Narwhal

A production-ready Go implementation of the Narwhal DAG-based mempool protocol.

Go Version Tests License

Features

  • DAG-based mempool - Parallel transaction dissemination across all validators
  • No leader bottleneck - All validators broadcast simultaneously
  • Reliable availability - Certificates guarantee data availability (2f+1 acknowledgments)
  • Consensus-agnostic - Integrates with any BFT consensus (HotStuff-2, Bullshark, etc.)
  • Generic design - Supports custom hash and transaction types
  • Reconfiguration - Epoch-based validator set changes

Installation

go get github.com/edgedlt/narwhal

Requirements: Go 1.23+

Quick Start

package main

import (
    "github.com/edgedlt/narwhal"
    "github.com/edgedlt/narwhal/timer"
    "go.uber.org/zap"
)

func main() {
    // Configure Narwhal
    cfg, _ := narwhal.NewConfig[MyHash, *MyTransaction](
        narwhal.WithMyIndex[MyHash, *MyTransaction](0),
        narwhal.WithValidators[MyHash, *MyTransaction](validators),
        narwhal.WithSigner[MyHash, *MyTransaction](signer),
        narwhal.WithStorage[MyHash, *MyTransaction](storage),
        narwhal.WithNetwork[MyHash, *MyTransaction](network),
        narwhal.WithTimer[MyHash, *MyTransaction](timer.NewRealTimer()),
        narwhal.WithLogger[MyHash, *MyTransaction](zap.NewProduction()),
    )

    // Create and start
    nw, _ := narwhal.New(cfg, myHashFunc)
    nw.Start()
    defer nw.Stop()

    // Submit transactions
    nw.AddTransaction(tx)

    // Integration with consensus
    certs := nw.GetCertifiedVertices()  // For block proposals
    nw.MarkCommitted(certs)             // After commit
    txs, _ := nw.GetTransactions(certs) // For execution
}

Architecture

narwhal/
├── narwhal.go           # Core mempool coordinator
├── config.go            # Configuration with functional options
├── types.go             # Core interfaces (Hash, Transaction, Storage, Network)
├── dag.go               # DAG structure and certificate management
├── proposer.go          # Header proposal logic
├── fetcher.go           # Missing data fetching
├── gc.go                # Garbage collection
├── validation.go        # Header and certificate validation
├── reconfiguration.go   # Epoch transitions
├── hooks.go             # Observability callbacks
└── timer/               # Timer implementations (Real, Mock)

Interfaces

Implement these interfaces to integrate with your blockchain:

// Your hash type
type Hash interface {
    Bytes() []byte
    Equals(other Hash) bool
    String() string
}

// Your transaction type
type Transaction[H Hash] interface {
    Hash() H
    Bytes() []byte
}

// Validator set
type ValidatorSet interface {
    Count() int
    GetByIndex(index uint16) (PublicKey, error)
    Contains(index uint16) bool
    F() int  // max Byzantine faults: (n-1)/3
}

// Persistent storage
type Storage[H Hash, T Transaction[H]] interface {
    GetBatch(digest H) (*Batch[H, T], error)
    PutBatch(batch *Batch[H, T]) error
    GetHeader(digest H) (*Header[H], error)
    PutHeader(header *Header[H]) error
    GetCertificate(digest H) (*Certificate[H], error)
    PutCertificate(cert *Certificate[H]) error
    GetHighestRound() (uint64, error)
    DeleteBeforeRound(round uint64) error
    // ... see types.go for full interface
}

// Network transport
type Network[H Hash, T Transaction[H]] interface {
    BroadcastBatch(batch *Batch[H, T])
    BroadcastHeader(header *Header[H])
    SendVote(validatorIndex uint16, vote *HeaderVote[H])
    BroadcastCertificate(cert *Certificate[H])
    FetchBatch(from uint16, digest H) (*Batch[H, T], error)
    Receive() <-chan Message[H, T]
    // ... see types.go for full interface
}

Protocol Overview

Narwhal separates data availability from ordering:

  1. Workers batch transactions and broadcast to all validators
  2. Primary creates headers referencing batches and parent certificates
  3. Validators vote on headers after verifying data availability
  4. Certificates form when 2f+1 votes are collected
  5. DAG stores certified vertices for consensus to order

Data Availability: A certificate proves 2f+1 validators have the referenced data.

Consensus Integration: Your consensus protocol orders certificates; Narwhal handles dissemination.

Configuration

Option Default Description
WithWorkerCount 4 Parallel workers for batching
WithBatchSize 500 Max transactions per batch
WithBatchTimeout 100ms Max wait before creating batch
WithHeaderTimeout 500ms Interval between header proposals
WithGCInterval 50 Rounds between garbage collection
WithMaxRoundsGap 10 Max rounds ahead to accept

See Configuration Guide for tuning and implementation guides.

Testing

# Run all tests
go test ./...

# Run with race detection
go test -race ./...

# Run benchmarks
go test -bench=. -benchmem

References

License

Apache 2.0 - see LICENSE for details.

Documentation

Overview

Package narwhal implements a DAG-based mempool for BFT consensus. It separates data availability from ordering, enabling parallel transaction broadcast across validators without leader bottlenecks.

Index

Constants

View Source
const (
	// DefaultMaxBatchRefs is the maximum number of batch references in a header.
	// Prevents memory exhaustion from headers with too many refs.
	DefaultMaxBatchRefs = 1000

	// DefaultMaxParents is the maximum number of parent certificates in a header.
	// Limited by validator count in practice, but enforced for safety.
	DefaultMaxParents = 200

	// DefaultMaxSignatures is the maximum number of signatures in a certificate.
	// Should not exceed validator count.
	DefaultMaxSignatures = 200

	// DefaultMaxTransactionsPerBatch is the maximum transactions in a single batch.
	// Prevents memory exhaustion from oversized batches.
	DefaultMaxTransactionsPerBatch = 10000

	// DefaultMaxTransactionSize is the maximum size of a single transaction in bytes.
	DefaultMaxTransactionSize = 1024 * 1024 // 1 MB

	// DefaultMaxBatchSize is the maximum size of a batch in bytes.
	DefaultMaxBatchSize = 100 * 1024 * 1024 // 100 MB

	// DefaultMaxHeaderSize is the maximum size of a header in bytes.
	DefaultMaxHeaderSize = 1024 * 1024 // 1 MB

	// DefaultMaxCertificateSize is the maximum size of a certificate in bytes.
	DefaultMaxCertificateSize = 10 * 1024 * 1024 // 10 MB

	// DefaultMaxRoundSkip is the maximum rounds a header can be ahead of current.
	DefaultMaxRoundSkip = 100

	// DefaultMaxTimestampDrift is the maximum time a header timestamp can be in the future.
	DefaultMaxTimestampDrift = 60 * time.Second
)

Security limits to prevent DoS attacks. These can be overridden via ValidationConfig.

Variables

This section is empty.

Functions

func BLSSign

func BLSSign(privateKey []byte, message []byte) ([]byte, error)

BLSSign signs a message using a BLS private key.

func GoWithRecovery

func GoWithRecovery(cfg RecoveryConfig, fn func())

GoWithRecovery starts a goroutine with panic recovery.

func GoWithRecoveryCtx

func GoWithRecoveryCtx(ctx context.Context, cfg RecoveryConfig, fn func(context.Context))

GoWithRecoveryCtx starts a goroutine with panic recovery and context. The function receives the context and can check for cancellation.

func IsValidationError

func IsValidationError(err error) bool

IsValidationError returns true if err is a ValidationError.

func RecoverPanic

func RecoverPanic(cfg RecoveryConfig)

RecoverPanic recovers from panics and handles them according to config. Use as: defer RecoverPanic(cfg)

func SafeGo

func SafeGo(logger *zap.Logger, fn func())

SafeGo starts a goroutine with panic recovery using a simple logger. This is a convenience wrapper for common cases.

func SafeGoCtx

func SafeGoCtx(ctx context.Context, logger *zap.Logger, fn func(context.Context))

SafeGoCtx starts a goroutine with panic recovery using a simple logger and context.

func VerifyCertificateSignatures

func VerifyCertificateSignatures[H Hash](
	cert *Certificate[H],
	validators ValidatorSet,
	crypto CryptoProvider,
) error

VerifyCertificateSignatures verifies all signatures in a certificate. Uses the provided CryptoProvider for optimized verification.

Types

type BLSAggregator

type BLSAggregator interface {
	// Aggregate combines multiple BLS signatures into one.
	// Returns the aggregated signature bytes.
	Aggregate(signatures [][]byte) ([]byte, error)

	// VerifyAggregate verifies an aggregated signature against multiple public keys.
	// pubKeys are the raw public key bytes of all signers.
	// message is the data that was signed (must be same for all).
	// aggSig is the aggregated signature.
	VerifyAggregate(pubKeys [][]byte, message []byte, aggSig []byte) bool
}

BLSAggregator provides signature aggregation for BLS. Aggregated signatures reduce certificate size and verification time.

type BLSBatchVerifier

type BLSBatchVerifier interface {
	// AddSignature adds a signature to the batch.
	// pubKey is the raw public key bytes.
	// message is the data that was signed.
	// signature is the BLS signature bytes.
	AddSignature(pubKey []byte, message []byte, signature []byte)

	// VerifyBatch verifies all signatures added to the batch.
	// Returns true if all signatures are valid.
	VerifyBatch() bool

	// Reset clears the batch for reuse.
	Reset()
}

BLSBatchVerifier provides batch verification capability for BLS signatures. This interface should be implemented by the integrating application using their preferred BLS library (e.g., blst, bls12-381).

type BLSCryptoProvider

type BLSCryptoProvider struct {
	// contains filtered or unexported fields
}

BLSCryptoProvider implements CryptoProvider for BLS.

func NewBLSCryptoProvider

func NewBLSCryptoProvider(batcherFactory func() BLSBatchVerifier, aggregator BLSAggregator) *BLSCryptoProvider

NewBLSCryptoProvider creates a crypto provider for BLS. batcherFactory creates new BLSBatchVerifier instances. aggregator provides signature aggregation (optional, can be nil).

func (*BLSCryptoProvider) Aggregator

func (p *BLSCryptoProvider) Aggregator() BLSAggregator

func (*BLSCryptoProvider) NewBatchVerifier

func (p *BLSCryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier

func (*BLSCryptoProvider) Scheme

func (p *BLSCryptoProvider) Scheme() SignatureScheme

func (*BLSCryptoProvider) SupportsAggregation

func (p *BLSCryptoProvider) SupportsAggregation() bool

type BLSKeyPair

type BLSKeyPair struct {
	PrivateKey []byte // 32 bytes (fr.Element)
	PublicKey  []byte // 96 bytes (G2Affine compressed)
}

BLSKeyPair holds a BLS private/public key pair.

func GenerateBLSKeyPair

func GenerateBLSKeyPair() (*BLSKeyPair, error)

GenerateBLSKeyPair generates a new BLS key pair.

type BLSPublicKey

type BLSPublicKey struct {
	// contains filtered or unexported fields
}

BLSPublicKey wraps a BLS public key for the PublicKey interface.

func NewBLSPublicKey

func NewBLSPublicKey(b []byte) (*BLSPublicKey, error)

NewBLSPublicKey creates a BLS public key from bytes.

func (*BLSPublicKey) Bytes

func (k *BLSPublicKey) Bytes() []byte

func (*BLSPublicKey) Equals

func (k *BLSPublicKey) Equals(other interface{ Bytes() []byte }) bool

func (*BLSPublicKey) Verify

func (k *BLSPublicKey) Verify(message, signature []byte) bool

type BLSSigner

type BLSSigner struct {
	// contains filtered or unexported fields
}

BLSSigner implements the Signer interface for BLS.

func NewBLSSigner

func NewBLSSigner(keyPair *BLSKeyPair) (*BLSSigner, error)

NewBLSSigner creates a new BLS signer from a key pair.

func (*BLSSigner) PublicKey

func (s *BLSSigner) PublicKey() PublicKey

func (*BLSSigner) Sign

func (s *BLSSigner) Sign(message []byte) ([]byte, error)

type BLSValidatorSet

type BLSValidatorSet struct {
	// contains filtered or unexported fields
}

BLSValidatorSet implements ValidatorSet for BLS validators.

func NewBLSValidatorSet

func NewBLSValidatorSet(n int) (*BLSValidatorSet, error)

NewBLSValidatorSet creates a validator set with n BLS validators.

func (*BLSValidatorSet) Contains

func (v *BLSValidatorSet) Contains(index uint16) bool

func (*BLSValidatorSet) Count

func (v *BLSValidatorSet) Count() int

func (*BLSValidatorSet) F

func (v *BLSValidatorSet) F() int

func (*BLSValidatorSet) GetByIndex

func (v *BLSValidatorSet) GetByIndex(index uint16) (PublicKey, error)

func (*BLSValidatorSet) GetSigner

func (v *BLSValidatorSet) GetSigner(index uint16) *BLSSigner

GetSigner returns the signer for a validator index.

type Batch

type Batch[H Hash, T Transaction[H]] struct {
	WorkerID     uint16
	ValidatorID  uint16
	Round        uint64
	Transactions []T
	Digest       H
}

Batch is a collection of transactions created by a worker.

func BatchFromBytes

func BatchFromBytes[H Hash, T Transaction[H]](
	data []byte,
	hashFromBytes func([]byte) (H, error),
	txFromBytes func([]byte) (T, error),
) (*Batch[H, T], error)

BatchFromBytes deserializes a batch from bytes. hashFromBytes converts raw bytes to H. txFromBytes converts raw bytes to T.

func (*Batch[H, T]) Bytes

func (b *Batch[H, T]) Bytes() []byte

Bytes serializes the batch to bytes. Format: [WorkerID:2][ValidatorID:2][Round:8][DigestLen:2][Digest][TxCount:4][Tx0Len:4][Tx0][Tx1Len:4][Tx1]...

func (*Batch[H, T]) ComputeDigest

func (b *Batch[H, T]) ComputeDigest(hashFunc func([]byte) H)

ComputeDigest computes and sets the digest for this batch. The digest is computed over the batch metadata and transaction hashes. This must be called before broadcasting the batch.

func (*Batch[H, T]) TransactionCount

func (b *Batch[H, T]) TransactionCount() int

TransactionCount returns the number of transactions in the batch.

func (*Batch[H, T]) Verify

func (b *Batch[H, T]) Verify(hashFunc func([]byte) H) error

Verify verifies that the batch digest matches its contents.

type BatchCreatedEvent

type BatchCreatedEvent[H Hash, T Transaction[H]] struct {
	Batch            *Batch[H, T]
	WorkerID         uint16
	TransactionCount int
	SizeBytes        int
	CreatedAt        time.Time
}

BatchCreatedEvent contains information about a newly created batch.

type BatchMessage

type BatchMessage[H Hash, T Transaction[H]] struct {
	BatchData *Batch[H, T]
	// contains filtered or unexported fields
}

func (*BatchMessage[H, T]) Batch

func (m *BatchMessage[H, T]) Batch() *Batch[H, T]

func (*BatchMessage[H, T]) Sender

func (m *BatchMessage[H, T]) Sender() uint16

func (*BatchMessage[H, T]) Type

func (m *BatchMessage[H, T]) Type() MessageType

type BatchMessageAccessor

type BatchMessageAccessor[H Hash, T Transaction[H]] interface{ Batch() *Batch[H, T] }

type BatchRequestMessage

type BatchRequestMessage[H Hash, T Transaction[H]] struct {
	Digest H
	// contains filtered or unexported fields
}

BatchRequestMessage requests a batch by digest.

func (*BatchRequestMessage[H, T]) Sender

func (m *BatchRequestMessage[H, T]) Sender() uint16

func (*BatchRequestMessage[H, T]) Type

func (m *BatchRequestMessage[H, T]) Type() MessageType

type BatchVerifier

type BatchVerifier interface {
	// Add queues a signature for batch verification.
	// validatorIndex is used to look up the public key.
	Add(message []byte, signature []byte, validatorIndex uint16)

	// Verify performs batch verification of all queued signatures.
	// Returns nil if all signatures are valid, or an error describing the failure.
	Verify() error

	// Reset clears all queued signatures for reuse.
	Reset()
}

BatchVerifier provides optimized signature verification. For BLS, it can verify multiple signatures in a single operation. For Ed25519, it parallelizes verification across goroutines.

Usage:

verifier := NewBatchVerifier(validators, SignatureSchemeBLS)
for i, sig := range signatures {
    verifier.Add(message, sig, validatorIndex)
}
if err := verifier.Verify(); err != nil {
    // verification failed
}

func NewBLSBatchVerifier

func NewBLSBatchVerifier(validators ValidatorSet, batcher BLSBatchVerifier) BatchVerifier

NewBLSBatchVerifier creates a batch verifier for BLS signatures. The batcher parameter should be provided by the integrating application.

func NewEd25519BatchVerifier

func NewEd25519BatchVerifier(validators ValidatorSet, workers int) BatchVerifier

NewEd25519BatchVerifier creates a batch verifier for Ed25519. It parallelizes verification across multiple goroutines.

type ByteSlicePool

type ByteSlicePool struct {
	// contains filtered or unexported fields
}

ByteSlicePool provides a pool of reusable byte slices.

func NewByteSlicePool

func NewByteSlicePool(defaultSize int) *ByteSlicePool

NewByteSlicePool creates a new byte slice pool with the specified default size.

func (*ByteSlicePool) Get

func (p *ByteSlicePool) Get() *[]byte

Get retrieves a byte slice from the pool. The slice is reset to zero length but retains its capacity.

func (*ByteSlicePool) Put

func (p *ByteSlicePool) Put(b *[]byte)

Put returns a byte slice to the pool. The slice should not be used after calling Put.

type CachedBatchVerifier

type CachedBatchVerifier struct {
	// contains filtered or unexported fields
}

CachedBatchVerifier wraps a BatchVerifier with signature caching.

func NewCachedBatchVerifier

func NewCachedBatchVerifier(inner BatchVerifier, cache *SignatureCache, validators ValidatorSet) *CachedBatchVerifier

NewCachedBatchVerifier wraps a batch verifier with caching.

func (*CachedBatchVerifier) Add

func (v *CachedBatchVerifier) Add(message []byte, signature []byte, validatorIndex uint16)

func (*CachedBatchVerifier) Reset

func (v *CachedBatchVerifier) Reset()

func (*CachedBatchVerifier) Verify

func (v *CachedBatchVerifier) Verify() error

type Certificate

type Certificate[H Hash] struct {
	Header       *Header[H]
	Signatures   [][]byte
	SignerBitmap uint64 // bit i set if validator i signed
}

Certificate is a header with 2f+1 validator signatures.

func CertificateFromBytes

func CertificateFromBytes[H Hash](
	data []byte,
	hashFromBytes func([]byte) (H, error),
) (*Certificate[H], error)

CertificateFromBytes deserializes a certificate from bytes.

func NewCertificate

func NewCertificate[H Hash](header *Header[H], votes map[uint16][]byte) *Certificate[H]

NewCertificate creates a new certificate from a header and votes.

func (*Certificate[H]) Author

func (c *Certificate[H]) Author() uint16

Author returns the certificate's author.

func (*Certificate[H]) Bytes

func (c *Certificate[H]) Bytes() []byte

Bytes serializes the certificate to bytes. Format: [Header bytes][SignerBitmap:8][SigCount:4][Sig0Len:2][Sig0][Sig1Len:2][Sig1]...

func (*Certificate[H]) Digest

func (c *Certificate[H]) Digest() H

Digest returns the certificate's digest (same as header digest).

func (*Certificate[H]) HasSigner

func (c *Certificate[H]) HasSigner(validatorIndex uint16) bool

HasSigner returns true if the given validator signed this certificate.

func (*Certificate[H]) Round

func (c *Certificate[H]) Round() uint64

Round returns the certificate's round.

func (*Certificate[H]) SignerCount

func (c *Certificate[H]) SignerCount() int

SignerCount returns the number of validators who signed this certificate.

func (*Certificate[H]) Validate

func (c *Certificate[H]) Validate(validators ValidatorSet) error

Validate verifies that the certificate has a valid quorum of signatures.

func (*Certificate[H]) ValidateWithCrypto

func (c *Certificate[H]) ValidateWithCrypto(validators ValidatorSet, crypto CryptoProvider, cache *SignatureCache) error

ValidateWithCrypto verifies that the certificate has a valid quorum of signatures using the provided CryptoProvider for optimized batch/parallel verification. If cache is provided, already-verified signatures are skipped.

type CertificateCache

type CertificateCache[H Hash] struct {
	// contains filtered or unexported fields
}

CertificateCache is a specialized LRU cache for certificates. It uses certificate digests as keys and provides convenient methods for certificate lookups.

func NewCertificateCache

func NewCertificateCache[H Hash](capacity int) *CertificateCache[H]

NewCertificateCache creates a new certificate cache with the given capacity.

func (*CertificateCache[H]) Clear

func (c *CertificateCache[H]) Clear()

Clear removes all certificates from the cache.

func (*CertificateCache[H]) Contains

func (c *CertificateCache[H]) Contains(digest H) bool

Contains checks if a certificate exists in the cache.

func (*CertificateCache[H]) Get

func (c *CertificateCache[H]) Get(digest H) (*Certificate[H], bool)

Get retrieves a certificate by its digest.

func (*CertificateCache[H]) Len

func (c *CertificateCache[H]) Len() int

Len returns the number of certificates in the cache.

func (*CertificateCache[H]) Put

func (c *CertificateCache[H]) Put(cert *Certificate[H])

Put adds a certificate to the cache.

func (*CertificateCache[H]) Remove

func (c *CertificateCache[H]) Remove(digest H) bool

Remove removes a certificate from the cache.

func (*CertificateCache[H]) Stats

func (c *CertificateCache[H]) Stats() LRUCacheStats

Stats returns cache statistics.

type CertificateFormedEvent

type CertificateFormedEvent[H Hash] struct {
	Certificate *Certificate[H]
	SignerCount int
	Latency     time.Duration // Time from header creation to certificate formation
	FormedAt    time.Time
}

CertificateFormedEvent contains information about a newly formed certificate.

type CertificateMessage

type CertificateMessage[H Hash, T Transaction[H]] struct {
	CertificateData *Certificate[H]
	// contains filtered or unexported fields
}

CertificateMessage wraps a certificate for network transmission.

func (*CertificateMessage[H, T]) Certificate

func (m *CertificateMessage[H, T]) Certificate() *Certificate[H]

func (*CertificateMessage[H, T]) Sender

func (m *CertificateMessage[H, T]) Sender() uint16

func (*CertificateMessage[H, T]) Type

func (m *CertificateMessage[H, T]) Type() MessageType

type CertificateMessageAccessor

type CertificateMessageAccessor[H Hash] interface{ Certificate() *Certificate[H] }

type CertificatePendingEvent

type CertificatePendingEvent[H Hash] struct {
	Certificate    *Certificate[H]
	MissingParents []H
	QueuedAt       time.Time
}

CertificatePendingEvent contains information about a certificate queued as pending.

type CertificateRangeRequestMessage

type CertificateRangeRequestMessage[H Hash, T Transaction[H]] struct {
	StartRound uint64
	EndRound   uint64
	// contains filtered or unexported fields
}

CertificateRangeRequestMessage requests certificates for a range of rounds.

func (*CertificateRangeRequestMessage[H, T]) Sender

func (m *CertificateRangeRequestMessage[H, T]) Sender() uint16

func (*CertificateRangeRequestMessage[H, T]) Type

type CertificateRangeResponseMessage

type CertificateRangeResponseMessage[H Hash, T Transaction[H]] struct {
	Certificates []*Certificate[H]
	StartRound   uint64
	EndRound     uint64
	// contains filtered or unexported fields
}

CertificateRangeResponseMessage contains certificates for a range of rounds.

func (*CertificateRangeResponseMessage[H, T]) Sender

func (m *CertificateRangeResponseMessage[H, T]) Sender() uint16

func (*CertificateRangeResponseMessage[H, T]) Type

type CertificateReceivedEvent

type CertificateReceivedEvent[H Hash] struct {
	Certificate *Certificate[H]
	From        uint16
	ReceivedAt  time.Time
}

CertificateReceivedEvent contains information about a received certificate.

type CertificateRequestMessage

type CertificateRequestMessage[H Hash, T Transaction[H]] struct {
	Digest H
	// contains filtered or unexported fields
}

CertificateRequestMessage requests a certificate by digest.

func (*CertificateRequestMessage[H, T]) Sender

func (m *CertificateRequestMessage[H, T]) Sender() uint16

func (*CertificateRequestMessage[H, T]) Type

func (m *CertificateRequestMessage[H, T]) Type() MessageType

type ChannelMetrics

type ChannelMetrics struct {
	// contains filtered or unexported fields
}

ChannelMetrics collects metrics from multiple metered channels. Useful for exposing all channel metrics through a single interface.

func NewChannelMetrics

func NewChannelMetrics() *ChannelMetrics

NewChannelMetrics creates a new ChannelMetrics collector.

func (*ChannelMetrics) AllStats

func (cm *ChannelMetrics) AllStats() map[string]MeteredChannelStats

AllStats returns statistics for all registered channels.

func (*ChannelMetrics) Register

func (cm *ChannelMetrics) Register(name string, ch interface{ Stats() MeteredChannelStats })

Register adds a metered channel to the collector.

func (*ChannelMetrics) TotalDropped

func (cm *ChannelMetrics) TotalDropped() uint64

TotalDropped returns the sum of dropped messages across all channels.

func (*ChannelMetrics) TotalPending

func (cm *ChannelMetrics) TotalPending() int

TotalPending returns the sum of pending messages across all channels.

func (*ChannelMetrics) Unregister

func (cm *ChannelMetrics) Unregister(name string)

Unregister removes a channel from the collector.

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern for network operations. It prevents repeated failures from overwhelming the system.

func NewCircuitBreaker

func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker

NewCircuitBreaker creates a new circuit breaker.

func (*CircuitBreaker) Allow

func (cb *CircuitBreaker) Allow() bool

Allow checks if a request should be allowed. Returns true if allowed, false if the circuit is open.

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

RecordFailure records a failed operation.

func (*CircuitBreaker) RecordSuccess

func (cb *CircuitBreaker) RecordSuccess()

RecordSuccess records a successful operation.

func (*CircuitBreaker) Reset

func (cb *CircuitBreaker) Reset()

Reset resets the circuit breaker to closed state.

func (*CircuitBreaker) State

func (cb *CircuitBreaker) State() string

State returns the current circuit state as a string.

func (*CircuitBreaker) Stats

func (cb *CircuitBreaker) Stats() CircuitBreakerStats

Stats returns current statistics.

type CircuitBreakerConfig

type CircuitBreakerConfig struct {
	// FailureThreshold is the number of failures before opening the circuit.
	// Default: 5
	FailureThreshold int

	// SuccessThreshold is the number of successes needed to close from half-open.
	// Default: 2
	SuccessThreshold int

	// Timeout is how long to wait before trying again (moving to half-open).
	// Default: 30s
	Timeout time.Duration
}

CircuitBreakerConfig configures the circuit breaker.

func DefaultCircuitBreakerConfig

func DefaultCircuitBreakerConfig() CircuitBreakerConfig

DefaultCircuitBreakerConfig returns sensible defaults.

type CircuitBreakerStats

type CircuitBreakerStats struct {
	State          string
	Failures       int
	Successes      int
	TotalFailures  uint64
	TotalSuccesses uint64
	TotalRejected  uint64
	LastFailure    time.Time
}

CircuitBreakerStats contains circuit breaker statistics.

type Config

type Config[H Hash, T Transaction[H]] struct {

	// MyIndex is this validator's index in the validator set.
	MyIndex uint16

	// Validators is the set of all validators.
	// Required.
	Validators ValidatorSet

	// Signer provides cryptographic signing capability.
	// Required.
	Signer Signer

	// Storage provides persistent storage.
	// Required.
	Storage Storage[H, T]

	// Network provides message delivery.
	// Required.
	Network Network[H, T]

	// Timer provides timeout management.
	// Required.
	Timer Timer

	// Logger for structured logging.
	// Defaults to a no-op logger if not provided.
	Logger *zap.Logger

	// WorkerCount is the number of workers for parallel batch creation.
	// Default: 4
	WorkerCount int

	// BatchSize is the maximum number of transactions per batch.
	// When this threshold is reached, a batch is created immediately.
	// Default: 500
	BatchSize int

	// BatchTimeout is the maximum time to wait before creating a batch.
	// Even if BatchSize is not reached, a batch is created after this duration.
	// Default: 100ms
	BatchTimeout time.Duration

	// HeaderTimeout is the interval between header creation attempts.
	// Default: 500ms
	HeaderTimeout time.Duration

	// GCInterval is the number of rounds between garbage collection runs.
	// Older rounds are cleaned up to free memory and storage.
	// Default: 50
	GCInterval uint64

	// MaxRoundsGap is the maximum number of rounds ahead we accept headers for.
	// Headers for rounds beyond currentRound + MaxRoundsGap are rejected.
	// Default: 10
	MaxRoundsGap uint64

	// MaxPendingTransactions is the maximum number of transactions queued per worker.
	// When this limit is reached, AddTransaction will block or drop based on DropOnFull.
	// Default: 10000
	MaxPendingTransactions int

	// MaxPendingBatches is the maximum number of batches queued in the primary.
	// Default: 1000
	MaxPendingBatches int

	// DropOnFull determines behavior when queues are full.
	// If true, new items are dropped. If false, AddTransaction blocks.
	// Default: false (block)
	DropOnFull bool

	// MaxHeaderBatches is the maximum number of batch refs to include in a header.
	// Headers are created when this limit is reached, even if MaxHeaderDelay hasn't elapsed.
	// Default: 100
	MaxHeaderBatches int

	// MaxHeaderDelay is the maximum time to wait before creating a header.
	// Even if MaxHeaderBatches is not reached, a header is created after this duration.
	// Default: HeaderTimeout (uses HeaderTimeout value if not set)
	MaxHeaderDelay time.Duration

	// SyncRetryDelay is the delay before retrying sync requests for missing data.
	// Default: 10s
	SyncRetryDelay time.Duration

	// SyncRetryNodes is the number of random nodes to contact when syncing.
	// Default: 3
	SyncRetryNodes int

	// GCDepth is the number of rounds to retain after the committed round.
	// This is separate from GCInterval which controls how often GC runs.
	// Default: 100
	GCDepth uint64

	// Hooks provides callbacks for observability events.
	// All hooks are optional - nil hooks are ignored.
	// See the Hooks struct for available callbacks.
	Hooks *Hooks[H, T]

	// MaxPendingHeaders is the maximum number of headers with missing parents to queue.
	// Default: 1000
	MaxPendingHeaders int

	// HeaderWaiterRetryInterval is how often to retry processing pending headers.
	// Default: 1s
	HeaderWaiterRetryInterval time.Duration

	// HeaderWaiterMaxRetries is the maximum retries before dropping a pending header.
	// Default: 10
	HeaderWaiterMaxRetries int

	// HeaderWaiterMaxAge is the maximum time a header can wait before being dropped.
	// Default: 30s
	HeaderWaiterMaxAge time.Duration

	// FetchMissingParents enables proactive fetching of missing parent certificates
	// when headers are queued. This can reduce latency by fetching parents in parallel.
	// Default: true
	FetchMissingParents bool

	// CryptoProvider provides optimized cryptographic operations.
	// When set, certificate validation uses batch/parallel signature verification.
	// For Ed25519, this parallelizes verification across goroutines.
	// For BLS, this enables true batch verification and signature aggregation.
	// Default: nil (uses standard sequential verification)
	CryptoProvider CryptoProvider

	// SignatureCache caches verified signatures to avoid re-verification.
	// Only used when CryptoProvider is set.
	// Default: nil (no caching)
	SignatureCache *SignatureCache

	// Validation is the configuration for input validation.
	// Controls limits for DoS prevention (max batch refs, max transactions, etc.)
	// Default: DefaultValidationConfig()
	Validation ValidationConfig

	// RecommendedMessageQueueSize is the advisory buffer size for Network receive channels.
	// Default: 10000
	RecommendedMessageQueueSize int

	// DAGCache configures the optional LRU cache for DAG vertex lookups.
	// When enabled, frequently accessed vertices are cached for faster retrieval.
	// This improves performance for workloads with repeated vertex lookups.
	// Default: DAGCacheConfig{Enabled: true, Capacity: 10000}
	DAGCache DAGCacheConfig

	// NetworkModel specifies the network timing assumptions.
	// Asynchronous mode advances rounds as soon as 2f+1 parents are available.
	// Partially synchronous mode waits for leader blocks before advancing.
	// Default: NetworkModelAsynchronous
	NetworkModel NetworkModel

	// LeaderSchedule determines which validator is leader for each round.
	// Only used when NetworkModel is NetworkModelPartiallySynchronous.
	// Default: nil (round-robin schedule created from validator count)
	LeaderSchedule LeaderSchedule

	// Epoch is the initial epoch for the validator set.
	// Used for epoch-aware reconfiguration and vote tracking.
	// Default: 0
	Epoch uint64
}

Config holds the configuration for a Narwhal instance. Use NewConfig with functional options to create a properly configured instance.

The type parameters specify the hash and transaction types used.

func DefaultConfig

func DefaultConfig[H Hash, T Transaction[H]]() Config[H, T]

DefaultConfig returns a configuration suitable for most use cases. Balances throughput and latency for typical blockchain workloads.

Settings:

  • WorkerCount: 4
  • BatchSize: 500
  • BatchTimeout: 100ms
  • HeaderTimeout: 500ms
  • GCInterval: 100
  • MaxRoundsGap: 10

func DemoConfig

func DemoConfig[H Hash, T Transaction[H]]() Config[H, T]

DemoConfig returns a configuration suitable for demonstrations and testing. Uses visible timing to make the protocol easier to observe.

Settings:

  • WorkerCount: 2
  • BatchSize: 10
  • BatchTimeout: 1s
  • HeaderTimeout: 2s
  • GCInterval: 50
  • MaxRoundsGap: 10

func HighThroughputConfig

func HighThroughputConfig[H Hash, T Transaction[H]]() Config[H, T]

HighThroughputConfig returns a configuration optimized for maximum throughput. Uses more workers and larger batches, suitable for high-volume systems.

Settings:

  • WorkerCount: 8
  • BatchSize: 1000
  • BatchTimeout: 50ms
  • HeaderTimeout: 200ms
  • GCInterval: 50
  • MaxRoundsGap: 5

func LowLatencyConfig

func LowLatencyConfig[H Hash, T Transaction[H]]() Config[H, T]

LowLatencyConfig returns a configuration optimized for low latency. Uses smaller batches and shorter timeouts, suitable for interactive applications.

Settings:

  • WorkerCount: 2
  • BatchSize: 100
  • BatchTimeout: 20ms
  • HeaderTimeout: 100ms
  • GCInterval: 100
  • MaxRoundsGap: 10

func NewConfig

func NewConfig[H Hash, T Transaction[H]](opts ...ConfigOption[H, T]) (*Config[H, T], error)

NewConfig creates a new Config with the given options. Required options: WithValidators, WithSigner, WithStorage, WithNetwork, WithTimer.

Returns an error if any option fails or if required options are missing.

func (*Config[H, T]) LogWarnings

func (c *Config[H, T]) LogWarnings()

LogWarnings logs all configuration warnings.

func (*Config[H, T]) Warnings

func (c *Config[H, T]) Warnings() []ConfigWarning

Warnings returns warnings for suboptimal configuration choices.

type ConfigOption

type ConfigOption[H Hash, T Transaction[H]] func(*Config[H, T]) error

ConfigOption is a functional option for configuring Narwhal. Options are applied in order, so later options override earlier ones.

func WithBatchSize

func WithBatchSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]

WithBatchSize sets the maximum number of transactions per batch. Larger batches improve throughput but increase latency. Default: 500

func WithBatchTimeout

func WithBatchTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]

WithBatchTimeout sets the maximum time to wait before creating a batch. Shorter timeouts reduce latency but may create smaller batches. Default: 100ms

func WithCryptoProvider

func WithCryptoProvider[H Hash, T Transaction[H]](crypto CryptoProvider) ConfigOption[H, T]

WithCryptoProvider sets the crypto provider for optimized signature verification. When set, certificate validation uses batch/parallel verification. Default: nil (uses standard sequential verification)

func WithDAGCache

func WithDAGCache[H Hash, T Transaction[H]](cfg DAGCacheConfig) ConfigOption[H, T]

WithDAGCache configures the LRU cache for DAG vertex lookups. When enabled, frequently accessed vertices are cached for faster retrieval. Default: DAGCacheConfig{Enabled: true, Capacity: 10000}

func WithDAGCacheCapacity

func WithDAGCacheCapacity[H Hash, T Transaction[H]](capacity int) ConfigOption[H, T]

WithDAGCacheCapacity enables DAG caching with the specified capacity. This is a convenience method equivalent to WithDAGCache(DAGCacheConfig{Enabled: true, Capacity: capacity}).

func WithDAGCacheDisabled

func WithDAGCacheDisabled[H Hash, T Transaction[H]]() ConfigOption[H, T]

WithDAGCacheDisabled disables DAG vertex caching. Use this if memory is constrained or if your access pattern doesn't benefit from caching.

func WithDropOnFull

func WithDropOnFull[H Hash, T Transaction[H]](drop bool) ConfigOption[H, T]

WithDropOnFull sets the behavior when queues are full. If true, new items are dropped silently. If false, operations block. Default: false (block)

func WithEpoch

func WithEpoch[H Hash, T Transaction[H]](epoch uint64) ConfigOption[H, T]

WithEpoch sets the initial epoch for the validator set. The epoch is used for reconfiguration and vote tracking. Default: 0

func WithGCDepth

func WithGCDepth[H Hash, T Transaction[H]](depth uint64) ConfigOption[H, T]

WithGCDepth sets the number of rounds to retain after committed round. Default: 100

func WithGCInterval

func WithGCInterval[H Hash, T Transaction[H]](interval uint64) ConfigOption[H, T]

WithGCInterval sets the number of rounds between garbage collection runs. Lower values free memory sooner but may impact performance. Default: 50

func WithHeaderTimeout

func WithHeaderTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]

WithHeaderTimeout sets the interval between header creation attempts. Shorter intervals can improve throughput under high load. Default: 500ms

func WithHeaderWaiterMaxAge

func WithHeaderWaiterMaxAge[H Hash, T Transaction[H]](age time.Duration) ConfigOption[H, T]

WithHeaderWaiterMaxAge sets the maximum age before dropping a pending header. Default: 30s

func WithHeaderWaiterMaxRetries

func WithHeaderWaiterMaxRetries[H Hash, T Transaction[H]](max int) ConfigOption[H, T]

WithHeaderWaiterMaxRetries sets the max retries before dropping a pending header. Default: 10

func WithHeaderWaiterRetryInterval

func WithHeaderWaiterRetryInterval[H Hash, T Transaction[H]](interval time.Duration) ConfigOption[H, T]

WithHeaderWaiterRetryInterval sets how often to retry pending headers. Default: 1s

func WithHooks

func WithHooks[H Hash, T Transaction[H]](hooks *Hooks[H, T]) ConfigOption[H, T]

WithHooks sets the observability hooks. Hooks provide callbacks for metrics, logging, and monitoring integration. All hooks are optional - nil hooks are ignored.

func WithLeaderSchedule

func WithLeaderSchedule[H Hash, T Transaction[H]](schedule LeaderSchedule) ConfigOption[H, T]

WithLeaderSchedule sets the leader schedule for partially synchronous mode. If not set and NetworkModel is PartialSync, a round-robin schedule is created.

func WithLogger

func WithLogger[H Hash, T Transaction[H]](logger *zap.Logger) ConfigOption[H, T]

WithLogger sets the structured logger. If not provided, a no-op logger is used.

func WithMaxHeaderBatches

func WithMaxHeaderBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]

WithMaxHeaderBatches sets the maximum batch refs to include in a header. Headers are created when this limit is reached. Default: 100

func WithMaxHeaderDelay

func WithMaxHeaderDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]

WithMaxHeaderDelay sets the maximum time before creating a header. Headers are created after this duration even if MaxHeaderBatches isn't reached. Default: HeaderTimeout value

func WithMaxPendingBatches

func WithMaxPendingBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]

WithMaxPendingBatches sets the maximum batches queued in the primary. Default: 1000

func WithMaxPendingHeaders

func WithMaxPendingHeaders[H Hash, T Transaction[H]](max int) ConfigOption[H, T]

WithMaxPendingHeaders sets the maximum headers with missing parents to queue. Default: 1000

func WithMaxPendingTransactions

func WithMaxPendingTransactions[H Hash, T Transaction[H]](max int) ConfigOption[H, T]

WithMaxPendingTransactions sets the maximum transactions queued per worker. When this limit is reached, behavior depends on DropOnFull setting. Default: 10000

func WithMaxRoundsGap

func WithMaxRoundsGap[H Hash, T Transaction[H]](gap uint64) ConfigOption[H, T]

WithMaxRoundsGap sets the maximum rounds ahead we accept headers for. Headers for rounds beyond currentRound + MaxRoundsGap are rejected. Default: 10

func WithMyIndex

func WithMyIndex[H Hash, T Transaction[H]](index uint16) ConfigOption[H, T]

WithMyIndex sets this validator's index in the validator set.

func WithNetwork

func WithNetwork[H Hash, T Transaction[H]](network Network[H, T]) ConfigOption[H, T]

WithNetwork sets the network layer for message delivery. This is a required option.

func WithNetworkModel

func WithNetworkModel[H Hash, T Transaction[H]](model NetworkModel) ConfigOption[H, T]

WithNetworkModel sets the network timing assumptions. Use NetworkModelAsynchronous (default) for optimal throughput. Use NetworkModelPartiallySynchronous for better commit latency with leader awareness.

func WithRecommendedMessageQueueSize

func WithRecommendedMessageQueueSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]

WithRecommendedMessageQueueSize sets the advisory buffer size for Network receive channels.

func WithSignatureCache

func WithSignatureCache[H Hash, T Transaction[H]](cache *SignatureCache) ConfigOption[H, T]

WithSignatureCache sets the signature cache for avoiding re-verification. Only effective when CryptoProvider is also set. Default: nil (no caching)

func WithSigner

func WithSigner[H Hash, T Transaction[H]](signer Signer) ConfigOption[H, T]

WithSigner sets the cryptographic signer. This is a required option.

func WithStorage

func WithStorage[H Hash, T Transaction[H]](storage Storage[H, T]) ConfigOption[H, T]

WithStorage sets the persistent storage backend. This is a required option.

func WithSyncRetryDelay

func WithSyncRetryDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]

WithSyncRetryDelay sets the delay before retrying sync requests. Default: 10s

func WithSyncRetryNodes

func WithSyncRetryNodes[H Hash, T Transaction[H]](nodes int) ConfigOption[H, T]

WithSyncRetryNodes sets the number of nodes to contact when syncing. Default: 3

func WithTimer

func WithTimer[H Hash, T Transaction[H]](timer Timer) ConfigOption[H, T]

WithTimer sets the timer for timeout management. This is a required option.

func WithValidation

func WithValidation[H Hash, T Transaction[H]](cfg ValidationConfig) ConfigOption[H, T]

WithValidation sets the validation configuration. Controls limits for DoS prevention (max batch refs, max transactions, etc.) Default: DefaultValidationConfig()

func WithValidators

func WithValidators[H Hash, T Transaction[H]](validators ValidatorSet) ConfigOption[H, T]

WithValidators sets the validator set. This is a required option.

func WithWorkerCount

func WithWorkerCount[H Hash, T Transaction[H]](count int) ConfigOption[H, T]

WithWorkerCount sets the number of workers for parallel batch creation. More workers can increase throughput but use more resources. Default: 4

type ConfigWarning

type ConfigWarning struct {
	// Field is the name of the config field that triggered the warning.
	Field string
	// Message describes the potential issue.
	Message string
	// Suggestion provides a recommended action or value.
	Suggestion string
}

ConfigWarning represents a warning about potentially suboptimal configuration.

func (ConfigWarning) String

func (w ConfigWarning) String() string

String returns a human-readable warning message.

type CryptoProvider

type CryptoProvider interface {
	// Scheme returns the signature scheme this provider implements.
	Scheme() SignatureScheme

	// NewBatchVerifier creates a new batch verifier.
	// The verifier uses the provided validator set for public key lookups.
	NewBatchVerifier(validators ValidatorSet) BatchVerifier

	// SupportsAggregation returns true if this scheme supports signature aggregation.
	// BLS supports aggregation, Ed25519 does not.
	SupportsAggregation() bool

	// Aggregator returns the BLS aggregator if aggregation is supported.
	// Returns nil for Ed25519.
	Aggregator() BLSAggregator
}

CryptoProvider abstracts the cryptographic operations for Narwhal. Implementations should provide optimized verification for their signature scheme.

type DAG

type DAG[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

DAG manages the directed acyclic graph of certified vertices.

func NewDAG

func NewDAG[H Hash, T Transaction[H]](
	validators ValidatorSet,
	logger *zap.Logger,
) *DAG[H, T]

NewDAG creates a new DAG instance.

func NewDAGWithCache

func NewDAGWithCache[H Hash, T Transaction[H]](
	validators ValidatorSet,
	hooks *Hooks[H, T],
	cryptoProvider CryptoProvider,
	sigCache *SignatureCache,
	cacheConfig *DAGCacheConfig,
	logger *zap.Logger,
) *DAG[H, T]

NewDAGWithCache creates a new DAG instance with all optional features. This is the most flexible constructor, allowing configuration of:

  • cryptoProvider: optimized signature verification
  • sigCache: signature verification caching
  • cacheConfig: LRU cache for vertex lookups

func NewDAGWithCrypto

func NewDAGWithCrypto[H Hash, T Transaction[H]](
	validators ValidatorSet,
	hooks *Hooks[H, T],
	cryptoProvider CryptoProvider,
	sigCache *SignatureCache,
	logger *zap.Logger,
) *DAG[H, T]

NewDAGWithCrypto creates a new DAG instance with optimized crypto verification. If cryptoProvider is non-nil, certificate validation uses batch/parallel verification. If sigCache is non-nil, verified signatures are cached to avoid re-verification.

func NewDAGWithHooks

func NewDAGWithHooks[H Hash, T Transaction[H]](
	validators ValidatorSet,
	hooks *Hooks[H, T],
	logger *zap.Logger,
) *DAG[H, T]

NewDAGWithHooks creates a new DAG instance with observability hooks.

func (*DAG[H, T]) CertificateCountForRound

func (d *DAG[H, T]) CertificateCountForRound(round uint64) int

CertificateCountForRound returns the number of certificates for a given round.

func (*DAG[H, T]) CurrentRound

func (d *DAG[H, T]) CurrentRound() uint64

CurrentRound returns the current DAG round.

func (*DAG[H, T]) GarbageCollect

func (d *DAG[H, T]) GarbageCollect(beforeRound uint64)

GarbageCollect removes all data for rounds strictly less than the given round.

func (*DAG[H, T]) GetCertificate

func (d *DAG[H, T]) GetCertificate(digest H) *Certificate[H]

GetCertificate retrieves a certificate by its digest. If caching is enabled, this checks the cache first for faster lookups.

func (*DAG[H, T]) GetCertificatesForRound

func (d *DAG[H, T]) GetCertificatesForRound(round uint64) []*Certificate[H]

GetCertificatesForRound returns all certificates for a given round. Results are sorted by validator index for deterministic ordering.

func (*DAG[H, T]) GetMissingParents

func (d *DAG[H, T]) GetMissingParents() []H

GetMissingParents returns all unique parent digests that are missing from the DAG.

func (*DAG[H, T]) GetParents

func (d *DAG[H, T]) GetParents() []H

GetParents returns certificate digests from the previous round to use as parents. Returns nil for round 0. Results are sorted by validator index for deterministic ordering.

func (*DAG[H, T]) GetPendingCertificates

func (d *DAG[H, T]) GetPendingCertificates() []*PendingCertificate[H, T]

GetPendingCertificates returns all certificates waiting for missing parents.

func (*DAG[H, T]) GetTransactions

func (d *DAG[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)

GetTransactions extracts all transactions from the given certificates. Transactions are deduplicated by hash.

func (*DAG[H, T]) GetUncommitted

func (d *DAG[H, T]) GetUncommitted() []*Certificate[H]

GetUncommitted returns certificates not yet ordered by consensus. Results are sorted by (round, author) for deterministic ordering.

func (*DAG[H, T]) GetVertex

func (d *DAG[H, T]) GetVertex(digest H) *Vertex[H, T]

GetVertex retrieves a vertex (certificate + batches) by its digest. If caching is enabled, this checks the cache first for faster lookups.

func (*DAG[H, T]) InsertCertificate

func (d *DAG[H, T]) InsertCertificate(cert *Certificate[H], batches []*Batch[H, T]) error

InsertCertificate adds a certified vertex to the DAG. Returns an error if the certificate is invalid or an equivocation is detected. If parents are missing, the certificate is queued as pending.

func (*DAG[H, T]) InsertValidatedCertificate

func (d *DAG[H, T]) InsertValidatedCertificate(cert *Certificate[H], batches []*Batch[H, T]) error

InsertValidatedCertificate validates and inserts a certificate into the DAG. This is a convenience method that combines validation and insertion. Returns an error if validation fails or if the certificate is an equivocation.

func (*DAG[H, T]) IsCertified

func (d *DAG[H, T]) IsCertified(digest H) bool

IsCertified checks if a certificate with the given digest exists in the DAG. If caching is enabled, this checks the cache first for faster lookups.

func (*DAG[H, T]) MarkCommitted

func (d *DAG[H, T]) MarkCommitted(certs []*Certificate[H])

MarkCommitted marks certificates as committed by consensus. This allows garbage collection of the underlying data.

func (*DAG[H, T]) OnEquivocation

func (d *DAG[H, T]) OnEquivocation(callback func(evidence *EquivocationEvidence[H]))

OnEquivocation sets a callback that is invoked when equivocation is detected.

func (*DAG[H, T]) Stats

func (d *DAG[H, T]) Stats() DAGStats

Stats returns current DAG statistics.

func (*DAG[H, T]) ValidateCertificate

func (d *DAG[H, T]) ValidateCertificate(cert *Certificate[H]) error

ValidateCertificate validates a certificate using the DAG's crypto provider. If no crypto provider is configured, falls back to standard sequential verification.

type DAGCacheConfig

type DAGCacheConfig struct {
	// Enabled controls whether vertex caching is enabled.
	// When enabled, frequently accessed vertices are cached for faster retrieval.
	Enabled bool

	// Capacity is the maximum number of vertices to cache.
	// Default: 10000 if Enabled is true but Capacity is 0.
	Capacity int
}

DAGCacheConfig configures the optional LRU cache for DAG vertex lookups.

func DefaultDAGCacheConfig

func DefaultDAGCacheConfig() DAGCacheConfig

DefaultDAGCacheConfig returns the default cache configuration (disabled).

type DAGStats

type DAGStats struct {
	CurrentRound     uint64
	GCRound          uint64
	TotalVertices    int
	UncommittedCount int
	PendingCount     int
	RoundCounts      map[uint64]int
	// CacheStats contains LRU cache statistics (nil if caching disabled)
	CacheStats *LRUCacheStats
}

DAGStats contains DAG statistics for monitoring.

type Ed25519CryptoProvider

type Ed25519CryptoProvider struct {
	// contains filtered or unexported fields
}

Ed25519CryptoProvider implements CryptoProvider for Ed25519.

func NewEd25519CryptoProvider

func NewEd25519CryptoProvider(workers int) *Ed25519CryptoProvider

NewEd25519CryptoProvider creates a crypto provider for Ed25519.

func (*Ed25519CryptoProvider) Aggregator

func (p *Ed25519CryptoProvider) Aggregator() BLSAggregator

func (*Ed25519CryptoProvider) NewBatchVerifier

func (p *Ed25519CryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier

func (*Ed25519CryptoProvider) Scheme

func (*Ed25519CryptoProvider) SupportsAggregation

func (p *Ed25519CryptoProvider) SupportsAggregation() bool

type EpochAwareValidatorSet

type EpochAwareValidatorSet struct {
	ValidatorSet
	// contains filtered or unexported fields
}

EpochAwareValidatorSet wraps a validator set with epoch information. This can be used to validate that certificates use the correct validator set for their epoch.

func NewEpochAwareValidatorSet

func NewEpochAwareValidatorSet(vs ValidatorSet, epoch uint64) *EpochAwareValidatorSet

NewEpochAwareValidatorSet creates a new epoch-aware validator set.

func (*EpochAwareValidatorSet) Epoch

func (e *EpochAwareValidatorSet) Epoch() uint64

Epoch returns the epoch this validator set is valid for.

type EpochChange

type EpochChange struct {
	// FromEpoch is the epoch we're transitioning from.
	FromEpoch uint64

	// ToEpoch is the epoch we're transitioning to.
	ToEpoch uint64

	// EffectiveRound is the DAG round at which the new epoch takes effect.
	// All certificates at or after this round use the new validator set.
	EffectiveRound uint64

	// NewValidators is the new validator set (may be nil if unchanged).
	NewValidators ValidatorSet

	// ProposedAt is when this epoch change was proposed.
	ProposedAt time.Time

	// CommittedAt is when this epoch change was committed (zero if not yet committed).
	CommittedAt time.Time
}

EpochChange represents a pending or completed epoch change.

type EquivocationDetectedEvent

type EquivocationDetectedEvent[H Hash] struct {
	Author       uint16
	Round        uint64
	FirstDigest  H
	SecondDigest H
	DetectedAt   time.Time
}

EquivocationDetectedEvent contains information about detected equivocation.

type EquivocationEvidence

type EquivocationEvidence[H Hash] struct {
	Author       uint16
	Round        uint64
	Certificate1 *Certificate[H]
	Certificate2 *Certificate[H]
}

EquivocationEvidence contains proof of a validator creating conflicting certificates.

type FetchCompletedEvent

type FetchCompletedEvent[H Hash] struct {
	Type        FetchType
	Digest      H
	Success     bool
	Error       error
	Attempts    int
	FromPeer    uint16 // Peer that provided the data (if successful)
	Latency     time.Duration
	CompletedAt time.Time
}

FetchCompletedEvent contains information about a completed fetch operation.

type FetchStartedEvent

type FetchStartedEvent[H Hash] struct {
	Type          FetchType
	Digest        H
	PreferredPeer uint16
	StartedAt     time.Time
}

FetchStartedEvent contains information about a fetch operation starting.

type FetchType

type FetchType uint8

FetchType indicates what is being fetched.

const (
	FetchTypeBatch FetchType = iota
	FetchTypeCertificate
)

func (FetchType) String

func (t FetchType) String() string

type Fetcher

type Fetcher[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Fetcher handles fetching missing batches and certificates from peers. Standalone component with retry logic and request deduplication.

func NewFetcher

func NewFetcher[H Hash, T Transaction[H]](
	cfg FetcherConfig,
	network Network[H, T],
	storage Storage[H, T],
	validators ValidatorSet,
	hashFunc func([]byte) H,
	logger *zap.Logger,
) *Fetcher[H, T]

NewFetcher creates a new Fetcher instance.

func NewFetcherWithHooks

func NewFetcherWithHooks[H Hash, T Transaction[H]](
	cfg FetcherConfig,
	network Network[H, T],
	storage Storage[H, T],
	validators ValidatorSet,
	hashFunc func([]byte) H,
	hooks *Hooks[H, T],
	logger *zap.Logger,
) *Fetcher[H, T]

NewFetcherWithHooks creates a new Fetcher instance with observability hooks.

func (*Fetcher[H, T]) FetchBatch

func (f *Fetcher[H, T]) FetchBatch(ctx context.Context, digest H, preferredPeer uint16) (*Batch[H, T], error)

FetchBatch fetches a batch by digest, trying multiple peers if needed. It deduplicates concurrent requests for the same digest.

func (*Fetcher[H, T]) FetchBatchesParallel

func (f *Fetcher[H, T]) FetchBatchesParallel(ctx context.Context, digests []H, preferredPeer uint16) ([]*Batch[H, T], error)

FetchBatchesParallel fetches multiple batches in parallel.

func (*Fetcher[H, T]) FetchCertificate

func (f *Fetcher[H, T]) FetchCertificate(ctx context.Context, digest H, preferredPeer uint16) (*Certificate[H], error)

FetchCertificate fetches a certificate by digest, trying multiple peers if needed.

func (*Fetcher[H, T]) PendingFetchCount

func (f *Fetcher[H, T]) PendingFetchCount() int

PendingFetchCount returns the number of pending fetch operations.

type FetcherConfig

type FetcherConfig struct {
	// MaxRetries is the maximum number of retry attempts per fetch.
	MaxRetries int

	// RetryDelay is the base delay between retries (with exponential backoff).
	RetryDelay time.Duration

	// FetchTimeout is the timeout for a single fetch attempt.
	FetchTimeout time.Duration

	// MaxConcurrentFetches limits parallel fetch operations.
	MaxConcurrentFetches int
}

FetcherConfig configures the Fetcher.

func DefaultFetcherConfig

func DefaultFetcherConfig() FetcherConfig

DefaultFetcherConfig returns sensible defaults for the Fetcher.

type GCConfig

type GCConfig struct {
	// Interval is how often GC runs (in rounds, not time).
	// GC is triggered when currentRound - lastGCRound >= Interval.
	Interval uint64

	// RetainRounds is the number of rounds to keep after the committed round.
	// Older data is eligible for garbage collection.
	RetainRounds uint64

	// CheckInterval is how often to check if GC should run (in time).
	CheckInterval time.Duration
}

GCConfig configures the GarbageCollector.

func DefaultGCConfig

func DefaultGCConfig() GCConfig

DefaultGCConfig returns sensible defaults for the GarbageCollector.

type GCStats

type GCStats struct {
	LastGCRound    uint64
	CommittedRound uint64
}

GCStats contains garbage collection statistics.

type GarbageCollectedEvent

type GarbageCollectedEvent struct {
	BeforeRound    uint64
	CommittedRound uint64
	CollectedAt    time.Time
}

GarbageCollectedEvent contains information about a GC cycle.

type GarbageCollector

type GarbageCollector[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

GarbageCollector periodically cleans up old DAG data. Standalone component for automatic round-based cleanup independent of the main Narwhal struct.

func NewGarbageCollector

func NewGarbageCollector[H Hash, T Transaction[H]](
	cfg GCConfig,
	dag *DAG[H, T],
	storage Storage[H, T],
	logger *zap.Logger,
) *GarbageCollector[H, T]

NewGarbageCollector creates a new GarbageCollector.

func NewGarbageCollectorWithHooks

func NewGarbageCollectorWithHooks[H Hash, T Transaction[H]](
	cfg GCConfig,
	dag *DAG[H, T],
	storage Storage[H, T],
	hooks *Hooks[H, T],
	logger *zap.Logger,
) *GarbageCollector[H, T]

NewGarbageCollectorWithHooks creates a new GarbageCollector with observability hooks.

func (*GarbageCollector[H, T]) ForceGC

func (gc *GarbageCollector[H, T]) ForceGC()

ForceGC forces an immediate garbage collection cycle.

func (*GarbageCollector[H, T]) OnGC

func (gc *GarbageCollector[H, T]) OnGC(callback func(beforeRound uint64))

OnGC sets a callback that is invoked when GC runs.

func (*GarbageCollector[H, T]) Run

func (gc *GarbageCollector[H, T]) Run(ctx context.Context)

Run starts the garbage collection loop. It runs until the context is cancelled.

func (*GarbageCollector[H, T]) SetCommittedRound

func (gc *GarbageCollector[H, T]) SetCommittedRound(round uint64)

SetCommittedRound updates the committed round. GC will only clean up data older than (committedRound - retainRounds).

func (*GarbageCollector[H, T]) Stats

func (gc *GarbageCollector[H, T]) Stats() GCStats

Stats returns current GC statistics.

type GnarkBLSAggregator

type GnarkBLSAggregator struct{}

GnarkBLSAggregator implements BLSAggregator using gnark-crypto.

func NewGnarkBLSAggregator

func NewGnarkBLSAggregator() *GnarkBLSAggregator

NewGnarkBLSAggregator creates a new BLS aggregator.

func (*GnarkBLSAggregator) Aggregate

func (a *GnarkBLSAggregator) Aggregate(signatures [][]byte) ([]byte, error)

Aggregate combines multiple BLS signatures into one.

func (*GnarkBLSAggregator) VerifyAggregate

func (a *GnarkBLSAggregator) VerifyAggregate(pubKeys [][]byte, message []byte, aggSig []byte) bool

VerifyAggregate verifies an aggregated signature (all signers signed the same message).

type GnarkBLSBatchVerifier

type GnarkBLSBatchVerifier struct {
	// contains filtered or unexported fields
}

GnarkBLSBatchVerifier implements BLSBatchVerifier using gnark-crypto.

func NewGnarkBLSBatchVerifier

func NewGnarkBLSBatchVerifier() *GnarkBLSBatchVerifier

NewGnarkBLSBatchVerifier creates a new BLS batch verifier.

func (*GnarkBLSBatchVerifier) AddSignature

func (v *GnarkBLSBatchVerifier) AddSignature(pubKey []byte, message []byte, signature []byte)

AddSignature adds a signature to the batch.

func (*GnarkBLSBatchVerifier) Reset

func (v *GnarkBLSBatchVerifier) Reset()

Reset clears the batch.

func (*GnarkBLSBatchVerifier) VerifyBatch

func (v *GnarkBLSBatchVerifier) VerifyBatch() bool

VerifyBatch verifies all signatures in the batch using random linear combination.

type GnarkBLSCryptoProvider

type GnarkBLSCryptoProvider struct {
	// contains filtered or unexported fields
}

GnarkBLSCryptoProvider implements CryptoProvider for BLS using gnark-crypto.

func NewGnarkBLSCryptoProvider

func NewGnarkBLSCryptoProvider() *GnarkBLSCryptoProvider

NewGnarkBLSCryptoProvider creates a BLS crypto provider.

func (*GnarkBLSCryptoProvider) Aggregator

func (p *GnarkBLSCryptoProvider) Aggregator() BLSAggregator

func (*GnarkBLSCryptoProvider) NewBatchVerifier

func (p *GnarkBLSCryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier

func (*GnarkBLSCryptoProvider) Scheme

func (*GnarkBLSCryptoProvider) SupportsAggregation

func (p *GnarkBLSCryptoProvider) SupportsAggregation() bool

type Hash

type Hash interface {
	Bytes() []byte
	Equals(other Hash) bool
	String() string
}

Hash represents a cryptographic hash for content addressing.

type Header[H Hash] struct {
	Author    uint16
	Round     uint64
	Epoch     uint64
	BatchRefs []H
	Parents   []H // digests of parent certificates from round-1
	Timestamp uint64
	Digest    H
}

Header is a proposal from a primary to form a DAG vertex.

func HeaderFromBytes

func HeaderFromBytes[H Hash](
	data []byte,
	hashFromBytes func([]byte) (H, error),
) (*Header[H], error)

HeaderFromBytes deserializes a header from bytes.

func (*Header[H]) Bytes

func (h *Header[H]) Bytes() []byte

Bytes serializes the header to bytes. Format: [Author:2][Round:8][Epoch:8][Timestamp:8][DigestLen:2][Digest][BatchRefCount:4][HashLen:2][BatchRef0]...[ParentCount:4][Parent0]...

func (*Header[H]) ComputeDigest

func (h *Header[H]) ComputeDigest(hashFunc func([]byte) H)

ComputeDigest computes and sets the digest for this header. The digest is computed over all header fields except the digest itself. This must be called before broadcasting the header.

func (*Header[H]) VerifyDigest

func (h *Header[H]) VerifyDigest(hashFunc func([]byte) H) bool

VerifyDigest verifies that the header digest matches its contents.

type HeaderCreatedEvent

type HeaderCreatedEvent[H Hash] struct {
	Header     *Header[H]
	BatchCount int
	CreatedAt  time.Time
}

HeaderCreatedEvent contains information about a newly created header.

type HeaderMessage

type HeaderMessage[H Hash, T Transaction[H]] struct {
	HeaderData *Header[H]
	// contains filtered or unexported fields
}

HeaderMessage wraps a header for network transmission.

func (*HeaderMessage[H, T]) Header

func (m *HeaderMessage[H, T]) Header() *Header[H]

func (*HeaderMessage[H, T]) Sender

func (m *HeaderMessage[H, T]) Sender() uint16

func (*HeaderMessage[H, T]) Type

func (m *HeaderMessage[H, T]) Type() MessageType

type HeaderMessageAccessor

type HeaderMessageAccessor[H Hash] interface{ Header() *Header[H] }

type HeaderReceivedEvent

type HeaderReceivedEvent[H Hash] struct {
	Header     *Header[H]
	From       uint16
	ReceivedAt time.Time
}

HeaderReceivedEvent contains information about a received header.

type HeaderTimeoutEvent

type HeaderTimeoutEvent[H Hash] struct {
	HeaderDigest  H
	Round         uint64
	RetryCount    int
	WillRetry     bool
	VotesReceived int
	QuorumNeeded  int
	TimeoutAt     time.Time
}

HeaderTimeoutEvent contains information about a header that timed out.

type HeaderVote

type HeaderVote[H Hash] struct {
	HeaderDigest   H
	ValidatorIndex uint16
	Signature      []byte
}

HeaderVote is a vote from a validator for a header.

func NewVote

func NewVote[H Hash](headerDigest H, validatorIndex uint16, signer Signer) (*HeaderVote[H], error)

NewVote creates a new vote for a header.

func VoteFromBytes

func VoteFromBytes[H Hash](
	data []byte,
	hashFromBytes func([]byte) (H, error),
) (*HeaderVote[H], error)

VoteFromBytes deserializes a vote from bytes.

func (*HeaderVote[H]) Bytes

func (v *HeaderVote[H]) Bytes() []byte

Bytes serializes the vote to bytes. Format: [HeaderDigestLen:2][HeaderDigest][ValidatorIndex:2][SignatureLen:2][Signature]

func (*HeaderVote[H]) Verify

func (v *HeaderVote[H]) Verify(pubKey PublicKey) bool

Verify verifies the vote signature.

type HeaderWaiter

type HeaderWaiter[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

HeaderWaiter queues headers that can't be processed immediately due to missing parent certificates or batches. It periodically retries processing them as dependencies become available.

func NewHeaderWaiter

func NewHeaderWaiter[H Hash, T Transaction[H]](
	cfg HeaderWaiterConfig,
	processFunc func(header *Header[H], from uint16) error,
	checkParentFunc func(digest H) bool,
	hooks *Hooks[H, T],
	logger *zap.Logger,
) *HeaderWaiter[H, T]

NewHeaderWaiter creates a new HeaderWaiter.

func (*HeaderWaiter[H, T]) Add

func (hw *HeaderWaiter[H, T]) Add(header *Header[H], from uint16, missingParents []H) bool

Add queues a header with missing parents for later processing. Returns true if the header was queued, false if it was dropped (queue full or duplicate). If FetchParents is enabled and fetchParentFunc is set, missing parents will be proactively fetched from the sender.

func (*HeaderWaiter[H, T]) OnParentAvailable

func (hw *HeaderWaiter[H, T]) OnParentAvailable(parentDigest H)

OnParentAvailable should be called when a new certificate is inserted into the DAG. It triggers retry of any headers that were waiting for this parent.

func (*HeaderWaiter[H, T]) PendingCount

func (hw *HeaderWaiter[H, T]) PendingCount() int

PendingCount returns the number of headers currently waiting.

func (*HeaderWaiter[H, T]) Run

func (hw *HeaderWaiter[H, T]) Run(ctx context.Context)

Run starts the periodic retry loop.

func (*HeaderWaiter[H, T]) SetFetchParentFunc

func (hw *HeaderWaiter[H, T]) SetFetchParentFunc(fn func(digest H, from uint16) error)

SetFetchParentFunc sets the function to fetch missing parent certificates. When set and FetchParents is enabled, the HeaderWaiter will proactively fetch missing parents from the peer that sent the header.

func (*HeaderWaiter[H, T]) Stats

func (hw *HeaderWaiter[H, T]) Stats() HeaderWaiterStats

Stats returns current statistics.

type HeaderWaiterConfig

type HeaderWaiterConfig struct {
	// MaxPendingHeaders is the maximum number of headers to queue.
	// Headers beyond this limit are dropped (oldest first).
	// Default: 1000
	MaxPendingHeaders int

	// RetryInterval is how often to retry processing pending headers.
	// Default: 1s
	RetryInterval time.Duration

	// MaxRetries is the maximum number of retry attempts before dropping a header.
	// Default: 10
	MaxRetries int

	// MaxAge is the maximum time a header can wait before being dropped.
	// Default: 30s
	MaxAge time.Duration

	// FetchParents enables proactive fetching of missing parent certificates.
	// Default: false
	FetchParents bool
}

HeaderWaiterConfig configures the HeaderWaiter.

func DefaultHeaderWaiterConfig

func DefaultHeaderWaiterConfig() HeaderWaiterConfig

DefaultHeaderWaiterConfig returns sensible defaults.

type HeaderWaiterStats

type HeaderWaiterStats struct {
	PendingCount   int
	TotalReceived  uint64
	TotalProcessed uint64
	TotalDropped   uint64
	TotalExpired   uint64
	TotalFetched   uint64
}

HeaderWaiterStats contains statistics for monitoring.

type Hooks

type Hooks[H Hash, T Transaction[H]] struct {

	// OnBatchCreated is called when a worker creates a new batch.
	OnBatchCreated func(BatchCreatedEvent[H, T])

	// OnTransactionReceived is called when a transaction is added to a worker.
	OnTransactionReceived func(TransactionReceivedEvent[H])

	// OnHeaderCreated is called when a primary creates a new header.
	OnHeaderCreated func(HeaderCreatedEvent[H])

	// OnHeaderReceived is called when a primary receives a header from another validator.
	OnHeaderReceived func(HeaderReceivedEvent[H])

	// OnVoteReceived is called when a primary receives a vote for its header.
	OnVoteReceived func(VoteReceivedEvent[H])

	// OnVoteSent is called when a primary sends a vote for another validator's header.
	OnVoteSent func(VoteSentEvent[H])

	// OnCertificateFormed is called when a primary forms a certificate (collected quorum).
	OnCertificateFormed func(CertificateFormedEvent[H])

	// OnCertificateReceived is called when a certificate is received from another validator.
	OnCertificateReceived func(CertificateReceivedEvent[H])

	// OnHeaderTimeout is called when a header times out waiting for votes.
	OnHeaderTimeout func(HeaderTimeoutEvent[H])

	// OnVertexInserted is called when a certificate is inserted into the DAG.
	OnVertexInserted func(VertexInsertedEvent[H])

	// OnRoundAdvanced is called when the DAG advances to a new round.
	OnRoundAdvanced func(RoundAdvancedEvent)

	// OnEquivocationDetected is called when equivocation is detected.
	// Note: This is in addition to the DAG's OnEquivocation callback,
	// which provides the full evidence for slashing.
	OnEquivocationDetected func(EquivocationDetectedEvent[H])

	// OnCertificatePending is called when a certificate is queued as pending
	// due to missing parents.
	OnCertificatePending func(CertificatePendingEvent[H])

	// OnFetchStarted is called when a fetch operation begins.
	OnFetchStarted func(FetchStartedEvent[H])

	// OnFetchCompleted is called when a fetch operation completes (success or failure).
	OnFetchCompleted func(FetchCompletedEvent[H])

	// OnGarbageCollected is called when garbage collection runs.
	OnGarbageCollected func(GarbageCollectedEvent)
}

Hooks provides optional callbacks for observability events. All hooks are invoked synchronously - keep implementations fast.

func NewRecoveryMiddleware

func NewRecoveryMiddleware[H Hash, T Transaction[H]](hooks *Hooks[H, T], logger *zap.Logger) *Hooks[H, T]

NewRecoveryMiddleware wraps hooks with panic recovery.

func (*Hooks[H, T]) Clone

func (h *Hooks[H, T]) Clone() *Hooks[H, T]

Clone returns a shallow copy of the hooks.

type LRUCache

type LRUCache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

LRUCache is a generic thread-safe LRU cache. It evicts the least recently used items when capacity is exceeded.

func NewLRUCache

func NewLRUCache[K comparable, V any](capacity int) *LRUCache[K, V]

NewLRUCache creates a new LRU cache with the given capacity. Capacity must be at least 1.

func (*LRUCache[K, V]) Clear

func (c *LRUCache[K, V]) Clear()

Clear removes all items from the cache.

func (*LRUCache[K, V]) Contains

func (c *LRUCache[K, V]) Contains(key K) bool

Contains checks if a key exists in the cache without updating LRU order.

func (*LRUCache[K, V]) Get

func (c *LRUCache[K, V]) Get(key K) (V, bool)

Get retrieves a value from the cache. Returns the value and true if found, zero value and false otherwise. Accessing an item moves it to the front (most recently used).

func (*LRUCache[K, V]) Keys

func (c *LRUCache[K, V]) Keys() []K

Keys returns all keys in the cache, from most to least recently used.

func (*LRUCache[K, V]) Len

func (c *LRUCache[K, V]) Len() int

Len returns the current number of items in the cache.

func (*LRUCache[K, V]) Peek

func (c *LRUCache[K, V]) Peek(key K) (V, bool)

Peek retrieves a value without updating its position in the LRU order.

func (*LRUCache[K, V]) Put

func (c *LRUCache[K, V]) Put(key K, value V) (evictedKey K, evictedValue V, evicted bool)

Put adds or updates a value in the cache. If the cache is at capacity, the least recently used item is evicted. Returns the evicted key and value if eviction occurred.

func (*LRUCache[K, V]) Remove

func (c *LRUCache[K, V]) Remove(key K) bool

Remove removes an item from the cache. Returns true if the item was found and removed.

func (*LRUCache[K, V]) Resize

func (c *LRUCache[K, V]) Resize(newCapacity int) int

Resize changes the cache capacity. If the new capacity is smaller, excess items are evicted (LRU order).

func (*LRUCache[K, V]) Stats

func (c *LRUCache[K, V]) Stats() LRUCacheStats

Stats returns cache statistics.

type LRUCacheStats

type LRUCacheStats struct {
	Size     int
	Capacity int
	Hits     uint64
	Misses   uint64
	Evicts   uint64
	HitRate  float64
}

LRUCacheStats contains cache statistics.

type LeaderSchedule

type LeaderSchedule interface {
	// Leader returns the validator index of the leader for the given round.
	// This must be deterministic - all validators must agree on the leader.
	Leader(round uint64) uint16

	// IsLeader returns true if the given validator is the leader for the round.
	IsLeader(round uint64, validatorIndex uint16) bool
}

LeaderSchedule determines which validator is leader for a given round.

type LeaderTracker

type LeaderTracker[H Hash] struct {
	// contains filtered or unexported fields
}

LeaderTracker tracks leader blocks and votes for partially synchronous mode. In this mode, we want to: 1. Wait for the leader's block before advancing to odd rounds 2. Wait for f+1 votes for the leader or 2f+1 non-votes before advancing to even rounds

This follows the Sui Narwhal proposer logic for partially synchronous networks.

func NewLeaderTracker

func NewLeaderTracker[H Hash](schedule LeaderSchedule) *LeaderTracker[H]

NewLeaderTracker creates a new LeaderTracker.

func (*LeaderTracker[H]) CheckLeaderVotes

func (lt *LeaderTracker[H]) CheckLeaderVotes(parents []*Certificate[H], leaderDigest *H, f int) LeaderVoteStatus

CheckLeaderVotes analyzes parent certificates to determine leader vote status. This is used on odd rounds to decide if we have enough support to advance.

Parameters: - parents: certificates from the previous round that we're using as parents - leaderDigest: the digest of the leader's certificate from the previous round (if known) - f: the maximum Byzantine faults tolerated

Returns the vote status analysis.

func (*LeaderTracker[H]) Clear

func (lt *LeaderTracker[H]) Clear()

Clear removes all leader records.

func (*LeaderTracker[H]) GarbageCollect

func (lt *LeaderTracker[H]) GarbageCollect(beforeRound uint64)

GarbageCollect removes leader records for old rounds.

func (*LeaderTracker[H]) GetLeaderForRound

func (lt *LeaderTracker[H]) GetLeaderForRound(round uint64) *Certificate[H]

GetLeaderForRound returns the leader certificate for a round, if we have it.

func (*LeaderTracker[H]) HasLeaderForRound

func (lt *LeaderTracker[H]) HasLeaderForRound(round uint64) bool

HasLeaderForRound returns true if we have the leader's certificate for the round.

func (*LeaderTracker[H]) RecordCertificate

func (lt *LeaderTracker[H]) RecordCertificate(cert *Certificate[H]) bool

RecordCertificate records a certificate and returns true if it's from the leader.

func (*LeaderTracker[H]) SetRound

func (lt *LeaderTracker[H]) SetRound(round uint64)

SetRound updates the current round.

func (*LeaderTracker[H]) ShouldWaitForLeader

func (lt *LeaderTracker[H]) ShouldWaitForLeader(round uint64) bool

ShouldWaitForLeader returns true if we should wait for the leader block. This is used in partially synchronous mode on even rounds (round % 2 == 0).

type LeaderVoteStatus

type LeaderVoteStatus struct {
	// VotesForLeader is the stake voting for the leader block
	VotesForLeader int

	// VotesNotForLeader is the stake not voting for the leader block
	// (voting for a different parent set that doesn't include the leader)
	VotesNotForLeader int

	// HasEnoughForLeaderVotes returns true if we have f+1 votes for the leader
	HasEnoughForLeaderVotes bool

	// HasEnoughNotForLeaderVotes returns true if we have 2f+1 votes not for leader
	HasEnoughNotForLeaderVotes bool
}

LeaderVoteStatus tracks votes for/against the leader block.

type Message

type Message[H Hash, T Transaction[H]] interface {
	Type() MessageType
	Sender() uint16
}

Message represents a protocol message.

type MessageType

type MessageType uint8
const (
	MessageBatch MessageType = iota
	MessageHeader
	MessageVote
	MessageCertificate
	MessageBatchRequest
	MessageCertificateRequest
	MessageCertificateRangeRequest
	MessageCertificateRangeResponse
)

func (MessageType) String

func (t MessageType) String() string

String returns a human-readable name for the message type.

type MeteredChannel

type MeteredChannel[T any] struct {
	// contains filtered or unexported fields
}

MeteredChannel wraps a channel with metrics for observability. Tracks sent, received, dropped counts and queue depth. Thread-safe.

func NewMeteredChannel

func NewMeteredChannel[T any](name string, capacity int) *MeteredChannel[T]

NewMeteredChannel creates a new metered channel with the given name and capacity.

func (*MeteredChannel[T]) Cap

func (m *MeteredChannel[T]) Cap() int

Cap returns the capacity of the channel.

func (*MeteredChannel[T]) Close

func (m *MeteredChannel[T]) Close()

Close closes the underlying channel.

func (*MeteredChannel[T]) Len

func (m *MeteredChannel[T]) Len() int

Len returns the current number of items in the channel.

func (*MeteredChannel[T]) MarkReceived

func (m *MeteredChannel[T]) MarkReceived()

MarkReceived increments the received counter. Call this after receiving from ReceiveChan() to maintain accurate metrics.

func (*MeteredChannel[T]) Receive

func (m *MeteredChannel[T]) Receive() T

Receive receives a value from the channel. Blocks if the channel is empty.

func (*MeteredChannel[T]) ReceiveChan

func (m *MeteredChannel[T]) ReceiveChan() <-chan T

ReceiveChan returns the underlying channel for use in select statements. Note: messages received directly from this channel won't update the received counter. Use MarkReceived() after receiving to update the counter.

func (*MeteredChannel[T]) Send

func (m *MeteredChannel[T]) Send(value T)

Send sends a value on the channel. Blocks if the channel is full.

func (*MeteredChannel[T]) Stats

func (m *MeteredChannel[T]) Stats() MeteredChannelStats

Stats returns current statistics.

func (*MeteredChannel[T]) TryReceive

func (m *MeteredChannel[T]) TryReceive() (T, bool)

TryReceive attempts to receive a value from the channel without blocking. Returns the value and true if successful, or zero value and false if empty.

func (*MeteredChannel[T]) TrySend

func (m *MeteredChannel[T]) TrySend(value T) bool

TrySend attempts to send a value on the channel without blocking. Returns true if the send succeeded, false if the channel was full.

type MeteredChannelStats

type MeteredChannelStats struct {
	// Name is the channel name.
	Name string

	// Sent is the total number of messages sent.
	Sent uint64

	// Received is the total number of messages received.
	Received uint64

	// Dropped is the total number of messages dropped due to full channel.
	Dropped uint64

	// Pending is the current number of messages in the channel.
	Pending int

	// Capacity is the channel capacity.
	Capacity int

	// Throughput is the approximate messages per second (sent).
	Throughput float64

	// Uptime is how long the channel has been running.
	Uptime time.Duration
}

MeteredChannelStats contains statistics for a metered channel.

type MutableValidatorSet

type MutableValidatorSet interface {
	ValidatorSet

	// AddValidator adds a new validator and returns their assigned index.
	AddValidator(pubKey PublicKey) (uint16, error)

	// RemoveValidator removes a validator by index.
	RemoveValidator(index uint16) error

	// UpdateValidator updates a validator's public key.
	UpdateValidator(index uint16, newPubKey PublicKey) error

	// Clone creates a deep copy of this validator set.
	Clone() MutableValidatorSet
}

MutableValidatorSet extends ValidatorSet with mutation operations. This interface is used during reconfiguration to build a new validator set.

type Narwhal

type Narwhal[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Narwhal implements the Narwhal DAG-based mempool protocol.

func New

func New[H Hash, T Transaction[H]](cfg *Config[H, T], hashFunc func([]byte) H) (*Narwhal[H, T], error)

New creates a new Narwhal instance. The hashFunc parameter is required to compute digests for batches, headers, etc.

func (*Narwhal[H, T]) AddTransaction

func (n *Narwhal[H, T]) AddTransaction(tx T)

AddTransaction adds a transaction to be disseminated. Transactions are routed to workers based on their hash for load balancing.

func (*Narwhal[H, T]) CurrentRound

func (n *Narwhal[H, T]) CurrentRound() uint64

CurrentRound returns the current DAG round.

func (*Narwhal[H, T]) GetCertifiedVertices

func (n *Narwhal[H, T]) GetCertifiedVertices() []*Certificate[H]

GetCertifiedVertices returns certificates ready for consensus ordering. These are certificates that have not yet been included in a committed block.

func (*Narwhal[H, T]) GetDAG

func (n *Narwhal[H, T]) GetDAG() *DAG[H, T]

DAG returns the underlying DAG for direct access.

func (*Narwhal[H, T]) GetTransactions

func (n *Narwhal[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)

GetTransactions extracts all transactions from the given certificates. Used by execution layer to get ordered transactions from consensus block. Transactions are deduplicated by hash.

func (*Narwhal[H, T]) MarkCommitted

func (n *Narwhal[H, T]) MarkCommitted(certs []*Certificate[H])

MarkCommitted marks certificates as committed by consensus. This allows garbage collection of the underlying data.

func (*Narwhal[H, T]) RequestTrackerStats

func (n *Narwhal[H, T]) RequestTrackerStats() RequestTrackerStats

RequestTrackerStats returns statistics about pending network operations.

func (*Narwhal[H, T]) Start

func (n *Narwhal[H, T]) Start() error

Start starts the Narwhal protocol.

func (*Narwhal[H, T]) Stop

func (n *Narwhal[H, T]) Stop()

Stop stops the Narwhal protocol gracefully.

type Network

type Network[H Hash, T Transaction[H]] interface {
	BroadcastBatch(batch *Batch[H, T])
	BroadcastHeader(header *Header[H])
	SendVote(validatorIndex uint16, vote *HeaderVote[H])
	BroadcastCertificate(cert *Certificate[H])

	FetchBatch(from uint16, digest H) (*Batch[H, T], error)
	FetchCertificate(from uint16, digest H) (*Certificate[H], error)
	FetchCertificatesInRange(from uint16, startRound, endRound uint64) ([]*Certificate[H], error)

	SendBatch(to uint16, batch *Batch[H, T])
	SendCertificate(to uint16, cert *Certificate[H])

	Receive() <-chan Message[H, T]
	Close() error
}

Network provides message delivery between validators.

type NetworkModel

type NetworkModel uint8

NetworkModel specifies the network timing assumptions for the consensus protocol. This affects how the proposer decides when to advance rounds.

const (
	// NetworkModelAsynchronous assumes no timing bounds on message delivery.
	// The proposer advances to the next round as soon as it has 2f+1 parent certificates.
	// This provides optimal throughput but may have higher latency in practice.
	NetworkModelAsynchronous NetworkModel = iota

	// NetworkModelPartiallySynchronous assumes eventual synchrony.
	// The proposer may wait for leader blocks and leader votes before advancing.
	// This can provide better commit latency by ensuring leaders are included.
	NetworkModelPartiallySynchronous
)

func (NetworkModel) String

func (m NetworkModel) String() string

String returns a human-readable name for the network model.

type NetworkStatus

type NetworkStatus uint8
const (
	NetworkStatusUnknown NetworkStatus = iota
	NetworkStatusConnected
	NetworkStatusDisconnected
	NetworkStatusConnecting
)

type PanicHandler

type PanicHandler func(panicVal interface{}, stack []byte)

PanicHandler is called when a panic is recovered. It receives the panic value and stack trace.

type PendingCertificate

type PendingCertificate[H Hash, T Transaction[H]] struct {
	Certificate    *Certificate[H]
	Batches        []*Batch[H, T]
	MissingParents []H
}

PendingCertificate represents a certificate waiting for missing parents.

type PendingHeader

type PendingHeader[H Hash] struct {
	// Header is the header waiting to be processed.
	Header *Header[H]

	// From is the validator that sent this header.
	From uint16

	// MissingParents are parent certificate digests we don't have.
	MissingParents []H

	// ReceivedAt is when this header was first received.
	ReceivedAt time.Time

	// RetryCount tracks how many times we've tried to process this header.
	RetryCount int
}

PendingHeader represents a header waiting for missing dependencies.

type PerPeerRateLimiter

type PerPeerRateLimiter struct {
	// contains filtered or unexported fields
}

PerPeerRateLimiter maintains separate rate limits for each peer.

func NewPerPeerRateLimiter

func NewPerPeerRateLimiter(rate float64, burst int) *PerPeerRateLimiter

NewPerPeerRateLimiter creates a rate limiter that tracks per-peer limits.

func (*PerPeerRateLimiter) Allow

func (p *PerPeerRateLimiter) Allow(peer uint16) bool

Allow checks if the peer is within rate limits.

func (*PerPeerRateLimiter) AllowN

func (p *PerPeerRateLimiter) AllowN(peer uint16, n int) bool

AllowN checks if n tokens are available for the peer.

func (*PerPeerRateLimiter) Reset

func (p *PerPeerRateLimiter) Reset()

Reset removes all peer state.

type Primary

type Primary[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Primary creates headers and collects votes to form certificates.

func NewPrimary

func NewPrimary[H Hash, T Transaction[H]](cfg PrimaryConfig[H, T]) *Primary[H, T]

NewPrimary creates a new primary.

func (*Primary[H, T]) OnBatch

func (p *Primary[H, T]) OnBatch(batch *Batch[H, T]) bool

OnBatch is called when a local worker creates a batch. If MaxPendingBatches is configured and the queue is full:

  • If DropOnFull is true, the batch is dropped silently
  • If DropOnFull is false, this call blocks until space is available

Returns true if the batch was accepted, false if dropped.

func (*Primary[H, T]) OnHeaderReceived

func (p *Primary[H, T]) OnHeaderReceived(header *Header[H], from uint16) error

OnHeaderReceived handles a header from another primary. Before voting, validates: 1. Header digest is correct 2. Author is a valid validator 3. Round is not too far ahead 4. All parent certificates exist in the DAG 5. All batch digests are available (in local storage or fetched from author) 6. We haven't already voted for a different header from this author at this round

func (*Primary[H, T]) OnVoteReceived

func (p *Primary[H, T]) OnVoteReceived(vote *HeaderVote[H]) error

OnVoteReceived handles a vote for our header.

func (*Primary[H, T]) Run

func (p *Primary[H, T]) Run(ctx context.Context)

Run starts the primary's header creation loop.

func (*Primary[H, T]) Stats

func (p *Primary[H, T]) Stats() PrimaryStats

Stats returns current primary statistics.

type PrimaryConfig

type PrimaryConfig[H Hash, T Transaction[H]] struct {
	ValidatorID   uint16
	DAG           *DAG[H, T]
	Validators    ValidatorSet
	Signer        Signer
	Network       Network[H, T]
	Storage       Storage[H, T]
	HeaderTimeout time.Duration
	HashFunc      func([]byte) H
	Hooks         *Hooks[H, T]
	Logger        *zap.Logger

	// Retry configuration
	MaxHeaderRetries    int           // Max retries before abandoning a header (default: 3)
	HeaderRetryInterval time.Duration // Time before retrying header broadcast (default: HeaderTimeout)

	// Header creation thresholds (aligned with reference implementation)
	MaxHeaderBatches int           // Max batch refs per header (default: 100)
	MaxHeaderDelay   time.Duration // Max delay before creating header (default: HeaderTimeout)

	// Backpressure settings
	MaxPendingBatches int  // Max pending batches (0 = unbounded)
	DropOnFull        bool // If true, drop when full; if false, block

	// Optimized crypto verification (optional)
	CryptoProvider CryptoProvider
	SignatureCache *SignatureCache
}

PrimaryConfig configures a primary.

type PrimaryStats

type PrimaryStats struct {
	PendingBatches int    // Number of batch refs waiting for header creation
	QueuedBatches  int    // Batches in channel (if bounded)
	DroppedBatches uint64 // Batches dropped due to full queue
	MaxPending     int    // Maximum pending batches allowed
	IsBounded      bool   // Whether bounded mode is enabled
	HasPendingHdr  bool   // Whether there's a header waiting for votes
	CurrentVotes   int    // Votes received for current header
	HeaderRetries  int    // Retry count for current header
}

PrimaryStats contains primary statistics for monitoring.

type ProposedHeader

type ProposedHeader[H Hash, T Transaction[H]] struct {
	Header    *Header[H]
	Batches   []*Batch[H, T]
	CreatedAt time.Time
}

ProposedHeader represents a header proposed by the Proposer.

type Proposer

type Proposer[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Proposer creates headers from batches. Standalone component for advanced use cases where header creation needs to be separated from vote collection.

func NewProposer

func NewProposer[H Hash, T Transaction[H]](cfg ProposerConfig[H, T]) *Proposer[H, T]

NewProposer creates a new Proposer.

func (*Proposer[H, T]) AddBatch

func (p *Proposer[H, T]) AddBatch(batch *Batch[H, T]) bool

AddBatch adds a batch to be included in a future header. Returns true if accepted, false if dropped due to full queue.

func (*Proposer[H, T]) ForcePropose

func (p *Proposer[H, T]) ForcePropose() bool

ForcePropose forces creation of a header if there are any pending batches. Used for testing and flushing on shutdown.

func (*Proposer[H, T]) Headers

func (p *Proposer[H, T]) Headers() <-chan *ProposedHeader[H, T]

Headers returns the channel for receiving proposed headers.

func (*Proposer[H, T]) PendingCount

func (p *Proposer[H, T]) PendingCount() int

PendingCount returns the number of pending batches.

func (*Proposer[H, T]) Run

func (p *Proposer[H, T]) Run(ctx context.Context)

Run starts the proposer's main loop.

func (*Proposer[H, T]) Stats

func (p *Proposer[H, T]) Stats() ProposerStats

Stats returns current statistics.

type ProposerConfig

type ProposerConfig[H Hash, T Transaction[H]] struct {
	ValidatorID uint16
	DAG         *DAG[H, T]
	Validators  ValidatorSet
	HashFunc    func([]byte) H
	Hooks       *Hooks[H, T]
	Logger      *zap.Logger

	// Header creation thresholds
	MaxHeaderBatches int           // Max batch refs per header before proposing
	MaxHeaderDelay   time.Duration // Max delay before proposing regardless of batch count
	MinHeaderBatches int           // Min batches before proposing (default: 1)

	// Backpressure
	MaxPendingBatches int  // Max pending batches (0 = unbounded)
	DropOnFull        bool // If true, drop when full; if false, block

	// Output buffer size
	HeaderOutputBuffer int // Size of header output channel (default: 10)
}

ProposerConfig configures the Proposer.

func DefaultProposerConfig

func DefaultProposerConfig[H Hash, T Transaction[H]]() ProposerConfig[H, T]

DefaultProposerConfig returns sensible defaults.

type ProposerStats

type ProposerStats struct {
	PendingBatches  int
	QueuedBatches   int
	DroppedBatches  uint64
	HeadersProposed uint64
	BatchesIncluded uint64
	MaxPending      int
	IsBounded       bool
}

ProposerStats contains statistics for monitoring.

type PublicKey

type PublicKey interface {
	Bytes() []byte
	Verify(message []byte, signature []byte) bool
	Equals(other interface{ Bytes() []byte }) bool
}

PublicKey provides signature verification.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter provides a token bucket rate limiter. Thread-safe.

func NewRateLimiter

func NewRateLimiter(rate float64, burst int) *RateLimiter

NewRateLimiter creates a new rate limiter. - rate: tokens per second to add - burst: maximum tokens that can accumulate

func (*RateLimiter) Allow

func (r *RateLimiter) Allow() bool

Allow checks if one token is available and consumes it if so. Returns true if allowed, false if rate limited.

func (*RateLimiter) AllowN

func (r *RateLimiter) AllowN(n int) bool

AllowN checks if n tokens are available and consumes them if so. Returns true if allowed, false if rate limited.

func (*RateLimiter) Reserve

func (r *RateLimiter) Reserve(n int) time.Duration

Reserve reserves n tokens, returning the time to wait before they're available. If wait is 0, the tokens were immediately available. If wait is negative, the request can never be satisfied (n > burst).

func (*RateLimiter) Stats

func (r *RateLimiter) Stats() RateLimiterStats

Stats returns current statistics.

type RateLimiterStats

type RateLimiterStats struct {
	Allowed  uint64
	Rejected uint64
	Rate     float64
	Burst    int64
}

Stats returns rate limiter statistics.

type ReconfigurationCoordinator

type ReconfigurationCoordinator[H Hash] struct {
	// contains filtered or unexported fields
}

ReconfigurationCoordinator coordinates the voting process for reconfiguration proposals. This is a higher-level component that uses Reconfigurer for state management.

func NewReconfigurationCoordinator

func NewReconfigurationCoordinator[H Hash](
	reconfigurer *Reconfigurer,
	hashFunc func([]byte) H,
	logger *zap.Logger,
) *ReconfigurationCoordinator[H]

NewReconfigurationCoordinator creates a new ReconfigurationCoordinator.

func (*ReconfigurationCoordinator[H]) AddVote

AddVote adds a vote for the current proposal. Returns an error if no proposal is in progress or if the vote is invalid.

func (*ReconfigurationCoordinator[H]) CheckQuorum

func (rc *ReconfigurationCoordinator[H]) CheckQuorum() bool

CheckQuorum checks if the proposal has reached quorum. Returns true if 2f+1 approvals have been received.

func (*ReconfigurationCoordinator[H]) Clear

func (rc *ReconfigurationCoordinator[H]) Clear()

Clear resets the coordinator state.

func (*ReconfigurationCoordinator[H]) CurrentProposal

func (rc *ReconfigurationCoordinator[H]) CurrentProposal() *ReconfigurationProposal

CurrentProposal returns the current proposal, if any.

func (*ReconfigurationCoordinator[H]) IsRejected

func (rc *ReconfigurationCoordinator[H]) IsRejected() bool

IsRejected checks if the proposal has been rejected. Returns true if f+1 rejections have been received (making quorum impossible).

func (*ReconfigurationCoordinator[H]) ProposeReconfiguration

func (rc *ReconfigurationCoordinator[H]) ProposeReconfiguration(
	effectiveRound uint64,
	changes []ValidatorSetChange,
	proposer uint16,
	signature []byte,
) error

ProposeReconfiguration initiates a new reconfiguration proposal. Returns an error if a proposal is already in progress.

func (*ReconfigurationCoordinator[H]) VoteStats

func (rc *ReconfigurationCoordinator[H]) VoteStats() (approvals, rejections, total int)

VoteStats returns the current vote counts.

type ReconfigurationProposal

type ReconfigurationProposal struct {
	// Epoch is the epoch this proposal is for (currentEpoch + 1).
	Epoch uint64

	// EffectiveRound is the round at which the change takes effect.
	EffectiveRound uint64

	// Changes is the list of validator set changes to apply.
	Changes []ValidatorSetChange

	// Proposer is the validator who proposed this reconfiguration.
	Proposer uint16

	// Signature is the proposer's signature over this proposal.
	Signature []byte

	// ProposedAt is when this proposal was created.
	ProposedAt time.Time
}

ReconfigurationProposal represents a proposal to change the validator set. This is used to coordinate reconfiguration across the network.

type ReconfigurationState

type ReconfigurationState uint8

ReconfigurationState represents the current state of a reconfiguration operation.

const (
	// ReconfigurationStateIdle means no reconfiguration is in progress.
	ReconfigurationStateIdle ReconfigurationState = iota

	// ReconfigurationStatePending means a reconfiguration has been proposed
	// but not yet finalized (waiting for quorum confirmation).
	ReconfigurationStatePending

	// ReconfigurationStateCommitting means the reconfiguration is being applied
	// (draining in-flight operations, updating state).
	ReconfigurationStateCommitting

	// ReconfigurationStateComplete means the reconfiguration has been applied.
	ReconfigurationStateComplete
)

func (ReconfigurationState) String

func (s ReconfigurationState) String() string

String returns a human-readable name for the reconfiguration state.

type ReconfigurationStats

type ReconfigurationStats struct {
	CurrentEpoch     uint64
	State            ReconfigurationState
	HasPendingChange bool
	PendingToEpoch   uint64
	HistoryCount     int
	ValidatorCount   int
	CallbackCount    int
}

ReconfigurationStats contains statistics for monitoring.

type ReconfigurationVote

type ReconfigurationVote struct {
	// ProposalHash is the hash of the proposal being voted on.
	ProposalHash []byte

	// Voter is the validator casting this vote.
	Voter uint16

	// Approve indicates whether this is an approval or rejection.
	Approve bool

	// Signature is the voter's signature over this vote.
	Signature []byte
}

ReconfigurationVote represents a vote on a reconfiguration proposal.

type Reconfigurer

type Reconfigurer struct {
	// contains filtered or unexported fields
}

Reconfigurer manages committee reconfiguration and epoch transitions. It coordinates the transition between validator sets, ensuring: 1. All nodes agree on the epoch boundary (consensus on reconfiguration) 2. In-flight operations for the old epoch complete before switching 3. State is properly cleaned up for the new epoch

Thread-safe for concurrent use.

func NewReconfigurer

func NewReconfigurer(initialEpoch uint64, validators ValidatorSet, logger *zap.Logger) *Reconfigurer

NewReconfigurer creates a new Reconfigurer.

func (*Reconfigurer) BeginCommit

func (r *Reconfigurer) BeginCommit() error

BeginCommit transitions from PENDING to COMMITTING state. This should be called when the epoch boundary round has been reached and all nodes have confirmed the reconfiguration.

Returns an error if not in PENDING state.

func (*Reconfigurer) CancelPendingChange

func (r *Reconfigurer) CancelPendingChange() error

CancelPendingChange cancels a pending epoch change. This can be used if the reconfiguration is aborted (e.g., consensus rollback).

Returns an error if not in PENDING state.

func (*Reconfigurer) CompleteCommit

func (r *Reconfigurer) CompleteCommit() error

CompleteCommit finalizes the epoch change. This should be called after all in-flight operations have completed and state has been updated for the new epoch.

Returns an error if not in COMMITTING state.

func (*Reconfigurer) CurrentEpoch

func (r *Reconfigurer) CurrentEpoch() uint64

CurrentEpoch returns the current epoch.

func (*Reconfigurer) CurrentValidators

func (r *Reconfigurer) CurrentValidators() ValidatorSet

CurrentValidators returns the current validator set.

func (*Reconfigurer) History

func (r *Reconfigurer) History() []EpochChange

History returns the history of recent epoch changes.

func (*Reconfigurer) IsEpochBoundaryRound

func (r *Reconfigurer) IsEpochBoundaryRound(round uint64) bool

IsEpochBoundaryRound returns true if the given round is the effective round of a pending epoch change.

func (*Reconfigurer) OnEpochChange

func (r *Reconfigurer) OnEpochChange(callback func(EpochChange))

OnEpochChange registers a callback to be invoked when an epoch change is committed. Callbacks are invoked synchronously in the order they were registered.

func (*Reconfigurer) PendingChange

func (r *Reconfigurer) PendingChange() *EpochChange

PendingChange returns the pending epoch change, if any.

func (*Reconfigurer) ProposeEpochChange

func (r *Reconfigurer) ProposeEpochChange(newEpoch uint64, effectiveRound uint64, newValidators ValidatorSet) error

ProposeEpochChange proposes a new epoch change. This is typically called when consensus has decided on a reconfiguration.

Returns an error if: - A reconfiguration is already in progress - The new epoch is not exactly currentEpoch + 1 - The effective round is in the past

func (*Reconfigurer) ShouldAcceptForEpoch

func (r *Reconfigurer) ShouldAcceptForEpoch(epoch uint64) bool

ShouldAcceptForEpoch determines if a message from the given epoch should be accepted. Returns true if the epoch is current or if it's the pending epoch during transition.

func (*Reconfigurer) State

State returns the current reconfiguration state.

func (*Reconfigurer) Stats

Stats returns current statistics.

type ReconnectableNetwork

type ReconnectableNetwork[H Hash, T Transaction[H]] interface {
	Network[H, T]
	Connect(validatorIndex uint16) error
	Disconnect(validatorIndex uint16) error
	Reconnect(validatorIndex uint16) error
	Status(validatorIndex uint16) NetworkStatus
	IsConnected(validatorIndex uint16) bool
	ConnectedPeers() []uint16
	OnDisconnect(callback func(validatorIndex uint16))
	OnReconnect(callback func(validatorIndex uint16))
}

ReconnectableNetwork extends Network with reconnection and health checking.

type RecoveryConfig

type RecoveryConfig struct {
	// Handler is called when a panic is recovered.
	// If nil, panics are logged and the goroutine terminates cleanly.
	Handler PanicHandler

	// Logger for recording recovered panics.
	Logger *zap.Logger

	// Rethrow causes the panic to be re-raised after handling.
	// Use this if you want panics to propagate after logging.
	Rethrow bool
}

RecoveryConfig configures panic recovery behavior.

type RequestTracker

type RequestTracker[H Hash] struct {
	// contains filtered or unexported fields
}

RequestTracker tracks pending network operations and provides cancel-on-round-advance. When the round advances, stale requests from previous rounds can be cancelled.

func NewRequestTracker

func NewRequestTracker[H Hash](cfg RequestTrackerConfig, logger *zap.Logger) *RequestTracker[H]

NewRequestTracker creates a new RequestTracker.

func (*RequestTracker[H]) CancelAll

func (rt *RequestTracker[H]) CancelAll()

CancelAll cancels all pending requests.

func (*RequestTracker[H]) CancelForRound

func (rt *RequestTracker[H]) CancelForRound(round uint64)

CancelForRound cancels all pending requests for a specific round.

func (*RequestTracker[H]) OnRoundAdvance

func (rt *RequestTracker[H]) OnRoundAdvance(newRound uint64)

OnRoundAdvance should be called when the DAG round advances. It cancels any requests for rounds that are now considered stale.

func (*RequestTracker[H]) PendingCount

func (rt *RequestTracker[H]) PendingCount() int

PendingCount returns the number of pending requests.

func (*RequestTracker[H]) PendingForRound

func (rt *RequestTracker[H]) PendingForRound(round uint64) int

PendingForRound returns the number of pending requests for a specific round.

func (*RequestTracker[H]) Stats

func (rt *RequestTracker[H]) Stats() RequestTrackerStats

Stats returns current statistics.

func (*RequestTracker[H]) Track

func (rt *RequestTracker[H]) Track(
	parentCtx context.Context,
	round uint64,
	reqType RequestType,
	digest H,
) (context.Context, func())

Track registers a new request for tracking. Returns a context that will be cancelled if the request becomes stale, and a completion function that must be called when the request finishes.

type RequestTrackerConfig

type RequestTrackerConfig struct {
	// MaxPendingPerRound limits requests per round to prevent memory bloat.
	// Default: 1000
	MaxPendingPerRound int

	// StaleRoundThreshold is how many rounds behind a request must be to be considered stale.
	// Requests for rounds < (currentRound - StaleRoundThreshold) are cancelled.
	// Default: 2
	StaleRoundThreshold uint64

	// CancelOnAdvance enables automatic cancellation when round advances.
	// Default: true
	CancelOnAdvance bool
}

RequestTrackerConfig configures the RequestTracker.

func DefaultRequestTrackerConfig

func DefaultRequestTrackerConfig() RequestTrackerConfig

DefaultRequestTrackerConfig returns sensible defaults.

type RequestTrackerStats

type RequestTrackerStats struct {
	CurrentRound   uint64
	PendingCount   int
	TotalStarted   uint64
	TotalCompleted uint64
	TotalCancelled uint64
}

RequestTrackerStats contains statistics for monitoring.

type RequestType

type RequestType uint8

RequestType identifies the type of network request.

const (
	RequestTypeBatch RequestType = iota
	RequestTypeCertificate
	RequestTypeHeader
)

func (RequestType) String

func (t RequestType) String() string

type RoundAdvancedEvent

type RoundAdvancedEvent struct {
	OldRound            uint64
	NewRound            uint64
	CertificatesInRound int // Number of certificates that triggered advancement
	AdvancedAt          time.Time
}

RoundAdvancedEvent contains information about a round advancement.

type RoundRobinLeaderSchedule

type RoundRobinLeaderSchedule struct {
	// contains filtered or unexported fields
}

RoundRobinLeaderSchedule implements a simple round-robin leader election. Each validator takes turns being the leader.

func NewRoundRobinLeaderSchedule

func NewRoundRobinLeaderSchedule(validatorCount int) *RoundRobinLeaderSchedule

NewRoundRobinLeaderSchedule creates a new round-robin leader schedule.

func (*RoundRobinLeaderSchedule) IsLeader

func (s *RoundRobinLeaderSchedule) IsLeader(round uint64, validatorIndex uint16) bool

IsLeader returns true if the validator is the leader for the round.

func (*RoundRobinLeaderSchedule) Leader

func (s *RoundRobinLeaderSchedule) Leader(round uint64) uint16

Leader returns the leader for the given round.

type SignatureCache

type SignatureCache struct {
	// contains filtered or unexported fields
}

SignatureCache caches verified signatures to avoid re-verification. Thread-safe for concurrent access.

func NewSignatureCache

func NewSignatureCache(maxSize int) *SignatureCache

NewSignatureCache creates a new signature cache.

func (*SignatureCache) Clear

func (c *SignatureCache) Clear()

Clear removes all cached entries.

func (*SignatureCache) IsVerified

func (c *SignatureCache) IsVerified(digest []byte, validatorIndex uint16) bool

IsVerified checks if a signature has been verified.

func (*SignatureCache) MarkVerified

func (c *SignatureCache) MarkVerified(digest []byte, validatorIndex uint16)

MarkVerified marks a signature as verified.

func (*SignatureCache) Size

func (c *SignatureCache) Size() int

Size returns the number of cached entries.

type SignatureError

type SignatureError struct {
	ValidatorIndex uint16
}

SignatureError indicates a signature verification failure.

func (*SignatureError) Error

func (e *SignatureError) Error() string

type SignatureScheme

type SignatureScheme uint8

SignatureScheme identifies the signature algorithm in use.

const (
	// SignatureSchemeEd25519 uses Ed25519 signatures (individual verification).
	SignatureSchemeEd25519 SignatureScheme = iota

	// SignatureSchemeBLS uses BLS signatures (supports aggregation and batch verification).
	SignatureSchemeBLS
)

type Signer

type Signer interface {
	Sign(message []byte) ([]byte, error)
	PublicKey() PublicKey
}

Signer provides cryptographic signing.

type SlicePool

type SlicePool[T any] struct {
	// contains filtered or unexported fields
}

SlicePool provides a pool of reusable slices.

func NewSlicePool

func NewSlicePool[T any](initCap int) *SlicePool[T]

NewSlicePool creates a new slice pool with the specified initial capacity.

func (*SlicePool[T]) Get

func (p *SlicePool[T]) Get() *[]T

Get retrieves a slice from the pool. The slice is reset to zero length but retains its capacity.

func (*SlicePool[T]) Put

func (p *SlicePool[T]) Put(s *[]T)

Put returns a slice to the pool.

type Storage

type Storage[H Hash, T Transaction[H]] interface {
	GetBatch(digest H) (*Batch[H, T], error)
	PutBatch(batch *Batch[H, T]) error
	HasBatch(digest H) bool

	GetHeader(digest H) (*Header[H], error)
	PutHeader(header *Header[H]) error

	GetCertificate(digest H) (*Certificate[H], error)
	PutCertificate(cert *Certificate[H]) error

	GetHighestRound() (uint64, error)
	PutHighestRound(round uint64) error
	DeleteBeforeRound(round uint64) error

	GetCertificatesByRound(round uint64) ([]*Certificate[H], error)
	GetCertificatesInRange(startRound, endRound uint64) ([]*Certificate[H], error)

	Close() error
}

Storage provides persistent storage. Put operations must be durable before returning.

type Timer

type Timer interface {
	Start()
	Stop()
	Reset()
	C() <-chan struct{}
}

Timer provides timeout management.

type Transaction

type Transaction[H Hash] interface {
	Hash() H
	Bytes() []byte
}

Transaction represents a unit of data to be disseminated.

type TransactionReceivedEvent

type TransactionReceivedEvent[H Hash] struct {
	TxHash     H
	WorkerID   uint16
	ReceivedAt time.Time
}

TransactionReceivedEvent contains information about a received transaction.

type ValidationConfig

type ValidationConfig struct {
	// MaxBatchRefs is the maximum number of batch references per header.
	MaxBatchRefs int

	// MaxParents is the maximum number of parent certificates per header.
	MaxParents int

	// MaxSignatures is the maximum number of signatures per certificate.
	MaxSignatures int

	// MaxTransactionsPerBatch is the maximum transactions per batch.
	MaxTransactionsPerBatch int

	// MaxTransactionSize is the maximum size of a single transaction in bytes.
	MaxTransactionSize int

	// MaxBatchSize is the maximum size of a batch in bytes.
	MaxBatchSize int

	// MaxHeaderSize is the maximum size of a header in bytes.
	MaxHeaderSize int

	// MaxCertificateSize is the maximum size of a certificate in bytes.
	MaxCertificateSize int

	// MaxRoundSkip is the maximum rounds a header can be ahead.
	MaxRoundSkip uint64

	// MaxTimestampDrift is the maximum time a header can be in the future.
	MaxTimestampDrift time.Duration
}

ValidationConfig configures validation limits.

func DefaultValidationConfig

func DefaultValidationConfig() ValidationConfig

DefaultValidationConfig returns a ValidationConfig with default limits.

type ValidationError

type ValidationError struct {
	Type    string // "header", "certificate", "batch", "vote"
	Field   string // Specific field that failed validation
	Message string // Human-readable error message
	Cause   error  // Underlying error if any
}

ValidationError wraps a validation error with additional context.

func (*ValidationError) Error

func (e *ValidationError) Error() string

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

type Validator

type Validator[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Validator provides comprehensive input validation for Narwhal messages. All methods are safe for concurrent use.

func NewValidator

func NewValidator[H Hash, T Transaction[H]](
	cfg ValidationConfig,
	validators ValidatorSet,
	hashFunc func([]byte) H,
) *Validator[H, T]

NewValidator creates a new Validator with the given configuration.

func (*Validator[H, T]) ValidateBatch

func (v *Validator[H, T]) ValidateBatch(batch *Batch[H, T], checkTxSize bool) error

ValidateBatch performs comprehensive validation on a batch.

Checks performed:

  • Batch is not nil
  • ValidatorID is valid
  • Digest is correct
  • Transaction count is within limits
  • Individual transaction sizes are within limits (if checkTxSize is true)

func (*Validator[H, T]) ValidateCertificate

func (v *Validator[H, T]) ValidateCertificate(cert *Certificate[H], currentRound uint64, verifySignatures bool) error

ValidateCertificate performs comprehensive validation on a certificate.

Checks performed:

  • Certificate is not nil
  • Header is valid (via ValidateHeader)
  • Has quorum of signatures (2f+1)
  • Signature count matches bitmap
  • Number of signatures is within limits
  • All signatures are valid (if verifySignatures is true)

func (*Validator[H, T]) ValidateHeader

func (v *Validator[H, T]) ValidateHeader(header *Header[H], currentRound uint64) error

ValidateHeader validates a header before voting or accepting.

func (*Validator[H, T]) ValidateMessageSize

func (v *Validator[H, T]) ValidateMessageSize(msgType MessageType, data []byte) error

ValidateMessageSize checks if a raw message is within size limits.

func (*Validator[H, T]) ValidateVote

func (v *Validator[H, T]) ValidateVote(vote *HeaderVote[H], verifySignature bool) error

ValidateVote performs validation on a header vote.

Checks performed:

  • Vote is not nil
  • Voter is a valid validator
  • Signature is valid (if verifySignature is true)

type ValidatorSet

type ValidatorSet interface {
	Count() int
	GetByIndex(index uint16) (PublicKey, error)
	Contains(index uint16) bool
	F() int // max Byzantine faults: (n-1)/3
}

ValidatorSet represents the set of validators. Quorum requires 2f+1 validators.

type ValidatorSetChange

type ValidatorSetChange struct {
	// Type indicates what kind of change this is.
	Type ValidatorSetChangeType

	// ValidatorIndex is the index of the validator being changed.
	// For ADD, this is the new index assigned.
	// For REMOVE and UPDATE, this is the existing index.
	ValidatorIndex uint16

	// PublicKey is the public key involved in the change.
	// For ADD and UPDATE, this is the new key.
	// For REMOVE, this may be nil.
	PublicKey PublicKey
}

ValidatorSetChange represents a change to the validator set.

type ValidatorSetChangeType

type ValidatorSetChangeType uint8

ValidatorSetChangeType indicates the type of validator set change.

const (
	// ValidatorSetChangeAdd adds a new validator.
	ValidatorSetChangeAdd ValidatorSetChangeType = iota

	// ValidatorSetChangeRemove removes an existing validator.
	ValidatorSetChangeRemove

	// ValidatorSetChangeUpdate updates a validator's key (key rotation).
	ValidatorSetChangeUpdate
)

func (ValidatorSetChangeType) String

func (t ValidatorSetChangeType) String() string

String returns a human-readable name for the change type.

type Vertex

type Vertex[H Hash, T Transaction[H]] struct {
	Certificate *Certificate[H]
	Batches     []*Batch[H, T]
}

Vertex represents a certified DAG vertex with its data.

type VertexCache

type VertexCache[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

VertexCache is a specialized LRU cache for vertices (certificate + batches).

func NewVertexCache

func NewVertexCache[H Hash, T Transaction[H]](capacity int) *VertexCache[H, T]

NewVertexCache creates a new vertex cache with the given capacity.

func (*VertexCache[H, T]) Clear

func (c *VertexCache[H, T]) Clear()

Clear removes all vertices from the cache.

func (*VertexCache[H, T]) Contains

func (c *VertexCache[H, T]) Contains(digest H) bool

Contains checks if a vertex exists in the cache.

func (*VertexCache[H, T]) Get

func (c *VertexCache[H, T]) Get(digest H) (*Vertex[H, T], bool)

Get retrieves a vertex by its certificate digest.

func (*VertexCache[H, T]) Len

func (c *VertexCache[H, T]) Len() int

Len returns the number of vertices in the cache.

func (*VertexCache[H, T]) Put

func (c *VertexCache[H, T]) Put(vertex *Vertex[H, T])

Put adds a vertex to the cache.

func (*VertexCache[H, T]) Remove

func (c *VertexCache[H, T]) Remove(digest H) bool

Remove removes a vertex from the cache.

func (*VertexCache[H, T]) Stats

func (c *VertexCache[H, T]) Stats() LRUCacheStats

Stats returns cache statistics.

type VertexInsertedEvent

type VertexInsertedEvent[H Hash] struct {
	Certificate  *Certificate[H]
	Round        uint64
	Author       uint16
	ParentCount  int
	TotalInRound int // Total certificates in this round after insertion
	InsertedAt   time.Time
}

VertexInsertedEvent contains information about a vertex inserted into the DAG.

type VoteDecision

type VoteDecision uint8

VoteDecision represents the decision about whether to vote for a header.

const (
	// VoteDecisionAllow means we should vote for this header.
	VoteDecisionAllow VoteDecision = iota

	// VoteDecisionSkipOldRound means we've already voted for a higher round from this author.
	VoteDecisionSkipOldRound

	// VoteDecisionSkipEquivocation means we've already voted for a different header
	// from this author at the same round (equivocation detected).
	VoteDecisionSkipEquivocation

	// VoteDecisionSkipOldEpoch means the header is from an old epoch.
	VoteDecisionSkipOldEpoch

	// VoteDecisionSkipDuplicate means we've already voted for this exact header.
	VoteDecisionSkipDuplicate
)

func (VoteDecision) String

func (d VoteDecision) String() string

String returns a human-readable description of the vote decision.

type VoteMessage

type VoteMessage[H Hash, T Transaction[H]] struct {
	VoteData *HeaderVote[H]
	// contains filtered or unexported fields
}

VoteMessage wraps a vote for network transmission.

func (*VoteMessage[H, T]) Sender

func (m *VoteMessage[H, T]) Sender() uint16

func (*VoteMessage[H, T]) Type

func (m *VoteMessage[H, T]) Type() MessageType

func (*VoteMessage[H, T]) Vote

func (m *VoteMessage[H, T]) Vote() *HeaderVote[H]

type VoteMessageAccessor

type VoteMessageAccessor[H Hash] interface{ Vote() *HeaderVote[H] }

type VoteReceivedEvent

type VoteReceivedEvent[H Hash] struct {
	HeaderDigest   H
	Voter          uint16
	TotalVotes     int // Current vote count for this header
	QuorumRequired int
	ReceivedAt     time.Time
}

VoteReceivedEvent contains information about a received vote.

type VoteRecord

type VoteRecord[H Hash] struct {
	// Round is the round we voted for.
	Round uint64

	// Epoch is the epoch we voted for (for cross-epoch safety).
	Epoch uint64

	// HeaderDigest is the digest of the header we voted for.
	HeaderDigest H

	// VotedAt is when we cast this vote.
	VotedAt time.Time
}

VoteRecord tracks a vote we sent for a specific (author, round) pair. Used to prevent double-voting on equivocating headers from the same author.

type VoteSentEvent

type VoteSentEvent[H Hash] struct {
	HeaderDigest H
	HeaderAuthor uint16
	SentAt       time.Time
}

VoteSentEvent contains information about a vote we sent.

type VoteTracker

type VoteTracker[H Hash] struct {
	// contains filtered or unexported fields
}

VoteTracker prevents double-voting on equivocating headers. Thread-safe.

func NewVoteTracker

func NewVoteTracker[H Hash](logger *zap.Logger) *VoteTracker[H]

NewVoteTracker creates a new VoteTracker.

func (*VoteTracker[H]) Clear

func (vt *VoteTracker[H]) Clear()

Clear removes all vote records. Used during reconfiguration.

func (*VoteTracker[H]) GarbageCollect

func (vt *VoteTracker[H]) GarbageCollect(gcRound uint64)

GarbageCollect removes vote records for rounds below gcRound. Called periodically to prevent unbounded memory growth.

func (*VoteTracker[H]) RecordVote

func (vt *VoteTracker[H]) RecordVote(author uint16, round, epoch uint64, headerDigest H)

RecordVote records that we voted for a header. This should be called AFTER successfully sending the vote.

func (*VoteTracker[H]) SetEpoch

func (vt *VoteTracker[H]) SetEpoch(epoch uint64)

SetEpoch updates the current epoch and clears records from old epochs.

func (*VoteTracker[H]) ShouldVote

func (vt *VoteTracker[H]) ShouldVote(author uint16, round, epoch uint64, headerDigest H) (VoteDecision, *H)

ShouldVote determines whether we should vote for a header. Returns the decision and, if equivocation is detected, the digest of the header we already voted for.

The logic follows Sui's Narwhal implementation: 1. If header.Round < lastVotedRound for this author: skip (old round) 2. If header.Round == lastVotedRound and header.Digest == lastVotedDigest: skip (duplicate) 3. If header.Round == lastVotedRound and header.Digest != lastVotedDigest: skip (equivocation) 4. If header.Round > lastVotedRound: allow (new round) 5. If no previous vote for this author: allow

func (*VoteTracker[H]) Stats

func (vt *VoteTracker[H]) Stats() VoteTrackerStats

Stats returns current statistics.

type VoteTrackerStats

type VoteTrackerStats struct {
	TrackedAuthors int
	CurrentEpoch   uint64
	GCRound        uint64
}

VoteTrackerStats contains statistics for monitoring.

type Worker

type Worker[H Hash, T Transaction[H]] struct {
	// contains filtered or unexported fields
}

Worker receives transactions and creates batches.

func NewWorker

func NewWorker[H Hash, T Transaction[H]](cfg WorkerConfig[H, T]) *Worker[H, T]

NewWorker creates a new worker.

func (*Worker[H, T]) AddTransaction

func (w *Worker[H, T]) AddTransaction(tx T) bool

AddTransaction adds a transaction to the pending batch. If MaxPending is configured and the queue is full:

  • If DropOnFull is true, the transaction is dropped silently
  • If DropOnFull is false, this call blocks until space is available

Returns true if the transaction was accepted, false if dropped.

func (*Worker[H, T]) Run

func (w *Worker[H, T]) Run(ctx context.Context)

Run starts the worker's batch creation loop.

func (*Worker[H, T]) SetRound

func (w *Worker[H, T]) SetRound(round uint64)

SetRound updates the worker's current round.

func (*Worker[H, T]) Stats

func (w *Worker[H, T]) Stats() WorkerStats

Stats returns current worker statistics.

type WorkerConfig

type WorkerConfig[H Hash, T Transaction[H]] struct {
	ID           uint16
	ValidatorID  uint16
	BatchSize    int
	BatchTimeout time.Duration
	HashFunc     func([]byte) H
	OnBatch      func(*Batch[H, T])
	Hooks        *Hooks[H, T]
	Logger       *zap.Logger

	// Backpressure settings
	MaxPending int  // Max pending transactions (0 = unbounded)
	DropOnFull bool // If true, drop when full; if false, block
}

WorkerConfig configures a worker.

type WorkerStats

type WorkerStats struct {
	PendingCount int
	QueuedCount  int // Transactions in channel (if bounded)
	DroppedCount uint64
	MaxPending   int
	IsBounded    bool
}

WorkerStats contains worker statistics for monitoring.

Directories

Path Synopsis
internal
testutil
Package testutil provides test utilities for the Narwhal library.
Package testutil provides test utilities for the Narwhal library.
Package timer provides timer implementations for Narwhal.
Package timer provides timer implementations for Narwhal.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL