Documentation
¶
Overview ¶
Package raft contains Raft consensus for WebMesh.
Package streamlayer contains the Raft stream layer implementation.
Index ¶
Constants ¶
const ( RaftListenAddressEnvVar = "RAFT_LISTEN_ADDRESS" DataDirEnvVar = "RAFT_DATA_DIR" InMemoryEnvVar = "RAFT_IN_MEMORY" ConnectionPoolCountEnvVar = "RAFT_CONNECTION_POOL_COUNT" ConnectionTimeoutEnvVar = "RAFT_CONNECTION_TIMEOUT" HeartbeatTimeoutEnvVar = "RAFT_HEARTBEAT_TIMEOUT" ElectionTimeoutEnvVar = "RAFT_ELECTION_TIMEOUT" ApplyTimeoutEnvVar = "RAFT_APPLY_TIMEOUT" CommitTimeoutEnvVar = "RAFT_COMMIT_TIMEOUT" MaxAppendEntriesEnvVar = "RAFT_MAX_APPEND_ENTRIES" LeaderLeaseTimeoutEnvVar = "RAFT_LEADER_LEASE_TIMEOUT" SnapshotIntervalEnvVar = "RAFT_SNAPSHOT_INTERVAL" SnapshotThresholdEnvVar = "RAFT_SNAPSHOT_THRESHOLD" SnapshotRetentionEnvVar = "RAFT_SNAPSHOT_RETENTION" ObserverChanBufferEnvVar = "RAFT_OBSERVER_CHAN_BUFFER" RaftLogLevelEnvVar = "RAFT_LOG_LEVEL" RaftPreferIPv6EnvVar = "RAFT_PREFER_IPV6" LeaveOnShutdownEnvVar = "RAFT_LEAVE_ON_SHUTDOWN" StartupTimeoutEnvVar = "RAFT_STARTUP_TIMEOUT" // RaftStorePath is the raft stable and log store directory. RaftStorePath = "raft-store" // DataStoragePath is the raft data storage directory. DataStoragePath = "raft-data" )
const ( Follower = raft.Follower Candidate = raft.Candidate Leader = raft.Leader Shutdown = raft.Shutdown )
Raft states.
const ( Voter = raft.Voter Nonvoter = raft.Nonvoter )
Raft suffrage states.
Variables ¶
var ( // ErrStarted is returned when the Raft node is already started. ErrStarted = errors.New("raft node already started") // ErrClosed is returned when the Raft node is already closed. ErrClosed = errors.New("raft node is closed") // ErrAlreadyBootstrapped is returned when the Raft node is already bootstrapped. ErrAlreadyBootstrapped = raft.ErrCantBootstrap // ErrNotLeader is returned when the Raft node is not the leader. ErrNotLeader = raft.ErrNotLeader )
Functions ¶
This section is empty.
Types ¶
type BootstrapOptions ¶
type BootstrapOptions struct {
// AdvertiseAddress is the address to advertise to the other
// bootstrap nodes. Defaults to localhost:listen_port if empty.
AdvertiseAddress string
// Servers are the Raft servers to bootstrap with.
// Keys are the node IDs, and values are the Raft addresses.
Servers map[string]string
// OnBootstrapped is called when the cluster is bootstrapped.
OnBootstrapped func(isLeader bool) error
}
BootstrapOptions are options for bootstrapping a Raft node.
type LeaderObservation ¶
type LeaderObservation = raft.LeaderObservation
LeaderObservation is an alias for raft.LeaderObservation.
type LogStoreCloser ¶
LogStoreCloser is a LogStore that can be closed.
type MemoryStore ¶
type MemoryStore interface {
LogStoreCloser
StableStoreCloser
}
MemoryStore is an in-memory store that can be used for both log and stable storage, and that can be closed.
func NewInmemStore ¶
func NewInmemStore() MemoryStore
NewInmemStore returns a new in-memory store that can be used for logs and stable storage.
type MonotonicLogStore ¶
MonotonicLogStore is a LogStore that is monotonic.
func (*MonotonicLogStore) IsMonotonic ¶
func (m *MonotonicLogStore) IsMonotonic() bool
IsMonotonic returns true if the log store is monotonic.
type Observation ¶
type Observation = raft.Observation
Observation is an alias for raft.Observation.
type Options ¶
type Options struct {
// ListenAddress is the address to listen on for raft.
ListenAddress string `json:"listen-address,omitempty" yaml:"listen-address,omitempty" toml:"listen-address,omitempty"`
// DataDir is the directory to store data in.
DataDir string `json:"data-dir,omitempty" yaml:"data-dir,omitempty" toml:"data-dir,omitempty"`
// InMemory specifies whether the store should be in memory. This should only be used for testing and ephemeral nodes.
InMemory bool `json:"in-memory,omitempty" yaml:"in-memory,omitempty" toml:"in-memory,omitempty"`
// ConnectionPoolCount is the number of connections to pool. If 0, no connection pooling is used.
ConnectionPoolCount int `json:"connection-pool-count,omitempty" yaml:"connection-pool-count,omitempty" toml:"connection-pool-count,omitempty"`
// ConnectionTimeout is the timeout for connections.
ConnectionTimeout time.Duration `json:"connection-timeout,omitempty" yaml:"connection-timeout,omitempty" toml:"connection-timeout,omitempty"`
// HeartbeatTimeout is the timeout for heartbeats.
HeartbeatTimeout time.Duration `json:"heartbeat-timeout,omitempty" yaml:"heartbeat-timeout,omitempty" toml:"heartbeat-timeout,omitempty"`
// ElectionTimeout is the timeout for elections.
ElectionTimeout time.Duration `json:"election-timeout,omitempty" yaml:"election-timeout,omitempty" toml:"election-timeout,omitempty"`
// ApplyTimeout is the timeout for applying.
ApplyTimeout time.Duration `json:"apply-timeout,omitempty" yaml:"apply-timeout,omitempty" toml:"apply-timeout,omitempty"`
// CommitTimeout is the timeout for committing.
CommitTimeout time.Duration `json:"commit-timeout,omitempty" yaml:"commit-timeout,omitempty" toml:"commit-timeout,omitempty"`
// MaxAppendEntries is the maximum number of append entries.
MaxAppendEntries int `json:"max-append-entries,omitempty" yaml:"max-append-entries,omitempty" toml:"max-append-entries,omitempty"`
// LeaderLeaseTimeout is the timeout for leader leases.
LeaderLeaseTimeout time.Duration `json:"leader-lease-timeout,omitempty" yaml:"leader-lease-timeout,omitempty" toml:"leader-lease-timeout,omitempty"`
// SnapshotInterval is the interval to take snapshots.
SnapshotInterval time.Duration `json:"snapshot-interval,omitempty" yaml:"snapshot-interval,omitempty" toml:"snapshot-interval,omitempty"`
// SnapshotThreshold is the threshold to take snapshots.
SnapshotThreshold uint64 `json:"snapshot-threshold,omitempty" yaml:"snapshot-threshold,omitempty" toml:"snapshot-threshold,omitempty"`
// SnapshotRetention is the number of snapshots to retain.
SnapshotRetention uint64 `json:"snapshot-retention,omitempty" yaml:"snapshot-retention,omitempty" toml:"snapshot-retention,omitempty"`
// ObserverChanBuffer is the buffer size for the observer channel.
ObserverChanBuffer int `json:"observer-chan-buffer,omitempty" yaml:"observer-chan-buffer,omitempty" toml:"observer-chan-buffer,omitempty"`
// LogLevel is the log level for the raft backend.
LogLevel string `json:"log-level,omitempty" yaml:"log-level,omitempty" toml:"log-level,omitempty"`
// PreferIPv6 indicates whether IPv6 should be preferred.
PreferIPv6 bool `json:"prefer-ipv6,omitempty" yaml:"prefer-ipv6,omitempty" toml:"prefer-ipv6,omitempty"`
// LeaveOnShutdown indicates whether the node should leave on shutdown.
LeaveOnShutdown bool `json:"leave-on-shutdown,omitempty" yaml:"leave-on-shutdown,omitempty" toml:"leave-on-shutdown,omitempty"`
// Below are callbacks used internally or by external packages.
OnApplyLog func(ctx context.Context, term, index uint64, log *v1.RaftLogEntry) `json:"-" yaml:"-" toml:"-"`
OnSnapshotRestore func(ctx context.Context, meta *SnapshotMeta, data io.ReadCloser) `json:"-" yaml:"-" toml:"-"`
OnObservation func(ev Observation) `json:"-" yaml:"-" toml:"-"`
}
Options are the raft options.
func NewOptions ¶
func NewOptions() *Options
NewOptions returns new raft options with the default values.
func (*Options) DataStoragePath ¶
DataStoragePath returns the data directory.
func (*Options) RaftConfig ¶
RaftConfig builds a raft config.
type PeerObservation ¶
type PeerObservation = raft.PeerObservation
PeerObservation is an alias for raft.PeerObservation.
type Raft ¶
type Raft interface {
// Start starts the Raft node.
Start(ctx context.Context, opts *StartOptions) error
// Bootstrap attempts to bootstrap the Raft cluster. If the cluster is already
// bootstrapped, ErrAlreadyBootstrapped is returned. If the cluster is not
// bootstrapped and bootstrapping succeeds, the optional callback is called
// with the isLeader flag set to true if the node is the leader, and false otherwise.
// Any error returned by the callback is returned by Bootstrap.
Bootstrap(ctx context.Context, opts *BootstrapOptions) error
// Storage returns the storage. This is only valid after Start is called.
Storage() storage.Storage
// Raft returns the Raft instance. This is only valid after Start is called.
Raft() *raft.Raft
// Configuration returns the current raft configuration.
Configuration() raft.Configuration
// LastAppliedIndex returns the last applied index.
LastAppliedIndex() uint64
// ListenPort returns the listen port.
ListenPort() int
// IsLeader returns true if the Raft node is the leader.
IsLeader() bool
// AddNonVoter adds a non-voting node to the cluster with timeout enforced by the context.
AddNonVoter(ctx context.Context, id string, addr string) error
// AddVoter adds a voting node to the cluster with timeout enforced by the context.
AddVoter(ctx context.Context, id string, addr string) error
// DemoteVoter demotes a voting node to a non-voting node with timeout enforced by the context.
DemoteVoter(ctx context.Context, id string) error
// RemoveServer removes a peer from the cluster with timeout enforced by the context.
RemoveServer(ctx context.Context, id string, wait bool) error
// Restore restores the Raft node from a snapshot.
Restore(rdr io.ReadCloser) error
// Stop stops the Raft node.
Stop(ctx context.Context) error
}
Raft is the interface for Raft consensus and storage.
type SnapshotMeta ¶
type SnapshotMeta = raft.SnapshotMeta
SnapshotMeta is an alias for raft.SnapshotMeta.
type StableStoreCloser ¶
type StableStoreCloser interface {
io.Closer
raft.StableStore
}
StableStoreCloser is a StableStore that can be closed.
type StartOptions ¶
type StartOptions struct {
// NodeID is the node ID.
NodeID string
}
StartOptions are options for starting a Raft node.
type StreamLayer ¶
type StreamLayer interface {
raft.StreamLayer
// ListenPort returns the port the transport is listening on.
ListenPort() int
}
StreamLayer is the StreamLayer interface.
func NewStreamLayer ¶
func NewStreamLayer(addr string) (StreamLayer, error)
NewStreamLayer creates a new stream layer listening on the given address.