Documentation ¶
Index ¶
- Constants
- Variables
- func GetPrincipal(app, subnet, domain string) string
- func NewAdmin(brokerAddrs []string, pConfig *AdminConfig) (sarama.ClusterAdmin, error)
- func SetMaxMessageSize(maxSize int32)
- func SetMaxRetryInterval(maxPause time.Duration)
- type Acls
- type AdminConfig
- type ConsumerGroup
- func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
- func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error
- func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)
- func (cg *ConsumerGroup) Initialise(ctx context.Context) error
- func (cg *ConsumerGroup) IsInitialised() bool
- func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)
- type ConsumerGroupChannels
- type ConsumerGroupConfig
- type ErrBrokersNotReachable
- type ErrInvalidBrokers
- type ErrNoChannel
- type HealthInfo
- type HealthInfoMap
- type IConsumerGroup
- type IProducer
- type Message
- type OptFunc
- type Producer
- func (p *Producer) AddHeader(key, value string)
- func (p *Producer) Channels() *ProducerChannels
- func (p *Producer) Checker(ctx context.Context, state *health.CheckState) error
- func (p *Producer) Close(ctx context.Context) (err error)
- func (p *Producer) Initialise(ctx context.Context) error
- func (p *Producer) IsInitialised() bool
- type ProducerChannels
- type ProducerConfig
- type SaramaAsyncProducer
- type SaramaConsumerGroup
- type SaramaConsumerGroupClaim
- type SaramaConsumerGroupSession
- type SaramaMessage
- func (M SaramaMessage) Commit()
- func (M SaramaMessage) CommitAndRelease()
- func (M SaramaMessage) Context() context.Context
- func (M SaramaMessage) GetData() []byte
- func (M SaramaMessage) GetHeader(key string) string
- func (M SaramaMessage) Mark()
- func (M SaramaMessage) Offset() int64
- func (M SaramaMessage) Release()
- func (M SaramaMessage) UpstreamDone() chan struct{}
- type SecurityConfig
- type TopicAuth
- type TopicAuthList
Constants ¶
const ( Errors = "Errors" Ready = "Ready" Closer = "Closer" Closed = "Closed" Upstream = "Upstream" UpstreamDone = "UpstreamDone" Output = "Output" )
channel names
const ( OffsetNewest = sarama.OffsetNewest OffsetOldest = sarama.OffsetOldest )
Common constants
const ( // ServiceName is the name of this service: Kafka. ServiceName = "Kafka" // MsgHealthyProducer Check message returned when Kafka producer is healthy. MsgHealthyProducer = "kafka producer is healthy" // MsgHealthyConsumerGroup Check message returned when Kafka consumer group is healthy. MsgHealthyConsumerGroup = "kafka consumer group is healthy" // ProducerMinBrokersHealthy is the minimum number of healthy brokers required for a healthcheck to not be considered critical for a producer ProducerMinBrokersHealthy = 2 // ConsumerMinBrokersHealthy is the minimum number of healthy brokers required for a healthcheck to not be considered critical for a consumer ConsumerMinBrokersHealthy = 1 )
const (
TraceIDHeaderKey = string(request.RequestIdKey)
)
Variables ¶
var ConsumeErrRetryPeriod = 250 * time.Millisecond
ConsumeErrRetryPeriod is the initial time period between consumer retries on error (for consumer groups)
var ErrInitSarama = errors.New("failed to initialise client")
ErrInitSarama is used when Sarama client cannot be initialised
var ErrShutdownTimedOut = errors.New("shutdown context timed out")
ErrShutdownTimedOut represents an error received due to the context deadline being exceeded
var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")
ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded
var ErrUninitialisedProducer = errors.New("producer is not initialised")
ErrUninitialisedProducer is used when a caller tries to send a message to the output channel with an uninitialised producer.
var InitRetryPeriod = 250 * time.Millisecond
InitRetryPeriod is the initial time period between initialisation retries (for producers and consumer groups)
var MaxRetryInterval = 31 * time.Second
MaxRetryInterval is the maximum time between retries (plus or minus a random amount)
Functions ¶
func GetPrincipal ¶ added in v2.3.0
func NewAdmin ¶ added in v2.2.1
func NewAdmin(brokerAddrs []string, pConfig *AdminConfig) (sarama.ClusterAdmin, error)
NewAdmin creates an admin-based client
func SetMaxMessageSize ¶
func SetMaxMessageSize(maxSize int32)
SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize
func SetMaxRetryInterval ¶ added in v2.4.1
SetMaxRetryInterval sets MaxRetryInterval to its duration argument
Types ¶
type Acls ¶ added in v2.3.0
type Acls []*sarama.AclCreation
type AdminConfig ¶ added in v2.3.0
type AdminConfig struct { KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryMax *int SecurityConfig *SecurityConfig }
AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a Kafka consumer group instance.
func NewConsumerGroup ¶
func NewConsumerGroup(ctx context.Context, brokerAddrs []string, topic, group string, channels *ConsumerGroupChannels, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)
NewConsumerGroup creates a new consumer group with the provided parameters
func (*ConsumerGroup) Channels ¶
func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
Channels returns the ConsumerGroup channels for this consumer group
func (*ConsumerGroup) Checker ¶
func (cg *ConsumerGroup) Checker(ctx context.Context, state *health.CheckState) error
Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)
Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, but this is not recommended.
func (*ConsumerGroup) Initialise ¶
func (cg *ConsumerGroup) Initialise(ctx context.Context) error
Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.
func (*ConsumerGroup) IsInitialised ¶
func (cg *ConsumerGroup) IsInitialised() bool
IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.
func (*ConsumerGroup) StopListeningToConsumer ¶
func (cg *ConsumerGroup) StopListeningToConsumer(ctx context.Context) (err error)
StopListeningToConsumer stops any more messages being consumed off kafka topic
type ConsumerGroupChannels ¶
type ConsumerGroupChannels struct { Upstream chan Message Errors chan error Ready chan struct{} Closer chan struct{} Closed chan struct{} }
ConsumerGroupChannels represents the channels used by ConsumerGroup.
func CreateConsumerGroupChannels ¶
func CreateConsumerGroupChannels(bufferSize int) *ConsumerGroupChannels
CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. You can provide the buffer size to determine the number of messages that will be buffered in the upstream channel (to receive messages)
func (*ConsumerGroupChannels) LogErrors ¶
func (consumerChannels *ConsumerGroupChannels) LogErrors(ctx context.Context, errMsg string)
LogErrors creates a go-routine that waits on chErrors channel and logs any error received. It exits on chCloser channel event. Provided context and errMsg will be used in the log Event.
func (*ConsumerGroupChannels) Validate ¶
func (consumerChannels *ConsumerGroupChannels) Validate() error
Validate returns ErrNoChannel if any consumer channel is nil
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct { KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries int) time.Duration Offset *int64 SecurityConfig *SecurityConfig }
ConsumerGroupConfig exposes the optional configurable parameters for a consumer group, to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.
type ErrBrokersNotReachable ¶
type ErrBrokersNotReachable struct {
Addrs []string
}
ErrBrokersNotReachable is an Error type for 'Broker Not reachable' with a list of unreachable addresses
func (*ErrBrokersNotReachable) Error ¶
func (e *ErrBrokersNotReachable) Error() string
Error returns the error message with a list of unreachable addresses
type ErrInvalidBrokers ¶
type ErrInvalidBrokers struct {
Addrs []string
}
ErrInvalidBrokers is an Error type for 'Invalid topic info' with a list of invalid broker addresses
func (*ErrInvalidBrokers) Error ¶
func (e *ErrInvalidBrokers) Error() string
Error returns the error message with a list of broker addresses that returned unexpected responses
type ErrNoChannel ¶
type ErrNoChannel struct {
ChannelNames []string
}
ErrNoChannel is an Error type generated when a kafka producer or consumer is created with a missing channel
func (*ErrNoChannel) Error ¶
func (e *ErrNoChannel) Error() string
Error returns the error message with a list of missing channels
type HealthInfo ¶ added in v2.4.3
HealthInfo contains the health information for one broker
type HealthInfoMap ¶ added in v2.4.3
type HealthInfoMap map[*sarama.Broker]HealthInfo
HealthInfoMap contains the health information for a set of brokers
func (*HealthInfoMap) ErrorMsg ¶ added in v2.4.3
func (h *HealthInfoMap) ErrorMsg() string
ErrorMsg returns a tailored message according to the information kept in HealthInfoMap
func (*HealthInfoMap) UpdateStatus ¶ added in v2.4.3
func (h *HealthInfoMap) UpdateStatus(state *health.CheckState, minHealthyThreshold int, msgHealthy string) error
UpdateStatus returns the health status string according to the provided minimum number of healthy brokers for the group to be considered healthy. If the health status is OK, the provided msgHealthy will be used as status message.
type IConsumerGroup ¶
type IConsumerGroup interface { Channels() *ConsumerGroupChannels IsInitialised() bool Initialise(ctx context.Context) error StopListeningToConsumer(ctx context.Context) (err error) Checker(ctx context.Context, state *health.CheckState) error Close(ctx context.Context, optFuncs ...OptFunc) (err error) }
IConsumerGroup is an interface representing a Kafka Consumer Group.
type IProducer ¶
type IProducer interface { Channels() *ProducerChannels IsInitialised() bool Initialise(ctx context.Context) error Checker(ctx context.Context, state *health.CheckState) error Close(ctx context.Context) (err error) AddHeader(key, value string) }
IProducer is an interface representing a Kafka Producer
type Message ¶
type Message interface { // GetData returns the message contents. GetData() []byte // GetHeader takes a key for the header and returns the value if the key exists in the header. GetHeader(key string) string // Context returns a context with traceid. Context() context.Context // Mark marks the message as consumed, but doesn't commit the offset to the backend Mark() // Commit marks the message as consumed and commits its offset to the backend Commit() // Release closes the UpstreamDone channel for this message Release() // CommitAndRelease marks a message as consumed, commits it and closes the UpstreamDone channel CommitAndRelease() // Offset returns the message offset Offset() int64 // UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed UpstreamDone() chan struct{} }
Message represents a single kafka message.
type OptFunc ¶ added in v2.7.0
type OptFunc func()
OptFunc is an optional function that is run once the upstream channel is closed and before the consumer closer is called. For example, during a graceful shutdown you may hold messages that were received but not yet processed: you might release each message as soon as you receive it from upstream, add it to a batch, and only commit the messages back after a certain time, once the batch has been processed. If you don't process this batch during the graceful shutdown, this can create a lag in the consumer group.
Optional functions let you handle this. During the graceful shutdown you can pass, for the case above, the batch processing function so that the unprocessed batch is processed and committed back to the consumer group while making sure no new messages are received. The optional function is run once the upstream channel is closed, to make sure no new messages are received, and before the consumer is closed, so that the remaining messages can be processed and committed back to the consumer group.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a producer of Kafka messages
func NewProducer ¶
func NewProducer(ctx context.Context, brokerAddrs []string, topic string, channels *ProducerChannels, pConfig *ProducerConfig) (producer *Producer, err error)
NewProducer returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.
func (*Producer) Channels ¶
func (p *Producer) Channels() *ProducerChannels
Channels returns the Producer channels for this producer
func (*Producer) Checker ¶
Checker checks health of Kafka producer and updates the provided CheckState accordingly
func (*Producer) Close ¶
Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline. Passing a nil context will provide no timeout, and this is not recommended.
func (*Producer) Initialise ¶
Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.
func (*Producer) IsInitialised ¶
IsInitialised returns true only if Sarama producer has been correctly initialised.
type ProducerChannels ¶
type ProducerChannels struct { Output chan []byte Errors chan error Ready chan struct{} Closer chan struct{} Closed chan struct{} }
ProducerChannels represents the channels used by Producer.
func CreateProducerChannels ¶
func CreateProducerChannels() *ProducerChannels
CreateProducerChannels initialises a ProducerChannels with new channels.
func (*ProducerChannels) LogErrors ¶
func (producerChannels *ProducerChannels) LogErrors(ctx context.Context, errMsg string)
LogErrors creates a go-routine that waits on chErrors channel and logs any error received. It exits on chCloser channel event. Provided context and errMsg will be used in the log Event.
func (*ProducerChannels) Validate ¶
func (producerChannels *ProducerChannels) Validate() error
Validate returns ErrNoChannel if any producer channel is nil
type ProducerConfig ¶
type ProducerConfig struct { KafkaVersion *string MaxMessageBytes *int RetryMax *int KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries, maxRetries int) time.Duration SecurityConfig *SecurityConfig }
ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.
type SaramaAsyncProducer ¶
type SaramaAsyncProducer = sarama.AsyncProducer
SaramaAsyncProducer is a wrapper around sarama.AsyncProducer
type SaramaConsumerGroup ¶
type SaramaConsumerGroup = sarama.ConsumerGroup
SaramaConsumerGroup is a wrapper around sarama.ConsumerGroup
type SaramaConsumerGroupClaim ¶
type SaramaConsumerGroupClaim = sarama.ConsumerGroupClaim
SaramaConsumerGroupClaim is a wrapper around sarama.ConsumerGroupClaim
type SaramaConsumerGroupSession ¶
type SaramaConsumerGroupSession = sarama.ConsumerGroupSession
SaramaConsumerGroupSession is a wrapper around sarama.ConsumerGroupSession
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage represents a Sarama specific Kafka message
func (SaramaMessage) Commit ¶
func (M SaramaMessage) Commit()
Commit marks the message as consumed, and then commits the offset to the backend
func (SaramaMessage) CommitAndRelease ¶ added in v2.1.0
func (M SaramaMessage) CommitAndRelease()
CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel
func (SaramaMessage) Context ¶ added in v2.6.0
func (M SaramaMessage) Context() context.Context
Context returns a context with traceid.
func (SaramaMessage) GetData ¶
func (M SaramaMessage) GetData() []byte
GetData returns the message contents.
func (SaramaMessage) GetHeader ¶ added in v2.6.0
func (M SaramaMessage) GetHeader(key string) string
GetHeader takes a key for the header and returns the value if the key exists in the header.
func (SaramaMessage) Mark ¶ added in v2.1.0
func (M SaramaMessage) Mark()
Mark marks the message as consumed, but doesn't commit the offset to the backend
func (SaramaMessage) Offset ¶
func (M SaramaMessage) Offset() int64
Offset returns the message offset
func (SaramaMessage) Release ¶ added in v2.1.0
func (M SaramaMessage) Release()
Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset
func (SaramaMessage) UpstreamDone ¶
func (M SaramaMessage) UpstreamDone() chan struct{}
UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)
type SecurityConfig ¶ added in v2.2.0
type SecurityConfig struct { RootCACerts string ClientCert string ClientKey string InsecureSkipVerify bool }
SecurityConfig is common to producers and consumer configs, above
func GetSecurityConfig ¶ added in v2.3.0
func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig
type TopicAuth ¶ added in v2.3.0
type TopicAuthList ¶ added in v2.3.0
func (TopicAuthList) Apply ¶ added in v2.3.0
func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package avro provides a user functionality to return the avro encoding of s.
|
Package avro provides a user functionality to return the avro encoding of s. |
examples
|
|