streams

package module
v1.0.2
Published: Oct 7, 2022 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

GKES (Go Kafka Event Source) attempts to fill the gaps in the Go/Kafka library ecosystem. It supplies Exactly Once Semantics (EOS), local state stores and incremental consumer rebalancing to Go Kafka consumers, making it a viable alternative to a traditional Kafka Streams application written in Java.

GKES takes advantage of Go generics. As such, the minimum required Go version is 1.18.

What it is

GKES is a Go/Kafka library tailored toward the development of Event Sourcing applications, providing a high-throughput, low-latency Kafka client framework. Using Kafka transactions, it provides EOS, data integrity and high availability. If you wish to use GKES as a straight Kafka consumer, it will fit the bill as well, though there are plenty of libraries for that, and researching which best fits your use case is time well spent.

GKES is not an all-in-one, do-everything black box. Some elements, in particular the StateStore, have been left without comprehensive implementations.

StateStores

A useful and performant local state store rarely has a flat data structure. If your state store does, there are some convenient implementations provided. However, to achieve optimum performance, you will not only need to write a StateStore implementation, but will also need to understand what the proper data structures are for your use case (trees, heaps, maps, disk-based LSM trees or combinations thereof). You can use the provided github.com/aws/go-kafka-event-source/streams/stores.SimpleStore as a starting point.
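
To illustrate the shape of a custom store, the sketch below implements the streams.StateStore interface with a plain in-memory map. The value encoding, the store type and its factory are assumptions for this example, not part of the package:

type wordCountStore struct {
	counts map[string]int64
}

// Factory conforming to streams.StateStoreFactory; one store is created per assigned TopicPartition.
func NewWordCountStore(tp streams.TopicPartition) *wordCountStore {
	return &wordCountStore{counts: make(map[string]int64)}
}

// ReceiveChange is invoked while GKES bootstraps the store from its compacted change log topic.
func (s *wordCountStore) ReceiveChange(r streams.IncomingRecord) error {
	count, err := streams.JsonCodec[int64]{}.Decode(r.Value()) // assumes values were written as JSON int64
	if err != nil {
		return err
	}
	s.counts[string(r.Key())] = count
	return nil
}

// Revoked is invoked when the TopicPartition is reassigned; release any resources here.
func (s *wordCountStore) Revoked() {
	s.counts = nil
}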

Vending State

GKES purposefully does not provide a pre-canned way for exposing StateStore data, other than producing to another Kafka topic. There are as many ways to vend data as there are web applications. Rather than putting effort into inventing yet another one, GKES provides the mechanisms to query StateStores via Interjections. This mechanism can be plugged into whatever request/response mechanism suits your use case (gRPC, RESTful HTTP service...any number of web frameworks already in the Go ecosystem). [TODO: provide a simple http example]
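
As a rough sketch of wiring an Interjection into an HTTP handler: the `eventSource` variable, the ContactStore type from the examples below, and the `partitionForKey` helper are assumptions, not part of the package:

func getContactHandler(w http.ResponseWriter, req *http.Request) {
	id := req.URL.Query().Get("id")
	partition := partitionForKey(id) // hypothetical: must match the source topic's partitioning
	var contact Contact
	var found bool
	// Interject executes the callback in the context of the given partition, giving safe access to its StateStore.
	err := <-eventSource.Interject(partition, func(ec *streams.EventContext[ContactStore], _ time.Time) streams.ExecutionState {
		contact, found = ec.Store().Get(id)
		return streams.Complete
	})
	if err != nil || !found {
		http.NotFound(w, req)
		return
	}
	json.NewEncoder(w).Encode(contact)
}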

Interjections

For those familiar with the Kafka Streams API, GKES provides for stream `Punctuators`, but we call them `Interjections` (because it sounds cool). Interjections allow you to insert actions into your EventSource at a specified interval per assigned partition via streams.EventSource.ScheduleInterjection, or at any time via streams.EventSource.Interject. This is useful for bookkeeping activities, aggregated metric production or even error handling. Interjections have full access to the StateStore associated with an EventSource and can interact with output topics like any other EventProcessor.
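
A small sketch of a scheduled interjection (the store type and `flushMetrics` are hypothetical):

// Runs on every assigned partition roughly every 30s, plus or minus up to 5s of jitter.
eventSource.ScheduleInterjection(func(ec *streams.EventContext[myStateStore], when time.Time) streams.ExecutionState {
	flushMetrics(ec.Store(), when) // hypothetical bookkeeping task
	return streams.Complete
}, 30*time.Second, 5*time.Second)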

Incremental Consumer Rebalancing

One issue that Kafka consumer applications have long suffered from is latency spikes during a consumer rebalance. The cooperative sticky rebalancing introduced by Kafka and implemented by kgo helps resolve this issue. However, once StateStores are thrown into the mix, things get a bit more complicated, because initializing the StateStore on a host involves consuming a compacted TopicPartition from start to end. GKES solves this with the IncrementalRebalancer and takes it one step further. The IncrementalRebalancer rebalances consumer partitions in a controlled fashion, minimizing latency spikes and limiting the blast radius of a bad deployment.

Async Processing

GKES provides conventions for asynchronously processing events on the same Kafka partition while still maintaining data/stream integrity. The AsyncBatcher and AsyncJobScheduler allow you to split a TopicPartition into sub-streams by key, ensuring all events for a particular key are processed in order while allowing for parallel processing on a given TopicPartition.

There are caveats to asynchronous processing. [TODO: explanation of caveats]

High-Throughput/Low-Latency EOS

A Kafka transaction is a powerful tool which allows for Exactly Once Semantics (EOS) by linking a consumer offset commit to one or more records that are being produced by your application (a StateStore record for example). The history of Kafka EOS is a long and complicated one with varied degrees of performance and efficiency.

Early iterations required one producer transaction per consumer partition, which was very inefficient: a topic with 1000 partitions would also require 1000 clients in order to provide EOS. This has since been addressed, but depending on client implementations, there is a high risk of running into "producer fenced" errors as well as reduced throughput.

In a traditional Java Kafka Streams application, transactions are committed according to the auto-commit frequency, which defaults to 100ms. This means that your application will only produce readable records every 100ms per partition. The effect of this is that no matter what you do, your tail latency will be at least 100ms and downstream consumers will receive records in bursts rather than a steady stream. For many use cases, this is unacceptable.

GKES solves this issue by using a configurable transactional producer pool and a type of "Nagle's algorithm". Uncommitted offsets are added to the transaction pool in sequence. Once a producer has reached its record limit, or enough time has elapsed (10ms by default), the head transaction will wait for any incomplete events to finish, then flush and commit. While this transaction is committing, GKES continues to process events and optimistically begins a new transaction and produces records on the next producer in the pool. Since transactions are produced in sequence, there is no danger of commit offset overlap or duplicate message processing in the case of a failure.

To ensure EOS, your EventSource must use either the IncrementalRebalancer, or kgo's cooperative sticky implementation. If you are using a StateStore, however, the IncrementalRebalancer should be used to avoid lengthy periods of inactivity during application deployments.
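
Transaction batching is tuned via EosConfig in your EventSourceConfig. The values below are illustrative only (a sketch, not recommendations); see DefaultEosConfig for the defaults:

sourceConfig := streams.EventSourceConfig{
	GroupId:       "MyGroup",
	Topic:         "MyTopic",
	SourceCluster: streams.SimpleCluster([]string{"127.0.0.1:9092"}),
	EosConfig: streams.EosConfig{
		PoolSize:        3,
		PendingTxnCount: 1,
		TargetBatchSize: 2000,
		MaxBatchSize:    20000,
		BatchDelay:      20 * time.Millisecond, // raises tail latency, increases produce batch sizes
	},
}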

Kafka Client Library

Rather than create yet another Kafka driver, GKES is built on top of kgo. This Kafka client was chosen as it (in our testing) has superior throughput and latency profiles compared to other client libraries currently available to Go developers.

One other key advantage is that it provides a migration path to cooperative consumer rebalancing, required for our EOS implementation. Other Go Kafka libraries provide cooperative rebalancing, but do not allow you to migrate from a non-cooperative rebalancing strategy (range, sticky, etc.). This is a major roadblock for existing deployments, as the only migration paths are an entirely new consumer group, or bringing your application completely down and re-deploying with a new rebalance strategy. These migration plans, to put it mildly, are a big challenge for zero-downtime/live applications. The kgo package now makes this migration possible with zero downtime.

Kgo also has the proper hooks needed to implement the IncrementalGroupRebalancer, which is necessary for safe deployments when using a local state store. Kudos to kgo!

Index

Examples

Constants

const AutoAssign = int32(-1)
const DefaultBatchDelay = 10 * time.Millisecond
const DefaultMaxBatchSize = 10000
const DefaultPendingTxnCount = 1
const DefaultPoolSize = 3
const DefaultTargetBatchSize = 1000
const IncrementalCoopProtocol = "incr_coop"
const PartitionPreppedOperation = "PartitionPrepped"
const RecordTypeHeaderKey = "__grt__" // let's keep it small. every byte counts

The record.Header key that GKES uses to transmit type information about an IncomingRecord or a ChangeLogEntry.

const TxnCommitOperation = "TxnCommit"

Variables

var ComputeConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU(),
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}
var DefaultBalanceStrategies = []BalanceStrategy{IncrementalBalanceStrategy}
var DefaultConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU(),
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}
var DefaultEosConfig = EosConfig{
	PoolSize:        DefaultPoolSize,
	PendingTxnCount: DefaultPendingTxnCount,
	TargetBatchSize: DefaultTargetBatchSize,
	MaxBatchSize:    DefaultMaxBatchSize,
	BatchDelay:      DefaultBatchDelay,
}
var ErrPartitionNotAssigned = errors.New("partition is not assigned")
var ErrPartitionNotReady = errors.New("partition is not ready")
var FastNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 4,
	WorkerQueueDepth:  100,
	MaxConcurrentKeys: 10000,
}
var Int32Codec = intCodec[int32]{}
var Int64Codec = intCodec[int64]{}
var IntCodec = intCodec[int]{}
var LexoInt64Codec = lexoInt64Codec{}
var SlowNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 16,
	WorkerQueueDepth:  100,
	MaxConcurrentKeys: 10000,
}
var WideNetworkConfig = SchedulerConfig{
	Concurrency:       runtime.NumCPU() * 32,
	WorkerQueueDepth:  1000,
	MaxConcurrentKeys: 10000,
}

Functions

func DeleteSource

func DeleteSource(sourceConfig EventSourceConfig) error

Deletes all topics associated with a Source. Provided for local testing purposes only. Do not call this in deployed applications unless your topics are transient in nature.

func JsonItemDecoder

func JsonItemDecoder[T any](record IncomingRecord) (T, error)

A convenience function for decoding an IncomingRecord. Conforms to the streams.IncomingRecordDecoder interface needed for streams.RegisterEventType.

streams.RegisterEventType(myEventSource, streams.JsonItemDecoder[myType], myHandler, "myType")
// or standalone
myDecoder := streams.JsonItemDecoder[myType]
myItem, err := myDecoder(incomingRecord)

func NewClient

func NewClient(cluster Cluster, options ...kgo.Opt) (*kgo.Client, error)

NewClient creates a kgo.Client from the options returned from the provided Cluster and additional `options`. Used internally and exposed for convenience.

func RegisterEventType

func RegisterEventType[T StateStore, V any](es *EventSource[T], transformer IncomingRecordDecoder[V], eventProcessor EventProcessor[T, V], eventType string)

Registers eventType with a transformer (usually a codec.Codec) and the supplied EventProcessor.

func SetRecordType

func SetRecordType(r *kgo.Record, recordType string)

A convenience function provided in case you are working with a raw kgo producer and want to integrate with streams. This ensures that the EventSource will route the record to the proper handler without falling back to the defaultHandler.
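
For example, when producing with a raw kgo.Client (a sketch; `client` and `payload` are assumptions):

rec := &kgo.Record{Topic: "ExampleEventSource", Key: []byte("123"), Value: payload}
streams.SetRecordType(rec, "CreateContact") // adds the type header so the EventSource routes to the registered handler
client.ProduceSync(context.Background(), rec)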

Types

type AsyncBatcher

type AsyncBatcher[S any, K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewAsyncBatcher

func NewAsyncBatcher[S StateStore, K comparable, V any](eventSource *EventSource[S], executor BatchExecutor[K, V], maxBatchSize, maxConcurrentBatches int, delay time.Duration) *AsyncBatcher[S, K, V]

func (*AsyncBatcher[S, K, V]) Add

func (ab *AsyncBatcher[S, K, V]) Add(batch *Batch[S, K, V]) ExecutionState

type AsyncJobFinalizer

type AsyncJobFinalizer[T any, K comparable, V any] func(*EventContext[T], K, V, error) ExecutionState

A callback invoked when a previously scheduled AsyncJob has been completed.

type AsyncJobProcessor

type AsyncJobProcessor[K comparable, V any] func(K, V) error

A handler invoked when a previously scheduled AsyncJob should be performed.

type AsyncJobScheduler

type AsyncJobScheduler[S StateStore, K comparable, V any] struct {
	// contains filtered or unexported fields
}

The AsyncJobScheduler provides a generic work scheduler/job serializer which takes a key/value as input via Schedule. All work is organized into queues by 'key', so for a given key, all work is serial, allowing the use of the single-writer principle in an asynchronous fashion. In practice, it divides a stream partition into its individual keys and processes the keys in parallel.

After scheduling is complete for a key/value, the Scheduler will call the `processor` callback defined at initialization. The output of this call will be passed to the `finalizer` callback. If `finalizer` is nil, the event is marked as `Complete` once the job is finished, ignoring any errors.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/go-kafka-event-source/streams"
	"github.com/aws/go-kafka-event-source/streams/sak"
	"github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
	Id          string
	PhoneNumber string
	Email       string
	FirstName   string
	LastName    string
	LastContact time.Time
}

type NotifyContactEvent struct {
	ContactId        string
	NotificationType string
}

type EmailNotification struct {
	ContactId string
	Address   string
	Payload   string
}

func (c Contact) Key() string {
	return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	ctx.RecordChange(contactStore.Put(contact))
	fmt.Printf("Created contact: %s\n", contact.Id)
	return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
	*stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
	return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

var notificationScheduler *streams.AsyncJobScheduler[ContactStore, string, EmailNotification]

func notifyContactAsync(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(notification.ContactId); ok {
		fmt.Printf("Notifying contact: %s asynchronously by %s\n", contact.Id, notification.NotificationType)
		return notificationScheduler.Schedule(ctx, contact.Email, EmailNotification{
			ContactId: contact.Id,
			Address:   contact.Email,
			Payload:   "sending you mail...from a computer!",
		})
	} else {
		fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
	}
	return streams.Complete
}

func sendEmailToContact(key string, notification EmailNotification) error {

	fmt.Printf("Processing an email job with key: '%s'. This may take some time, emails are tricky!\n", key)
	time.Sleep(500 * time.Millisecond)
	return nil
}

func emailToContactComplete(ctx *streams.EventContext[ContactStore], _ string, email EmailNotification, err error) streams.ExecutionState {

	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(email.ContactId); ok {
		fmt.Printf("Notified contact: %s, address: %s, payload: '%s'\n", contact.Id, email.Address, email.Payload)
		contact.LastContact = time.Now()
		contactStore.Put(contact)
	}
	return streams.Complete
}

func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)

	contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
	sourceConfig := streams.EventSourceConfig{
		GroupId:       "ExampleAsyncJobSchedulerGroup",
		Topic:         "ExampleAsyncJobScheduler",
		NumPartitions: 10,
		SourceCluster: contactsCluster,
	}

	destination := streams.Destination{
		Cluster:      sourceConfig.SourceCluster,
		DefaultTopic: sourceConfig.Topic,
	}

	eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContactAsync, "NotifyContact")

	notificationScheduler = sak.Must(streams.CreateAsyncJobScheduler(eventSource,
		sendEmailToContact, emailToContactComplete, streams.DefaultConfig))

	eventSource.ConsumeEvents()

	contact := Contact{
		Id:          "123",
		Email:       "billy@bob.com",
		PhoneNumber: "+18005551212",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	notification := NotifyContactEvent{
		ContactId:        "123",
		NotificationType: "email",
	}

	producer := streams.NewProducer(destination)

	createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
	createContactRecord.WriteKeyString(contact.Id)

	notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
	notificationRecord.WriteKeyString(notification.ContactId)

	producer.Produce(context.Background(), createContactRecord)
	producer.Produce(context.Background(), notificationRecord)

	eventSource.WaitForSignals(nil)
	// Expected Output: Created contact: 123
	// Notifying contact: 123 asynchronously by email
	// Processing an email job with key: 'billy@bob.com'. This may take some time, emails are tricky!
	// Notified contact: 123, address: billy@bob.com, payload: 'sending you mail...from a computer!'
}

func CreateAsyncJobScheduler

func CreateAsyncJobScheduler[S StateStore, K comparable, V any](
	eventSource *EventSource[S],
	processor AsyncJobProcessor[K, V],
	finalizer AsyncJobFinalizer[S, K, V],
	config SchedulerConfig) (*AsyncJobScheduler[S, K, V], error)

func (*AsyncJobScheduler[S, K, V]) Schedule

func (ap *AsyncJobScheduler[S, K, V]) Schedule(ec *EventContext[S], key K, value V) ExecutionState

func (*AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys

func (ap *AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys(size int)

note: this does not increase the number of go-routines processing work, only the max number of keys we will accept work for before we block the incoming data stream

func (*AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth

func (ap *AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth(size int)

type BalanceStrategy

type BalanceStrategy int
const (
	RangeBalanceStrategy             BalanceStrategy = 0
	RoundRobinBalanceStrategy        BalanceStrategy = 1
	CooperativeStickyBalanceStrategy BalanceStrategy = 2
	IncrementalBalanceStrategy       BalanceStrategy = 3
)

type Batch

type Batch[S any, K comparable, V any] struct {
	Items    []BatchItem[K, V]
	UserData any
	// contains filtered or unexported fields
}

func NewBatch

func NewBatch[S any, K comparable, V any](ec *EventContext[S], cb BatchCallback[S, K, V]) *Batch[S, K, V]

func (*Batch[S, K, V]) Add

func (b *Batch[S, K, V]) Add(items ...BatchItem[K, V]) *Batch[S, K, V]

func (*Batch[S, K, V]) AddKeyValue

func (b *Batch[S, K, V]) AddKeyValue(key K, value V) *Batch[S, K, V]

type BatchCallback

type BatchCallback[S any, K comparable, V any] func(*EventContext[S], *Batch[S, K, V]) ExecutionState

type BatchExecutor

type BatchExecutor[K comparable, V any] func(batch []*BatchItem[K, V])

type BatchItem

type BatchItem[K comparable, V any] struct {
	Key      K
	Value    V
	Err      error
	UserData any
	// contains filtered or unexported fields
}

type BatchProducer

type BatchProducer[S any] struct {
	// contains filtered or unexported fields
}

func NewBatchProducer

func NewBatchProducer[S any](destination Destination) *BatchProducer[S]

func (*BatchProducer[S]) Produce

func (p *BatchProducer[S]) Produce(ec *EventContext[S], records []*Record, cb BatchProducerCallback[S], userData any) ExecutionState

type BatchProducerCallback

type BatchProducerCallback[S any] func(eventContext *EventContext[S], records []*Record, userData any) ExecutionState

type ChangeLogEntry

type ChangeLogEntry struct {
	// contains filtered or unexported fields
}

func CreateChangeLogEntry

func CreateChangeLogEntry[T any](item T, codec Codec[T]) (ChangeLogEntry, error)

func CreateJsonChangeLogEntry

func CreateJsonChangeLogEntry[T any](item T) (ChangeLogEntry, error)

func JsonChangeLogEntryEncoder

func JsonChangeLogEntryEncoder[T any](entryType string, item T) ChangeLogEntry

A convenience function for encoding an item into a ChangeLogEntry suitable for writing to a StateStore. Please note that the Key on the entry will be left uninitialized. Usage:

entry := streams.JsonChangeLogEntryEncoder("myType", myItem)
entry.WriteKeyString(myItem.Key)

func NewChangeLogEntry

func NewChangeLogEntry() ChangeLogEntry

func (ChangeLogEntry) KeyWriter

func (cle ChangeLogEntry) KeyWriter() *bytes.Buffer

func (ChangeLogEntry) ValueWriter

func (cle ChangeLogEntry) ValueWriter() *bytes.Buffer

func (ChangeLogEntry) WithEntryType

func (cle ChangeLogEntry) WithEntryType(entryType string) ChangeLogEntry

func (ChangeLogEntry) WithKey

func (cle ChangeLogEntry) WithKey(key ...[]byte) ChangeLogEntry

func (ChangeLogEntry) WithKeyString

func (cle ChangeLogEntry) WithKeyString(key ...string) ChangeLogEntry

func (ChangeLogEntry) WithValue

func (cle ChangeLogEntry) WithValue(value ...[]byte) ChangeLogEntry

func (ChangeLogEntry) WriteKey

func (cle ChangeLogEntry) WriteKey(bs ...[]byte)

func (ChangeLogEntry) WriteKeyString

func (cle ChangeLogEntry) WriteKeyString(ss ...string)

func (ChangeLogEntry) WriteValue

func (cle ChangeLogEntry) WriteValue(bs ...[]byte)

func (ChangeLogEntry) WriteValueString

func (cle ChangeLogEntry) WriteValueString(ss ...string)

type ChangeLogReceiver

type ChangeLogReceiver interface {
	ReceiveChange(IncomingRecord) error
}

type CleanupPolicy

type CleanupPolicy int
const (
	CompactCleanupPolicy CleanupPolicy = iota
	DeleteCleanupPolicy
)

type Cluster

type Cluster interface {
	// Returns the list of kgo.Opt(s) that will be used whenever a connection is made to this cluster.
	// At minimum, it should return the kgo.SeedBrokers() option.
	Config() ([]kgo.Opt, error)
}

An interface for implementing a reusable Kafka client configuration. TODO: document reserved options
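
For example, a custom Cluster that connects over TLS might look like the following sketch (the option choices are assumptions; tailor them to your cluster):

type tlsCluster struct {
	brokers []string
}

func (c tlsCluster) Config() ([]kgo.Opt, error) {
	// At minimum, return the seed brokers; append whatever connection options your cluster requires.
	return []kgo.Opt{
		kgo.SeedBrokers(c.brokers...),
		kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}),
	}, nil
}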

type Codec

type Codec[T any] interface {
	Encode(*bytes.Buffer, T) error
	Decode([]byte) (T, error)
}
var ByteCodec Codec[[]byte] = byteCodec{}
var StringCodec Codec[string] = stringCodec{}

type DeserializationErrorHandler

type DeserializationErrorHandler func(ec ErrorContext, eventType string, err error) ErrorResponse
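
A sketch of a custom handler that tolerates malformed records of one known event type but fails the consumer otherwise (the "LegacyEvent" type name is an assumption); it would be wired up via EventSourceConfig.DeserializationErrorHandler:

func lenientDeserializationErrorHandler(ec streams.ErrorContext, eventType string, err error) streams.ErrorResponse {
	if record, ok := ec.Input(); ok && eventType == "LegacyEvent" {
		// Skip the bad record and keep processing; acceptable only if losing these records is tolerable.
		fmt.Printf("skipping malformed %s at %+v offset %d: %v\n", eventType, record.TopicPartition(), record.Offset(), err)
		return streams.Continue
	}
	return streams.FailConsumer
}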

type Destination

type Destination struct {
	// The topic to use for records being produced which have empty topic data
	DefaultTopic string
	// Optional, used in CreateDestination call.
	NumPartitions int
	// Optional, used in CreateDestination call.
	ReplicationFactor int
	// Optional, used in CreateDestination call.
	MinInSync int
	// The Kafka cluster where this destination resides.
	Cluster Cluster
}

func CreateDestination

func CreateDestination(destination Destination) (resolved Destination, err error)

type EosConfig

type EosConfig struct {
	// PoolSize is the number of transactional producer clients in the pool.
	PoolSize int
	// The maximum number of pending transactions to be allowed in the pool at any given point in time.
	PendingTxnCount int
	// TargetBatchSize is the target number of events or records (whichever is greater) for a transaction before a commit is attempted.
	TargetBatchSize int
	// MaxBatchSize is the maximum number of events or records (whichever is greater) for a transaction before it will stop accepting new events.
	// Once a transaction reaches MaxBatchSize, it must be committed.
	MaxBatchSize int
	// The maximum amount of time to wait before committing a transaction. Once this time has elapsed, the transaction will commit
	// even if TargetBatchSize has not been achieved. This number will be the tail latency of the consume/produce cycle during periods of low activity.
	// Under high load, this setting has little impact unless set too low. If this value is too low, produce batch sizes will be extremely small
	// and Kafka will need to manage an excessive number of transactions.
	// The recommended value is 10ms and the minimum allowed value is 1ms.
	BatchDelay time.Duration
}

EOS Diagram

 On-Deck Txn            Pending Txn Channel          Commit Go-Routine
┌───────────┐           ┌─────────────────┐          ┌─────────────────────────────────────┐
│ EventCtx  │           │  Pending Txn(s) │          │  Committing Txn                     │
│ Offset: 7 │           │  ┌───────────┐  │          │  ┌───────────┐                      │
├───────────┤           │  │ EventCtx  │  │          │  │ EventCtx  │  1: Receive Txn      │
│ EventCtx  │           │  │ Offset: 4 │  │          │  │ Offset: 1 │                      │
│ Offset: 8 │           │  ├───────────┤  │          │  ├───────────┤  2: EventCtx(s).Wait │
├───────────┼──────────►│  │ EventCtx  │  ├─────────►│  │ EventCtx  │                      │
│ EventCtx  │           │  │ Offset: 5 │  │          │  │ Offset: 2 │  3: Flush Records    │
│ Offset: 9 │           │  ├───────────┤  │          │  ├───────────┤                      │
└───────────┘           │  │ EventCtx  │  │          │  │ EventCtx  │  4: Commit           │
      ▲                 │  │ Offset: 6 │  │          │  │ Offset: 3 │                      │
      │                 │  └───────────┘  │          │  └───────────┘                      │
      │                 └─────────────────┘          └─────────────────────────────────────┘
Incoming EventCtx

func (EosConfig) IsZero

func (cfg EosConfig) IsZero() bool

IsZero returns true if EosConfig is uninitialized, or all values equal zero. Used to determine whether the EventSource should fall back to DefaultEosConfig.

type ErrorContext

type ErrorContext interface {
	TopicPartition() TopicPartition
	Offset() int64
	Input() (IncomingRecord, bool)
}

type ErrorResponse

type ErrorResponse int

Instructs GKES on how to proceed when an error is encountered.

const (
	// Instructs GKES to ignore any error state and continue processing as normal. If this is used in response to a
	// Kafka transaction error, there will likely be data loss or corruption. This ErrorResponse is not recommended, as it is unlikely that
	// a consumer will be able to recover gracefully from a transaction error. In almost all situations, FailConsumer is preferred.
	Continue ErrorResponse = iota

	// Instructs GKES to immediately stop processing and the consumer to immediately leave the group.
	// This is preferable to a FatallyExit, as Kafka will immediately recognize the consumer as exiting the group
	// (if there is still communication with the cluster) and processing of the
	// failed partitions will begin without waiting for the session timeout value.
	FailConsumer

	// As the name implies, the application will fatally exit. The partitions owned by this consumer will not be reassigned until the
	// session timeout configured on the broker has elapsed.
	FatallyExit
)

func DefaultDeserializationErrorHandler

func DefaultDeserializationErrorHandler(ec ErrorContext, eventType string, err error) ErrorResponse

The default DeserializationErrorHandler. Simply logs the error and returns Continue.

func DefaultTxnErrorHandler

func DefaultTxnErrorHandler(err error) ErrorResponse

The default and recommended TxnErrorHandler. Returns FailConsumer on txn errors.

type EventContext

type EventContext[T any] struct {
	// contains filtered or unexported fields
}

Contains information about the current event. Is passed to EventProcessors and Interjections

func (*EventContext[T]) AsyncJobComplete

func (ec *EventContext[T]) AsyncJobComplete(finalize func() ExecutionState)

AsyncJobComplete should be called when an async event processor has performed its function. The finalize function should return Complete if there are no other pending asynchronous jobs for the event context in question, regardless of error state. `finalize` does not accept any arguments, so your callback should encapsulate any pertinent data needed for processing. If you are using AsyncBatcher, AsyncJobScheduler or BatchProducer, you should not need to interact with this method directly.

func (*EventContext[T]) Forward

func (ec *EventContext[T]) Forward(records ...*Record)

Forwards records to the transactional producer for your EventSource.

func (*EventContext[T]) Input

func (ec *EventContext[T]) Input() (IncomingRecord, bool)

Returns the raw input record for this event, or an uninitialized record and false if the EventContext represents an Interjection.

func (*EventContext[T]) IsInterjection

func (ec *EventContext[T]) IsInterjection() bool

Returns true if this EventContext represents an Interjection

func (*EventContext[T]) Offset

func (ec *EventContext[T]) Offset() int64

The offset for this event, -1 for an Interjection

func (*EventContext[T]) RecordChange

func (ec *EventContext[T]) RecordChange(entries ...ChangeLogEntry)

Forwards records to the transactional producer for your EventSource. When you add an item to your StateStore, you must call this method for that change to be recorded in the stream. This ensures that when the TopicPartition for this change is transferred to a new consumer, it will also have this change.

func (*EventContext[T]) Store

func (ec *EventContext[T]) Store() T

Returns the StateStore for this event/TopicPartition

func (*EventContext[T]) TopicPartition

func (ec *EventContext[T]) TopicPartition() TopicPartition

The TopicPartition for this event. It is present for both normal events and Interjections.

type EventProcessor

type EventProcessor[T any, V any] func(*EventContext[T], V) ExecutionState

A callback invoked when a new record has been received from the EventSource, after it has been transformed via an IncomingRecordDecoder.

type EventSource

type EventSource[T StateStore] struct {
	// contains filtered or unexported fields
}

EventSource provides an abstraction over raw kgo.Record/streams.IncomingRecord consumption, allowing the use of strongly typed event handlers. One of the key features of the EventSource is to allow for the routing of events based off of a type header. See RegisterEventType for details.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/go-kafka-event-source/streams"
	"github.com/aws/go-kafka-event-source/streams/sak"
	"github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
	Id          string
	PhoneNumber string
	Email       string
	FirstName   string
	LastName    string
	LastContact time.Time
}

type NotifyContactEvent struct {
	ContactId        string
	NotificationType string
}

func (c Contact) Key() string {
	return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	ctx.RecordChange(contactStore.Put(contact))
	fmt.Printf("Created contact: %s\n", contact.Id)
	return streams.Complete
}

func deleteContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
	contactStore := ctx.Store()
	if entry, ok := contactStore.Delete(contact); ok {
		ctx.RecordChange(entry)
		fmt.Printf("Deleted contact: %s\n", contact.Id)
	}
	return streams.Complete
}

func notifyContact(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
	contactStore := ctx.Store()
	if contact, ok := contactStore.Get(notification.ContactId); ok {
		fmt.Printf("Notifying contact: %s by %s\n", contact.Id, notification.NotificationType)
	} else {
		fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
	}
	return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
	*stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
	return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)

	contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
	sourceConfig := streams.EventSourceConfig{
		GroupId:       "ExampleEventSourceGroup",
		Topic:         "ExampleEventSource",
		NumPartitions: 10,
		SourceCluster: contactsCluster,
	}

	destination := streams.Destination{
		Cluster:      sourceConfig.SourceCluster,
		DefaultTopic: sourceConfig.Topic,
	}

	eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], deleteContact, "DeleteContact")
	streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContact, "NotifyContact")

	eventSource.ConsumeEvents()

	contact := Contact{
		Id:          "123",
		PhoneNumber: "+18005551212",
		FirstName:   "Billy",
		LastName:    "Bob",
	}

	notification := NotifyContactEvent{
		ContactId:        "123",
		NotificationType: "email",
	}

	producer := streams.NewProducer(destination)

	createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
	createContactRecord.WriteKeyString(contact.Id)

	deleteContactRecord := streams.JsonItemEncoder("DeleteContact", contact)
	deleteContactRecord.WriteKeyString(contact.Id)

	notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
	notificationRecord.WriteKeyString(notification.ContactId)

	producer.Produce(context.Background(), createContactRecord)
	producer.Produce(context.Background(), notificationRecord)
	producer.Produce(context.Background(), deleteContactRecord)
	producer.Produce(context.Background(), notificationRecord)

	eventSource.WaitForSignals(nil)
	// Expected  Output: Created contact: 123
	// Notifying contact: 123 by email
	// Deleted contact: 123
	// Contact 123 does not exist!
}

func NewEventSource

func NewEventSource[T StateStore](sourceConfig EventSourceConfig, stateStoreFactory StateStoreFactory[T], defaultProcessor EventProcessor[T, IncomingRecord],
	additionalClientOptions ...kgo.Opt) (*EventSource[T], error)

Create an EventSource. `defaultProcessor` will be invoked if a suitable EventProcessor can not be found, or the IncomingRecord has no RecordType header.

func (*EventSource[T]) ConsumeEvents

func (es *EventSource[T]) ConsumeEvents()

ConsumeEvents starts the underlying Kafka consumer. This call is non-blocking, so if called from main(), it should be followed by some other blocking call to prevent the application from exiting. See streams.EventSource.WaitForSignals for an example.

func (*EventSource[T]) Done

func (es *EventSource[T]) Done() <-chan struct{}

Done returns a channel that blocks while the underlying Kafka consumer is active.

func (*EventSource[T]) EmitMetric

func (es *EventSource[T]) EmitMetric(m Metric)

func (*EventSource[T]) Interject

func (es *EventSource[T]) Interject(partition int32, cmd Interjector[T]) <-chan error

Executes `cmd` in the context of the given partition. The returned channel receives an error (or nil) once the interjection is complete; it is used internally to make InterjectAll a blocking call.

func (*EventSource[T]) InterjectAll

func (es *EventSource[T]) InterjectAll(interjector Interjector[T])

InterjectAll is a convenience function which allows you to Interject into every active partition assigned to the consumer without creating an individual timer per partition. It is the equivalent of calling Interject() on each active partition, blocking until all are performed. It is worth noting that the interjections are run in parallel, so care must be taken not to create a deadlock between partitions via locking mechanisms such as a Mutex. If parallel processing is not of concern, streams.EventSource.InterjectAllSync is an alternative. Useful for gathering store statistics, but can be used in place of a standard Interjection. Example:

preCount := int64(0)
postCount := int64(0)
eventSource.InterjectAll(func(ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
	store := ec.Store()
	atomic.AddInt64(&preCount, int64(store.Len()))
	store.performBookkeepingTasks()
	atomic.AddInt64(&postCount, int64(store.Len()))
	return streams.Complete
})
fmt.Printf("Number of items before: %d, after: %d\n", preCount, postCount)

func (*EventSource[T]) InterjectAllSync

func (es *EventSource[T]) InterjectAllSync(interjector Interjector[T])

InterjectAllSync performs the same function as streams.EventSource.InterjectAll, however it blocks on each iteration. It may be useful if parallel processing is not of concern and you want to avoid locking on a shared data structure. Example:

itemCount := 0
eventSource.InterjectAllSync(func(ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
	store := ec.Store()
	itemCount += store.Len()
	return streams.Complete
})
fmt.Println("Number of items: ", itemCount)

func (*EventSource[T]) ScheduleInterjection

func (es *EventSource[T]) ScheduleInterjection(interjector Interjector[T], every, jitter time.Duration)

ScheduleInterjection sets a timer for `interjector` to be run every `every` time interval, plus or minus a random time.Duration not greater than the absolute value of `jitter` on every invocation. `interjector` will have access to EventContext.Store() and can create/delete store items, or forward events just like a standard EventProcessor. Example:

 func cleanupStaleItems(ec *EventContext[myStateStore], when time.Time)  streams.ExecutionState {
	ec.Store().cleanup(when)
	return streams.Complete
 }
 // schedules cleanupStaleItems to be executed every 900ms - 1100ms
 eventSource.ScheduleInterjection(cleanupStaleItems, time.Second, 100 * time.Millisecond)

func (*EventSource[T]) Source added in v1.0.2

func (es *EventSource[T]) Source() *Source

The Source used by the EventSource.

func (*EventSource[T]) State

func (es *EventSource[T]) State() EventSourceState

Returns the EventSourceState of the underlying Source, Healthy or Unhealthy. When the EventSource encounters an unrecoverable error (unable to execute a transaction, for example), it will enter an Unhealthy state. Intended to be used by health check processes for rolling back during a bad deployment.
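
For example, a readiness endpoint might be driven by this state (a sketch; the HTTP wiring is an assumption):

http.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
	if eventSource.State() == streams.Unhealthy {
		http.Error(w, "event source unhealthy", http.StatusServiceUnavailable)
		return
	}
	w.WriteHeader(http.StatusOK)
})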

func (*EventSource[T]) Stop

func (es *EventSource[T]) Stop()

Signals the underlying *kgo.Client that the underlying consumer should exit the group. If you are using an IncrementalGroupRebalancer, this will trigger a graceful exit where owned partitions are surrendered according to its configuration. If you are not, this call has the same effect as streams.EventSource.StopNow.

Calls to Stop are not blocking. To block during the shut down process, this call should be followed by `<-eventSource.Done()`

To simplify running from main(), the streams.EventSource.WaitForSignals and streams.EventSource.WaitForChannel calls have been provided. So unless you have extremely complex application shutdown logic, you should not need to interact with this method directly.

func (*EventSource[T]) StopNow

func (es *EventSource[T]) StopNow()

Immediately stops the underlying consumer *kgo.Client by invoking sc.client.Close(). This has the effect of immediately surrendering all owned partitions, then closing the client. If you are using an IncrementalGroupRebalancer, this can be used as a force quit.

func (*EventSource[T]) WaitForChannel

func (es *EventSource[T]) WaitForChannel(c chan struct{}, callback func())

WaitForChannel is similar to WaitForSignals, but blocks on a `chan struct{}` then invokes `callback` when finished. Useful when you have multiple EventSources in a single application. Example:

func main() {

	myEventSource1 := initEventSource1()
	myEventSource1.ConsumeEvents()

	myEventSource2 := initEventSource2()
	myEventSource2.ConsumeEvents()

	wg := &sync.WaitGroup{}
	wg.Add(2)

	eventSourceChannel := make(chan struct{})

	go myEventSource1.WaitForChannel(eventSourceChannel, wg.Done)
	go myEventSource2.WaitForChannel(eventSourceChannel, wg.Done)

	osChannel := make(chan os.Signal, 1)
	signal.Notify(osChannel, syscall.SIGINT, syscall.SIGHUP)
	<-osChannel
	close(eventSourceChannel)
	wg.Wait()
	fmt.Println("exiting")
}

func (*EventSource[T]) WaitForSignals

func (es *EventSource[T]) WaitForSignals(preHook func(os.Signal) bool, signals ...os.Signal)

WaitForSignals is a convenience function suitable for use in a main() function. Blocks until `signals` are received, then gracefully closes the consumer by calling streams.EventSource.Stop. If `signals` are not provided, syscall.SIGINT and syscall.SIGHUP are used. If `preHook` is non-nil, it will be invoked before Stop() is invoked. If the preHook returns false, this call continues to block. If true is returned, `signal.Reset(signals...)` is invoked and the consumer shutdown process begins. Simple example:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(nil)
	fmt.Println("exiting")
}

Prehook example:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(func(s os.Signal) bool {
		fmt.Printf("starting shutdown from signal %v\n", s)
		shutDownSomeOtherProcess()
		return true
	})
	fmt.Println("exiting")
}

In this example, the consumer will close on syscall.SIGINT or syscall.SIGHUP, but not syscall.SIGUSR1:

func main(){
	myEventSource := initEventSource()
	myEventSource.ConsumeEvents()
	myEventSource.WaitForSignals(func(s os.Signal) bool {
		if s == syscall.SIGUSR1 {
			fmt.Println("user signal received")
			performSomeTask()
			return false
		}
		return true
	}, syscall.SIGINT, syscall.SIGHUP, syscall.SIGUSR1)
	fmt.Println("exiting")
}

type EventSourceConfig

type EventSourceConfig struct {
	// The group id for the underlying Kafka consumer group.
	GroupId string
	// The Kafka topic to consume
	Topic string
	// The compacted Kafka topic on which to publish/consume [StateStore] data. If not provided, GKES will generate a name which includes
	// Topic and GroupId.
	StateStoreTopic string
	// The desired number of partitions for Topic.
	NumPartitions int
	// The desired replication factor for Topic. Defaults to 1.
	ReplicationFactor int
	// The desired min-insync-replicas for Topic. Defaults to 1.
	MinInSync int
	// The number of Kafka partitions to use for the application's commit log. Defaults to 5 if unset.
	CommitLogPartitions int
	// The Kafka cluster on which Topic resides, or the source of incoming events.
	SourceCluster Cluster
	// StateCluster is the Kafka cluster on which the commit log and the StateStore topic resides. If left unset (recommended), defaults to SourceCluster.
	StateCluster Cluster
	// The consumer rebalance strategies to use for the underlying Kafka consumer group.
	BalanceStrategies []BalanceStrategy
	/*
		CommitOffsets should be set to true if you are migrating from a traditional consumer group.
		This will ensure that the offsets are committed to the consumer group
		when in a mixed fleet scenario (migrating into an EventSource from a standard consumer).
		If the deployment fails, the original non-EventSource application can then
		resume consuming from the committed offsets. Once the EventSource application is well-established,
		this setting should be switched to false, as offsets are managed by another topic.
		In an EventSource application, committing offsets via the standard mechanism only
		consumes resources and provides no benefit.
	*/
	CommitOffsets bool
	/*
		The config used for the eos producer pool. If empty, [DefaultEosConfig] is used. If an EventSource is initialized with an invalid
		[EosConfig], the application will panic.
	*/
	EosConfig EosConfig
	// If non-nil, the EventSource will emit [Metric] objects of varying types. This is backed by a channel. If the channel is full
	// (presumably because the MetricHandler is not able to keep up),
	// GKES will drop the metric and log at WARN level to prevent processing slow down.
	MetricsHandler MetricsHandler

	// Called when a partition has been assigned to the EventSource consumer client. This does not indicate that the partition is being processed.
	OnPartitionAssigned SourcePartitionEventHandler

	// Called when a previously assigned partition has been activated, meaning the EventSource will start processing events for this partition. At the time this handler is called, the StateStore associated with this partition has been bootstrapped and is ready for use.
	OnPartitionActivated SourcePartitionEventHandler

	// Called when a partition is about to be revoked from the EventSource consumer client.
	// This is a blocking call and, as such, should return quickly.
	OnPartitionWillRevoke SourcePartitionEventHandler
	// Called when a partition has been revoked from the EventSource consumer client.
	// This handler is invoked after GKES has stopped processing and has finished removing any associated resources for the partition.
	OnPartitionRevoked          SourcePartitionEventHandler
	DeserializationErrorHandler DeserializationErrorHandler
	TxnErrorHandler             TxnErrorHandler
}

type EventSourceState

type EventSourceState uint64
const (
	Healthy EventSourceState = iota
	Unhealthy
)

type ExecutionState

type ExecutionState int

Returned by an EventProcessor or Interjector in response to an EventContext. ExecutionState should not be conflated with concepts of error state, such as Success or Failure.

const (
	// Complete signals the EventSource that the event or interjection is completely processed.
	// Once Complete is returned, the offset for the associated EventContext will be committed.
	Complete ExecutionState = 0
	// Incomplete signals the EventSource that the event or interjection is still ongoing, and
	// that your application promises to fulfill the EventContext in the future.
	// The offset for the associated EventContext will not be committed.
	Incomplete ExecutionState = 1

	Fatal ExecutionState = 2
)

type GlobalChangeLog

type GlobalChangeLog[T ChangeLogReceiver] struct {
	// contains filtered or unexported fields
}

A GlobalChangeLog is simply a consumer which continuously consumes all partitions within the given topic and forwards all records to its StateStore. GlobalChangeLogs can be useful for sharing small amounts of data between a group of hosts. For example, GKES uses a global change log to keep track of consumer group offsets.
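
A sketch of standing up a GlobalChangeLog (the receiver type, topic name and partition count are assumptions):

type offsetView struct {
	// your in-memory representation of the shared data
}

func (v *offsetView) ReceiveChange(r streams.IncomingRecord) error {
	// apply the record to the in-memory view; called for every record on every partition
	return nil
}

view := &offsetView{}
changeLog := streams.NewGlobalChangeLog(streams.SimpleCluster([]string{"127.0.0.1:9092"}),
	view, 10, "my-shared-data", streams.CompactCleanupPolicy)
changeLog.Start()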

func NewGlobalChangeLog

func NewGlobalChangeLog[T ChangeLogReceiver](cluster Cluster, receiver T, numPartitions int, topic string, cleanupPolicy CleanupPolicy) GlobalChangeLog[T]

Creates a new GlobalChangeLog consumer which forwards all records to `receiver`.

func (GlobalChangeLog[T]) Pause

func (cl GlobalChangeLog[T]) Pause(partition int32)

func (GlobalChangeLog[T]) PauseAllPartitions

func (cl GlobalChangeLog[T]) PauseAllPartitions()

func (GlobalChangeLog[T]) ResumePartitionAt

func (cl GlobalChangeLog[T]) ResumePartitionAt(partition int32, offset int64)

func (GlobalChangeLog[T]) Start

func (cl GlobalChangeLog[T]) Start()

func (GlobalChangeLog[T]) Stop

func (cl GlobalChangeLog[T]) Stop()

type IncomingRecord

type IncomingRecord struct {
	// contains filtered or unexported fields
}

func (IncomingRecord) HeaderValue

func (r IncomingRecord) HeaderValue(name string) []byte

func (IncomingRecord) Headers

func (r IncomingRecord) Headers() []kgo.RecordHeader

func (IncomingRecord) Key

func (r IncomingRecord) Key() []byte

func (IncomingRecord) LeaderEpoch

func (r IncomingRecord) LeaderEpoch() int32

func (IncomingRecord) Offset

func (r IncomingRecord) Offset() int64

func (IncomingRecord) RecordType

func (r IncomingRecord) RecordType() string

func (IncomingRecord) Timestamp

func (r IncomingRecord) Timestamp() time.Time

func (IncomingRecord) TopicPartition

func (r IncomingRecord) TopicPartition() TopicPartition

func (IncomingRecord) Value

func (r IncomingRecord) Value() []byte

type IncomingRecordDecoder

type IncomingRecordDecoder[V any] func(IncomingRecord) (V, error)

A callback invoked when a new record has been received from the EventSource.

type IncrGroupMemberInstructions

type IncrGroupMemberInstructions struct {
	Prepare []TopicPartition
	Forget  []TopicPartition // not currently used
}

type IncrGroupMemberMeta

type IncrGroupMemberMeta struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
	Status    MemberStatus
	LeftAt    int64
}

type IncrGroupPartitionState

type IncrGroupPartitionState struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
}

type IncrRebalanceInstructionHandler

type IncrRebalanceInstructionHandler interface {
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that this partition is destined for this consumer.
	// In the case of the EventSource, preparation involves pre-populating the StateStore for this partition.
	PrepareTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that it is safe to forget this previously prepped TopicPartition.
	ForgetPreparedTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. A valid *kgo.Client, which is on the same cluster as the Source.Topic, must be returned.
	Client() *kgo.Client
}

Defines the interface needed for the IncrementalGroupRebalancer to function. EventSource fulfills this interface. If you are using EventSource, there is nothing else for you to implement.

type IncrementalGroupRebalancer

type IncrementalGroupRebalancer interface {
	kgo.GroupBalancer
	// Must be called by the InstructionHandler once a TopicPartition is ready for consumption
	PartitionPrepared(TopicPartition)
	// Must be called by the InstructionHandler if it fails to prepare a TopicPartition it was previously instructed to prepare
	PartitionPreparationFailed(TopicPartition)
	// Must be called by the InstructionHandler once it receives an assignment
	PartitionsAssigned(...TopicPartition)
	// Must be called by the InstructionHandler if it wishes to leave the consumer group in a graceful fashion
	GracefullyLeaveGroup() <-chan struct{}
}

The IncrementalGroupRebalancer interface is an extension to kgo.GroupBalancer. This balancer allows for slowly moving partitions during consumer topology changes. This helps reduce the blast radius in the case of failures, as well as keep the inherent latency penalty of transitioning partitions to a minimum.

func IncrementalRebalancer

func IncrementalRebalancer(instructionHandler IncrRebalanceInstructionHandler) IncrementalGroupRebalancer

Creates an IncrementalRebalancer suitable for use by the kgo Kafka driver. In most cases, the instructionHandler is the EventSource. `activeTransitions` defines how many partitions may be in receivership at any given point in time.

For example, when `activeTransitions` is 1 and the group state is imbalanced (a new member is added, or a member signals it wishes to leave the group), the IncrementalGroupRebalancer will choose 1 partition to move. Once the receiver of that partition signals it is ready for the partition, it will assign it, then choose another partition to move. This process continues until the group has reached a balanced state.

In all cases, any unassigned partitions will be assigned immediately. If a consumer host crashes, for example, its partitions will be assigned immediately, regardless of preparation state.

receivership - the state of being dealt with by an official receiver.

type Interjector

type Interjector[T any] func(*EventContext[T], time.Time) ExecutionState

Defines the method signature needed by the EventSource to perform a stream interjection. See EventSource.Interject.

type JsonCodec

type JsonCodec[T any] struct{}

A generic JSON encoder/decoder. Uses "github.com/json-iterator/go".ConfigCompatibleWithStandardLibrary for encoding/decoding JSON in a performant way.

func (JsonCodec[T]) Decode

func (JsonCodec[T]) Decode(b []byte) (T, error)

func (JsonCodec[T]) Encode

func (JsonCodec[T]) Encode(b *bytes.Buffer, t T) error

type LogLevel

type LogLevel int
const (
	LogLevelNone LogLevel = iota
	LogLevelTrace
	LogLevelDebug
	LogLevelInfo
	LogLevelWarn
	LogLevelError
)

type Logger

type Logger interface {
	Tracef(msg string, args ...any)
	Debugf(msg string, args ...any)
	Infof(msg string, args ...any)
	Warnf(msg string, args ...any)
	Errorf(msg string, args ...any)
}

Provides the interface needed by GKES to integrate with your logging mechanism. Example:

 import (
	"mylogger"
	"github.com/aws/go-kafka-event-source/streams"
 )

 func main() {
	// GKES will emit log at whatever level is defined by NewLogger()
	// kgo will emit logs at LogLevelError
	streams.InitLogger(mylogger.NewLogger(), streams.LogLevelError)
 }

func InitLogger

func InitLogger(l Logger, kafkaDriverLogLevel LogLevel) Logger

Initializes the GKES logger. `kafkaDriverLogLevel` defines the log level for the underlying kgo clients. This call should be the first interaction with the GKES module. Subsequent calls will have no effect. If never called, the default uninitialized logger writes to STDOUT at LogLevelError for both GKES and kgo. Example:

 import "github.com/aws/go-kafka-event-source/streams"

 func main() {
	streams.InitLogger(streams.SimpleLogger(streams.LogLevelInfo), streams.LogLevelError)
	// ... initialize your application
 }

func WrapLogger

func WrapLogger(logger Logger, level LogLevel) Logger

WrapLogger allows GKES to emit logs at a higher level than your own Logger. Useful if you need debug level logging for your own application, but do not want to clutter your logs with GKES output. Example:

 import (
	"mylogger"
	"github.com/aws/go-kafka-event-source/streams"
 )

 func main() {
	// your application will emit logs at "Debug"
	// GKES will emit logs at LogLevelError
	// kgo will emit logs at LogLevelNone
	gkesLogger := streams.WrapLogger(mylogger.NewLogger("Debug"), streams.LogLevelError)
	streams.InitLogger(gkesLogger, streams.LogLevelNone)
 }

type MemberStatus

type MemberStatus int

The status of a consumer group member.

const (
	ActiveMember MemberStatus = iota
	InactiveMember
	Defunct
)

type Metric

type Metric struct {
	StartTime      time.Time
	ExecuteTime    time.Time
	EndTime        time.Time
	Count          int
	Bytes          int
	PartitionCount int
	Partition      int32
	Operation      string
	Topic          string
	GroupId        string
}

func (Metric) Duration

func (m Metric) Duration() time.Duration

func (Metric) ExecuteDuration

func (m Metric) ExecuteDuration() time.Duration

func (Metric) Linger

func (m Metric) Linger() time.Duration

type MetricsHandler

type MetricsHandler func(Metric)

type OptionalPartitioner

type OptionalPartitioner struct {
	// contains filtered or unexported fields
}

func NewOptionalPartitioner

func NewOptionalPartitioner(p kgo.Partitioner) OptionalPartitioner

func (OptionalPartitioner) ForTopic

func (op OptionalPartitioner) ForTopic(topic string) kgo.TopicPartitioner

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

A simple Kafka producer.

func NewProducer

func NewProducer(destination Destination, opts ...kgo.Opt) *Producer

Create a new Producer. Destination provides cluster connect information.

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) Produce

func (p *Producer) Produce(ctx context.Context, record *Record) (err error)

Produces a record, blocking until complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(ctx context.Context, record *Record, callback func(*Record, error))

Produces a record asynchronously. If `callback` is non-nil, it will be executed when the call is complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.

type Record

type Record struct {
	// contains filtered or unexported fields
}

func JsonItemEncoder

func JsonItemEncoder[T any](recordType string, item T) *Record

A convenience function for encoding an item into a Record suitable for sending to a producer. Please note that the Key on the record will be left uninitialized. Usage:

record := streams.JsonItemEncoder("myType", myItem)
record.WriteKeyString(myItem.Key)

func NewRecord

func NewRecord() *Record

func (*Record) Error

func (r *Record) Error() error

func (*Record) KeyWriter

func (r *Record) KeyWriter() *bytes.Buffer

func (*Record) Offset

func (r *Record) Offset() int64

func (*Record) Release

func (r *Record) Release()

func (*Record) ToKafkaRecord

func (r *Record) ToKafkaRecord() *kgo.Record

func (*Record) ValueWriter

func (r *Record) ValueWriter() *bytes.Buffer

func (*Record) WithHeader

func (r *Record) WithHeader(key string, value []byte) *Record

func (*Record) WithKey

func (r *Record) WithKey(key ...[]byte) *Record

func (*Record) WithKeyString

func (r *Record) WithKeyString(key ...string) *Record

func (*Record) WithPartition

func (r *Record) WithPartition(partition int32) *Record

func (*Record) WithRecordType

func (r *Record) WithRecordType(recordType string) *Record

func (*Record) WithTopic

func (r *Record) WithTopic(topic string) *Record

func (*Record) WithValue

func (r *Record) WithValue(value ...[]byte) *Record

func (*Record) WriteKey

func (r *Record) WriteKey(bs ...[]byte)

func (*Record) WriteKeyString

func (r *Record) WriteKeyString(ss ...string)

func (*Record) WriteValue

func (r *Record) WriteValue(bs ...[]byte)

func (*Record) WriteValueString

func (r *Record) WriteValueString(ss ...string)

type SchedulerConfig

type SchedulerConfig struct {
	Concurrency, WorkerQueueDepth, MaxConcurrentKeys int
}
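
GKES ships predefined configs (DefaultConfig, ComputeConfig, FastNetworkConfig, SlowNetworkConfig, WideNetworkConfig); you can also supply your own values, as in this sketch which reuses the handlers from the AsyncJobScheduler example above (the numbers are illustrative, not recommendations):

cfg := streams.SchedulerConfig{
	Concurrency:       8,    // number of go-routines processing work
	WorkerQueueDepth:  500,
	MaxConcurrentKeys: 5000, // keys accepted before back-pressuring the incoming stream
}
notificationScheduler = sak.Must(streams.CreateAsyncJobScheduler(eventSource,
	sendEmailToContact, emailToContactComplete, cfg))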

type SimpleCluster

type SimpleCluster []string

A Cluster implementation useful for local development/testing. Establishes a plain text connection to a Kafka cluster. For a more advanced example, see github.com/aws/go-kafka-event-source/msk.

cluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})

func (SimpleCluster) Config

func (sc SimpleCluster) Config() ([]kgo.Opt, error)

Returns []kgo.Opt{kgo.SeedBrokers(sc...)}

type SimpleLogger

type SimpleLogger LogLevel

SimpleLogger implements Logger and writes to STDOUT. Good for development purposes.

func (SimpleLogger) Debugf

func (sl SimpleLogger) Debugf(msg string, args ...any)

func (SimpleLogger) Errorf

func (sl SimpleLogger) Errorf(msg string, args ...any)

func (SimpleLogger) Infof

func (sl SimpleLogger) Infof(msg string, args ...any)

func (SimpleLogger) Tracef

func (sl SimpleLogger) Tracef(msg string, args ...any)

func (SimpleLogger) Warnf

func (sl SimpleLogger) Warnf(msg string, args ...any)

type Source

type Source struct {
	// contains filtered or unexported fields
}

A readonly wrapper of EventSourceConfig. When an EventSource is initialized, it reconciles the actual Topic configuration from the Kafka cluster (or creates it if missing) and wraps the corrected EventSourceConfig in a Source.

func CreateSource

func CreateSource(sourceConfig EventSourceConfig) (resolved *Source, err error)

Creates all necessary topics in the appropriate Kafka clusters as defined by Source. Automatically invoked as part of NewSourceConsumer(). Ignores TOPIC_ALREADY_EXISTS errors. Returns a corrected Source where NumPartitions and CommitLogPartitions are pulled from a ListTopics call; this is to prevent drift errors. Returns an error if the details for the Source topics could not be retrieved, or if there is a mismatch in partition counts between the source topic and the change log topic.

func (*Source) AsDestination

func (s *Source) AsDestination() Destination

A convenience method for creating a Destination from your Source. Can be used for creating a Producer or BatchProducer which publishes to your EventSource.

func (*Source) BalanceStrategies

func (s *Source) BalanceStrategies() []BalanceStrategy

func (*Source) CommitLogTopicNameForGroupId

func (s *Source) CommitLogTopicNameForGroupId() string

Returns the formatted topic name used for the commit log of Source

func (*Source) Config

func (s *Source) Config() EventSourceConfig

func (*Source) GroupId

func (s *Source) GroupId() string

func (*Source) NumPartitions

func (s *Source) NumPartitions() int

func (*Source) State

func (s *Source) State() EventSourceState

func (*Source) StateStoreTopicName added in v1.0.2

func (s *Source) StateStoreTopicName() string

Returns the formatted topic name used for the StateStore of Source

func (*Source) Topic

func (s *Source) Topic() string

type SourcePartitionEventHandler

type SourcePartitionEventHandler func(*Source, int32)

type StateStore

type StateStore interface {
	ReceiveChange(IncomingRecord) error
	Revoked()
}

type StateStoreFactory

type StateStoreFactory[T StateStore] func(TopicPartition) T

A callback invoked when a new TopicPartition has been assigned to an EventSource. Your callback should return an empty StateStore.

type TopicPartition

type TopicPartition struct {
	Partition int32
	Topic     string
}

type TopicPartitionCallback

type TopicPartitionCallback[T any] func(TopicPartition) T

type TopicPartitionSet

type TopicPartitionSet struct {
	*btree.BTreeG[TopicPartition]
}

A convenience data structure. It is what the name implies, a Set of TopicPartitions. This data structure is not thread-safe. You will need to provide your own locking mechanism.

func NewTopicPartitionSet

func NewTopicPartitionSet() TopicPartitionSet

Returns a new, empty TopicPartitionSet.

func (TopicPartitionSet) Contains

func (tps TopicPartitionSet) Contains(tp TopicPartition) bool

Returns true if tp is currently a member of the TopicPartitionSet.

func (TopicPartitionSet) Insert

func (tps TopicPartitionSet) Insert(tp TopicPartition) bool

Inserts the TopicPartition. Returns true if the item was inserted, false if the item was already present.

func (TopicPartitionSet) Items

func (tps TopicPartitionSet) Items() []TopicPartition

Converts the set to a newly allocated slice of TopicPartitions.

func (TopicPartitionSet) Remove

func (tps TopicPartitionSet) Remove(tp TopicPartition) bool

Removes tp from the TopicPartitionSet. Returns true if the item was present.

type TxnErrorHandler

type TxnErrorHandler func(err error) ErrorResponse

Directories

Path Synopsis
package "sak" (Swiss Army knife) provides some basic util functions
package "sak" (Swiss Army knife) provides some basic util functions
