eventbus

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
// Event status string values.
// NOTE(review): the names suggest these are the values stored in Event.Status
// as an event moves through processing — confirm against the implementation.
const (
	EventStatusPending   = "pending"
	EventStatusDequeue   = "dequeue"
	EventStatusRunning   = "running"
	EventStatusCompleted = "completed"
	EventStatusFailed    = "failed"
)

Variables

View Source
// ErrEventNotFound is a package-level error value with the message
// "event not found".
// NOTE(review): presumably returned when an event lookup finds nothing —
// confirm which calls return it.
var ErrEventNotFound = fmt.Errorf("event not found")

Functions

This section is empty.

Types

type ErrEventRunningDelayed

// ErrEventRunningDelayed is an error type (it has an Error() string method)
// that is built by NewErrEventRunningDelayed from a message and a delay
// duration. All of its fields are unexported.
type ErrEventRunningDelayed struct {
	// contains filtered or unexported fields
}

ErrEventRunningDelayed stands for a long-running event that needs to be added back after a delay.

func NewErrEventRunningDelayed

func NewErrEventRunningDelayed(message string, delay time.Duration) ErrEventRunningDelayed

func (ErrEventRunningDelayed) Delay

func (ErrEventRunningDelayed) Error

func (e ErrEventRunningDelayed) Error() string

type Event

// Event is the record type that EventRepository reads and writes: Get, Save,
// Search, ListAndLockUnfinishedEvents and the Update* methods all operate on
// *Event values.
type Event struct {
	// EventID identifies the event.
	// NOTE(review): presumably the id accepted by EventRepository.Get — confirm.
	EventID     string
	// Type is the event type.
	// NOTE(review): presumably matches IEvent.EventType and the eventType
	// passed to Subscribe — confirm.
	Type        string
	// Payload is the event payload as a string.
	Payload     string
	// Status is the event status.
	// NOTE(review): presumably one of the EventStatus* constants — confirm.
	Status      string
	// CreatedAt, UpdatedAt and ScheduledAt are timestamps.
	// NOTE(review): who sets them, and how ScheduledAt relates to
	// IEvent.Delay, is not visible here — confirm.
	CreatedAt   time.Time
	UpdatedAt   time.Time
	ScheduledAt time.Time
	RetryCount  int // The number of times the event has been retried
}

type EventBus

// EventBus is the interface returned by NewEventBus. *Impl declares the same
// four methods.
type EventBus interface {
	// Publish publishes the given event.
	Publish(ctx context.Context, event IEvent) error
	// Subscribe registers handler for events of type eventType.
	Subscribe(eventType string, handler EventHandler)
	// Start starts the event bus.
	// NOTE(review): workers is presumably the number of concurrent workers —
	// confirm against the implementation.
	Start(ctx context.Context, workers int) error
	// Close exits the event bus.
	Close(ctx context.Context) error
}

EventBus stands for event bus.

func NewEventBus

func NewEventBus(repository EventRepository, options ...Option) (EventBus, error)

NewEventBus creates a new event bus.

type EventHandler

// EventHandler is the handler interface accepted by Subscribe.
type EventHandler interface {
	// Handle receives a context and a payload string and returns an error.
	// NOTE(review): the payload is presumably the published event's payload,
	// and a returned error presumably affects retrying — confirm.
	Handle(ctx context.Context, payload string) error
}

EventHandler represents a handler that handles an event.

type EventHandlerFunc

// EventHandlerFunc is a function type with the same signature as
// EventHandler.Handle. It has a Handle method, so a plain function converted
// to EventHandlerFunc can be used as an EventHandler.
type EventHandlerFunc func(ctx context.Context, payload string) error

func (EventHandlerFunc) Handle

func (f EventHandlerFunc) Handle(ctx context.Context, payload string) error

Handle calls f(ctx, payload).

type EventRepository

// EventRepository is the storage interface that NewEventBus takes as its
// first argument.
// NOTE(review): the per-method notes below are read off the signatures only;
// exact semantics (locking, ordering, not-found behavior) are not visible
// here — confirm against a concrete implementation.
type EventRepository interface {
	// Get returns the event for the given id.
	Get(ctx context.Context, id string) (*Event, error)
	// Save stores the given event.
	Save(ctx context.Context, event *Event) error
	// ListAndLockUnfinishedEvents takes a limit and a list of event types and
	// returns a slice of events.
	// NOTE(review): presumably returns at most limit events of the given
	// types that are not finished, locking them for the caller — confirm.
	ListAndLockUnfinishedEvents(ctx context.Context, limit int, eventTypes []string) ([]*Event, error)
	// UpdateStatus takes an event and a status string.
	// NOTE(review): status is presumably one of the EventStatus* constants —
	// confirm.
	UpdateStatus(ctx context.Context, event *Event, status string) error
	// UpdateRetryCount takes an event and a retry count.
	UpdateRetryCount(ctx context.Context, event *Event, retryCount int) error
	// Search takes a Filter and returns a slice of events.
	Search(ctx context.Context, filter *Filter) ([]*Event, error)
}

type Filter

// Filter is the argument type of EventRepository.Search.
// NOTE(review): how the fields combine (AND/OR, exact vs. partial payload
// match, meaning of empty fields) is not visible here — confirm.
type Filter struct {
	// Type is a list of event type strings.
	Type    []string
	// Payload is a payload string.
	Payload string
	// Status is a list of status strings.
	// NOTE(review): presumably EventStatus* values — confirm.
	Status  []string
}

type IEvent

// IEvent is the interface a value must implement to be passed to Publish.
type IEvent interface {
	// EventType returns the event's type as a string.
	// NOTE(review): presumably matched against the eventType given to
	// Subscribe — confirm.
	EventType() string
	// Payload returns the event's payload as bytes.
	Payload() []byte
	// Delay returns a duration.
	// NOTE(review): presumably how long to wait before the event is
	// processed — confirm.
	Delay() time.Duration
}

type Impl

// Impl is the concrete type configured by Option functions; its pointer
// methods Close, Publish, Start and Subscribe match the EventBus interface.
// It embeds sync.Mutex, so an Impl must not be copied; use *Impl.
type Impl struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Impl implements the event bus.

func (*Impl) Close

func (engine *Impl) Close(ctx context.Context) error

Close exits the event bus.

func (*Impl) Publish

func (engine *Impl) Publish(ctx context.Context, iEvent IEvent) error

Publish publishes an event with its payload and delay.

func (*Impl) Start

func (engine *Impl) Start(ctx context.Context, workers int) error

Start starts the event bus.

func (*Impl) Subscribe

func (engine *Impl) Subscribe(eventType string, handler EventHandler)

Subscribe registers a handler for the event type.

type Option

// Option is a function that receives the *Impl to configure. NewEventBus
// accepts a variadic list of Options; WithBatchSize, WithMaxRetries and
// WithSyncPeriod each return one.
type Option func(impl *Impl)

Option is an option for configuring an Impl.

func WithBatchSize

func WithBatchSize(batchSize int) Option

WithBatchSize sets the batch size.

func WithMaxRetries

func WithMaxRetries(retry int) Option

WithMaxRetries sets the maximum number of retries.

func WithSyncPeriod

func WithSyncPeriod(duration time.Duration) Option

WithSyncPeriod sets the sync period.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL