helpers

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2019 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package helpers - Common utilities. The helpers subsystem provides common utilities that can be used by all subsystems. This includes utilities for coordinators to start and stop modules, as well as Kafka and Zookeeper client implementations. There are also a number of mocks that are provided for testing purposes only, and should not be used in normal code.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetSaramaConfigFromClientProfile

func GetSaramaConfigFromClientProfile(profileName string) *sarama.Config

GetSaramaConfigFromClientProfile takes the name of a client-profile configuration entry and returns a sarama.Config object that can be used to create a Sarama client with the specified configuration. This includes the Kafka version, client ID, TLS, and SASL configs. If there is any error in the configuration, such as a bad TLS certificate file, this func will panic as it is normally called when configuring modules.

func StartCoordinatorModules

func StartCoordinatorModules(modules map[string]protocol.Module) error

StartCoordinatorModules is a helper func for coordinators to start a list of modules. Given a map of protocol.Module, it calls the Start func on each one. If any module returns an error, it immediately stops and returns that error

func StopCoordinatorModules

func StopCoordinatorModules(modules map[string]protocol.Module)

StopCoordinatorModules is a helper func for coordinators to stop a list of modules. Given a map of protocol.Module, it calls the Stop func on each one. Any errors that are returned are ignored.

func TimeoutSendStorageRequest

func TimeoutSendStorageRequest(storageChannel chan *protocol.StorageRequest, request *protocol.StorageRequest, maxTime int) bool

TimeoutSendStorageRequest is a helper func for sending a protocol.StorageRequest to a channel with a timeout, specified in seconds. If the request is sent, return true. Otherwise, if the timeout is hit, return false.

func ValidateEmail

func ValidateEmail(email string) bool

ValidateEmail returns true if the provided string is an email address. This is a very simplistic validator - the string must be of the form (something)@(something).(something)

func ValidateFilename

func ValidateFilename(filename string) bool

ValidateFilename returns true if the provided string is a sane-looking filename (not just a valid filename, which could be almost anything). Right now, this is defined to be the same thing as ValidateTopic.

func ValidateHostList

func ValidateHostList(hosts []string) bool

ValidateHostList returns true if the provided slice of strings can all be parsed by ValidateHostPort

func ValidateHostPort

func ValidateHostPort(host string, allowBlankHost bool) bool

ValidateHostPort returns true if the provided string is of the form "hostname:port", where hostname is a valid hostname or IP address (as parsed by ValidateIP or ValidateHostname), and port is a valid integer.

func ValidateHostname

func ValidateHostname(hostname string) bool

ValidateHostname returns true if the provided string can be parsed as a hostname. In general this means:

* One or more segments delimited by a '.' * Each segment can be no more than 63 characters long * Valid characters in a segment are letters, numbers, and dashes * Segments may not start or end with a dash * The exception is IPv6 addresses, which are also permitted.

func ValidateIP

func ValidateIP(ipaddr string) bool

ValidateIP returns true if the provided string can be parsed as an IP address (either IPv4 or IPv6).

func ValidateTopic

func ValidateTopic(topic string) bool

ValidateTopic returns true if the provided string is a valid topic name, which may only contain letters, numbers, underscores, dashes, and periods.

func ValidateURL

func ValidateURL(rawURL string) bool

ValidateURL returns true if the provided string can be parsed as a URL. We use the net/url Parse func for this.

func ValidateZookeeperPath

func ValidateZookeeperPath(path string) bool

ValidateZookeeperPath returns true if the provided string can be parsed as a Zookeeper node path. This means that it starts with a forward slash, and contains one or more segments that are separated by slashes (but does not end with a slash).

func ZookeeperConnect

func ZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)

ZookeeperConnect establishes a new connection to a pool of Zookeeper servers. The provided session timeout sets the amount of time for which a session is considered valid after losing connection to a server. Within the session timeout it's possible to reestablish a connection to a different server and keep the same session. This is means any ephemeral nodes and watches are maintained.

Types

type BurrowSaramaBroker

type BurrowSaramaBroker struct {
	// contains filtered or unexported fields
}

BurrowSaramaBroker is an implementation of the SaramaBroker interface that is used with SaramaClient

func (*BurrowSaramaBroker) Close

func (b *BurrowSaramaBroker) Close() error

Close closes the connection associated with the broker

func (*BurrowSaramaBroker) GetAvailableOffsets

func (b *BurrowSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error)

GetAvailableOffsets sends an OffsetRequest to the broker and returns the OffsetResponse that was received

func (*BurrowSaramaBroker) ID

func (b *BurrowSaramaBroker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

type BurrowSaramaClient

type BurrowSaramaClient struct {
	Client sarama.Client
}

BurrowSaramaClient is an implementation of the SaramaClient interface for use in Burrow modules

func (*BurrowSaramaClient) Brokers

func (c *BurrowSaramaClient) Brokers() []SaramaBroker

Brokers returns the current set of active brokers as retrieved from cluster metadata.

func (*BurrowSaramaClient) Close

func (c *BurrowSaramaClient) Close() error

Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.

func (*BurrowSaramaClient) Closed

func (c *BurrowSaramaClient) Closed() bool

Closed returns true if the client has already had Close called on it

func (*BurrowSaramaClient) Config

func (c *BurrowSaramaClient) Config() *sarama.Config

Config returns the Config struct of the client. This struct should not be altered after it has been created.

func (*BurrowSaramaClient) Coordinator

func (c *BurrowSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)

Coordinator returns the coordinating broker for a consumer group. It will return a locally cached value if it's available. You can call RefreshCoordinator to update the cached value. This function only works on Kafka 0.8.2 and higher.

func (*BurrowSaramaClient) GetOffset

func (c *BurrowSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)

GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the offset of the message that will be produced next, or a time.

func (*BurrowSaramaClient) InSyncReplicas

func (c *BurrowSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)

InSyncReplicas returns the set of all in-sync replica IDs for the given partition. In-sync replicas are replicas which are fully caught up with the partition leader.

func (*BurrowSaramaClient) Leader

func (c *BurrowSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)

Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.

func (*BurrowSaramaClient) NewConsumerFromClient

func (c *BurrowSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)

NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

func (*BurrowSaramaClient) Partitions

func (c *BurrowSaramaClient) Partitions(topic string) ([]int32, error)

Partitions returns the sorted list of all partition IDs for the given topic.

func (*BurrowSaramaClient) RefreshCoordinator

func (c *BurrowSaramaClient) RefreshCoordinator(consumerGroup string) error

RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. This function only works on Kafka 0.8.2 and higher.

func (*BurrowSaramaClient) RefreshMetadata

func (c *BurrowSaramaClient) RefreshMetadata(topics ...string) error

RefreshMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics. If no topics are provided, it will refresh metadata for all topics.

func (*BurrowSaramaClient) Replicas

func (c *BurrowSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)

Replicas returns the set of all replica IDs for the given partition.

func (*BurrowSaramaClient) Topics

func (c *BurrowSaramaClient) Topics() ([]string, error)

Topics returns the set of available topics as retrieved from cluster metadata.

func (*BurrowSaramaClient) WritablePartitions

func (c *BurrowSaramaClient) WritablePartitions(topic string) ([]int32, error)

WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable" means "having a valid leader accepting writes".

type BurrowZookeeperClient

type BurrowZookeeperClient struct {
	// contains filtered or unexported fields
}

BurrowZookeeperClient is an implementation of protocol.ZookeeperClient

func (*BurrowZookeeperClient) ChildrenW

func (z *BurrowZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)

ChildrenW returns a slice of names of child ZNodes immediately underneath the specified parent path. It also returns a zk.Stat describing the parent path, and a channel over which a zk.Event object will be sent if the child list changes (a child is added or deleted).

func (*BurrowZookeeperClient) Close

func (z *BurrowZookeeperClient) Close()

Close shuts down the connection to the Zookeeper ensemble.

func (*BurrowZookeeperClient) Create

func (z *BurrowZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)

Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is desired, specify

zk.WorldACL(zk.PermAll)

func (*BurrowZookeeperClient) ExistsW added in v1.1.0

func (z *BurrowZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a message when the watch fires

func (*BurrowZookeeperClient) GetW

func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

GetW returns the data in the specified ZNode as a slice of bytes. It also returns a zk.Stat describing the ZNode, and a channel over which a zk.Event object will be sent if the ZNode changes (data changed, or ZNode deleted).

func (*BurrowZookeeperClient) NewLock

NewLock creates a lock using the provided path. Multiple Zookeeper clients, using the same lock path, can synchronize with each other to assure that only one client has the lock at any point.

type MockModule

type MockModule struct {
	mock.Mock
}

MockModule is a mock of protocol.Module that also satisfies the various subsystem Module variants, and is used in tests. It should never be used in the normal code.

func (*MockModule) AcceptConsumerGroup

func (m *MockModule) AcceptConsumerGroup(status *protocol.ConsumerGroupStatus) bool

AcceptConsumerGroup mocks the notifier.Module AcceptConsumerGroup func

func (*MockModule) Configure

func (m *MockModule) Configure(name string, configRoot string)

Configure mocks the protocol.Module Configure func

func (*MockModule) GetGroupBlacklist

func (m *MockModule) GetGroupBlacklist() *regexp.Regexp

GetGroupBlacklist mocks the notifier.Module GetGroupBlacklist func

func (*MockModule) GetGroupWhitelist

func (m *MockModule) GetGroupWhitelist() *regexp.Regexp

GetGroupWhitelist mocks the notifier.Module GetGroupWhitelist func

func (*MockModule) GetLogger

func (m *MockModule) GetLogger() *zap.Logger

GetLogger mocks the notifier.Module GetLogger func

func (*MockModule) GetName

func (m *MockModule) GetName() string

GetName mocks the notifier.Module GetName func

func (*MockModule) Notify

func (m *MockModule) Notify(status *protocol.ConsumerGroupStatus, eventID string, startTime time.Time, stateGood bool)

Notify mocks the notifier.Module Notify func

func (*MockModule) Start

func (m *MockModule) Start() error

Start mocks the protocol.Module Start func

func (*MockModule) Stop

func (m *MockModule) Stop() error

Stop mocks the protocol.Module Stop func

type MockSaramaBroker

type MockSaramaBroker struct {
	mock.Mock
}

MockSaramaBroker is a mock of SaramaBroker. It is used in tests by multiple packages. It should never be used in the normal code.

func (*MockSaramaBroker) Close

func (m *MockSaramaBroker) Close() error

Close mocks SaramaBroker.Close

func (*MockSaramaBroker) GetAvailableOffsets

func (m *MockSaramaBroker) GetAvailableOffsets(request *sarama.OffsetRequest) (*sarama.OffsetResponse, error)

GetAvailableOffsets mocks SaramaBroker.GetAvailableOffsets

func (*MockSaramaBroker) ID

func (m *MockSaramaBroker) ID() int32

ID mocks SaramaBroker.ID

type MockSaramaClient

type MockSaramaClient struct {
	mock.Mock
}

MockSaramaClient is a mock of SaramaClient. It is used in tests by multiple packages. It should never be used in the normal code.

func (*MockSaramaClient) Brokers

func (m *MockSaramaClient) Brokers() []SaramaBroker

Brokers mocks SaramaClient.Brokers

func (*MockSaramaClient) Close

func (m *MockSaramaClient) Close() error

Close mocks SaramaClient.Close

func (*MockSaramaClient) Closed

func (m *MockSaramaClient) Closed() bool

Closed mocks SaramaClient.Closed

func (*MockSaramaClient) Config

func (m *MockSaramaClient) Config() *sarama.Config

Config mocks SaramaClient.Config

func (*MockSaramaClient) Coordinator

func (m *MockSaramaClient) Coordinator(consumerGroup string) (SaramaBroker, error)

Coordinator mocks SaramaClient.Coordinator

func (*MockSaramaClient) GetOffset

func (m *MockSaramaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error)

GetOffset mocks SaramaClient.GetOffset

func (*MockSaramaClient) InSyncReplicas

func (m *MockSaramaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error)

InSyncReplicas mocks SaramaClient.InSyncReplicas

func (*MockSaramaClient) Leader

func (m *MockSaramaClient) Leader(topic string, partitionID int32) (SaramaBroker, error)

Leader mocks SaramaClient.Leader

func (*MockSaramaClient) NewConsumerFromClient

func (m *MockSaramaClient) NewConsumerFromClient() (sarama.Consumer, error)

NewConsumerFromClient mocks SaramaClient.NewConsumerFromClient

func (*MockSaramaClient) Partitions

func (m *MockSaramaClient) Partitions(topic string) ([]int32, error)

Partitions mocks SaramaClient.Partitions

func (*MockSaramaClient) RefreshCoordinator

func (m *MockSaramaClient) RefreshCoordinator(consumerGroup string) error

RefreshCoordinator mocks SaramaClient.RefreshCoordinator

func (*MockSaramaClient) RefreshMetadata

func (m *MockSaramaClient) RefreshMetadata(topics ...string) error

RefreshMetadata mocks SaramaClient.RefreshMetadata

func (*MockSaramaClient) Replicas

func (m *MockSaramaClient) Replicas(topic string, partitionID int32) ([]int32, error)

Replicas mocks SaramaClient.Replicas

func (*MockSaramaClient) Topics

func (m *MockSaramaClient) Topics() ([]string, error)

Topics mocks SaramaClient.Topics

func (*MockSaramaClient) WritablePartitions

func (m *MockSaramaClient) WritablePartitions(topic string) ([]int32, error)

WritablePartitions mocks SaramaClient.WritablePartitions

type MockSaramaConsumer

type MockSaramaConsumer struct {
	mock.Mock
}

MockSaramaConsumer is a mock of sarama.Consumer. It is used in tests by multiple packages. It should never be used in the normal code.

func (*MockSaramaConsumer) Close

func (m *MockSaramaConsumer) Close() error

Close mocks sarama.Consumer.Close

func (*MockSaramaConsumer) ConsumePartition

func (m *MockSaramaConsumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error)

ConsumePartition mocks sarama.Consumer.ConsumePartition

func (*MockSaramaConsumer) HighWaterMarks

func (m *MockSaramaConsumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks mocks sarama.Consumer.HighWaterMarks

func (*MockSaramaConsumer) Partitions

func (m *MockSaramaConsumer) Partitions(topic string) ([]int32, error)

Partitions mocks sarama.Consumer.Partitions

func (*MockSaramaConsumer) Topics

func (m *MockSaramaConsumer) Topics() ([]string, error)

Topics mocks sarama.Consumer.Topics

type MockSaramaPartitionConsumer

type MockSaramaPartitionConsumer struct {
	mock.Mock
}

MockSaramaPartitionConsumer is a mock of sarama.PartitionConsumer. It is used in tests by multiple packages. It should never be used in the normal code.

func (*MockSaramaPartitionConsumer) AsyncClose

func (m *MockSaramaPartitionConsumer) AsyncClose()

AsyncClose mocks sarama.PartitionConsumer.AsyncClose

func (*MockSaramaPartitionConsumer) Close

Close mocks sarama.PartitionConsumer.Close

func (*MockSaramaPartitionConsumer) Errors

func (m *MockSaramaPartitionConsumer) Errors() <-chan *sarama.ConsumerError

Errors mocks sarama.PartitionConsumer.Errors

func (*MockSaramaPartitionConsumer) HighWaterMarkOffset

func (m *MockSaramaPartitionConsumer) HighWaterMarkOffset() int64

HighWaterMarkOffset mocks sarama.PartitionConsumer.HighWaterMarkOffset

func (*MockSaramaPartitionConsumer) Messages

func (m *MockSaramaPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage

Messages mocks sarama.PartitionConsumer.Messages

type MockTicker

type MockTicker struct {
	mock.Mock
}

MockTicker is a mock Ticker interface that can be used for testing. It should not be used in normal code.

func (*MockTicker) GetChannel

func (m *MockTicker) GetChannel() <-chan time.Time

GetChannel mocks Ticker.GetChannel

func (*MockTicker) Start

func (m *MockTicker) Start()

Start mocks Ticker.Start

func (*MockTicker) Stop

func (m *MockTicker) Stop()

Stop mocks Ticker.Stop

type MockZookeeperClient

type MockZookeeperClient struct {
	mock.Mock

	// InitialError can be set before using the MockZookeeperConnect call to specify an error that should be returned
	// from that call.
	InitialError error

	// EventChannel can be set before using the MockZookeeperConnect call to provide the channel that that call returns.
	EventChannel chan zk.Event

	// Servers stores the slice of strings that is provided to MockZookeeperConnect
	Servers []string

	// SessionTimeout stores the value that is provided to MockZookeeperConnect
	SessionTimeout time.Duration
}

MockZookeeperClient is a mock of the protocol.ZookeeperClient interface to be used for testing. It should not be used in normal code.

func (*MockZookeeperClient) ChildrenW

func (m *MockZookeeperClient) ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)

ChildrenW mocks protocol.ZookeeperClient.ChildrenW

func (*MockZookeeperClient) Close

func (m *MockZookeeperClient) Close()

Close mocks protocol.ZookeeperClient.Close

func (*MockZookeeperClient) Create

func (m *MockZookeeperClient) Create(path string, data []byte, flags int32, acl []zk.ACL) (string, error)

Create mocks protocol.ZookeeperClient.Create

func (*MockZookeeperClient) ExistsW added in v1.1.0

func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

ExistsW mocks protocol.ZookeeperClient.ExistsW

func (*MockZookeeperClient) GetW

func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

GetW mocks protocol.ZookeeperClient.GetW

func (*MockZookeeperClient) MockZookeeperConnect

func (m *MockZookeeperClient) MockZookeeperConnect(servers []string, sessionTimeout time.Duration, logger *zap.Logger) (protocol.ZookeeperClient, <-chan zk.Event, error)

MockZookeeperConnect is a func that mocks the ZookeeperConnect call, but allows us to pre-populate the return values and save the arguments provided for assertions.

func (*MockZookeeperClient) NewLock

NewLock mocks protocol.ZookeeperClient.NewLock

type MockZookeeperLock

type MockZookeeperLock struct {
	mock.Mock
}

MockZookeeperLock is a mock of the protocol.ZookeeperLock interface. It should not be used in normal code.

func (*MockZookeeperLock) Lock

func (m *MockZookeeperLock) Lock() error

Lock mocks protocol.ZookeeperLock.Lock

func (*MockZookeeperLock) Unlock

func (m *MockZookeeperLock) Unlock() error

Unlock mocks protocol.ZookeeperLock.Unlock

type PausableTicker

type PausableTicker struct {
	// contains filtered or unexported fields
}

PausableTicker is an implementation of Ticker which can be stopped and restarted without changing the underlying channel. This is useful for cases where you may need to stop performing actions for a while (such as sending notifications), but you do not want to tear down everything.

func (*PausableTicker) GetChannel

func (ticker *PausableTicker) GetChannel() <-chan time.Time

GetChannel returns the channel over which ticks will be sent. This channel can be used over multiple Start/Stop cycles, and will not be closed.

func (*PausableTicker) Start

func (ticker *PausableTicker) Start()

Start begins sending ticks over the channel at the interval that has already been configured. If the ticker is already sending ticks, this func has no effect.

func (*PausableTicker) Stop

func (ticker *PausableTicker) Stop()

Stop stops ticks from being sent over the channel. If the ticker is not currently sending ticks, this func has no effect

type SaramaBroker

type SaramaBroker interface {
	// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
	ID() int32

	// Close closes the connection associated with the broker
	Close() error

	// GetAvailableOffsets sends an OffsetRequest to the broker and returns the OffsetResponse that was received
	GetAvailableOffsets(*sarama.OffsetRequest) (*sarama.OffsetResponse, error)
}

SaramaBroker is an internal interface on the sarama.Broker struct. It is used with the SaramaClient interface in order to provide a fully testable interface for the pieces of Sarama that are used inside Burrow. Currently, this interface only defines the methods that Burrow is using. It should not be considered a complete interface for sarama.Broker

type SaramaClient

type SaramaClient interface {
	// Config returns the Config struct of the client. This struct should not be altered after it has been created.
	Config() *sarama.Config

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []SaramaBroker

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable"
	// means "having a valid leader accepting writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the
	// cluster metadata.
	Leader(topic string, partitionID int32) (SaramaBroker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given partition. In-sync replicas are replicas
	// which are fully caught up with the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.
	// If no topics are provided, it will refresh metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the given time (in milliseconds) on the
	// topic/partition combination. Time should be OffsetOldest for the earliest available offset, OffsetNewest for the
	// offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached value if it's
	// available. You can call RefreshCoordinator to update the cached value. This function only works on Kafka 0.8.2 and
	// higher.
	Coordinator(consumerGroup string) (SaramaBroker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache. This function only
	// works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// Close shuts down all broker connections managed by this client. It is required to call this function before a client
	// object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a
	// client before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool

	// NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the
	// underlying client when shutting down this consumer.
	NewConsumerFromClient() (sarama.Consumer, error)
}

SaramaClient is an internal interface to the sarama.Client. We use our own interface because while sarama.Client is an interface, sarama.Broker is not. This makes it difficult to test code which uses the Broker objects. This interface operates in the same way, with the addition of an interface function for creating consumers on the client.

type Ticker

type Ticker interface {
	// Start sending ticks over the channel
	Start()

	// Stop sending ticks over the channel
	Stop()

	// Return the channel that ticks will be sent over
	GetChannel() <-chan time.Time
}

Ticker is a generic interface for a channel that delivers `ticks' of a clock at intervals.

func NewPausableTicker

func NewPausableTicker(d time.Duration) Ticker

NewPausableTicker returns a Ticker that has not yet been started, but the channel is ready to use. This ticker can be started and stopped multiple times without needing to swap the ticker channel

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL