kafka

package
v2.1.0+incompatible
Published: Sep 4, 2019 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Constants

View Source
const OffsetNewest = -1

OffsetNewest lets consumers retrieve the newest possible message. It must be the same value as defined in sarama.

View Source
const OffsetOldest = -2

OffsetOldest lets consumers retrieve the oldest possible message. It must be the same value as defined in sarama.

Variables

View Source
var (
	ErrInvalidPartitionCount = errors.New("cannot reduce number of partitions")
	ErrSamePartitionCount    = errors.New("topic has the same partition count")
	ErrInvalidTopic          = errors.New("topic is invalid")
)
View Source
var (
	ErrStreamExitTimedout = errors.New("reader stream is closed based on timeout setting")
	ErrInvalidMessageType = errors.New("reader stream received incompatible message type")
)
View Source
var ConsumerGroupOption messaging.OptionCreator = &consumerOptions{}

ConsumerGroupOption is the default option for a consumer group. In this configuration, partition and offset are ignored since they are managed automatically by Kafka.

Functions

func GetDefaultConfig

func GetDefaultConfig() (*sarama.Config, *cluster.Config)

GetDefaultConfig returns the default sarama and cluster configurations.

func NewConsumerOption

func NewConsumerOption(offset int64) messaging.OptionCreator

NewConsumerOption specifies consumer policies. Pass in OffsetOldest, OffsetNewest, or the specific offset that you want to consume from.
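
A minimal sketch of passing a starting offset to Consume via NewConsumerOption. The import path, broker address, and topic name are placeholders, since none of them appear on this page:

package main

import (
	"context"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	// Consume partition 0 of "orders" starting from the oldest available offset.
	consumer, err := kafka.NewConsumerFromPartition("orders", 0, kafka.WithBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	events, err := consumer.Consume(context.Background(), kafka.NewConsumerOption(kafka.OffsetOldest))
	if err != nil {
		log.Fatal(err)
	}
	for e := range events {
		_ = e // process each messaging.Event
	}
}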

func NewMessage

func NewMessage(key string, payload interface{}) (messaging.Messager, error)

NewMessage creates a publishable message. Clients should pass a JSON object that has been marshalled into []byte as the payload; any other input type will be converted to binary via gob.
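
A short sketch of building publishable messages, assuming a placeholder import path; the first payload is pre-marshalled JSON, the second is a plain struct that the package gob-encodes:

package main

import (
	"encoding/json"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	// JSON payload: marshal first, then pass the []byte.
	raw, err := json.Marshal(map[string]string{"id": "42", "status": "created"})
	if err != nil {
		log.Fatal(err)
	}
	jsonMsg, err := kafka.NewMessage("order-42", raw)
	if err != nil {
		log.Fatal(err)
	}

	// Any other payload type is converted to binary via gob.
	type order struct{ ID string }
	gobMsg, err := kafka.NewMessage("order-43", order{ID: "43"})
	if err != nil {
		log.Fatal(err)
	}

	_, _ = jsonMsg, gobMsg // hand these to a messaging.Producer
}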

func NewProducer

func NewProducer(topic string, config *sarama.Config, brokers ...string) (messaging.Producer, error)

NewProducer initializes a new client for publishing messages
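
A sketch wiring GetDefaultConfig into NewProducer; the broker addresses and topic are placeholders, and the second (cluster) config returned by GetDefaultConfig is assumed to be consumer-side only:

package main

import (
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	saramaCfg, _ := kafka.GetDefaultConfig() // the discarded cluster config is used by consumer groups
	producer, err := kafka.NewProducer("orders", saramaCfg, "localhost:9092", "localhost:9093")
	if err != nil {
		log.Fatal(err)
	}
	_ = producer // publish through the messaging.Producer interface (defined in the messaging package)
}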

func NewStreamReader

func NewStreamReader(c messaging.Consumer, options messaging.OptionCreator) (messaging.StreamReader, error)

NewStreamReader creates a stream reader by wrapping a kafka consumer

func NewStreamWriter

func NewStreamWriter(p messaging.Producer, key string) (messaging.StreamWriter, error)

NewStreamWriter creates a stream writer by wrapping a kafka producer
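
A construction-only sketch, assuming *KafkaConsumer satisfies messaging.Consumer (its Consume and Close methods match the package's own Consumer interface) and using placeholder brokers and topics; the StreamReader and StreamWriter methods themselves live in the messaging package and are not documented here:

package main

import (
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	consumer, err := kafka.NewConsumer("orders", "stream-group", kafka.WithBrokers("localhost:9092"))
	if err != nil {
		log.Fatal(err)
	}
	reader, err := kafka.NewStreamReader(consumer, kafka.ConsumerGroupOption)
	if err != nil {
		log.Fatal(err)
	}

	producer, err := kafka.Producer("orders-out", kafka.StrategyHash, "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	writer, err := kafka.NewStreamWriter(producer, "stream-key")
	if err != nil {
		log.Fatal(err)
	}

	_, _ = reader, writer // use via the messaging.StreamReader / messaging.StreamWriter interfaces
}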

func Producer

func Producer(topic string, strategy Strategy, brokers ...string) (messaging.Producer, error)

Producer initializes a default producer client for publishing messages
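
A sketch of the default producer with a routing strategy; brokers and topics are placeholders. StrategyHash keeps messages with the same key on the same partition, while StrategyRoundRobin spreads writes evenly:

package main

import (
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	// Keyed writes: the same key always lands on the same partition.
	keyed, err := kafka.Producer("orders", kafka.StrategyHash, "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}

	// Unkeyed writes: distribute evenly across partitions.
	spread, err := kafka.Producer("metrics", kafka.StrategyRoundRobin, "localhost:9092")
	if err != nil {
		log.Fatal(err)
	}

	_, _ = keyed, spread
}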

func SetLogger

func SetLogger(log *log.Logger)

SetLogger turns on sarama logging.
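
For example, to route sarama's internal logging to stderr (the prefix is arbitrary):

package main

import (
	"log"
	"os"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	kafka.SetLogger(log.New(os.Stderr, "[sarama] ", log.LstdFlags))
}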

Types

type ClientOption

type ClientOption func(*KafkaConsumer)

func WithBrokers

func WithBrokers(brokers ...string) ClientOption

WithBrokers specifies the brokers for the kafka consumer.

func WithDisableAutoMark

func WithDisableAutoMark() ClientOption

WithDisableAutoMark turns off auto-marking, which means clients are responsible for marking offsets themselves. This is useful when clients want to retry messages they failed to process.

func WithInitialOffset

func WithInitialOffset(offset int64) ClientOption

WithInitialOffset specifies the initial offset to use if no offset was previously committed.

type Consumer

type Consumer interface {
	messaging.Consumer
	MarkOffset(messaging.Event, string) error
}

Consumer defines the interface for a kafka consumer.

type CreateTopicOptions

type CreateTopicOptions struct {
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]*string
	Timeout           time.Duration
}

CreateTopicOptions holds the options that will be applied to topic creation. The properties are identical to sarama.TopicDetail.

func (CreateTopicOptions) Options

func (c CreateTopicOptions) Options() interface{}

Options returns the compatible options for creating topics
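
A sketch of topic creation with CreateTopicOptions, assuming its Options method makes it usable as the messaging.OptionCreator that CreateTopics expects; the broker, topic names, and option values are placeholders:

package main

import (
	"context"
	"log"
	"time"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	m, err := kafka.Manager("localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer m.Close()

	opts := kafka.CreateTopicOptions{
		NumPartitions:     6,
		ReplicationFactor: 3,
		Timeout:           10 * time.Second,
	}
	if err := m.CreateTopics(context.Background(), opts, "orders", "payments"); err != nil {
		log.Fatal(err)
	}
}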

type GroupInfo

type GroupInfo map[int32]*PartitionInfo

GroupInfo is a map of PartitionInfo

type KafkaConsumer

type KafkaConsumer struct {
	*sync.Mutex
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(topic, groupID string, opts ...ClientOption) (*KafkaConsumer, error)

NewConsumer initializes a default consumer client for consuming messages. This function uses a consumer group, and all partitions will be load balanced.
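
A sketch of a consumer-group client built from the ClientOption helpers and the default ConsumerGroupOption; broker, topic, and group names are placeholders:

package main

import (
	"context"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	consumer, err := kafka.NewConsumer("orders", "billing-group",
		kafka.WithBrokers("localhost:9092"),
		kafka.WithInitialOffset(kafka.OffsetOldest), // used only when the group has no committed offset
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	events, err := consumer.Consume(context.Background(), kafka.ConsumerGroupOption)
	if err != nil {
		log.Fatal(err)
	}
	for e := range events {
		_ = e // process each messaging.Event; offsets are marked automatically unless WithDisableAutoMark is set
	}
}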

func NewConsumerFromPartition

func NewConsumerFromPartition(topic string, partition int, opts ...ClientOption) (*KafkaConsumer, error)

NewConsumerFromPartition initializes a default consumer client for consuming messages from a specific partition.

func (*KafkaConsumer) Close

func (c *KafkaConsumer) Close() error

func (*KafkaConsumer) Consume

func (c *KafkaConsumer) Consume(ctx context.Context, opts messaging.OptionCreator) (<-chan messaging.Event, error)

func (*KafkaConsumer) MarkOffset

func (consumer *KafkaConsumer) MarkOffset(msg messaging.Event, metadata string) error

MarkOffset lets clients mark the offset manually after they process each message.
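
A sketch of manual offset management, pairing WithDisableAutoMark with MarkOffset; the processing step is left as a comment, and all names are placeholders:

package main

import (
	"context"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	consumer, err := kafka.NewConsumer("orders", "billing-group",
		kafka.WithBrokers("localhost:9092"),
		kafka.WithDisableAutoMark(), // offsets are committed by us, not automatically
	)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	events, err := consumer.Consume(context.Background(), kafka.ConsumerGroupOption)
	if err != nil {
		log.Fatal(err)
	}
	for e := range events {
		// ... process e; on failure, skip MarkOffset so the message can be picked up again ...
		if err := consumer.MarkOffset(e, ""); err != nil {
			log.Println("mark offset:", err)
		}
	}
}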

type KafkaManager

type KafkaManager struct {
	// contains filtered or unexported fields
}

func Manager

func Manager(hosts ...string) (*KafkaManager, error)

Manager creates a simple Kafka Manager with default config to perform administrative tasks
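
A sketch of basic administrative use; broker addresses are placeholders:

package main

import (
	"context"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	m, err := kafka.Manager("localhost:9092", "localhost:9093")
	if err != nil {
		log.Fatal(err)
	}
	defer m.Close()

	topics, groups, err := m.ListTopicsLite(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Println("topics:", topics)
	log.Println("consumer groups:", groups)
}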

func (*KafkaManager) AddPartitions

func (m *KafkaManager) AddPartitions(_ context.Context, req TopicPartitionRequest) error

AddPartitions increases the partition count for a set of topics
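
A sketch of growing partition counts with a TopicPartitionRequest; topics and targets are placeholders, and reducing a count presumably fails with ErrInvalidPartitionCount (see Variables above):

package main

import (
	"context"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	m, err := kafka.Manager("localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer m.Close()

	// key = topic name, value = target number of partitions
	req := kafka.TopicPartitionRequest{
		"orders":   12,
		"payments": 6,
	}
	if err := m.AddPartitions(context.Background(), req); err != nil {
		log.Fatal(err)
	}
}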

func (*KafkaManager) Close

func (m *KafkaManager) Close() error

func (*KafkaManager) CreateTopics

func (m *KafkaManager) CreateTopics(_ context.Context, opts messaging.OptionCreator, topics ...string) error

func (*KafkaManager) DeleteConsumerGroups

func (m *KafkaManager) DeleteConsumerGroups(withRefresh bool, groups ...string) error

DeleteConsumerGroups removes consumer groups from the kafka brokers. An error is returned if a consumer group has active consumers.

func (*KafkaManager) DeleteTopics

func (m *KafkaManager) DeleteTopics(_ context.Context, topics ...string) error

func (*KafkaManager) GetPartitionInfo

func (m *KafkaManager) GetPartitionInfo(topic, consumerGroup string, withRefresh bool) ([]*PartitionInfoContainer, error)

GetPartitionInfo retrieves information for all partitions associated with the given consumer_group:topic pair. A non-existing topic will be created automatically.
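
A sketch of reporting per-partition lag for a consumer group; the Offset, End, and Lag fields are promoted from the embedded PartitionInfo, and the names below are placeholders:

package main

import (
	"fmt"
	"log"

	kafka "example.com/yourmodule/kafka" // placeholder import path
)

func main() {
	m, err := kafka.Manager("localhost:9092")
	if err != nil {
		log.Fatal(err)
	}
	defer m.Close()

	infos, err := m.GetPartitionInfo("orders", "billing-group", true)
	if err != nil {
		log.Fatal(err)
	}
	for _, pi := range infos {
		fmt.Printf("topic=%s group=%s partition=%d offset=%d end=%d lag=%d\n",
			pi.Topic, pi.GroupID, pi.Partition, pi.Offset, pi.End, pi.Lag)
	}
}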

func (*KafkaManager) LatestOffset

func (m *KafkaManager) LatestOffset(topic string, partition int32, timeInMs int64) (int64, error)

func (*KafkaManager) ListTopics

func (m *KafkaManager) ListTopics(_ context.Context) (interface{}, error)

func (*KafkaManager) ListTopicsLite

func (m *KafkaManager) ListTopicsLite(_ context.Context) ([]string, []string, error)

ListTopicsLite is a fast version of ListTopics. It returns a list of topics and a list of consumer groups for all brokers.

func (*KafkaManager) Partitions

func (m *KafkaManager) Partitions(topic string) ([]int32, error)

type ListTopicsResponse

type ListTopicsResponse map[string]TopicInfo

ListTopicsResponse is a map of TopicInfo

type Message

type Message struct {
	Value     []byte
	Key       []byte
	Offset    int64
	Partition int32
	Time      time.Time
	Topic     string
}

Message is a data structure representing kafka messages

type PartitionInfo

type PartitionInfo struct {
	Start  int64
	End    int64
	Offset int64
	Lag    int64
}

PartitionInfo contains metadata for a given partition

type PartitionInfoContainer

type PartitionInfoContainer struct {
	Topic     string
	GroupID   string
	Partition int32
	*PartitionInfo
}

type Strategy

type Strategy string

Strategy is a type of routing rule

const (
	// StrategyRoundRobin distributes writes evenly
	StrategyRoundRobin Strategy = "RoundRobin"
	// StrategyLeastBytes distributes writes to the nodes with the least amount of traffic
	StrategyLeastBytes Strategy = "LeastBytes"
	// StrategyHash distributes writes based on the 32-bit FNV-1 hash function. This
	// guarantees that messages with the same key are routed to the same host
	StrategyHash Strategy = "Hash"
	// StrategyHashMurmur2 uses the same strategy for assigning partitions as the Java client
	// https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
	StrategyHashMurmur2 Strategy = "HashMurmur2"
)

type TopicInfo

type TopicInfo map[string]GroupInfo

TopicInfo is a map of GroupInfo

type TopicPartitionRequest

type TopicPartitionRequest map[string]int

TopicPartitionRequest lets the Kafka manager know which topics to modify and the target number of partitions each should have: key = topic name, value = target number of partitions.
