jocko

package
v0.0.0-...-9613083 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2020 License: MIT Imports: 42 Imported by: 3

Documentation

Index

Constants

View Source
const (
	// StatusReap is used to update the status of a node if we
	// are handling a EventMemberReap
	StatusReap = serf.MemberStatus(-1)
)

Variables

View Source
var (
	ErrTopicExists            = errors.New("topic exists already")
	ErrInvalidArgument        = errors.New("no logger set")
	OffsetsTopicName          = "__consumer_offsets"
	OffsetsTopicNumPartitions = 50
)

Functions

func NewBrokerLookup

func NewBrokerLookup() *brokerLookup

func NewReplicaLookup

func NewReplicaLookup() *replicaLookup

func TestJoin

func TestJoin(t testing.T, s1 *Server, other ...*Server)

func WaitForLeader

func WaitForLeader(t testing.T, servers ...*Server) (*Server, []*Server)

WaitForLeader waits for one of the servers to be leader, failing the test if no one is the leader. Returns the leader (if there is one) and non-leaders.

Types

type Broker

type Broker struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Broker represents a broker in a Jocko cluster, like a broker in a Kafka cluster.

func NewBroker

func NewBroker(config *config.Config, tracer opentracing.Tracer) (*Broker, error)

New is used to instantiate a new broker.

func (*Broker) JoinLAN

func (b *Broker) JoinLAN(addrs ...string) protocol.Error

Join is used to have the broker join the gossip ring. The given address should be another broker listening on the Serf address.

func (*Broker) LANMembers

func (b *Broker) LANMembers() []serf.Member

func (*Broker) Leave

func (b *Broker) Leave() error

Leave is used to prepare for a graceful shutdown.

func (*Broker) Run

func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context)

Run starts a loop to handle requests send back responses.

func (*Broker) Shutdown

func (b *Broker) Shutdown() error

Shutdown is used to shutdown the broker, its serf, its raft, and so on.

type CommitLog

type CommitLog interface {
	Delete() error
	NewReader(offset int64, maxBytes int32) (io.Reader, error)
	Truncate(int64) error
	NewestOffset() int64
	OldestOffset() int64
	Append([]byte) (int64, error)
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn implemenets net.Conn for connections to Jocko brokers. It's used as an internal client for replication fetches and leader and ISR requests.

func Dial

func Dial(network, address string) (*Conn, error)

Dial creates a connection to the broker on the given network and address on the default dialer.

func DialContext

func DialContext(ctx context.Context, network, address string) (*Conn, error)

func NewConn

func NewConn(conn net.Conn, clientID string) (*Conn, error)

NewConn creates a new *Conn.

func (*Conn) APIVersions

APIVersions sends an api version request and returns the response.

func (*Conn) AlterConfigs

AlterConfigs sends an alter configs request and returns the response.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the connection.

func (*Conn) ControlledShutdown

ControlledShutdown sends a controlled shutdown request and returns the response.

func (*Conn) CreateTopics

CreateTopics sends a create topics request and returns the response.

func (*Conn) DeleteTopics

DeleteTopics sends a delete topic request and returns the response.

func (*Conn) DescribeConfigs

DescribeConfigs sends an describe configs request and returns the response.

func (*Conn) DescribeGroups

DescribeGroups sends a describe groups request and returns the response.

func (*Conn) Fetch

Fetch sends a fetch request and returns the response.

func (*Conn) FindCoordinator

FindCoordinator sends a find coordinator request and returns the response.

func (*Conn) Heartbeat

Heartbeat sends a heartbeat request and returns the response.

func (*Conn) JoinGroup

JoinGroup sends a join group request and returns the response.

func (*Conn) LeaderAndISR

LeaderAndISR sends a leader and ISR request and returns the response.

func (*Conn) LeaveGroup

LeaveGroup sends a leave group request and returns the response.

func (*Conn) ListGroups

ListGroups sends a list groups request and returns the response.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Conn) Metadata

Metadata sends a metadata request and returns the response.

func (*Conn) OffsetCommit

OffsetCommit sends an offset commit and returns the response.

func (*Conn) OffsetFetch

OffsetFetch sends an offset fetch and returns the response.

func (*Conn) Offsets

Offsets sends an offsets request and returns the response.

func (*Conn) Produce

Produce sends a produce request and returns the response.

func (*Conn) Read

func (c *Conn) Read(b []byte) (int, error)

Read implements the Conn Read method. Don't use it.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Conn) SaslHandshake

SaslHandshake sends a sasl handshake request and returns the response.

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline. See net.Conn SetDeadline.

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.

func (*Conn) StopReplica

StopReplica sends a stop replica request and returns the response.

func (*Conn) SyncGroup

SyncGroup sends a sync group request and returns the response.

func (*Conn) UpdateMetadata

UpdateMetadata sends an update metadata request and returns the response.

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

Write implements the Conn Write method. Don't use it.

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Deadline

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

func (*Context) Err

func (ctx *Context) Err() error

func (*Context) Header

func (c *Context) Header() *protocol.RequestHeader

func (*Context) Request

func (ctx *Context) Request() interface{}

func (*Context) Response

func (ctx *Context) Response() interface{}

func (*Context) String

func (ctx *Context) String() string

func (*Context) Value

func (ctx *Context) Value(key interface{}) interface{}

type Counter

type Counter = prometheus.Counter

Alias prometheus' counter, probably only need to use Inc() though.

type Dialer

type Dialer struct {
	// Unique ID for client connections established by this Dialer.
	ClientID string
	// Timeout is the max duration a dial will wait for a connect to complete.
	Timeout time.Duration
	// Deadline is the absolute time after which the dial will fail. Zero is no deadline.
	Deadline time.Time
	// LocalAddr is the local address to dial.
	LocalAddr net.Addr
	// RemoteAddr is the remote address to dial.
	RemoteAddr net.Addr
	// KeepAlive is the keep-alive period for a network connection.
	KeepAlive time.Duration
	// FallbackDelay is the duration to wait before spawning a fallback connection. If 0, default duration is 300ms.
	FallbackDelay time.Duration
	// Resolver species an alternative resolver to use.
	Resolver Resolver
	// TLS enables the Dialer to secure connections. If nil, standard net.Conn is used.
	TLS *tls.Config
	// DualStack enables RFC 6555-compliant "happy eyeballs" dialing.
	DualStack bool
	// SASL enables SASL plain authentication.
	SASL *SASL
}

Dialer is like the net.Dialer API but for opening connections to Jocko brokers.

func NewDialer

func NewDialer(clientID string) *Dialer

NewDialer creates a new dialer.

func (*Dialer) Dial

func (d *Dialer) Dial(network, address string) (*Conn, error)

Dial creates a connection to the broker on the given network and address.

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network, address string) (*Conn, error)

type Handler

type Handler interface {
	Run(context.Context, <-chan *Context, chan<- *Context)
	Leave() error
	Shutdown() error
}

Broker is the interface that wraps the Broker's methods.

type Metrics

type Metrics struct {
	RequestsHandled Counter
}

Metrics is used for tracking metrics.

type Replica

type Replica struct {
	BrokerID   int32
	Partition  structs.Partition
	IsLocal    bool
	Log        CommitLog
	Hw         int64
	Leo        int64
	Replicator *Replicator
	sync.Mutex
}

Replica

func (Replica) String

func (r Replica) String() string

type Replicator

type Replicator struct {
	// contains filtered or unexported fields
}

Replicator fetches from the partition's leader producing to itself the follower, thereby replicating the partition.

func NewReplicator

func NewReplicator(config ReplicatorConfig, replica *Replica, leader client) *Replicator

NewReplicator returns a new replicator instance.

func (*Replicator) Close

func (r *Replicator) Close() error

Close the replicator object when we are no longer following

func (*Replicator) Replicate

func (r *Replicator) Replicate()

Replicate start fetching messages from the leader and appending them to the local commit log.

type ReplicatorConfig

type ReplicatorConfig struct {
	MinBytes int32
	// todo: make this a time.Duration
	MaxWaitTime time.Duration
}

type Resolver

type Resolver interface {
	// LookupHost looks up the given host using the local resolver.
	LookupHost(ctx context.Context, host string) ([]string, error)
}

Resolver provides service discovery of the hosts of a kafka cluster.

type SASL

type SASL struct {
	User, Pass string
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is used to handle the TCP connections, decode requests, defer to the broker, and encode the responses.

func NewServer

func NewServer(config *config.Config, handler Handler, metrics *Metrics, tracer opentracing.Tracer, close func() error) *Server

func NewTestServer

func NewTestServer(t testing.T, cbBroker func(cfg *config.Config), cbServer func(cfg *config.Config)) (*Server, string)

func (*Server) Addr

func (s *Server) Addr() net.Addr

Addr returns the address on which the Server is listening

func (*Server) ID

func (s *Server) ID() int32

func (*Server) Leave

func (s *Server) Leave() error

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown closes the service.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the service.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL