log

package
v0.0.0-...-2c53574 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2022 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const RaftRPC = 1

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Raft struct {
		raft.Config
		StreamLayer *StreamLayer
		Bootstrap   bool
	}

	Segment struct {
		MaxStoreBytes uint64
		MaxIndexBytes uint64
		InitialOffset uint64
	}
}

type DistributedLog

type DistributedLog struct {
	// contains filtered or unexported fields
}

func NewDistributedLog

func NewDistributedLog(dataDir string, config Config) (
	*DistributedLog,
	error,
)

NewDistributedLog(dataDir, config) handler to create the log. Log package will contain the single-server, non-replicated log and the distributed, replicated log built with Raft.

func (*DistributedLog) Append

func (l *DistributedLog) Append(record *api.Record) (uint64, error)

Append(record) appends the record to the log.

func (*DistributedLog) Close

func (l *DistributedLog) Close() error

Close shuts down the Raft instance and closes the local log.

func (*DistributedLog) GetServers

func (l *DistributedLog) GetServers() ([]*api.Server, error)

func (*DistributedLog) Join

func (l *DistributedLog) Join(id, addr string) error

Join adds the server to the Raft cluster. We add every server as a voter

func (*DistributedLog) Leave

func (l *DistributedLog) Leave(id string) error

Leave removes the server from the cluster. Removing the leader will trigger a new election.

func (*DistributedLog) Read

func (l *DistributedLog) Read(offset uint64) (*api.Record, error)

Read(offset) reads the record for the offset from the server's log.

func (*DistributedLog) WaitForLeader

func (l *DistributedLog) WaitForLeader(timeout time.Duration) error

WatiForLeader blocks until the cluster has elected a leader or times out.

type Log

type Log struct {
	Dir    string
	Config Config
	// contains filtered or unexported fields
}

func NewLog

func NewLog(dir string, c Config) (*Log, error)

NewLog(dir, config), we first set defaults for the configs the caller didn't specify, create a log instance, and set up that instance

func (*Log) Append

func (l *Log) Append(record *api.Record) (uint64, error)

Append(*api.Record) appends a record to the log

func (*Log) Close

func (l *Log) Close() error

Close() iterates over the segments and closes them

func (*Log) HighestOffset

func (l *Log) HighestOffset() (uint64, error)

HighestOffset() tells us the highest offset stored in the log

func (*Log) LowestOffset

func (l *Log) LowestOffset() (uint64, error)

LowestOffset() tells us the lowest offset stored in the log

func (*Log) Read

func (l *Log) Read(off uint64) (*api.Record, error)

Read(offset uint64) reads the record stored at the given offset. First we find the segment that contains the given record.

func (*Log) Reader

func (l *Log) Reader() io.Reader

Reader() returns an io.Reader to read the whole log It uses an io.MultiReader() call to concatenate the segments' stores

func (*Log) Remove

func (l *Log) Remove() error

Remove() closes the log and removes its data

func (*Log) Reset

func (l *Log) Reset() error

Reset() removes the log and then creates a new log to replace it

func (*Log) Truncate

func (l *Log) Truncate(lowest uint64) error

Truncate(lowest uint64) removes all segments whose highest offset is lower than lowest

type Replicator

type Replicator struct {
	DialOptions []grpc.DialOption
	LocalServer api.LogClient
	// contains filtered or unexported fields
}

func (*Replicator) Close

func (r *Replicator) Close() error

Close() closes the replicator so it doesn't replicate new servers that join the cluster and it stops replicating existing servers by causing the replicate() goroutines to return.

func (*Replicator) Join

func (r *Replicator) Join(name, addr string) error

Join(name, addr) methods adds the given server address to the list of servers to replicate and kicks off the add goroutine to run the actual replication.

func (*Replicator) Leave

func (r *Replicator) Leave(name string) error

Leave(name) handles the server leaving the cluster by removing the server from the list of servers to replicate and closes the server's associated channel.

type RequestType

type RequestType uint8
const (
	AppendRequestType RequestType = 0
)

type StreamLayer

type StreamLayer struct {
	// contains filtered or unexported fields
}

func NewStreamLayer

func NewStreamLayer(ln net.Listener, serverTLSConfig,
	peerTLSConfig *tls.Config,
) *StreamLayer

This defines the StreamLayer type and checks that it satisfies the raft.StreamLayer interface

func (*StreamLayer) Accept

func (s *StreamLayer) Accept() (net.Conn, error)

It is the mirror of Dial(). We accept the incoming connection and read the byte that identifies the connection and then create a server-side TLS connection

func (*StreamLayer) Addr

func (s *StreamLayer) Addr() net.Addr

Returns the listener's address

func (*StreamLayer) Close

func (s *StreamLayer) Close() error

Closes the listener

func (*StreamLayer) Dial

func (s *StreamLayer) Dial(addr raft.ServerAddress,
	timeout time.Duration,
) (net.Conn, error)

It makes outgoing connections to other servers in the Raft cluster.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL