cluster

package
v0.1.2
Warning

This package is not in the latest version of its module.
Published: May 5, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Types

type CertPayload

type CertPayload struct {
	Domain  string `json:"domain"`
	CertPEM []byte `json:"cert"`
	KeyPEM  []byte `json:"key"`
}

type Cluster

type Cluster interface {
	Members() []string
	Get(key string) ([]byte, bool)
	Set(key string, value []byte)
	Delete(key string)
	TryAcquireLock(key string) bool

	BroadcastCert(domain string, certPEM, keyPEM []byte) error
	BroadcastConfig(domain string, rawHCL []byte, deleted bool) error
	BroadcastChallenge(token, keyAuth string, deleted bool)

	Shutdown() error
}

Cluster defines the contract that a distributed backend must satisfy. Implementations (Memberlist, etcd, Consul) must provide these coordination primitives.
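For unit tests it can help to have a single-node stand-in that satisfies the interface without any networking. The following is a minimal sketch, not part of the package: `memCluster` and `newMemCluster` are hypothetical names, and the interface is repeated locally only so the block compiles on its own.

```go
package main

import "sync"

// Cluster mirrors the interface documented above so this sketch compiles alone.
type Cluster interface {
	Members() []string
	Get(key string) ([]byte, bool)
	Set(key string, value []byte)
	Delete(key string)
	TryAcquireLock(key string) bool
	BroadcastCert(domain string, certPEM, keyPEM []byte) error
	BroadcastConfig(domain string, rawHCL []byte, deleted bool) error
	BroadcastChallenge(token, keyAuth string, deleted bool)
	Shutdown() error
}

// memCluster is a hypothetical single-node stand-in intended for tests.
type memCluster struct {
	mu    sync.Mutex
	name  string
	data  map[string][]byte
	locks map[string]bool
}

// Compile-time check that memCluster satisfies Cluster.
var _ Cluster = (*memCluster)(nil)

func newMemCluster(name string) *memCluster {
	return &memCluster{name: name, data: map[string][]byte{}, locks: map[string]bool{}}
}

func (c *memCluster) Members() []string { return []string{c.name} }

func (c *memCluster) Get(key string) ([]byte, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.data[key]
	return v, ok
}

func (c *memCluster) Set(key string, value []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.data[key] = append([]byte(nil), value...) // store a copy, not the caller's slice
}

func (c *memCluster) Delete(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.data, key)
}

// TryAcquireLock succeeds only for the first caller of a given key.
func (c *memCluster) TryAcquireLock(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.locks[key] {
		return false
	}
	c.locks[key] = true
	return true
}

// The remaining methods are no-ops because there are no peers to notify.
func (c *memCluster) BroadcastCert(domain string, certPEM, keyPEM []byte) error        { return nil }
func (c *memCluster) BroadcastConfig(domain string, rawHCL []byte, deleted bool) error { return nil }
func (c *memCluster) BroadcastChallenge(token, keyAuth string, deleted bool)           {}
func (c *memCluster) Shutdown() error                                                  { return nil }
```

The locks in this sketch never expire; whether the real implementations release or time out locks is not stated in this documentation.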

type Config

type Config struct {
	BindAddr string
	BindPort int
	Secret   []byte
	Name     string
	Seeds    []string
	HostsDir expect.Folder

	// KeeperSnapshot is called by the seed node during LocalState(join=true) to
	// collect all keeper secrets that should be pushed to the joining node.
	// The returned map is key -> plaintext bytes; values are zeroed after use.
	// Nil means no keeper sync.
	KeeperSnapshot func() map[string][]byte

	// KeeperWrite is called on the joining node when an OpSecret envelope arrives
	// via MergeRemoteState. It must write key -> value into the local keeper store.
	// The value slice is zeroed by the caller immediately after KeeperWrite returns.
	// Nil means incoming OpSecret envelopes are dropped.
	KeeperWrite func(key string, value []byte)
}
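Because the caller zeroes secret slices after use, callbacks wired into KeeperSnapshot and KeeperWrite should copy the bytes rather than retain or hand out the same backing array. The sketch below illustrates that with a hypothetical in-memory `keeperStore`; the store type and its method names are assumptions, only the function shapes come from Config.

```go
package main

import "sync"

// keeperStore is a hypothetical in-memory secret store.
type keeperStore struct {
	mu      sync.RWMutex
	secrets map[string][]byte
}

func newKeeperStore() *keeperStore {
	return &keeperStore{secrets: make(map[string][]byte)}
}

// Snapshot has the shape of Config.KeeperSnapshot. It returns copies because
// the caller zeroes the returned values after use.
func (s *keeperStore) Snapshot() map[string][]byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make(map[string][]byte, len(s.secrets))
	for k, v := range s.secrets {
		out[k] = append([]byte(nil), v...)
	}
	return out
}

// Write has the shape of Config.KeeperWrite. It copies value because the
// caller zeroes the slice as soon as Write returns.
func (s *keeperStore) Write(key string, value []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.secrets[key] = append([]byte(nil), value...)
}
```

The method values can then be assigned directly, for example `cfg.KeeperSnapshot = store.Snapshot` and `cfg.KeeperWrite = store.Write`.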

type ConfigPayload

type ConfigPayload struct {
	Domain    string `json:"domain"`
	RawHCL    []byte `json:"raw_hcl,omitempty"`
	Checksum  string `json:"checksum"`
	Timestamp int64  `json:"timestamp"`
	NodeID    string `json:"node_id"`
	Deleted   bool   `json:"deleted"`
}

type Distributor

type Distributor struct {
	// contains filtered or unexported fields
}

func NewDistributor

func NewDistributor(logger *ll.Logger, localDir expect.Folder) *Distributor

NewDistributor initializes the configuration synchronizer. It populates the initial checksum cache from existing files on disk.

func (*Distributor) Apply

func (c *Distributor) Apply(payload ConfigPayload)

Apply writes or deletes a configuration file based on a cluster payload. Domain values are validated to prevent path traversal before any file operation.
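The exact validation rules used by Apply are not documented here. As an illustration of the general technique only, a path traversal guard might look like the sketch below; `safeConfigPath`, `errUnsafeDomain`, and the `.hcl` file extension are all assumptions.

```go
package main

import (
	"errors"
	"path/filepath"
	"strings"
)

var errUnsafeDomain = errors.New("unsafe domain")

// safeConfigPath (hypothetical) maps a domain to a file inside dir and
// rejects any value that could escape the directory.
func safeConfigPath(dir, domain string) (string, error) {
	// Reject empty names, path separators, and parent-directory sequences.
	if domain == "" || strings.ContainsAny(domain, `/\`) || strings.Contains(domain, "..") {
		return "", errUnsafeDomain
	}
	p := filepath.Join(dir, domain+".hcl") // the extension is an assumption
	// Defense in depth: the joined path must still resolve inside dir.
	rel, err := filepath.Rel(dir, p)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", errUnsafeDomain
	}
	return p, nil
}
```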

func (*Distributor) LoadExistingChecksums

func (c *Distributor) LoadExistingChecksums()

LoadExistingChecksums scans the local directory for configuration files. It caches the checksums to prevent unnecessary synchronization loops on startup.

func (*Distributor) PreparePayload

func (c *Distributor) PreparePayload(domain string, rawHCL []byte, deleted bool, nodeID string) (*ConfigPayload, error)

PreparePayload creates a compressed configuration payload for cluster distribution. It updates the local checksum cache so that subsequent fsnotify events for the same content are ignored.

func (*Distributor) ShouldBroadcast

func (c *Distributor) ShouldBroadcast(domain string, content []byte) bool

ShouldBroadcast reports whether the file content has changed locally. It is a read-only check: a true result tells the fsnotify watcher to call PreparePayload.

func (*Distributor) ShouldBroadcastDeletion

func (c *Distributor) ShouldBroadcastDeletion(domain string) bool

ShouldBroadcastDeletion determines if a deleted file was previously tracked. It returns true to authorize a cluster-wide deletion broadcast without modifying state.

func (*Distributor) UpdateChecksum

func (c *Distributor) UpdateChecksum(domain string, content []byte)

UpdateChecksum manually forces a checksum update for a domain. Used when local changes need to bypass immediate broadcasting.
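The split between read-only checks (ShouldBroadcast, ShouldBroadcastDeletion) and mutating calls (PreparePayload, UpdateChecksum) can be pictured with a small checksum cache. This is a sketch under assumptions, not the Distributor's code: the type and method names are hypothetical, and SHA-256 is chosen only for illustration since the actual hash is not documented.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"sync"
)

// checksumCache is a hypothetical stand-in for the Distributor's internal cache.
type checksumCache struct {
	mu   sync.RWMutex
	sums map[string]string
}

func newChecksumCache() *checksumCache {
	return &checksumCache{sums: make(map[string]string)}
}

// digest uses SHA-256 as an assumption; the real hash is not documented.
func digest(content []byte) string {
	h := sha256.Sum256(content)
	return hex.EncodeToString(h[:])
}

// shouldBroadcast is read-only: it compares but never stores.
func (c *checksumCache) shouldBroadcast(domain string, content []byte) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.sums[domain] != digest(content)
}

// shouldBroadcastDeletion is read-only: true only for tracked domains.
func (c *checksumCache) shouldBroadcastDeletion(domain string) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	_, tracked := c.sums[domain]
	return tracked
}

// update records the checksum so later events with identical content are ignored.
func (c *checksumCache) update(domain string, content []byte) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sums[domain] = digest(content)
}
```

Keeping the checks free of side effects means a file watcher can call them repeatedly for duplicate events without changing what a later broadcast decision sees.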

type Envelope

type Envelope struct {
	Op        OpType `json:"op"`
	Key       string `json:"k"`
	Value     []byte `json:"v,omitempty"`
	Timestamp int64  `json:"ts"`
	Owner     string `json:"owner,omitempty"`
}
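The struct tags determine the field names an Envelope takes under `encoding/json`. The sketch below copies the type locally and marshals one value to show them; `encodeEnvelope` is a hypothetical helper, and whether the package sends this JSON as-is or wraps it (for example in encryption or compression) is not documented here.

```go
package main

import "encoding/json"

// Local copies of the documented types so the sketch compiles alone.
type OpType uint8

const (
	OpSet OpType = 1
	OpDel OpType = 2
)

type Envelope struct {
	Op        OpType `json:"op"`
	Key       string `json:"k"`
	Value     []byte `json:"v,omitempty"`
	Timestamp int64  `json:"ts"`
	Owner     string `json:"owner,omitempty"`
}

// encodeEnvelope (hypothetical helper) returns the JSON form of e.
// Value is a []byte, so encoding/json emits it as a base64 string.
func encodeEnvelope(e Envelope) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}
```

For `Envelope{Op: OpSet, Key: "route/a", Value: []byte("hi"), Timestamp: 1, Owner: "node-1"}` this yields `{"op":1,"k":"route/a","v":"aGk=","ts":1,"owner":"node-1"}`. An empty Value or Owner is omitted entirely.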

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func NewManager

func NewManager(cfg Config, handler UpdateHandler, logger *ll.Logger) (*Manager, error)

func (*Manager) BroadcastCert

func (m *Manager) BroadcastCert(domain string, certPEM, keyPEM []byte) error

BroadcastCert encrypts new TLS certificates and disseminates them reliably to the cluster. Sharing issued certificates prevents multiple nodes from requesting the same certificate from Let's Encrypt simultaneously.

func (*Manager) BroadcastChallenge

func (m *Manager) BroadcastChallenge(token, keyAuth string, deleted bool)

BroadcastChallenge disseminates ACME tokens for HTTP-01 verification. This allows any node behind the load balancer to answer validation requests.

func (*Manager) BroadcastConfig

func (m *Manager) BroadcastConfig(domain string, rawHCL []byte, deleted bool) error

BroadcastConfig propagates a host configuration file to the cluster. It uses TCP to avoid the payload size limits of UDP gossip.

func (*Manager) BroadcastGossip

func (m *Manager) BroadcastGossip(op OpType, key string, value []byte)

func (*Manager) BroadcastReliable

func (m *Manager) BroadcastReliable(op OpType, key string, value []byte) error

BroadcastReliable sends a payload to all cluster members over TCP. It guarantees delivery for critical data like configurations and certificates.

func (*Manager) BroadcastRoute

func (m *Manager) BroadcastRoute(key string, value []byte)

BroadcastRoute distributes ephemeral routing logic over UDP. Commonly used for temporary webhook or API-created paths.

func (*Manager) BroadcastSecret

func (m *Manager) BroadcastSecret(key string, plaintext []byte) error

BroadcastSecret encrypts plaintext with the cluster cipher and broadcasts it to all peers as an OpSecret envelope. Peers handle the envelope in apply() and write it to their local keeper via keeperWrite. The keeper API handler calls BroadcastSecret after any write or delete so that runtime secret changes propagate to all cluster nodes.

func (*Manager) BroadcastStatus

func (m *Manager) BroadcastStatus(status string)

BroadcastStatus shares health and readiness states across peers. This enables traffic shaping and early aborts based on neighbor status.

func (*Manager) ConfigManager

func (m *Manager) ConfigManager() *Distributor

func (*Manager) Delete

func (m *Manager) Delete(key string)

Delete removes a key from the gossip mesh globally.

func (*Manager) Get

func (m *Manager) Get(key string) ([]byte, bool)

Get returns the value of a key from the local cache.

func (*Manager) Members

func (m *Manager) Members() []string

Members returns a list of actively connected node names.

func (*Manager) Metrics

func (m *Manager) Metrics() map[string]uint64

Metrics retrieves operation counts for monitoring hooks.

func (*Manager) Set

func (m *Manager) Set(key string, value []byte)

Set persists a generic key-value pair into the gossip mesh.

func (*Manager) Shutdown

func (m *Manager) Shutdown() error

Shutdown disconnects the node gracefully and notifies peers before exit. It also stops the maintenance loops so that their resources are released.

func (*Manager) TryAcquireLock

func (m *Manager) TryAcquireLock(key string) bool

TryAcquireLock attempts to acquire a distributed lock over the gossip layer. It is useful for ensuring that an operation (such as ACME issuance) runs on a single node in the cluster.
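A typical calling pattern is to attempt the lock and skip the work when another node already holds it. The sketch below shows that pattern against a narrow local interface; `issueIfLeader`, the `"acme:"` key prefix, and `fixedLocker` are hypothetical names used only for illustration.

```go
package main

// locker is the single method this sketch needs from Cluster or *Manager.
type locker interface {
	TryAcquireLock(key string) bool
}

// issueIfLeader (hypothetical) runs issue only when this node wins the lock.
// It reports whether issue was run and returns any error from it.
func issueIfLeader(l locker, domain string, issue func(domain string) error) (bool, error) {
	// The key format is an assumption; any stable per-domain key would do.
	if !l.TryAcquireLock("acme:" + domain) {
		return false, nil // another node is handling this domain
	}
	return true, issue(domain)
}

// fixedLocker is a test double that always grants or always denies the lock.
type fixedLocker bool

func (f fixedLocker) TryAcquireLock(string) bool { return bool(f) }
```

A node that loses the lock would normally wait for the winner's certificate to arrive through the cluster rather than retry issuance itself.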

type Metrics

type Metrics interface {
	IncUpdatesReceived()
	IncUpdatesIgnored()
	IncDeletes()
	IncJoin()
	IncLeave()
	Snapshot() map[string]uint64
}
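Any type with these six methods satisfies Metrics. As a rough sketch of one way to do it (the fields of RealMetrics are unexported, so this is not a description of it), lock-free counters from `sync/atomic` are enough; `counterMetrics` and the snapshot key names are assumptions, and `atomic.Uint64` requires Go 1.19 or later.

```go
package main

import "sync/atomic"

// counterMetrics is a hypothetical Metrics implementation backed by atomics.
type counterMetrics struct {
	received atomic.Uint64
	ignored  atomic.Uint64
	deletes  atomic.Uint64
	joins    atomic.Uint64
	leaves   atomic.Uint64
}

func (m *counterMetrics) IncUpdatesReceived() { m.received.Add(1) }
func (m *counterMetrics) IncUpdatesIgnored()  { m.ignored.Add(1) }
func (m *counterMetrics) IncDeletes()         { m.deletes.Add(1) }
func (m *counterMetrics) IncJoin()            { m.joins.Add(1) }
func (m *counterMetrics) IncLeave()           { m.leaves.Add(1) }

// Snapshot returns a copy of the current counts. The key names are assumptions.
func (m *counterMetrics) Snapshot() map[string]uint64 {
	return map[string]uint64{
		"updates_received": m.received.Load(),
		"updates_ignored":  m.ignored.Load(),
		"deletes":          m.deletes.Load(),
		"joins":            m.joins.Load(),
		"leaves":           m.leaves.Load(),
	}
}
```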

type OpType

type OpType uint8
const (
	OpSet       OpType = 1
	OpDel       OpType = 2
	OpRoute     OpType = 3
	OpCert      OpType = 4
	OpLock      OpType = 5
	OpStatus    OpType = 6
	OpChallenge OpType = 7
	OpConfig    OpType = 8
	OpSecret    OpType = 9
)
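When logging envelopes, a readable name is often more useful than the raw numeric value. This documentation does not show a String method on OpType, so the following is a sketch of how a caller might name the values; the local type copy and the `opName` helper are assumptions.

```go
package main

import "strconv"

// Local copy of the documented type and constants so the sketch compiles alone.
type OpType uint8

const (
	OpSet       OpType = 1
	OpDel       OpType = 2
	OpRoute     OpType = 3
	OpCert      OpType = 4
	OpLock      OpType = 5
	OpStatus    OpType = 6
	OpChallenge OpType = 7
	OpConfig    OpType = 8
	OpSecret    OpType = 9
)

var opNames = map[OpType]string{
	OpSet: "OpSet", OpDel: "OpDel", OpRoute: "OpRoute",
	OpCert: "OpCert", OpLock: "OpLock", OpStatus: "OpStatus",
	OpChallenge: "OpChallenge", OpConfig: "OpConfig", OpSecret: "OpSecret",
}

// opName (hypothetical helper) returns the constant name for op, or a
// numeric fallback such as "OpType(42)" for values outside the known set.
func opName(op OpType) string {
	if n, ok := opNames[op]; ok {
		return n
	}
	return "OpType(" + strconv.Itoa(int(op)) + ")"
}
```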

type RealMetrics

type RealMetrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics() *RealMetrics

func (*RealMetrics) IncDeletes

func (m *RealMetrics) IncDeletes()

func (*RealMetrics) IncJoin

func (m *RealMetrics) IncJoin()

func (*RealMetrics) IncLeave

func (m *RealMetrics) IncLeave()

func (*RealMetrics) IncUpdatesIgnored

func (m *RealMetrics) IncUpdatesIgnored()

func (*RealMetrics) IncUpdatesReceived

func (m *RealMetrics) IncUpdatesReceived()

func (*RealMetrics) Snapshot

func (m *RealMetrics) Snapshot() map[string]uint64

type RedisSharedState

type RedisSharedState struct {
	// contains filtered or unexported fields
}

RedisSharedState implements woos.SharedState using Redis as the distributed backend. Provides atomic counters and token bucket rate limiting across cluster nodes.

func NewRedisSharedState

func NewRedisSharedState(cfg *alaye.RedisState) (*RedisSharedState, error)

NewRedisSharedState connects to Redis to provide distributed consistency. It exposes atomic scripts so that limits are enforced precisely across independent proxy instances.

func (*RedisSharedState) AllowRateLimit

func (r *RedisSharedState) AllowRateLimit(ctx context.Context, key string, limit int, window time.Duration, burst int) (bool, error)

AllowRateLimit evaluates a distributed token bucket stored in Redis. Quota checks and capacity updates are performed entirely within the database, so concurrent proxies cannot race each other.
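To make the token bucket idea concrete, here is a single-process sketch of the algorithm with no Redis involved. It is not the package's script: the `bucket` type is hypothetical, and the reading of the parameters (refill `limit` tokens per `window`, capacity `burst`) is an assumption.

```go
package main

import "time"

// bucket is a hypothetical in-memory token bucket.
type bucket struct {
	tokens float64
	last   time.Time
}

// allow refills the bucket for the time elapsed since the previous call,
// caps it at burst, and then tries to take one token.
func (b *bucket) allow(now time.Time, limit int, window time.Duration, burst int) bool {
	rate := float64(limit) / window.Seconds() // tokens per second (assumed meaning)
	if b.last.IsZero() {
		b.tokens = float64(burst) // a new bucket starts full
	} else {
		b.tokens += now.Sub(b.last).Seconds() * rate
		if b.tokens > float64(burst) {
			b.tokens = float64(burst)
		}
	}
	b.last = now
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// demoBucket runs four requests: three at the same instant, one a second later.
func demoBucket() []bool {
	start := time.Unix(0, 0)
	var b bucket
	return []bool{
		b.allow(start, 1, time.Second, 2),
		b.allow(start, 1, time.Second, 2),
		b.allow(start, 1, time.Second, 2),
		b.allow(start.Add(time.Second), 1, time.Second, 2),
	}
}
```

With a burst of 2 and a refill of one token per second, `demoBucket` returns `[true true false true]`. In a distributed setting the read, refill, and decrement have to happen as one atomic step, which is why the documented method runs the evaluation inside Redis.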

func (*RedisSharedState) Close

func (r *RedisSharedState) Close() error

Close gracefully closes the underlying Redis connection pool. It must be called on system reload or shutdown.

func (*RedisSharedState) Increment

func (r *RedisSharedState) Increment(ctx context.Context, key string, window time.Duration) (int64, error)

Increment tracks hits against a key and resets when the TTL window expires. It atomically increases the counter and applies the expiration via a Lua pipeline.
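The counting behaviour can be sketched in memory as a fixed-window counter. This is an illustration under assumptions rather than the Redis implementation: `windowCounter` is hypothetical, and it assumes the window starts at the first hit after the previous window expired.

```go
package main

import "time"

// windowCounter is a hypothetical in-memory fixed-window counter.
type windowCounter struct {
	count   int64
	expires time.Time
}

// increment starts a new window when the old one has expired, then counts the hit.
func (w *windowCounter) increment(now time.Time, window time.Duration) int64 {
	if !now.Before(w.expires) {
		w.count = 0
		w.expires = now.Add(window) // assumed: TTL is set on the first hit
	}
	w.count++
	return w.count
}

// demoCounter records hits at 0s, 30s, and 61s against a one-minute window.
func demoCounter() []int64 {
	start := time.Unix(0, 0)
	var w windowCounter
	return []int64{
		w.increment(start, time.Minute),
		w.increment(start.Add(30*time.Second), time.Minute),
		w.increment(start.Add(61*time.Second), time.Minute),
	}
}
```

`demoCounter` returns `[1 2 1]`: the third hit falls after the window expired, so the count restarts.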

type UpdateHandler

type UpdateHandler interface {
	OnClusterChange(key string, value []byte, deleted bool)
	OnClusterCert(domain string, certPEM, keyPEM []byte) error
	OnClusterChallenge(token, keyAuth string, deleted bool)
}
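NewManager takes an UpdateHandler, so callers need a type with these three methods. The sketch below is one possible shape, a handler that simply records what it receives; `recordingHandler` and its fields are hypothetical, and the interface is repeated locally so the block compiles alone.

```go
package main

import (
	"errors"
	"sync"
)

// UpdateHandler mirrors the documented interface.
type UpdateHandler interface {
	OnClusterChange(key string, value []byte, deleted bool)
	OnClusterCert(domain string, certPEM, keyPEM []byte) error
	OnClusterChallenge(token, keyAuth string, deleted bool)
}

// recordingHandler is a hypothetical handler that keeps updates in memory.
type recordingHandler struct {
	mu         sync.Mutex
	values     map[string][]byte
	challenges map[string]string
	certs      int
}

// Compile-time check that recordingHandler satisfies UpdateHandler.
var _ UpdateHandler = (*recordingHandler)(nil)

func newRecordingHandler() *recordingHandler {
	return &recordingHandler{values: map[string][]byte{}, challenges: map[string]string{}}
}

func (h *recordingHandler) OnClusterChange(key string, value []byte, deleted bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if deleted {
		delete(h.values, key)
		return
	}
	h.values[key] = append([]byte(nil), value...) // keep a private copy
}

func (h *recordingHandler) OnClusterCert(domain string, certPEM, keyPEM []byte) error {
	if domain == "" || len(certPEM) == 0 || len(keyPEM) == 0 {
		return errors.New("incomplete certificate update")
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	h.certs++
	return nil
}

func (h *recordingHandler) OnClusterChallenge(token, keyAuth string, deleted bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if deleted {
		delete(h.challenges, token)
		return
	}
	h.challenges[token] = keyAuth
}
```

The mutex is there on the assumption that callbacks may arrive from more than one goroutine; this documentation does not say which goroutine invokes them.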
