mika

package module
v0.1.8
Published: Feb 11, 2026 License: MIT Imports: 21 Imported by: 0

README

MiKa (Micro Kafka)

MiKa is a lightweight Kafka client written in Go that simplifies working with Kafka in event-driven systems. It is a wrapper around the fully fledged twmb/franz-go Kafka client, which handles all communication between the client and the Kafka brokers.

Why MiKa?

MiKa offers the following functionality out of the box:

  • Consumer-to-topic binding: Consumers are attached to topics (1-to-1) and registered during client initialization. MiKa is responsible for delegating records to the correct consumer based on the topic the record originated from. From a user's perspective, this makes Kafka behave more like a traditional messaging broker.
  • Record acknowledgement: MiKa supports record acknowledgement, which puts the responsibility on the application to mark records as either succeeded (acked) or failed after consumption. Failed records can be retried or parked in a DLQ.
  • Retries and DLQ: The client can be configured with a retry and DLQ topic. This serves as a mechanism to "unblock" the client while preventing data loss when records are processed unsuccessfully. These records can be replayed later automatically (via the retry topic) or manually (via the DLQ topic). Multiple clients can share the same retry/DLQ topic, with each client retrying only the records that belong to it.
  • Graceful shutdown: The client can be shut down gracefully to allow already polled records to finish processing within a configurable timeframe.

Usage

Installation
go get github.com/emillamm/mika
Initialize and configure the client

The client is created by calling NewKafkaClient(ctx, env), which takes a context and a function with signature func(string) string for reading environment configuration. If the context expires, the client will be closed, which can be useful in tests. See the section Environment Configuration for the environment variables that can be used to configure the client.

ctx := context.Background()
env := os.Getenv
client, err := mika.NewKafkaClient(ctx, env)

The client can be configured with a consumer group, a retry topic, and a DLQ topic. These configurations are optional, although a consumer group is required if either retries or a DLQ are configured. In most cases you would want to attach a unique consumer group to the client.

client.SetGroup("my-consumer-group")
client.SetRetryTopic("my-retry-topic")
client.SetDlqTopic("my-dlq-topic")
Register consumers
topic := "my-topic"
numberOfRetries := 2
useDlq := true

func myConsumerFunc(record *mika.ConsumeRecord) {
    // process record
}

client.RegisterConsumerFunc(topic, numberOfRetries, useDlq, myConsumerFunc)
Start the client

Calling client.Start() will start a poll, process, commit loop if any consumers are registered and enabled. The method returns a receive-only error channel (<-chan error) that will receive errors encountered during the lifecycle of the client, including fetch and commit errors. Any fetch or commit error will result in the client being closed.

for err := range client.Start() {
    // handle error
}
Consume records

The consumer function takes a ConsumeRecord as an argument, which exposes the underlying *kgo.Record that contains the record's data (see documentation). It also provides Ack() and Fail(reason error) methods that should be called when the record has been processed.

func consumeMyTopic(record *mika.ConsumeRecord) {
    bytes := record.Underlying.Value
    if err := processBytes(bytes); err != nil {
        record.Fail(err)
        return
    }
    record.Ack()
}

By default, Fail(err) skips retries and sends the record directly to the DLQ (if configured). To opt in to retries, wrap the error in a RetryableError:

func consumeMyTopic(record *mika.ConsumeRecord) {
    bytes := record.Underlying.Value
    if err := processBytes(bytes); err != nil {
        // Retryable errors will be published to the retry topic
        record.Fail(mika.NewRetryableError(err))
        return
    }
    record.Ack()
}

Here is how Ack/Fail work under the hood: records are polled in batches by the client and delegated to the consumers based on the topic they came from. A batch is committed and a new poll is started once all records are completed. In this context, completed means one of the following:

  • The record is "acked" by calling record.Ack()
  • The record is "failed" by calling record.Fail(mika.NewRetryableError(reason)) and the record has not exceeded the number of retries allowed by the consumer.
  • The record is "failed" by calling record.Fail(reason) (non-retryable) and the consumer has enabled DLQ.

If a batch of records is never fully completed, the poll loop will stall forever. It is important to note that all consumers on the client will stall, not just the offending one. To avoid this, it is recommended to use a DLQ or to ack failed records that are not worth reprocessing.

Stop the client

The client can be stopped gracefully which allows the current batch of polled records to be processed within a timeframe that can be controlled by a context.

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
client.CloseGracefully(ctx) // non-blocking
client.WaitForDone() // blocks until the client is fully closed

Environment Configuration

MiKa uses environment variables for connection and consumer settings.

Connection Settings
Variable              Description                                          Default
KAFKA_BROKERS         Comma-separated list of broker addresses             localhost:29092
KAFKA_TLS_ENABLED     Enable TLS encryption                                false
KAFKA_SASL_MECHANISM  SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512  (none)
KAFKA_SASL_USERNAME   SASL username (required when mechanism is set)       (none)
KAFKA_SASL_PASSWORD   SASL password (required when mechanism is set)       (none)
Consumer Settings
Variable                   Description                                                               Default
KAFKA_CONSUMER_START_FROM  RFC3339 timestamp for initial offset (new consumer groups only)           (none)
KAFKA_GLOBAL_SUFFIX        Suffix appended to all consumer groups and topics (including retry/DLQ)   (none)

The KAFKA_GLOBAL_SUFFIX is useful for isolating consumer groups and topics in shared Kafka clusters, such as local development or testing environments. For example, setting KAFKA_GLOBAL_SUFFIX=-dev would transform my-group to my-group-dev and my-topic to my-topic-dev.

Connecting to Confluent Cloud

To connect to a Confluent Cloud cluster, set the following environment variables:

export KAFKA_BROKERS="pkc-xxxxx.region.cloud.confluent.cloud:9092"
export KAFKA_TLS_ENABLED=true
export KAFKA_SASL_MECHANISM=PLAIN
export KAFKA_SASL_USERNAME="<API_KEY>"
export KAFKA_SASL_PASSWORD="<API_SECRET>"

The API key and secret can be generated from the Confluent Cloud Console under your cluster's API keys section.

Documentation

Index

Constants

const (
	TraceParentHeader = "traceparent"
	TraceStateHeader  = "tracestate"
)

Header keys for trace context propagation

const FailureHeaderKeyFmt = "mika_err_%s_%s"

const KeyErrMessage = "msg"

const KeyOriginalTopic = "org_topic"

const KeyRetries = "retries"

Variables

var (
	ErrConsumerTopicAlreadyExists      = consum.ErrConsumerTopicAlreadyExists
	ErrConsumerTopicDoesntExist        = consum.ErrConsumerTopicDoesntExist
	ErrDqlNotConfigured                = errors.New("This client was not configured to consume from dlq")
	ErrRetriedRecordWithoutConsumer    = errors.New("This client does not have a consumer for a record consumed from a retry/dlq topic. This should not happen.")
	ErrRetriedRecordFromDifferentGroup = errors.New("This client group is different from the original client group that published to a retry/dlq topic")
)

Exported types from consumer module

var ErrClientClosed = errors.New("KafkaClient is closed")

var ErrHeaderDoesNotExist = errors.New("Record header does not exist")

Functions

func ExtractTraceContext added in v0.1.6

func ExtractTraceContext(ctx context.Context, record *kgo.Record) context.Context

ExtractTraceContext extracts trace context from Kafka record headers into a new context. This should be called when consuming a record to continue the trace from the producer.

func FailureHeaderKey

func FailureHeaderKey(key string, group string) string

func GetFailureHeader

func GetFailureHeader[T HeaderTypes](
	headers *Headers,
	group string,
	key string,
	deserializer func([]byte) (T, error),
) (res T, err error)

func GetHeader

func GetHeader[T HeaderTypes](
	headers *Headers,
	key string,
	deserializer func([]byte) (T, error),
) (res T, err error)

func GetOrSetFailureHeader

func GetOrSetFailureHeader[T HeaderTypes](
	headers *Headers,
	group string,
	key string,
	value T,
	deserializer func([]byte) (T, error),
	serializer func(T) []byte,
) (res T, err error)

func GetOrSetHeader

func GetOrSetHeader[T HeaderTypes](
	headers *Headers,
	key string,
	value T,
	deserializer func([]byte) (T, error),
	serializer func(T) []byte,
) (T, error)

func InjectTraceContext added in v0.1.6

func InjectTraceContext(ctx context.Context, record *kgo.Record)

InjectTraceContext injects the trace context from ctx into the Kafka record headers. This should be called before publishing a record to propagate trace context to consumers.

func IntDeserializer

func IntDeserializer(bytes []byte) (int, error)

func IntSerializer

func IntSerializer(v int) []byte

func LoadKgoClient

func LoadKgoClient(
	env envx.EnvX,
	consumeTopics []string,
	group string,
	startOffset kgo.Offset,
) (client *kgo.Client, err error)

func SetFailureHeader

func SetFailureHeader[T HeaderTypes](
	headers *Headers,
	group string,
	key string,
	value T,
	serializer func(T) []byte,
)

func SetHeader

func SetHeader[T HeaderTypes](
	headers *Headers,
	key string,
	value T,
	serializer func(T) []byte,
)

func SetSpanError added in v0.1.6

func SetSpanError(span trace.Span, err error)

SetSpanError marks the span as having an error.

func SetSpanOK added in v0.1.6

func SetSpanOK(span trace.Span)

SetSpanOK marks the span as successful.

func StartConsumeSpan added in v0.1.6

func StartConsumeSpan(ctx context.Context, record *kgo.Record) (context.Context, trace.Span)

StartConsumeSpan starts a span for a Kafka consume operation. It extracts trace context from the record headers to link to the producer span. Returns the context with the span and the span itself (caller must call span.End()).

func StartProduceSpan added in v0.1.6

func StartProduceSpan(ctx context.Context, topic string, record *kgo.Record) (context.Context, trace.Span)

StartProduceSpan starts a span for a Kafka produce operation. Returns the context with the span and the span itself (caller must call span.End()).

func StringDeserializer

func StringDeserializer(bytes []byte) (string, error)

func StringSerializer

func StringSerializer(v string) []byte

Types

type ConsumeRecord

type ConsumeRecord = consum.ConsumeRecord

type FailureHeaders

type FailureHeaders struct {
	OriginalTopic string
	ErrMessage    string
	Retries       int
}

func InitFailureHeaders

func InitFailureHeaders(headers *Headers, group string, reason error) (state *FailureHeaders, err error)

func (*FailureHeaders) IncrementRetries

func (r *FailureHeaders) IncrementRetries(headers *Headers, group string)

type HeaderTypes

type HeaderTypes interface{ int | string }

type HeaderValue

type HeaderValue struct {
	Index int
	Bytes []byte
}

type Headers

type Headers struct {
	// contains filtered or unexported fields
}

A structure that helps manage record headers

func NewHeaders

func NewHeaders(record *kgo.Record) *Headers

type KafkaClient

type KafkaClient struct {
	// contains filtered or unexported fields
}

func NewKafkaClient

func NewKafkaClient(ctx context.Context, env envx.EnvX) (client *KafkaClient, err error)

Create a new Kafka client that will shut down (not gracefully) if the context expires. In order to shut down gracefully, i.e. finish processing and committing fetched records, call client.CloseGracefully(ctx) with a context that dictates the graceful shutdown period.

func (*KafkaClient) Close

func (k *KafkaClient) Close()

No-op if the client is already closed. Otherwise, stop consumption and close the underlying client. When the client is fully closed, ErrClientClosed will be returned via the error channel.

func (*KafkaClient) CloseGracefully added in v0.1.8

func (k *KafkaClient) CloseGracefully(ctx context.Context)

Close the client by stopping consumption gracefully if the consumer is actively consuming and not already terminating. Otherwise, this will close the client normally.

func (*KafkaClient) DisableConsumerTopic

func (k *KafkaClient) DisableConsumerTopic(ctx context.Context, topic string) error

Disable consumption of a topic if it was previously enabled. This means pausing the consumer if it is disabled after the client was started, or removing the consumer if it is disabled before the client was started. If the consumer is already disabled, this is a no-op. If the topic was never registered, this returns ErrConsumerTopicDoesntExist. If remote changes are required, a call to SyncConsumerTopics() is made. If the sync operation times out (context expires), the registry might be out of sync with the broker. An error is returned in this case, and it is up to the caller to handle it, for example by retrying the sync operation.

func (*KafkaClient) DisableDlqConsumption

func (k *KafkaClient) DisableDlqConsumption() error

Stop consuming from the dlq topic by pausing fetching from the topic. If a dlq topic was not configured, it returns ErrDqlNotConfigured.

func (*KafkaClient) EnableConsumerTopic

func (k *KafkaClient) EnableConsumerTopic(ctx context.Context, topic string) error

Enable consumption of a topic if it was previously disabled. This means resuming the consumer if it was disabled after the client was started, or adding the consumer (for the first time) if it was disabled before the client was started. If the consumer is already enabled, this is a no-op. If the topic was never registered, this returns ErrConsumerTopicDoesntExist. If remote changes are required, a call to SyncConsumerTopics() is made. If the sync operation times out (context expires), the registry might be out of sync with the broker. An error is returned in this case, and it is up to the caller to handle it, for example by retrying the sync operation.

func (*KafkaClient) EnableDlqConsumption

func (k *KafkaClient) EnableDlqConsumption() error

Start consuming from the dlq topic by adding the topic, or resume consumption if it was previously added. If a dlq topic was not configured, it returns ErrDqlNotConfigured.

func (*KafkaClient) IsClosed

func (k *KafkaClient) IsClosed() bool

Returns true if the client is closed, false otherwise.

func (*KafkaClient) IsStarted

func (k *KafkaClient) IsStarted() bool

Returns true if client.Start() has been called, false otherwise.

func (*KafkaClient) PublishRecord

func (k *KafkaClient) PublishRecord(
	ctx context.Context,
	topic string,
	record *kgo.Record,
) (err error)

Produce a record to the given topic. If the provided context expires, the method will fail and return an error. If the client is closed, an error will also be returned. If the client is currently terminating gracefully, publishing will be allowed for as long as the underlying client is alive.

func (*KafkaClient) RegisterConsumer

func (k *KafkaClient) RegisterConsumer(c consumer.Consumer) error

See RegisterConsumerFunc for behavior.

func (*KafkaClient) RegisterConsumerFunc added in v0.1.3

func (k *KafkaClient) RegisterConsumerFunc(
	topic string,
	retries int,
	useDlq bool,
	process func(*ConsumeRecord),
) error

Register a topic and consumer. If the topic already exists in the registry, this returns ErrConsumerTopicAlreadyExists; otherwise it returns nil. Consumers must be registered before the client is started, otherwise it panics. A consumer is enabled by default when registered.

func (*KafkaClient) SetDlqTopic

func (k *KafkaClient) SetDlqTopic(topic string)

Set the client dlq topic, which allows consumers to replay failed records. This must be called before the client is started, otherwise it will panic. This option also requires a consumer group: the group is used to identify whether or not a client "owns" a record that came from the dlq topic, as multiple clients can share the same dlq topic. If a group is not configured when client.Start() is called, it will panic.

func (*KafkaClient) SetGroup

func (k *KafkaClient) SetGroup(group string)

Set the client consumer group. This must be called before the client is started, otherwise it will panic.

func (*KafkaClient) SetRetryTopic

func (k *KafkaClient) SetRetryTopic(topic string)

Set the client retry topic, which allows consumers to replay failed records. This must be called before the client is started, otherwise it will panic. This option also requires a consumer group: the group is used to identify whether or not a client "owns" a record that came from the retry topic, as multiple clients can share the same retry topic. If a group is not configured when client.Start() is called, it will panic.

func (*KafkaClient) Start

func (k *KafkaClient) Start() (errs <-chan error)

Set up the client connection to the brokers. If any consumers are registered, start consuming immediately. Calling this when the client is already started has no effect; likewise when the client is closed. The returned error channel is unbuffered and needs a listener for the full lifecycle of the client; otherwise, the client will block when trying to push errors onto the channel.

func (*KafkaClient) SyncConsumerTopics

func (k *KafkaClient) SyncConsumerTopics() error

Pause/resume consumption of topics in Kafka according to the status of the consumers in the registry.

func (*KafkaClient) WaitForDone

func (k *KafkaClient) WaitForDone()

Wait for the client to be fully closed.

type RetryableError added in v0.1.8

type RetryableError struct {
	Err error
}

RetryableError wraps an error to indicate that the failure is eligible for retry. When a record is failed with a RetryableError, MiKa will attempt to publish the record to the retry topic (if configured and retries remain). Non-retryable errors skip the retry topic and go directly to DLQ or return an error.

func NewRetryableError added in v0.1.8

func NewRetryableError(err error) *RetryableError

func (*RetryableError) Error added in v0.1.8

func (e *RetryableError) Error() string

func (*RetryableError) Unwrap added in v0.1.8

func (e *RetryableError) Unwrap() error
