Versions in this module Expand all Collapse all v0 v0.9.0 Mar 20, 2020 v0.0.1 Mar 14, 2020 Changes in this version + const StatusReap + var ErrInvalidArgument = errors.New("no logger set") + var ErrTopicExists = errors.New("topic exists already") + var OffsetsTopicName = "__consumer_offsets" + var OffsetsTopicNumPartitions = 50 + func NewBrokerLookup() *brokerLookup + func NewReplicaLookup() *replicaLookup + func TestJoin(t testing.T, s1 *Server, other ...*Server) + func WaitForLeader(t testing.T, servers ...*Server) (*Server, []*Server) + type Broker struct + func NewBroker(config *config.Config, tracer opentracing.Tracer) (*Broker, error) + func (b *Broker) JoinLAN(addrs ...string) protocol.Error + func (b *Broker) LANMembers() []serf.Member + func (b *Broker) Leave() error + func (b *Broker) Run(ctx context.Context, requests <-chan *Context, responses chan<- *Context) + func (b *Broker) Shutdown() error + type CommitLog interface + Append func([]byte) (int64, error) + Delete func() error + NewReader func(offset int64, maxBytes int32) (io.Reader, error) + NewestOffset func() int64 + OldestOffset func() int64 + Truncate func(int64) error + type Conn struct + func Dial(network, address string) (*Conn, error) + func DialContext(ctx context.Context, network, address string) (*Conn, error) + func NewConn(conn net.Conn, clientID string) (*Conn, error) + func (c *Conn) APIVersions(req *protocol.APIVersionsRequest) (*protocol.APIVersionsResponse, error) + func (c *Conn) AlterConfigs(req *protocol.AlterConfigsRequest) (*protocol.AlterConfigsResponse, error) + func (c *Conn) Close() error + func (c *Conn) ControlledShutdown(req *protocol.ControlledShutdownRequest) (*protocol.ControlledShutdownResponse, error) + func (c *Conn) CreateTopics(req *protocol.CreateTopicRequests) (*protocol.CreateTopicsResponse, error) + func (c *Conn) DeleteTopics(req *protocol.DeleteTopicsRequest) (*protocol.DeleteTopicsResponse, error) + func (c *Conn) DescribeConfigs(req *protocol.DescribeConfigsRequest) (*protocol.DescribeConfigsResponse, error) + func (c *Conn) DescribeGroups(req *protocol.DescribeGroupsRequest) (*protocol.DescribeGroupsResponse, error) + func (c *Conn) Fetch(req *protocol.FetchRequest) (*protocol.FetchResponse, error) + func (c *Conn) FindCoordinator(req *protocol.FindCoordinatorRequest) (*protocol.FindCoordinatorResponse, error) + func (c *Conn) Heartbeat(req *protocol.HeartbeatRequest) (*protocol.HeartbeatResponse, error) + func (c *Conn) JoinGroup(req *protocol.JoinGroupRequest) (*protocol.JoinGroupResponse, error) + func (c *Conn) LeaderAndISR(req *protocol.LeaderAndISRRequest) (*protocol.LeaderAndISRResponse, error) + func (c *Conn) LeaveGroup(req *protocol.LeaveGroupRequest) (*protocol.LeaveGroupResponse, error) + func (c *Conn) ListGroups(req *protocol.ListGroupsRequest) (*protocol.ListGroupsResponse, error) + func (c *Conn) LocalAddr() net.Addr + func (c *Conn) Metadata(req *protocol.MetadataRequest) (*protocol.MetadataResponse, error) + func (c *Conn) OffsetCommit(req *protocol.OffsetCommitRequest) (*protocol.OffsetCommitResponse, error) + func (c *Conn) OffsetFetch(req *protocol.OffsetFetchRequest) (*protocol.OffsetFetchResponse, error) + func (c *Conn) Offsets(req *protocol.OffsetsRequest) (*protocol.OffsetsResponse, error) + func (c *Conn) Produce(req *protocol.ProduceRequest) (*protocol.ProduceResponse, error) + func (c *Conn) Read(b []byte) (int, error) + func (c *Conn) RemoteAddr() net.Addr + func (c *Conn) SaslHandshake(req *protocol.SaslHandshakeRequest) (*protocol.SaslHandshakeResponse, error) + func (c *Conn) SetDeadline(t time.Time) error + func (c *Conn) SetReadDeadline(t time.Time) error + func (c *Conn) SetWriteDeadline(t time.Time) error + func (c *Conn) StopReplica(req *protocol.StopReplicaRequest) (*protocol.StopReplicaResponse, error) + func (c *Conn) SyncGroup(req *protocol.SyncGroupRequest) (*protocol.SyncGroupResponse, error) + func (c *Conn) UpdateMetadata(req *protocol.UpdateMetadataRequest) (*protocol.UpdateMetadataResponse, error) + func (c *Conn) Write(b []byte) (int, error) + type Context struct + func (c *Context) Header() *protocol.RequestHeader + func (ctx *Context) Deadline() (deadline time.Time, ok bool) + func (ctx *Context) Done() <-chan struct{} + func (ctx *Context) Err() error + func (ctx *Context) Request() interface{} + func (ctx *Context) Response() interface{} + func (ctx *Context) String() string + func (ctx *Context) Value(key interface{}) interface{} + type Counter = prometheus.Counter + type Dialer struct + ClientID string + Deadline time.Time + DualStack bool + FallbackDelay time.Duration + KeepAlive time.Duration + LocalAddr net.Addr + RemoteAddr net.Addr + Resolver Resolver + SASL *SASL + TLS *tls.Config + Timeout time.Duration + func NewDialer(clientID string) *Dialer + func (d *Dialer) Dial(network, address string) (*Conn, error) + func (d *Dialer) DialContext(ctx context.Context, network, address string) (*Conn, error) + type Handler interface + Leave func() error + Run func(context.Context, <-chan *Context, chan<- *Context) + Shutdown func() error + type Metrics struct + RequestsHandled Counter + type Replica struct + BrokerID int32 + Hw int64 + IsLocal bool + Leo int64 + Log CommitLog + Partition structs.Partition + Replicator *Replicator + func (r Replica) String() string + type Replicator struct + func NewReplicator(config ReplicatorConfig, replica *Replica, leader client) *Replicator + func (r *Replicator) Close() error + func (r *Replicator) Replicate() + type ReplicatorConfig struct + MaxWaitTime time.Duration + MinBytes int32 + type Resolver interface + LookupHost func(ctx context.Context, host string) ([]string, error) + type SASL struct + Pass string + User string + type Server struct + func NewServer(config *config.Config, handler Handler, metrics *Metrics, ...) *Server + func NewTestServer(t testing.T, cbBroker func(cfg *config.Config), ...) (*Server, string) + func (s *Server) Addr() net.Addr + func (s *Server) ID() int32 + func (s *Server) Leave() error + func (s *Server) Shutdown() error + func (s *Server) Start(ctx context.Context) error