Documentation

Constants

This section is empty.

Variables

var (
	// RequiredAcks will be used in Kafka configs
	// to set the 'RequiredAcks' value.
	RequiredAcks = sarama.WaitForAll
)

Functions

func GetPartitions

func GetPartitions(brokerHosts []string, topic string) (partitions []int32, err error)

    GetPartitions is a helper function to look up which partitions are available via the given brokers for the given topic. This should be called only on startup.
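One way to honor the "only on startup" advice is to run the lookup once and reuse the result. The sketch below is a minimal illustration using only the standard library. `lookupFunc` and `partitionCache` are hypothetical names that are not part of this package, and the stub lookup stands in for a real call to GetPartitions.

```go
package main

import (
	"fmt"
	"sync"
)

// lookupFunc mirrors the GetPartitions signature so that either the real
// function or a stub can be plugged in. (Hypothetical type, not in the package.)
type lookupFunc func(brokerHosts []string, topic string) ([]int32, error)

// partitionCache runs the lookup at most once and reuses the result.
type partitionCache struct {
	once       sync.Once
	partitions []int32
	err        error
}

// Get performs the lookup on the first call and returns the cached
// result (including any error) on every later call.
func (c *partitionCache) Get(lookup lookupFunc, hosts []string, topic string) ([]int32, error) {
	c.once.Do(func() {
		c.partitions, c.err = lookup(hosts, topic)
	})
	return c.partitions, c.err
}

func main() {
	calls := 0
	// Stub lookup standing in for a real broker query.
	stub := func([]string, string) ([]int32, error) {
		calls++
		return []int32{0, 1, 2}, nil
	}

	var cache partitionCache
	for i := 0; i < 3; i++ {
		parts, err := cache.Get(stub, []string{"localhost:9092"}, "events")
		fmt.Println(parts, err)
	}
	fmt.Println("lookups:", calls) // prints "lookups: 1"
}
```

Note that this sketch also caches a failed lookup. A real service would probably prefer to fail fast at startup instead.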

func NewPublisher

func NewPublisher(cfg *Config) (pubsub.Publisher, error)

    NewPublisher will initiate a new experimental Kafka publisher.

func NewSubscriber

func NewSubscriber(cfg *Config, offsetProvider func() int64, offsetBroadcast func(int64)) (pubsub.Subscriber, error)

    NewSubscriber will initiate the experimental Kafka consumer.
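The two callbacks have the shapes `func() int64` and `func(int64)`. The sketch below shows one way to back both with a single in-memory value. It assumes, from the parameter names only, that offsetProvider supplies the offset to start from and that offsetBroadcast receives offsets as messages are processed. `offsetStore` is a hypothetical type that is not part of this package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// offsetStore is a hypothetical in-memory offset tracker whose methods
// match the offsetProvider and offsetBroadcast callback signatures.
type offsetStore struct {
	offset int64
}

// Provide returns the most recently recorded offset.
func (s *offsetStore) Provide() int64 { return atomic.LoadInt64(&s.offset) }

// Broadcast records a new offset.
func (s *offsetStore) Broadcast(o int64) { atomic.StoreInt64(&s.offset, o) }

func main() {
	store := &offsetStore{}

	// Method values have exactly the function types NewSubscriber expects.
	var provider func() int64 = store.Provide
	var broadcast func(int64) = store.Broadcast

	broadcast(42)
	fmt.Println(provider()) // prints 42
}
```

An in-memory store loses its value on restart, so a durable store would be needed for a consumer that has to resume where it left off.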

Types

type Config

type Config struct {
	BrokerHosts []string
	// BrokerHostsString is used when loading the list from environment variables.
	// If loaded via the LoadConfigFromEnv() func, BrokerHosts will get updated with these
	// values.
	BrokerHostsString string `envconfig:"KAFKA_BROKER_HOSTS"`

	Partition int32  `envconfig:"KAFKA_PARTITION"`
	Topic     string `envconfig:"KAFKA_TOPIC"`

	MaxRetry int `envconfig:"KAFKA_MAX_RETRY"`

	// Config is a sarama config struct for more control over the underlying Kafka client.
	Config *sarama.Config
}

    Config holds the basic information for working with Kafka.
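BrokerHostsString holds the whole host list in one value, while BrokerHosts is a slice. A minimal sketch of that conversion follows, assuming the hosts are comma-separated (the source does not state the separator). `splitBrokerHosts` is a hypothetical helper that is not part of this package.

```go
package main

import (
	"fmt"
	"strings"
)

// splitBrokerHosts is a hypothetical helper: it splits a comma-separated
// host list, trims surrounding whitespace, and drops empty entries.
func splitBrokerHosts(s string) []string {
	var hosts []string
	for _, h := range strings.Split(s, ",") {
		if h = strings.TrimSpace(h); h != "" {
			hosts = append(hosts, h)
		}
	}
	return hosts
}

func main() {
	hosts := splitBrokerHosts("kafka-1:9092, kafka-2:9092")
	fmt.Println(len(hosts), hosts) // prints "2 [kafka-1:9092 kafka-2:9092]"
}
```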

func LoadConfigFromEnv

func LoadConfigFromEnv() *Config

    LoadConfigFromEnv will attempt to load a Kafka Config from environment variables. If not populated, nil is returned.
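The struct tags on Config suggest the values come from KAFKA_BROKER_HOSTS, KAFKA_PARTITION, KAFKA_TOPIC and KAFKA_MAX_RETRY. The sketch below imitates that behavior with the standard library only. It is not the package's implementation: `kafkaEnv` and `loadKafkaEnv` are hypothetical names, and treating an empty KAFKA_BROKER_HOSTS as "not populated" is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// kafkaEnv is a hypothetical stand-in for the env-tagged fields of Config.
type kafkaEnv struct {
	BrokerHostsString string
	Partition         int32
	Topic             string
	MaxRetry          int
}

// loadKafkaEnv reads the KAFKA_* variables through getenv. It returns nil
// when KAFKA_BROKER_HOSTS is empty (an assumed meaning of "not populated").
func loadKafkaEnv(getenv func(string) string) *kafkaEnv {
	hosts := getenv("KAFKA_BROKER_HOSTS")
	if hosts == "" {
		return nil
	}
	cfg := &kafkaEnv{BrokerHostsString: hosts, Topic: getenv("KAFKA_TOPIC")}
	// Missing or unparseable numbers keep their zero value in this sketch.
	if p, err := strconv.ParseInt(getenv("KAFKA_PARTITION"), 10, 32); err == nil {
		cfg.Partition = int32(p)
	}
	if r, err := strconv.Atoi(getenv("KAFKA_MAX_RETRY")); err == nil {
		cfg.MaxRetry = r
	}
	return cfg
}

func main() {
	if cfg := loadKafkaEnv(os.Getenv); cfg == nil {
		fmt.Println("Kafka environment not populated")
	} else {
		fmt.Printf("%+v\n", *cfg)
	}
}
```

Callers of the real function should check for a nil result in the same way before passing the config to NewPublisher or NewSubscriber.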

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

    Publisher is an experimental publisher that provides an implementation for Kafka using the Shopify/sarama library.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, key string, m proto.Message) error

    Publish will marshal the proto message and emit it to the Kafka topic.

func (*Publisher) PublishRaw

func (p *Publisher) PublishRaw(_ context.Context, key string, m []byte) error

    PublishRaw will emit the byte slice to the Kafka topic.
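The relationship between the two methods can be pictured as "marshal, then send raw bytes". The sketch below illustrates that shape with an in-memory publisher and no Kafka or protobuf dependency. Whether Publish actually delegates to PublishRaw is an assumption, and `marshaler`, `memPublisher` and `textMessage` are hypothetical stand-ins that are not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

// marshaler is a hypothetical stand-in for a proto.Message that can be
// serialized to bytes.
type marshaler interface {
	Marshal() ([]byte, error)
}

// memPublisher records messages in memory instead of sending them to Kafka.
type memPublisher struct {
	sent map[string][]byte
}

// PublishRaw stores the payload under its key.
func (p *memPublisher) PublishRaw(_ context.Context, key string, m []byte) error {
	p.sent[key] = m
	return nil
}

// Publish serializes the message, then hands the bytes to PublishRaw.
func (p *memPublisher) Publish(ctx context.Context, key string, m marshaler) error {
	b, err := m.Marshal()
	if err != nil {
		return err
	}
	return p.PublishRaw(ctx, key, b)
}

// textMessage is a toy message whose encoding is simply its bytes.
type textMessage string

func (t textMessage) Marshal() ([]byte, error) { return []byte(t), nil }

func main() {
	p := &memPublisher{sent: map[string][]byte{}}
	if err := p.Publish(context.Background(), "user-1", textMessage("hello")); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("%s\n", p.sent["user-1"]) // prints "hello"
}
```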

func (*Publisher) Stop

func (p *Publisher) Stop() error

    Stop will close the pub connection.
