Documentation
¶
Index ¶
- Variables
- type ErrNodePropertyNotFound
- type HearbeatEvent
- type JoinOp
- type Member
- type MemberState
- type Options
- func (o *Options) Validate() error
- func (o *Options) WithHTTPClient(client *http.Client) *Options
- func (o *Options) WithHashRing(ring hashring.Ring) *Options
- func (o *Options) WithHashRingReplicaCount(count int) *Options
- func (o *Options) WithHearbeatConcurrency(concurrency int) *Options
- func (o *Options) WithHeartbeatInterval(interval time.Duration) *Options
- func (o *Options) WithHeartbeatMaxFailures(maxFailures int) *Options
- func (o *Options) WithHeartbeatTimeout(timeout time.Duration) *Options
- func (o *Options) WithName(name string) *Options
- func (o *Options) WithPrefix(prefix string) *Options
- func (o *Options) WithRedisDB(db int) *Options
- func (o *Options) WithRedisHost(host string) *Options
- func (o *Options) WithRedisMaxRetries(retries int) *Options
- func (o *Options) WithRedisPassword(password string) *Options
- func (o *Options) WithRedisPort(port int) *Options
- func (o *Options) WithRedisRetryBackoff(interval time.Duration) *Options
- type Pantheon
- func (c *Pantheon) Destroy() error
- func (c *Pantheon) Distribute(keys []string) error
- func (c *Pantheon) GetKeyNode(key string) (string, error)
- func (c *Pantheon) GetNodeHealth(nodeID string) (MemberState, error)
- func (c *Pantheon) GetNodeKeys(nodeID string) ([]string, error)
- func (c *Pantheon) Join(op *JoinOp) error
- func (c *Pantheon) Leave(id string) error
- func (c *Pantheon) PingNode(nodeID string) error
- func (c *Pantheon) ResetNodeFailures(nodeID string) error
- func (c *Pantheon) Start() error
- type PantheonEvent
- type RedisClient
- type RedisClientOptions
- type Storage
- func (s *Storage) AddNode(ctx context.Context, nodeID, address, path string, port int) error
- func (s *Storage) GetNode(ctx context.Context, nodeID string) (*Member, error)
- func (s *Storage) GetNodes(ctx context.Context) ([]Member, error)
- func (s *Storage) IncrementHeartbeatFailures(ctx context.Context, nodeID string) error
- func (s *Storage) IncrementHeartbeats(ctx context.Context, nodeID string) error
- func (s *Storage) RemoveNode(ctx context.Context, nodeID string) error
- func (s Storage) ResetHeartbeatFailures(ctx context.Context, nodeID string) error
- func (s *Storage) UpdateNode(ctx context.Context, nodeID, address, path string, port int) error
- func (s *Storage) UpdateNodeHeartbeat(ctx context.Context, nodeID string) error
- func (s *Storage) UpdateNodeState(ctx context.Context, nodeID string, state MemberState) error
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidHTTPClient = errors.New("http client is required")
var ErrInvalidHashRing = errors.New("hash ring is required")
var ErrInvalidHeartbeatConcurrency = errors.New("heartbeat concurrency must be greater than 0")
var ErrInvalidHeartbeatInterval = errors.New("heartbeat interval must be greater than 0")
var ErrInvalidHeartbeatMaxFailures = errors.New("heartbeat max failures must be greater than 0")
var ErrInvalidHeartbeatTimeout = errors.New("heartbeat timeout must be greater than 0")
var ErrInvalidName = errors.New("name is required")
var ErrInvalidPrefix = errors.New("prefix is required")
Option validation errors
var ErrInvalidRedisDB = errors.New("redis db must be greater than or equal to 0")
var ErrInvalidRedisHost = errors.New("redis host is required")
var ErrInvalidRedisMaxRetries = errors.New("redis max retries must be greater than or equal to 0")
var ErrInvalidRedisPort = errors.New("redis port is required")
var ErrInvalidRedisRetryBackoff = errors.New("redis retry backoff must be greater than 0")
Functions ¶
This section is empty.
Types ¶
type ErrNodePropertyNotFound ¶
type ErrNodePropertyNotFound struct {
// contains filtered or unexported fields
}
ErrNodePropertyNotFound is returned when a node property is not found in the storage.
func NewErrNodePropertyNotFound ¶
func NewErrNodePropertyNotFound(property string) *ErrNodePropertyNotFound
NewErrNodePropertyNotFound creates a new ErrNodePropertyNotFound
func (*ErrNodePropertyNotFound) Error ¶
func (e *ErrNodePropertyNotFound) Error() string
type HearbeatEvent ¶
type Member ¶
type Member struct { // ID; the unique identifier for the node ID string // Address; the address of the node Address string // Path; the path on the node to make the heartbeat request to Path string // JoinedAt; the time the node joined the cluster JoinedAt string // LastHeartbeat; the last time a heartbeat was received from the node LastHeartbeat string // HeartbeatCount; the number of heartbeat requests sent to the node HeartbeatCount string // HeartbeatFailures; the number of failed heartbeat requests HeartbeatFailures string // State; the state of the node: alive, dead, or suspect State MemberState }
Member represents a node in the cluster
type MemberState ¶
type MemberState string
const ( MemberAlive MemberState = "alive" MemberDead MemberState = "dead" MemberSuspect MemberState = "suspect" )
func (MemberState) MarshalBinary ¶ added in v0.1.3
func (m MemberState) MarshalBinary() (data []byte, err error)
MarshalBinary implements the encoding.BinaryMarshaler interface
func (*MemberState) UnmarshalBinary ¶ added in v0.1.3
func (m *MemberState) UnmarshalBinary(data []byte) error
UnmarshalBinary implements the encoding.BinaryUnmarshaler interface
type Options ¶
type Options struct {
// contains filtered or unexported fields
}
func NewOptions ¶
func NewOptions() *Options
NewOptions creates a new Options instance with default values. The default values are: - prefix: "pantheon" - name: "my-cluster" - heartbeatInterval: 30 seconds - heartbeatTimeout: 30 seconds - heartbeatConcurrency: 2 - heartbeatMaxFailures: 3 - redisHost: "localhost" - redisPort: 6379 - redisDB: 0 - redisMaxRetries: 5 - redisRetryBackoff: 20 seconds - hashringReplicaCount: 10 - httpClient: nil - hashRing: nil
func (*Options) WithHashRingReplicaCount ¶
func (*Options) WithHearbeatConcurrency ¶
func (*Options) WithHeartbeatInterval ¶
func (*Options) WithHeartbeatMaxFailures ¶ added in v0.1.1
func (*Options) WithHeartbeatTimeout ¶
func (*Options) WithPrefix ¶
func (*Options) WithRedisDB ¶
func (*Options) WithRedisHost ¶
func (*Options) WithRedisMaxRetries ¶
func (*Options) WithRedisPassword ¶
func (*Options) WithRedisPort ¶
type Pantheon ¶
type Pantheon struct { // EventsCh; a channel to send cluster events EventsCh chan PantheonEvent // contains filtered or unexported fields }
func (*Pantheon) Destroy ¶
Destroy stops the cluster. This should be called when the cluster is no longer needed.
func (*Pantheon) Distribute ¶
Distribute distributes keys to the nodes in the cluster. This should be called after a node has joined or left the cluster to rebalance the keys across the available nodes.
func (*Pantheon) GetKeyNode ¶
GetKeyNode returns the node responsible for a specific key
func (*Pantheon) GetNodeHealth ¶
func (c *Pantheon) GetNodeHealth(nodeID string) (MemberState, error)
GetNodeHealth returns the health status of a node
func (*Pantheon) GetNodeKeys ¶
GetNodeKeys retrieves the keys assigned to a node. This is used to determine which keys a node in the cluster is responsible for
func (*Pantheon) Join ¶
Join adds a node to the cluster. This should be called when a new node is starting up.
func (*Pantheon) Leave ¶
Leave removes a node from the cluster. This should be called when a node is shutting down.
func (*Pantheon) ResetNodeFailures ¶
ResetNodeFailures resets the heartbeat failure count for a node
type PantheonEvent ¶
type PantheonEvent struct { // Event; the name of the event // "started" - when the cluster is started // "joined" - when a node joins the cluster // "left" - when a node leaves the cluster // "died" - when a node is considered dead (no heartbeat received/timeout) Event string // NodeID; the identifier of the node NodeID string }
type RedisClient ¶
type RedisClient interface { Ping(ctx context.Context) *redis.StatusCmd Del(ctx context.Context, keys ...string) *redis.IntCmd HSet(ctx context.Context, key string, fields ...interface{}) *redis.IntCmd HGet(ctx context.Context, key, field string) *redis.StringCmd HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd HIncrBy(ctx context.Context, key, field string, incr int64) *redis.IntCmd Keys(ctx context.Context, pattern string) *redis.StringSliceCmd // Added for key distribution Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd Get(ctx context.Context, key string) *redis.StringCmd SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd SMembers(ctx context.Context, key string) *redis.StringSliceCmd }
func NewRedisClient ¶
func NewRedisClient(ctx context.Context, opts *RedisClientOptions) (RedisClient, error)
NewRedisClient creates a new redis client
type RedisClientOptions ¶
type RedisClientOptions struct { // The hostname of the redis server/cluster Host string // The port of the redis server/cluster Port int // The password for the redis server/cluster Password string // The database to use in the redis server/cluster DB int // The maximum number of retries before giving up MaxRetries int // The maximum time to wait before giving up RetryBackOffLimit time.Duration }
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func NewStorage ¶
func NewStorage(prefix string, namespace string, client RedisClient) *Storage
func (*Storage) AddNode ¶
AddNode upserts a node to the cluster. The node is identified by its ID. The address and port are used to communicate with the node. The path is the path on the node to make the heartbeat request to. The node is added with the state "alive". The node is added with the current time as the joined_at and last_heartbeat times.
func (*Storage) IncrementHeartbeatFailures ¶
func (*Storage) IncrementHeartbeats ¶
func (*Storage) RemoveNode ¶
RemoveNode removes a node from the cluster
func (Storage) ResetHeartbeatFailures ¶
func (*Storage) UpdateNode ¶ added in v0.1.6
UpdateNode updates the address and path of a node
func (*Storage) UpdateNodeHeartbeat ¶
UpdateNodeHeartbeat updates the last heartbeat time for a node
func (*Storage) UpdateNodeState ¶
UpdateNodeState updates the state of a node