streamconfig

package
v2.4.0+incompatible Latest
Published: Jul 9, 2019 License: ISC Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ConsumerDefaults = Consumer{Global: GlobalDefaults}

ConsumerDefaults holds the default values for Consumer.

var GlobalDefaults = Global{
	HandleErrors:                       true,
	HandleInterrupt:                    true,
	Name:                               "",
	AllowEnvironmentBasedConfiguration: true,
}

GlobalDefaults provides the default values for the global preferences.

var ProducerDefaults = Producer{Global: GlobalDefaults}

ProducerDefaults holds the default values for Producer.
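
Because both defaults embed `GlobalDefaults`, the global fields are promoted onto the consumer and producer values. The following is a minimal sketch of that Go embedding behavior; the lowercase types and variables are hypothetical stand-ins that mirror only a few of the fields shown above, not the package's own declarations.

```go
package main

import "fmt"

// global is a hypothetical stand-in for a subset of streamconfig.Global.
type global struct {
	HandleErrors    bool
	HandleInterrupt bool
	Name            string
}

// consumer embeds global, so its fields are promoted.
type consumer struct {
	global
}

var globalDefaults = global{HandleErrors: true, HandleInterrupt: true}
var consumerDefaults = consumer{global: globalDefaults}

func main() {
	// Assigning a struct copies it, so the package-level defaults stay intact.
	c := consumerDefaults
	c.Name = "MyProcessor"

	fmt.Println(c.HandleErrors, c.Name, consumerDefaults.Name == "")
	// prints "true MyProcessor true"
}
```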

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	Inmem          inmemconfig.Consumer
	Kafka          kafkaconfig.Consumer
	Pubsub         pubsubconfig.Consumer
	Standardstream standardstreamconfig.Consumer

	Global
}

Consumer contains the configuration for all the different consumers that implement the stream.Consumer interface. When a consumer is instantiated, these options can be passed into the new consumer to determine its behavior. If the consumer only has to support a single implementation of the interface, then all other configuration values can be ignored.

func NewConsumer

func NewConsumer(options ...Option) (Consumer, error)

NewConsumer returns a new Consumer configuration struct, containing the values passed into the function. If any error occurs during configuration validation, an error is returned as the second return value.
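
The constructor follows the variadic-options pattern: options are applied in order and the result is validated. A minimal, self-contained sketch of that pattern is shown here; `consumerConfig`, `option`, `withName`, `withTopic`, `newConsumerConfig`, and the "at least one topic" validation rule are all hypothetical and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerConfig is a hypothetical stand-in for streamconfig.Consumer.
type consumerConfig struct {
	Name   string
	Topics []string
}

// option mutates a configuration. The real Option is an interface with
// unexported methods; a func type is used here for brevity.
type option func(*consumerConfig)

func withName(s string) option {
	return func(c *consumerConfig) { c.Name = s }
}

func withTopic(s string) option {
	return func(c *consumerConfig) { c.Topics = append(c.Topics, s) }
}

// newConsumerConfig applies the options in order, then validates the result.
// The validation rule is an assumption made for illustration only.
func newConsumerConfig(opts ...option) (consumerConfig, error) {
	var c consumerConfig
	for _, o := range opts {
		o(&c)
	}
	if len(c.Topics) == 0 {
		return consumerConfig{}, errors.New("no topic configured")
	}
	return c, nil
}

func main() {
	c, err := newConsumerConfig(withName("MyProcessor"), withTopic("events"))
	fmt.Println(c.Name, c.Topics, err) // prints "MyProcessor [events] <nil>"

	_, err = newConsumerConfig()
	fmt.Println(err) // prints "no topic configured"
}
```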

func TestNewConsumer

func TestNewConsumer(tb testing.TB, defaults bool, options ...Option) Consumer

TestNewConsumer returns a new consumer configuration struct, optionally with the default values removed.

func (Consumer) FromEnv

func (c Consumer) FromEnv() (Consumer, error)

FromEnv populates the Consumer from environment variables whose prefix is derived from the consumer name.

func (Consumer) WithOptions

func (c Consumer) WithOptions(opts ...Option) Consumer

WithOptions takes the current Consumer, applies the supplied Options, and returns the resulting Consumer.
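
Since WithOptions has a value receiver and returns a Consumer, the receiver itself is left unchanged. The sketch below illustrates that behavior with a hypothetical `config` type and `withOptions` method; it uses only scalar fields, because slice or pointer fields would still share their underlying data between copies.

```go
package main

import "fmt"

// config is a hypothetical stand-in for a configuration struct.
type config struct {
	Name  string
	Debug bool
}

type option func(*config)

// withOptions has a value receiver: c is already a copy of the caller's
// value, so mutating it here does not affect the original.
func (c config) withOptions(opts ...option) config {
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	base := config{Name: "base"}
	derived := base.withOptions(func(c *config) { c.Debug = true })

	fmt.Println(base.Debug, derived.Debug) // prints "false true"
}
```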

type Global

type Global struct {
	// AllowEnvironmentBasedConfiguration allows you to disable configuring the
	// stream client based on predefined environment variables. This is enabled by
	// default, but can be disabled if you want full control over the behavior of
	// the stream client without any outside influence.
	AllowEnvironmentBasedConfiguration bool `ignored:"true"`

	// HandleErrors determines whether the consumer should handle any stream
	// errors by itself, and terminate the application if any error occurs. This
	// defaults to true. If manually set to false, the `Errors()` channel needs to
	// be consumed manually, and any appropriate action needs to be taken when an
	// error occurs, otherwise the consumer will get stuck once an error occurs.
	HandleErrors bool `ignored:"true"`

	// HandleInterrupt determines whether the consumer should close itself
	// gracefully when an interrupt signal (^C) is received. This defaults to true
	// to increase first-time ease-of-use, but if the application wants to handle
	// these signals manually, this flag disables the automated implementation.
	HandleInterrupt bool `ignored:"true"`

	// Logger is the configurable logger instance to log messages. If left
	// undefined, a no-op logger will be used.
	Logger *zap.Logger `ignored:"true"`

	// Name is the name of the current processor. It is currently only used to
	// determine the prefix for environment-variable based configuration values.
	// For example, if Name is set to `MyProcessor`, then all environment
	// variable-based configurations need to start with `MYPROCESSOR_`. If no name
	// is set, then the prefix "consumer" is used, so you prepend all
	// environment variables with "CONSUMER_".
	Name string `ignored:"true"`

	// Type is the type of the current processor.
	Type string `envconfig:"client_type"`
}

Global is a common set of preferences shared between consumers and producers.
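
The prefix rule described in the Name comment can be sketched as follows. `envPrefix` is a hypothetical helper written for illustration; the package itself may derive the prefix differently (the struct tags suggest an envconfig-style library), so treat this as an approximation of the documented behavior only.

```go
package main

import (
	"fmt"
	"strings"
)

// envPrefix returns the environment-variable prefix for a processor name.
// fallback is used when no name is set, for example "consumer".
func envPrefix(name, fallback string) string {
	if name == "" {
		name = fallback
	}
	return strings.ToUpper(name) + "_"
}

func main() {
	fmt.Println(envPrefix("MyProcessor", "consumer")) // prints "MYPROCESSOR_"
	fmt.Println(envPrefix("", "consumer"))            // prints "CONSUMER_"
}
```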

type Option

type Option interface {
	// contains filtered or unexported methods
}

An Option configures a Consumer and/or Producer.

func ConsumerOptions

func ConsumerOptions(fn func(c *Consumer)) Option

ConsumerOptions is a convenience accessor to manually set consumer options.
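
One way such an accessor can work is an option type whose producer side is a no-op, which would also explain why several options below have no effect on one of the two sides. This is a sketch under that assumption; `option`, `consumerOnly`, `consumerOptions`, and the two small structs are hypothetical and do not reflect the package's internals.

```go
package main

import "fmt"

type consumer struct{ GroupID string }
type producer struct{ Topic string }

// option has only unexported methods, so it can only be implemented
// inside the declaring package.
type option interface {
	applyConsumer(*consumer)
	applyProducer(*producer)
}

// consumerOnly adapts a consumer callback; its producer side does nothing.
type consumerOnly func(*consumer)

func (f consumerOnly) applyConsumer(c *consumer) { f(c) }
func (f consumerOnly) applyProducer(*producer)   {}

func consumerOptions(fn func(c *consumer)) option {
	return consumerOnly(fn)
}

func main() {
	opt := consumerOptions(func(c *consumer) { c.GroupID = "workers" })

	var c consumer
	var p producer
	opt.applyConsumer(&c)
	opt.applyProducer(&p) // no effect on the producer

	fmt.Printf("%+v %+v\n", c, p) // prints "{GroupID:workers} {Topic:}"
}
```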

func DisableEnvironmentConfig

func DisableEnvironmentConfig() Option

DisableEnvironmentConfig prevents the consumer or producer from being configured via environment variables. By default, environment variable-based configuration is allowed.

func InmemListen

func InmemListen() Option

InmemListen configures the inmem consumer to continuously listen for any new messages in the configured store.

This option has no effect when applied to a producer.

func InmemStore

func InmemStore(s stream.Store) Option

InmemStore adds a store to the inmem consumer and producer.

func KafkaBroker

func KafkaBroker(s string) Option

KafkaBroker adds a broker to the list of configured Kafka brokers.

func KafkaCommitInterval

func KafkaCommitInterval(d time.Duration) Option

KafkaCommitInterval sets the consumer's CommitInterval.

This option has no effect when applied to a producer.

func KafkaCompressionCodec

func KafkaCompressionCodec(s kafkaconfig.Compression) Option

KafkaCompressionCodec sets the compression codec for the produced messages.

This option has no effect when applied to a consumer.

func KafkaDebug

func KafkaDebug() Option

KafkaDebug enables debugging for Kafka.

func KafkaGroupID

func KafkaGroupID(s string) Option

KafkaGroupID sets the group ID for the consumer.

This option has no effect when applied to a producer.

func KafkaGroupIDRandom

func KafkaGroupIDRandom() Option

KafkaGroupIDRandom sets the group ID for the consumer to a random ID. This can be used to configure one-off consumers that should not share their state in a consumer group.

This option has no effect when applied to a producer.
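
How the package generates the random ID is not documented here. As an illustration of the idea, a one-off group ID can be built from random bytes; `randomGroupID` and its prefix argument are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomGroupID returns prefix followed by 16 random hex characters, which
// makes a collision with an existing consumer group very unlikely.
func randomGroupID(prefix string) (string, error) {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return prefix + "-" + hex.EncodeToString(b), nil
}

func main() {
	id, err := randomGroupID("oneoff")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```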

func KafkaHandleTransientErrors

func KafkaHandleTransientErrors() Option

KafkaHandleTransientErrors passes _all_ errors to the errors channel, including those that are considered "transient" and that the consumer or producer can eventually resolve by itself.

func KafkaHeartbeatInterval

func KafkaHeartbeatInterval(d time.Duration) Option

KafkaHeartbeatInterval sets the consumer or producer HeartbeatInterval.

func KafkaID

func KafkaID(s string) Option

KafkaID sets the consumer or producer ID.

func KafkaMaxDeliveryRetries

func KafkaMaxDeliveryRetries(i int) Option

KafkaMaxDeliveryRetries sets the MaxDeliveryRetries.

This option has no effect when applied to a consumer.

func KafkaMaxInFlightRequests

func KafkaMaxInFlightRequests(i int) Option

KafkaMaxInFlightRequests sets the maximum allowed in-flight requests for both consumers and producers.

func KafkaMaxPollInterval

func KafkaMaxPollInterval(d time.Duration) Option

KafkaMaxPollInterval sets the maximum allowed poll timeout.

This option has no effect when applied to a producer.

func KafkaMaxQueueBufferDuration

func KafkaMaxQueueBufferDuration(d time.Duration) Option

KafkaMaxQueueBufferDuration sets the MaxQueueBufferDuration.

This option has no effect when applied to a consumer.

func KafkaMaxQueueSizeKBytes

func KafkaMaxQueueSizeKBytes(i int) Option

KafkaMaxQueueSizeKBytes sets the MaxQueueSizeKBytes.

This option has no effect when applied to a consumer.

func KafkaMaxQueueSizeMessages

func KafkaMaxQueueSizeMessages(i int) Option

KafkaMaxQueueSizeMessages sets the MaxQueueSizeMessages.

This option has no effect when applied to a consumer.

func KafkaOffsetHead

func KafkaOffsetHead(i uint32) Option

KafkaOffsetHead sets the OffsetDefault.

This option has no effect when applied to a producer.

func KafkaOffsetInitial

func KafkaOffsetInitial(s kafkaconfig.Offset) Option

KafkaOffsetInitial sets the OffsetInitial.

This option has no effect when applied to a producer.

func KafkaOffsetTail

func KafkaOffsetTail(i uint32) Option

KafkaOffsetTail sets the OffsetDefault.

This option has no effect when applied to a producer.

func KafkaOrderedDelivery

func KafkaOrderedDelivery() Option

KafkaOrderedDelivery sets `MaxInFlightRequests` to `1` for the producer, to guarantee ordered delivery of messages.

See: https://git.io/vpgiV
See: https://git.io/vpgDg

This option has no effect when applied to a consumer.

func KafkaRequireAllAck

func KafkaRequireAllAck() Option

KafkaRequireAllAck configures the producer to wait for acks from all brokers available in the Kafka cluster.

This option has no effect when applied to a consumer.

func KafkaRequireLeaderAck

func KafkaRequireLeaderAck() Option

KafkaRequireLeaderAck configures the producer to wait for a single ack from the Kafka cluster's leader broker.

This option has no effect when applied to a consumer.

func KafkaRequireNoAck

func KafkaRequireNoAck() Option

KafkaRequireNoAck configures the producer not to wait for any broker acks.

This option has no effect when applied to a consumer.

func KafkaRetryBackoff

func KafkaRetryBackoff(d time.Duration) Option

KafkaRetryBackoff configures the producer to use the configured retry backoff before retrying a connection failure. See `KafkaMaxDeliveryRetries` to configure the number of retries to execute before returning an error.

This option has no effect when applied to a consumer.

func KafkaSSL

func KafkaSSL(capath, certpath, crlpath, keypassword, keypath, keystorepassword, keystorepath string) Option

KafkaSSL configures the producer or consumer to use the specified SSL config.

func KafkaSecurityProtocol

func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option

KafkaSecurityProtocol configures the producer or consumer to use the specified security protocol.

func KafkaSessionTimeout

func KafkaSessionTimeout(d time.Duration) Option

KafkaSessionTimeout configures the producer or consumer to use the specified session timeout.

func KafkaStatisticsInterval

func KafkaStatisticsInterval(d time.Duration) Option

KafkaStatisticsInterval configures the producer or consumer to use the specified statistics interval.

func KafkaTopic

func KafkaTopic(s string) Option

KafkaTopic configures the producer or consumer to use the specified topic. In case of the consumer, this option can be used multiple times to consume from more than one topic. In case of the producer, the last usage of this option will set the final topic to produce to.
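
The difference between the two sides can be sketched as "append" versus "overwrite". The `consumer` and `producer` types and their methods below are hypothetical and only model the behavior described above.

```go
package main

import "fmt"

type consumer struct{ Topics []string }
type producer struct{ Topic string }

// addTopic accumulates topics: each use adds another topic to consume from.
func (c *consumer) addTopic(s string) { c.Topics = append(c.Topics, s) }

// setTopic overwrites: the last use decides the topic to produce to.
func (p *producer) setTopic(s string) { p.Topic = s }

func main() {
	var c consumer
	var p producer
	for _, t := range []string{"orders", "payments"} {
		c.addTopic(t)
		p.setTopic(t)
	}

	fmt.Println(c.Topics, p.Topic) // prints "[orders payments] payments"
}
```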

func Logger

func Logger(l *zap.Logger) Option

Logger sets the logger for the consumer or producer.

func ManualErrorHandling

func ManualErrorHandling() Option

ManualErrorHandling prevents the consumer or producer from automatically handling stream errors. When this option is passed, the application itself needs to listen to, and act on, the `Errors()` channel.

func ManualInterruptHandling

func ManualInterruptHandling() Option

ManualInterruptHandling prevents the consumer or producer from automatically handling interrupt signals. When this option is passed, the application itself needs to handle Unix interrupt signals to properly close the consumer or producer when required.

func Name

func Name(s string) Option

Name sets the name for the consumer or producer.

func ProducerOptions

func ProducerOptions(fn func(p *Producer)) Option

ProducerOptions is a convenience accessor to manually set producer options.

func StandardstreamReader

func StandardstreamReader(w io.ReadCloser) Option

StandardstreamReader sets the reader to use as the message stream from which to read.

This option has no effect when applied to a producer.

func StandardstreamWriter

func StandardstreamWriter(w io.Writer) Option

StandardstreamWriter sets the writer to use as the message stream to write to.

This option has no effect when applied to a consumer.

func TestConsumerOptions

func TestConsumerOptions(tb testing.TB, options ...Option) []Option

TestConsumerOptions returns a slice of consumer options ready to be used during testing.

type Producer

type Producer struct {
	Inmem          inmemconfig.Producer
	Kafka          kafkaconfig.Producer
	Pubsub         pubsubconfig.Producer
	Standardstream standardstreamconfig.Producer

	Global
}

Producer contains the configuration for all the different producers that implement the stream.Producer interface. When a producer is instantiated, these options can be passed into the new producer to determine its behavior. If the producer only has to support a single implementation of the interface, then all other configuration values can be ignored.

func NewProducer

func NewProducer(options ...Option) (Producer, error)

NewProducer returns a new Producer configuration struct, containing the values passed into the function. If any error occurs during configuration validation, an error is returned as the second return value.

func TestNewProducer

func TestNewProducer(tb testing.TB, defaults bool, options ...Option) Producer

TestNewProducer returns a new producer configuration struct, optionally with the default values removed.

func (Producer) FromEnv

func (p Producer) FromEnv() (Producer, error)

FromEnv populates the Producer from environment variables whose prefix is derived from the producer name.

func (Producer) WithOptions

func (p Producer) WithOptions(opts ...Option) Producer

WithOptions takes the current Producer, applies the supplied Options, and returns the resulting Producer.
