offsetmgr

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2016 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrNoCoordinator = errors.New("failed to resolve coordinator")
View Source
var ErrRequestTimeout = errors.New("request timeout")

Functions

This section is empty.

Types

type DecoratedOffset

type DecoratedOffset struct {
	Offset   int64
	Metadata string
}

type Factory

type Factory interface {
	// NewOffsetManager creates an OffsetManager for the given group/topic/partition.
	// It returns an error if given group/topic/partition already has a not stopped
	// OffsetManager instance. After an old offset manager instance is stopped a
	// new one can be started.
	SpawnOffsetManager(namespace *actor.ID, group, topic string, partition int32) (T, error)

	// Stop waits for the spawned offset managers to stop and then terminates. Note
	// that all spawned offset managers has to be explicitly stopped by calling
	// their Stop method.
	Stop()
}

Factory provides a method to spawn offset manager instances to commit offsets for a particular group/topic/partition. It makes sure that there is only one running manager instance for a particular group/topic/partition combination.

One Factory instance per application is usually more then enough, but it is possible to create many of them.

Factory spawns background goroutines so it must be explicitly stopped by the application. But first it should explicitly stop all spawned offset manager instances.

func SpawnFactory

func SpawnFactory(namespace *actor.ID, cfg *config.T, client sarama.Client) Factory

SpawnFactory creates a new offset manager factory from the given client.

type OffsetCommitError

type OffsetCommitError struct {
	Group     string
	Topic     string
	Partition int32
	Err       error
}

type T

type T interface {
	// InitialOffset returns a channel that an initial offset will be sent down
	// to, when retrieved by a background goroutine. At most one value is sent down
	// the channel, and the channel is closed immediately after that. If error
	// reporting is enabled with `Config.Consumer.Return.Errors` then errors may be
	// coming and has to be read from the `Errors()` channel, otherwise the offset
	// manager will get into a dead lock.
	InitialOffset() <-chan DecoratedOffset

	// SubmitOffset triggers saving of the specified offset in Kafka. Commits are
	// performed periodically in a background goroutine. The commit interval is
	// configured by `Config.Consumer.Offsets.CommitInterval`. Note that not every
	// submitted offset gets committed. Committed offsets are sent down to the
	// `CommittedOffsets()` channel. The `CommittedOffsets()` channel has to be
	// read alongside with submitting offsets, otherwise the partition offset
	// manager will block.
	SubmitOffset(offset int64, metadata string)

	// CommittedOffsets returns a channel that offsets committed to Kafka are
	// sent down to. The user must read from this channel otherwise the
	// `SubmitOffset` function will eventually block.
	CommittedOffsets() <-chan DecoratedOffset

	// Errors returns a read channel of errors that occur during offset management,
	// if enabled. By default errors are not returned. If you want to implement any
	// custom error handling logic then you need to set `Consumer.Return.Errors` to
	// true, and read from this channel.
	Errors() <-chan *OffsetCommitError

	// Stop stops the offset manager. It is required to stop all spawned offset
	// managers before their parent factory can be stopped.
	//
	// It is guaranteed that the most recent offset is committed before `Stop`
	// returns.
	Stop()
}

T provides interface to store and retrieve offsets for a particular group/topic/partition in Kafka.

Directories

Path Synopsis
This file exists just so `go install ./...` does not complain that this directory contains "no buildable Go source files".
This file exists just so `go install ./...` does not complain that this directory contains "no buildable Go source files".

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL