Documentation ¶
Index ¶
- Constants
- Variables
- func ExtractTraceContext(ctx context.Context, record *kgo.Record) context.Context
- func FailureHeaderKey(key string, group string) string
- func GetFailureHeader[T HeaderTypes](headers *Headers, group string, key string, ...) (res T, err error)
- func GetHeader[T HeaderTypes](headers *Headers, key string, deserializer func([]byte) (T, error)) (res T, err error)
- func GetOrSetFailureHeader[T HeaderTypes](headers *Headers, group string, key string, value T, ...) (res T, err error)
- func GetOrSetHeader[T HeaderTypes](headers *Headers, key string, value T, deserializer func([]byte) (T, error), ...) (T, error)
- func InjectTraceContext(ctx context.Context, record *kgo.Record)
- func IntDeserializer(bytes []byte) (int, error)
- func IntSerializer(v int) []byte
- func LoadKgoClient(env envx.EnvX, consumeTopics []string, group string, startOffset kgo.Offset) (client *kgo.Client, err error)
- func SetFailureHeader[T HeaderTypes](headers *Headers, group string, key string, value T, serializer func(T) []byte)
- func SetHeader[T HeaderTypes](headers *Headers, key string, value T, serializer func(T) []byte)
- func SetSpanError(span trace.Span, err error)
- func SetSpanOK(span trace.Span)
- func StartConsumeSpan(ctx context.Context, record *kgo.Record) (context.Context, trace.Span)
- func StartProduceSpan(ctx context.Context, topic string, record *kgo.Record) (context.Context, trace.Span)
- func StringDeserializer(bytes []byte) (string, error)
- func StringSerializer(v string) []byte
- type ConsumeRecord
- type FailureHeaders
- type HeaderTypes
- type HeaderValue
- type Headers
- type KafkaClient
- func (k *KafkaClient) Close()
- func (k *KafkaClient) CloseGracefully(ctx context.Context)
- func (k *KafkaClient) DisableConsumerTopic(ctx context.Context, topic string) error
- func (k *KafkaClient) DisableDlqConsumption() error
- func (k *KafkaClient) EnableConsumerTopic(ctx context.Context, topic string) error
- func (k *KafkaClient) EnableDlqConsumption() error
- func (k *KafkaClient) IsClosed() bool
- func (k *KafkaClient) IsStarted() bool
- func (k *KafkaClient) PublishRecord(ctx context.Context, topic string, record *kgo.Record) (err error)
- func (k *KafkaClient) RegisterConsumer(c consumer.Consumer) error
- func (k *KafkaClient) RegisterConsumerFunc(topic string, retries int, useDlq bool, process func(*ConsumeRecord)) error
- func (k *KafkaClient) SetDlqTopic(topic string)
- func (k *KafkaClient) SetGroup(group string)
- func (k *KafkaClient) SetRetryTopic(topic string)
- func (k *KafkaClient) Start() (errs <-chan error)
- func (k *KafkaClient) SyncConsumerTopics() error
- func (k *KafkaClient) WaitForDone()
- type RetryableError
Constants ¶
const (
	TraceParentHeader = "traceparent"
	TraceStateHeader  = "tracestate"
)
Header keys for trace context propagation
const FailureHeaderKeyFmt = "mika_err_%s_%s"
const KeyErrMessage = "msg"
const KeyOriginalTopic = "org_topic"
const KeyRetries = "retries"
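The failure-header keys above are namespaced per consumer group via FailureHeaderKeyFmt. A minimal sketch of how FailureHeaderKey likely composes a key; the substitution order (group before key) is an assumption, not confirmed by the signature:

```go
package main

import "fmt"

// failureHeaderKey sketches how FailureHeaderKey likely builds a
// group-scoped header key from FailureHeaderKeyFmt ("mika_err_%s_%s").
// The argument substitution order is an assumption.
func failureHeaderKey(key, group string) string {
	return fmt.Sprintf("mika_err_%s_%s", group, key)
}

func main() {
	fmt.Println(failureHeaderKey("retries", "billing"))
}
```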
Variables ¶
var (
	ErrConsumerTopicAlreadyExists      = consum.ErrConsumerTopicAlreadyExists
	ErrConsumerTopicDoesntExist        = consum.ErrConsumerTopicDoesntExist
	ErrDqlNotConfigured                = errors.New("This client was not configured to consume from dlq")
	ErrRetriedRecordWithoutConsumer    = errors.New("This client does not have a consumer for a record consumed from a retry/dlq topic. This should not happen.")
	ErrRetriedRecordFromDifferentGroup = errors.New("This client group is different from the original client group that published to a retry/dlq topic")
)
Exported types from consumer module
var ErrClientClosed = errors.New("KafkaClient is closed")
var ErrHeaderDoesNotExist = errors.New("Record header does not exist")
Functions ¶
func ExtractTraceContext ¶ added in v0.1.6
ExtractTraceContext extracts trace context from Kafka record headers into a new context. This should be called when consuming a record to continue the trace from the producer.
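The "traceparent" header carries the standard W3C Trace Context fields. A self-contained sketch of the wire format that ExtractTraceContext reads (the real implementation presumably delegates to an OpenTelemetry propagator; this only illustrates the header layout):

```go
package main

import (
	"fmt"
	"strings"
)

// splitTraceParent splits a W3C "traceparent" header value into its
// four dash-separated fields: version, trace-id, parent-id, and
// trace-flags. Illustration only; not the package's implementation.
func splitTraceParent(v string) (version, traceID, parentID, flags string, err error) {
	parts := strings.Split(v, "-")
	if len(parts) != 4 {
		return "", "", "", "", fmt.Errorf("malformed traceparent: %q", v)
	}
	return parts[0], parts[1], parts[2], parts[3], nil
}

func main() {
	_, tid, _, _, _ := splitTraceParent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(tid)
}
```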
func FailureHeaderKey ¶
func GetFailureHeader ¶
func GetOrSetFailureHeader ¶
func GetOrSetHeader ¶
func InjectTraceContext ¶ added in v0.1.6
InjectTraceContext injects the trace context from ctx into the Kafka record headers. This should be called before publishing a record to propagate trace context to consumers.
func IntDeserializer ¶
func IntSerializer ¶
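A plausible IntSerializer/IntDeserializer pair matching the documented signatures. The actual wire encoding (decimal string vs. binary) is not documented here, so a decimal-string encoding is assumed:

```go
package main

import (
	"fmt"
	"strconv"
)

// intSerializer/intDeserializer sketch the documented signatures.
// Decimal-string encoding is an assumption about the wire format.
func intSerializer(v int) []byte { return []byte(strconv.Itoa(v)) }

func intDeserializer(bytes []byte) (int, error) { return strconv.Atoi(string(bytes)) }

func main() {
	n, err := intDeserializer(intSerializer(42))
	fmt.Println(n, err)
}
```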
func LoadKgoClient ¶
func SetFailureHeader ¶
func SetFailureHeader[T HeaderTypes](
	headers *Headers,
	group string,
	key string,
	value T,
	serializer func(T) []byte,
)
func SetHeader ¶
func SetHeader[T HeaderTypes](
	headers *Headers,
	key string,
	value T,
	serializer func(T) []byte,
)
func SetSpanError ¶ added in v0.1.6
SetSpanError marks the span as having an error.
func StartConsumeSpan ¶ added in v0.1.6
StartConsumeSpan starts a span for a Kafka consume operation. It extracts trace context from the record headers to link to the producer span. Returns the context with the span and the span itself (caller must call span.End()).
func StartProduceSpan ¶ added in v0.1.6
func StartProduceSpan(ctx context.Context, topic string, record *kgo.Record) (context.Context, trace.Span)
StartProduceSpan starts a span for a Kafka produce operation. Returns the context with the span and the span itself (caller must call span.End()).
func StringDeserializer ¶
func StringSerializer ¶
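A plausible StringSerializer/StringDeserializer pair: a direct string/[]byte conversion, assuming no extra framing is applied (the signatures make this the natural reading, but it is still an assumption):

```go
package main

import "fmt"

// stringSerializer/stringDeserializer sketch the documented
// signatures as a plain conversion, with no framing assumed.
func stringSerializer(v string) []byte { return []byte(v) }

func stringDeserializer(bytes []byte) (string, error) { return string(bytes), nil }

func main() {
	s, _ := stringDeserializer(stringSerializer("hello"))
	fmt.Println(s)
}
```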
Types ¶
type ConsumeRecord ¶
type ConsumeRecord = consum.ConsumeRecord
type FailureHeaders ¶
func InitFailureHeaders ¶
func InitFailureHeaders(headers *Headers, group string, reason error) (state *FailureHeaders, err error)
func (*FailureHeaders) IncrementRetries ¶
func (r *FailureHeaders) IncrementRetries(headers *Headers, group string)
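IncrementRetries bumps the group-scoped retries counter stored under the KeyRetries failure header. A self-contained sketch of that bookkeeping, with a plain map standing in for the opaque Headers type; the key layout follows FailureHeaderKeyFmt, and the substitution order and decimal encoding are assumptions:

```go
package main

import (
	"fmt"
	"strconv"
)

// incrementRetries sketches what FailureHeaders.IncrementRetries does:
// read the group-scoped "retries" failure header, add one, write it
// back. A map stands in for the opaque Headers type.
func incrementRetries(headers map[string][]byte, group string) int {
	key := fmt.Sprintf("mika_err_%s_%s", group, "retries") // FailureHeaderKeyFmt; order assumed
	n, _ := strconv.Atoi(string(headers[key]))             // a missing header counts as 0
	n++
	headers[key] = []byte(strconv.Itoa(n))
	return n
}

func main() {
	h := map[string][]byte{}
	incrementRetries(h, "billing")
	fmt.Println(incrementRetries(h, "billing"))
}
```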
type HeaderTypes ¶
type HeaderValue ¶
type Headers ¶
type Headers struct {
// contains filtered or unexported fields
}
A structure that helps manage record headers.
func NewHeaders ¶
type KafkaClient ¶
type KafkaClient struct {
// contains filtered or unexported fields
}
func NewKafkaClient ¶
Create a new Kafka client that will shut down (not gracefully) if the context expires. To shut down gracefully, i.e. finish processing and committing fetched records, call client.CloseGracefully(ctx) with a context that bounds the graceful shutdown period.
func (*KafkaClient) Close ¶
func (k *KafkaClient) Close()
No-op if the client is already closed. Otherwise, stops consumption and closes the underlying client. When the client is fully closed, ErrClientClosed is returned via the error channel.
func (*KafkaClient) CloseGracefully ¶ added in v0.1.8
func (k *KafkaClient) CloseGracefully(ctx context.Context)
Closes the client by stopping consumption gracefully if a consumer is actively consuming and not already terminating. Otherwise, closes the client normally.
func (*KafkaClient) DisableConsumerTopic ¶
func (k *KafkaClient) DisableConsumerTopic(ctx context.Context, topic string) error
Disable consumption of a topic if it was previously enabled: the consumer is paused if it is disabled after the client was started, or removed if it is disabled before the client was started. If the consumer is already disabled, this is a no-op. If the topic was never registered, this returns ErrConsumerTopicDoesntExist. If remote changes are required, a call to SyncConsumerTopics() is made. If the sync operation times out (the context expires), the registry may be out of sync with the broker; an error is returned in this case, and it is up to the caller to handle it, for example by retrying the sync operation.
func (*KafkaClient) DisableDlqConsumption ¶
func (k *KafkaClient) DisableDlqConsumption() error
Stop consuming from the dlq topic by pausing fetching from it. If a dlq topic was not configured, returns ErrDqlNotConfigured.
func (*KafkaClient) EnableConsumerTopic ¶
func (k *KafkaClient) EnableConsumerTopic(ctx context.Context, topic string) error
Enable consumption of a topic if it was previously disabled: the consumer is resumed if it was disabled after the client was started, or added (for the first time) if it was disabled before the client was started. If the consumer is already enabled, this is a no-op. If the topic was never registered, this returns ErrConsumerTopicDoesntExist. If remote changes are required, a call to SyncConsumerTopics() is made. If the sync operation times out (the context expires), the registry may be out of sync with the broker; an error is returned in this case, and it is up to the caller to handle it, for example by retrying the sync operation.
func (*KafkaClient) EnableDlqConsumption ¶
func (k *KafkaClient) EnableDlqConsumption() error
Start consuming from the dlq topic by adding the topic, or resume consumption if it was previously added. If a dlq topic was not configured, returns ErrDqlNotConfigured.
func (*KafkaClient) IsClosed ¶
func (k *KafkaClient) IsClosed() bool
Returns true if the client is closed, false otherwise.
func (*KafkaClient) IsStarted ¶
func (k *KafkaClient) IsStarted() bool
Returns true if client.Start() has been called, false otherwise.
func (*KafkaClient) PublishRecord ¶
func (k *KafkaClient) PublishRecord(
	ctx context.Context,
	topic string,
	record *kgo.Record,
) (err error)
Produce a record to the given topic. If the provided context expires, the method fails and returns an error. If the client is closed, an error is also returned. If the client is currently terminating gracefully, publishing is allowed for as long as the underlying client is alive.
func (*KafkaClient) RegisterConsumer ¶
func (k *KafkaClient) RegisterConsumer(c consumer.Consumer) error
See RegisterConsumerFunc for behavior.
func (*KafkaClient) RegisterConsumerFunc ¶ added in v0.1.3
func (k *KafkaClient) RegisterConsumerFunc(
	topic string,
	retries int,
	useDlq bool,
	process func(*ConsumeRecord),
) error
Register a topic and consumer. If the topic already exists in the registry, returns ErrConsumerTopicAlreadyExists; otherwise returns nil. Consumers must be registered before the client is started, otherwise this panics. A consumer is enabled by default when registered.
func (*KafkaClient) SetDlqTopic ¶
func (k *KafkaClient) SetDlqTopic(topic string)
Set the client dlq topic, which allows consumers to replay failed records. This must be called before the client is started, otherwise it panics. This option also requires using a consumer group: the group identifies whether a client "owns" a record that came from the dlq topic, since multiple clients can share the same dlq topic. If a group is not configured when client.Start() is called, it panics.
func (*KafkaClient) SetGroup ¶
func (k *KafkaClient) SetGroup(group string)
Set the client consumer group. This must be called before the client is started, otherwise it panics.
func (*KafkaClient) SetRetryTopic ¶
func (k *KafkaClient) SetRetryTopic(topic string)
Set the client retry topic, which allows consumers to replay failed records. This must be called before the client is started, otherwise it panics. This option also requires using a consumer group: the group identifies whether a client "owns" a record that came from the retry topic, since multiple clients can share the same retry topic. If a group is not configured when client.Start() is called, it panics.
func (*KafkaClient) Start ¶
func (k *KafkaClient) Start() (errs <-chan error)
Set up the client connection to the brokers. If any consumers are registered, consumption starts immediately. Calling this when the client is already started has no effect; likewise when the client is closed. The returned error channel is unbuffered and needs a listener for the full lifecycle of the client; otherwise, the client will block when trying to push errors onto the channel.
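Because the returned channel is unbuffered, a listener goroutine should drain it for the client's full lifetime. A broker-free sketch of that pattern, with a stand-in channel in place of the one Start returns (that the channel is eventually closed is an assumption of this sketch):

```go
package main

import (
	"errors"
	"fmt"
)

var errClientClosed = errors.New("KafkaClient is closed")

// drainErrors listens on the client's unbuffered error channel until
// it is closed, reporting whether full closure (ErrClientClosed) was
// seen. Without such a listener, the client blocks on pushing errors.
func drainErrors(errs <-chan error) bool {
	closed := false
	for err := range errs {
		if errors.Is(err, errClientClosed) {
			closed = true
		}
	}
	return closed
}

func main() {
	// Stands in for: errs := client.Start()
	errs := make(chan error) // unbuffered, like the channel Start returns

	done := make(chan bool)
	go func() { done <- drainErrors(errs) }()

	errs <- errClientClosed // the client signals full closure this way
	close(errs)
	fmt.Println(<-done)
}
```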
func (*KafkaClient) SyncConsumerTopics ¶
func (k *KafkaClient) SyncConsumerTopics() error
Pause/resume consumption of topics in Kafka according to the status of the consumers in the registry.
func (*KafkaClient) WaitForDone ¶
func (k *KafkaClient) WaitForDone()
Wait for the client to be fully closed.
type RetryableError ¶ added in v0.1.8
type RetryableError struct {
Err error
}
RetryableError wraps an error to indicate that the failure is eligible for retry. When a record is failed with a RetryableError, MiKa will attempt to publish the record to the retry topic (if configured and retries remain). Non-retryable errors skip the retry topic and go directly to DLQ or return an error.
func NewRetryableError ¶ added in v0.1.8
func NewRetryableError(err error) *RetryableError
func (*RetryableError) Error ¶ added in v0.1.8
func (e *RetryableError) Error() string
func (*RetryableError) Unwrap ¶ added in v0.1.8
func (e *RetryableError) Unwrap() error