Documentation
¶
Index ¶
- Constants
- Variables
- func Async(limit int) func(*notifierConfig)
- func DeadLetterTopic(topic string, consumerGroup ConsumerGroupName) string
- func ObserveProduceLatency(err error, startTime time.Time)
- func SASLConfigFromString(s string) sasl.Mechanism
- type CanConsumeBroadcast
- type ConsumerGroupName
- type Filtered
- type HandlerOpt
- func IsDeadLetterHandler(isDeadLetter bool) HandlerOpt
- func WithBaseTopic(topic string) HandlerOpt
- func WithBatch(size int) HandlerOpt
- func WithConcurrency(parallelism int) HandlerOpt
- func WithQueueDepthLimit(n int) HandlerOpt
- func WithRetrying(retry bool) HandlerOpt
- func WithTimeout(d time.Duration) HandlerOpt
- type HeartbeatEvent
- type Library
- func (lib *Library[ID, TX, DB]) BroadcastConsumerLastLatency() (lastGap time.Duration, configuredHeartbeat time.Duration)
- func (lib *Library[ID, TX, DB]) CatchUpProduce(ctx context.Context, sleepTime time.Duration, batchSize int) (chan struct{}, error)
- func (lib *Library[ID, TX, DB]) Configure(conn DB, tracer eventmodels.Tracer, mustRegisterTopics bool, ...)
- func (lib *Library[ID, TX, DB]) ConfigureBroadcastHeartbeat(dur time.Duration)
- func (lib *Library[ID, TX, DB]) ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)
- func (lib *Library[ID, TX, DB]) ConsumeExactlyOnce(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, ...)
- func (lib *Library[ID, TX, DB]) ConsumeIdempotent(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, ...)
- func (lib *Library[ID, TX, DB]) DB() eventmodels.AbstractDB[ID, TX]
- func (lib *Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers()
- func (lib *Library[ID, TX, DB]) GetBroadcastConsumerGroupName() string
- func (lib *Library[ID, TX, DB]) HasDB() bool
- func (lib *Library[ID, TX, DB]) InstanceID() int32
- func (lib *Library[ID, TX, DB]) IsConfigured() bool
- func (lib *Library[ID, TX, DB]) Produce(ctx context.Context, method eventmodels.ProduceMethod, ...) (err error)
- func (lib *Library[ID, TX, DB]) ProduceFromTable(ctx context.Context, eventsByTopic map[string][]ID) error
- func (lib *Library[ID, TX, DB]) SetBroadcastConsumerBaseName(name string)
- func (lib *Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks(max uint32)
- func (lib *Library[ID, TX, DB]) SetEnhanceDB(enhance bool)
- func (lib *Library[ID, TX, DB]) SetLazyTxProduce(lazy bool)
- func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started chan struct{}, stopped chan struct{}, err error)
- func (lib *Library[ID, TX, DB]) StartConsumingOrPanic(ctx context.Context) (stopped chan struct{})
- func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer
- func (lib *Library[ID, TX, DB]) ValidateTopics(ctx context.Context, topics []string) error
- type LibraryNoDB
- func (lib *LibraryNoDB) CreateTopics(ctx context.Context, why string, topics []string) error
- func (lib *LibraryNoDB) RecordError(category string, err error) error
- func (lib *LibraryNoDB) RecordErrorNoWait(category string, err error) error
- func (lib *LibraryNoDB) SetTopicConfig(topicConfig kafka.TopicConfig)
- type NotifierOpt
- type SASLConfig
- type SASLMethod
- type TLSConfig
- type Unfiltered
Constants ¶
const UnregisteredTopicError errors.String = "topic is not pre-registered"
UnregisteredTopicError is the base error when attempting to create a topic that isn't pre-registered when pre-registration is required.
Variables ¶
var AckLatency = mustRegister(prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: prefix + "acknowledgement_latency", Help: "Latency from message creation to message receive acknowledgement", }, []string{"topic", "consumer_group"}, ))
var ConsumeCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "messages_consumed", Help: "Number of messaged consumed", }, []string{"topic", "consumer_group"}, ))
var ConsumersWaitingForQueueConcurrencyDemand = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "consumer_group_queue_concurrency_demand", Help: "Per-consumer group concurrency limit, total waiting OR processing", }, []string{"consumer_group"}, ))
var ConsumersWaitingForQueueConcurrencyLimit = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "consumer_group_queue_concurrency_wait", Help: "Per-consumer group concurrency limit, total waiting", }, []string{"consumer_group"}, ))
var DeadLetterConsumeCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "dead_letter_consume", Help: "Number of messages (by handler name and topic) consumed from the dead letter queue", }, []string{"handler_name", "topic"}, ))
var DeadLetterProduceCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "dead_letter_produce", Help: "Number of messages (by handler name and topic) produced to the dead letter queue", }, []string{"handler_name", "topic"}, ))
var ErrTopicCreationTimeout errors.String = "event library topic creation deadline exceeded"
var ErrorCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "errors", Help: "Number of errors (by category) from the eventing system", }, []string{"category"}, ))
var HandlerBatchConcurrency = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "batch_handler_concurrency", Help: "How many batches are being processed in parallel", }, []string{"handler_name"}, ))
var HandlerBatchQueued = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "batch_handler_queued", Help: "How many messages are queued for batch handling", }, []string{"handler_name"}, ))
var HandlerErrorCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "handler_errors", Help: "Number of errors (by handler name and topic) from the eventing system", }, []string{"handler_name", "topic"}, ))
var HandlerLatency = mustRegister(prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: prefix + "handler_latencies", Help: "Per-handler processing times (seconds)", }, []string{"handler_name", "topic"}, ))
var HandlerPanicCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "handler_panic", Help: "Number handler panics (by handler name and topic)", }, []string{"handler_name", "topic"}, ))
var HandlerSuccessCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "handler_success", Help: "Number handler success (by handler name and topic)", }, []string{"handler_name", "topic"}, ))
var HandlerWaitingForActiveConcurrencyDemand = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "active_queue_concurrency_demand", Help: "Global concurrency limit, total waiting OR processing", }, []string{"handler_name", "topic"}, ))
var HandlerWaitingForActiveConcurrencyLimit = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "active_queue_concurrency_wait", Help: "Global concurrency limit, total waiting", }, []string{"handler_name", "topic"}, ))
var HandlerWaitingForQueueConcurrencyDemand = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "handler_queue_concurrency_demand", Help: "Per-handler concurrency limit, total waiting OR processing", }, []string{"handler_name", "topic"}, ))
var HandlerWaitingForQueueConcurrencyLimit = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "handler_queue_concurrency_wait", Help: "Per-handler concurrency limit, total waiting", }, []string{"handler_name", "topic"}, ))
var (
LegalTopicNames = regexp.MustCompile(fmt.Sprintf(`^[-._a-zA-Z0-9]{1,%d}$`, maxTopicNameLength))
)
var LongestHandlerLatency = mustRegister(prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: prefix + "unfinished_handler_time", Help: "Handlers that have been processing for a while (seconds)", }, []string{"handler_name", "topic"}, ))
var ProduceFromTxSplit = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "batches_in_tx", Help: "Number in-transaction messages", }, []string{"method"}, ))
var ProduceTopicCounts = mustRegister(prometheus.NewCounterVec( prometheus.CounterOpts{ Name: prefix + "messages_produced", Help: "Number of messaged produced", }, []string{"topic", "produce_method"}, ))
var TransmissionLatency = mustRegister(prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: prefix + "transmission_latency", Help: "Latency from message creation to message receipt (seconds)", }, []string{"topic", "consumer_group"}, ))
Functions ¶
func Async ¶
func Async(limit int) func(*notifierConfig)
Async delivers events asynchronously with a limit on the number of outstanding events. A limit of -1 indicates no limit. A limit of zero indicates synchronous delivery. A positive number allows that many events to be outstanding at once but no more.
func DeadLetterTopic ¶
func DeadLetterTopic(topic string, consumerGroup ConsumerGroupName) string
DeadLetterTopic returns the topic name to use for dead letters. The dead letter topics include the consumer group name because otherwise messages could be cross-delivered between consumer groups.
func ObserveProduceLatency ¶
func SASLConfigFromString ¶
SASLConfigFromString returns nil for errors and is meant for testing situations
Types ¶
type CanConsumeBroadcast ¶
type CanConsumeBroadcast interface { ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt) Tracer() eventmodels.Tracer InstanceID() int32 }
CanConsumeBroadcast is implemented by Library
type ConsumerGroupName ¶
type ConsumerGroupName interface { String() string // contains filtered or unexported methods }
func NewConsumerGroup ¶
func NewConsumerGroup(name string) ConsumerGroupName
type Filtered ¶
type Filtered[K comparable, E any] map[int32]*keyeddistributor.Distributor[eventmodels.Event[E], K]
func RegisterFiltered ¶
func RegisterFiltered[E any, K comparable](handlerName string, topic eventmodels.BoundTopic[E], key func(eventmodels.Event[E]) K, opts ...NotifierOpt) Filtered[K, E]
RegisterFiltered pre-registers a filtered broadcast event distributor that will receive events from any Library that is configured (via Configure) after RegisterFiltered is called. See keyeddistributor for behavior details.
keyeddistributor: https://pkg.go.dev/github.com/memsql/keyeddistributor
func (Filtered[K, E]) Subscribe ¶
func (f Filtered[K, E]) Subscribe(lib CanConsumeBroadcast, key K) *keyeddistributor.Reader[eventmodels.Event[E]]
type HandlerOpt ¶
type HandlerOpt func(*registeredHandler, *LibraryNoDB)
func IsDeadLetterHandler ¶
func IsDeadLetterHandler(isDeadLetter bool) HandlerOpt
IsDeadLetterHandler can be used when registering a handler for dead letter topics. Normally this is not needed as dead letter handlers are created automatically if the consumer uses OnFailureRetryLater. Using IsDeadLetterHandler only makes sense for creating custom handlers to deal with OnFailureSave events. Use DeadLetterTopic to form the dead letter topic name.
Dead letter handlers record different metrics and use different simultaneous runner limits. OnFailureBlock is appropriate with a dead letter handler because where would you save a dead letter from a dead letter consumer?
func WithBaseTopic ¶
func WithBaseTopic(topic string) HandlerOpt
WithBaseTopic is applied automatically when processing dead letter topics
func WithBatch ¶
func WithBatch(size int) HandlerOpt
WithBatch specifies the maximum number of messages to deliver at once. This should be used only with batch consumers as there is no advantage of batching with a non-batch consumer. If used with a non-batch consumer, the default WithConcurrency is 1.
func WithConcurrency ¶
func WithConcurrency(parallelism int) HandlerOpt
WithConcurrency specifies the maximum number of instances of the handler that can be running at the same time. Setting this to 1 means that delivery of messages is single-threaded. If used with a non-batch consumer, the default WithBatch is 1.
func WithQueueDepthLimit ¶
func WithQueueDepthLimit(n int) HandlerOpt
WithQueueDepthLimit places a maximum number of outstanding messages for this handler to be working on. This counts both active handlers and ones that are waiting for backoff. If WithBatchDelivery is also true, then the limit also acts as a maximum size for the batch.
func WithRetrying ¶
func WithRetrying(retry bool) HandlerOpt
WithRetrying controls message delivery. Normally, multiple attempts are made to deliver messages.
WithRetrying(false) means only one attempt will be made.
In connection with OnFailureRetryLater or OnFailureSave, the message will be immediately sent to the corresponding dead letter topic. In connection with OnFailureDiscard, it will be dropped after just one delivery attempt.
In combination with OnFailureBlock, no messages will get acknowledged in the consumer group (since they must be ack'ed in-order) and eventually processing of the consumer group will stall.
func WithTimeout ¶
func WithTimeout(d time.Duration) HandlerOpt
WithTimeout limits the duration of retries of delivery attempts on a per-message basis. When the limit is exceeded, delivery is considered to have failed.
When delivery has failed, failure handling becomes key:
OnFailureRetryLater & OnFailureSave: the event will be sent to the dead letter topic.
OnFailureDiscard: the event will be ack'ed without processing.
OnFailureBlock: no messages will get acknowledged in the consumer group (since they must be ack'ed in-order) and eventually processing of the consumer group will stall.
type HeartbeatEvent ¶
type HeartbeatEvent struct{}
type Library ¶
type Library[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodels.AbstractDB[ID, TX]] struct { LibraryNoDB // contains filtered or unexported fields }
func New ¶
func New[ID eventmodels.AbstractID[ID], TX eventmodels.AbstractTX, DB eventmodels.AbstractDB[ID, TX]]() *Library[ID, TX, DB]
New creates an event Library. It can be used from init() since it requires no arguments. The library is not ready to use.
After New(), Configure() must be called.
After New(), ConsumeExactlyOnce(), ConsumeIdempotent(), and ConsumeBroadcast() may be called to register event consumers.
After the call to Configure() and all Consume*() calls, consumers can be started with StartConsuming(); background production of orphaned messages may be started with CatchUpProduce(); messages may be produced with Produce().
Configure() and Consume*() may not be used once consumption or production has started.
func (*Library[ID, TX, DB]) BroadcastConsumerLastLatency ¶
func (lib *Library[ID, TX, DB]) BroadcastConsumerLastLatency() (lastGap time.Duration, configuredHeartbeat time.Duration)
BroadcastConsumerLastLatency returns the time since the sending of the last broadcast event that was received. It also returns the configured heartbeat interval. As long as the latency is less than twice the configured interval, the broadcast consumer can be considered to be working.
func (*Library[ID, TX, DB]) CatchUpProduce ¶
func (lib *Library[ID, TX, DB]) CatchUpProduce(ctx context.Context, sleepTime time.Duration, batchSize int) (chan struct{}, error)
CatchUpProduce starts a background thread that looks for events that were written to the database during a transaction but were not sent to Kafka
The returned channel is closed when CatchUpProduce shuts down (due to context cancel)
CatchUpProduce can only be used after Configure.
func (*Library[ID, TX, DB]) Configure ¶
func (lib *Library[ID, TX, DB]) Configure(conn DB, tracer eventmodels.Tracer, mustRegisterTopics bool, saslMechanism sasl.Mechanism, tlsConfig *TLSConfig, brokers []string)
Configure sets up the Library so that it has the configuration it needs to run. The database connection is optional. Without it, certain features will always error:
CatchUpProduce requires a database. StartConsuming requires a database if ConsumeExactlyOnce has been called.
The conn parameter may be nil, in which case CatchUpProduce() and ProduceFromTable() will error.
func (*Library[ID, TX, DB]) ConfigureBroadcastHeartbeat ¶
func (*Library[ID, TX, DB]) ConsumeBroadcast ¶
func (lib *Library[ID, TX, DB]) ConsumeBroadcast(handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)
ConsumeBroadcast should be used to consume events where every instance of the consumer will receive every event. Only events timestamped after the start time of the consuming server will be delivered. Duplicate events are possible.
When broadcast handlers return error, the message will be dropped. By default broadcast handlers are not retried and the handler timeout is 30 seconds.
func (*Library[ID, TX, DB]) ConsumeExactlyOnce ¶
func (lib *Library[ID, TX, DB]) ConsumeExactlyOnce(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerTxInterface[ID, TX], opts ...HandlerOpt)
ConsumeExactlyOnce delivers messages transactionally (with an open transaction) and they are delivered exactly once. If the handler returns error, the transaction rolls back and the message can be re-delivered. Messages will only be delivered to one instance of the consuming server(s).
A consumerGroupName can and should be reused, but only if all consuming servers register all handler instances for that consumerGroupName (messages will only be delivered successfully once per consumer group)
func (*Library[ID, TX, DB]) ConsumeIdempotent ¶
func (lib *Library[ID, TX, DB]) ConsumeIdempotent(consumerGroup ConsumerGroupName, onFailure eventmodels.OnFailure, handlerName string, handler eventmodels.HandlerInterface, opts ...HandlerOpt)
ConsumeIdempotent should be used to consume events at least once where the handler takes care of any issues that arise from consuming the event more than once. The event will be presented to handlers until a handler has consumed it without error.
Events handled with ConsumeIdempotent will generally be consumed on only one instance of the server that has registered to consume such events. If there is no registered consumer, events will be held until there is a consumer. Duplicate events are possible.
A consumerGroupName can and should be reused, but only if all consuming servers register all handler instances for that consumerGroupName. If multiple handlers use the same consumerGroupName and one of them returns error, then the message can be re-delivered to the handlers that did not return error.
func (*Library[ID, TX, DB]) DB ¶
func (lib *Library[ID, TX, DB]) DB() eventmodels.AbstractDB[ID, TX]
func (*Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers ¶ added in v0.5.0
func (lib *Library[ID, TX, DB]) DoNotLockBroadcastConsumerNumbers()
DoNotLockBroadcastConsumerNumbers must be used before starting the library. If called, no database locks will be taken to reserve broadcast consumer numbers. There is a tradeoff: this saves a database connection that would otherwise sit around holding a lock, but it makes the allocation and refreshing of broadcast consumers much more expensive in terms of interactions with Kafka.
It is safe to use this in combination with other broadcast consumers that do take locks: they'll use different namespace prefixes unless you mess that up by using SetBroadcastConsumerBaseName() with the same name for both lock-free and locked instances.
func (*Library[ID, TX, DB]) GetBroadcastConsumerGroupName ¶
GetBroadcastConsumerGroupName returns the broadcast consumer group name. This will be "" until a broadcast consumer group is figured out.
func (*Library[ID, TX, DB]) InstanceID ¶
func (*Library[ID, TX, DB]) IsConfigured ¶
IsConfigured reports if the library exists and has been configured
func (*Library[ID, TX, DB]) Produce ¶
func (lib *Library[ID, TX, DB]) Produce(ctx context.Context, method eventmodels.ProduceMethod, events ...eventmodels.ProducingEvent) (err error)
Produce sends events directly to Kafka. It is not transactional. Use tx.Produce to produce from within a transaction.
func (*Library[ID, TX, DB]) ProduceFromTable ¶
func (lib *Library[ID, TX, DB]) ProduceFromTable(ctx context.Context, eventsByTopic map[string][]ID) error
ProduceFromTable is used to send events that have been written during a transaction. If a CatchUpProducer is running, the events will be forwarded to that thread. If not, they'll be sent to Kafka synchronously. Sending to Kafka synchronously is slow.
ProduceFromTable can only be used after Configure.
func (*Library[ID, TX, DB]) SetBroadcastConsumerBaseName ¶
func (*Library[ID, TX, DB]) SetBroadcastConsumerMaxLocks ¶
func (*Library[ID, TX, DB]) SetEnhanceDB ¶
For testing
func (*Library[ID, TX, DB]) SetLazyTxProduce ¶
SetLazyTxProduce controls what to do if a transaction produces an event but the Library doesn't have a running producer. Defaults to false. If true, the event will be left in the database for some other Library to pick up sometime in the future.
func (*Library[ID, TX, DB]) StartConsuming ¶
func (lib *Library[ID, TX, DB]) StartConsuming(ctx context.Context) (started chan struct{}, stopped chan struct{}, err error)
StartConsuming should be called only after all Consume* requests have been made and Configure has been called.
It returns two channels: one is closed when the consumers have started; the other is closed when all of the consumers have stopped. Consumers will finish after the passed context is canceled.
StartConsuming synchronously creates the broadcast consumer group if there are any broadcast consumers. All the other work it does, like creating groups that don't exist and establishing the reader connections happens asynchronously.
func (*Library[ID, TX, DB]) StartConsumingOrPanic ¶
StartConsumingOrPanic is a wrapper around StartConsuming that returns only after the consumers have started. If StartConsuming returns error, it panics.
func (*Library[ID, TX, DB]) Tracer ¶
func (lib *Library[ID, TX, DB]) Tracer() eventmodels.Tracer
func (*Library[ID, TX, DB]) ValidateTopics ¶ added in v0.2.0
ValidateTopics will be fast whenever it can be fast. Sometimes it will wait for topics to be listed. ValidateTopics can only be used after Configure.
type LibraryNoDB ¶
type LibraryNoDB struct { ProduceSyncCount atomic.Uint64 // used and exported for testing only // contains filtered or unexported fields }
func (*LibraryNoDB) CreateTopics ¶
CreateTopics orchestrates the creation of topics that have not already been successfully created. The set of created topics is in lib.topicsSeen. It is expected that createTopics will be called simultaneously from multiple threads. Its behavior is optimized to do minimal work and to return almost instantly if there are no topics that need creating.
func (*LibraryNoDB) RecordError ¶ added in v0.5.0
func (lib *LibraryNoDB) RecordError(category string, err error) error
func (*LibraryNoDB) RecordErrorNoWait ¶ added in v0.5.0
func (lib *LibraryNoDB) RecordErrorNoWait(category string, err error) error
func (*LibraryNoDB) SetTopicConfig ¶
func (lib *LibraryNoDB) SetTopicConfig(topicConfig kafka.TopicConfig)
SetTopicConfig can be used to override the configuration parameters for new topics. If no override has been set, then the default configuration for new topics is simply: 2 partitions. High volume topics should use 10 or even 20 partitions.
Topics will be auto-created when a message is sent. Topics will be auto-created on startup for all topics that are consumed.
type NotifierOpt ¶
type NotifierOpt func(*notifierConfig)
func WrapHandlerOpt ¶
func WrapHandlerOpt(handlerOpts ...HandlerOpt) NotifierOpt
type SASLConfig ¶
SASLConfig unmarshals into a sasl.Mechanism from strings like:
"none" "plain:username:password" "sha256:username:password" "sha512:username:password"
func (*SASLConfig) UnmarshalText ¶
func (sc *SASLConfig) UnmarshalText(text []byte) error
type SASLMethod ¶
type SASLMethod string
const ( SASLNone SASLMethod = "none" SASLPlain SASLMethod = "plain" SASLSHA256 SASLMethod = "sha256" SASLSHA512 SASLMethod = "sha512" )
type TLSConfig ¶
TLSConfig wraps tls.Config so that special features can be added to its unmarshaler.
The following extras are defined:
"DelayHostnameValidation":true - if included, InsecureSkipVerify will be set to true and custom validation of certificates will be done with the ServerName only validated after it is set and PeerCertificates are set.
func (*TLSConfig) UnmarshalJSON ¶
type Unfiltered ¶
type Unfiltered[E any] map[int32]*eventdistributor.Distributor[eventmodels.Event[E]]
func RegisterUnfiltered ¶
func RegisterUnfiltered[E any](handlerName string, topic eventmodels.BoundTopic[E], opts ...NotifierOpt) Unfiltered[E]
RegisterUnfiltered pre-registers an unfiltered broadcast event distributor that will receive events from any Library that is configured (via Configure) after RegisterUnfiltered is called. See eventdistributor for behavior details.
eventdistributor: https://pkg.go.dev/github.com/sharnoff/eventdistributor
func (Unfiltered[E]) Subscribe ¶
func (u Unfiltered[E]) Subscribe(lib CanConsumeBroadcast) eventdistributor.Reader[eventmodels.Event[E]]
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
eventdb | Package eventdb has support functions for implementing the abstract types |
eventmodels | Package eventmodels has the interfaces needed to use the events package |
eventnodb | Package eventnodb provides some convenience wrappers for using events without a database. |
eventpg | Package eventpg implements the abstract types for PostgreSQL |
events2 | Package events2 implements the abstract types using SingleStore |
eventtest | Package eventtest has abstract tests to validate database implementations |