inbox

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DefaultIterationRate is the timeout after which all events
	// in the inbox table will be processed.
	//
	// Default: 5 * time.Second.
	DefaultIterationRate = 5 * time.Second
	// DefaultHandlerTimeout is the timeout after which the handler
	// will be stopped and the status will be set as Fail.
	//
	// Default: 10 * time.Second.
	DefaultHandlerTimeout = 10 * time.Second
	// DefaultRetryAttempts is the max attempts before event marks
	// as 'dead'. 'Dead' means that the event will no longer be
	// processed.
	//
	// Default: 5.
	DefaultRetryAttempts = 5
	// DebugMode enables additional logs for debug inbox process.
	// Now, this option do nothing.
	//
	// Default: false.
	DebugMode = false
)

Variables

View Source
var DefaultLogger = log.Default()
View Source
var ErrNoRecords = errors.New("no records in inbox table")

Functions

This section is empty.

Types

type Client

type Client interface {
	WriteInbox(context.Context, *Record) error
}

Client provides possibility to set records to the inbox table. All records will be processed in the future.

type ErrorCallback added in v0.3.0

type ErrorCallback func(eventID uuid.UUID, msg string)

ErrorCallback prototype of function that can be called on failed or dead message.

type Handler

type Handler interface {
	// Key is a unique identifier of current handler.
	// This string must be not empty and must be unique for each
	// handler that passed to the Registry. Only the first handler with a key
	// will be stored in the Registry, all other handlers with the same key
	// will be ignored.
	Key() string
	// Process is a function that will be executed for each handler associated
	// with specific event_type and key provided by the Handler implementation.
	Process(context.Context, []byte) error
}

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

Inbox is struct that implement inbox pattern.

Writing all incoming events in a temporary table to future processing. Then we try to process each event with the provided handlers. In addition, Inbox filters new events. All events with the same event_id will be ignored.

More about inbox pattern you can read at https://softwaremill.com/microservices-101.

func NewInbox

func NewInbox(registry *Registry, conn *sql.DB, opts ...Option) *Inbox

func (*Inbox) Start

func (i *Inbox) Start(ctx context.Context) error

Start creates new inbox table if it not created and starts worker which process records from the table. To stop inbox worker, you can call context close() function.

func (*Inbox) Writer

func (i *Inbox) Writer() Client

Writer creates new Client to store incoming events to the temporary table.

type Logger

type Logger interface {
	Print(...any)
	Printf(string, ...any)
}

type NopLogger added in v0.2.1

type NopLogger struct{}

NopLogger logs nothing. Use it if you want mute Inbox.

func NewNopLogger added in v0.2.1

func NewNopLogger() *NopLogger

func (*NopLogger) Print added in v0.2.1

func (l *NopLogger) Print(...any)

func (*NopLogger) Printf added in v0.2.1

func (l *NopLogger) Printf(string, ...any)

type Option

type Option func(config) config

Option sets specific configuration to the Inbox.

func EnableDebugMode

func EnableDebugMode() Option

func OnDeadCallback added in v0.3.0

func OnDeadCallback(callback ErrorCallback) Option

OnDeadCallback sets custom callback for each message that can not be processed and marks as 'dead'. Function fires if 'dead' message detected.

func WithHandlerTimeout

func WithHandlerTimeout(dur time.Duration) Option

WithHandlerTimeout sets new interval after which handler will be stopped.

func WithIterationRate

func WithIterationRate(dur time.Duration) Option

WithIterationRate sets new interval for process all inbox events.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets custom implementation of Logger.

func WithMaxRetryAttempt added in v0.3.0

func WithMaxRetryAttempt(maxAttempt int) Option

WithMaxRetryAttempt sets custom max attempts for processing event.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record is event that should be processed by inbox worker.

func NewRecord

func NewRecord(id uuid.UUID, eventType string, payload []byte) (*Record, error)

NewRecord creates new record that can be processed by inbox worker.

Parameters:

id - is a unique id for inbox table. ID should be unique or storage
		will ignore all duplicate ids.
eventType - is a topic with which event was published.
payload - the received body.

func (*Record) Attempt

func (r *Record) Attempt() int

func (*Record) CalcNewDeadline

func (r *Record) CalcNewDeadline(dur time.Duration)

func (*Record) Dead

func (r *Record) Dead()

func (*Record) Done

func (r *Record) Done()

Done sets Done status to current Record. Status will be ignored on first save to the outbox table.

func (*Record) Fail

func (r *Record) Fail(err error)

Fail sets Failed status to current Record. Status will be ignored on first save to the outbox table.

func (*Record) Null

func (r *Record) Null()

Null sets Null status to current Record.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry contains all handler that will be processed by Inbox.

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Handlers

func (r *Registry) Handlers() map[string][]Handler

Handlers returns map where key is event type and values are handlers associated to this event type.

func (*Registry) On

func (r *Registry) On(event string, handlers ...Handler)

On register new handlers to specific event key. All handlers will be executed on received event with provided key.

Example:

We have
 - event type = "order_events"
 - handler key = "process_order"
 - handler key = "update_order"
The registry will bind the keys "process_order", "update_order"
with event "order_events" and execute both registered handlers
for each received event with event type "order_events".

We have
- event type = "order_events"
- handler key = "process_order"
- handler key = "process_order"
If we are trying to provide several handlers with the same key,
then only the first handler will be associated with the event type.
The second handler will be ignored.

We have
- event type = "order_events"
- registered handler with key = "process_order"
- new handler key = "process_order"
If you are trying to provide a handler to an already existing event type, for example,
"order_events", and the handler has the same key as already provided, then
this handler will be ignored.

type Status

type Status string

Status defines current status of Record.

const (
	// Progress means the current Record is processed by worker.
	Progress Status = "progress"
	// Failed means the current Record not processed by worker by specific
	// reason.
	Failed Status = "failed"
	// Done means the current Record is successfully processed.
	Done Status = "done"
	// Null means the current Record is not processed yet.
	Null Status = ""
	// Dead means the current Record is not processable.
	Dead Status = "dead"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL