Version: v0.4.3 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2020 License: MIT Imports: 17 Imported by: 0




View Source
const (
	ErrCorrupted = Error("corrupted")
	ErrTruncated = Error("truncated")


This section is empty.


func Register

func Register(req, res Message)

Register is automatically called by sub-packages are imported to install a new pair of request/response message types.

func WriteRequest

func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error

func WriteResponse

func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error


type ApiKey

type ApiKey int16
const (
	Produce                     ApiKey = 0
	Fetch                       ApiKey = 1
	ListOffsets                 ApiKey = 2
	Metadata                    ApiKey = 3
	LeaderAndIsr                ApiKey = 4
	StopReplica                 ApiKey = 5
	UpdateMetadata              ApiKey = 6
	ControlledShutdown          ApiKey = 7
	OffsetCommit                ApiKey = 8
	OffsetFetch                 ApiKey = 9
	FindCoordinator             ApiKey = 10
	JoinGroup                   ApiKey = 11
	Heartbeat                   ApiKey = 12
	LeaveGroup                  ApiKey = 13
	SyncGroup                   ApiKey = 14
	DescribeGroups              ApiKey = 15
	ListGroups                  ApiKey = 16
	SaslHandshake               ApiKey = 17
	ApiVersions                 ApiKey = 18
	CreateTopics                ApiKey = 19
	DeleteTopics                ApiKey = 20
	DeleteRecords               ApiKey = 21
	InitProducerId              ApiKey = 22
	OffsetForLeaderEpoch        ApiKey = 23
	AddPartitionsToTxn          ApiKey = 24
	AddOffsetsToTxn             ApiKey = 25
	EndTxn                      ApiKey = 26
	WriteTxnMarkers             ApiKey = 27
	TxnOffsetCommit             ApiKey = 28
	DescribeAcls                ApiKey = 29
	CreateAcls                  ApiKey = 30
	DeleteAcls                  ApiKey = 31
	DescribeConfigs             ApiKey = 32
	AlterConfigs                ApiKey = 33
	AlterReplicaLogDirs         ApiKey = 34
	DescribeLogDirs             ApiKey = 35
	SaslAuthenticate            ApiKey = 36
	CreatePartitions            ApiKey = 37
	CreateDelegationToken       ApiKey = 38
	RenewDelegationToken        ApiKey = 39
	ExpireDelegationToken       ApiKey = 40
	DescribeDelegationToken     ApiKey = 41
	DeleteGroups                ApiKey = 42
	ElectLeaders                ApiKey = 43
	IncrementalAlterConfigs     ApiKey = 44
	AlterPartitionReassignments ApiKey = 45
	ListPartitionReassignments  ApiKey = 46
	OffsetDelete                ApiKey = 47

func (ApiKey) MaxVersion

func (k ApiKey) MaxVersion() int16

func (ApiKey) MinVersion

func (k ApiKey) MinVersion() int16

func (ApiKey) SelectVersion

func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16

func (ApiKey) String

func (k ApiKey) String() string

type Attributes

type Attributes int16

Attributes is a bitset representing special attributes set on records.

const (
	Gzip          Attributes = Attributes(compress.Gzip)   // 1
	Snappy        Attributes = Attributes(compress.Snappy) // 2
	Lz4           Attributes = Attributes(compress.Lz4)    // 3
	Zstd          Attributes = Attributes(compress.Zstd)   // 4
	Transactional Attributes = 1 << 4
	ControlBatch  Attributes = 1 << 5

func (Attributes) Compression

func (a Attributes) Compression() compress.Compression

func (Attributes) ControlBatch

func (a Attributes) ControlBatch() bool

func (Attributes) Transactional

func (a Attributes) Transactional() bool

type Broker

type Broker struct {
	Rack string
	Host string
	Port int
	ID   int

func (Broker) Network

func (b Broker) Network() string

func (Broker) String

func (b Broker) String() string

type BrokerMessage

type BrokerMessage interface {
	// Given a representation of the kafka cluster state as argument, returns
	// the broker that the message should be routed to.
	Broker(Cluster) (Broker, error)

BrokerMessage is an extension of the Message interface implemented by some request types to customize the broker assignment logic.

type ByteSequence

type ByteSequence interface {
	Size() int64

ByteSequence is an interface implemented by types that represent immutable sequences of bytes.

ByteSequence values are used to abstract the location where record keys and values are read from (e.g. in-memory buffers, network sockets, files).

The Close method should be called to release resources held by the sequence when the program is done with it.

ByteSequence values are generally not safe to use concurrently from multiple goroutines.

func Bytes

func Bytes(b []byte) ByteSequence

Bytes constructs a ByteSequence which exposes the content of b.

func String

func String(s string) ByteSequence

String constructs a ByteSequence which exposes the content of s.

type Cluster

type Cluster struct {
	ClusterID  string
	Controller int
	Brokers    map[int]Broker
	Topics     map[string]Topic

func (*Cluster) IsZero

func (c *Cluster) IsZero() bool

type Error

type Error string

Error is a string type implementing the error interface and used to declare constants representing recoverable protocol errors.

func (Error) Error

func (e Error) Error() string

type GroupMessage

type GroupMessage interface {
	// Returns the group configured on the message.
	Group() string

GroupMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a group coordinator.

type Header struct {
	Key   string
	Value []byte

Header represents a single entry in a list of record headers.

type Mapper

type Mapper interface {
	// For a given cluster layout, returns the list of messages constructed
	// from the receiver for each requests that should be sent to the cluster.
	// The second return value is a Reducer which can be used to merge back the
	// results of each request into a single message (or an error).
	Map(Cluster) ([]Message, Reducer, error)

Mapper is an interface implemented by messages that can be split into multiple requests and have their results merged back by a Reducer.

type Message

type Message interface {
	ApiKey() ApiKey

Message is an interface implemented by all request and response types of the kafka protocol.

This interface is used mostly as a safe-guard to provide a compile-time check for values passed to functions dealing kafka message types.

func ReadRequest

func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error)

func ReadResponse

func ReadResponse(r io.Reader, apiKey, apiVersion int16) (correlationID int32, msg Message, err error)

func Result

func Result(r interface{}) (Message, error)

Result converts r to a Message or and error, or panics if r could be be converted to these types.

func RoundTrip

func RoundTrip(rw io.ReadWriter, apiVersion int16, clientID string, msg Message) (Message, error)

RoundTrip sends a request to a kafka broker and returns the response.

The function expects that there were no other concurrent requests served by the connection wrapped by rw, and therefore uses a zero correlation ID.

type Partition

type Partition struct {
	ID     int
	Error  int
	Leader int

type PreparedMessage

type PreparedMessage interface {
	// Prepares the message before being sent to a kafka broker using the API
	// version passed as argument.
	Prepare(apiVersion int16)

PreparedMessage is an extension of the Message interface implemented by some request types which may need to run some pre-processing on their state before being sent.

type Record

type Record struct {
	Offset  int64
	Time    time.Time
	Key     ByteSequence
	Value   ByteSequence
	Headers []Header

Record values represent single records exchanged in Produce requests and Fetch responses.

func (*Record) Close

func (r *Record) Close() error

Close closes both the key and value of the record.

type RecordSet

type RecordSet struct {
	// The message version that this record set will be represented as, valid
	// values are 1, or 2.
	Version int8
	// The following fields carry properties used when representing the record
	// batch in version 2.
	Attributes           Attributes
	PartitionLeaderEpoch int32
	BaseOffset           int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	// The list of records contained in this set.
	Records []Record

RecordSet represents a sequence of records in Produce requests and Fetch responses. All v0, v1, and v2 formats are supported.

func (*RecordSet) Close

func (rs *RecordSet) Close() error

Close closes all records of rs.

func (*RecordSet) ReadFrom

func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads the representation of a record set from r into rs, returning the number of bytes consumed from r, and an non-nil error if the record set could not be read.

func (*RecordSet) WriteTo

func (rs *RecordSet) WriteTo(w io.Writer) (int64, error)

WriteTo writes the representation of rs into w. The value of rs.Version dictates which format that the record set will be represented as.

Note: since this package is only compatible with kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.

type Reducer

type Reducer interface {
	// Given a list of message and associated results, merge them back into a
	// response (or an error). The results must be either Message or error
	// values, other types should trigger a panic.
	Reduce(messages []Message, results []interface{}) (Message, error)

Reducer is an interface implemented by messages which can merge multiple results into one response.

type Topic

type Topic struct {
	Name       string
	Error      int
	Partitions map[int]Partition

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL