proto

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

Package proto provides an implementation of the Kafka binary protocol.

Index

Constants

View Source
const (
	// Request kind identifiers, one per request type declared in this package.
	// NOTE(review): presumably these are the values ReadReq reports as
	// requestKind — confirm against the decoder.
	ProduceReqKind          = 0
	FetchReqKind            = 1
	OffsetReqKind           = 2
	MetadataReqKind         = 3
	OffsetCommitReqKind     = 8
	OffsetFetchReqKind      = 9
	GroupCoordinatorReqKind = 10

	// receive the latest offset (i.e. the offset of the next coming message)
	// NOTE(review): presumably a value for OffsetReqPartition.TimeMs — confirm.
	OffsetReqTimeLatest = -1

	// receive the earliest available offset. Note that because offsets are
	// pulled in descending order, asking for the earliest offset will always
	// return you a single element.
	OffsetReqTimeEarliest = -2

	// Server will not send any response.
	// NOTE(review): the RequiredAcks* values are presumably meant for
	// ProduceReq.RequiredAcks — confirm.
	RequiredAcksNone = 0

	// Server will block until the message is committed by all in sync replicas
	// before sending a response.
	RequiredAcksAll = -1

	// Server will wait until the data is written to the local log before
	// sending a response.
	RequiredAcksLocal = 1
)

Variables

View Source
// Errors defined by the package, one *KafkaError per numeric error code. The
// first field of each literal is the code and the second is the message.
// NOTE(review): the "[transient]" message prefix presumably marks errors that
// are worth retrying — confirm with the callers that inspect it.
//
// ErrScaleControllerEpoch keeps its exported name for compatibility, but its
// message reads "stale controller epoch", the Kafka name for error code 11.
var (
	ErrUnknown                                 = &KafkaError{-1, "unknown error"}
	ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
	ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
	ErrUnknownTopicOrPartition                 = &KafkaError{3, "[transient] unknown topic or partition"}
	ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
	ErrLeaderNotAvailable                      = &KafkaError{5, "[transient] leader not available"}
	ErrNotLeaderForPartition                   = &KafkaError{6, "[transient] not leader for partition"}
	ErrRequestTimeout                          = &KafkaError{7, "[transient] request timed out"}
	ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
	ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
	ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
	ErrScaleControllerEpoch                    = &KafkaError{11, "stale controller epoch"}
	ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
	ErrOffsetLoadInProgress                    = &KafkaError{14, "[transient] offsets load in progress"}
	ErrNoCoordinator                           = &KafkaError{15, "[transient] consumer coordinator not available"}
	ErrNotCoordinator                          = &KafkaError{16, "[transient] not coordinator for consumer"}
	ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
	ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
	ErrNotEnoughReplicas                       = &KafkaError{19, "[transient] not enough in-sync replicas"}
	ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "[transient] messages are written to the log, but to fewer in-sync replicas than required"}
	ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
	ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
	ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
	ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
	ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
	ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
	ErrCommitingParitionsNotAssigned           = &KafkaError{27, "committing partitions are not assigned the committer"}
	ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
	ErrAuthorizationFailed                     = &KafkaError{29, "not authorized"}
	ErrRebalanceInProgress                     = &KafkaError{30, "group is rebalancing, rejoin is needed"}
)
View Source
// ErrNotEnoughData is a sentinel error value whose message is "not enough data".
// NOTE(review): presumably returned by the decoding functions when the input
// ends early — confirm against the decoder.
var ErrNotEnoughData = errors.New("not enough data")

Functions

func ComputeCrc

func ComputeCrc(m *Message, compression Compression) uint32

ComputeCrc returns crc32 hash for given message content.

func NewDecoder

func NewDecoder(r io.Reader) *decoder

func NewEncoder

func NewEncoder(w io.Writer) *encoder

func ReadReq

func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)

ReadReq returns request kind ID and byte representation of the whole message in wire protocol format.

func ReadResp

func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)

ReadResp returns the message correlation ID and the byte representation of the whole message in wire protocol format, as read from the given stream, including the 4 bytes of the message size itself. The byte representation returned by ReadResp can be parsed by any of the response readers to transform it into a specialized response structure.

Types

type Compression

// Compression identifies a compression scheme, stored as an int8. It is the
// type of ProduceReq.Compression and of ComputeCrc's compression argument.
type Compression int8
const (
	// The Compression values declared by the package.
	CompressionNone   Compression = 0
	CompressionGzip   Compression = 1
	CompressionSnappy Compression = 2
)

type FetchReq

// FetchReq is the fetch request message. ReadFetchReq reads one from an
// io.Reader; its Bytes and WriteTo methods return or write it out.
type FetchReq struct {
	CorrelationID int32
	ClientID      string
	MaxWaitTime   time.Duration
	MinBytes      int32

	// Topics holds one entry per topic, each carrying its own partition list.
	Topics []FetchReqTopic
}

func ReadFetchReq

func ReadFetchReq(r io.Reader) (*FetchReq, error)

func (*FetchReq) Bytes

func (r *FetchReq) Bytes() ([]byte, error)

func (*FetchReq) WriteTo

func (r *FetchReq) WriteTo(w io.Writer) (int64, error)

type FetchReqPartition

// FetchReqPartition is the per-partition entry of a FetchReqTopic.
type FetchReqPartition struct {
	ID          int32
	FetchOffset int64
	MaxBytes    int32
}

type FetchReqTopic

// FetchReqTopic is one element of FetchReq.Topics: a topic name together with
// its per-partition entries.
type FetchReqTopic struct {
	Name       string
	Partitions []FetchReqPartition
}

type FetchResp

// FetchResp is the fetch response message. ReadFetchResp reads one from an
// io.Reader and its Bytes method returns it as a byte slice.
type FetchResp struct {
	CorrelationID int32
	Topics        []FetchRespTopic
}

func ReadFetchResp

func ReadFetchResp(r io.Reader) (*FetchResp, error)

func (*FetchResp) Bytes

func (r *FetchResp) Bytes() ([]byte, error)

type FetchRespPartition

// FetchRespPartition is the per-partition entry of a FetchRespTopic. It
// carries an error value and the messages for that partition.
type FetchRespPartition struct {
	ID        int32
	Err       error
	TipOffset int64
	Messages  []*Message
}

type FetchRespTopic

// FetchRespTopic is one element of FetchResp.Topics: a topic name together
// with its per-partition entries.
type FetchRespTopic struct {
	Name       string
	Partitions []FetchRespPartition
}

type GroupCoordinatorReq

// GroupCoordinatorReq is the group coordinator request message. It names a
// consumer group; ReadGroupCoordinatorReq reads one from an io.Reader, and its
// Bytes and WriteTo methods return or write it out.
type GroupCoordinatorReq struct {
	CorrelationID int32
	ClientID      string
	ConsumerGroup string
}

func ReadGroupCoordinatorReq

func ReadGroupCoordinatorReq(r io.Reader) (*GroupCoordinatorReq, error)

func (*GroupCoordinatorReq) Bytes

func (r *GroupCoordinatorReq) Bytes() ([]byte, error)

func (*GroupCoordinatorReq) WriteTo

func (r *GroupCoordinatorReq) WriteTo(w io.Writer) (int64, error)

type GroupCoordinatorResp

// GroupCoordinatorResp is the group coordinator response message. It carries
// an error value plus the coordinator's ID, host and port.
// ReadGroupCoordinatorResp reads one from an io.Reader and its Bytes method
// returns it as a byte slice.
type GroupCoordinatorResp struct {
	CorrelationID   int32
	Err             error
	CoordinatorID   int32
	CoordinatorHost string
	CoordinatorPort int32
}

func ReadGroupCoordinatorResp

func ReadGroupCoordinatorResp(r io.Reader) (*GroupCoordinatorResp, error)

func (*GroupCoordinatorResp) Bytes

func (r *GroupCoordinatorResp) Bytes() ([]byte, error)

type KafkaError

// KafkaError is the type of the package-level Err* values, each of which is
// built from a numeric code and a message. Its Error method makes *KafkaError
// usable as an error, and Errno returns an int.
// NOTE(review): Errno presumably returns the numeric code — confirm.
type KafkaError struct {
	// contains filtered or unexported fields
}

func (*KafkaError) Errno

func (err *KafkaError) Errno() int

func (*KafkaError) Error

func (err *KafkaError) Error() string

type Message

// Message is a single entry of a message set. It is the element type of
// FetchRespPartition.Messages and ProduceReqPartition.Messages.
// NOTE(review): "ignored when processing" on TipOffset may be intended to read
// "ignored when producing", like the neighbouring fields — confirm.
type Message struct {
	Key       []byte
	Value     []byte
	Offset    int64  // set when fetching and after successful producing
	Crc       uint32 // set when fetching, ignored when producing
	Topic     string // set when fetching, ignored when producing
	Partition int32  // set when fetching, ignored when producing
	TipOffset int64  // set when fetching, ignored when processing
}

Message represents a single entity of a message set.

type MetadataReq

// MetadataReq is the metadata request message; Topics is a list of topic
// names. ReadMetadataReq reads one from an io.Reader, and its Bytes and
// WriteTo methods return or write it out.
type MetadataReq struct {
	CorrelationID int32
	ClientID      string
	Topics        []string
}

func ReadMetadataReq

func ReadMetadataReq(r io.Reader) (*MetadataReq, error)

func (*MetadataReq) Bytes

func (r *MetadataReq) Bytes() ([]byte, error)

func (*MetadataReq) WriteTo

func (r *MetadataReq) WriteTo(w io.Writer) (int64, error)

type MetadataResp

// MetadataResp is the metadata response message. It lists brokers and topics.
// ReadMetadataResp reads one from an io.Reader and its Bytes method returns it
// as a byte slice.
type MetadataResp struct {
	CorrelationID int32
	Brokers       []MetadataRespBroker
	Topics        []MetadataRespTopic
}

func ReadMetadataResp

func ReadMetadataResp(r io.Reader) (*MetadataResp, error)

func (*MetadataResp) Bytes

func (r *MetadataResp) Bytes() ([]byte, error)

type MetadataRespBroker

// MetadataRespBroker is one element of MetadataResp.Brokers: a node ID with
// its host and port.
type MetadataRespBroker struct {
	NodeID int32
	Host   string
	Port   int32
}

type MetadataRespPartition

// MetadataRespPartition is the per-partition entry of a MetadataRespTopic.
// NOTE(review): Leader, Replicas and Isrs presumably hold broker node IDs
// (compare MetadataRespBroker.NodeID) — confirm.
type MetadataRespPartition struct {
	ID       int32
	Err      error
	Leader   int32
	Replicas []int32
	Isrs     []int32
}

type MetadataRespTopic

// MetadataRespTopic is one element of MetadataResp.Topics: a topic name, an
// error value for the topic, and its per-partition entries.
type MetadataRespTopic struct {
	Name       string
	Err        error
	Partitions []MetadataRespPartition
}

type OffsetCommitReq

// OffsetCommitReq is the offset commit request message for a consumer group.
// ReadOffsetCommitReq reads one from an io.Reader, and its Bytes and WriteTo
// methods return or write it out.
type OffsetCommitReq struct {
	CorrelationID int32
	ClientID      string
	ConsumerGroup string
	Topics        []OffsetCommitReqTopic
}

func ReadOffsetCommitReq

func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)

func (*OffsetCommitReq) Bytes

func (r *OffsetCommitReq) Bytes() ([]byte, error)

func (*OffsetCommitReq) WriteTo

func (r *OffsetCommitReq) WriteTo(w io.Writer) (int64, error)

type OffsetCommitReqPartition

// OffsetCommitReqPartition is the per-partition entry of an
// OffsetCommitReqTopic: a partition ID with an offset, a timestamp and a
// metadata string.
type OffsetCommitReqPartition struct {
	ID        int32
	Offset    int64
	TimeStamp time.Time
	Metadata  string
}

type OffsetCommitReqTopic

// OffsetCommitReqTopic is one element of OffsetCommitReq.Topics: a topic name
// together with its per-partition entries.
type OffsetCommitReqTopic struct {
	Name       string
	Partitions []OffsetCommitReqPartition
}

type OffsetCommitResp

// OffsetCommitResp is the offset commit response message.
// ReadOffsetCommitResp reads one from an io.Reader and its Bytes method
// returns it as a byte slice.
type OffsetCommitResp struct {
	CorrelationID int32
	Topics        []OffsetCommitRespTopic
}

func ReadOffsetCommitResp

func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)

func (*OffsetCommitResp) Bytes

func (r *OffsetCommitResp) Bytes() ([]byte, error)

type OffsetCommitRespPartition

// OffsetCommitRespPartition is the per-partition entry of an
// OffsetCommitRespTopic: a partition ID and an error value.
type OffsetCommitRespPartition struct {
	ID  int32
	Err error
}

type OffsetCommitRespTopic

// OffsetCommitRespTopic is one element of OffsetCommitResp.Topics: a topic
// name together with its per-partition entries.
type OffsetCommitRespTopic struct {
	Name       string
	Partitions []OffsetCommitRespPartition
}

type OffsetFetchReq

// OffsetFetchReq is the offset fetch request message for a consumer group.
// ReadOffsetFetchReq reads one from an io.Reader, and its Bytes and WriteTo
// methods return or write it out.
type OffsetFetchReq struct {
	CorrelationID int32
	ClientID      string
	ConsumerGroup string
	Topics        []OffsetFetchReqTopic
}

func ReadOffsetFetchReq

func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)

func (*OffsetFetchReq) Bytes

func (r *OffsetFetchReq) Bytes() ([]byte, error)

func (*OffsetFetchReq) WriteTo

func (r *OffsetFetchReq) WriteTo(w io.Writer) (int64, error)

type OffsetFetchReqTopic

// OffsetFetchReqTopic is one element of OffsetFetchReq.Topics: a topic name
// and a list of int32 partition values.
// NOTE(review): the values are presumably partition IDs — confirm.
type OffsetFetchReqTopic struct {
	Name       string
	Partitions []int32
}

type OffsetFetchResp

// OffsetFetchResp is the offset fetch response message. ReadOffsetFetchResp
// reads one from an io.Reader and its Bytes method returns it as a byte slice.
type OffsetFetchResp struct {
	CorrelationID int32
	Topics        []OffsetFetchRespTopic
}

func ReadOffsetFetchResp

func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)

func (*OffsetFetchResp) Bytes

func (r *OffsetFetchResp) Bytes() ([]byte, error)

type OffsetFetchRespPartition

// OffsetFetchRespPartition is the per-partition entry of an
// OffsetFetchRespTopic: a partition ID with an offset, a metadata string and
// an error value.
type OffsetFetchRespPartition struct {
	ID       int32
	Offset   int64
	Metadata string
	Err      error
}

type OffsetFetchRespTopic

// OffsetFetchRespTopic is one element of OffsetFetchResp.Topics: a topic name
// together with its per-partition entries.
type OffsetFetchRespTopic struct {
	Name       string
	Partitions []OffsetFetchRespPartition
}

type OffsetReq

// OffsetReq is the offset request message. ReadOffsetReq reads one from an
// io.Reader, and its Bytes and WriteTo methods return or write it out.
type OffsetReq struct {
	CorrelationID int32
	ClientID      string
	ReplicaID     int32
	Topics        []OffsetReqTopic
}

func ReadOffsetReq

func ReadOffsetReq(r io.Reader) (*OffsetReq, error)

func (*OffsetReq) Bytes

func (r *OffsetReq) Bytes() ([]byte, error)

func (*OffsetReq) WriteTo

func (r *OffsetReq) WriteTo(w io.Writer) (int64, error)

type OffsetReqPartition

// OffsetReqPartition is the per-partition entry of an OffsetReqTopic.
// NOTE(review): the negative values TimeMs must hold are presumably
// OffsetReqTimeLatest (-1) and OffsetReqTimeEarliest (-2) — confirm.
type OffsetReqPartition struct {
	ID         int32
	TimeMs     int64 // cannot be time.Time because of negative values
	MaxOffsets int32
}

type OffsetReqTopic

// OffsetReqTopic is one element of OffsetReq.Topics: a topic name together
// with its per-partition entries.
type OffsetReqTopic struct {
	Name       string
	Partitions []OffsetReqPartition
}

type OffsetResp

// OffsetResp is the offset response message. ReadOffsetResp reads one from an
// io.Reader and its Bytes method returns it as a byte slice.
type OffsetResp struct {
	CorrelationID int32
	Topics        []OffsetRespTopic
}

func ReadOffsetResp

func ReadOffsetResp(r io.Reader) (*OffsetResp, error)

func (*OffsetResp) Bytes

func (r *OffsetResp) Bytes() ([]byte, error)

type OffsetRespPartition

// OffsetRespPartition is the per-partition entry of an OffsetRespTopic: a
// partition ID, an error value and a list of offsets.
type OffsetRespPartition struct {
	ID      int32
	Err     error
	Offsets []int64
}

type OffsetRespTopic

// OffsetRespTopic is one element of OffsetResp.Topics: a topic name together
// with its per-partition entries.
type OffsetRespTopic struct {
	Name       string
	Partitions []OffsetRespPartition
}

type ProduceReq

// ProduceReq is the produce request message. ReadProduceReq reads one from an
// io.Reader, and its Bytes and WriteTo methods return or write it out.
// NOTE(review): RequiredAcks presumably takes one of the RequiredAcks*
// constants — confirm.
type ProduceReq struct {
	CorrelationID int32
	ClientID      string
	Compression   Compression // only used when sending ProduceReqs
	RequiredAcks  int16
	Timeout       time.Duration
	Topics        []ProduceReqTopic
}

func ReadProduceReq

func ReadProduceReq(r io.Reader) (*ProduceReq, error)

func (*ProduceReq) Bytes

func (r *ProduceReq) Bytes() ([]byte, error)

func (*ProduceReq) WriteTo

func (r *ProduceReq) WriteTo(w io.Writer) (int64, error)

type ProduceReqPartition

// ProduceReqPartition is the per-partition entry of a ProduceReqTopic: a
// partition ID and the messages for it.
type ProduceReqPartition struct {
	ID       int32
	Messages []*Message
}

type ProduceReqTopic

// ProduceReqTopic is one element of ProduceReq.Topics: a topic name together
// with its per-partition entries.
type ProduceReqTopic struct {
	Name       string
	Partitions []ProduceReqPartition
}

type ProduceResp

// ProduceResp is the produce response message. ReadProduceResp reads one from
// an io.Reader and its Bytes method returns it as a byte slice.
type ProduceResp struct {
	CorrelationID int32
	Topics        []ProduceRespTopic
}

func ReadProduceResp

func ReadProduceResp(r io.Reader) (*ProduceResp, error)

func (*ProduceResp) Bytes

func (r *ProduceResp) Bytes() ([]byte, error)

type ProduceRespPartition

// ProduceRespPartition is the per-partition entry of a ProduceRespTopic: a
// partition ID, an error value and an offset.
type ProduceRespPartition struct {
	ID     int32
	Err    error
	Offset int64
}

type ProduceRespTopic

// ProduceRespTopic is one element of ProduceResp.Topics: a topic name together
// with its per-partition entries.
type ProduceRespTopic struct {
	Name       string
	Partitions []ProduceRespPartition
}

type Request

// Request is satisfied by any type with a WriteTo(io.Writer) (int64, error)
// method. The *Req types of this package (for example *FetchReq and
// *ProduceReq) declare such a method.
type Request interface {
	WriteTo(io.Writer) (int64, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL