Documentation
¶
Overview ¶
Package narwhal implements a DAG-based mempool for BFT consensus. It separates data availability from ordering, enabling parallel transaction broadcast across validators without leader bottlenecks.
Index ¶
- Constants
- func BLSSign(privateKey []byte, message []byte) ([]byte, error)
- func GoWithRecovery(cfg RecoveryConfig, fn func())
- func GoWithRecoveryCtx(ctx context.Context, cfg RecoveryConfig, fn func(context.Context))
- func IsValidationError(err error) bool
- func RecoverPanic(cfg RecoveryConfig)
- func SafeGo(logger *zap.Logger, fn func())
- func SafeGoCtx(ctx context.Context, logger *zap.Logger, fn func(context.Context))
- func VerifyCertificateSignatures[H Hash](cert *Certificate[H], validators ValidatorSet, crypto CryptoProvider) error
- type BLSAggregator
- type BLSBatchVerifier
- type BLSCryptoProvider
- type BLSKeyPair
- type BLSPublicKey
- type BLSSigner
- type BLSValidatorSet
- type Batch
- type BatchCreatedEvent
- type BatchMessage
- type BatchMessageAccessor
- type BatchRequestMessage
- type BatchVerifier
- type ByteSlicePool
- type CachedBatchVerifier
- type Certificate
- func (c *Certificate[H]) Author() uint16
- func (c *Certificate[H]) Bytes() []byte
- func (c *Certificate[H]) Digest() H
- func (c *Certificate[H]) HasSigner(validatorIndex uint16) bool
- func (c *Certificate[H]) Round() uint64
- func (c *Certificate[H]) SignerCount() int
- func (c *Certificate[H]) Validate(validators ValidatorSet) error
- func (c *Certificate[H]) ValidateWithCrypto(validators ValidatorSet, crypto CryptoProvider, cache *SignatureCache) error
- type CertificateCache
- func (c *CertificateCache[H]) Clear()
- func (c *CertificateCache[H]) Contains(digest H) bool
- func (c *CertificateCache[H]) Get(digest H) (*Certificate[H], bool)
- func (c *CertificateCache[H]) Len() int
- func (c *CertificateCache[H]) Put(cert *Certificate[H])
- func (c *CertificateCache[H]) Remove(digest H) bool
- func (c *CertificateCache[H]) Stats() LRUCacheStats
- type CertificateFormedEvent
- type CertificateMessage
- type CertificateMessageAccessor
- type CertificatePendingEvent
- type CertificateRangeRequestMessage
- type CertificateRangeResponseMessage
- type CertificateReceivedEvent
- type CertificateRequestMessage
- type ChannelMetrics
- type CircuitBreaker
- type CircuitBreakerConfig
- type CircuitBreakerStats
- type Config
- func DefaultConfig[H Hash, T Transaction[H]]() Config[H, T]
- func DemoConfig[H Hash, T Transaction[H]]() Config[H, T]
- func HighThroughputConfig[H Hash, T Transaction[H]]() Config[H, T]
- func LowLatencyConfig[H Hash, T Transaction[H]]() Config[H, T]
- func NewConfig[H Hash, T Transaction[H]](opts ...ConfigOption[H, T]) (*Config[H, T], error)
- type ConfigOption
- func WithBatchSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]
- func WithBatchTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]
- func WithCryptoProvider[H Hash, T Transaction[H]](crypto CryptoProvider) ConfigOption[H, T]
- func WithDAGCache[H Hash, T Transaction[H]](cfg DAGCacheConfig) ConfigOption[H, T]
- func WithDAGCacheCapacity[H Hash, T Transaction[H]](capacity int) ConfigOption[H, T]
- func WithDAGCacheDisabled[H Hash, T Transaction[H]]() ConfigOption[H, T]
- func WithDropOnFull[H Hash, T Transaction[H]](drop bool) ConfigOption[H, T]
- func WithEpoch[H Hash, T Transaction[H]](epoch uint64) ConfigOption[H, T]
- func WithGCDepth[H Hash, T Transaction[H]](depth uint64) ConfigOption[H, T]
- func WithGCInterval[H Hash, T Transaction[H]](interval uint64) ConfigOption[H, T]
- func WithHeaderTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]
- func WithHeaderWaiterMaxAge[H Hash, T Transaction[H]](age time.Duration) ConfigOption[H, T]
- func WithHeaderWaiterMaxRetries[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
- func WithHeaderWaiterRetryInterval[H Hash, T Transaction[H]](interval time.Duration) ConfigOption[H, T]
- func WithHooks[H Hash, T Transaction[H]](hooks *Hooks[H, T]) ConfigOption[H, T]
- func WithLeaderSchedule[H Hash, T Transaction[H]](schedule LeaderSchedule) ConfigOption[H, T]
- func WithLogger[H Hash, T Transaction[H]](logger *zap.Logger) ConfigOption[H, T]
- func WithMaxHeaderBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
- func WithMaxHeaderDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]
- func WithMaxPendingBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
- func WithMaxPendingHeaders[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
- func WithMaxPendingTransactions[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
- func WithMaxRoundsGap[H Hash, T Transaction[H]](gap uint64) ConfigOption[H, T]
- func WithMyIndex[H Hash, T Transaction[H]](index uint16) ConfigOption[H, T]
- func WithNetwork[H Hash, T Transaction[H]](network Network[H, T]) ConfigOption[H, T]
- func WithNetworkModel[H Hash, T Transaction[H]](model NetworkModel) ConfigOption[H, T]
- func WithRecommendedMessageQueueSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]
- func WithSignatureCache[H Hash, T Transaction[H]](cache *SignatureCache) ConfigOption[H, T]
- func WithSigner[H Hash, T Transaction[H]](signer Signer) ConfigOption[H, T]
- func WithStorage[H Hash, T Transaction[H]](storage Storage[H, T]) ConfigOption[H, T]
- func WithSyncRetryDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]
- func WithSyncRetryNodes[H Hash, T Transaction[H]](nodes int) ConfigOption[H, T]
- func WithTimer[H Hash, T Transaction[H]](timer Timer) ConfigOption[H, T]
- func WithValidation[H Hash, T Transaction[H]](cfg ValidationConfig) ConfigOption[H, T]
- func WithValidators[H Hash, T Transaction[H]](validators ValidatorSet) ConfigOption[H, T]
- func WithWorkerCount[H Hash, T Transaction[H]](count int) ConfigOption[H, T]
- type ConfigWarning
- type CryptoProvider
- type DAG
- func NewDAG[H Hash, T Transaction[H]](validators ValidatorSet, logger *zap.Logger) *DAG[H, T]
- func NewDAGWithCache[H Hash, T Transaction[H]](validators ValidatorSet, hooks *Hooks[H, T], cryptoProvider CryptoProvider, ...) *DAG[H, T]
- func NewDAGWithCrypto[H Hash, T Transaction[H]](validators ValidatorSet, hooks *Hooks[H, T], cryptoProvider CryptoProvider, ...) *DAG[H, T]
- func NewDAGWithHooks[H Hash, T Transaction[H]](validators ValidatorSet, hooks *Hooks[H, T], logger *zap.Logger) *DAG[H, T]
- func (d *DAG[H, T]) CertificateCountForRound(round uint64) int
- func (d *DAG[H, T]) CurrentRound() uint64
- func (d *DAG[H, T]) GarbageCollect(beforeRound uint64)
- func (d *DAG[H, T]) GetCertificate(digest H) *Certificate[H]
- func (d *DAG[H, T]) GetCertificatesForRound(round uint64) []*Certificate[H]
- func (d *DAG[H, T]) GetMissingParents() []H
- func (d *DAG[H, T]) GetParents() []H
- func (d *DAG[H, T]) GetPendingCertificates() []*PendingCertificate[H, T]
- func (d *DAG[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)
- func (d *DAG[H, T]) GetUncommitted() []*Certificate[H]
- func (d *DAG[H, T]) GetVertex(digest H) *Vertex[H, T]
- func (d *DAG[H, T]) InsertCertificate(cert *Certificate[H], batches []*Batch[H, T]) error
- func (d *DAG[H, T]) InsertValidatedCertificate(cert *Certificate[H], batches []*Batch[H, T]) error
- func (d *DAG[H, T]) IsCertified(digest H) bool
- func (d *DAG[H, T]) MarkCommitted(certs []*Certificate[H])
- func (d *DAG[H, T]) OnEquivocation(callback func(evidence *EquivocationEvidence[H]))
- func (d *DAG[H, T]) Stats() DAGStats
- func (d *DAG[H, T]) ValidateCertificate(cert *Certificate[H]) error
- type DAGCacheConfig
- type DAGStats
- type Ed25519CryptoProvider
- type EpochAwareValidatorSet
- type EpochChange
- type EquivocationDetectedEvent
- type EquivocationEvidence
- type FetchCompletedEvent
- type FetchStartedEvent
- type FetchType
- type Fetcher
- func (f *Fetcher[H, T]) FetchBatch(ctx context.Context, digest H, preferredPeer uint16) (*Batch[H, T], error)
- func (f *Fetcher[H, T]) FetchBatchesParallel(ctx context.Context, digests []H, preferredPeer uint16) ([]*Batch[H, T], error)
- func (f *Fetcher[H, T]) FetchCertificate(ctx context.Context, digest H, preferredPeer uint16) (*Certificate[H], error)
- func (f *Fetcher[H, T]) PendingFetchCount() int
- type FetcherConfig
- type GCConfig
- type GCStats
- type GarbageCollectedEvent
- type GarbageCollector
- func NewGarbageCollector[H Hash, T Transaction[H]](cfg GCConfig, dag *DAG[H, T], storage Storage[H, T], logger *zap.Logger) *GarbageCollector[H, T]
- func NewGarbageCollectorWithHooks[H Hash, T Transaction[H]](cfg GCConfig, dag *DAG[H, T], storage Storage[H, T], hooks *Hooks[H, T], ...) *GarbageCollector[H, T]
- type GnarkBLSAggregator
- type GnarkBLSBatchVerifier
- type GnarkBLSCryptoProvider
- type Hash
- type Header
- type HeaderCreatedEvent
- type HeaderMessage
- type HeaderMessageAccessor
- type HeaderReceivedEvent
- type HeaderTimeoutEvent
- type HeaderVote
- type HeaderWaiter
- func (hw *HeaderWaiter[H, T]) Add(header *Header[H], from uint16, missingParents []H) bool
- func (hw *HeaderWaiter[H, T]) OnParentAvailable(parentDigest H)
- func (hw *HeaderWaiter[H, T]) PendingCount() int
- func (hw *HeaderWaiter[H, T]) Run(ctx context.Context)
- func (hw *HeaderWaiter[H, T]) SetFetchParentFunc(fn func(digest H, from uint16) error)
- func (hw *HeaderWaiter[H, T]) Stats() HeaderWaiterStats
- type HeaderWaiterConfig
- type HeaderWaiterStats
- type Hooks
- type LRUCache
- func (c *LRUCache[K, V]) Clear()
- func (c *LRUCache[K, V]) Contains(key K) bool
- func (c *LRUCache[K, V]) Get(key K) (V, bool)
- func (c *LRUCache[K, V]) Keys() []K
- func (c *LRUCache[K, V]) Len() int
- func (c *LRUCache[K, V]) Peek(key K) (V, bool)
- func (c *LRUCache[K, V]) Put(key K, value V) (evictedKey K, evictedValue V, evicted bool)
- func (c *LRUCache[K, V]) Remove(key K) bool
- func (c *LRUCache[K, V]) Resize(newCapacity int) int
- func (c *LRUCache[K, V]) Stats() LRUCacheStats
- type LRUCacheStats
- type LeaderSchedule
- type LeaderTracker
- func (lt *LeaderTracker[H]) CheckLeaderVotes(parents []*Certificate[H], leaderDigest *H, f int) LeaderVoteStatus
- func (lt *LeaderTracker[H]) Clear()
- func (lt *LeaderTracker[H]) GarbageCollect(beforeRound uint64)
- func (lt *LeaderTracker[H]) GetLeaderForRound(round uint64) *Certificate[H]
- func (lt *LeaderTracker[H]) HasLeaderForRound(round uint64) bool
- func (lt *LeaderTracker[H]) RecordCertificate(cert *Certificate[H]) bool
- func (lt *LeaderTracker[H]) SetRound(round uint64)
- func (lt *LeaderTracker[H]) ShouldWaitForLeader(round uint64) bool
- type LeaderVoteStatus
- type Message
- type MessageType
- type MeteredChannel
- func (m *MeteredChannel[T]) Cap() int
- func (m *MeteredChannel[T]) Close()
- func (m *MeteredChannel[T]) Len() int
- func (m *MeteredChannel[T]) MarkReceived()
- func (m *MeteredChannel[T]) Receive() T
- func (m *MeteredChannel[T]) ReceiveChan() <-chan T
- func (m *MeteredChannel[T]) Send(value T)
- func (m *MeteredChannel[T]) Stats() MeteredChannelStats
- func (m *MeteredChannel[T]) TryReceive() (T, bool)
- func (m *MeteredChannel[T]) TrySend(value T) bool
- type MeteredChannelStats
- type MutableValidatorSet
- type Narwhal
- func (n *Narwhal[H, T]) AddTransaction(tx T)
- func (n *Narwhal[H, T]) CurrentRound() uint64
- func (n *Narwhal[H, T]) GetCertifiedVertices() []*Certificate[H]
- func (n *Narwhal[H, T]) GetDAG() *DAG[H, T]
- func (n *Narwhal[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)
- func (n *Narwhal[H, T]) MarkCommitted(certs []*Certificate[H])
- func (n *Narwhal[H, T]) RequestTrackerStats() RequestTrackerStats
- func (n *Narwhal[H, T]) Start() error
- func (n *Narwhal[H, T]) Stop()
- type Network
- type NetworkModel
- type NetworkStatus
- type PanicHandler
- type PendingCertificate
- type PendingHeader
- type PerPeerRateLimiter
- type Primary
- type PrimaryConfig
- type PrimaryStats
- type ProposedHeader
- type Proposer
- func (p *Proposer[H, T]) AddBatch(batch *Batch[H, T]) bool
- func (p *Proposer[H, T]) ForcePropose() bool
- func (p *Proposer[H, T]) Headers() <-chan *ProposedHeader[H, T]
- func (p *Proposer[H, T]) PendingCount() int
- func (p *Proposer[H, T]) Run(ctx context.Context)
- func (p *Proposer[H, T]) Stats() ProposerStats
- type ProposerConfig
- type ProposerStats
- type PublicKey
- type RateLimiter
- type RateLimiterStats
- type ReconfigurationCoordinator
- func (rc *ReconfigurationCoordinator[H]) AddVote(vote *ReconfigurationVote) error
- func (rc *ReconfigurationCoordinator[H]) CheckQuorum() bool
- func (rc *ReconfigurationCoordinator[H]) Clear()
- func (rc *ReconfigurationCoordinator[H]) CurrentProposal() *ReconfigurationProposal
- func (rc *ReconfigurationCoordinator[H]) IsRejected() bool
- func (rc *ReconfigurationCoordinator[H]) ProposeReconfiguration(effectiveRound uint64, changes []ValidatorSetChange, proposer uint16, ...) error
- func (rc *ReconfigurationCoordinator[H]) VoteStats() (approvals, rejections, total int)
- type ReconfigurationProposal
- type ReconfigurationState
- type ReconfigurationStats
- type ReconfigurationVote
- type Reconfigurer
- func (r *Reconfigurer) BeginCommit() error
- func (r *Reconfigurer) CancelPendingChange() error
- func (r *Reconfigurer) CompleteCommit() error
- func (r *Reconfigurer) CurrentEpoch() uint64
- func (r *Reconfigurer) CurrentValidators() ValidatorSet
- func (r *Reconfigurer) History() []EpochChange
- func (r *Reconfigurer) IsEpochBoundaryRound(round uint64) bool
- func (r *Reconfigurer) OnEpochChange(callback func(EpochChange))
- func (r *Reconfigurer) PendingChange() *EpochChange
- func (r *Reconfigurer) ProposeEpochChange(newEpoch uint64, effectiveRound uint64, newValidators ValidatorSet) error
- func (r *Reconfigurer) ShouldAcceptForEpoch(epoch uint64) bool
- func (r *Reconfigurer) State() ReconfigurationState
- func (r *Reconfigurer) Stats() ReconfigurationStats
- type ReconnectableNetwork
- type RecoveryConfig
- type RequestTracker
- func (rt *RequestTracker[H]) CancelAll()
- func (rt *RequestTracker[H]) CancelForRound(round uint64)
- func (rt *RequestTracker[H]) OnRoundAdvance(newRound uint64)
- func (rt *RequestTracker[H]) PendingCount() int
- func (rt *RequestTracker[H]) PendingForRound(round uint64) int
- func (rt *RequestTracker[H]) Stats() RequestTrackerStats
- func (rt *RequestTracker[H]) Track(parentCtx context.Context, round uint64, reqType RequestType, digest H) (context.Context, func())
- type RequestTrackerConfig
- type RequestTrackerStats
- type RequestType
- type RoundAdvancedEvent
- type RoundRobinLeaderSchedule
- type SignatureCache
- type SignatureError
- type SignatureScheme
- type Signer
- type SlicePool
- type Storage
- type Timer
- type Transaction
- type TransactionReceivedEvent
- type ValidationConfig
- type ValidationError
- type Validator
- func (v *Validator[H, T]) ValidateBatch(batch *Batch[H, T], checkTxSize bool) error
- func (v *Validator[H, T]) ValidateCertificate(cert *Certificate[H], currentRound uint64, verifySignatures bool) error
- func (v *Validator[H, T]) ValidateHeader(header *Header[H], currentRound uint64) error
- func (v *Validator[H, T]) ValidateMessageSize(msgType MessageType, data []byte) error
- func (v *Validator[H, T]) ValidateVote(vote *HeaderVote[H], verifySignature bool) error
- type ValidatorSet
- type ValidatorSetChange
- type ValidatorSetChangeType
- type Vertex
- type VertexCache
- func (c *VertexCache[H, T]) Clear()
- func (c *VertexCache[H, T]) Contains(digest H) bool
- func (c *VertexCache[H, T]) Get(digest H) (*Vertex[H, T], bool)
- func (c *VertexCache[H, T]) Len() int
- func (c *VertexCache[H, T]) Put(vertex *Vertex[H, T])
- func (c *VertexCache[H, T]) Remove(digest H) bool
- func (c *VertexCache[H, T]) Stats() LRUCacheStats
- type VertexInsertedEvent
- type VoteDecision
- type VoteMessage
- type VoteMessageAccessor
- type VoteReceivedEvent
- type VoteRecord
- type VoteSentEvent
- type VoteTracker
- func (vt *VoteTracker[H]) Clear()
- func (vt *VoteTracker[H]) GarbageCollect(gcRound uint64)
- func (vt *VoteTracker[H]) RecordVote(author uint16, round, epoch uint64, headerDigest H)
- func (vt *VoteTracker[H]) SetEpoch(epoch uint64)
- func (vt *VoteTracker[H]) ShouldVote(author uint16, round, epoch uint64, headerDigest H) (VoteDecision, *H)
- func (vt *VoteTracker[H]) Stats() VoteTrackerStats
- type VoteTrackerStats
- type Worker
- type WorkerConfig
- type WorkerStats
Constants ¶
const ( // DefaultMaxBatchRefs is the maximum number of batch references in a header. // Prevents memory exhaustion from headers with too many refs. DefaultMaxBatchRefs = 1000 // DefaultMaxParents is the maximum number of parent certificates in a header. // Limited by validator count in practice, but enforced for safety. DefaultMaxParents = 200 // DefaultMaxSignatures is the maximum number of signatures in a certificate. // Should not exceed validator count. DefaultMaxSignatures = 200 // DefaultMaxTransactionsPerBatch is the maximum transactions in a single batch. // Prevents memory exhaustion from oversized batches. DefaultMaxTransactionsPerBatch = 10000 // DefaultMaxTransactionSize is the maximum size of a single transaction in bytes. DefaultMaxTransactionSize = 1024 * 1024 // 1 MB // DefaultMaxBatchSize is the maximum size of a batch in bytes. DefaultMaxBatchSize = 100 * 1024 * 1024 // 100 MB // DefaultMaxHeaderSize is the maximum size of a header in bytes. DefaultMaxHeaderSize = 1024 * 1024 // 1 MB // DefaultMaxCertificateSize is the maximum size of a certificate in bytes. DefaultMaxCertificateSize = 10 * 1024 * 1024 // 10 MB // DefaultMaxRoundSkip is the maximum rounds a header can be ahead of current. DefaultMaxRoundSkip = 100 // DefaultMaxTimestampDrift is the maximum time a header timestamp can be in the future. DefaultMaxTimestampDrift = 60 * time.Second )
Security limits to prevent DoS attacks. These can be overridden via ValidationConfig.
Variables ¶
This section is empty.
Functions ¶
func GoWithRecovery ¶
func GoWithRecovery(cfg RecoveryConfig, fn func())
GoWithRecovery starts a goroutine with panic recovery.
func GoWithRecoveryCtx ¶
func GoWithRecoveryCtx(ctx context.Context, cfg RecoveryConfig, fn func(context.Context))
GoWithRecoveryCtx starts a goroutine with panic recovery and context. The function receives the context and can check for cancellation.
func IsValidationError ¶
IsValidationError returns true if err is a ValidationError.
func RecoverPanic ¶
func RecoverPanic(cfg RecoveryConfig)
RecoverPanic recovers from panics and handles them according to config. Use as: defer RecoverPanic(cfg)
func SafeGo ¶
SafeGo starts a goroutine with panic recovery using a simple logger. This is a convenience wrapper for common cases.
func SafeGoCtx ¶
SafeGoCtx starts a goroutine with panic recovery using a simple logger and context.
func VerifyCertificateSignatures ¶
func VerifyCertificateSignatures[H Hash]( cert *Certificate[H], validators ValidatorSet, crypto CryptoProvider, ) error
VerifyCertificateSignatures verifies all signatures in a certificate. Uses the provided CryptoProvider for optimized verification.
Types ¶
type BLSAggregator ¶
type BLSAggregator interface {
// Aggregate combines multiple BLS signatures into one.
// Returns the aggregated signature bytes.
Aggregate(signatures [][]byte) ([]byte, error)
// VerifyAggregate verifies an aggregated signature against multiple public keys.
// pubKeys are the raw public key bytes of all signers.
// message is the data that was signed (must be same for all).
// aggSig is the aggregated signature.
VerifyAggregate(pubKeys [][]byte, message []byte, aggSig []byte) bool
}
BLSAggregator provides signature aggregation for BLS. Aggregated signatures reduce certificate size and verification time.
type BLSBatchVerifier ¶
type BLSBatchVerifier interface {
// AddSignature adds a signature to the batch.
// pubKey is the raw public key bytes.
// message is the data that was signed.
// signature is the BLS signature bytes.
AddSignature(pubKey []byte, message []byte, signature []byte)
// VerifyBatch verifies all signatures added to the batch.
// Returns true if all signatures are valid.
VerifyBatch() bool
// Reset clears the batch for reuse.
Reset()
}
BLSBatchVerifier provides batch verification capability for BLS signatures. This interface should be implemented by the integrating application using their preferred BLS library (e.g., blst, bls12-381).
type BLSCryptoProvider ¶
type BLSCryptoProvider struct {
// contains filtered or unexported fields
}
BLSCryptoProvider implements CryptoProvider for BLS.
func NewBLSCryptoProvider ¶
func NewBLSCryptoProvider(batcherFactory func() BLSBatchVerifier, aggregator BLSAggregator) *BLSCryptoProvider
NewBLSCryptoProvider creates a crypto provider for BLS. batcherFactory creates new BLSBatchVerifier instances. aggregator provides signature aggregation (optional, can be nil).
func (*BLSCryptoProvider) Aggregator ¶
func (p *BLSCryptoProvider) Aggregator() BLSAggregator
func (*BLSCryptoProvider) NewBatchVerifier ¶
func (p *BLSCryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier
func (*BLSCryptoProvider) Scheme ¶
func (p *BLSCryptoProvider) Scheme() SignatureScheme
func (*BLSCryptoProvider) SupportsAggregation ¶
func (p *BLSCryptoProvider) SupportsAggregation() bool
type BLSKeyPair ¶
type BLSKeyPair struct {
PrivateKey []byte // 32 bytes (fr.Element)
PublicKey []byte // 96 bytes (G2Affine compressed)
}
BLSKeyPair holds a BLS private/public key pair.
func GenerateBLSKeyPair ¶
func GenerateBLSKeyPair() (*BLSKeyPair, error)
GenerateBLSKeyPair generates a new BLS key pair.
type BLSPublicKey ¶
type BLSPublicKey struct {
// contains filtered or unexported fields
}
BLSPublicKey wraps a BLS public key for the PublicKey interface.
func NewBLSPublicKey ¶
func NewBLSPublicKey(b []byte) (*BLSPublicKey, error)
NewBLSPublicKey creates a BLS public key from bytes.
func (*BLSPublicKey) Bytes ¶
func (k *BLSPublicKey) Bytes() []byte
func (*BLSPublicKey) Equals ¶
func (k *BLSPublicKey) Equals(other interface{ Bytes() []byte }) bool
func (*BLSPublicKey) Verify ¶
func (k *BLSPublicKey) Verify(message, signature []byte) bool
type BLSSigner ¶
type BLSSigner struct {
// contains filtered or unexported fields
}
BLSSigner implements the Signer interface for BLS.
func NewBLSSigner ¶
func NewBLSSigner(keyPair *BLSKeyPair) (*BLSSigner, error)
NewBLSSigner creates a new BLS signer from a key pair.
type BLSValidatorSet ¶
type BLSValidatorSet struct {
// contains filtered or unexported fields
}
BLSValidatorSet implements ValidatorSet for BLS validators.
func NewBLSValidatorSet ¶
func NewBLSValidatorSet(n int) (*BLSValidatorSet, error)
NewBLSValidatorSet creates a validator set with n BLS validators.
func (*BLSValidatorSet) Contains ¶
func (v *BLSValidatorSet) Contains(index uint16) bool
func (*BLSValidatorSet) Count ¶
func (v *BLSValidatorSet) Count() int
func (*BLSValidatorSet) F ¶
func (v *BLSValidatorSet) F() int
func (*BLSValidatorSet) GetByIndex ¶
func (v *BLSValidatorSet) GetByIndex(index uint16) (PublicKey, error)
func (*BLSValidatorSet) GetSigner ¶
func (v *BLSValidatorSet) GetSigner(index uint16) *BLSSigner
GetSigner returns the signer for a validator index.
type Batch ¶
type Batch[H Hash, T Transaction[H]] struct { WorkerID uint16 ValidatorID uint16 Round uint64 Transactions []T Digest H }
Batch is a collection of transactions created by a worker.
func BatchFromBytes ¶
func BatchFromBytes[H Hash, T Transaction[H]]( data []byte, hashFromBytes func([]byte) (H, error), txFromBytes func([]byte) (T, error), ) (*Batch[H, T], error)
BatchFromBytes deserializes a batch from bytes. hashFromBytes converts raw bytes to H. txFromBytes converts raw bytes to T.
func (*Batch[H, T]) Bytes ¶
Bytes serializes the batch to bytes. Format: [WorkerID:2][ValidatorID:2][Round:8][DigestLen:2][Digest][TxCount:4][Tx0Len:4][Tx0][Tx1Len:4][Tx1]...
func (*Batch[H, T]) ComputeDigest ¶
ComputeDigest computes and sets the digest for this batch. The digest is computed over the batch metadata and transaction hashes. This must be called before broadcasting the batch.
func (*Batch[H, T]) TransactionCount ¶
TransactionCount returns the number of transactions in the batch.
type BatchCreatedEvent ¶
type BatchCreatedEvent[H Hash, T Transaction[H]] struct { Batch *Batch[H, T] WorkerID uint16 TransactionCount int SizeBytes int CreatedAt time.Time }
BatchCreatedEvent contains information about a newly created batch.
type BatchMessage ¶
type BatchMessage[H Hash, T Transaction[H]] struct { BatchData *Batch[H, T] // contains filtered or unexported fields }
func (*BatchMessage[H, T]) Batch ¶
func (m *BatchMessage[H, T]) Batch() *Batch[H, T]
func (*BatchMessage[H, T]) Sender ¶
func (m *BatchMessage[H, T]) Sender() uint16
func (*BatchMessage[H, T]) Type ¶
func (m *BatchMessage[H, T]) Type() MessageType
type BatchMessageAccessor ¶
type BatchMessageAccessor[H Hash, T Transaction[H]] interface{ Batch() *Batch[H, T] }
type BatchRequestMessage ¶
type BatchRequestMessage[H Hash, T Transaction[H]] struct { Digest H // contains filtered or unexported fields }
BatchRequestMessage requests a batch by digest.
func (*BatchRequestMessage[H, T]) Sender ¶
func (m *BatchRequestMessage[H, T]) Sender() uint16
func (*BatchRequestMessage[H, T]) Type ¶
func (m *BatchRequestMessage[H, T]) Type() MessageType
type BatchVerifier ¶
type BatchVerifier interface {
// Add queues a signature for batch verification.
// validatorIndex is used to look up the public key.
Add(message []byte, signature []byte, validatorIndex uint16)
// Verify performs batch verification of all queued signatures.
// Returns nil if all signatures are valid, or an error describing the failure.
Verify() error
// Reset clears all queued signatures for reuse.
Reset()
}
BatchVerifier provides optimized signature verification. For BLS, it can verify multiple signatures in a single operation. For Ed25519, it parallelizes verification across goroutines.
Usage:
verifier := NewBatchVerifier(validators, SignatureSchemeBLS)
for i, sig := range signatures {
verifier.Add(message, sig, validatorIndex)
}
if err := verifier.Verify(); err != nil {
// verification failed
}
func NewBLSBatchVerifier ¶
func NewBLSBatchVerifier(validators ValidatorSet, batcher BLSBatchVerifier) BatchVerifier
NewBLSBatchVerifier creates a batch verifier for BLS signatures. The batcher parameter should be provided by the integrating application.
func NewEd25519BatchVerifier ¶
func NewEd25519BatchVerifier(validators ValidatorSet, workers int) BatchVerifier
NewEd25519BatchVerifier creates a batch verifier for Ed25519. It parallelizes verification across multiple goroutines.
type ByteSlicePool ¶
type ByteSlicePool struct {
// contains filtered or unexported fields
}
ByteSlicePool provides a pool of reusable byte slices.
func NewByteSlicePool ¶
func NewByteSlicePool(defaultSize int) *ByteSlicePool
NewByteSlicePool creates a new byte slice pool with the specified default size.
func (*ByteSlicePool) Get ¶
func (p *ByteSlicePool) Get() *[]byte
Get retrieves a byte slice from the pool. The slice is reset to zero length but retains its capacity.
func (*ByteSlicePool) Put ¶
func (p *ByteSlicePool) Put(b *[]byte)
Put returns a byte slice to the pool. The slice should not be used after calling Put.
type CachedBatchVerifier ¶
type CachedBatchVerifier struct {
// contains filtered or unexported fields
}
CachedBatchVerifier wraps a BatchVerifier with signature caching.
func NewCachedBatchVerifier ¶
func NewCachedBatchVerifier(inner BatchVerifier, cache *SignatureCache, validators ValidatorSet) *CachedBatchVerifier
NewCachedBatchVerifier wraps a batch verifier with caching.
func (*CachedBatchVerifier) Add ¶
func (v *CachedBatchVerifier) Add(message []byte, signature []byte, validatorIndex uint16)
func (*CachedBatchVerifier) Reset ¶
func (v *CachedBatchVerifier) Reset()
func (*CachedBatchVerifier) Verify ¶
func (v *CachedBatchVerifier) Verify() error
type Certificate ¶
type Certificate[H Hash] struct { Header *Header[H] Signatures [][]byte SignerBitmap uint64 // bit i set if validator i signed }
Certificate is a header with 2f+1 validator signatures.
func CertificateFromBytes ¶
func CertificateFromBytes[H Hash]( data []byte, hashFromBytes func([]byte) (H, error), ) (*Certificate[H], error)
CertificateFromBytes deserializes a certificate from bytes.
func NewCertificate ¶
func NewCertificate[H Hash](header *Header[H], votes map[uint16][]byte) *Certificate[H]
NewCertificate creates a new certificate from a header and votes.
func (*Certificate[H]) Author ¶
func (c *Certificate[H]) Author() uint16
Author returns the certificate's author.
func (*Certificate[H]) Bytes ¶
func (c *Certificate[H]) Bytes() []byte
Bytes serializes the certificate to bytes. Format: [Header bytes][SignerBitmap:8][SigCount:4][Sig0Len:2][Sig0][Sig1Len:2][Sig1]...
func (*Certificate[H]) Digest ¶
func (c *Certificate[H]) Digest() H
Digest returns the certificate's digest (same as header digest).
func (*Certificate[H]) HasSigner ¶
func (c *Certificate[H]) HasSigner(validatorIndex uint16) bool
HasSigner returns true if the given validator signed this certificate.
func (*Certificate[H]) Round ¶
func (c *Certificate[H]) Round() uint64
Round returns the certificate's round.
func (*Certificate[H]) SignerCount ¶
func (c *Certificate[H]) SignerCount() int
SignerCount returns the number of validators who signed this certificate.
func (*Certificate[H]) Validate ¶
func (c *Certificate[H]) Validate(validators ValidatorSet) error
Validate verifies that the certificate has a valid quorum of signatures.
func (*Certificate[H]) ValidateWithCrypto ¶
func (c *Certificate[H]) ValidateWithCrypto(validators ValidatorSet, crypto CryptoProvider, cache *SignatureCache) error
ValidateWithCrypto verifies that the certificate has a valid quorum of signatures using the provided CryptoProvider for optimized batch/parallel verification. If cache is provided, already-verified signatures are skipped.
type CertificateCache ¶
type CertificateCache[H Hash] struct { // contains filtered or unexported fields }
CertificateCache is a specialized LRU cache for certificates. It uses certificate digests as keys and provides convenient methods for certificate lookups.
func NewCertificateCache ¶
func NewCertificateCache[H Hash](capacity int) *CertificateCache[H]
NewCertificateCache creates a new certificate cache with the given capacity.
func (*CertificateCache[H]) Clear ¶
func (c *CertificateCache[H]) Clear()
Clear removes all certificates from the cache.
func (*CertificateCache[H]) Contains ¶
func (c *CertificateCache[H]) Contains(digest H) bool
Contains checks if a certificate exists in the cache.
func (*CertificateCache[H]) Get ¶
func (c *CertificateCache[H]) Get(digest H) (*Certificate[H], bool)
Get retrieves a certificate by its digest.
func (*CertificateCache[H]) Len ¶
func (c *CertificateCache[H]) Len() int
Len returns the number of certificates in the cache.
func (*CertificateCache[H]) Put ¶
func (c *CertificateCache[H]) Put(cert *Certificate[H])
Put adds a certificate to the cache.
func (*CertificateCache[H]) Remove ¶
func (c *CertificateCache[H]) Remove(digest H) bool
Remove removes a certificate from the cache.
func (*CertificateCache[H]) Stats ¶
func (c *CertificateCache[H]) Stats() LRUCacheStats
Stats returns cache statistics.
type CertificateFormedEvent ¶
type CertificateFormedEvent[H Hash] struct { Certificate *Certificate[H] SignerCount int Latency time.Duration // Time from header creation to certificate formation FormedAt time.Time }
CertificateFormedEvent contains information about a newly formed certificate.
type CertificateMessage ¶
type CertificateMessage[H Hash, T Transaction[H]] struct { CertificateData *Certificate[H] // contains filtered or unexported fields }
CertificateMessage wraps a certificate for network transmission.
func (*CertificateMessage[H, T]) Certificate ¶
func (m *CertificateMessage[H, T]) Certificate() *Certificate[H]
func (*CertificateMessage[H, T]) Sender ¶
func (m *CertificateMessage[H, T]) Sender() uint16
func (*CertificateMessage[H, T]) Type ¶
func (m *CertificateMessage[H, T]) Type() MessageType
type CertificateMessageAccessor ¶
type CertificateMessageAccessor[H Hash] interface{ Certificate() *Certificate[H] }
type CertificatePendingEvent ¶
type CertificatePendingEvent[H Hash] struct { Certificate *Certificate[H] MissingParents []H QueuedAt time.Time }
CertificatePendingEvent contains information about a certificate queued as pending.
type CertificateRangeRequestMessage ¶
type CertificateRangeRequestMessage[H Hash, T Transaction[H]] struct { StartRound uint64 EndRound uint64 // contains filtered or unexported fields }
CertificateRangeRequestMessage requests certificates for a range of rounds.
func (*CertificateRangeRequestMessage[H, T]) Sender ¶
func (m *CertificateRangeRequestMessage[H, T]) Sender() uint16
func (*CertificateRangeRequestMessage[H, T]) Type ¶
func (m *CertificateRangeRequestMessage[H, T]) Type() MessageType
type CertificateRangeResponseMessage ¶
type CertificateRangeResponseMessage[H Hash, T Transaction[H]] struct { Certificates []*Certificate[H] StartRound uint64 EndRound uint64 // contains filtered or unexported fields }
CertificateRangeResponseMessage contains certificates for a range of rounds.
func (*CertificateRangeResponseMessage[H, T]) Sender ¶
func (m *CertificateRangeResponseMessage[H, T]) Sender() uint16
func (*CertificateRangeResponseMessage[H, T]) Type ¶
func (m *CertificateRangeResponseMessage[H, T]) Type() MessageType
type CertificateReceivedEvent ¶
type CertificateReceivedEvent[H Hash] struct { Certificate *Certificate[H] From uint16 ReceivedAt time.Time }
CertificateReceivedEvent contains information about a received certificate.
type CertificateRequestMessage ¶
type CertificateRequestMessage[H Hash, T Transaction[H]] struct { Digest H // contains filtered or unexported fields }
CertificateRequestMessage requests a certificate by digest.
func (*CertificateRequestMessage[H, T]) Sender ¶
func (m *CertificateRequestMessage[H, T]) Sender() uint16
func (*CertificateRequestMessage[H, T]) Type ¶
func (m *CertificateRequestMessage[H, T]) Type() MessageType
type ChannelMetrics ¶
type ChannelMetrics struct {
// contains filtered or unexported fields
}
ChannelMetrics collects metrics from multiple metered channels. Useful for exposing all channel metrics through a single interface.
func NewChannelMetrics ¶
func NewChannelMetrics() *ChannelMetrics
NewChannelMetrics creates a new ChannelMetrics collector.
func (*ChannelMetrics) AllStats ¶
func (cm *ChannelMetrics) AllStats() map[string]MeteredChannelStats
AllStats returns statistics for all registered channels.
func (*ChannelMetrics) Register ¶
func (cm *ChannelMetrics) Register(name string, ch interface{ Stats() MeteredChannelStats })
Register adds a metered channel to the collector.
func (*ChannelMetrics) TotalDropped ¶
func (cm *ChannelMetrics) TotalDropped() uint64
TotalDropped returns the sum of dropped messages across all channels.
func (*ChannelMetrics) TotalPending ¶
func (cm *ChannelMetrics) TotalPending() int
TotalPending returns the sum of pending messages across all channels.
func (*ChannelMetrics) Unregister ¶
func (cm *ChannelMetrics) Unregister(name string)
Unregister removes a channel from the collector.
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern for network operations. It prevents repeated failures from overwhelming the system.
func NewCircuitBreaker ¶
func NewCircuitBreaker(cfg CircuitBreakerConfig) *CircuitBreaker
NewCircuitBreaker creates a new circuit breaker.
func (*CircuitBreaker) Allow ¶
func (cb *CircuitBreaker) Allow() bool
Allow checks if a request should be allowed. Returns true if allowed, false if the circuit is open.
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
RecordFailure records a failed operation.
func (*CircuitBreaker) RecordSuccess ¶
func (cb *CircuitBreaker) RecordSuccess()
RecordSuccess records a successful operation.
func (*CircuitBreaker) Reset ¶
func (cb *CircuitBreaker) Reset()
Reset resets the circuit breaker to closed state.
func (*CircuitBreaker) State ¶
func (cb *CircuitBreaker) State() string
State returns the current circuit state as a string.
func (*CircuitBreaker) Stats ¶
func (cb *CircuitBreaker) Stats() CircuitBreakerStats
Stats returns current statistics.
type CircuitBreakerConfig ¶
type CircuitBreakerConfig struct {
// FailureThreshold is the number of failures before opening the circuit.
// Default: 5
FailureThreshold int
// SuccessThreshold is the number of successes needed to close from half-open.
// Default: 2
SuccessThreshold int
// Timeout is how long to wait before trying again (moving to half-open).
// Default: 30s
Timeout time.Duration
}
CircuitBreakerConfig configures the circuit breaker.
func DefaultCircuitBreakerConfig ¶
func DefaultCircuitBreakerConfig() CircuitBreakerConfig
DefaultCircuitBreakerConfig returns sensible defaults.
type CircuitBreakerStats ¶
type CircuitBreakerStats struct {
State string
Failures int
Successes int
TotalFailures uint64
TotalSuccesses uint64
TotalRejected uint64
LastFailure time.Time
}
CircuitBreakerStats contains circuit breaker statistics.
type Config ¶
type Config[H Hash, T Transaction[H]] struct { // MyIndex is this validator's index in the validator set. MyIndex uint16 // Validators is the set of all validators. // Required. Validators ValidatorSet // Signer provides cryptographic signing capability. // Required. Signer Signer // Storage provides persistent storage. // Required. Storage Storage[H, T] // Network provides message delivery. // Required. Network Network[H, T] // Timer provides timeout management. // Required. Timer Timer // Logger for structured logging. // Defaults to a no-op logger if not provided. Logger *zap.Logger // WorkerCount is the number of workers for parallel batch creation. // Default: 4 WorkerCount int // BatchSize is the maximum number of transactions per batch. // When this threshold is reached, a batch is created immediately. // Default: 500 BatchSize int // BatchTimeout is the maximum time to wait before creating a batch. // Even if BatchSize is not reached, a batch is created after this duration. // Default: 100ms BatchTimeout time.Duration // HeaderTimeout is the interval between header creation attempts. // Default: 500ms HeaderTimeout time.Duration // GCInterval is the number of rounds between garbage collection runs. // Older rounds are cleaned up to free memory and storage. // Default: 50 GCInterval uint64 // MaxRoundsGap is the maximum number of rounds ahead we accept headers for. // Headers for rounds beyond currentRound + MaxRoundsGap are rejected. // Default: 10 MaxRoundsGap uint64 // MaxPendingTransactions is the maximum number of transactions queued per worker. // When this limit is reached, AddTransaction will block or drop based on DropOnFull. // Default: 10000 MaxPendingTransactions int // MaxPendingBatches is the maximum number of batches queued in the primary. // Default: 1000 MaxPendingBatches int // DropOnFull determines behavior when queues are full. // If true, new items are dropped. If false, AddTransaction blocks. // Default: false (block) DropOnFull bool // MaxHeaderBatches is the maximum number of batch refs to include in a header. // Headers are created when this limit is reached, even if MaxHeaderDelay hasn't elapsed. // Default: 100 MaxHeaderBatches int // MaxHeaderDelay is the maximum time to wait before creating a header. // Even if MaxHeaderBatches is not reached, a header is created after this duration. // Default: HeaderTimeout (uses HeaderTimeout value if not set) MaxHeaderDelay time.Duration // SyncRetryDelay is the delay before retrying sync requests for missing data. // Default: 10s SyncRetryDelay time.Duration // SyncRetryNodes is the number of random nodes to contact when syncing. // Default: 3 SyncRetryNodes int // GCDepth is the number of rounds to retain after the committed round. // This is separate from GCInterval which controls how often GC runs. // Default: 100 GCDepth uint64 // Hooks provides callbacks for observability events. // All hooks are optional - nil hooks are ignored. // See the Hooks struct for available callbacks. Hooks *Hooks[H, T] // MaxPendingHeaders is the maximum number of headers with missing parents to queue. // Default: 1000 MaxPendingHeaders int // HeaderWaiterRetryInterval is how often to retry processing pending headers. // Default: 1s HeaderWaiterRetryInterval time.Duration // HeaderWaiterMaxRetries is the maximum retries before dropping a pending header. // Default: 10 HeaderWaiterMaxRetries int // HeaderWaiterMaxAge is the maximum time a header can wait before being dropped. // Default: 30s HeaderWaiterMaxAge time.Duration // FetchMissingParents enables proactive fetching of missing parent certificates // when headers are queued. This can reduce latency by fetching parents in parallel. // Default: true FetchMissingParents bool // CryptoProvider provides optimized cryptographic operations. // When set, certificate validation uses batch/parallel signature verification. // For Ed25519, this parallelizes verification across goroutines. // For BLS, this enables true batch verification and signature aggregation. // Default: nil (uses standard sequential verification) CryptoProvider CryptoProvider // SignatureCache caches verified signatures to avoid re-verification. // Only used when CryptoProvider is set. // Default: nil (no caching) SignatureCache *SignatureCache // Validation is the configuration for input validation. // Controls limits for DoS prevention (max batch refs, max transactions, etc.) // Default: DefaultValidationConfig() Validation ValidationConfig // RecommendedMessageQueueSize is the advisory buffer size for Network receive channels. // Default: 10000 RecommendedMessageQueueSize int // DAGCache configures the optional LRU cache for DAG vertex lookups. // When enabled, frequently accessed vertices are cached for faster retrieval. // This improves performance for workloads with repeated vertex lookups. // Default: DAGCacheConfig{Enabled: true, Capacity: 10000} DAGCache DAGCacheConfig // NetworkModel specifies the network timing assumptions. // Asynchronous mode advances rounds as soon as 2f+1 parents are available. // Partially synchronous mode waits for leader blocks before advancing. // Default: NetworkModelAsynchronous NetworkModel NetworkModel // LeaderSchedule determines which validator is leader for each round. // Only used when NetworkModel is NetworkModelPartiallySynchronous. // Default: nil (round-robin schedule created from validator count) LeaderSchedule LeaderSchedule // Epoch is the initial epoch for the validator set. // Used for epoch-aware reconfiguration and vote tracking. // Default: 0 Epoch uint64 }
Config holds the configuration for a Narwhal instance. Use NewConfig with functional options to create a properly configured instance.
The type parameters specify the hash and transaction types used.
func DefaultConfig ¶
func DefaultConfig[H Hash, T Transaction[H]]() Config[H, T]
DefaultConfig returns a configuration suitable for most use cases. Balances throughput and latency for typical blockchain workloads.
Settings:
- WorkerCount: 4
- BatchSize: 500
- BatchTimeout: 100ms
- HeaderTimeout: 500ms
- GCInterval: 100
- MaxRoundsGap: 10
func DemoConfig ¶
func DemoConfig[H Hash, T Transaction[H]]() Config[H, T]
DemoConfig returns a configuration suitable for demonstrations and testing. Uses visible timing to make the protocol easier to observe.
Settings:
- WorkerCount: 2
- BatchSize: 10
- BatchTimeout: 1s
- HeaderTimeout: 2s
- GCInterval: 50
- MaxRoundsGap: 10
func HighThroughputConfig ¶
func HighThroughputConfig[H Hash, T Transaction[H]]() Config[H, T]
HighThroughputConfig returns a configuration optimized for maximum throughput. Uses more workers and larger batches, suitable for high-volume systems.
Settings:
- WorkerCount: 8
- BatchSize: 1000
- BatchTimeout: 50ms
- HeaderTimeout: 200ms
- GCInterval: 50
- MaxRoundsGap: 5
func LowLatencyConfig ¶
func LowLatencyConfig[H Hash, T Transaction[H]]() Config[H, T]
LowLatencyConfig returns a configuration optimized for low latency. Uses smaller batches and shorter timeouts, suitable for interactive applications.
Settings:
- WorkerCount: 2
- BatchSize: 100
- BatchTimeout: 20ms
- HeaderTimeout: 100ms
- GCInterval: 100
- MaxRoundsGap: 10
func NewConfig ¶
func NewConfig[H Hash, T Transaction[H]](opts ...ConfigOption[H, T]) (*Config[H, T], error)
NewConfig creates a new Config with the given options. Required options: WithValidators, WithSigner, WithStorage, WithNetwork, WithTimer.
Returns an error if any option fails or if required options are missing.
func (*Config[H, T]) LogWarnings ¶
func (c *Config[H, T]) LogWarnings()
LogWarnings logs all configuration warnings.
func (*Config[H, T]) Warnings ¶
func (c *Config[H, T]) Warnings() []ConfigWarning
Warnings returns warnings for suboptimal configuration choices.
type ConfigOption ¶
type ConfigOption[H Hash, T Transaction[H]] func(*Config[H, T]) error
ConfigOption is a functional option for configuring Narwhal. Options are applied in order, so later options override earlier ones.
func WithBatchSize ¶
func WithBatchSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]
WithBatchSize sets the maximum number of transactions per batch. Larger batches improve throughput but increase latency. Default: 500
func WithBatchTimeout ¶
func WithBatchTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]
WithBatchTimeout sets the maximum time to wait before creating a batch. Shorter timeouts reduce latency but may create smaller batches. Default: 100ms
func WithCryptoProvider ¶
func WithCryptoProvider[H Hash, T Transaction[H]](crypto CryptoProvider) ConfigOption[H, T]
WithCryptoProvider sets the crypto provider for optimized signature verification. When set, certificate validation uses batch/parallel verification. Default: nil (uses standard sequential verification)
func WithDAGCache ¶
func WithDAGCache[H Hash, T Transaction[H]](cfg DAGCacheConfig) ConfigOption[H, T]
WithDAGCache configures the LRU cache for DAG vertex lookups. When enabled, frequently accessed vertices are cached for faster retrieval. Default: DAGCacheConfig{Enabled: true, Capacity: 10000}
func WithDAGCacheCapacity ¶
func WithDAGCacheCapacity[H Hash, T Transaction[H]](capacity int) ConfigOption[H, T]
WithDAGCacheCapacity enables DAG caching with the specified capacity. This is a convenience method equivalent to WithDAGCache(DAGCacheConfig{Enabled: true, Capacity: capacity}).
func WithDAGCacheDisabled ¶
func WithDAGCacheDisabled[H Hash, T Transaction[H]]() ConfigOption[H, T]
WithDAGCacheDisabled disables DAG vertex caching. Use this if memory is constrained or if your access pattern doesn't benefit from caching.
func WithDropOnFull ¶
func WithDropOnFull[H Hash, T Transaction[H]](drop bool) ConfigOption[H, T]
WithDropOnFull sets the behavior when queues are full. If true, new items are dropped silently. If false, operations block. Default: false (block)
func WithEpoch ¶
func WithEpoch[H Hash, T Transaction[H]](epoch uint64) ConfigOption[H, T]
WithEpoch sets the initial epoch for the validator set. The epoch is used for reconfiguration and vote tracking. Default: 0
func WithGCDepth ¶
func WithGCDepth[H Hash, T Transaction[H]](depth uint64) ConfigOption[H, T]
WithGCDepth sets the number of rounds to retain after committed round. Default: 100
func WithGCInterval ¶
func WithGCInterval[H Hash, T Transaction[H]](interval uint64) ConfigOption[H, T]
WithGCInterval sets the number of rounds between garbage collection runs. Lower values free memory sooner but may impact performance. Default: 50
func WithHeaderTimeout ¶
func WithHeaderTimeout[H Hash, T Transaction[H]](timeout time.Duration) ConfigOption[H, T]
WithHeaderTimeout sets the interval between header creation attempts. Shorter intervals can improve throughput under high load. Default: 500ms
func WithHeaderWaiterMaxAge ¶
func WithHeaderWaiterMaxAge[H Hash, T Transaction[H]](age time.Duration) ConfigOption[H, T]
WithHeaderWaiterMaxAge sets the maximum age before dropping a pending header. Default: 30s
func WithHeaderWaiterMaxRetries ¶
func WithHeaderWaiterMaxRetries[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
WithHeaderWaiterMaxRetries sets the max retries before dropping a pending header. Default: 10
func WithHeaderWaiterRetryInterval ¶
func WithHeaderWaiterRetryInterval[H Hash, T Transaction[H]](interval time.Duration) ConfigOption[H, T]
WithHeaderWaiterRetryInterval sets how often to retry pending headers. Default: 1s
func WithHooks ¶
func WithHooks[H Hash, T Transaction[H]](hooks *Hooks[H, T]) ConfigOption[H, T]
WithHooks sets the observability hooks. Hooks provide callbacks for metrics, logging, and monitoring integration. All hooks are optional - nil hooks are ignored.
func WithLeaderSchedule ¶
func WithLeaderSchedule[H Hash, T Transaction[H]](schedule LeaderSchedule) ConfigOption[H, T]
WithLeaderSchedule sets the leader schedule for partially synchronous mode. If not set and NetworkModel is PartialSync, a round-robin schedule is created.
func WithLogger ¶
func WithLogger[H Hash, T Transaction[H]](logger *zap.Logger) ConfigOption[H, T]
WithLogger sets the structured logger. If not provided, a no-op logger is used.
func WithMaxHeaderBatches ¶
func WithMaxHeaderBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
WithMaxHeaderBatches sets the maximum batch refs to include in a header. Headers are created when this limit is reached. Default: 100
func WithMaxHeaderDelay ¶
func WithMaxHeaderDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]
WithMaxHeaderDelay sets the maximum time before creating a header. Headers are created after this duration even if MaxHeaderBatches isn't reached. Default: HeaderTimeout value
func WithMaxPendingBatches ¶
func WithMaxPendingBatches[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
WithMaxPendingBatches sets the maximum batches queued in the primary. Default: 1000
func WithMaxPendingHeaders ¶
func WithMaxPendingHeaders[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
WithMaxPendingHeaders sets the maximum headers with missing parents to queue. Default: 1000
func WithMaxPendingTransactions ¶
func WithMaxPendingTransactions[H Hash, T Transaction[H]](max int) ConfigOption[H, T]
WithMaxPendingTransactions sets the maximum transactions queued per worker. When this limit is reached, behavior depends on DropOnFull setting. Default: 10000
func WithMaxRoundsGap ¶
func WithMaxRoundsGap[H Hash, T Transaction[H]](gap uint64) ConfigOption[H, T]
WithMaxRoundsGap sets the maximum rounds ahead we accept headers for. Headers for rounds beyond currentRound + MaxRoundsGap are rejected. Default: 10
func WithMyIndex ¶
func WithMyIndex[H Hash, T Transaction[H]](index uint16) ConfigOption[H, T]
WithMyIndex sets this validator's index in the validator set.
func WithNetwork ¶
func WithNetwork[H Hash, T Transaction[H]](network Network[H, T]) ConfigOption[H, T]
WithNetwork sets the network layer for message delivery. This is a required option.
func WithNetworkModel ¶
func WithNetworkModel[H Hash, T Transaction[H]](model NetworkModel) ConfigOption[H, T]
WithNetworkModel sets the network timing assumptions. Use NetworkModelAsynchronous (default) for optimal throughput. Use NetworkModelPartiallySynchronous for better commit latency with leader awareness.
func WithRecommendedMessageQueueSize ¶
func WithRecommendedMessageQueueSize[H Hash, T Transaction[H]](size int) ConfigOption[H, T]
WithRecommendedMessageQueueSize sets the advisory buffer size for Network receive channels.
func WithSignatureCache ¶
func WithSignatureCache[H Hash, T Transaction[H]](cache *SignatureCache) ConfigOption[H, T]
WithSignatureCache sets the signature cache for avoiding re-verification. Only effective when CryptoProvider is also set. Default: nil (no caching)
func WithSigner ¶
func WithSigner[H Hash, T Transaction[H]](signer Signer) ConfigOption[H, T]
WithSigner sets the cryptographic signer. This is a required option.
func WithStorage ¶
func WithStorage[H Hash, T Transaction[H]](storage Storage[H, T]) ConfigOption[H, T]
WithStorage sets the persistent storage backend. This is a required option.
func WithSyncRetryDelay ¶
func WithSyncRetryDelay[H Hash, T Transaction[H]](delay time.Duration) ConfigOption[H, T]
WithSyncRetryDelay sets the delay before retrying sync requests. Default: 10s
func WithSyncRetryNodes ¶
func WithSyncRetryNodes[H Hash, T Transaction[H]](nodes int) ConfigOption[H, T]
WithSyncRetryNodes sets the number of nodes to contact when syncing. Default: 3
func WithTimer ¶
func WithTimer[H Hash, T Transaction[H]](timer Timer) ConfigOption[H, T]
WithTimer sets the timer for timeout management. This is a required option.
func WithValidation ¶
func WithValidation[H Hash, T Transaction[H]](cfg ValidationConfig) ConfigOption[H, T]
WithValidation sets the validation configuration. Controls limits for DoS prevention (max batch refs, max transactions, etc.) Default: DefaultValidationConfig()
func WithValidators ¶
func WithValidators[H Hash, T Transaction[H]](validators ValidatorSet) ConfigOption[H, T]
WithValidators sets the validator set. This is a required option.
func WithWorkerCount ¶
func WithWorkerCount[H Hash, T Transaction[H]](count int) ConfigOption[H, T]
WithWorkerCount sets the number of workers for parallel batch creation. More workers can increase throughput but use more resources. Default: 4
type ConfigWarning ¶
type ConfigWarning struct {
// Field is the name of the config field that triggered the warning.
Field string
// Message describes the potential issue.
Message string
// Suggestion provides a recommended action or value.
Suggestion string
}
ConfigWarning represents a warning about potentially suboptimal configuration.
func (ConfigWarning) String ¶
func (w ConfigWarning) String() string
String returns a human-readable warning message.
type CryptoProvider ¶
type CryptoProvider interface {
// Scheme returns the signature scheme this provider implements.
Scheme() SignatureScheme
// NewBatchVerifier creates a new batch verifier.
// The verifier uses the provided validator set for public key lookups.
NewBatchVerifier(validators ValidatorSet) BatchVerifier
// SupportsAggregation returns true if this scheme supports signature aggregation.
// BLS supports aggregation, Ed25519 does not.
SupportsAggregation() bool
// Aggregator returns the BLS aggregator if aggregation is supported.
// Returns nil for Ed25519.
Aggregator() BLSAggregator
}
CryptoProvider abstracts the cryptographic operations for Narwhal. Implementations should provide optimized verification for their signature scheme.
type DAG ¶
type DAG[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
DAG manages the directed acyclic graph of certified vertices.
func NewDAG ¶
func NewDAG[H Hash, T Transaction[H]]( validators ValidatorSet, logger *zap.Logger, ) *DAG[H, T]
NewDAG creates a new DAG instance.
func NewDAGWithCache ¶
func NewDAGWithCache[H Hash, T Transaction[H]]( validators ValidatorSet, hooks *Hooks[H, T], cryptoProvider CryptoProvider, sigCache *SignatureCache, cacheConfig *DAGCacheConfig, logger *zap.Logger, ) *DAG[H, T]
NewDAGWithCache creates a new DAG instance with all optional features. This is the most flexible constructor, allowing configuration of:
- cryptoProvider: optimized signature verification
- sigCache: signature verification caching
- cacheConfig: LRU cache for vertex lookups
func NewDAGWithCrypto ¶
func NewDAGWithCrypto[H Hash, T Transaction[H]]( validators ValidatorSet, hooks *Hooks[H, T], cryptoProvider CryptoProvider, sigCache *SignatureCache, logger *zap.Logger, ) *DAG[H, T]
NewDAGWithCrypto creates a new DAG instance with optimized crypto verification. If cryptoProvider is non-nil, certificate validation uses batch/parallel verification. If sigCache is non-nil, verified signatures are cached to avoid re-verification.
func NewDAGWithHooks ¶
func NewDAGWithHooks[H Hash, T Transaction[H]]( validators ValidatorSet, hooks *Hooks[H, T], logger *zap.Logger, ) *DAG[H, T]
NewDAGWithHooks creates a new DAG instance with observability hooks.
func (*DAG[H, T]) CertificateCountForRound ¶
CertificateCountForRound returns the number of certificates for a given round.
func (*DAG[H, T]) CurrentRound ¶
CurrentRound returns the current DAG round.
func (*DAG[H, T]) GarbageCollect ¶
GarbageCollect removes all data for rounds strictly less than the given round.
func (*DAG[H, T]) GetCertificate ¶
func (d *DAG[H, T]) GetCertificate(digest H) *Certificate[H]
GetCertificate retrieves a certificate by its digest. If caching is enabled, this checks the cache first for faster lookups.
func (*DAG[H, T]) GetCertificatesForRound ¶
func (d *DAG[H, T]) GetCertificatesForRound(round uint64) []*Certificate[H]
GetCertificatesForRound returns all certificates for a given round. Results are sorted by validator index for deterministic ordering.
func (*DAG[H, T]) GetMissingParents ¶
func (d *DAG[H, T]) GetMissingParents() []H
GetMissingParents returns all unique parent digests that are missing from the DAG.
func (*DAG[H, T]) GetParents ¶
func (d *DAG[H, T]) GetParents() []H
GetParents returns certificate digests from the previous round to use as parents. Returns nil for round 0. Results are sorted by validator index for deterministic ordering.
func (*DAG[H, T]) GetPendingCertificates ¶
func (d *DAG[H, T]) GetPendingCertificates() []*PendingCertificate[H, T]
GetPendingCertificates returns all certificates waiting for missing parents.
func (*DAG[H, T]) GetTransactions ¶
func (d *DAG[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)
GetTransactions extracts all transactions from the given certificates. Transactions are deduplicated by hash.
func (*DAG[H, T]) GetUncommitted ¶
func (d *DAG[H, T]) GetUncommitted() []*Certificate[H]
GetUncommitted returns certificates not yet ordered by consensus. Results are sorted by (round, author) for deterministic ordering.
func (*DAG[H, T]) GetVertex ¶
GetVertex retrieves a vertex (certificate + batches) by its digest. If caching is enabled, this checks the cache first for faster lookups.
func (*DAG[H, T]) InsertCertificate ¶
func (d *DAG[H, T]) InsertCertificate(cert *Certificate[H], batches []*Batch[H, T]) error
InsertCertificate adds a certified vertex to the DAG. Returns an error if the certificate is invalid or an equivocation is detected. If parents are missing, the certificate is queued as pending.
func (*DAG[H, T]) InsertValidatedCertificate ¶
func (d *DAG[H, T]) InsertValidatedCertificate(cert *Certificate[H], batches []*Batch[H, T]) error
InsertValidatedCertificate validates and inserts a certificate into the DAG. This is a convenience method that combines validation and insertion. Returns an error if validation fails or if the certificate is an equivocation.
func (*DAG[H, T]) IsCertified ¶
IsCertified checks if a certificate with the given digest exists in the DAG. If caching is enabled, this checks the cache first for faster lookups.
func (*DAG[H, T]) MarkCommitted ¶
func (d *DAG[H, T]) MarkCommitted(certs []*Certificate[H])
MarkCommitted marks certificates as committed by consensus. This allows garbage collection of the underlying data.
func (*DAG[H, T]) OnEquivocation ¶
func (d *DAG[H, T]) OnEquivocation(callback func(evidence *EquivocationEvidence[H]))
OnEquivocation sets a callback that is invoked when equivocation is detected.
func (*DAG[H, T]) ValidateCertificate ¶
func (d *DAG[H, T]) ValidateCertificate(cert *Certificate[H]) error
ValidateCertificate validates a certificate using the DAG's crypto provider. If no crypto provider is configured, falls back to standard sequential verification.
type DAGCacheConfig ¶
type DAGCacheConfig struct {
// Enabled controls whether vertex caching is enabled.
// When enabled, frequently accessed vertices are cached for faster retrieval.
Enabled bool
// Capacity is the maximum number of vertices to cache.
// Default: 10000 if Enabled is true but Capacity is 0.
Capacity int
}
DAGCacheConfig configures the optional LRU cache for DAG vertex lookups.
func DefaultDAGCacheConfig ¶
func DefaultDAGCacheConfig() DAGCacheConfig
DefaultDAGCacheConfig returns the default cache configuration (disabled).
type DAGStats ¶
type DAGStats struct {
CurrentRound uint64
GCRound uint64
TotalVertices int
UncommittedCount int
PendingCount int
RoundCounts map[uint64]int
// CacheStats contains LRU cache statistics (nil if caching disabled)
CacheStats *LRUCacheStats
}
DAGStats contains DAG statistics for monitoring.
type Ed25519CryptoProvider ¶
type Ed25519CryptoProvider struct {
// contains filtered or unexported fields
}
Ed25519CryptoProvider implements CryptoProvider for Ed25519.
func NewEd25519CryptoProvider ¶
func NewEd25519CryptoProvider(workers int) *Ed25519CryptoProvider
NewEd25519CryptoProvider creates a crypto provider for Ed25519.
func (*Ed25519CryptoProvider) Aggregator ¶
func (p *Ed25519CryptoProvider) Aggregator() BLSAggregator
func (*Ed25519CryptoProvider) NewBatchVerifier ¶
func (p *Ed25519CryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier
func (*Ed25519CryptoProvider) Scheme ¶
func (p *Ed25519CryptoProvider) Scheme() SignatureScheme
func (*Ed25519CryptoProvider) SupportsAggregation ¶
func (p *Ed25519CryptoProvider) SupportsAggregation() bool
type EpochAwareValidatorSet ¶
type EpochAwareValidatorSet struct {
ValidatorSet
// contains filtered or unexported fields
}
EpochAwareValidatorSet wraps a validator set with epoch information. This can be used to validate that certificates use the correct validator set for their epoch.
func NewEpochAwareValidatorSet ¶
func NewEpochAwareValidatorSet(vs ValidatorSet, epoch uint64) *EpochAwareValidatorSet
NewEpochAwareValidatorSet creates a new epoch-aware validator set.
func (*EpochAwareValidatorSet) Epoch ¶
func (e *EpochAwareValidatorSet) Epoch() uint64
Epoch returns the epoch this validator set is valid for.
type EpochChange ¶
type EpochChange struct {
// FromEpoch is the epoch we're transitioning from.
FromEpoch uint64
// ToEpoch is the epoch we're transitioning to.
ToEpoch uint64
// EffectiveRound is the DAG round at which the new epoch takes effect.
// All certificates at or after this round use the new validator set.
EffectiveRound uint64
// NewValidators is the new validator set (may be nil if unchanged).
NewValidators ValidatorSet
// ProposedAt is when this epoch change was proposed.
ProposedAt time.Time
// CommittedAt is when this epoch change was committed (zero if not yet committed).
CommittedAt time.Time
}
EpochChange represents a pending or completed epoch change.
type EquivocationDetectedEvent ¶
type EquivocationDetectedEvent[H Hash] struct { Author uint16 Round uint64 FirstDigest H SecondDigest H DetectedAt time.Time }
EquivocationDetectedEvent contains information about detected equivocation.
type EquivocationEvidence ¶
type EquivocationEvidence[H Hash] struct { Author uint16 Round uint64 Certificate1 *Certificate[H] Certificate2 *Certificate[H] }
EquivocationEvidence contains proof of a validator creating conflicting certificates.
type FetchCompletedEvent ¶
type FetchCompletedEvent[H Hash] struct { Type FetchType Digest H Success bool Error error Attempts int FromPeer uint16 // Peer that provided the data (if successful) Latency time.Duration CompletedAt time.Time }
FetchCompletedEvent contains information about a completed fetch operation.
type FetchStartedEvent ¶
type FetchStartedEvent[H Hash] struct { Type FetchType Digest H PreferredPeer uint16 StartedAt time.Time }
FetchStartedEvent contains information about a fetch operation starting.
type Fetcher ¶
type Fetcher[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Fetcher handles fetching missing batches and certificates from peers. Standalone component with retry logic and request deduplication.
func NewFetcher ¶
func NewFetcher[H Hash, T Transaction[H]]( cfg FetcherConfig, network Network[H, T], storage Storage[H, T], validators ValidatorSet, hashFunc func([]byte) H, logger *zap.Logger, ) *Fetcher[H, T]
NewFetcher creates a new Fetcher instance.
func NewFetcherWithHooks ¶
func NewFetcherWithHooks[H Hash, T Transaction[H]]( cfg FetcherConfig, network Network[H, T], storage Storage[H, T], validators ValidatorSet, hashFunc func([]byte) H, hooks *Hooks[H, T], logger *zap.Logger, ) *Fetcher[H, T]
NewFetcherWithHooks creates a new Fetcher instance with observability hooks.
func (*Fetcher[H, T]) FetchBatch ¶
func (f *Fetcher[H, T]) FetchBatch(ctx context.Context, digest H, preferredPeer uint16) (*Batch[H, T], error)
FetchBatch fetches a batch by digest, trying multiple peers if needed. It deduplicates concurrent requests for the same digest.
func (*Fetcher[H, T]) FetchBatchesParallel ¶
func (f *Fetcher[H, T]) FetchBatchesParallel(ctx context.Context, digests []H, preferredPeer uint16) ([]*Batch[H, T], error)
FetchBatchesParallel fetches multiple batches in parallel.
func (*Fetcher[H, T]) FetchCertificate ¶
func (f *Fetcher[H, T]) FetchCertificate(ctx context.Context, digest H, preferredPeer uint16) (*Certificate[H], error)
FetchCertificate fetches a certificate by digest, trying multiple peers if needed.
func (*Fetcher[H, T]) PendingFetchCount ¶
PendingFetchCount returns the number of pending fetch operations.
type FetcherConfig ¶
type FetcherConfig struct {
// MaxRetries is the maximum number of retry attempts per fetch.
MaxRetries int
// RetryDelay is the base delay between retries (with exponential backoff).
RetryDelay time.Duration
// FetchTimeout is the timeout for a single fetch attempt.
FetchTimeout time.Duration
// MaxConcurrentFetches limits parallel fetch operations.
MaxConcurrentFetches int
}
FetcherConfig configures the Fetcher.
func DefaultFetcherConfig ¶
func DefaultFetcherConfig() FetcherConfig
DefaultFetcherConfig returns sensible defaults for the Fetcher.
type GCConfig ¶
type GCConfig struct {
// Interval is how often GC runs (in rounds, not time).
// GC is triggered when currentRound - lastGCRound >= Interval.
Interval uint64
// RetainRounds is the number of rounds to keep after the committed round.
// Older data is eligible for garbage collection.
RetainRounds uint64
// CheckInterval is how often to check if GC should run (in time).
CheckInterval time.Duration
}
GCConfig configures the GarbageCollector.
func DefaultGCConfig ¶
func DefaultGCConfig() GCConfig
DefaultGCConfig returns sensible defaults for the GarbageCollector.
type GarbageCollectedEvent ¶
type GarbageCollectedEvent struct {
BeforeRound uint64
CommittedRound uint64
CollectedAt time.Time
}
GarbageCollectedEvent contains information about a GC cycle.
type GarbageCollector ¶
type GarbageCollector[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
GarbageCollector periodically cleans up old DAG data. Standalone component for automatic round-based cleanup independent of the main Narwhal struct.
func NewGarbageCollector ¶
func NewGarbageCollector[H Hash, T Transaction[H]]( cfg GCConfig, dag *DAG[H, T], storage Storage[H, T], logger *zap.Logger, ) *GarbageCollector[H, T]
NewGarbageCollector creates a new GarbageCollector.
func NewGarbageCollectorWithHooks ¶
func NewGarbageCollectorWithHooks[H Hash, T Transaction[H]]( cfg GCConfig, dag *DAG[H, T], storage Storage[H, T], hooks *Hooks[H, T], logger *zap.Logger, ) *GarbageCollector[H, T]
NewGarbageCollectorWithHooks creates a new GarbageCollector with observability hooks.
func (*GarbageCollector[H, T]) ForceGC ¶
func (gc *GarbageCollector[H, T]) ForceGC()
ForceGC forces an immediate garbage collection cycle.
func (*GarbageCollector[H, T]) OnGC ¶
func (gc *GarbageCollector[H, T]) OnGC(callback func(beforeRound uint64))
OnGC sets a callback that is invoked when GC runs.
func (*GarbageCollector[H, T]) Run ¶
func (gc *GarbageCollector[H, T]) Run(ctx context.Context)
Run starts the garbage collection loop. It runs until the context is cancelled.
func (*GarbageCollector[H, T]) SetCommittedRound ¶
func (gc *GarbageCollector[H, T]) SetCommittedRound(round uint64)
SetCommittedRound updates the committed round. GC will only clean up data older than (committedRound - retainRounds).
func (*GarbageCollector[H, T]) Stats ¶
func (gc *GarbageCollector[H, T]) Stats() GCStats
Stats returns current GC statistics.
type GnarkBLSAggregator ¶
type GnarkBLSAggregator struct{}
GnarkBLSAggregator implements BLSAggregator using gnark-crypto.
func NewGnarkBLSAggregator ¶
func NewGnarkBLSAggregator() *GnarkBLSAggregator
NewGnarkBLSAggregator creates a new BLS aggregator.
func (*GnarkBLSAggregator) Aggregate ¶
func (a *GnarkBLSAggregator) Aggregate(signatures [][]byte) ([]byte, error)
Aggregate combines multiple BLS signatures into one.
func (*GnarkBLSAggregator) VerifyAggregate ¶
func (a *GnarkBLSAggregator) VerifyAggregate(pubKeys [][]byte, message []byte, aggSig []byte) bool
VerifyAggregate verifies an aggregated signature (all signers signed the same message).
type GnarkBLSBatchVerifier ¶
type GnarkBLSBatchVerifier struct {
// contains filtered or unexported fields
}
GnarkBLSBatchVerifier implements BLSBatchVerifier using gnark-crypto.
func NewGnarkBLSBatchVerifier ¶
func NewGnarkBLSBatchVerifier() *GnarkBLSBatchVerifier
NewGnarkBLSBatchVerifier creates a new BLS batch verifier.
func (*GnarkBLSBatchVerifier) AddSignature ¶
func (v *GnarkBLSBatchVerifier) AddSignature(pubKey []byte, message []byte, signature []byte)
AddSignature adds a signature to the batch.
func (*GnarkBLSBatchVerifier) Reset ¶
func (v *GnarkBLSBatchVerifier) Reset()
Reset clears the batch.
func (*GnarkBLSBatchVerifier) VerifyBatch ¶
func (v *GnarkBLSBatchVerifier) VerifyBatch() bool
VerifyBatch verifies all signatures in the batch using random linear combination.
type GnarkBLSCryptoProvider ¶
type GnarkBLSCryptoProvider struct {
// contains filtered or unexported fields
}
GnarkBLSCryptoProvider implements CryptoProvider for BLS using gnark-crypto.
func NewGnarkBLSCryptoProvider ¶
func NewGnarkBLSCryptoProvider() *GnarkBLSCryptoProvider
NewGnarkBLSCryptoProvider creates a BLS crypto provider.
func (*GnarkBLSCryptoProvider) Aggregator ¶
func (p *GnarkBLSCryptoProvider) Aggregator() BLSAggregator
func (*GnarkBLSCryptoProvider) NewBatchVerifier ¶
func (p *GnarkBLSCryptoProvider) NewBatchVerifier(validators ValidatorSet) BatchVerifier
func (*GnarkBLSCryptoProvider) Scheme ¶
func (p *GnarkBLSCryptoProvider) Scheme() SignatureScheme
func (*GnarkBLSCryptoProvider) SupportsAggregation ¶
func (p *GnarkBLSCryptoProvider) SupportsAggregation() bool
type Header ¶
type Header[H Hash] struct { Author uint16 Round uint64 Epoch uint64 BatchRefs []H Parents []H // digests of parent certificates from round-1 Timestamp uint64 Digest H }
Header is a proposal from a primary to form a DAG vertex.
func HeaderFromBytes ¶
func HeaderFromBytes[H Hash]( data []byte, hashFromBytes func([]byte) (H, error), ) (*Header[H], error)
HeaderFromBytes deserializes a header from bytes.
func (*Header[H]) Bytes ¶
Bytes serializes the header to bytes. Format: [Author:2][Round:8][Epoch:8][Timestamp:8][DigestLen:2][Digest][BatchRefCount:4][HashLen:2][BatchRef0]...[ParentCount:4][Parent0]...
func (*Header[H]) ComputeDigest ¶
ComputeDigest computes and sets the digest for this header. The digest is computed over all header fields except the digest itself. This must be called before broadcasting the header.
func (*Header[H]) VerifyDigest ¶
VerifyDigest verifies that the header digest matches its contents.
type HeaderCreatedEvent ¶
HeaderCreatedEvent contains information about a newly created header.
type HeaderMessage ¶
type HeaderMessage[H Hash, T Transaction[H]] struct { HeaderData *Header[H] // contains filtered or unexported fields }
HeaderMessage wraps a header for network transmission.
func (*HeaderMessage[H, T]) Header ¶
func (m *HeaderMessage[H, T]) Header() *Header[H]
func (*HeaderMessage[H, T]) Sender ¶
func (m *HeaderMessage[H, T]) Sender() uint16
func (*HeaderMessage[H, T]) Type ¶
func (m *HeaderMessage[H, T]) Type() MessageType
type HeaderMessageAccessor ¶
type HeaderReceivedEvent ¶
HeaderReceivedEvent contains information about a received header.
type HeaderTimeoutEvent ¶
type HeaderTimeoutEvent[H Hash] struct { HeaderDigest H Round uint64 RetryCount int WillRetry bool VotesReceived int QuorumNeeded int TimeoutAt time.Time }
HeaderTimeoutEvent contains information about a header that timed out.
type HeaderVote ¶
HeaderVote is a vote from a validator for a header.
func NewVote ¶
func NewVote[H Hash](headerDigest H, validatorIndex uint16, signer Signer) (*HeaderVote[H], error)
NewVote creates a new vote for a header.
func VoteFromBytes ¶
func VoteFromBytes[H Hash]( data []byte, hashFromBytes func([]byte) (H, error), ) (*HeaderVote[H], error)
VoteFromBytes deserializes a vote from bytes.
func (*HeaderVote[H]) Bytes ¶
func (v *HeaderVote[H]) Bytes() []byte
Bytes serializes the vote to bytes. Format: [HeaderDigestLen:2][HeaderDigest][ValidatorIndex:2][SignatureLen:2][Signature]
func (*HeaderVote[H]) Verify ¶
func (v *HeaderVote[H]) Verify(pubKey PublicKey) bool
Verify verifies the vote signature.
type HeaderWaiter ¶
type HeaderWaiter[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
HeaderWaiter queues headers that can't be processed immediately due to missing parent certificates or batches. It periodically retries processing them as dependencies become available.
func NewHeaderWaiter ¶
func NewHeaderWaiter[H Hash, T Transaction[H]]( cfg HeaderWaiterConfig, processFunc func(header *Header[H], from uint16) error, checkParentFunc func(digest H) bool, hooks *Hooks[H, T], logger *zap.Logger, ) *HeaderWaiter[H, T]
NewHeaderWaiter creates a new HeaderWaiter.
func (*HeaderWaiter[H, T]) Add ¶
func (hw *HeaderWaiter[H, T]) Add(header *Header[H], from uint16, missingParents []H) bool
Add queues a header with missing parents for later processing. Returns true if the header was queued, false if it was dropped (queue full or duplicate). If FetchParents is enabled and fetchParentFunc is set, missing parents will be proactively fetched from the sender.
func (*HeaderWaiter[H, T]) OnParentAvailable ¶
func (hw *HeaderWaiter[H, T]) OnParentAvailable(parentDigest H)
OnParentAvailable should be called when a new certificate is inserted into the DAG. It triggers retry of any headers that were waiting for this parent.
func (*HeaderWaiter[H, T]) PendingCount ¶
func (hw *HeaderWaiter[H, T]) PendingCount() int
PendingCount returns the number of headers currently waiting.
func (*HeaderWaiter[H, T]) Run ¶
func (hw *HeaderWaiter[H, T]) Run(ctx context.Context)
Run starts the periodic retry loop.
func (*HeaderWaiter[H, T]) SetFetchParentFunc ¶
func (hw *HeaderWaiter[H, T]) SetFetchParentFunc(fn func(digest H, from uint16) error)
SetFetchParentFunc sets the function to fetch missing parent certificates. When set and FetchParents is enabled, the HeaderWaiter will proactively fetch missing parents from the peer that sent the header.
func (*HeaderWaiter[H, T]) Stats ¶
func (hw *HeaderWaiter[H, T]) Stats() HeaderWaiterStats
Stats returns current statistics.
type HeaderWaiterConfig ¶
type HeaderWaiterConfig struct {
// MaxPendingHeaders is the maximum number of headers to queue.
// Headers beyond this limit are dropped (oldest first).
// Default: 1000
MaxPendingHeaders int
// RetryInterval is how often to retry processing pending headers.
// Default: 1s
RetryInterval time.Duration
// MaxRetries is the maximum number of retry attempts before dropping a header.
// Default: 10
MaxRetries int
// MaxAge is the maximum time a header can wait before being dropped.
// Default: 30s
MaxAge time.Duration
// FetchParents enables proactive fetching of missing parent certificates.
// Default: false
FetchParents bool
}
HeaderWaiterConfig configures the HeaderWaiter.
func DefaultHeaderWaiterConfig ¶
func DefaultHeaderWaiterConfig() HeaderWaiterConfig
DefaultHeaderWaiterConfig returns sensible defaults.
type HeaderWaiterStats ¶
type HeaderWaiterStats struct {
PendingCount int
TotalReceived uint64
TotalProcessed uint64
TotalDropped uint64
TotalExpired uint64
TotalFetched uint64
}
HeaderWaiterStats contains statistics for monitoring.
type Hooks ¶
type Hooks[H Hash, T Transaction[H]] struct { // OnBatchCreated is called when a worker creates a new batch. OnBatchCreated func(BatchCreatedEvent[H, T]) // OnTransactionReceived is called when a transaction is added to a worker. OnTransactionReceived func(TransactionReceivedEvent[H]) // OnHeaderCreated is called when a primary creates a new header. OnHeaderCreated func(HeaderCreatedEvent[H]) // OnHeaderReceived is called when a primary receives a header from another validator. OnHeaderReceived func(HeaderReceivedEvent[H]) // OnVoteReceived is called when a primary receives a vote for its header. OnVoteReceived func(VoteReceivedEvent[H]) // OnVoteSent is called when a primary sends a vote for another validator's header. OnVoteSent func(VoteSentEvent[H]) // OnCertificateFormed is called when a primary forms a certificate (collected quorum). OnCertificateFormed func(CertificateFormedEvent[H]) // OnCertificateReceived is called when a certificate is received from another validator. OnCertificateReceived func(CertificateReceivedEvent[H]) // OnHeaderTimeout is called when a header times out waiting for votes. OnHeaderTimeout func(HeaderTimeoutEvent[H]) // OnVertexInserted is called when a certificate is inserted into the DAG. OnVertexInserted func(VertexInsertedEvent[H]) // OnRoundAdvanced is called when the DAG advances to a new round. OnRoundAdvanced func(RoundAdvancedEvent) // OnEquivocationDetected is called when equivocation is detected. // Note: This is in addition to the DAG's OnEquivocation callback, // which provides the full evidence for slashing. OnEquivocationDetected func(EquivocationDetectedEvent[H]) // OnCertificatePending is called when a certificate is queued as pending // due to missing parents. OnCertificatePending func(CertificatePendingEvent[H]) // OnFetchStarted is called when a fetch operation begins. OnFetchStarted func(FetchStartedEvent[H]) // OnFetchCompleted is called when a fetch operation completes (success or failure). OnFetchCompleted func(FetchCompletedEvent[H]) // OnGarbageCollected is called when garbage collection runs. OnGarbageCollected func(GarbageCollectedEvent) }
Hooks provides optional callbacks for observability events. All hooks are invoked synchronously - keep implementations fast.
func NewRecoveryMiddleware ¶
func NewRecoveryMiddleware[H Hash, T Transaction[H]](hooks *Hooks[H, T], logger *zap.Logger) *Hooks[H, T]
NewRecoveryMiddleware wraps hooks with panic recovery.
type LRUCache ¶
type LRUCache[K comparable, V any] struct { // contains filtered or unexported fields }
LRUCache is a generic thread-safe LRU cache. It evicts the least recently used items when capacity is exceeded.
func NewLRUCache ¶
func NewLRUCache[K comparable, V any](capacity int) *LRUCache[K, V]
NewLRUCache creates a new LRU cache with the given capacity. Capacity must be at least 1.
func (*LRUCache[K, V]) Clear ¶
func (c *LRUCache[K, V]) Clear()
Clear removes all items from the cache.
func (*LRUCache[K, V]) Contains ¶
Contains checks if a key exists in the cache without updating LRU order.
func (*LRUCache[K, V]) Get ¶
Get retrieves a value from the cache. Returns the value and true if found, zero value and false otherwise. Accessing an item moves it to the front (most recently used).
func (*LRUCache[K, V]) Keys ¶
func (c *LRUCache[K, V]) Keys() []K
Keys returns all keys in the cache, from most to least recently used.
func (*LRUCache[K, V]) Peek ¶
Peek retrieves a value without updating its position in the LRU order.
func (*LRUCache[K, V]) Put ¶
Put adds or updates a value in the cache. If the cache is at capacity, the least recently used item is evicted. Returns the evicted key and value if eviction occurred.
func (*LRUCache[K, V]) Remove ¶
Remove removes an item from the cache. Returns true if the item was found and removed.
func (*LRUCache[K, V]) Resize ¶
Resize changes the cache capacity. If the new capacity is smaller, excess items are evicted (LRU order).
func (*LRUCache[K, V]) Stats ¶
func (c *LRUCache[K, V]) Stats() LRUCacheStats
Stats returns cache statistics.
type LRUCacheStats ¶
type LRUCacheStats struct {
Size int
Capacity int
Hits uint64
Misses uint64
Evicts uint64
HitRate float64
}
LRUCacheStats contains cache statistics.
type LeaderSchedule ¶
type LeaderSchedule interface {
// Leader returns the validator index of the leader for the given round.
// This must be deterministic - all validators must agree on the leader.
Leader(round uint64) uint16
// IsLeader returns true if the given validator is the leader for the round.
IsLeader(round uint64, validatorIndex uint16) bool
}
LeaderSchedule determines which validator is leader for a given round.
type LeaderTracker ¶
type LeaderTracker[H Hash] struct { // contains filtered or unexported fields }
LeaderTracker tracks leader blocks and votes for partially synchronous mode. In this mode, we want to: 1. Wait for the leader's block before advancing to odd rounds 2. Wait for f+1 votes for the leader or 2f+1 non-votes before advancing to even rounds
This follows the Sui Narwhal proposer logic for partially synchronous networks.
func NewLeaderTracker ¶
func NewLeaderTracker[H Hash](schedule LeaderSchedule) *LeaderTracker[H]
NewLeaderTracker creates a new LeaderTracker.
func (*LeaderTracker[H]) CheckLeaderVotes ¶
func (lt *LeaderTracker[H]) CheckLeaderVotes(parents []*Certificate[H], leaderDigest *H, f int) LeaderVoteStatus
CheckLeaderVotes analyzes parent certificates to determine leader vote status. This is used on odd rounds to decide if we have enough support to advance.
Parameters: - parents: certificates from the previous round that we're using as parents - leaderDigest: the digest of the leader's certificate from the previous round (if known) - f: the maximum Byzantine faults tolerated
Returns the vote status analysis.
func (*LeaderTracker[H]) Clear ¶
func (lt *LeaderTracker[H]) Clear()
Clear removes all leader records.
func (*LeaderTracker[H]) GarbageCollect ¶
func (lt *LeaderTracker[H]) GarbageCollect(beforeRound uint64)
GarbageCollect removes leader records for old rounds.
func (*LeaderTracker[H]) GetLeaderForRound ¶
func (lt *LeaderTracker[H]) GetLeaderForRound(round uint64) *Certificate[H]
GetLeaderForRound returns the leader certificate for a round, if we have it.
func (*LeaderTracker[H]) HasLeaderForRound ¶
func (lt *LeaderTracker[H]) HasLeaderForRound(round uint64) bool
HasLeaderForRound returns true if we have the leader's certificate for the round.
func (*LeaderTracker[H]) RecordCertificate ¶
func (lt *LeaderTracker[H]) RecordCertificate(cert *Certificate[H]) bool
RecordCertificate records a certificate and returns true if it's from the leader.
func (*LeaderTracker[H]) SetRound ¶
func (lt *LeaderTracker[H]) SetRound(round uint64)
SetRound updates the current round.
func (*LeaderTracker[H]) ShouldWaitForLeader ¶
func (lt *LeaderTracker[H]) ShouldWaitForLeader(round uint64) bool
ShouldWaitForLeader returns true if we should wait for the leader block. This is used in partially synchronous mode on even rounds (round % 2 == 0).
type LeaderVoteStatus ¶
type LeaderVoteStatus struct {
// VotesForLeader is the stake voting for the leader block
VotesForLeader int
// VotesNotForLeader is the stake not voting for the leader block
// (voting for a different parent set that doesn't include the leader)
VotesNotForLeader int
// HasEnoughForLeaderVotes returns true if we have f+1 votes for the leader
HasEnoughForLeaderVotes bool
// HasEnoughNotForLeaderVotes returns true if we have 2f+1 votes not for leader
HasEnoughNotForLeaderVotes bool
}
LeaderVoteStatus tracks votes for/against the leader block.
type Message ¶
type Message[H Hash, T Transaction[H]] interface { Type() MessageType Sender() uint16 }
Message represents a protocol message.
type MessageType ¶
type MessageType uint8
const ( MessageBatch MessageType = iota MessageHeader MessageVote MessageCertificate MessageBatchRequest MessageCertificateRequest MessageCertificateRangeRequest MessageCertificateRangeResponse )
func (MessageType) String ¶
func (t MessageType) String() string
String returns a human-readable name for the message type.
type MeteredChannel ¶
type MeteredChannel[T any] struct { // contains filtered or unexported fields }
MeteredChannel wraps a channel with metrics for observability. Tracks sent, received, dropped counts and queue depth. Thread-safe.
func NewMeteredChannel ¶
func NewMeteredChannel[T any](name string, capacity int) *MeteredChannel[T]
NewMeteredChannel creates a new metered channel with the given name and capacity.
func (*MeteredChannel[T]) Cap ¶
func (m *MeteredChannel[T]) Cap() int
Cap returns the capacity of the channel.
func (*MeteredChannel[T]) Close ¶
func (m *MeteredChannel[T]) Close()
Close closes the underlying channel.
func (*MeteredChannel[T]) Len ¶
func (m *MeteredChannel[T]) Len() int
Len returns the current number of items in the channel.
func (*MeteredChannel[T]) MarkReceived ¶
func (m *MeteredChannel[T]) MarkReceived()
MarkReceived increments the received counter. Call this after receiving from ReceiveChan() to maintain accurate metrics.
func (*MeteredChannel[T]) Receive ¶
func (m *MeteredChannel[T]) Receive() T
Receive receives a value from the channel. Blocks if the channel is empty.
func (*MeteredChannel[T]) ReceiveChan ¶
func (m *MeteredChannel[T]) ReceiveChan() <-chan T
ReceiveChan returns the underlying channel for use in select statements. Note: messages received directly from this channel won't update the received counter. Use MarkReceived() after receiving to update the counter.
func (*MeteredChannel[T]) Send ¶
func (m *MeteredChannel[T]) Send(value T)
Send sends a value on the channel. Blocks if the channel is full.
func (*MeteredChannel[T]) Stats ¶
func (m *MeteredChannel[T]) Stats() MeteredChannelStats
Stats returns current statistics.
func (*MeteredChannel[T]) TryReceive ¶
func (m *MeteredChannel[T]) TryReceive() (T, bool)
TryReceive attempts to receive a value from the channel without blocking. Returns the value and true if successful, or zero value and false if empty.
func (*MeteredChannel[T]) TrySend ¶
func (m *MeteredChannel[T]) TrySend(value T) bool
TrySend attempts to send a value on the channel without blocking. Returns true if the send succeeded, false if the channel was full.
type MeteredChannelStats ¶
type MeteredChannelStats struct {
// Name is the channel name.
Name string
// Sent is the total number of messages sent.
Sent uint64
// Received is the total number of messages received.
Received uint64
// Dropped is the total number of messages dropped due to full channel.
Dropped uint64
// Pending is the current number of messages in the channel.
Pending int
// Capacity is the channel capacity.
Capacity int
// Throughput is the approximate messages per second (sent).
Throughput float64
// Uptime is how long the channel has been running.
Uptime time.Duration
}
MeteredChannelStats contains statistics for a metered channel.
type MutableValidatorSet ¶
type MutableValidatorSet interface {
ValidatorSet
// AddValidator adds a new validator and returns their assigned index.
AddValidator(pubKey PublicKey) (uint16, error)
// RemoveValidator removes a validator by index.
RemoveValidator(index uint16) error
// UpdateValidator updates a validator's public key.
UpdateValidator(index uint16, newPubKey PublicKey) error
// Clone creates a deep copy of this validator set.
Clone() MutableValidatorSet
}
MutableValidatorSet extends ValidatorSet with mutation operations. This interface is used during reconfiguration to build a new validator set.
type Narwhal ¶
type Narwhal[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Narwhal implements the Narwhal DAG-based mempool protocol.
func New ¶
func New[H Hash, T Transaction[H]](cfg *Config[H, T], hashFunc func([]byte) H) (*Narwhal[H, T], error)
New creates a new Narwhal instance. The hashFunc parameter is required to compute digests for batches, headers, etc.
func (*Narwhal[H, T]) AddTransaction ¶
func (n *Narwhal[H, T]) AddTransaction(tx T)
AddTransaction adds a transaction to be disseminated. Transactions are routed to workers based on their hash for load balancing.
func (*Narwhal[H, T]) CurrentRound ¶
CurrentRound returns the current DAG round.
func (*Narwhal[H, T]) GetCertifiedVertices ¶
func (n *Narwhal[H, T]) GetCertifiedVertices() []*Certificate[H]
GetCertifiedVertices returns certificates ready for consensus ordering. These are certificates that have not yet been included in a committed block.
func (*Narwhal[H, T]) GetTransactions ¶
func (n *Narwhal[H, T]) GetTransactions(certs []*Certificate[H]) ([]T, error)
GetTransactions extracts all transactions from the given certificates. Used by execution layer to get ordered transactions from consensus block. Transactions are deduplicated by hash.
func (*Narwhal[H, T]) MarkCommitted ¶
func (n *Narwhal[H, T]) MarkCommitted(certs []*Certificate[H])
MarkCommitted marks certificates as committed by consensus. This allows garbage collection of the underlying data.
func (*Narwhal[H, T]) RequestTrackerStats ¶
func (n *Narwhal[H, T]) RequestTrackerStats() RequestTrackerStats
RequestTrackerStats returns statistics about pending network operations.
type Network ¶
type Network[H Hash, T Transaction[H]] interface { BroadcastBatch(batch *Batch[H, T]) BroadcastHeader(header *Header[H]) SendVote(validatorIndex uint16, vote *HeaderVote[H]) BroadcastCertificate(cert *Certificate[H]) FetchBatch(from uint16, digest H) (*Batch[H, T], error) FetchCertificate(from uint16, digest H) (*Certificate[H], error) FetchCertificatesInRange(from uint16, startRound, endRound uint64) ([]*Certificate[H], error) SendBatch(to uint16, batch *Batch[H, T]) SendCertificate(to uint16, cert *Certificate[H]) Receive() <-chan Message[H, T] Close() error }
Network provides message delivery between validators.
type NetworkModel ¶
type NetworkModel uint8
NetworkModel specifies the network timing assumptions for the consensus protocol. This affects how the proposer decides when to advance rounds.
const ( // NetworkModelAsynchronous assumes no timing bounds on message delivery. // The proposer advances to the next round as soon as it has 2f+1 parent certificates. // This provides optimal throughput but may have higher latency in practice. NetworkModelAsynchronous NetworkModel = iota // NetworkModelPartiallySynchronous assumes eventual synchrony. // The proposer may wait for leader blocks and leader votes before advancing. // This can provide better commit latency by ensuring leaders are included. NetworkModelPartiallySynchronous )
func (NetworkModel) String ¶
func (m NetworkModel) String() string
String returns a human-readable name for the network model.
type NetworkStatus ¶
type NetworkStatus uint8
const ( NetworkStatusUnknown NetworkStatus = iota NetworkStatusConnected NetworkStatusDisconnected NetworkStatusConnecting )
type PanicHandler ¶
type PanicHandler func(panicVal interface{}, stack []byte)
PanicHandler is called when a panic is recovered. It receives the panic value and stack trace.
type PendingCertificate ¶
type PendingCertificate[H Hash, T Transaction[H]] struct { Certificate *Certificate[H] Batches []*Batch[H, T] MissingParents []H }
PendingCertificate represents a certificate waiting for missing parents.
type PendingHeader ¶
type PendingHeader[H Hash] struct { // Header is the header waiting to be processed. Header *Header[H] // From is the validator that sent this header. From uint16 // MissingParents are parent certificate digests we don't have. MissingParents []H // ReceivedAt is when this header was first received. ReceivedAt time.Time // RetryCount tracks how many times we've tried to process this header. RetryCount int }
PendingHeader represents a header waiting for missing dependencies.
type PerPeerRateLimiter ¶
type PerPeerRateLimiter struct {
// contains filtered or unexported fields
}
PerPeerRateLimiter maintains separate rate limits for each peer.
func NewPerPeerRateLimiter ¶
func NewPerPeerRateLimiter(rate float64, burst int) *PerPeerRateLimiter
NewPerPeerRateLimiter creates a rate limiter that tracks per-peer limits.
func (*PerPeerRateLimiter) Allow ¶
func (p *PerPeerRateLimiter) Allow(peer uint16) bool
Allow checks if the peer is within rate limits.
func (*PerPeerRateLimiter) AllowN ¶
func (p *PerPeerRateLimiter) AllowN(peer uint16, n int) bool
AllowN checks if n tokens are available for the peer.
func (*PerPeerRateLimiter) Reset ¶
func (p *PerPeerRateLimiter) Reset()
Reset removes all peer state.
type Primary ¶
type Primary[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Primary creates headers and collects votes to form certificates.
func NewPrimary ¶
func NewPrimary[H Hash, T Transaction[H]](cfg PrimaryConfig[H, T]) *Primary[H, T]
NewPrimary creates a new primary.
func (*Primary[H, T]) OnBatch ¶
OnBatch is called when a local worker creates a batch. If MaxPendingBatches is configured and the queue is full:
- If DropOnFull is true, the batch is dropped silently
- If DropOnFull is false, this call blocks until space is available
Returns true if the batch was accepted, false if dropped.
func (*Primary[H, T]) OnHeaderReceived ¶
OnHeaderReceived handles a header from another primary. Before voting, validates: 1. Header digest is correct 2. Author is a valid validator 3. Round is not too far ahead 4. All parent certificates exist in the DAG 5. All batch digests are available (in local storage or fetched from author) 6. We haven't already voted for a different header from this author at this round
func (*Primary[H, T]) OnVoteReceived ¶
func (p *Primary[H, T]) OnVoteReceived(vote *HeaderVote[H]) error
OnVoteReceived handles a vote for our header.
func (*Primary[H, T]) Stats ¶
func (p *Primary[H, T]) Stats() PrimaryStats
Stats returns current primary statistics.
type PrimaryConfig ¶
type PrimaryConfig[H Hash, T Transaction[H]] struct { ValidatorID uint16 DAG *DAG[H, T] Validators ValidatorSet Signer Signer Network Network[H, T] Storage Storage[H, T] HeaderTimeout time.Duration HashFunc func([]byte) H Hooks *Hooks[H, T] Logger *zap.Logger // Retry configuration MaxHeaderRetries int // Max retries before abandoning a header (default: 3) HeaderRetryInterval time.Duration // Time before retrying header broadcast (default: HeaderTimeout) // Header creation thresholds (aligned with reference implementation) MaxHeaderBatches int // Max batch refs per header (default: 100) MaxHeaderDelay time.Duration // Max delay before creating header (default: HeaderTimeout) // Backpressure settings MaxPendingBatches int // Max pending batches (0 = unbounded) DropOnFull bool // If true, drop when full; if false, block // Optimized crypto verification (optional) CryptoProvider CryptoProvider SignatureCache *SignatureCache }
PrimaryConfig configures a primary.
type PrimaryStats ¶
type PrimaryStats struct {
PendingBatches int // Number of batch refs waiting for header creation
QueuedBatches int // Batches in channel (if bounded)
DroppedBatches uint64 // Batches dropped due to full queue
MaxPending int // Maximum pending batches allowed
IsBounded bool // Whether bounded mode is enabled
HasPendingHdr bool // Whether there's a header waiting for votes
CurrentVotes int // Votes received for current header
HeaderRetries int // Retry count for current header
}
PrimaryStats contains primary statistics for monitoring.
type ProposedHeader ¶
type ProposedHeader[H Hash, T Transaction[H]] struct { Header *Header[H] Batches []*Batch[H, T] CreatedAt time.Time }
ProposedHeader represents a header proposed by the Proposer.
type Proposer ¶
type Proposer[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Proposer creates headers from batches. Standalone component for advanced use cases where header creation needs to be separated from vote collection.
func NewProposer ¶
func NewProposer[H Hash, T Transaction[H]](cfg ProposerConfig[H, T]) *Proposer[H, T]
NewProposer creates a new Proposer.
func (*Proposer[H, T]) AddBatch ¶
AddBatch adds a batch to be included in a future header. Returns true if accepted, false if dropped due to full queue.
func (*Proposer[H, T]) ForcePropose ¶
ForcePropose forces creation of a header if there are any pending batches. Used for testing and flushing on shutdown.
func (*Proposer[H, T]) Headers ¶
func (p *Proposer[H, T]) Headers() <-chan *ProposedHeader[H, T]
Headers returns the channel for receiving proposed headers.
func (*Proposer[H, T]) PendingCount ¶
PendingCount returns the number of pending batches.
func (*Proposer[H, T]) Stats ¶
func (p *Proposer[H, T]) Stats() ProposerStats
Stats returns current statistics.
type ProposerConfig ¶
type ProposerConfig[H Hash, T Transaction[H]] struct { ValidatorID uint16 DAG *DAG[H, T] Validators ValidatorSet HashFunc func([]byte) H Hooks *Hooks[H, T] Logger *zap.Logger // Header creation thresholds MaxHeaderBatches int // Max batch refs per header before proposing MaxHeaderDelay time.Duration // Max delay before proposing regardless of batch count MinHeaderBatches int // Min batches before proposing (default: 1) // Backpressure MaxPendingBatches int // Max pending batches (0 = unbounded) DropOnFull bool // If true, drop when full; if false, block // Output buffer size HeaderOutputBuffer int // Size of header output channel (default: 10) }
ProposerConfig configures the Proposer.
func DefaultProposerConfig ¶
func DefaultProposerConfig[H Hash, T Transaction[H]]() ProposerConfig[H, T]
DefaultProposerConfig returns sensible defaults.
type ProposerStats ¶
type ProposerStats struct {
PendingBatches int
QueuedBatches int
DroppedBatches uint64
HeadersProposed uint64
BatchesIncluded uint64
MaxPending int
IsBounded bool
}
ProposerStats contains statistics for monitoring.
type PublicKey ¶
type PublicKey interface {
Bytes() []byte
Verify(message []byte, signature []byte) bool
Equals(other interface{ Bytes() []byte }) bool
}
PublicKey provides signature verification.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter provides a token bucket rate limiter. Thread-safe.
func NewRateLimiter ¶
func NewRateLimiter(rate float64, burst int) *RateLimiter
NewRateLimiter creates a new rate limiter. - rate: tokens per second to add - burst: maximum tokens that can accumulate
func (*RateLimiter) Allow ¶
func (r *RateLimiter) Allow() bool
Allow checks if one token is available and consumes it if so. Returns true if allowed, false if rate limited.
func (*RateLimiter) AllowN ¶
func (r *RateLimiter) AllowN(n int) bool
AllowN checks if n tokens are available and consumes them if so. Returns true if allowed, false if rate limited.
func (*RateLimiter) Reserve ¶
func (r *RateLimiter) Reserve(n int) time.Duration
Reserve reserves n tokens, returning the time to wait before they're available. If wait is 0, the tokens were immediately available. If wait is negative, the request can never be satisfied (n > burst).
func (*RateLimiter) Stats ¶
func (r *RateLimiter) Stats() RateLimiterStats
Stats returns current statistics.
type RateLimiterStats ¶
Stats returns rate limiter statistics.
type ReconfigurationCoordinator ¶
type ReconfigurationCoordinator[H Hash] struct { // contains filtered or unexported fields }
ReconfigurationCoordinator coordinates the voting process for reconfiguration proposals. This is a higher-level component that uses Reconfigurer for state management.
func NewReconfigurationCoordinator ¶
func NewReconfigurationCoordinator[H Hash]( reconfigurer *Reconfigurer, hashFunc func([]byte) H, logger *zap.Logger, ) *ReconfigurationCoordinator[H]
NewReconfigurationCoordinator creates a new ReconfigurationCoordinator.
func (*ReconfigurationCoordinator[H]) AddVote ¶
func (rc *ReconfigurationCoordinator[H]) AddVote(vote *ReconfigurationVote) error
AddVote adds a vote for the current proposal. Returns an error if no proposal is in progress or if the vote is invalid.
func (*ReconfigurationCoordinator[H]) CheckQuorum ¶
func (rc *ReconfigurationCoordinator[H]) CheckQuorum() bool
CheckQuorum checks if the proposal has reached quorum. Returns true if 2f+1 approvals have been received.
func (*ReconfigurationCoordinator[H]) Clear ¶
func (rc *ReconfigurationCoordinator[H]) Clear()
Clear resets the coordinator state.
func (*ReconfigurationCoordinator[H]) CurrentProposal ¶
func (rc *ReconfigurationCoordinator[H]) CurrentProposal() *ReconfigurationProposal
CurrentProposal returns the current proposal, if any.
func (*ReconfigurationCoordinator[H]) IsRejected ¶
func (rc *ReconfigurationCoordinator[H]) IsRejected() bool
IsRejected checks if the proposal has been rejected. Returns true if f+1 rejections have been received (making quorum impossible).
func (*ReconfigurationCoordinator[H]) ProposeReconfiguration ¶
func (rc *ReconfigurationCoordinator[H]) ProposeReconfiguration( effectiveRound uint64, changes []ValidatorSetChange, proposer uint16, signature []byte, ) error
ProposeReconfiguration initiates a new reconfiguration proposal. Returns an error if a proposal is already in progress.
func (*ReconfigurationCoordinator[H]) VoteStats ¶
func (rc *ReconfigurationCoordinator[H]) VoteStats() (approvals, rejections, total int)
VoteStats returns the current vote counts.
type ReconfigurationProposal ¶
type ReconfigurationProposal struct {
// Epoch is the epoch this proposal is for (currentEpoch + 1).
Epoch uint64
// EffectiveRound is the round at which the change takes effect.
EffectiveRound uint64
// Changes is the list of validator set changes to apply.
Changes []ValidatorSetChange
// Proposer is the validator who proposed this reconfiguration.
Proposer uint16
// Signature is the proposer's signature over this proposal.
Signature []byte
// ProposedAt is when this proposal was created.
ProposedAt time.Time
}
ReconfigurationProposal represents a proposal to change the validator set. This is used to coordinate reconfiguration across the network.
type ReconfigurationState ¶
type ReconfigurationState uint8
ReconfigurationState represents the current state of a reconfiguration operation.
const ( // ReconfigurationStateIdle means no reconfiguration is in progress. ReconfigurationStateIdle ReconfigurationState = iota // ReconfigurationStatePending means a reconfiguration has been proposed // but not yet finalized (waiting for quorum confirmation). ReconfigurationStatePending // ReconfigurationStateCommitting means the reconfiguration is being applied // (draining in-flight operations, updating state). ReconfigurationStateCommitting // ReconfigurationStateComplete means the reconfiguration has been applied. ReconfigurationStateComplete )
func (ReconfigurationState) String ¶
func (s ReconfigurationState) String() string
String returns a human-readable name for the reconfiguration state.
type ReconfigurationStats ¶
type ReconfigurationStats struct {
CurrentEpoch uint64
State ReconfigurationState
HasPendingChange bool
PendingToEpoch uint64
HistoryCount int
ValidatorCount int
CallbackCount int
}
ReconfigurationStats contains statistics for monitoring.
type ReconfigurationVote ¶
type ReconfigurationVote struct {
// ProposalHash is the hash of the proposal being voted on.
ProposalHash []byte
// Voter is the validator casting this vote.
Voter uint16
// Approve indicates whether this is an approval or rejection.
Approve bool
// Signature is the voter's signature over this vote.
Signature []byte
}
ReconfigurationVote represents a vote on a reconfiguration proposal.
type Reconfigurer ¶
type Reconfigurer struct {
// contains filtered or unexported fields
}
Reconfigurer manages committee reconfiguration and epoch transitions. It coordinates the transition between validator sets, ensuring: 1. All nodes agree on the epoch boundary (consensus on reconfiguration) 2. In-flight operations for the old epoch complete before switching 3. State is properly cleaned up for the new epoch
Thread-safe for concurrent use.
func NewReconfigurer ¶
func NewReconfigurer(initialEpoch uint64, validators ValidatorSet, logger *zap.Logger) *Reconfigurer
NewReconfigurer creates a new Reconfigurer.
func (*Reconfigurer) BeginCommit ¶
func (r *Reconfigurer) BeginCommit() error
BeginCommit transitions from PENDING to COMMITTING state. This should be called when the epoch boundary round has been reached and all nodes have confirmed the reconfiguration.
Returns an error if not in PENDING state.
func (*Reconfigurer) CancelPendingChange ¶
func (r *Reconfigurer) CancelPendingChange() error
CancelPendingChange cancels a pending epoch change. This can be used if the reconfiguration is aborted (e.g., consensus rollback).
Returns an error if not in PENDING state.
func (*Reconfigurer) CompleteCommit ¶
func (r *Reconfigurer) CompleteCommit() error
CompleteCommit finalizes the epoch change. This should be called after all in-flight operations have completed and state has been updated for the new epoch.
Returns an error if not in COMMITTING state.
func (*Reconfigurer) CurrentEpoch ¶
func (r *Reconfigurer) CurrentEpoch() uint64
CurrentEpoch returns the current epoch.
func (*Reconfigurer) CurrentValidators ¶
func (r *Reconfigurer) CurrentValidators() ValidatorSet
CurrentValidators returns the current validator set.
func (*Reconfigurer) History ¶
func (r *Reconfigurer) History() []EpochChange
History returns the history of recent epoch changes.
func (*Reconfigurer) IsEpochBoundaryRound ¶
func (r *Reconfigurer) IsEpochBoundaryRound(round uint64) bool
IsEpochBoundaryRound returns true if the given round is the effective round of a pending epoch change.
func (*Reconfigurer) OnEpochChange ¶
func (r *Reconfigurer) OnEpochChange(callback func(EpochChange))
OnEpochChange registers a callback to be invoked when an epoch change is committed. Callbacks are invoked synchronously in the order they were registered.
func (*Reconfigurer) PendingChange ¶
func (r *Reconfigurer) PendingChange() *EpochChange
PendingChange returns the pending epoch change, if any.
func (*Reconfigurer) ProposeEpochChange ¶
func (r *Reconfigurer) ProposeEpochChange(newEpoch uint64, effectiveRound uint64, newValidators ValidatorSet) error
ProposeEpochChange proposes a new epoch change. This is typically called when consensus has decided on a reconfiguration.
Returns an error if: - A reconfiguration is already in progress - The new epoch is not exactly currentEpoch + 1 - The effective round is in the past
func (*Reconfigurer) ShouldAcceptForEpoch ¶
func (r *Reconfigurer) ShouldAcceptForEpoch(epoch uint64) bool
ShouldAcceptForEpoch determines if a message from the given epoch should be accepted. Returns true if the epoch is current or if it's the pending epoch during transition.
func (*Reconfigurer) State ¶
func (r *Reconfigurer) State() ReconfigurationState
State returns the current reconfiguration state.
func (*Reconfigurer) Stats ¶
func (r *Reconfigurer) Stats() ReconfigurationStats
Stats returns current statistics.
type ReconnectableNetwork ¶
type ReconnectableNetwork[H Hash, T Transaction[H]] interface { Network[H, T] Connect(validatorIndex uint16) error Disconnect(validatorIndex uint16) error Reconnect(validatorIndex uint16) error Status(validatorIndex uint16) NetworkStatus IsConnected(validatorIndex uint16) bool ConnectedPeers() []uint16 OnDisconnect(callback func(validatorIndex uint16)) OnReconnect(callback func(validatorIndex uint16)) }
ReconnectableNetwork extends Network with reconnection and health checking.
type RecoveryConfig ¶
type RecoveryConfig struct {
// Handler is called when a panic is recovered.
// If nil, panics are logged and the goroutine terminates cleanly.
Handler PanicHandler
// Logger for recording recovered panics.
Logger *zap.Logger
// Rethrow causes the panic to be re-raised after handling.
// Use this if you want panics to propagate after logging.
Rethrow bool
}
RecoveryConfig configures panic recovery behavior.
type RequestTracker ¶
type RequestTracker[H Hash] struct { // contains filtered or unexported fields }
RequestTracker tracks pending network operations and provides cancel-on-round-advance. When the round advances, stale requests from previous rounds can be cancelled.
func NewRequestTracker ¶
func NewRequestTracker[H Hash](cfg RequestTrackerConfig, logger *zap.Logger) *RequestTracker[H]
NewRequestTracker creates a new RequestTracker.
func (*RequestTracker[H]) CancelAll ¶
func (rt *RequestTracker[H]) CancelAll()
CancelAll cancels all pending requests.
func (*RequestTracker[H]) CancelForRound ¶
func (rt *RequestTracker[H]) CancelForRound(round uint64)
CancelForRound cancels all pending requests for a specific round.
func (*RequestTracker[H]) OnRoundAdvance ¶
func (rt *RequestTracker[H]) OnRoundAdvance(newRound uint64)
OnRoundAdvance should be called when the DAG round advances. It cancels any requests for rounds that are now considered stale.
func (*RequestTracker[H]) PendingCount ¶
func (rt *RequestTracker[H]) PendingCount() int
PendingCount returns the number of pending requests.
func (*RequestTracker[H]) PendingForRound ¶
func (rt *RequestTracker[H]) PendingForRound(round uint64) int
PendingForRound returns the number of pending requests for a specific round.
func (*RequestTracker[H]) Stats ¶
func (rt *RequestTracker[H]) Stats() RequestTrackerStats
Stats returns current statistics.
func (*RequestTracker[H]) Track ¶
func (rt *RequestTracker[H]) Track( parentCtx context.Context, round uint64, reqType RequestType, digest H, ) (context.Context, func())
Track registers a new request for tracking. Returns a context that will be cancelled if the request becomes stale, and a completion function that must be called when the request finishes.
type RequestTrackerConfig ¶
type RequestTrackerConfig struct {
// MaxPendingPerRound limits requests per round to prevent memory bloat.
// Default: 1000
MaxPendingPerRound int
// StaleRoundThreshold is how many rounds behind a request must be to be considered stale.
// Requests for rounds < (currentRound - StaleRoundThreshold) are cancelled.
// Default: 2
StaleRoundThreshold uint64
// CancelOnAdvance enables automatic cancellation when round advances.
// Default: true
CancelOnAdvance bool
}
RequestTrackerConfig configures the RequestTracker.
func DefaultRequestTrackerConfig ¶
func DefaultRequestTrackerConfig() RequestTrackerConfig
DefaultRequestTrackerConfig returns sensible defaults.
type RequestTrackerStats ¶
type RequestTrackerStats struct {
CurrentRound uint64
PendingCount int
TotalStarted uint64
TotalCompleted uint64
TotalCancelled uint64
}
RequestTrackerStats contains statistics for monitoring.
type RequestType ¶
type RequestType uint8
RequestType identifies the type of network request.
const ( RequestTypeBatch RequestType = iota RequestTypeCertificate RequestTypeHeader )
func (RequestType) String ¶
func (t RequestType) String() string
type RoundAdvancedEvent ¶
type RoundAdvancedEvent struct {
OldRound uint64
NewRound uint64
CertificatesInRound int // Number of certificates that triggered advancement
AdvancedAt time.Time
}
RoundAdvancedEvent contains information about a round advancement.
type RoundRobinLeaderSchedule ¶
type RoundRobinLeaderSchedule struct {
// contains filtered or unexported fields
}
RoundRobinLeaderSchedule implements a simple round-robin leader election. Each validator takes turns being the leader.
func NewRoundRobinLeaderSchedule ¶
func NewRoundRobinLeaderSchedule(validatorCount int) *RoundRobinLeaderSchedule
NewRoundRobinLeaderSchedule creates a new round-robin leader schedule.
func (*RoundRobinLeaderSchedule) IsLeader ¶
func (s *RoundRobinLeaderSchedule) IsLeader(round uint64, validatorIndex uint16) bool
IsLeader returns true if the validator is the leader for the round.
func (*RoundRobinLeaderSchedule) Leader ¶
func (s *RoundRobinLeaderSchedule) Leader(round uint64) uint16
Leader returns the leader for the given round.
type SignatureCache ¶
type SignatureCache struct {
// contains filtered or unexported fields
}
SignatureCache caches verified signatures to avoid re-verification. Thread-safe for concurrent access.
func NewSignatureCache ¶
func NewSignatureCache(maxSize int) *SignatureCache
NewSignatureCache creates a new signature cache.
func (*SignatureCache) IsVerified ¶
func (c *SignatureCache) IsVerified(digest []byte, validatorIndex uint16) bool
IsVerified checks if a signature has been verified.
func (*SignatureCache) MarkVerified ¶
func (c *SignatureCache) MarkVerified(digest []byte, validatorIndex uint16)
MarkVerified marks a signature as verified.
func (*SignatureCache) Size ¶
func (c *SignatureCache) Size() int
Size returns the number of cached entries.
type SignatureError ¶
type SignatureError struct {
ValidatorIndex uint16
}
SignatureError indicates a signature verification failure.
func (*SignatureError) Error ¶
func (e *SignatureError) Error() string
type SignatureScheme ¶
type SignatureScheme uint8
SignatureScheme identifies the signature algorithm in use.
const ( // SignatureSchemeEd25519 uses Ed25519 signatures (individual verification). SignatureSchemeEd25519 SignatureScheme = iota // SignatureSchemeBLS uses BLS signatures (supports aggregation and batch verification). SignatureSchemeBLS )
type SlicePool ¶
type SlicePool[T any] struct { // contains filtered or unexported fields }
SlicePool provides a pool of reusable slices.
func NewSlicePool ¶
NewSlicePool creates a new slice pool with the specified initial capacity.
type Storage ¶
type Storage[H Hash, T Transaction[H]] interface { GetBatch(digest H) (*Batch[H, T], error) PutBatch(batch *Batch[H, T]) error HasBatch(digest H) bool GetHeader(digest H) (*Header[H], error) PutHeader(header *Header[H]) error GetCertificate(digest H) (*Certificate[H], error) PutCertificate(cert *Certificate[H]) error GetHighestRound() (uint64, error) PutHighestRound(round uint64) error DeleteBeforeRound(round uint64) error GetCertificatesByRound(round uint64) ([]*Certificate[H], error) GetCertificatesInRange(startRound, endRound uint64) ([]*Certificate[H], error) Close() error }
Storage provides persistent storage. Put operations must be durable before returning.
type Timer ¶
type Timer interface {
Start()
Stop()
Reset()
C() <-chan struct{}
}
Timer provides timeout management.
type Transaction ¶
Transaction represents a unit of data to be disseminated.
type TransactionReceivedEvent ¶
TransactionReceivedEvent contains information about a received transaction.
type ValidationConfig ¶
type ValidationConfig struct {
// MaxBatchRefs is the maximum number of batch references per header.
MaxBatchRefs int
// MaxParents is the maximum number of parent certificates per header.
MaxParents int
// MaxSignatures is the maximum number of signatures per certificate.
MaxSignatures int
// MaxTransactionsPerBatch is the maximum transactions per batch.
MaxTransactionsPerBatch int
// MaxTransactionSize is the maximum size of a single transaction in bytes.
MaxTransactionSize int
// MaxBatchSize is the maximum size of a batch in bytes.
MaxBatchSize int
// MaxHeaderSize is the maximum size of a header in bytes.
MaxHeaderSize int
// MaxCertificateSize is the maximum size of a certificate in bytes.
MaxCertificateSize int
// MaxRoundSkip is the maximum rounds a header can be ahead.
MaxRoundSkip uint64
// MaxTimestampDrift is the maximum time a header can be in the future.
MaxTimestampDrift time.Duration
}
ValidationConfig configures validation limits.
func DefaultValidationConfig ¶
func DefaultValidationConfig() ValidationConfig
DefaultValidationConfig returns a ValidationConfig with default limits.
type ValidationError ¶
type ValidationError struct {
Type string // "header", "certificate", "batch", "vote"
Field string // Specific field that failed validation
Message string // Human-readable error message
Cause error // Underlying error if any
}
ValidationError wraps a validation error with additional context.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
func (*ValidationError) Unwrap ¶
func (e *ValidationError) Unwrap() error
type Validator ¶
type Validator[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Validator provides comprehensive input validation for Narwhal messages. All methods are safe for concurrent use.
func NewValidator ¶
func NewValidator[H Hash, T Transaction[H]]( cfg ValidationConfig, validators ValidatorSet, hashFunc func([]byte) H, ) *Validator[H, T]
NewValidator creates a new Validator with the given configuration.
func (*Validator[H, T]) ValidateBatch ¶
ValidateBatch performs comprehensive validation on a batch.
Checks performed:
- Batch is not nil
- ValidatorID is valid
- Digest is correct
- Transaction count is within limits
- Individual transaction sizes are within limits (if checkTxSize is true)
func (*Validator[H, T]) ValidateCertificate ¶
func (v *Validator[H, T]) ValidateCertificate(cert *Certificate[H], currentRound uint64, verifySignatures bool) error
ValidateCertificate performs comprehensive validation on a certificate.
Checks performed:
- Certificate is not nil
- Header is valid (via ValidateHeader)
- Has quorum of signatures (2f+1)
- Signature count matches bitmap
- Number of signatures is within limits
- All signatures are valid (if verifySignatures is true)
func (*Validator[H, T]) ValidateHeader ¶
ValidateHeader validates a header before voting or accepting.
func (*Validator[H, T]) ValidateMessageSize ¶
func (v *Validator[H, T]) ValidateMessageSize(msgType MessageType, data []byte) error
ValidateMessageSize checks if a raw message is within size limits.
func (*Validator[H, T]) ValidateVote ¶
func (v *Validator[H, T]) ValidateVote(vote *HeaderVote[H], verifySignature bool) error
ValidateVote performs validation on a header vote.
Checks performed:
- Vote is not nil
- Voter is a valid validator
- Signature is valid (if verifySignature is true)
type ValidatorSet ¶
type ValidatorSet interface {
Count() int
GetByIndex(index uint16) (PublicKey, error)
Contains(index uint16) bool
F() int // max Byzantine faults: (n-1)/3
}
ValidatorSet represents the set of validators. Quorum requires 2f+1 validators.
type ValidatorSetChange ¶
type ValidatorSetChange struct {
// Type indicates what kind of change this is.
Type ValidatorSetChangeType
// ValidatorIndex is the index of the validator being changed.
// For ADD, this is the new index assigned.
// For REMOVE and UPDATE, this is the existing index.
ValidatorIndex uint16
// PublicKey is the public key involved in the change.
// For ADD and UPDATE, this is the new key.
// For REMOVE, this may be nil.
PublicKey PublicKey
}
ValidatorSetChange represents a change to the validator set.
type ValidatorSetChangeType ¶
type ValidatorSetChangeType uint8
ValidatorSetChangeType indicates the type of validator set change.
const ( // ValidatorSetChangeAdd adds a new validator. ValidatorSetChangeAdd ValidatorSetChangeType = iota // ValidatorSetChangeRemove removes an existing validator. ValidatorSetChangeRemove // ValidatorSetChangeUpdate updates a validator's key (key rotation). ValidatorSetChangeUpdate )
func (ValidatorSetChangeType) String ¶
func (t ValidatorSetChangeType) String() string
String returns a human-readable name for the change type.
type Vertex ¶
type Vertex[H Hash, T Transaction[H]] struct { Certificate *Certificate[H] Batches []*Batch[H, T] }
Vertex represents a certified DAG vertex with its data.
type VertexCache ¶
type VertexCache[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
VertexCache is a specialized LRU cache for vertices (certificate + batches).
func NewVertexCache ¶
func NewVertexCache[H Hash, T Transaction[H]](capacity int) *VertexCache[H, T]
NewVertexCache creates a new vertex cache with the given capacity.
func (*VertexCache[H, T]) Clear ¶
func (c *VertexCache[H, T]) Clear()
Clear removes all vertices from the cache.
func (*VertexCache[H, T]) Contains ¶
func (c *VertexCache[H, T]) Contains(digest H) bool
Contains checks if a vertex exists in the cache.
func (*VertexCache[H, T]) Get ¶
func (c *VertexCache[H, T]) Get(digest H) (*Vertex[H, T], bool)
Get retrieves a vertex by its certificate digest.
func (*VertexCache[H, T]) Len ¶
func (c *VertexCache[H, T]) Len() int
Len returns the number of vertices in the cache.
func (*VertexCache[H, T]) Put ¶
func (c *VertexCache[H, T]) Put(vertex *Vertex[H, T])
Put adds a vertex to the cache.
func (*VertexCache[H, T]) Remove ¶
func (c *VertexCache[H, T]) Remove(digest H) bool
Remove removes a vertex from the cache.
func (*VertexCache[H, T]) Stats ¶
func (c *VertexCache[H, T]) Stats() LRUCacheStats
Stats returns cache statistics.
type VertexInsertedEvent ¶
type VertexInsertedEvent[H Hash] struct { Certificate *Certificate[H] Round uint64 Author uint16 ParentCount int TotalInRound int // Total certificates in this round after insertion InsertedAt time.Time }
VertexInsertedEvent contains information about a vertex inserted into the DAG.
type VoteDecision ¶
type VoteDecision uint8
VoteDecision represents the decision about whether to vote for a header.
const ( // VoteDecisionAllow means we should vote for this header. VoteDecisionAllow VoteDecision = iota // VoteDecisionSkipOldRound means we've already voted for a higher round from this author. VoteDecisionSkipOldRound // VoteDecisionSkipEquivocation means we've already voted for a different header // from this author at the same round (equivocation detected). VoteDecisionSkipEquivocation // VoteDecisionSkipOldEpoch means the header is from an old epoch. VoteDecisionSkipOldEpoch // VoteDecisionSkipDuplicate means we've already voted for this exact header. VoteDecisionSkipDuplicate )
func (VoteDecision) String ¶
func (d VoteDecision) String() string
String returns a human-readable description of the vote decision.
type VoteMessage ¶
type VoteMessage[H Hash, T Transaction[H]] struct { VoteData *HeaderVote[H] // contains filtered or unexported fields }
VoteMessage wraps a vote for network transmission.
func (*VoteMessage[H, T]) Sender ¶
func (m *VoteMessage[H, T]) Sender() uint16
func (*VoteMessage[H, T]) Type ¶
func (m *VoteMessage[H, T]) Type() MessageType
func (*VoteMessage[H, T]) Vote ¶
func (m *VoteMessage[H, T]) Vote() *HeaderVote[H]
type VoteMessageAccessor ¶
type VoteMessageAccessor[H Hash] interface{ Vote() *HeaderVote[H] }
type VoteReceivedEvent ¶
type VoteReceivedEvent[H Hash] struct { HeaderDigest H Voter uint16 TotalVotes int // Current vote count for this header QuorumRequired int ReceivedAt time.Time }
VoteReceivedEvent contains information about a received vote.
type VoteRecord ¶
type VoteRecord[H Hash] struct { // Round is the round we voted for. Round uint64 // Epoch is the epoch we voted for (for cross-epoch safety). Epoch uint64 // HeaderDigest is the digest of the header we voted for. HeaderDigest H // VotedAt is when we cast this vote. VotedAt time.Time }
VoteRecord tracks a vote we sent for a specific (author, round) pair. Used to prevent double-voting on equivocating headers from the same author.
type VoteSentEvent ¶
VoteSentEvent contains information about a vote we sent.
type VoteTracker ¶
type VoteTracker[H Hash] struct { // contains filtered or unexported fields }
VoteTracker prevents double-voting on equivocating headers. Thread-safe.
func NewVoteTracker ¶
func NewVoteTracker[H Hash](logger *zap.Logger) *VoteTracker[H]
NewVoteTracker creates a new VoteTracker.
func (*VoteTracker[H]) Clear ¶
func (vt *VoteTracker[H]) Clear()
Clear removes all vote records. Used during reconfiguration.
func (*VoteTracker[H]) GarbageCollect ¶
func (vt *VoteTracker[H]) GarbageCollect(gcRound uint64)
GarbageCollect removes vote records for rounds below gcRound. Called periodically to prevent unbounded memory growth.
func (*VoteTracker[H]) RecordVote ¶
func (vt *VoteTracker[H]) RecordVote(author uint16, round, epoch uint64, headerDigest H)
RecordVote records that we voted for a header. This should be called AFTER successfully sending the vote.
func (*VoteTracker[H]) SetEpoch ¶
func (vt *VoteTracker[H]) SetEpoch(epoch uint64)
SetEpoch updates the current epoch and clears records from old epochs.
func (*VoteTracker[H]) ShouldVote ¶
func (vt *VoteTracker[H]) ShouldVote(author uint16, round, epoch uint64, headerDigest H) (VoteDecision, *H)
ShouldVote determines whether we should vote for a header. Returns the decision and, if equivocation is detected, the digest of the header we already voted for.
The logic follows Sui's Narwhal implementation: 1. If header.Round < lastVotedRound for this author: skip (old round) 2. If header.Round == lastVotedRound and header.Digest == lastVotedDigest: skip (duplicate) 3. If header.Round == lastVotedRound and header.Digest != lastVotedDigest: skip (equivocation) 4. If header.Round > lastVotedRound: allow (new round) 5. If no previous vote for this author: allow
func (*VoteTracker[H]) Stats ¶
func (vt *VoteTracker[H]) Stats() VoteTrackerStats
Stats returns current statistics.
type VoteTrackerStats ¶
VoteTrackerStats contains statistics for monitoring.
type Worker ¶
type Worker[H Hash, T Transaction[H]] struct { // contains filtered or unexported fields }
Worker receives transactions and creates batches.
func NewWorker ¶
func NewWorker[H Hash, T Transaction[H]](cfg WorkerConfig[H, T]) *Worker[H, T]
NewWorker creates a new worker.
func (*Worker[H, T]) AddTransaction ¶
AddTransaction adds a transaction to the pending batch. If MaxPending is configured and the queue is full:
- If DropOnFull is true, the transaction is dropped silently
- If DropOnFull is false, this call blocks until space is available
Returns true if the transaction was accepted, false if dropped.
func (*Worker[H, T]) Stats ¶
func (w *Worker[H, T]) Stats() WorkerStats
Stats returns current worker statistics.
type WorkerConfig ¶
type WorkerConfig[H Hash, T Transaction[H]] struct { ID uint16 ValidatorID uint16 BatchSize int BatchTimeout time.Duration HashFunc func([]byte) H OnBatch func(*Batch[H, T]) Hooks *Hooks[H, T] Logger *zap.Logger // Backpressure settings MaxPending int // Max pending transactions (0 = unbounded) DropOnFull bool // If true, drop when full; if false, block }
WorkerConfig configures a worker.