kafka

package
v0.0.0-...-bc6c5c1 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 31, 2018 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func ClusterConsumer

func ClusterConsumer(cfg *Config, consumerGroup string, topics []string, opts ...Option) (*cluster.Consumer, error)

ClusterConsumer creates a clustered Kafka consumer that uses Kafka's built-in offset tracking mechanism to manage offsets.

func Consumer

func Consumer(cfg *Config, opts ...Option) (sarama.Consumer, error)

Consumer creates a new Kafka consumer.

func MakeTopicName

func MakeTopicName(prefix, env, boundedContext string, args ...string) string

MakeTopicName returns the topic name built from the arguments provided. The prefix argument is a convenience for the Heroku Kafka topic prefix.
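The documentation does not state how the segments are combined, so the following is only a sketch of one plausible scheme. The function name topicNameSketch, the dot separator, and the choice to prepend the prefix verbatim are all assumptions made for illustration, not the package's confirmed behavior.

```go
package main

import "strings"

// topicNameSketch is a hypothetical stand-in for MakeTopicName.
// Assumptions (not confirmed by the package docs): the prefix is prepended
// verbatim, and env, boundedContext and any extra args are joined with dots.
func topicNameSketch(prefix, env, boundedContext string, args ...string) string {
	segments := append([]string{env, boundedContext}, args...)
	return prefix + strings.Join(segments, ".")
}
```

Under these assumptions, an empty prefix simply yields the dot-joined segments, which keeps local development topic names free of any hosting-specific prefix.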

func NewPublisher

func NewPublisher(ctx context.Context, producer sarama.SyncProducer, topic string) eventsourcex.PublisherFunc

NewPublisher returns an eventsourcex.PublisherFunc that publishes to the given Kafka topic through the supplied sarama.SyncProducer.

func Producer

func Producer(cfg *Config, opts ...Option) (sarama.SyncProducer, error)

Producer creates a new Kafka sync producer.

Types

type Config

type Config struct {
	CertPEM    []byte
	KeyPEM     []byte
	CaPEM      []byte
	BrokerList []string
	VerifyTLS  bool
}

Config contains the connection parameters (TLS material and broker list) used to create Kafka producers and consumers.

func EnvConfig

func EnvConfig() *Config

EnvConfig returns a new Config instance populated with values from the environment. Expected keys are KAFKA_CERT, KAFKA_KEY, KAFKA_CA, and KAFKA_BROKERS. If KAFKA_BROKERS is not set, the broker list defaults to localhost:9092.
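As a rough illustration of the environment lookup described for EnvConfig, the sketch below fills a Config from the four documented keys and falls back to localhost:9092. The name envConfigSketch, the injectable getenv parameter, and the comma-separated format for KAFKA_BROKERS are assumptions; the package may parse the variables differently.

```go
package main

import (
	"os"
	"strings"
)

// Config mirrors the fields documented for kafka.Config.
type Config struct {
	CertPEM    []byte
	KeyPEM     []byte
	CaPEM      []byte
	BrokerList []string
	VerifyTLS  bool
}

// envConfigSketch is a hypothetical stand-in for EnvConfig. It accepts a
// lookup function so it can be exercised without changing the real
// process environment.
func envConfigSketch(getenv func(string) string) *Config {
	// Documented default when KAFKA_BROKERS is not set.
	brokers := []string{"localhost:9092"}
	if v := getenv("KAFKA_BROKERS"); v != "" {
		// Assumption: brokers are supplied as a comma-separated list.
		brokers = strings.Split(v, ",")
	}
	return &Config{
		CertPEM:    []byte(getenv("KAFKA_CERT")),
		KeyPEM:     []byte(getenv("KAFKA_KEY")),
		CaPEM:      []byte(getenv("KAFKA_CA")),
		BrokerList: brokers,
		// VerifyTLS is left at its zero value; the docs do not say how it is set.
	}
}

// fromEnviron reads the same keys from the real process environment.
func fromEnviron() *Config {
	return envConfigSketch(os.Getenv)
}
```

Passing the lookup function in keeps the default-broker behavior easy to check in isolation, while fromEnviron shows the call a program would normally make.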

func (*Config) Apply

func (c *Config) Apply(config *sarama.Config) error

Apply applies the kafka.Config to the provided sarama.Config. It currently supports only TLS configuration for sarama connections.
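To make the TLS step more concrete, here is a minimal sketch of turning the PEM fields of Config into a *tls.Config using only the standard library. The helper name tlsConfigSketch is hypothetical, and mapping VerifyTLS to the inverse of InsecureSkipVerify is an assumption about how the flag is interpreted.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
)

// tlsConfigSketch is a hypothetical helper showing how CertPEM, KeyPEM and
// CaPEM could be combined into a TLS configuration.
func tlsConfigSketch(certPEM, keyPEM, caPEM []byte, verify bool) (*tls.Config, error) {
	// Parse the client certificate and its private key.
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, err
	}

	// Build a pool that trusts only the supplied CA certificate(s).
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no CA certificates found in CaPEM")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      pool,
		// Assumption: VerifyTLS=false disables server certificate checks.
		InsecureSkipVerify: !verify,
	}, nil
}
```

With sarama, a configuration like this would typically be assigned to the Net.TLS.Config field of a sarama.Config, with Net.TLS.Enable set to true.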

type Option

type Option func(*sarama.Config)

Option is a functional option that modifies a sarama.Config.
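The functional-option pattern behind Option can be sketched without depending on sarama. In the block below, saramaConfig is a hypothetical, simplified stand-in for sarama.Config, and WithClientID, WithReturnSuccesses, and newConfig are illustrative names that this package does not necessarily export.

```go
package main

// saramaConfig is a hypothetical, flattened stand-in for sarama.Config.
type saramaConfig struct {
	ClientID        string
	ReturnSuccesses bool
}

// Option has the same shape as the documented type, but targets the stand-in.
type Option func(*saramaConfig)

// WithClientID is an illustrative option that sets the client identifier.
func WithClientID(id string) Option {
	return func(c *saramaConfig) { c.ClientID = id }
}

// WithReturnSuccesses is an illustrative option that toggles success reporting.
func WithReturnSuccesses(enabled bool) Option {
	return func(c *saramaConfig) { c.ReturnSuccesses = enabled }
}

// newConfig starts from defaults and applies each option in order, so later
// options override earlier ones.
func newConfig(opts ...Option) *saramaConfig {
	cfg := &saramaConfig{ClientID: "sarama"}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}
```

Constructors such as Producer and Consumer accept opts ...Option in the same variadic position, so callers can adjust the underlying configuration without the package having to expose every setting.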

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription provides a reference to a running stream

func SubscribeStream

func SubscribeStream(ctx context.Context, consumer sarama.Consumer, topic string, h eventsourcex.Handler) (*Subscription, error)

SubscribeStream subscribes a message handler to the specified Kafka topic

func (*Subscription) Done

func (s *Subscription) Done() <-chan struct{}

Done returns a channel that can be used to wait for the subscription to finish.

func (*Subscription) Shutdown

func (s *Subscription) Shutdown(ctx context.Context) error

Shutdown attempts to stop all the running goroutines
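The Done and Shutdown methods suggest a common goroutine lifecycle pattern, sketched here with the standard library only. The subscription type, startSketch, and demo are hypothetical; the real Subscription has unexported fields, and its internals may differ from this outline.

```go
package main

import (
	"context"
	"time"
)

// subscription is a hypothetical stand-in for the package's Subscription.
type subscription struct {
	cancel context.CancelFunc
	done   chan struct{}
}

// startSketch runs handle for each message until the context is cancelled
// or the messages channel is closed.
func startSketch(ctx context.Context, messages <-chan string, handle func(string)) *subscription {
	ctx, cancel := context.WithCancel(ctx)
	s := &subscription{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(s.done) // signals Done() once the goroutine exits
		for {
			select {
			case <-ctx.Done():
				return
			case m, ok := <-messages:
				if !ok {
					return
				}
				handle(m)
			}
		}
	}()
	return s
}

// Done returns a channel that is closed when the worker goroutine has exited.
func (s *subscription) Done() <-chan struct{} { return s.done }

// Shutdown cancels the worker and waits for it to exit, giving up when the
// supplied context expires.
func (s *subscription) Shutdown(ctx context.Context) error {
	s.cancel()
	select {
	case <-s.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo feeds two messages through a subscription, then shuts it down with a
// one-second deadline. It returns the number of handled messages.
func demo() (int, error) {
	msgs := make(chan string)
	handled := 0
	s := startSketch(context.Background(), msgs, func(string) { handled++ })
	msgs <- "a"
	msgs <- "b"
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := s.Shutdown(ctx)
	return handled, err
}
```

Accepting a context in Shutdown lets the caller bound how long it is willing to wait, while Done remains available for code that only needs to observe completion.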
