events


README

events - golang events wrapper makes using Kafka type safe and easy


Install:

go get github.com/singlestore-labs/events

The events library tries to make using Kafka super easy. It provides:

  • Single line message produce

  • Single line message consume

  • Full type safety without casts

  • A broadcast abstraction that can be used for cache invalidation

  • Bulk consumers

  • A limit on the number of outstanding consumption go-routines

  • Prometheus metrics

  • Optionally required pre-registration of topics

  • Dynamic opt-in/out consumption

  • When used with a transactional database:

    • exactly once semantics for message consumption
    • at least once semantics for messages created during a transaction
    • support for PostgreSQL
    • support for SingleStore

Basics

This library provides a message queue abstraction. It is built on top of Kafka. Events are categorized into "topics".

To use events, you must initialize the event library.

In code, you must register the topic, binding it to a particular Go model.

To publish to a topic, you can do so in one line either inside a transaction (with transactional guarantees) or outside a transaction (without guarantees).

To consume a topic, you can do so with one line. You need to pick which flavor of consumer to use (idempotent, exactly-once, or broadcast).

Initialize the library

In init() or in main(), create an instance:

eventLib := events.New()

That instance needs to be configured. It can be configured before or after consumers are registered.

eventLib.Configure(conn, tracer, true, saslMechanism, tlsConfig, brokers)

After Configure and after consumers are registered, consumers can be started:

consumerStarted, consumerFinished, err := eventLib.StartConsuming(ctx)

You know that consumers are ready and processing when the consumerStarted channel is closed.
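A minimal sketch of waiting for readiness, using the channels returned by StartConsuming above:

select {
case <-consumerStarted:
	// consumers are registered and processing messages
case <-ctx.Done():
	// shutting down before the consumers became ready
}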

After Configure you can start a background produce thread. This handles sending events that were created transactionally but not actually sent at the time of the transaction. There should be at least one server doing this. The more servers doing this, the longer the delay can be. In the control plane, this is handled by the state service.

producerDone, err := eventLib.CatchUpProduce(ctx, time.Second*5, 64)

Create at least one consumer group per service:

StateServerConsumer := events.NewConsumerGroup("state-server")
Event ids

The event id (Kafka "key") can be used by Kafka to discard obsolete versions of the same message. We are not currently using this feature, but we could in the future. The key should either be random, or the topic+key should represent the current state of an object. This implies, for example, that if you have an object that represents a user's membership within a group, the key MUST have both the userid and the groupid.
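As an illustration (this helper is not part of the library), a key for such a membership object could be built from both ids:

// membershipKey is a hypothetical helper: topic+key identifies the current
// state of one user's membership in one group, so both ids are required.
func membershipKey(userID, groupID string) string {
	return fmt.Sprintf("%s:%s", userID, groupID)
}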

Consume events

There are three patterns for event consumption.

Every consumer must have a consumer group, a handler name, and a handler.

The handler is created by combining the registered topic with code.

Handlers can be called in parallel if there are multiple outstanding messages. Kafka does not guarantee message ordering so expect messages to come in any order.

Behavior for failure must be chosen for each handler.

For the non-broadcast consumers, the system tries to deliver each message once per consumer group. If a consumer group is used by multiple server instances (of the same service), that is still true: only one instance should get the message.

There are no guarantees made about event ordering. In practice, events sent earlier will generally be consumed earlier, but out-of-order delivery must be tolerated by all consumers.

Consumer groups

Consumer groups are a Kafka concept. Multiple topics can be in the same consumer group. There is a cost for each consumer group and Kafka limits the number of them. Consumer group names should not be shared between different services. For this reason, include the service name in the consumer group name.

Do not share consumer groups between different services.

Except for possible starvation due to the use of OnFailureBlock (see below), there is little need to have more than one consumer group per service. Each consumer group will be read one-message-at-a-time in a go-routine. The processing of the events happens in other go-routines. At high volume, it could be useful to have multiple consumer groups in a service.

When a consumer group is created, or a topic added to a consumer group, it will read messages from the beginning of time for each (new) topic. The binding of topics and handlers to consumer groups should be kept very stable.

Consumer group names must be registered with NewConsumerGroup()

Handler names

Multiple handlers can be invoked for each topic in each consumer group. To make error reporting sane, handlers must be named. The name will show up in error messages and in prometheus parameterizations.

Failure behavior

There are four possible behaviors for what happens if a handler returns error. They are:

  • eventmodels.OnFailureDiscard: If the handler returns error beyond the retry point, which could be just one failure, the message will be discarded from the point-of-view of that handler.
  • eventmodels.OnFailureBlock: If the handler returns error, the handler will be called again. And again. This behavior can cause re-delivery of old messages in the same consumer group since no messages would get committed. It can cause starvation of handlers and consumer groups due to per-handler and per-consumer-group limits on the number of simultaneous outstanding messages waiting to be handled.
  • eventmodels.OnFailureSave: If the handler returns error beyond the retry point, which could be just one failure, the message will be copied into a per-topic, per-consumer-group dead letter queue (topic). The message will be left there. Alerts are required to know that there is a problem and the queue needs to be looked at.
  • eventmodels.OnFailureRetryLater: If the handler returns error beyond the retry point, which could be just one failure, the message will be copied into a per-topic, per-consumer-group dead letter queue (topic). Messages in the dead letter queue will be automatically re-processed. The re-processing is subject to limits on the number of simultaneous outstanding messages and thus if the handler fails again during re-processing, it may prevent further failures from being re-processed.

In all cases, prometheus metrics will be incremented to count the failures on a per-handler basis.

Idempotent consumer

The idempotent consumer tries to deliver each event once, but it can deliver the same message multiple times. The consumer must handle duplicates itself.

eventLib.ConsumeIdempotent(StateServerConsumer, eventmodels.OnFailureDiscard, "send-cluster-created-email", MyTopic.Handler(
	func(ctx context.Context, e eventmodels.Event[PayloadForMyTopic]) error {
		// do something
		return nil
	}))
Exactly-once consumer

The exactly-once consumer includes a transaction in its callback. If that transaction commits, the message is considered to have been delivered. If it does not commit, the message is not considered delivered and the handler may be called again in the future.

eventLib.ConsumeExactlyOnce(StateServerConsumer, eventmodels.OnFailureDiscard, "send-cluster-created-email", MyTopic.HandlerTx(
	func(ctx context.Context, tx *sql.Tx, e eventmodels.Event[PayloadForMyTopic]) error {
		// do something
		return nil
	}))
Broadcast consumer

Broadcast consumers are special. They're meant for things like cache invalidation.

The other consumers start from the beginning of time for each topic and try to deliver each message just once.

The broadcast consumer ignores messages that were created before the consumer was started. It delivers messages to all handlers in all services.

eventLib.ConsumeBroadcast("invalidate-cluster-cache", MyTopic.Handler(
	func(ctx context.Context, e eventmodels.Event[PayloadForMyTopic]) error {
		// do something
		return nil
	}))

Batching

Each of the consumers can be used with a handler that processes one message at a time or with a handler that processes a slice of messages at once.

To process with batches, use MyTopic.BatchHandler with ConsumeIdempotent or ConsumeBroadcast, or MyTopic.BatchHandlerTx with ConsumeExactlyOnce.

When processing in batches, there is both a limit on the size of the batch (defaults to 30 and can be overridden with WithBatch) and a limit on the number of batches that can be processed in parallel (defaults to 3 and can be overridden with WithConcurrency).

Batches of more than one message are only formed when the limit on batch concurrency has been reached so that no additional handler can be invoked. When that happens, messages are queued. When a concurrent handler becomes available, a batch will be formed out of the queued messages.

This behavior is designed to increase efficiency when handling more than one message at a time is more efficient than handling one message at a time. This is usually true for the ConsumeExactlyOnce handlers because fewer transactions will be created.
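A sketch of a batch consumer, reusing the placeholder topic, payload, and consumer group from the earlier examples; it assumes BatchHandlerTx passes the batch as a slice of events (check its signature before relying on this):

eventLib.ConsumeExactlyOnce(StateServerConsumer, eventmodels.OnFailureRetryLater, "bulk-index-clusters",
	MyTopic.BatchHandlerTx(
		func(ctx context.Context, tx *sql.Tx, batch []eventmodels.Event[PayloadForMyTopic]) error {
			// process the whole batch inside one transaction
			return nil
		}),
	events.WithBatch(100),     // up to 100 messages per batch (default 30)
	events.WithConcurrency(2), // up to 2 batches processed in parallel (default 3)
)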

Dynamic opt-in/opt-out consumption of broadcast consumers

To add/remove consumers at runtime, use RegisterFiltered and RegisterUnfiltered. RegisterFiltered uses a function to extract a key from an event so that you can then subscribe to specific keys.
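A sketch, reusing the placeholder topic from above; the key extractor and the payload field it reads are illustrative, and the returned reader comes from the keyeddistributor package (see its docs for how to receive from it):

var clusterEvents = events.RegisterFiltered("watch-cluster", MyTopic,
	func(e eventmodels.Event[PayloadForMyTopic]) string {
		// the key to subscribe on; this payload field is made up
		return e.Payload.ClusterID
	})

// later, once a Library has been Configured:
reader := clusterEvents.Subscribe(eventLib, "cluster-42")
// reader sees only events whose key is "cluster-42"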

Without a database

The event library does not require a database connection. Without a database connection some features are not available:

  • produce from within a transaction
  • catch up production
  • exactly once consumption

Kafka configuration

For production use, configure:

At least 3 availability zones (replicas), and each topic needs:

  • min.insync.replicas: at least 2 and less than the number of availability zones
  • replication.factor: the number of availability zones

Because broadcast topics sometimes use the offsetsForTimes request, message timestamps must be set to the non-default value of LogAppendTime. This is the default for topics created by the events library. See the discussion with ChatGPT about why this is important.

  • message.timestamp.type: LogAppendTime

Since the message timestamps will use the log append time, consumers that want to know when a message was created need a second timestamp carried within the message itself, perhaps as a header.
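If the events library is creating your topics, these settings can be supplied through SetTopicConfig (documented below). This sketch assumes the kafka-go TopicConfig type and mirrors the guidance above:

eventLib.SetTopicConfig(kafka.TopicConfig{
	NumPartitions:     2, // the library's default; use more for high-volume topics
	ReplicationFactor: 3, // one replica per availability zone
	ConfigEntries: []kafka.ConfigEntry{
		{ConfigName: "min.insync.replicas", ConfigValue: "2"},
		{ConfigName: "message.timestamp.type", ConfigValue: "LogAppendTime"},
	},
})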

Documentation

Index

Constants

const UnregisteredTopicError errors.String = "topic is not pre-registered"

UnregisteredTopicError is the base error when attempting to create a topic that isn't pre-registered when pre-registration is required.

Variables

var AckLatency = mustRegister(prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: prefix + "acknowledgement_latency",
		Help: "Latency from message creation to message receive acknowledgement",
	},
	[]string{"topic", "consumer_group"},
))
var ConsumeCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "messages_consumed",
		Help: "Number of messaged consumed",
	},
	[]string{"topic", "consumer_group"},
))
var ConsumersWaitingForQueueConcurrencyDemand = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "consumer_group_queue_concurrency_demand",
		Help: "Per-consumer group concurrency limit, total waiting OR processing",
	},
	[]string{"consumer_group"},
))
var ConsumersWaitingForQueueConcurrencyLimit = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "consumer_group_queue_concurrency_wait",
		Help: "Per-consumer group concurrency limit, total waiting",
	},
	[]string{"consumer_group"},
))
var DeadLetterConsumeCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "dead_letter_consume",
		Help: "Number of messages (by handler name and topic) consumed from the dead letter queue",
	},
	[]string{"handler_name", "topic"},
))
var DeadLetterProduceCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "dead_letter_produce",
		Help: "Number of messages (by handler name and topic) produced to the dead letter queue",
	},
	[]string{"handler_name", "topic"},
))
var ErrTopicCreationTimeout errors.String = "event library topic creation deadline exceeded"
var ErrorCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "errors",
		Help: "Number of errors (by category) from the eventing system",
	},
	[]string{"category"},
))
var HandlerBatchConcurrency = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "batch_handler_concurrency",
		Help: "How many batches are being processed in parallel",
	},
	[]string{"handler_name"},
))
var HandlerBatchQueued = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "batch_handler_queued",
		Help: "How many messages are queued for batch handling",
	},
	[]string{"handler_name"},
))
var HandlerErrorCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "handler_errors",
		Help: "Number of errors (by handler name and topic) from the eventing system",
	},
	[]string{"handler_name", "topic"},
))
var HandlerLatency = mustRegister(prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: prefix + "handler_latencies",
		Help: "Per-handler processing times (seconds)",
	},
	[]string{"handler_name", "topic"},
))
var HandlerPanicCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "handler_panic",
		Help: "Number handler panics (by handler name and topic)",
	},
	[]string{"handler_name", "topic"},
))
var HandlerSuccessCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "handler_success",
		Help: "Number handler success (by handler name and topic)",
	},
	[]string{"handler_name", "topic"},
))
var HandlerWaitingForActiveConcurrencyDemand = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "active_queue_concurrency_demand",
		Help: "Global concurrency limit, total waiting OR processing",
	},
	[]string{"handler_name", "topic"},
))
var HandlerWaitingForActiveConcurrencyLimit = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "active_queue_concurrency_wait",
		Help: "Global concurrency limit, total waiting",
	},
	[]string{"handler_name", "topic"},
))
var HandlerWaitingForQueueConcurrencyDemand = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "handler_queue_concurrency_demand",
		Help: "Per-handler concurrency limit, total waiting OR processing",
	},
	[]string{"handler_name", "topic"},
))
var HandlerWaitingForQueueConcurrencyLimit = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "handler_queue_concurrency_wait",
		Help: "Per-handler concurrency limit, total waiting",
	},
	[]string{"handler_name", "topic"},
))
var (
	LegalTopicNames = regexp.MustCompile(fmt.Sprintf(`^[-._a-zA-Z0-9]{1,%d}$`, maxTopicNameLength))
)
var LongestHandlerLatency = mustRegister(prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: prefix + "unfinished_handler_time",
		Help: "Handlers that have been processing for a while (seconds)",
	},
	[]string{"handler_name", "topic"},
))
var ProduceFromTxSplit = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "batches_in_tx",
		Help: "Number in-transaction messages",
	},
	[]string{"method"},
))
var ProduceTopicCounts = mustRegister(prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: prefix + "messages_produced",
		Help: "Number of messaged produced",
	},
	[]string{"topic", "produce_method"},
))
var TransmissionLatency = mustRegister(prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name: prefix + "transmission_latency",
		Help: "Latency from message creation to message receipt (seconds)",
	},
	[]string{"topic", "consumer_group"},
))

Functions

func Async

func Async(limit int) func(*notifierConfig)

Async delivers events asynchronously with a limit on the number of outstanding events. A limit of -1 indicates no limit. A limit of zero indicates synchronous delivery. A positive number allows that many events to be outstanding at once but no more.
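For example, a hypothetical registration that allows up to 50 outstanding events for one distributor; Async is passed where a NotifierOpt is accepted, reusing the placeholder topic from the README:

var clusterCacheEvents = events.RegisterUnfiltered("cluster-cache", MyTopic, events.Async(50))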

func DeadLetterTopic

func DeadLetterTopic(topic string, consumerGroup ConsumerGroupName) string

DeadLetterTopic returns the topic name to use for dead letters. The dead letter topics include the consumer group name because otherwise messages could be cross-delivered between consumer groups.
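For example, to find where OnFailureSave messages for a consumer group end up (the topic name here is a placeholder):

deadLetters := events.DeadLetterTopic("my-topic", StateServerConsumer)
// deadLetters could then be consumed by a custom handler registered
// with the IsDeadLetterHandler(true) option described below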

func ObserveProduceLatency

func ObserveProduceLatency(err error, startTime time.Time)

func SASLConfigFromString

func SASLConfigFromString(s string) sasl.Mechanism

SASLConfigFromString returns nil for errors and is meant for testing situations

Types

type CanConsumeBroadcast

type CanConsumeBroadcast interface {
	ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)
	Tracer() eventmodels.Tracer
	InstanceID() int32
}

CanConsumeBroadcast is implemented by Library

type ConsumerGroupName

type ConsumerGroupName interface {
	String() string
	// contains filtered or unexported methods
}

func NewConsumerGroup

func NewConsumerGroup(name string) ConsumerGroupName

type Filtered

func RegisterFiltered

func RegisterFiltered[E any, K comparable](handlerName string, topic eventmodels.BoundTopic[E], key func(eventmodels.Event[E]) K, opts ...NotifierOpt) Filtered[K, E]

RegisterFiltered pre-registers a filtered broadcast event distributor that will receive events from any Library that is Configured after RegisterFiltered is called. See keyeddistributor for behavior details.

keyeddistributor: https://pkg.go.dev/github.com/memsql/keyeddistributor

func (Filtered[K, E]) Subscribe

func (f Filtered[K, E]) Subscribe(lib CanConsumeBroadcast, key K) *keyeddistributor.Reader[eventmodels.Event[E]]

type HandlerOpt

type HandlerOpt func(*registeredHandler, *LibraryNoDB)

func IsDeadLetterHandler

func IsDeadLetterHandler(isDeadLetter bool) HandlerOpt

IsDeadLetterHandler can be used when registering a handler for dead letter topics. Normally this is not needed as dead letter handlers are created automatically if the consumer uses OnFailureRetryLater. Using IsDeadLetterHandler only makes sense for creating custom handlers to deal with OnFailureSave events. Use DeadLetterTopic to form the dead letter topic name.

Dead letter handlers record different metrics and use different simultaneous runner limits. OnFailureBlock is appropriate for a dead letter handler: where would you save a dead letter from a dead letter consumer?

func WithBaseTopic

func WithBaseTopic(topic string) HandlerOpt

WithBaseTopic is applied automatically when processing dead letter topics

func WithBatch

func WithBatch(size int) HandlerOpt

WithBatch specifies the maximum number of messages to deliver at once. This should be used only with batch consumers as there is no advantage to batching with a non-batch consumer. If used with a non-batch consumer, the default WithConcurrency is 1.

func WithConcurrency

func WithConcurrency(parallelism int) HandlerOpt

WithConcurrency specifies the maximum number of instances of the handler that can be running at the same time. Setting this to 1 means that delivery of messages is single-threaded. If used with a non-batch consumer, the default WithBatch is 1.

func WithQueueDepthLimit

func WithQueueDepthLimit(n int) HandlerOpt

WithQueueDepthLimit places a limit on the number of outstanding messages for this handler to be working on. This counts both active handlers and ones that are waiting for backoff. If WithBatchDelivery is also true, then the limit also acts as a maximum size for the batch.

func WithRetrying

func WithRetrying(retry bool) HandlerOpt

WithRetrying controls message delivery. Normally, multiple attempts are made to deliver messages.

WithRetrying(false) means only one attempt will be made.

In connection with OnFailureRetryLater or OnFailureSave, the message will be immediately sent to the corresponding dead letter topic. In connection with OnFailureDiscard, it will be dropped after just one delivery attempt.

In combination with OnFailureBlock, no messages will get acknowledged in the consumer group (since they must be ack'ed in-order) and eventually processing of the consumer group will stall.
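A sketch of a single-attempt handler: combined with OnFailureSave, one failed delivery sends the message straight to the dead letter topic (names reuse the README placeholders):

eventLib.ConsumeIdempotent(StateServerConsumer, eventmodels.OnFailureSave, "record-audit-log",
	MyTopic.Handler(
		func(ctx context.Context, e eventmodels.Event[PayloadForMyTopic]) error {
			// any error here goes to the dead letter topic without retries
			return nil
		}),
	events.WithRetrying(false),
)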

func WithTimeout

func WithTimeout(d time.Duration) HandlerOpt

WithTimeout limits the duration of retries of delivery attempts on a per-message basis. When the limit is exceeded, delivery is considered to have failed.

When delivery has failed, failure handling becomes key:

OnFailureRetryLater & OnFailureSave: the event will be sent to the dead letter topic.

OnFailureDiscard: the event will be ack'ed without processing.

OnFailureBlock: no messages will get acknowledged in the consumer group (since they must be ack'ed in-order) and eventually processing of the consumer group will stall.

type HeartbeatEvent

type HeartbeatEvent struct{}

type Library

type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodels.AbstractDB[ID, TX]] struct {
	LibraryNoDB
	// contains filtered or unexported fields
}

func New

func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodels.AbstractDB[ID, TX]]() *Library[ID, TX, DB]

New creates an event Library. It can be called from init() since it requires no arguments. The library is not ready to use until it has been configured.

After New(), Configure() must be called.

After New(), ConsumeExactlyOnce(), ConsumeIdempotent(), and ConsumeBroadcast() may be called to register event consumers.

After the call to Configure() and all Consume*() calls, consumers can be started with StartConsuming(); background production of orphaned messages may be started with CatchUpProduce(); messages may be produced with Produce().

Configure() and Consume*() may not be used once consumption or production has started.

func (*Library[ID, TX, DB]) BroadcastConsumerLastLatency

func (lib *Library[ID, TX, DB]) BroadcastConsumerLastLatency() (lastGap time.Duration, configuredHeartbeat time.Duration)

BroadcastConsumerLastLatency returns the time since the sending of the last broadcast event that was received. It also returns the configured heartbeat interval. As long as the latency is less than twice the configured interval, the broadcast consumer can be considered to be working.
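For example, a health check along the lines suggested above:

lastGap, heartbeat := eventLib.BroadcastConsumerLastLatency()
if lastGap > 2*heartbeat {
	// the broadcast consumer may be stalled; report unhealthy
}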

func (*Library[ID, TX, DB]) CatchUpProduce

func (lib *Library[ID, TX, DB]) CatchUpProduce(ctx context.Context, sleepTime time.Duration, batchSize int) (chan struct{}, error)

CatchUpProduce starts a background thread that looks for events that were written to the database during a transaction but were not sent to Kafka

The returned channel is closed when CatchUpProduce shuts down (due to context cancel)

CatchUpProduce can only be used after Configure.

func (*Library[ID, TX, DB]) Configure

func (lib *Library[ID, TX, DB]) Configure(conn DB, tracer eventmodels.Tracer, mustRegisterTopics bool, saslMechanism sasl.Mechanism, tlsConfig *TLSConfig, brokers []string)

Configure sets up the Library so that it has the configuration it needs to run. The database connection is optional. Without it, certain features will always error:

CatchUpProduce requires a database
StartConsuming requires a database if ConsumeExactlyOnce has been called

The conn parameter may be nil, in which case CatchUpProduce() and ProduceFromTable() will error.

func (*Library[ID, TX, DB]) ConfigureBroadcastHeartbeat

func (lib *Library[ID, TX, DB]) ConfigureBroadcastHeartbeat(dur time.Duration)

func (*Library[ID, TX, DB]) ConsumeBroadcast

func (lib *Library[ID, TX, DB]) ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)

ConsumeBroadcast should be used to consume events where every instance of the consumer will receive every event. Only events timestamped after the start time of the consuming server will be delivered. Duplicate events are possible.

When broadcast handlers return error, the message will be dropped. By default broadcast handlers are not retried and the handler timeout is 30 seconds.

func (*Library[ID, TX, DB]) ConsumeExactlyOnce

func (lib *Library[ID, TX, DB]) ConsumeExactlyOnce(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerTxInterface[ID, TX], opts ...HandlerOpt)

ConsumeExactlyOnce delivers messages transactionally (with an open transaction) and they are delivered exactly once. If the handler returns error, the transaction rolls back and the message can be re-delivered. Messages will only be delivered to one instance of the consuming server(s).

A consumerGroupName can and should be reused, but only if all consuming servers register all handler instances for that consumerGroupName (messages will only be delivered successfully once per consumer group)

func (*Library[ID, TX, DB]) ConsumeIdempotent

func (lib *Library[ID, TX, DB]) ConsumeIdempotent(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)

ConsumeIdempotent should be used to consume events at least once where the handler takes care of any issues that arise from consuming the event more than once. The event will be presented to handlers until a handler has consumed it without error.

Events handled with ConsumeIdempotent will generally be consumed on only one instance of the server that has registered to consume such events. If there is no registered consumer, events will be held until there is a consumer. Duplicate events are possible.

A consumerGroupName can and should be reused, but only if all consuming servers register all handler instances for that consumerGroupName. If multiple handlers use the same consumerGroupName and one of them returns error, then the message can be re-delivered to the handlers that did not return error.

func (*Library[ID, TX, DB]) DB

func (lib *Library[ID, TX, DB]) DB() eventmodels.AbstractDB[ID, TX]

func (*Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers added in v0.5.0

func (lib *Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers()

DoNotLockBroadcastConsumerNumbers must be used before starting the library. If called, no database locks will be taken to reserve broadcast consumer numbers. There is a tradeoff: this saves a database connection that would otherwise sit around holding a lock, but it makes the allocation and refreshing of broadcast consumers much more expensive in terms of interactions with Kafka.

It is safe to use this in combination with other broadcast consumers that do take locks: they'll use different namespace prefixes unless you mess that up by using SetBroadcastConsumerBaseName() with the same name for both lock-free and locked instances.

func (*Library[ID, TX, DB]) GetBroadcastConsumerGroupName

func (lib *Library[ID, TX, DB]) GetBroadcastConsumerGroupName() string

GetBroadcastConsumerGroupName returns "" until a broadcast consumer group has been figured out

func (*Library[ID, TX, DB]) HasDB added in v0.5.0

func (lib *Library[ID, TX, DB]) HasDB() bool

func (*Library[ID, TX, DB]) InstanceID

func (lib *Library[ID, TX, DB]) InstanceID() int32

func (*Library[ID, TX, DB]) IsConfigured

func (lib *Library[ID, TX, DB]) IsConfigured() bool

IsConfigured reports if the library exists and has been configured

func (*Library[ID, TX, DB]) Produce

func (lib *Library[ID, TX, DB]) Produce(ctx context.Context, method eventmodels.ProduceMethod, events ...eventmodels.ProducingEvent) (err error)

Produce sends events directly to Kafka. It is not transactional. Use tx.Produce to produce from within a transaction.

func (*Library[ID, TX, DB]) ProduceFromTable

func (lib *Library[ID, TX, DB]) ProduceFromTable(ctx context.Context, eventsByTopic map[string][]ID) error

ProduceFromTable is used to send events that have been written during a transaction. If a CatchUpProducer is running, the events will be forwarded to that thread. If not, they'll be sent to Kafka synchronously. Sending to Kafka synchronously is slow.

ProduceFromTable can only be used after Configure.

func (*Library[ID, TX, DB]) SetBroadcastConsumerBaseName

func (lib *Library[ID, TX, DB]) SetBroadcastConsumerBaseName(name string)

func (*Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks

func (lib *Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks(max uint32)

func (*Library[ID, TX, DB]) SetEnhanceDB

func (lib *Library[ID, TX, DB]) SetEnhanceDB(enhance bool)

For testing

func (*Library[ID, TX, DB]) SetLazyTxProduce

func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool)

SetLazyTxProduce controls what to do if a transaction produces an event but the Library doesn't have a running producer. Defaults to false. If true, the event will be left in the database for some other Library to pick up sometime in the future.

func (*Library[ID, TX, DB]) StartConsuming

func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started chan struct{}, stopped chan struct{}, err error)

StartConsuming should be called only after all Consume* requests have been made and Configure has been called.

It returns two channels: one is closed when the consumers have started; the other is closed when all of the consumers have stopped. Consumers will finish after the passed context is canceled.

StartConsuming synchronously creates the broadcast consumer group if there are any broadcast consumers. All the other work it does, like creating groups that don't exist and establishing the reader connections happens asynchronously.

func (*Library[ID, TX, DB]) StartConsumingOrPanic

func (lib *Library[ID, TX, DB]) StartConsumingOrPanic(ctx context.Context) (stopped chan struct{})

StartConsumingOrPanic is a wrapper around StartConsuming that returns only after the consumers have started. If StartConsuming returns error, it panics.

func (*Library[ID, TX, DB]) Tracer

func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer

func (*Library[ID, TX, DB]) ValidateTopics added in v0.2.0

func (lib *Library[ID, TX, DB]) ValidateTopics(ctx context.Context, topics []string) error

ValidateTopics will be fast whenever it can be fast. Sometimes it will wait for topics to be listed. ValidateTopics can only be used after Configure.

type LibraryNoDB

type LibraryNoDB struct {
	ProduceSyncCount atomic.Uint64 // used and exported for testing only
	// contains filtered or unexported fields
}

func (*LibraryNoDB) CreateTopics

func (lib *LibraryNoDB) CreateTopics(ctx context.Context, why string, topics []string) error

CreateTopics orchestrates the creation of topics that have not already been successfully created. The set of created topics is in lib.topicsSeen. It is expected that CreateTopics will be called simultaneously from multiple threads. Its behavior is optimized to do minimal work and to return almost instantly if there are no topics that need creating.

func (*LibraryNoDB) RecordError added in v0.5.0

func (lib *LibraryNoDB) RecordError(category string, err error) error

func (*LibraryNoDB) RecordErrorNoWait added in v0.5.0

func (lib *LibraryNoDB) RecordErrorNoWait(category string, err error) error

func (*LibraryNoDB) SetTopicConfig

func (lib *LibraryNoDB) SetTopicConfig(topicConfig kafka.TopicConfig)

SetTopicConfig can be used to override the configuration parameters for new topics. If no override has been set, then the default configuration for new topics is simply: 2 partitions. High volume topics should use 10 or even 20 partitions.

Topics will be auto-created when a message is sent. Topics will be auto-created on startup for all topics that are consumed.

type NotifierOpt

type NotifierOpt func(*notifierConfig)

func WrapHandlerOpt

func WrapHandlerOpt(handlerOpts ...HandlerOpt) NotifierOpt

type SASLConfig

type SASLConfig struct {
	Mechanism sasl.Mechanism
}

SASLConfig unmarshals into a sasl.Mechanism from strings like:

"none"
"plain:username:password"
"sha256:username:password"
"sha512:username:password"

func (*SASLConfig) UnmarshalText

func (sc *SASLConfig) UnmarshalText(text []byte) error
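For example, a sketch of building a mechanism from one of the strings listed above:

var sc events.SASLConfig
if err := sc.UnmarshalText([]byte("plain:username:password")); err != nil {
	// handle a malformed configuration string
}
// sc.Mechanism can then be passed to Configure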

type SASLMethod

type SASLMethod string
const (
	SASLNone   SASLMethod = "none"
	SASLPlain  SASLMethod = "plain"
	SASLSHA256 SASLMethod = "sha256"
	SASLSHA512 SASLMethod = "sha512"
)

type TLSConfig

type TLSConfig struct {
	*tls.Config
}

TLSConfig wraps tls.Config so that special features can be added to its unmarshaler.

The following extras are defined:

"DelayHostnameValidation":true - if included, InsecureSkipVerify
will be set to true and custom validation of certificates will be done
with the ServerName only validated after it is set and
PeerCertificates are set.

func (*TLSConfig) UnmarshalJSON

func (sc *TLSConfig) UnmarshalJSON(b []byte) error

type Unfiltered

type Unfiltered[E any] map[int32]*eventdistributor.Distributor[eventmodels.Event[E]]

func RegisterUnfiltered

func RegisterUnfiltered[E any](handlerName string, topic eventmodels.BoundTopic[E], opts ...NotifierOpt) Unfiltered[E]

RegisterUnfiltered pre-registers an unfiltered broadcast event distributor that will receive events from any Library that is Configured after RegisterUnfiltered is called. See eventdistributor for behavior details.

eventdistributor: https://pkg.go.dev/github.com/sharnoff/eventdistributor

func (Unfiltered[E]) Subscribe

Directories

Path Synopsis
Package eventdb has support functions for implementing the abstract types
Package eventmodels has the interfaces needed to use the events package
Package eventnodb provides some convenience wrappers for using events without a database.
Package eventpg implements the abstract types for PostgreSQL
Package events2 implements the abstract types using SingleStore
Package eventtest has abstract tests to validate database implementations
