kafka

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 6, 2019 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	SASLMechanismNone     = "none"
	SASLMechanismPlain    = "plain"
	SASLMechanismSCRAM256 = "scram-sha-256"
	SASLMechanismSCRAM512 = "scram-sha-512"
)

Variables

View Source
var (
	DefaultClusterVersion = sarama.MaxVersion.String()
)

Functions

This section is empty.

Types

type Callback

type Callback func(topic string, partition int32, offset int64, time time.Time, key, value []byte) error

Callback the function which will get called upon receiving a message from Kafka.

type Checkpoint added in v0.0.7

type Checkpoint struct {
	// contains filtered or unexported fields
}

Checkpoint represents a point in time or offset, from which the consumer has to start consuming from the specified topic.

func NewCheckpoint added in v0.0.7

func NewCheckpoint(rewind bool) *Checkpoint

NewCheckpoint creates a new checkpoint instance.

In rewind mode, the consumer will start consuming from the oldest available offset which means to consume all the old messages from the beginning of the stream.

func (*Checkpoint) Mode added in v0.0.7

func (c *Checkpoint) Mode() OffsetMode

Mode returns the current mode of the checkpoint.

func (*Checkpoint) Offset added in v0.0.7

func (c *Checkpoint) Offset() int64

Offset returns the final offset value from which consuming will be started.

In MillisecondsOffsetMode, the offset will be the milliseconds of the specified time. This is what Kafka needs to figure out the closest available offset at the given time.

func (*Checkpoint) OffsetString added in v0.0.7

func (c *Checkpoint) OffsetString() string

OffsetString returns the string representation of the time offset in `02-01-2006T15:04:05.999999999` format if in MillisecondsOffsetMode mode, otherwise returns the string representation of the offset value.

func (*Checkpoint) SetOffset added in v0.0.7

func (c *Checkpoint) SetOffset(offset int64)

SetOffset sets the offset of the checkpoint and switches the mode to ExplicitOffsetMode.

func (*Checkpoint) SetTimeOffset added in v0.0.7

func (c *Checkpoint) SetTimeOffset(at time.Time)

SetTimeOffset sets the offset to the milliseconds of the given time and sets the mode to MillisecondsOffsetMode.

func (*Checkpoint) TimeOffset added in v0.0.7

func (c *Checkpoint) TimeOffset() time.Time

TimeOffset returns the originally provided time value of the time-based offset in MillisecondsOffsetMode mode.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a new Kafka cluster consumer.

func NewConsumer

func NewConsumer(brokers []string, printer internal.Printer, environment string, enableAutoTopicCreation bool, options ...Option) (*Consumer, error)

NewConsumer creates a new instance of Kafka cluster consumer.

func (*Consumer) Close

func (c *Consumer) Close()

Close closes the Kafka consumer.

func (*Consumer) GetTopics

func (c *Consumer) GetTopics(filter string) ([]string, error)

GetTopics fetches the topics from the server.

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context, topics map[string]*Checkpoint, cb Callback) error

Start starts consuming from the specified topics and executes the callback function on each message.

This is a blocking call which will be terminated on cancellation of the context parameter. The method returns error if the topic list is empty or the callback function is nil.

type OffsetMode added in v0.0.7

type OffsetMode int8

OffsetMode represents the offset mode for a checkpoint.

const (
	// UndefinedOffsetMode the user has not requested for any specific offset.
	UndefinedOffsetMode OffsetMode = iota
	// MillisecondsOffsetMode the closet available offset at a given time will be fetched from the server
	// before the consumer starts pulling messages from Kafka.
	MillisecondsOffsetMode
	// ExplicitOffsetMode the user has explicitly asked for a specific offset.
	ExplicitOffsetMode
)

type OffsetStore

type OffsetStore interface {
	Store(topic string, partition int32, offset int64) error
	Query(topic string) (PartitionOffsets, error)
}

OffsetStore is the interface to store partition offsets.

type Option

type Option func(options *Options)

Option represents a configuration function.

func WithClusterVersion

func WithClusterVersion(version string) Option

WithClusterVersion kafka cluster version.

func WithOffsetStore

func WithOffsetStore(store OffsetStore) Option

WithOffsetStore sets the consumer offset store.

func WithSASL added in v0.0.8

func WithSASL(mechanism, username, password string) Option

WithSASL enables SASL authentication.

func WithTLS added in v0.0.8

func WithTLS(tls *tls.Config) Option

WithTLS enables TLS.

type Options

type Options struct {
	// DisableErrorReporting disables sending consumer errors to the Errors() channel.
	DisableErrorReporting bool
	// ClusterVersion kafka cluster version.
	ClusterVersion string
	// OffsetStore the type responsible to store consumer offsets
	OffsetStore OffsetStore
	// TLS configuration to connect to Kafka cluster.
	TLS *tls.Config
	// contains filtered or unexported fields
}

Options holds the configuration settings for kafka consumer.

func NewOptions

func NewOptions() *Options

NewOptions creates a new Options object with default values.

type PartitionOffsets added in v0.0.7

type PartitionOffsets map[int32]int64

PartitionOffsets represents a map of partitions and offsets

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL