eventx

Version: v0.5.1
Published: Dec 19, 2023 License: MIT Imports: 9 Imported by: 2

README

Library for Transactional Outbox with total ordering

Main idea

Overview

Two services and one database are required:

  • Core service with normal business logic (depicted as Service)
  • A service for processing events, including serializing the order of events and publishing to other consumers (depicted as Event Service)

And the database has one table of the form:

CREATE TABLE `events`
(
    `id`         bigint unsigned NOT NULL AUTO_INCREMENT,
    `seq`        bigint unsigned DEFAULT NULL,
    `data`       mediumblob NOT NULL,
    `created_at` timestamp  NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`),
    UNIQUE KEY `idx_events_seq` (`seq`)
);

In normal operation, the following steps will happen:

  1. In one transaction, the Service performs its normal business logic, using a normal locking mechanism (pessimistic or optimistic) to prevent concurrent writes, and then constructs and inserts an event into the database (the seq field is NULL).

  2. After the transaction is committed, the Service pings the Event Service (using HTTP / gRPC calls) so that it checks for the new events.

  3. The Event Service collects all the pings and performs one call to the database to get the unprocessed events, those whose seq is NULL (up to a limit).

  4. The Event Service uses the last seq value stored in memory to set the seq of each event from the previous step in incremental order, then writes those sequence numbers to the database.

  5. Other services / threads can listen for new events and use the sequence number to determine which events have already been consumed (like offsets in Kafka partitions).
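Step 4 above can be sketched in a few lines. This is only an illustration of the numbering idea, not the library's internal code: the Event struct and the assignSequences helper are hypothetical names, and Seq == 0 stands in for a NULL seq column.

```go
package main

import "fmt"

// Event is a hypothetical in-memory mirror of a row in the events table.
// Seq == 0 stands for a NULL seq column.
type Event struct {
	ID   uint64
	Seq  uint64
	Data []byte
}

// assignSequences numbers the unprocessed events (assumed to be ordered by id)
// starting right after lastSeq, and returns the new last sequence number.
func assignSequences(lastSeq uint64, events []Event) uint64 {
	for i := range events {
		lastSeq++
		events[i].Seq = lastSeq
	}
	return lastSeq
}

func main() {
	// Two events were committed by the Service without a sequence number.
	events := []Event{{ID: 12}, {ID: 15}}
	last := assignSequences(40, events)
	fmt.Println(last, events[0].Seq, events[1].Seq)
}
```

Because only one process assigns numbers, starting from a single in-memory counter, the resulting sequence has no gaps and gives consumers a total order.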

Documentation

Constants

This section is empty.

Variables

var ErrEventNotFound = errors.New("not found any events from a sequence")

ErrEventNotFound indicates that a select from the events table found no events with sequence number >= the requested sequence (because of retention).

Functions

func MergeContext

func MergeContext(a, b context.Context) context.Context

MergeContext merges the two contexts.

Types

type EventConstraint added in v0.3.0

type EventConstraint interface {
	// GetID returns the event id
	GetID() uint64

	// GetSequence returns the event sequence number, = 0 if sequence is null
	GetSequence() uint64

	// GetSize returns the approximate size (in bytes) of the event; it is used to limit batch size by event data size.
	// Use WithSubscriberSizeLimit to configure this limit.
	GetSize() uint64
}

EventConstraint is the type constraint for events.
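A minimal sketch of an event type that satisfies this constraint, assuming events map one-to-one onto rows of the events table from the README. UserEvent and its field names are hypothetical, and the interface is re-declared locally only so the snippet compiles on its own.

```go
package main

import "fmt"

// EventConstraint is copied from the declaration above so that the
// sketch is self-contained.
type EventConstraint interface {
	GetID() uint64
	GetSequence() uint64
	GetSize() uint64
}

// UserEvent is a hypothetical event type; the field names are illustrative.
type UserEvent struct {
	ID   uint64
	Seq  uint64 // 0 means the seq column is NULL
	Data []byte
}

// GetID returns the event id.
func (e UserEvent) GetID() uint64 { return e.ID }

// GetSequence returns the sequence number, or 0 when it is not yet assigned.
func (e UserEvent) GetSequence() uint64 { return e.Seq }

// GetSize approximates the event size by the length of its payload.
func (e UserEvent) GetSize() uint64 { return uint64(len(e.Data)) }

// Compile-time check that UserEvent satisfies the constraint.
var _ EventConstraint = UserEvent{}

func main() {
	e := UserEvent{ID: 7, Data: []byte("hello")}
	fmt.Println(e.GetID(), e.GetSequence(), e.GetSize())
}
```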

type GetSequenceFunc added in v0.5.0

type GetSequenceFunc func(ctx context.Context) (sql.NullInt64, error)

GetSequenceFunc ...

type Option

type Option func(opts *eventxOptions)

Option for configuration

func WithCoreStoredEventsSize

func WithCoreStoredEventsSize(size uint64) Option

WithCoreStoredEventsSize configures the size of stored events

func WithDBProcessorErrorRetryTimer

func WithDBProcessorErrorRetryTimer(d time.Duration) Option

WithDBProcessorErrorRetryTimer configures retry timer duration

func WithDBProcessorRetryTimer

func WithDBProcessorRetryTimer(d time.Duration) Option

WithDBProcessorRetryTimer configures retry timer duration

func WithErrorLogger added in v0.5.1

func WithErrorLogger(fn func(err error)) Option

WithErrorLogger configures callback func for errors

func WithGetLastEventsLimit

func WithGetLastEventsLimit(limit uint64) Option

WithGetLastEventsLimit configures GetLastEvents limit

func WithGetUnprocessedEventsLimit

func WithGetUnprocessedEventsLimit(limit uint64) Option

WithGetUnprocessedEventsLimit configures GetUnprocessedEvents limit

func WithLogger

func WithLogger(logger *zap.Logger) Option

WithLogger configures error zap logger

type Repository

type Repository[E EventConstraint] interface {
	// GetLastEvents returns top *limit* events (events with the highest sequence numbers),
	// by sequence number in ascending order, ignore events with null sequence number
	GetLastEvents(ctx context.Context, limit uint64) ([]E, error)

	// GetUnprocessedEvents returns list of events with the smallest event *id* (not sequence number)
	// *AND* have NULL sequence numbers, in ascending order of event *id*
	// size of the list is limited by *limit*
	GetUnprocessedEvents(ctx context.Context, limit uint64) ([]E, error)

	// GetEventsFrom returns list of events with sequence number >= *from*
	// in ascending order of event sequence numbers, ignoring events with null sequence numbers
	// size of the list is limited by *limit*
	GetEventsFrom(ctx context.Context, from uint64, limit uint64) ([]E, error)

	// UpdateSequences updates only sequence numbers of *events*
	UpdateSequences(ctx context.Context, events []E) error
}

Repository for accessing database, MUST be thread safe
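To make the contract of the four methods concrete, here is a hedged in-memory sketch that follows the comments above. It is not part of the library: memRepo, Event, filtered, head and demo are hypothetical names, and a real implementation would issue SQL against the events table instead of scanning a map. A mutex keeps it thread safe, as the interface requires.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"sync"
)

// Event is a hypothetical event type; Seq == 0 stands for a NULL sequence.
type Event struct {
	ID, Seq uint64
	Data    []byte
}

func (e Event) GetID() uint64       { return e.ID }
func (e Event) GetSequence() uint64 { return e.Seq }
func (e Event) GetSize() uint64     { return uint64(len(e.Data)) }

// memRepo keeps events in a map keyed by id, guarded by a mutex.
type memRepo struct {
	mu     sync.Mutex
	events map[uint64]Event
}

// filtered returns the events accepted by keep, in ascending order of key.
func (r *memRepo) filtered(keep func(Event) bool, key func(Event) uint64) []Event {
	r.mu.Lock()
	defer r.mu.Unlock()
	var out []Event
	for _, e := range r.events {
		if keep(e) {
			out = append(out, e)
		}
	}
	sort.Slice(out, func(i, j int) bool { return key(out[i]) < key(out[j]) })
	return out
}

// head truncates events to at most limit entries.
func head(events []Event, limit uint64) []Event {
	if uint64(len(events)) > limit {
		return events[:limit]
	}
	return events
}

func (r *memRepo) GetLastEvents(_ context.Context, limit uint64) ([]Event, error) {
	all := r.filtered(func(e Event) bool { return e.Seq != 0 }, Event.GetSequence)
	if uint64(len(all)) > limit {
		all = all[uint64(len(all))-limit:] // keep the highest sequence numbers
	}
	return all, nil
}

func (r *memRepo) GetUnprocessedEvents(_ context.Context, limit uint64) ([]Event, error) {
	return head(r.filtered(func(e Event) bool { return e.Seq == 0 }, Event.GetID), limit), nil
}

func (r *memRepo) GetEventsFrom(_ context.Context, from, limit uint64) ([]Event, error) {
	keep := func(e Event) bool { return e.Seq != 0 && e.Seq >= from }
	return head(r.filtered(keep, Event.GetSequence), limit), nil
}

func (r *memRepo) UpdateSequences(_ context.Context, events []Event) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, e := range events {
		stored, ok := r.events[e.ID]
		if !ok {
			return fmt.Errorf("event %d not found", e.ID)
		}
		stored.Seq = e.Seq // only the sequence number is updated
		r.events[e.ID] = stored
	}
	return nil
}

// demo sequences the two unprocessed events and reads them back.
func demo() ([]Event, error) {
	ctx := context.Background()
	repo := &memRepo{events: map[uint64]Event{1: {ID: 1, Seq: 1}, 2: {ID: 2}, 3: {ID: 3}}}
	pending, _ := repo.GetUnprocessedEvents(ctx, 10)
	last, _ := repo.GetLastEvents(ctx, 1)
	next := uint64(0)
	if len(last) > 0 {
		next = last[len(last)-1].Seq
	}
	for i := range pending {
		next++
		pending[i].Seq = next
	}
	if err := repo.UpdateSequences(ctx, pending); err != nil {
		return nil, err
	}
	return repo.GetEventsFrom(ctx, 2, 10)
}

func main() {
	events, err := demo()
	fmt.Println(len(events), err)
}
```

An in-memory version like this is mainly useful in tests; the ordering and NULL-handling rules are the part that carries over to a SQL-backed implementation.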

type RetentionJob added in v0.4.0

type RetentionJob[E EventConstraint] struct {
	// contains filtered or unexported fields
}

RetentionJob ...

func NewRetentionJob added in v0.4.0

func NewRetentionJob[E EventConstraint](
	runner *Runner[E],
	repo RetentionRepository,
	options ...RetentionOption,
) *RetentionJob[E]

NewRetentionJob ...

func (*RetentionJob[E]) RunJob added in v0.4.0

func (j *RetentionJob[E]) RunJob(ctx context.Context)

RunJob will stop when the context object is cancelled / deadline exceeded

type RetentionOption added in v0.4.0

type RetentionOption func(opts *retentionOptions)

RetentionOption ...

func WithDeleteBatchSize added in v0.4.0

func WithDeleteBatchSize(size uint64) RetentionOption

WithDeleteBatchSize specifies the number of events to be deleted with the DeleteEventsBefore() method

func WithMaxTotalEvents added in v0.4.0

func WithMaxTotalEvents(maxSize uint64) RetentionOption

WithMaxTotalEvents keeps the number of events at no more than *maxSize*

func WithRetentionErrorLogger added in v0.4.0

func WithRetentionErrorLogger(logger func(err error)) RetentionOption

WithRetentionErrorLogger configures the error logger

func WithRetentionErrorRetryDuration added in v0.4.0

func WithRetentionErrorRetryDuration(d time.Duration) RetentionOption

WithRetentionErrorRetryDuration configures the retry duration

type RetentionRepository added in v0.4.0

type RetentionRepository interface {
	// GetMinSequence returns the min sequence number of all events (except events with null sequence numbers)
	// returns null if no events with a sequence number exist
	GetMinSequence(ctx context.Context) (sql.NullInt64, error)

	// DeleteEventsBefore deletes events with sequence number < *beforeSeq*
	DeleteEventsBefore(ctx context.Context, beforeSeq uint64) error
}

RetentionRepository is used for deleting old events
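The two methods can be illustrated with an in-memory stand-in. This is a sketch under stated assumptions rather than the library's code: memRetentionRepo and demo are hypothetical names, and against the events table from the README the same operations would presumably be a SELECT MIN(seq) query and a DELETE ... WHERE seq < ? statement.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
)

// memRetentionRepo is a hypothetical in-memory stand-in for the events table;
// it only tracks the sequence numbers that are currently stored.
type memRetentionRepo struct {
	mu   sync.Mutex
	seqs map[uint64]struct{}
}

// GetMinSequence returns the smallest stored sequence number,
// or a null value (Valid == false) when nothing is stored.
func (r *memRetentionRepo) GetMinSequence(_ context.Context) (sql.NullInt64, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	var lowest sql.NullInt64
	for seq := range r.seqs {
		if !lowest.Valid || int64(seq) < lowest.Int64 {
			lowest = sql.NullInt64{Int64: int64(seq), Valid: true}
		}
	}
	return lowest, nil
}

// DeleteEventsBefore removes every sequence number strictly below beforeSeq.
func (r *memRetentionRepo) DeleteEventsBefore(_ context.Context, beforeSeq uint64) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for seq := range r.seqs {
		if seq < beforeSeq {
			delete(r.seqs, seq)
		}
	}
	return nil
}

// demo deletes everything below sequence 3 and reports the minimum
// sequence before and after, plus the number of remaining events.
func demo() (before, after sql.NullInt64, remaining int) {
	ctx := context.Background()
	repo := &memRetentionRepo{seqs: map[uint64]struct{}{1: {}, 2: {}, 3: {}, 4: {}}}
	before, _ = repo.GetMinSequence(ctx)
	_ = repo.DeleteEventsBefore(ctx, 3)
	after, _ = repo.GetMinSequence(ctx)
	return before, after, len(repo.seqs)
}

func main() {
	before, after, remaining := demo()
	fmt.Println(before.Int64, after.Int64, remaining)
}
```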

type RetryConsumer added in v0.5.0

type RetryConsumer[E EventConstraint] struct {
	// contains filtered or unexported fields
}

RetryConsumer ...

func NewRetryConsumer added in v0.5.0

func NewRetryConsumer[E EventConstraint](
	runner *Runner[E],
	repo Repository[E],
	getSequence GetSequenceFunc,
	setSequence SetSequenceFunc,
	handler func(ctx context.Context, events []E) error,
	options ...RetryConsumerOption,
) *RetryConsumer[E]

NewRetryConsumer ...

func (*RetryConsumer[E]) RunConsumer added in v0.5.0

func (c *RetryConsumer[E]) RunConsumer(ctx context.Context)

RunConsumer runs the consumer until ctx is done

type RetryConsumerOption added in v0.5.0

type RetryConsumerOption func(conf *retryConsumerConfig)

RetryConsumerOption ...

func WithConsumerRetryDuration added in v0.5.0

func WithConsumerRetryDuration(d time.Duration) RetryConsumerOption

WithConsumerRetryDuration ...

func WithRetryConsumerErrorLogger added in v0.5.0

func WithRetryConsumerErrorLogger(logger func(err error)) RetryConsumerOption

WithRetryConsumerErrorLogger ...

func WithRetryConsumerFetchLimit added in v0.5.0

func WithRetryConsumerFetchLimit(limit uint64) RetryConsumerOption

WithRetryConsumerFetchLimit ...

type Runner

type Runner[E EventConstraint] struct {
	// contains filtered or unexported fields
}

Runner for running event handling

func NewRunner

func NewRunner[E EventConstraint](
	repo Repository[E], setSequence func(event *E, seq uint64),
	options ...Option,
) *Runner[E]

NewRunner creates a Runner

func (*Runner[E]) NewSubscriber

func (r *Runner[E]) NewSubscriber(from uint64, fetchLimit uint64, options ...SubscriberOption) *Subscriber[E]

NewSubscriber creates a subscriber

func (*Runner[E]) Run

func (r *Runner[E]) Run(ctx context.Context)

Run the runner

func (*Runner[E]) Signal

func (r *Runner[E]) Signal()

Signal notifies the DB processor

type SetSequenceFunc added in v0.5.0

type SetSequenceFunc func(ctx context.Context, seq uint64) error

SetSequenceFunc ...
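The documentation does not describe GetSequenceFunc and SetSequenceFunc beyond their signatures. A plausible reading, which is an assumption here, is that they load and persist the last sequence number a consumer has handled, with a null value meaning that nothing has been stored yet. Under that assumption, an in-memory pair could look like the sketch below; newMemorySequenceStore and demo are hypothetical helpers, and the two types are re-declared so the snippet compiles on its own.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
)

// Local copies of the two function types declared in this package.
type GetSequenceFunc func(ctx context.Context) (sql.NullInt64, error)
type SetSequenceFunc func(ctx context.Context, seq uint64) error

// newMemorySequenceStore returns a getter/setter pair sharing one value.
// Assumption: the getter reports null until the setter has been called once.
func newMemorySequenceStore() (GetSequenceFunc, SetSequenceFunc) {
	var mu sync.Mutex
	var stored sql.NullInt64

	get := func(context.Context) (sql.NullInt64, error) {
		mu.Lock()
		defer mu.Unlock()
		return stored, nil
	}
	set := func(_ context.Context, seq uint64) error {
		mu.Lock()
		defer mu.Unlock()
		stored = sql.NullInt64{Int64: int64(seq), Valid: true}
		return nil
	}
	return get, set
}

// demo reads the value before and after storing sequence 42.
func demo() (first, second sql.NullInt64) {
	ctx := context.Background()
	get, set := newMemorySequenceStore()
	first, _ = get(ctx)
	_ = set(ctx, 42)
	second, _ = get(ctx)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first.Valid, second.Valid, second.Int64)
}
```

In a real deployment the pair would more likely read and write a durable store, so that a restarted consumer can resume from where it stopped.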

type Subscriber

type Subscriber[E EventConstraint] struct {
	// contains filtered or unexported fields
}

Subscriber for subscribing to events

func (*Subscriber[E]) Fetch

func (s *Subscriber[E]) Fetch(ctx context.Context) ([]E, error)

Fetch gets events. If ctx is cancelled or its deadline is exceeded, Fetch returns with error = ctx.Err(), and it can then be called again with a normal context object. The returned list of events is never empty when err = nil
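A typical consumer calls Fetch in a loop and stops when the context ends. The sketch below shows that loop against a small local interface with the same Fetch signature, so it runs without the library; fetcher, consume, fakeSubscriber and demo are hypothetical names, and the fake only imitates the behaviour described above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fetcher is a local interface with the same Fetch signature as Subscriber[E].
type fetcher[E any] interface {
	Fetch(ctx context.Context) ([]E, error)
}

// consume calls Fetch until it fails (for example because ctx ended),
// passing every non-empty batch to handle.
func consume[E any](ctx context.Context, sub fetcher[E], handle func([]E)) error {
	for {
		events, err := sub.Fetch(ctx)
		if err != nil {
			return err
		}
		handle(events)
	}
}

// fakeSubscriber is a test double: it serves queued batches and then
// blocks until the context is done.
type fakeSubscriber struct{ batches [][]int }

func (f *fakeSubscriber) Fetch(ctx context.Context) ([]int, error) {
	if len(f.batches) == 0 {
		<-ctx.Done()
		return nil, ctx.Err()
	}
	batch := f.batches[0]
	f.batches = f.batches[1:]
	return batch, nil
}

// demo consumes two batches, cancels the context once three events have
// been handled, and reports whether the loop ended with context.Canceled.
func demo() (int, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sub := &fakeSubscriber{batches: [][]int{{1, 2}, {3}}}
	total := 0
	err := consume[int](ctx, sub, func(events []int) {
		total += len(events)
		if total == 3 {
			cancel()
		}
	})
	return total, errors.Is(err, context.Canceled)
}

func main() {
	total, cancelled := demo()
	fmt.Println(total, cancelled)
}
```

Since a cancelled Fetch can be called again later, the same loop can be restarted with a fresh context without recreating the subscriber.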

type SubscriberOption

type SubscriberOption func(opts *subscriberOptions)

SubscriberOption for customizing subscribers

func WithSubscriberSizeLimit

func WithSubscriberSizeLimit(sizeLimit uint64) SubscriberOption

WithSubscriberSizeLimit configures limit in size of Fetch batches

type Timer

type Timer interface {
	Reset()
	ResetAfterChan()
	Chan() <-chan time.Time
}

Timer for timer
