conn

package
v25.0.0-split-vector3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2025 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNoConnection indicates no connection exists to a node.
	ErrNoConnection = errors.New("No connection exists")
	// ErrUnhealthyConnection indicates the connection to a node is unhealthy.
	ErrUnhealthyConnection = errors.New("Unhealthy connection")
)
View Source
var (
	// ErrNoNode is returned when no node has been set up.
	ErrNoNode = errors.Errorf("No node has been set up yet")
)

Functions

This section is empty.

Types

type Node

type Node struct {
	x.SafeMutex

	// Applied is used to keep track of the applied RAFT proposals.
	// The stages are proposed -> committed (accepted by cluster) ->
	// applied (to PL) -> synced (to BadgerDB).
	// This needs to be 64 bit aligned for atomics to work on 32 bit machine.
	Applied y.WaterMark

	// Fields which are never changed after init.
	StartTime time.Time
	Cfg       *raft.Config
	MyAddr    string
	Id        uint64

	RaftContext *pb.RaftContext
	Store       *raftwal.DiskStorage
	Rand        *rand.Rand

	Proposals proposals
	// contains filtered or unexported fields
}

Node represents a node participating in the RAFT protocol.

func NewNode

func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage, tlsConfig *tls.Config) *Node

NewNode returns a new Node instance.

func (*Node) BatchAndSendMessages

func (n *Node) BatchAndSendMessages()

BatchAndSendMessages sends messages in batches.

func (*Node) ConfState

func (n *Node) ConfState() *raftpb.ConfState

ConfState would return the latest ConfState stored in node.

func (*Node) Connect

func (n *Node) Connect(pid uint64, addr string)

Connect connects the node and makes its peerPool refer to the constructed pool and address (possibly updating ourselves from the old address.) (Unless pid is ourselves, in which case this does nothing.)

func (*Node) DeletePeer

func (n *Node) DeletePeer(pid uint64)

DeletePeer deletes the record of the peer with the given id.

func (*Node) DoneConfChange

func (n *Node) DoneConfChange(id uint64, err error)

DoneConfChange marks a configuration change as done and sends the given error to the config channel.

func (*Node) PastLife

func (n *Node) PastLife() (uint64, bool, error)

PastLife returns the index of the snapshot before the restart (if any) and whether there was a previous state that should be recovered after a restart.

func (*Node) Peer

func (n *Node) Peer(pid uint64) (string, bool)

Peer returns the address of the peer with the given id.

func (*Node) ProposePeerRemoval

func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error

ProposePeerRemoval proposes a new configuration with the peer with the given id removed.

func (*Node) Raft

func (n *Node) Raft() raft.Node

Raft would return back the raft.Node stored in the node.

func (*Node) ReportRaftComms

func (n *Node) ReportRaftComms()

ReportRaftComms periodically prints the state of the node (heartbeats in and out).

func (*Node) RunReadIndexLoop

func (n *Node) RunReadIndexLoop(closer *z.Closer, readStateCh <-chan raft.ReadState)

RunReadIndexLoop runs the RAFT index in a loop.

func (*Node) SaveToStorage

func (n *Node) SaveToStorage(h *raftpb.HardState, es []raftpb.Entry, s *raftpb.Snapshot)

SaveToStorage saves the hard state, entries, and snapshot to persistent storage, in that order.

func (*Node) Send

func (n *Node) Send(msg *raftpb.Message)

Send sends the given RAFT message from this node.

func (*Node) SetConfState

func (n *Node) SetConfState(cs *raftpb.ConfState)

SetConfState would store the latest ConfState generated by ApplyConfChange.

func (*Node) SetPeer

func (n *Node) SetPeer(pid uint64, addr string)

SetPeer sets the address of the peer with the given id. The address must not be empty.

func (*Node) SetRaft

func (n *Node) SetRaft(r raft.Node)

SetRaft would set the provided raft.Node to this node. It would check fail if the node is already set.

func (*Node) Snapshot

func (n *Node) Snapshot() (raftpb.Snapshot, error)

Snapshot returns the current snapshot.

func (*Node) WaitLinearizableRead

func (n *Node) WaitLinearizableRead(ctx context.Context) error

WaitLinearizableRead waits until a linearizable read can be performed.

type Pool

type Pool struct {
	sync.RWMutex

	Addr string
	// contains filtered or unexported fields
}

Pool is used to manage the grpc client connection(s) for communicating with other worker instances. Right now it just holds one of them.

func (*Pool) Get

func (p *Pool) Get() *grpc.ClientConn

Get returns the connection to use from the pool of connections.

func (*Pool) HealthInfo

func (p *Pool) HealthInfo() pb.HealthInfo

HealthInfo returns the healthinfo.

func (*Pool) IsHealthy

func (p *Pool) IsHealthy() bool

IsHealthy returns whether the pool is healthy.

func (*Pool) MonitorHealth

func (p *Pool) MonitorHealth()

MonitorHealth monitors the health of the connection via Echo. This function blocks forever.

func (*Pool) SetUnhealthy

func (p *Pool) SetUnhealthy()

SetUnhealthy marks a pool as unhealthy.

type Pools

type Pools struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Pools manages a concurrency-safe set of Pool.

func GetPools

func GetPools() *Pools

GetPools returns the list of pools.

func (*Pools) Connect

func (p *Pools) Connect(addr string, tlsClientConf *tls.Config) *Pool

Connect creates a Pool instance for the node with the given address or returns the existing one.

func (*Pools) Get

func (p *Pools) Get(addr string) (*Pool, error)

Get returns the list for the given address.

func (*Pools) GetAll

func (p *Pools) GetAll() []*Pool

GetAll returns all pool entries.

func (*Pools) RemoveInvalid

func (p *Pools) RemoveInvalid(state *pb.MembershipState)

RemoveInvalid removes invalid nodes from the list of pools.

type ProposalCtx

type ProposalCtx struct {
	Found uint32
	ErrCh chan error
	Ctx   context.Context
}

ProposalCtx stores the context for a proposal with extra information.

type RaftServer

type RaftServer struct {

	// embedding the pb.UnimplementedRaftServer struct to ensure forward compatibility of the server.
	pb.UnimplementedRaftServer
	// contains filtered or unexported fields
}

RaftServer is a wrapper around node that implements the Raft service.

func NewRaftServer

func NewRaftServer(n *Node) *RaftServer

NewRaftServer returns a pointer to a new RaftServer instance.

func (*RaftServer) GetNode

func (w *RaftServer) GetNode() *Node

GetNode safely retrieves the node.

func (*RaftServer) Heartbeat

func (w *RaftServer) Heartbeat(_ *api.Payload, stream pb.Raft_HeartbeatServer) error

Heartbeat rpc call is used to check connection with other workers after worker tcp server for this instance starts.

func (*RaftServer) IsPeer

func (w *RaftServer) IsPeer(ctx context.Context, rc *pb.RaftContext) (
	*pb.PeerResponse, error)

IsPeer checks whether this node is a peer of the node sending the request.

func (*RaftServer) JoinCluster

func (w *RaftServer) JoinCluster(ctx context.Context,
	rc *pb.RaftContext) (*api.Payload, error)

JoinCluster handles requests to join the cluster.

func (*RaftServer) RaftMessage

func (w *RaftServer) RaftMessage(server pb.Raft_RaftMessageServer) error

RaftMessage handles RAFT messages.

func (*RaftServer) UpdateNode

func (w *RaftServer) UpdateNode(n *Node)

UpdateNode safely updates the node.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL