Documentation
Constants ¶
const (
	SASLMechanismNone     = "none"
	SASLMechanismPlain    = "plain"
	SASLMechanismSCRAM256 = "scram-sha-256"
	SASLMechanismSCRAM512 = "scram-sha-512"
)
Variables ¶
var (
DefaultClusterVersion = sarama.MaxVersion.String()
)
Functions ¶
This section is empty.
Types ¶
type Callback ¶
type Callback func(topic string, partition int32, offset int64, time time.Time, key, value []byte) error
Callback is the function called for each message received from Kafka.
type Checkpoint ¶ added in v0.0.7
type Checkpoint struct {
// contains filtered or unexported fields
}
Checkpoint represents the point in time, or the offset, from which the consumer starts consuming the specified topic.
func NewCheckpoint ¶ added in v0.0.7
func NewCheckpoint(rewind bool) *Checkpoint
NewCheckpoint creates a new checkpoint instance.
In rewind mode, the consumer starts from the oldest available offset, that is, it consumes all existing messages from the beginning of the stream.
func (*Checkpoint) Mode ¶ added in v0.0.7
func (c *Checkpoint) Mode() OffsetMode
Mode returns the current mode of the checkpoint.
func (*Checkpoint) Offset ¶ added in v0.0.7
func (c *Checkpoint) Offset() int64
Offset returns the final offset value from which consuming will be started.
In MillisecondsOffsetMode, the offset is the specified time expressed in milliseconds. Kafka uses this value to find the closest available offset at the given time.
func (*Checkpoint) OffsetString ¶ added in v0.0.7
func (c *Checkpoint) OffsetString() string
OffsetString returns the time offset formatted as `02-01-2006T15:04:05.999999999` in MillisecondsOffsetMode; otherwise it returns the string representation of the offset value.
func (*Checkpoint) SetOffset ¶ added in v0.0.7
func (c *Checkpoint) SetOffset(offset int64)
SetOffset sets the offset of the checkpoint and switches the mode to ExplicitOffsetMode.
func (*Checkpoint) SetTimeOffset ¶ added in v0.0.7
func (c *Checkpoint) SetTimeOffset(at time.Time)
SetTimeOffset sets the offset to the given time expressed in milliseconds and switches the mode to MillisecondsOffsetMode.
func (*Checkpoint) TimeOffset ¶ added in v0.0.7
func (c *Checkpoint) TimeOffset() time.Time
TimeOffset returns the time value originally provided for the time-based offset in MillisecondsOffsetMode.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a Kafka cluster consumer.
func NewConsumer ¶
func NewConsumer(brokers []string, printer internal.Printer, environment string, enableAutoTopicCreation bool, options ...Option) (*Consumer, error)
NewConsumer creates a new Kafka cluster consumer.
func (*Consumer) Start ¶
Start starts consuming from the specified topics and executes the callback function on each message.
This is a blocking call that terminates when the context parameter is cancelled. The method returns an error if the topic list is empty or the callback function is nil.
type OffsetMode ¶ added in v0.0.7
type OffsetMode int8
OffsetMode represents the offset mode for a checkpoint.
const (
	// UndefinedOffsetMode means the user has not requested any specific offset.
	UndefinedOffsetMode OffsetMode = iota
	// MillisecondsOffsetMode means the closest available offset at a given time will be fetched from the server
	// before the consumer starts pulling messages from Kafka.
	MillisecondsOffsetMode
	// ExplicitOffsetMode means the user has explicitly asked for a specific offset.
	ExplicitOffsetMode
)
type OffsetStore ¶
type OffsetStore interface {
Store(topic string, partition int32, offset int64) error
Query(topic string) (PartitionOffsets, error)
}
OffsetStore is the interface to store partition offsets.
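A minimal in-memory implementation of the interface, useful for tests. The interface is redeclared locally, `memoryStore` is a hypothetical name, and PartitionOffsets is assumed to be a map from partition to offset because its definition is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// PartitionOffsets is ASSUMED to map partition numbers to offsets.
type PartitionOffsets map[int32]int64

// OffsetStore mirrors the documented interface.
type OffsetStore interface {
	Store(topic string, partition int32, offset int64) error
	Query(topic string) (PartitionOffsets, error)
}

// memoryStore is a hypothetical OffsetStore that keeps offsets in memory.
type memoryStore struct {
	mu      sync.Mutex
	offsets map[string]PartitionOffsets
}

func newMemoryStore() *memoryStore {
	return &memoryStore{offsets: make(map[string]PartitionOffsets)}
}

// Store records the latest offset for the topic and partition.
func (s *memoryStore) Store(topic string, partition int32, offset int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.offsets[topic] == nil {
		s.offsets[topic] = make(PartitionOffsets)
	}
	s.offsets[topic][partition] = offset
	return nil
}

// Query returns a copy of the offsets stored for the topic.
func (s *memoryStore) Query(topic string) (PartitionOffsets, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(PartitionOffsets, len(s.offsets[topic]))
	for partition, offset := range s.offsets[topic] {
		out[partition] = offset
	}
	return out, nil
}

func main() {
	var store OffsetStore = newMemoryStore()
	_ = store.Store("orders", 0, 42)
	offsets, _ := store.Query("orders")
	fmt.Println(offsets[0]) // 42
}
```

Query returns a copy so that callers cannot mutate the store's internal map without holding the lock.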
type Option ¶
type Option func(options *Options)
Option represents a configuration function.
func WithClusterVersion ¶
WithClusterVersion sets the Kafka cluster version.
func WithOffsetStore ¶
func WithOffsetStore(store OffsetStore) Option
WithOffsetStore sets the consumer offset store.
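Option follows the functional-options pattern: each option is a function that mutates an Options value. The sketch below redeclares a reduced Options locally; `withClusterVersion` (including its string parameter) and `apply` are assumptions made for illustration, since the signature of WithClusterVersion is not shown on this page.

```go
package main

import "fmt"

// Options is a reduced local copy of the documented struct.
type Options struct {
	DisableErrorReporting bool
	ClusterVersion        string
}

// Option mirrors the documented type.
type Option func(options *Options)

// withClusterVersion is a hypothetical option; the real signature of
// WithClusterVersion is ASSUMED to take the version as a string.
func withClusterVersion(version string) Option {
	return func(options *Options) {
		options.ClusterVersion = version
	}
}

// apply is a hypothetical helper that starts from defaults and applies
// each option in order, so later options override earlier ones.
func apply(defaults Options, opts ...Option) *Options {
	result := defaults
	for _, opt := range opts {
		opt(&result)
	}
	return &result
}

func main() {
	o := apply(Options{ClusterVersion: "1.0.0"}, withClusterVersion("2.8.0"))
	fmt.Println(o.ClusterVersion) // 2.8.0
}
```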
type Options ¶
type Options struct {
// DisableErrorReporting disables sending consumer errors to the Errors() channel.
DisableErrorReporting bool
// ClusterVersion is the Kafka cluster version.
ClusterVersion string
// OffsetStore is the type responsible for storing consumer offsets.
OffsetStore OffsetStore
// TLS is the TLS configuration used to connect to the Kafka cluster.
TLS *tls.Config
// contains filtered or unexported fields
}
Options holds the configuration settings for the Kafka consumer.
func NewOptions ¶
func NewOptions() *Options
NewOptions creates a new Options object with default values.
type PartitionOffsets ¶ added in v0.0.7
PartitionOffsets represents a map of partitions and their offsets.