Documentation ¶
Overview ¶
Package kafka provides functionality for interacting with Apache Kafka.
The kafka package offers a simplified interface for working with Kafka message brokers, providing connection management, message publishing, and consuming capabilities with a focus on reliability and ease of use.
Architecture ¶
The package follows the "accept interfaces, return structs" Go idiom:
- Client interface: Defines the contract for Kafka operations
- KafkaClient struct: Concrete implementation of the Client interface
- Message interface: Defines the contract for consumed messages
- Constructor returns *KafkaClient (concrete type)
- FX module provides both *KafkaClient and Client interface
This design allows:
- Direct usage: Use *KafkaClient for simple cases
- Interface usage: Depend on Client interface for testability and flexibility
- Zero adapters needed: Consumer code can use type aliases
Core Features:
- Robust connection management with automatic reconnection
- Simple publishing interface with error handling
- Consumer interface with automatic commit handling
- Consumer group support
- Separate schema_registry package for Confluent Schema Registry
- Integration with the Logger package for structured logging
- Distributed tracing support via message headers
Basic Usage (Direct) ¶
import (
"github.com/Aleph-Alpha/std/v1/kafka"
"context"
"sync"
)
// Create a new Kafka client (returns concrete *KafkaClient)
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
GroupID: "my-consumer-group",
IsConsumer: true,
EnableAutoCommit: false, // Manual commit for safety
DataType: "json", // Automatic JSON serializer (default)
})
if err != nil {
log.Fatal("Failed to connect to Kafka", err)
}
defer client.GracefulShutdown()
// Publish a message (automatically serialized as JSON)
ctx := context.Background()
event := map[string]interface{}{"id": "123", "name": "John"}
err = client.Publish(ctx, "key", event)
if err != nil {
log.Printf("Failed to publish message: %v", err)
}
FX Module Integration ¶
The package provides an FX module that injects both concrete and interface types:
import (
"github.com/Aleph-Alpha/std/v1/kafka"
"go.uber.org/fx"
)
app := fx.New(
kafka.FXModule,
fx.Provide(
func() kafka.Config {
return kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
IsConsumer: false,
DataType: "json",
}
},
),
fx.Invoke(func(k kafka.Client) {
// Use the Client interface
ctx := context.Background()
event := map[string]interface{}{"id": "123"}
k.Publish(ctx, "key", event)
}),
)
app.Run()
Using Type Aliases (Recommended for Consumer Code) ¶
Consumer applications can use type aliases to avoid creating adapters:
// In your application's messaging package
package messaging

import stdKafka "github.com/Aleph-Alpha/std/v1/kafka"

// Type aliases reference std interfaces directly
type Client = stdKafka.Client
type Message = stdKafka.Message
Then use these aliases throughout your application:
func MyService(ctx context.Context, kafka messaging.Client, data interface{}) {
kafka.Publish(ctx, "key", data)
}
Consuming Messages ¶
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
// Process the message
fmt.Printf("Received: %s\n", string(msg.Body()))
// Commit the message
if err := msg.CommitMsg(); err != nil {
log.Printf("Failed to commit: %v", err)
}
}
High-Throughput Consumption with Parallel Workers ¶
For high-volume topics, use ConsumeParallel to process messages concurrently:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Use 5 concurrent workers for better throughput
msgChan := client.ConsumeParallel(ctx, wg, 5)
for msg := range msgChan {
// Process messages concurrently
processMessage(msg)
// Commit the message
if err := msg.CommitMsg(); err != nil {
log.Printf("Failed to commit: %v", err)
}
}
Distributed Tracing with Message Headers ¶
This package supports distributed tracing by allowing you to propagate trace context through message headers, enabling end-to-end visibility across services.
Publisher Example (sending trace context):
import (
"github.com/Aleph-Alpha/std/v1/tracer"
// other imports...
)
// Create a tracer client
tracerClient := tracer.NewClient(tracerConfig, log)
// Create a span for the operation that includes publishing
ctx, span := tracerClient.StartSpan(ctx, "process-and-publish")
defer span.End()
// Process data...
// Extract trace context as headers before publishing
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish with trace headers
err = kafkaClient.Publish(ctx, "key", message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Error("Failed to publish message", err, nil)
}
Consumer Example (continuing the trace):
msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
// Extract trace headers from the message
headers := msg.Header()
// Create a new context with the trace information
ctx = tracerClient.SetCarrierOnContext(ctx, headers)
// Create a span as a child of the incoming trace
ctx, span := tracerClient.StartSpan(ctx, "process-message")
// Add relevant attributes to the span
span.SetAttributes(map[string]interface{}{
"message.size": len(msg.Body()),
"message.key": msg.Key(),
})
// Process the message...
if err := processMessage(msg.Body()); err != nil {
// Record any errors in the span
span.RecordError(err)
span.End()
continue
}
// Commit successful processing
if err := msg.CommitMsg(); err != nil {
span.RecordError(err)
log.Error("Failed to commit message", err, nil)
}
// End the span once the message has been handled
// (calling End here rather than via defer avoids piling up deferred calls in the loop)
span.End()
}
FX Module Integration:
This package provides an fx module for easy integration:
app := fx.New(
    logger.FXModule, // Optional: provides std logger
    kafka.FXModule,
    // ... other modules
)
app.Run()
The Kafka module will automatically use the logger if it's available in the dependency injection container.
Configuration:
The kafka client can be configured via environment variables or explicitly:
KAFKA_BROKERS=localhost:9092,localhost:9093
KAFKA_TOPIC=events
KAFKA_GROUP_ID=my-consumer-group
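The package does not document a built-in environment loader, so mapping these variables onto a Config happens in application code. A minimal sketch using only the standard library (the helper name configFromEnv is illustrative, not part of the package):
import (
    "os"
    "strings"

    "github.com/Aleph-Alpha/std/v1/kafka"
)

// configFromEnv maps the environment variables shown above onto kafka.Config.
func configFromEnv() kafka.Config {
    return kafka.Config{
        Brokers: strings.Split(os.Getenv("KAFKA_BROKERS"), ","), // "host1:9092,host2:9092"
        Topic:   os.Getenv("KAFKA_TOPIC"),
        GroupID: os.Getenv("KAFKA_GROUP_ID"),
    }
}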
Custom Logger Integration:
You can integrate the std/v1/logger for better error logging:
import (
"github.com/Aleph-Alpha/std/v1/logger"
"github.com/Aleph-Alpha/std/v1/kafka"
)
// Create logger
log := logger.NewLoggerClient(logger.Config{
Level: logger.Info,
ServiceName: "my-service",
})
// Create Kafka client with logger
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Logger: log, // Kafka internal errors will use this logger
IsConsumer: false,
})
Commit Modes and Acknowledgment:
Consumer Commit Modes:
The module supports different commit modes for consumers:
Manual Commit (Recommended for safety):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
GroupID: "my-group",
IsConsumer: true,
EnableAutoCommit: false, // Manual commit (default)
})
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
var event UserEvent
if err := msg.BodyAs(&event); err != nil {
continue // Don't commit on error
}
if err := processEvent(event); err != nil {
continue // Don't commit on processing error
}
// Commit only after successful processing
if err := msg.CommitMsg(); err != nil {
log.Error("Failed to commit", err, nil)
}
}
Auto-Commit (For high-throughput, at-least-once semantics):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
GroupID: "my-group",
IsConsumer: true,
EnableAutoCommit: true, // Enable auto-commit
CommitInterval: 1 * time.Second, // Commit every second
})
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
// Process message - no need to call msg.CommitMsg()
// Offsets are committed automatically every CommitInterval
processEvent(msg)
}
Producer Acknowledgment Modes:
The module supports different producer acknowledgment modes:
Fire-and-Forget (Fastest, least safe):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
IsConsumer: false,
RequiredAcks: kafka.RequireNone, // No acknowledgment (0)
})
// Fast but no guarantee of delivery
err := client.Publish(ctx, "key", event)
Leader Acknowledgment (Balanced):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
IsConsumer: false,
RequiredAcks: kafka.RequireOne, // Leader only (1)
WriteTimeout: 5 * time.Second,
})
// Balanced speed and durability
err := client.Publish(ctx, "key", event)
All Replicas Acknowledgment (Most durable, default):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
IsConsumer: false,
RequiredAcks: kafka.RequireAll, // All in-sync replicas (-1)
WriteTimeout: 10 * time.Second,
})
// Slowest but most durable
err := client.Publish(ctx, "key", event)
Automatic Serializer Selection:
The Kafka client can automatically select serializers based on the DataType config. This eliminates the need to manually configure serializers for common use cases.
JSON (default):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
DataType: "json", // Auto-uses JSONSerializer
})
// Can now publish structs directly
event := UserEvent{Name: "John", Age: 30}
err = client.Publish(ctx, "key", event) // Auto-serialized to JSON
String:
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "logs",
DataType: "string", // Auto-uses StringSerializer
})
err = client.Publish(ctx, "key", "log message")
Gob (Go binary encoding):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "data",
DataType: "gob", // Auto-uses GobSerializer
})
data := MyStruct{Field1: "value", Field2: 123}
err = client.Publish(ctx, "key", data)
Raw bytes (no serialization):
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "binary",
DataType: "bytes", // No-op serializer
})
rawBytes := []byte{0x01, 0x02, 0x03}
err = client.Publish(ctx, "key", rawBytes)
Supported DataTypes:
- "json" (default): JSONSerializer/Deserializer
- "string": StringSerializer/Deserializer
- "gob": GobSerializer/Deserializer
- "bytes": NoOpSerializer/Deserializer
- "protobuf": Requires custom serializer
- "avro": Requires custom serializer
Serialization Support:
The kafka package provides optional serialization support for automatic encoding/decoding of messages. Serializers are injected via dependency injection (FX) for better testability.
Using JSON Serialization (with FX):
import (
"go.uber.org/fx"
"github.com/Aleph-Alpha/std/v1/kafka"
)
type UserEvent struct {
Event string `json:"event"`
UserID int `json:"user_id"`
Email string `json:"email"`
}
app := fx.New(
kafka.FXModule,
fx.Provide(
func() kafka.Config {
return kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "user-events",
IsConsumer: false,
}
},
// Inject serializers
func() kafka.Serializer {
return &kafka.JSONSerializer{}
},
func() kafka.Deserializer {
return &kafka.JSONDeserializer{}
},
),
)
// Publishing with automatic serialization
event := UserEvent{
Event: "signup",
UserID: 123,
Email: "user@example.com",
}
err := client.Publish(ctx, "user-123", event) // Automatically serialized
// Consuming with automatic deserialization
msgChan := client.Consume(ctx, wg)
for msg := range msgChan {
var event UserEvent
if err := msg.BodyAs(&event); err != nil { // Automatic JSON fallback
log.Error("Failed to deserialize", err, nil)
continue
}
fmt.Printf("User %d signed up: %s\n", event.UserID, event.Email)
msg.CommitMsg()
}
Using Protocol Buffers (with FX):
import (
"google.golang.org/protobuf/proto"
"github.com/Aleph-Alpha/std/v1/kafka"
)
app := fx.New(
kafka.FXModule,
fx.Provide(
func() kafka.Config { /* ... */ },
// Inject Protobuf serializers
func() kafka.Serializer {
return &kafka.ProtobufSerializer{
MarshalFunc: proto.Marshal,
}
},
func() kafka.Deserializer {
return &kafka.ProtobufDeserializer{
UnmarshalFunc: proto.Unmarshal,
}
},
),
)
// Publish protobuf message
protoMsg := &pb.UserEvent{
Event: "signup",
UserId: 123,
Email: "user@example.com",
}
err = client.Publish(ctx, "user-123", protoMsg) // Automatically serialized
// Consume protobuf messages
msgChan := consumer.Consume(ctx, wg)
for msg := range msgChan {
var event pb.UserEvent
if err := msg.BodyAs(&event); err != nil {
log.Error("Failed to deserialize", err, nil)
continue
}
fmt.Printf("User %d signed up: %s\n", event.UserId, event.Email)
msg.CommitMsg()
}
Using Raw Bytes (no serialization):
// No serializer needed - []byte is always passed through
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
IsConsumer: false,
})
// Publish raw bytes
message := []byte(`{"event":"signup","user_id":123}`)
err = client.Publish(ctx, "user-123", message) // Passed through directly
Custom Serializers:
You can implement your own serializer for custom formats:
Apache Avro Example:
import "github.com/linkedin/goavro/v2"
codec, _ := goavro.NewCodec(`{
"type": "record",
"name": "UserEvent",
"fields": [
{"name": "event", "type": "string"},
{"name": "user_id", "type": "int"}
]
}`)
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Serializer: &kafka.AvroSerializer{
MarshalFunc: func(data interface{}) ([]byte, error) {
return codec.BinaryFromNative(nil, data)
},
},
Deserializer: &kafka.AvroDeserializer{
UnmarshalFunc: func(data []byte, target interface{}) error {
native, _, err := codec.NativeFromBinary(data)
if err != nil {
return err
}
if targetMap, ok := target.(*map[string]interface{}); ok {
*targetMap = native.(map[string]interface{})
}
return nil
},
},
})
// Publish and consume work the same way
eventData := map[string]interface{}{"event": "signup", "user_id": 123}
err = client.Publish(ctx, "user-123", eventData)
Schema Registry Integration:
For production schema management with Confluent Schema Registry, use the separate schema_registry package:
import (
"github.com/Aleph-Alpha/std/v1/kafka"
"github.com/Aleph-Alpha/std/v1/schema_registry"
)
// Create schema registry client
registry, _ := schema_registry.NewClient(schema_registry.Config{
URL: "http://localhost:8081",
})
// Create serializer with schema registry
serializer, _ := schema_registry.NewAvroSerializer(
schema_registry.AvroSerializerConfig{
Registry: registry,
Subject: "users-value",
Schema: avroSchema,
MarshalFunc: marshalFunc,
},
)
// Inject via FX
fx.Provide(
func() (schema_registry.Registry, error) {
return schema_registry.NewClient(schema_registry.Config{
URL: "http://localhost:8081",
})
},
func(registry schema_registry.Registry) (kafka.Serializer, error) {
return schema_registry.NewAvroSerializer(...)
},
)
See the schema_registry package documentation for complete examples.
Multi-Format Support:
Use MultiFormatSerializer to support multiple formats:
multiSerializer := kafka.NewMultiFormatSerializer()
multiSerializer.RegisterSerializer("protobuf", &kafka.ProtobufSerializer{
MarshalFunc: proto.Marshal,
})
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Serializer: multiSerializer,
})
Built-in Serializers:
- JSONSerializer: JSON format (default fallback)
- ProtobufSerializer: Protocol Buffers
- AvroSerializer: Apache Avro
- StringSerializer: Plain text
- GobSerializer: Go gob encoding (Go-to-Go only)
- NoOpSerializer: Raw []byte passthrough
- MultiFormatSerializer: Multiple formats with dynamic selection
Observability ¶
The Kafka client supports optional observability through the Observer interface from std/v1/observability. This allows external code to track all Kafka operations for metrics, tracing, and logging without tight coupling.
Basic Observability Setup ¶
To enable observability, provide an Observer implementation in the configuration:
import "github.com/Aleph-Alpha/std/v1/observability"
// Implement the Observer interface
type MyObserver struct {
metrics *prometheus.Metrics
}
func (o *MyObserver) ObserveOperation(ctx observability.OperationContext) {
if ctx.Component == "kafka" {
switch ctx.Operation {
case "produce":
o.metrics.RecordKafkaPublish(
ctx.Resource, // topic name
ctx.Size, // message bytes
ctx.Duration, // operation duration
ctx.Error, // error if any
)
case "consume":
o.metrics.RecordKafkaConsume(
ctx.Resource, // topic name
ctx.SubResource, // partition
ctx.Size, // message bytes
ctx.Duration, // fetch duration
ctx.Error, // error if any
)
}
}
}
// Configure Kafka client with observer
observer := &MyObserver{metrics: myMetrics}
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Observer: observer, // ← Enable observability
})
Observed Operations ¶
The observer is notified of the following operations:
Produce Operation:
- Component: "kafka"
- Operation: "produce"
- Resource: Topic name (e.g., "user-events")
- SubResource: "" (empty)
- Duration: Time taken to publish message
- Size: Message size in bytes
- Error: Any error that occurred during publishing
- Metadata: nil (reserved for future use)
Consume Operation:
- Component: "kafka"
- Operation: "consume"
- Resource: Topic name (e.g., "user-events")
- SubResource: Partition number (e.g., "3")
- Duration: Time taken to fetch message from Kafka
- Size: Message size in bytes
- Error: Any error that occurred during consumption
- Metadata: nil (reserved for future use)
Example OperationContext for Publish:
observability.OperationContext{
Component: "kafka",
Operation: "produce",
Resource: "user-events",
SubResource: "",
Duration: 12 * time.Millisecond,
Size: 2048,
Error: nil,
}
Example OperationContext for Consume:
observability.OperationContext{
Component: "kafka",
Operation: "consume",
Resource: "user-events",
SubResource: "3", // partition
Duration: 8 * time.Millisecond,
Size: 1024,
Error: nil,
}
Integration with Prometheus ¶
Example: Track Kafka metrics with Prometheus:
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/Aleph-Alpha/std/v1/observability"
)
type KafkaObserver struct {
publishTotal *prometheus.CounterVec
publishDuration *prometheus.HistogramVec
publishBytes *prometheus.CounterVec
consumeTotal *prometheus.CounterVec
consumeDuration *prometheus.HistogramVec
consumeBytes *prometheus.CounterVec
}
func NewKafkaObserver() *KafkaObserver {
return &KafkaObserver{
publishTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_messages_published_total",
Help: "Total number of messages published to Kafka",
},
[]string{"topic", "status"},
),
publishDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "kafka_publish_duration_seconds",
Help: "Duration of Kafka publish operations",
Buckets: prometheus.DefBuckets,
},
[]string{"topic"},
),
publishBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_publish_bytes_total",
Help: "Total bytes published to Kafka",
},
[]string{"topic"},
),
consumeTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_messages_consumed_total",
Help: "Total number of messages consumed from Kafka",
},
[]string{"topic", "partition", "status"},
),
consumeDuration: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "kafka_consume_duration_seconds",
Help: "Duration of Kafka consume operations",
Buckets: prometheus.DefBuckets,
},
[]string{"topic"},
),
consumeBytes: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kafka_consume_bytes_total",
Help: "Total bytes consumed from Kafka",
},
[]string{"topic"},
),
}
}
func (o *KafkaObserver) ObserveOperation(ctx observability.OperationContext) {
if ctx.Component != "kafka" {
return
}
status := "success"
if ctx.Error != nil {
status = "error"
}
switch ctx.Operation {
case "produce":
o.publishTotal.WithLabelValues(ctx.Resource, status).Inc()
o.publishDuration.WithLabelValues(ctx.Resource).Observe(ctx.Duration.Seconds())
if ctx.Error == nil {
o.publishBytes.WithLabelValues(ctx.Resource).Add(float64(ctx.Size))
}
case "consume":
o.consumeTotal.WithLabelValues(ctx.Resource, ctx.SubResource, status).Inc()
o.consumeDuration.WithLabelValues(ctx.Resource).Observe(ctx.Duration.Seconds())
if ctx.Error == nil {
o.consumeBytes.WithLabelValues(ctx.Resource).Add(float64(ctx.Size))
}
}
}
FX Module with Observer ¶
When using FX, you can inject an observer into the Kafka client:
import (
"go.uber.org/fx"
"github.com/Aleph-Alpha/std/v1/kafka"
"github.com/Aleph-Alpha/std/v1/observability"
)
app := fx.New(
kafka.FXModule,
fx.Provide(
// Provide your observer implementation
func() observability.Observer {
return NewKafkaObserver()
},
// Provide Kafka config with observer
func(observer observability.Observer) kafka.Config {
return kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Observer: observer, // Inject observer
}
},
),
)
app.Run()
No-Op Observer ¶
If you don't provide an observer, there's zero overhead:
client, err := kafka.NewClient(kafka.Config{
Brokers: []string{"localhost:9092"},
Topic: "events",
Observer: nil, // ← No observer, no overhead
})
The client checks if Observer is nil before calling it, so there's no performance impact when observability is disabled.
Thread Safety ¶
All methods on the KafkaClient type are safe for concurrent use by multiple goroutines, except for GracefulShutdown(), which should only be called once.
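For example, a single client can be shared by several publishing goroutines. A minimal sketch, assuming a client and ctx created as in Basic Usage (the worker count and payload are illustrative):
var publishers sync.WaitGroup
for i := 0; i < 4; i++ {
    publishers.Add(1)
    go func(worker int) {
        defer publishers.Done()
        event := map[string]interface{}{"worker": worker}
        // Publish is safe to call concurrently on the shared client.
        if err := client.Publish(ctx, fmt.Sprintf("worker-%d", worker), event); err != nil {
            log.Printf("worker %d publish failed: %v", worker, err)
        }
    }(i)
}
publishers.Wait()
// Shut down once, after every publisher has finished.
client.GracefulShutdown()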
Index ¶
- Constants
- Variables
- func RegisterKafkaLifecycle(params KafkaLifecycleParams)
- func ValidateDataType(dataType string, hasSerializer, hasDeserializer bool) error
- type AvroDeserializer
- type AvroSerializer
- type Client
- type Config
- type ConsumerMessage
- func (cm *ConsumerMessage) Body() []byte
- func (cm *ConsumerMessage) BodyAs(target interface{}) error
- func (cm *ConsumerMessage) CommitMsg() error
- func (cm *ConsumerMessage) Header() map[string]interface{}
- func (cm *ConsumerMessage) Key() string
- func (cm *ConsumerMessage) Offset() int64
- func (cm *ConsumerMessage) Partition() int
- type Deserializer
- type GobDeserializer
- type GobSerializer
- type JSONDeserializer
- type JSONSerializer
- type KafkaClient
- func (k *KafkaClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
- func (k *KafkaClient) ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message
- func (k *KafkaClient) Deserialize(msg Message, target interface{}) error
- func (k *KafkaClient) GracefulShutdown()
- func (k *KafkaClient) IsAuthenticationError(err error) bool
- func (k *KafkaClient) IsPermanentError(err error) bool
- func (k *KafkaClient) IsRetryableError(err error) bool
- func (k *KafkaClient) IsTemporaryError(err error) bool
- func (k *KafkaClient) Publish(ctx context.Context, key string, data interface{}, ...) error
- func (k *KafkaClient) SetDefaultSerializers()
- func (k *KafkaClient) SetDeserializer(d Deserializer)
- func (k *KafkaClient) SetSerializer(s Serializer)
- func (k *KafkaClient) TranslateError(err error) error
- func (k *KafkaClient) WithDeserializer(deserializer Deserializer) *KafkaClient
- func (k *KafkaClient) WithLogger(logger Logger) *KafkaClient
- func (k *KafkaClient) WithObserver(observer observability.Observer) *KafkaClient
- func (k *KafkaClient) WithSerializer(serializer Serializer) *KafkaClient
- type KafkaLifecycleParams
- type KafkaParams
- type Logger
- type Message
- type MultiFormatDeserializer
- type MultiFormatSerializer
- type NoOpDeserializer
- type NoOpSerializer
- type ProtoMarshaler
- type ProtoMessage
- type ProtoUnmarshaler
- type ProtobufDeserializer
- type ProtobufSerializer
- type SASLConfig
- type Serializer
- type StringDeserializer
- type StringSerializer
- type TLSConfig
Constants ¶
const (
    DefaultMinBytes         = 1
    DefaultMaxBytes         = 10e6 // 10MB
    DefaultMaxWait          = 10 * time.Second
    DefaultCommitInterval   = 1 * time.Second
    DefaultStartOffset      = -2 // FirstOffset
    DefaultPartition        = -1 // Automatic partition assignment
    DefaultRequiredAcks     = -1 // WaitForAll
    DefaultBatchSize        = 100
    DefaultBatchTimeout     = 1 * time.Second
    DefaultRebalanceTimeout = 30 * time.Second
    DefaultMaxAttempts      = 10
    DefaultWriteTimeout     = 10 * time.Second

    // Producer acknowledgment modes
    RequireNone = 0  // Fire-and-forget (no acknowledgment)
    RequireOne  = 1  // Wait for leader only
    RequireAll  = -1 // Wait for all in-sync replicas (most durable)

    // Consumer offset modes
    FirstOffset = -2 // Start from the beginning
    LastOffset  = -1 // Start from the end
)
Default values for configuration
Variables ¶
var (
    // ErrConnectionFailed is returned when connection to Kafka cannot be established
    ErrConnectionFailed = errors.New("connection failed")
    // ErrConnectionLost is returned when connection to Kafka is lost
    ErrConnectionLost = errors.New("connection lost")
    // ErrBrokerNotAvailable is returned when broker is not available
    ErrBrokerNotAvailable = errors.New("broker not available")
    // ErrReplicaNotAvailable is returned when replica is not available
    ErrReplicaNotAvailable = errors.New("replica not available")
    // ErrAuthenticationFailed is returned when authentication fails
    ErrAuthenticationFailed = errors.New("authentication failed")
    // ErrAuthorizationFailed is returned when authorization fails
    ErrAuthorizationFailed = errors.New("authorization failed")
    // ErrInvalidCredentials is returned when credentials are invalid
    ErrInvalidCredentials = errors.New("invalid credentials")
    // ErrTopicNotFound is returned when topic doesn't exist
    ErrTopicNotFound = errors.New("topic not found")
    // ErrTopicAlreadyExists is returned when topic already exists
    ErrTopicAlreadyExists = errors.New("topic already exists")
    // ErrPartitionNotFound is returned when partition doesn't exist
    ErrPartitionNotFound = errors.New("partition not found")
    // ErrInvalidPartition is returned when partition is invalid
    ErrInvalidPartition = errors.New("invalid partition")
    // ErrGroupNotFound is returned when consumer group doesn't exist
    ErrGroupNotFound = errors.New("consumer group not found")
    // ErrGroupCoordinatorNotAvailable is returned when group coordinator is not available
    ErrGroupCoordinatorNotAvailable = errors.New("group coordinator not available")
    // ErrNotGroupCoordinator is returned when broker is not the group coordinator
    ErrNotGroupCoordinator = errors.New("not group coordinator")
    // ErrInvalidGroupID is returned when group ID is invalid
    ErrInvalidGroupID = errors.New("invalid group id")
    // ErrUnknownMemberID is returned when member ID is unknown
    ErrUnknownMemberID = errors.New("unknown member id")
    // ErrInvalidSessionTimeout is returned when session timeout is invalid
    ErrInvalidSessionTimeout = errors.New("invalid session timeout")
    // ErrRebalanceInProgress is returned when rebalance is in progress
    ErrRebalanceInProgress = errors.New("rebalance in progress")
    // ErrInvalidCommitOffset is returned when commit offset is invalid
    ErrInvalidCommitOffset = errors.New("invalid commit offset")
    // ErrMessageTooLarge is returned when message exceeds size limits
    ErrMessageTooLarge = errors.New("message too large")
    // ErrInvalidMessage is returned when message format is invalid
    ErrInvalidMessage = errors.New("invalid message")
    // ErrInvalidMessageSize is returned when message size is invalid
    ErrInvalidMessageSize = errors.New("invalid message size")
    // ErrOffsetOutOfRange is returned when offset is out of range
    ErrOffsetOutOfRange = errors.New("offset out of range")
    // ErrInvalidFetchSize is returned when fetch size is invalid
    ErrInvalidFetchSize = errors.New("invalid fetch size")
    // ErrLeaderNotAvailable is returned when leader is not available
    ErrLeaderNotAvailable = errors.New("leader not available")
    // ErrNotLeaderForPartition is returned when broker is not the leader for partition
    ErrNotLeaderForPartition = errors.New("not leader for partition")
    // ErrRequestTimedOut is returned when request times out
    ErrRequestTimedOut = errors.New("request timed out")
    // ErrNetworkError is returned for network-related errors
    ErrNetworkError = errors.New("network error")
    // ErrProducerFenced is returned when producer is fenced
    ErrProducerFenced = errors.New("producer fenced")
    // ErrTransactionCoordinatorFenced is returned when transaction coordinator is fenced
    ErrTransactionCoordinatorFenced = errors.New("transaction coordinator fenced")
    // ErrInvalidTransactionState is returned when transaction state is invalid
    ErrInvalidTransactionState = errors.New("invalid transaction state")
    // ErrInvalidProducerEpoch is returned when producer epoch is invalid
    ErrInvalidProducerEpoch = errors.New("invalid producer epoch")
    // ErrInvalidTxnState is returned when transaction state is invalid
    ErrInvalidTxnState = errors.New("invalid transaction state")
    // ErrInvalidProducerIDMapping is returned when producer ID mapping is invalid
    ErrInvalidProducerIDMapping = errors.New("invalid producer id mapping")
    // ErrInvalidTransaction is returned when transaction is invalid
    ErrInvalidTransaction = errors.New("invalid transaction")
    // ErrUnsupportedVersion is returned when version is not supported
    ErrUnsupportedVersion = errors.New("unsupported version")
    // ErrUnsupportedForMessageFormat is returned when message format is not supported
    ErrUnsupportedForMessageFormat = errors.New("unsupported for message format")
    // ErrInvalidRequest is returned when request is invalid
    ErrInvalidRequest = errors.New("invalid request")
    // ErrInvalidConfig is returned when configuration is invalid
    ErrInvalidConfig = errors.New("invalid config")
    // ErrNotController is returned when broker is not the controller
    ErrNotController = errors.New("not controller")
    // ErrInvalidReplicationFactor is returned when replication factor is invalid
    ErrInvalidReplicationFactor = errors.New("invalid replication factor")
    // ErrInvalidReplica is returned when replica is invalid
    ErrInvalidReplica = errors.New("invalid replica")
    // ErrReplicaNotAvailableError is returned when replica is not available
    ErrReplicaNotAvailableError = errors.New("replica not available")
    // ErrPolicyViolation is returned when policy is violated
    ErrPolicyViolation = errors.New("policy violation")
    // ErrOutOfOrderSequence is returned when sequence is out of order
    ErrOutOfOrderSequence = errors.New("out of order sequence")
    // ErrDuplicateSequence is returned when sequence is duplicated
    ErrDuplicateSequence = errors.New("duplicate sequence")
    // ErrTooManyInFlightRequests is returned when there are too many in-flight requests
    ErrTooManyInFlightRequests = errors.New("too many in-flight requests")
    // ErrWriterNotInitialized is returned when writer is not initialized
    ErrWriterNotInitialized = errors.New("writer not initialized")
    // ErrReaderNotInitialized is returned when reader is not initialized
    ErrReaderNotInitialized = errors.New("reader not initialized")
    // ErrContextCanceled is returned when context is canceled
    ErrContextCanceled = errors.New("context canceled")
    // ErrContextDeadlineExceeded is returned when context deadline is exceeded
    ErrContextDeadlineExceeded = errors.New("context deadline exceeded")
    // ErrUnknownError is returned for unknown/unhandled errors
    ErrUnknownError = errors.New("unknown error")
)
Common Kafka error types that can be used by consumers of this package. These provide a standardized set of errors that abstract away the underlying Kafka-specific error details.
var FXModule = fx.Module("kafka",
    fx.Provide(
        NewClientWithDI,
        fx.Annotate(
            func(k *KafkaClient) Client { return k },
            fx.As(new(Client)),
        ),
    ),
    fx.Invoke(RegisterKafkaLifecycle),
)
FXModule is an fx.Module that provides and configures the Kafka client. This module registers the Kafka client with the Fx dependency injection framework, making it available to other components in the application.
The module provides:
- *KafkaClient (concrete type) for direct use
- Client interface for dependency injection
- Lifecycle management for graceful startup and shutdown
Usage:
app := fx.New(
kafka.FXModule,
// other modules...
)
Functions ¶
func RegisterKafkaLifecycle ¶
func RegisterKafkaLifecycle(params KafkaLifecycleParams)
RegisterKafkaLifecycle registers the Kafka client with the fx lifecycle system. This function sets up proper initialization and graceful shutdown of the Kafka client.
Parameters:
- params: The lifecycle parameters containing the Kafka client
The function:
- On application start: Ensures the client is ready for use
- On application stop: Triggers a graceful shutdown of the Kafka client, closing producers and consumers cleanly.
This ensures that the Kafka client remains available throughout the application's lifetime and is properly cleaned up during shutdown.
func ValidateDataType ¶
func ValidateDataType(dataType string, hasSerializer, hasDeserializer bool) error
ValidateDataType checks whether the provided DataType is supported. It returns an error if the DataType requires a custom serializer or deserializer but none is provided.
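A small sketch of checking a DataType up front, before building a client. The boolean arguments state whether a custom serializer/deserializer is configured; the values here are illustrative:
// "json" has a built-in serializer, so this passes even without custom ones.
if err := kafka.ValidateDataType("json", false, false); err != nil {
    log.Printf("unexpected: %v", err)
}

// "protobuf" requires a custom serializer, so this is expected to fail.
if err := kafka.ValidateDataType("protobuf", false, false); err != nil {
    log.Printf("refusing to start: %v", err)
}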
Types ¶
type AvroDeserializer ¶
type AvroDeserializer struct {
// UnmarshalFunc is the function used to unmarshal from Avro format.
UnmarshalFunc func([]byte, interface{}) error
}
AvroDeserializer implements Deserializer for Apache Avro format.
func (*AvroDeserializer) Deserialize ¶
func (a *AvroDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts Avro bytes to the target structure.
type AvroSerializer ¶
type AvroSerializer struct {
// MarshalFunc is the function used to marshal to Avro format.
MarshalFunc func(interface{}) ([]byte, error)
}
AvroSerializer implements Serializer for Apache Avro format.
This serializer requires an external Avro library. It works with any type that implements the AvroMarshaler interface or you can provide a custom MarshalFunc.
Example with linkedin/goavro:
codec, _ := goavro.NewCodec(schemaJSON)
serializer := &kafka.AvroSerializer{
MarshalFunc: func(data interface{}) ([]byte, error) {
return codec.BinaryFromNative(nil, data)
},
}
func (*AvroSerializer) Serialize ¶
func (a *AvroSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts data to Avro bytes.
type Client ¶ added in v0.12.0
type Client interface {
// Publish sends a single message to Kafka with the specified key and body.
// The body is serialized using the configured serializer.
// Optional headers can be provided as the last parameter.
Publish(ctx context.Context, key string, data interface{}, headers ...map[string]interface{}) error
// Consume starts consuming messages from Kafka with a single worker.
// Returns a channel that delivers consumed messages.
Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
// ConsumeParallel starts consuming messages with multiple concurrent workers.
// Provides better throughput for high-volume topics.
ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message
// Deserialize converts a Message to an object using the configured deserializer.
Deserialize(msg Message, target interface{}) error
// SetSerializer sets the serializer for outgoing messages.
SetSerializer(s Serializer)
// SetDeserializer sets the deserializer for incoming messages.
SetDeserializer(d Deserializer)
// SetDefaultSerializers configures default serializers based on the DataType config.
SetDefaultSerializers()
// TranslateError translates Kafka errors to more user-friendly messages.
TranslateError(err error) error
// IsRetryableError checks if an error can be retried.
IsRetryableError(err error) bool
// IsTemporaryError checks if an error is temporary.
IsTemporaryError(err error) bool
// IsPermanentError checks if an error is permanent.
IsPermanentError(err error) bool
// IsAuthenticationError checks if an error is authentication-related.
IsAuthenticationError(err error) bool
// GracefulShutdown closes all Kafka connections cleanly.
GracefulShutdown()
}
Client provides a high-level interface for interacting with Apache Kafka. It abstracts producer and consumer operations with a simplified API.
This interface is implemented by the concrete *KafkaClient type.
type Config ¶
type Config struct {
// Brokers is a list of Kafka broker addresses
Brokers []string
// Topic is the Kafka topic to publish to or consume from
Topic string
// GroupID is the consumer group ID for coordinated consumption
// Only used when IsConsumer is true
GroupID string
// IsConsumer determines whether this client will act as a consumer
// Set to true for consumers, false for publishers
IsConsumer bool
// MinBytes is the minimum number of bytes to fetch in a single request
// Default: 1 byte
MinBytes int
// MaxBytes is the maximum number of bytes to fetch in a single request
// Default: 10MB
MaxBytes int
// MaxWait is the maximum amount of time to wait for MinBytes to become available
// Default: 10s
MaxWait time.Duration
// CommitInterval is how often to commit offsets automatically
// Only used when EnableAutoCommit is true
// Default: 1s
CommitInterval time.Duration
// EnableAutoCommit determines whether offsets are committed automatically
// When true, offsets are committed at CommitInterval
// When false, you must call msg.CommitMsg() manually
// Default: false (manual commit for safety)
EnableAutoCommit bool
// EnableAutoOffsetStore determines whether to automatically store offsets
// When true with EnableAutoCommit false, offsets are stored but not committed
// This allows you to control commit timing while still tracking progress
// Default: true
EnableAutoOffsetStore bool
// StartOffset determines where to start consuming from when there's no committed offset
// Options: FirstOffset (-2), LastOffset (-1)
// Default: FirstOffset
StartOffset int64
// Partition is the partition to produce to or consume from
// Set to -1 for automatic partition assignment
// Default: -1
Partition int
// RequiredAcks determines how many replica acknowledgments to wait for
// Options:
// RequireNone (0): Don't wait for acknowledgment (fire-and-forget, fastest but least safe)
// RequireOne (1): Wait for leader only (balance of speed and durability)
// RequireAll (-1): Wait for all in-sync replicas (slowest but most durable)
// Default: RequireAll (-1)
RequiredAcks int
// WriteTimeout is the timeout for write operations
// If RequiredAcks > 0, this is how long to wait for acknowledgment
// Default: 10s
WriteTimeout time.Duration
// Async enables async write mode for producer
// When true, writes are batched and sent asynchronously
// Default: false
Async bool
// BatchSize is the maximum number of messages to batch together
// Only used when Async is true
// Default: 100
BatchSize int
// BatchTimeout is the maximum time to wait before sending a batch
// Only used when Async is true
// Default: 1s
BatchTimeout time.Duration
// CompressionCodec specifies the compression algorithm to use
// Options: "" (no compression), "gzip", "snappy", "lz4", "zstd"
// Default: "" (no compression)
CompressionCodec string
// MaxAttempts is the maximum number of attempts to deliver a message
// Default: 10
MaxAttempts int
// AllowAutoTopicCreation determines whether to allow automatic topic creation
// Default: false
AllowAutoTopicCreation bool
// TLS contains TLS/SSL configuration
TLS TLSConfig
// SASL contains SASL authentication configuration
SASL SASLConfig
// DataType specifies the default data type for automatic serializer selection
// When no explicit serializer is provided, the client will use a default serializer
// based on this type.
// Options: "json" (default), "string", "protobuf", "avro", "gob", "bytes"
// Default: "json"
DataType string
}
Config defines the top-level configuration structure for the Kafka client. It contains all the necessary configuration sections for establishing connections, setting up producers and consumers.
type ConsumerMessage ¶
type ConsumerMessage struct {
// contains filtered or unexported fields
}
ConsumerMessage implements the Message interface and wraps a Kafka message.
func (*ConsumerMessage) Body ¶
func (cm *ConsumerMessage) Body() []byte
Body returns the message payload as a byte slice.
func (*ConsumerMessage) BodyAs ¶
func (cm *ConsumerMessage) BodyAs(target interface{}) error
BodyAs deserializes the message body into the target structure. It uses the configured Deserializer, or falls back to JSONDeserializer if none is configured.
This is a convenience method that makes consuming messages more intuitive:
msgChan := consumer.Consume(ctx, wg)
for msg := range msgChan {
var event UserEvent
if err := msg.BodyAs(&event); err != nil {
log.Printf("Failed to deserialize: %v", err)
continue
}
fmt.Printf("Event: %s, UserID: %d\n", event.Event, event.UserID)
msg.CommitMsg()
}
func (*ConsumerMessage) CommitMsg ¶
func (cm *ConsumerMessage) CommitMsg() error
CommitMsg commits the message, informing Kafka that the message has been successfully processed.
Returns an error if the commit fails.
func (*ConsumerMessage) Header ¶
func (cm *ConsumerMessage) Header() map[string]interface{}
Header returns the headers associated with the message.
func (*ConsumerMessage) Key ¶
func (cm *ConsumerMessage) Key() string
Key returns the message key as a string.
func (*ConsumerMessage) Offset ¶
func (cm *ConsumerMessage) Offset() int64
Offset returns the offset of this message.
func (*ConsumerMessage) Partition ¶
func (cm *ConsumerMessage) Partition() int
Partition returns the partition this message came from.
type Deserializer ¶
type Deserializer interface {
// Deserialize converts a byte slice into the target data structure
Deserialize(data []byte, target interface{}) error
}
Deserializer defines the interface for deserializing data received from Kafka. Implementations can provide custom deserialization logic.
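Because Deserializer is a single-method interface, custom implementations stay small. A hedged sketch (TrimmedStringDeserializer is a hypothetical type, not part of the package) that decodes the payload into a *string and trims surrounding whitespace:
import (
    "fmt"
    "strings"
)

// TrimmedStringDeserializer decodes a payload into *string, trimming whitespace.
type TrimmedStringDeserializer struct{}

func (t *TrimmedStringDeserializer) Deserialize(data []byte, target interface{}) error {
    s, ok := target.(*string)
    if !ok {
        return fmt.Errorf("target must be *string, got %T", target)
    }
    *s = strings.TrimSpace(string(data))
    return nil
}

// Attach it to a client created without FX:
// client = client.WithDeserializer(&TrimmedStringDeserializer{})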
type GobDeserializer ¶
type GobDeserializer struct{}
GobDeserializer implements Deserializer using Go's gob decoding.
func (*GobDeserializer) Deserialize ¶
func (g *GobDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts gob bytes to the target structure.
type GobSerializer ¶
type GobSerializer struct{}
GobSerializer implements Serializer using Go's gob encoding. This is useful for Go-to-Go communication where both sides use the same types.
Note: Gob encoding is Go-specific and not interoperable with other languages.
func (*GobSerializer) Serialize ¶
func (g *GobSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts data to gob bytes.
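Serializers can also be used on their own, outside a client. A round-trip sketch with GobSerializer and GobDeserializer (the Payload struct is defined here purely for illustration):
type Payload struct {
    ID   int
    Name string
}

s := &kafka.GobSerializer{}
d := &kafka.GobDeserializer{}

// Encode a Go value to gob bytes...
raw, err := s.Serialize(Payload{ID: 1, Name: "example"})
if err != nil {
    log.Printf("encode failed: %v", err)
}

// ...and decode it back into the same Go type.
var decoded Payload
if err := d.Deserialize(raw, &decoded); err != nil {
    log.Printf("decode failed: %v", err)
}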
type JSONDeserializer ¶
type JSONDeserializer struct{}
JSONDeserializer implements Deserializer using JSON decoding. This is the default deserializer provided by the Kafka module.
func (*JSONDeserializer) Deserialize ¶
func (j *JSONDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts JSON bytes to the target structure.
type JSONSerializer ¶
type JSONSerializer struct{}
JSONSerializer implements Serializer using JSON encoding. This is the default serializer provided by the Kafka module.
Features:
- Handles any Go type that can be marshaled to JSON
- Automatically passes through []byte without modification
- Thread-safe
func (*JSONSerializer) Serialize ¶
func (j *JSONSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts data to JSON bytes.
type KafkaClient ¶ added in v0.12.0
type KafkaClient struct {
// contains filtered or unexported fields
}
KafkaClient represents a client for interacting with Apache Kafka. It manages connections and provides methods for publishing and consuming messages.
KafkaClient implements the Client interface.
func NewClient ¶
func NewClient(cfg Config) (*KafkaClient, error)
NewClient creates and initializes a new KafkaClient with the provided configuration. This function sets up the producer and/or consumer based on the configuration.
Parameters:
- cfg: Configuration for connecting to Kafka
Returns a new KafkaClient instance that is ready to use.
Example:
client, err := kafka.NewClient(config)
if err != nil {
log.Printf("ERROR: failed to create Kafka client: %v", err)
return nil, err
}
defer client.GracefulShutdown()
func NewClientWithDI ¶
func NewClientWithDI(params KafkaParams) (*KafkaClient, error)
NewClientWithDI creates a new Kafka client using dependency injection. This function is designed to be used with Uber's fx dependency injection framework where dependencies are automatically provided via the KafkaParams struct.
Parameters:
- params: A KafkaParams struct that contains the Config instance and optionally a Logger, Serializer, Deserializer, and Observer instances required to initialize the Kafka client. This struct embeds fx.In to enable automatic injection of these dependencies.
Returns:
- *KafkaClient: A fully initialized Kafka client ready for use.
Example usage with fx:
app := fx.New(
kafka.FXModule,
logger.FXModule, // Optional: provides logger
fx.Provide(
func() kafka.Config {
return loadKafkaConfig() // Your config loading function
},
func() kafka.Serializer {
return &kafka.JSONSerializer{} // Or ProtobufSerializer, etc.
},
func() kafka.Deserializer {
return &kafka.JSONDeserializer{}
},
func(metrics *prometheus.Metrics) observability.Observer {
return &MyObserver{metrics: metrics} // Optional observer
},
),
)
Under the hood, this function injects the optional logger, serializer, deserializer, and observer before delegating to the standard NewClient function.
func (*KafkaClient) Consume ¶ added in v0.12.0
func (k *KafkaClient) Consume(ctx context.Context, wg *sync.WaitGroup) <-chan Message
Consume starts consuming messages from the topic specified in the configuration. This method provides a channel where consumed messages will be delivered.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
Returns a channel that delivers Message interfaces for each consumed message.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
// Option 1: Use BodyAs for automatic deserialization
var event UserEvent
if err := msg.BodyAs(&event); err != nil {
log.Printf("Failed to deserialize: %v", err)
continue
}
// Option 2: Use Body() for raw bytes
rawBytes := msg.Body()
fmt.Println("Received:", string(rawBytes))
// Commit successful processing
if err := msg.CommitMsg(); err != nil {
log.Printf("Failed to commit message: %v", err)
}
}
func (*KafkaClient) ConsumeParallel ¶ added in v0.12.0
func (k *KafkaClient) ConsumeParallel(ctx context.Context, wg *sync.WaitGroup, numWorkers int) <-chan Message
ConsumeParallel starts consuming messages from the topic with multiple concurrent goroutines. This method provides better throughput for high-volume topics by processing messages in parallel.
Parameters:
- ctx: Context for cancellation control
- wg: WaitGroup for coordinating shutdown
- numWorkers: Number of concurrent goroutines to use for consuming (recommended: 1-10)
Returns a channel that delivers Message interfaces for each consumed message.
Example:
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Use 5 concurrent workers for high throughput
msgChan := kafkaClient.ConsumeParallel(ctx, wg, 5)
for msg := range msgChan {
// Process the message
fmt.Println("Received:", string(msg.Body()))
// Commit successful processing
if err := msg.CommitMsg(); err != nil {
log.Printf("Failed to commit message: %v", err)
}
}
func (*KafkaClient) Deserialize ¶ added in v0.12.0
func (k *KafkaClient) Deserialize(msg Message, target interface{}) error
Deserialize is a helper method to deserialize a message body. It automatically uses the injected Deserializer or falls back to JSONDeserializer.
Parameters:
- msg: The message to deserialize
- target: Pointer to the target structure to deserialize into
Returns an error if deserialization fails.
Example with FX-injected deserializer:
// In your FX app:
fx.Provide(func() kafka.Deserializer {
return &kafka.JSONDeserializer{}
})
// Usage:
msgChan := kafkaClient.Consume(ctx, wg)
for msg := range msgChan {
var event UserEvent
if err := kafkaClient.Deserialize(msg, &event); err != nil {
log.Printf("Failed to deserialize: %v", err)
continue
}
fmt.Printf("Event: %s, UserID: %d\n", event.Event, event.UserID)
msg.CommitMsg()
}
func (*KafkaClient) GracefulShutdown ¶ added in v0.12.0
func (k *KafkaClient) GracefulShutdown()
GracefulShutdown closes the Kafka client's connections cleanly. This method ensures that all resources are properly released when the application is shutting down.
The shutdown process:
- Signals all goroutines to stop by closing the shutdownSignal channel
- Closes the writer if it exists
- Closes the reader if it exists
Any errors during shutdown are logged but not propagated, as they typically cannot be handled at this stage of application shutdown.
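A sketch of a typical shutdown sequence for a consumer: cancel the consume context on a signal, drain the message channel, wait for the worker goroutines tracked by the WaitGroup, then call GracefulShutdown. It assumes the message channel stops delivering once the context is canceled; the signal handling is a common Go pattern, not part of this package:
import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"
)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
msgChan := client.Consume(ctx, wg)

// Cancel the consume context on SIGINT/SIGTERM.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
    <-sigCh
    cancel()
}()

for msg := range msgChan {
    processMessage(msg)
    _ = msg.CommitMsg()
}

// Let consumer goroutines finish, then release connections exactly once.
wg.Wait()
client.GracefulShutdown()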
func (*KafkaClient) IsAuthenticationError ¶ added in v0.12.0
func (k *KafkaClient) IsAuthenticationError(err error) bool
IsAuthenticationError returns true if the error is authentication-related
func (*KafkaClient) IsPermanentError ¶ added in v0.12.0
func (k *KafkaClient) IsPermanentError(err error) bool
IsPermanentError returns true if the error is permanent and should not be retried
func (*KafkaClient) IsRetryableError ¶ added in v0.12.0
func (k *KafkaClient) IsRetryableError(err error) bool
IsRetryableError returns true if the error is retryable
func (*KafkaClient) IsTemporaryError ¶ added in v0.12.0
func (k *KafkaClient) IsTemporaryError(err error) bool
IsTemporaryError returns true if the error is temporary
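Taken together, these predicates support a simple retry policy around publishing. A hedged sketch, assuming a client and ctx as in the earlier examples (the attempt count and backoff are illustrative choices, not package defaults):
var err error
for attempt := 1; attempt <= 3; attempt++ {
    err = client.Publish(ctx, "key", event)
    if err == nil {
        break
    }
    // Don't retry errors the client classifies as permanent or auth-related.
    if client.IsPermanentError(err) || client.IsAuthenticationError(err) {
        break
    }
    // Back off briefly before retrying transient failures.
    if client.IsRetryableError(err) || client.IsTemporaryError(err) {
        time.Sleep(500 * time.Millisecond)
        continue
    }
    break // unclassified error: fail fast
}
if err != nil {
    log.Printf("publish failed after retries: %v", err)
}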
func (*KafkaClient) Publish ¶ added in v0.12.0
func (k *KafkaClient) Publish(ctx context.Context, key string, data interface{}, headers ...map[string]interface{}) error
Publish sends a message to the Kafka topic specified in the configuration. This method is thread-safe and respects context cancellation.
Parameters:
- ctx: Context for cancellation control
- key: Message key for partitioning
- data: Message payload; it is serialized using the configured serializer (a []byte payload is passed through unchanged)
- headers: Optional message headers as a map of key-value pairs; can be used for metadata and distributed tracing propagation
The headers parameter is particularly useful for distributed tracing, allowing trace context to be propagated across service boundaries through message queues. When using with the tracer package, you can extract trace headers and include them in the message:
traceHeaders := tracerClient.GetCarrier(ctx)
err := kafkaClient.Publish(ctx, "key", message, traceHeaders)
Returns an error if publishing fails or if the context is canceled.
Example:
ctx := context.Background()
message := []byte("Hello, Kafka!")
// Basic publishing without headers
err := kafkaClient.Publish(ctx, "my-key", message, nil)
if err != nil {
log.Printf("Failed to publish message: %v", err)
}
Example with distributed tracing:
// Create a span for the publish operation
ctx, span := tracer.StartSpan(ctx, "publish-message")
defer span.End()
// Extract trace context to include in the message headers
traceHeaders := tracerClient.GetCarrier(ctx)
// Publish the message with trace headers
err := kafkaClient.Publish(ctx, "my-key", message, traceHeaders)
if err != nil {
span.RecordError(err)
log.Printf("Failed to publish message: %v", err)
}
func (*KafkaClient) SetDefaultSerializers ¶ added in v0.12.0
func (k *KafkaClient) SetDefaultSerializers()
SetDefaultSerializers sets default serializers on the Kafka client based on the DataType config. It is called automatically during client creation if no serializers are provided.
func (*KafkaClient) SetDeserializer ¶ added in v0.12.0
func (k *KafkaClient) SetDeserializer(d Deserializer)
SetDeserializer sets the deserializer for the Kafka client. This is typically called by the FX module during initialization.
func (*KafkaClient) SetSerializer ¶ added in v0.12.0
func (k *KafkaClient) SetSerializer(s Serializer)
SetSerializer sets the serializer for the Kafka client. This is typically called by the FX module during initialization.
func (*KafkaClient) TranslateError ¶ added in v0.12.0
func (k *KafkaClient) TranslateError(err error) error
TranslateError converts Kafka-specific errors into standardized application errors. This function provides abstraction from the underlying Kafka implementation details, allowing application code to handle errors in a Kafka-agnostic way.
It maps common Kafka errors to the standardized error types defined above. If an error doesn't match any known type, it's returned unchanged.
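A short sketch of combining TranslateError with the standardized error variables; errors.Is from the standard library matches both a directly returned sentinel and a wrapped one:
if err := client.Publish(ctx, "key", event); err != nil {
    translated := client.TranslateError(err)
    switch {
    case errors.Is(translated, kafka.ErrTopicNotFound):
        log.Printf("topic does not exist: %v", translated)
    case errors.Is(translated, kafka.ErrAuthenticationFailed):
        log.Printf("check broker credentials: %v", translated)
    default:
        log.Printf("publish failed: %v", translated)
    }
}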
func (*KafkaClient) WithDeserializer ¶ added in v0.13.0
func (k *KafkaClient) WithDeserializer(deserializer Deserializer) *KafkaClient
WithDeserializer attaches a deserializer to the Kafka client for decoding messages. This method uses the builder pattern and returns the client for method chaining.
The deserializer will be used to decode messages when consuming from Kafka. If not set, msg.BodyAs() will use JSONDeserializer as a fallback.
This is useful for non-FX usage where you want to set deserializers after creating the client. When using FX, deserializers can be injected via NewClientWithDI.
Example:
client, err := kafka.NewClient(config)
if err != nil {
return err
}
client = client.WithDeserializer(&kafka.JSONDeserializer{})
defer client.GracefulShutdown()
func (*KafkaClient) WithLogger ¶ added in v0.13.0
func (k *KafkaClient) WithLogger(logger Logger) *KafkaClient
WithLogger attaches a logger to the Kafka client for internal logging. This method uses the builder pattern and returns the client for method chaining.
The logger will be used for lifecycle events, background worker logs, and cleanup errors. This is particularly useful for debugging and monitoring consumer worker behavior.
This is useful for non-FX usage where you want to enable logging after creating the client. When using FX, the logger is automatically injected via NewClientWithDI.
Example:
client, err := kafka.NewClient(config)
if err != nil {
return err
}
client = client.WithLogger(myLogger)
defer client.GracefulShutdown()
func (*KafkaClient) WithObserver ¶ added in v0.13.0
func (k *KafkaClient) WithObserver(observer observability.Observer) *KafkaClient
WithObserver attaches an observer to the Kafka client for tracking operations. This method uses the builder pattern and returns the client for method chaining.
The observer will be notified of all produce and consume operations, allowing external code to collect metrics, create traces, or log operations.
This is useful for non-FX usage where you want to enable observability after creating the client. When using FX, use NewClientWithDI instead, which automatically injects the observer.
Example:
client, err := kafka.NewClient(config)
if err != nil {
return err
}
client = client.WithObserver(myObserver)
defer client.GracefulShutdown()
func (*KafkaClient) WithSerializer ¶ added in v0.13.0
func (k *KafkaClient) WithSerializer(serializer Serializer) *KafkaClient
WithSerializer attaches a serializer to the Kafka client for encoding messages. This method uses the builder pattern and returns the client for method chaining.
The serializer will be used to encode messages before publishing to Kafka. If not set, you can only publish []byte data directly.
This is useful for non-FX usage where you want to set serializers after creating the client. When using FX, serializers can be injected via NewClientWithDI.
Example:
client, err := kafka.NewClient(config)
if err != nil {
return err
}
client = client.WithSerializer(&kafka.JSONSerializer{})
defer client.GracefulShutdown()
type KafkaLifecycleParams ¶
type KafkaLifecycleParams struct {
fx.In
Lifecycle fx.Lifecycle
Client *KafkaClient
}
KafkaLifecycleParams groups the dependencies needed for Kafka lifecycle management
type KafkaParams ¶
type KafkaParams struct {
fx.In
Config Config
Logger Logger `optional:"true"` // Optional logger from std/v1/logger
Serializer Serializer `optional:"true"` // Optional serializer
Deserializer Deserializer `optional:"true"` // Optional deserializer
Observer observability.Observer `optional:"true"` // Optional observer for metrics/tracing
}
KafkaParams groups the dependencies needed to create a Kafka client
type Logger ¶
type Logger interface {
// InfoWithContext logs an informational message with trace context.
InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
// WarnWithContext logs a warning message with trace context.
WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
// ErrorWithContext logs an error message with trace context.
ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{})
}
Logger is an interface that matches the std/v1/logger.Logger interface. It provides context-aware structured logging with optional error and field parameters.
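Any type with these three methods satisfies the interface, so adapting an existing logger is straightforward. A hedged sketch of a minimal adapter over the standard library log package (stdLogAdapter is a hypothetical name, not part of the package):
import (
    "context"
    "log"
)

// stdLogAdapter satisfies kafka.Logger using the standard library logger.
type stdLogAdapter struct{}

func (stdLogAdapter) InfoWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{}) {
    log.Printf("INFO: %s err=%v fields=%v", msg, err, fields)
}

func (stdLogAdapter) WarnWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{}) {
    log.Printf("WARN: %s err=%v fields=%v", msg, err, fields)
}

func (stdLogAdapter) ErrorWithContext(ctx context.Context, msg string, err error, fields ...map[string]interface{}) {
    log.Printf("ERROR: %s err=%v fields=%v", msg, err, fields)
}

// Attach it with the builder method:
// client = client.WithLogger(stdLogAdapter{})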
type Message ¶
type Message interface {
// CommitMsg commits the message, informing Kafka that the message
// has been successfully processed.
CommitMsg() error
// Body returns the message payload as a byte slice.
Body() []byte
// BodyAs deserializes the message body into the target structure.
// It uses the configured Deserializer, or falls back to JSONDeserializer.
// This is a convenience method equivalent to calling Deserialize(msg, target).
BodyAs(target interface{}) error
// Key returns the message key as a string.
Key() string
// Header returns the headers associated with the message.
Header() map[string]interface{}
// Partition returns the partition this message came from.
Partition() int
// Offset returns the offset of this message.
Offset() int64
}
Message defines the interface for consumed messages from Kafka. This interface abstracts the underlying Kafka message structure and provides methods for committing messages.
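Example handler showing the typical decode-then-commit flow (a sketch; UserEvent is a hypothetical payload type):

type UserEvent struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func handle(msg kafka.Message) error {
	var evt UserEvent
	// BodyAs uses the configured Deserializer (JSON by default).
	if err := msg.BodyAs(&evt); err != nil {
		return fmt.Errorf("decode at partition %d offset %d: %w", msg.Partition(), msg.Offset(), err)
	}
	// Process evt, then acknowledge the message.
	return msg.CommitMsg()
}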
type MultiFormatDeserializer ¶
type MultiFormatDeserializer struct {
// DefaultFormat is the format to use when not specified (default: "json")
DefaultFormat string
// Deserializers maps format names to their deserializer implementations
Deserializers map[string]Deserializer
}
MultiFormatDeserializer supports multiple deserialization formats.
func NewMultiFormatDeserializer ¶
func NewMultiFormatDeserializer() *MultiFormatDeserializer
NewMultiFormatDeserializer creates a new multi-format deserializer with default formats.
func (*MultiFormatDeserializer) Deserialize ¶
func (m *MultiFormatDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts data using the appropriate deserializer based on format.
func (*MultiFormatDeserializer) DeserializeWithFormat ¶
func (m *MultiFormatDeserializer) DeserializeWithFormat(format string, data []byte, target interface{}) error
DeserializeWithFormat explicitly specifies the format to use.
func (*MultiFormatDeserializer) RegisterDeserializer ¶
func (m *MultiFormatDeserializer) RegisterDeserializer(format string, deserializer Deserializer)
RegisterDeserializer adds a custom deserializer for a specific format.
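Example (a sketch; the "msgpack" format name and msgpackDeserializer are hypothetical custom additions):

d := kafka.NewMultiFormatDeserializer()
d.RegisterDeserializer("msgpack", &msgpackDeserializer{}) // any custom Deserializer implementation

var evt map[string]interface{}

// Uses DefaultFormat ("json") when no format is specified.
if err := d.Deserialize(payload, &evt); err != nil {
	return err
}

// Or name a registered format explicitly.
if err := d.DeserializeWithFormat("msgpack", payload, &evt); err != nil {
	return err
}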
type MultiFormatSerializer ¶
type MultiFormatSerializer struct {
// DefaultFormat is the format to use when not specified (default: "json")
DefaultFormat string
// Serializers maps format names to their serializer implementations
Serializers map[string]Serializer
}
MultiFormatSerializer supports multiple serialization formats with automatic detection. It can handle JSON, Protobuf, Avro, and custom formats based on configuration or content type.
func NewMultiFormatSerializer ¶
func NewMultiFormatSerializer() *MultiFormatSerializer
NewMultiFormatSerializer creates a new multi-format serializer with default formats.
func (*MultiFormatSerializer) RegisterSerializer ¶
func (m *MultiFormatSerializer) RegisterSerializer(format string, serializer Serializer)
RegisterSerializer adds a custom serializer for a specific format.
func (*MultiFormatSerializer) Serialize ¶
func (m *MultiFormatSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts data using the appropriate serializer based on format. The format can be given as a prefix in the form "format:data"; otherwise DefaultFormat is used.
func (*MultiFormatSerializer) SerializeWithFormat ¶
func (m *MultiFormatSerializer) SerializeWithFormat(format string, data interface{}) ([]byte, error)
SerializeWithFormat explicitly specifies the format to use.
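Example (a sketch; the "text" format name is a custom registration chosen for illustration):

s := kafka.NewMultiFormatSerializer()
s.RegisterSerializer("text", &kafka.StringSerializer{})

// DefaultFormat ("json") is used when no format is named.
payload, err := s.Serialize(map[string]interface{}{"id": "123"})
if err != nil {
	return err
}

// Or pick a registered format explicitly.
payload, err = s.SerializeWithFormat("text", "plain text payload")
if err != nil {
	return err
}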
type NoOpDeserializer ¶
type NoOpDeserializer struct{}
NoOpDeserializer does not perform any deserialization. The target must be a *[]byte to receive the raw bytes.
func (*NoOpDeserializer) Deserialize ¶
func (n *NoOpDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize copies the raw bytes to the target if it's a *[]byte.
type NoOpSerializer ¶
type NoOpSerializer struct{}
NoOpSerializer passes through byte slices without modification. Use this when you want to handle serialization yourself or work with raw bytes.
func (*NoOpSerializer) Serialize ¶
func (n *NoOpSerializer) Serialize(data interface{}) ([]byte, error)
Serialize returns the data as-is if it's a byte slice, otherwise returns an error.
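Example showing the pass-through pair together (a brief sketch):

s := &kafka.NoOpSerializer{}
raw, err := s.Serialize([]byte("already encoded")) // returned unchanged
if err != nil {
	return err
}

d := &kafka.NoOpDeserializer{}
var out []byte
if err := d.Deserialize(raw, &out); err != nil { // out receives the raw bytes
	return err
}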
type ProtoMarshaler ¶
ProtoMarshaler is an interface for types that can marshal themselves to protobuf format. This is useful for custom protobuf implementations.
type ProtoMessage ¶
type ProtoMessage interface {
ProtoReflect() interface{} // This matches proto.Message
Reset()
String() string
}
ProtoMessage is an interface that matches the proto.Message interface from google.golang.org/protobuf. This allows the ProtobufSerializer to work with any protobuf message without requiring a direct dependency.
type ProtoUnmarshaler ¶
ProtoUnmarshaler is an interface for types that can unmarshal themselves from protobuf format.
type ProtobufDeserializer ¶
type ProtobufDeserializer struct {
// UnmarshalFunc is the function used to unmarshal protobuf messages.
// If nil, will attempt to use the ProtoUnmarshaler interface.
// For google protobuf, wrap proto.Unmarshal (see the example below)
UnmarshalFunc func([]byte, interface{}) error
}
ProtobufDeserializer implements Deserializer for Protocol Buffer messages.
Example usage with google protobuf:
import "google.golang.org/protobuf/proto"
deserializer := &kafka.ProtobufDeserializer{
// proto.Unmarshal expects a proto.Message, so wrap it to match UnmarshalFunc's signature.
UnmarshalFunc: func(data []byte, target interface{}) error {
return proto.Unmarshal(data, target.(proto.Message))
},
}
func (*ProtobufDeserializer) Deserialize ¶
func (p *ProtobufDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts protobuf bytes to the target structure.
type ProtobufSerializer ¶
type ProtobufSerializer struct {
// MarshalFunc is the function used to marshal protobuf messages.
// If nil, will attempt to use the ProtoMarshaler interface.
// For google protobuf, wrap proto.Marshal (see the example below)
MarshalFunc func(interface{}) ([]byte, error)
}
ProtobufSerializer implements Serializer for Protocol Buffer messages.
This serializer works with any type that implements:
- ProtoMessage interface (google.golang.org/protobuf/proto.Message)
- ProtoMarshaler interface (custom types with Marshal() method)
Example usage with google protobuf:
import "google.golang.org/protobuf/proto"
serializer := &kafka.ProtobufSerializer{
// proto.Marshal expects a proto.Message, so wrap it to match MarshalFunc's signature.
MarshalFunc: func(data interface{}) ([]byte, error) {
return proto.Marshal(data.(proto.Message))
},
}
Example with custom marshal function:
serializer := &kafka.ProtobufSerializer{
MarshalFunc: func(m interface{}) ([]byte, error) {
return myCustomProtoMarshal(m)
},
}
func (*ProtobufSerializer) Serialize ¶
func (p *ProtobufSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts a protobuf message to bytes.
type SASLConfig ¶
type SASLConfig struct {
// Enabled determines whether to use SASL authentication
Enabled bool
// Mechanism specifies the SASL mechanism to use
// Options: "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"
Mechanism string
// Username is the SASL username
Username string
// Password is the SASL password
Password string
}
SASLConfig contains SASL authentication configuration parameters.
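Example (a sketch; credentials would normally come from the environment or a secret store, and how the struct is wired into the client configuration is not shown here):

sasl := kafka.SASLConfig{
	Enabled:   true,
	Mechanism: "SCRAM-SHA-512",
	Username:  os.Getenv("KAFKA_USERNAME"),
	Password:  os.Getenv("KAFKA_PASSWORD"),
}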
type Serializer ¶
type Serializer interface {
// Serialize converts the input data to a byte slice
Serialize(data interface{}) ([]byte, error)
}
Serializer defines the interface for serializing data before publishing to Kafka. Implementations can provide custom serialization logic (e.g., JSON, Protobuf, Avro, etc.).
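Example of a custom implementation (illustrative, not part of the package): a gob-based serializer attached via WithSerializer.

type gobSerializer struct{}

func (gobSerializer) Serialize(data interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(data); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

client = client.WithSerializer(gobSerializer{})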
type StringDeserializer ¶
type StringDeserializer struct{}
StringDeserializer implements Deserializer for string data.
func (*StringDeserializer) Deserialize ¶
func (s *StringDeserializer) Deserialize(data []byte, target interface{}) error
Deserialize converts bytes to string.
type StringSerializer ¶
type StringSerializer struct {
// Encoding specifies the character encoding (default: UTF-8)
Encoding string
}
StringSerializer implements Serializer for string data. This is useful for text-based messages.
func (*StringSerializer) Serialize ¶
func (s *StringSerializer) Serialize(data interface{}) ([]byte, error)
Serialize converts data to bytes.
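Example round trip for text payloads (a sketch; it assumes the deserializer's target is a *string, by analogy with NoOpDeserializer's *[]byte):

s := &kafka.StringSerializer{}
b, err := s.Serialize("hello")
if err != nil {
	return err
}

d := &kafka.StringDeserializer{}
var out string
if err := d.Deserialize(b, &out); err != nil {
	return err
}
// out == "hello"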
type TLSConfig ¶
type TLSConfig struct {
// Enabled determines whether to use TLS/SSL for the connection
Enabled bool
// CACertPath is the file path to the CA certificate for verifying the broker
CACertPath string
// ClientCertPath is the file path to the client certificate
ClientCertPath string
// ClientKeyPath is the file path to the client certificate's private key
ClientKeyPath string
// InsecureSkipVerify controls whether to skip verification of the server's certificate
// WARNING: Setting this to true is insecure and should only be used in testing
InsecureSkipVerify bool
}
TLSConfig contains TLS/SSL configuration parameters.
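Example for mutual TLS (a sketch; file paths are placeholders):

tlsCfg := kafka.TLSConfig{
	Enabled:        true,
	CACertPath:     "/etc/kafka/certs/ca.pem",
	ClientCertPath: "/etc/kafka/certs/client-cert.pem",
	ClientKeyPath:  "/etc/kafka/certs/client-key.pem",
	// InsecureSkipVerify stays false; only enable it in test environments.
}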