keystone

package module
v0.0.0-...-7d10efc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 13 Imported by: 0

README

keystone

Godoc Releases codecov Go Report Card

A core Kafka library for Go, providing reliable messaging primitives and stream-processing foundations.

Example

  • Consumer Group
// Avro schema registry configurations.
// Need to register new schemas here.
sr := avro.NewCachedSchemaRegistry([]string{"foobar"}, 1)

// Kafka configurations
cfg := keystone.Config{
	Brokers:          []string{"localhost:9092"},
	Group:            "sampleConsumer",
	Version:          keystone.Version_2_1_1,
	Topics:           []string{"foobar"},
	BalanceStrategy:  keystone.RoundRobin,
	Offset:           keystone.Oldest,
	ConsumerCallback: Callback,
	Decoder:          keystone.GetDefaultDecoder(),
	EncoderBuilder:   keystone.DefaultEncoderBuilder(),
}

cg := keystone.NewConsumerGroup(cfg)
// Function is called when message recieved.
func Callback(key interface{}, value interface{}) (err error) {
	return nil
}

Documentation

Index

Constants

View Source
const (
	Version_2_1_1 Version = "2.1.1"

	Sticky     BalanceStrategy = "sticky"
	RoundRobin BalanceStrategy = "roundrobin"
	Range      BalanceStrategy = "range"

	Newest Offset = "newest"
	Oldest Offset = "oldest"
)

Variables

This section is empty.

Functions

func NewSaramaLogger

func NewSaramaLogger(logger Logger) *customSaramaLogger

Types

type BalanceStrategy

type BalanceStrategy string

BalanceStrategy consumer group balancing strategy.

type Config

type Config struct {
	Brokers              []string
	Group                string
	Version              Version
	Topics               []string
	BalanceStrategy      BalanceStrategy
	Offset               Offset
	DebugLogger          Logger
	EncoderBuilder       EncoderBuilder
	Decoder              Decoder
	ConsumerCallback     ConsumerCallback
	ConsumerErrorHandler ConsumerErrorHandler
}

Config General configurations.

type ConsumerCallback

type ConsumerCallback func(key, value interface{}) (err error)

ConsumerCallback hanler function for the consumer.

type ConsumerErrorHandler

type ConsumerErrorHandler func(err error) (commitMsg bool)

ConsumerErrorHandler hanlde the error returning from the ConsumerCallback.

type ConsumerGroup

type ConsumerGroup interface {
	// Init initialize the consumer group.
	Init() (err error)
	// Run start the consumer group.
	Run() (err error)
}

ConsumerGroup the interface for manage consumer group

func NewConsumerGroup

func NewConsumerGroup(config Config) ConsumerGroup

type ConsumerGroupHandler

type ConsumerGroupHandler struct {
	// contains filtered or unexported fields
}

ConsumerGroupHandler implementation for ConsumerGroupHandler.

func (*ConsumerGroupHandler) Cleanup

Cleanup cleanup the consumer group session.

func (*ConsumerGroupHandler) ConsumeClaim

ConsumeClaim decode messages and call the consumer callback function configured.

func (*ConsumerGroupHandler) Setup

Setup setup the consumer group session.

type Decoder

type Decoder func(consumerMessage *sarama.ConsumerMessage) (key, value interface{}, err error)

Decoder decode the consumer message to according to the given implementation

func GetAvroDecoder

func GetAvroDecoder(ss avro.SchemaStore) Decoder

GetAvroDecoder creates avro decoder with avro.SchemaStore.

func GetDefaultDecoder

func GetDefaultDecoder() Decoder

GetDefaultDecoder creates a default decoder (string decoder).

type Encoder

type Encoder func(schemaStore avro.SchemaStore, schema string, data []byte) sarama.Encoder

type EncoderBuilder

type EncoderBuilder interface {
	// Build build the sarama.Encoders with the given subject and data.
	Build(subject string, data interface{}) sarama.Encoder
}

EncoderBuilder resposible for creating encoders.

func DefaultEncoderBuilder

func DefaultEncoderBuilder() EncoderBuilder

DefaultEncoderBuilder creates instance of EncoderBuilder.

func NewAvroEncoderBuilder

func NewAvroEncoderBuilder(schemaStore avro.SchemaStore) EncoderBuilder

NewAvroEncoderBuilder create instance of EncoderBuilder.

type Logger

type Logger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
var (
	DefaultLogger Logger = log.New(io.Discard, "[keystone]", log.LstdFlags)
)

type Offset

type Offset string

Offset message offset of the consumer group.

type Producer

type Producer interface {
	// Produce produce the kafka message to the given topic.
	Produce(topic string, schema string, key interface{}, value interface{}) (partition int32, offset int64, err error)
}

Producer kafka message producer interface.

func NewProducer

func NewProducer(cfg Config) (Producer, error)

NewProducer creates a producer instance.

type Version

type Version string

Version kafka version.

Directories

Path Synopsis
Package avro
Package avro
example
consumergroup command
Package main sample code for group consumer.
Package main sample code for group consumer.
producer command
Package main sample code for producer
Package main sample code for producer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL