superbus

package
v0.0.0-...-60192f8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Overview

Package superbus provides an asynchronous task handler.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EagerEventManager

type EagerEventManager struct {
	// contains filtered or unexported fields
}

EagerEventManager is a simple event manager using channels for event management.

func NewEagerEventManager

func NewEagerEventManager() *EagerEventManager

NewEagerEventManager create an EagerEventManager instance.

func (*EagerEventManager) Listen

func (m *EagerEventManager) Listen()

Listen listens for new events.

func (*EagerEventManager) On

func (m *EagerEventManager) On(name string, f EventHandler)

On registers a event handler for a given event.

func (*EagerEventManager) Push

func (m *EagerEventManager) Push(name string, value []byte) error

Push sents an event to the event channel.

func (*EagerEventManager) Stop

func (m *EagerEventManager) Stop()

Stop stops the event listener.

type Event

type Event struct {
	Name  string `json:"name"`
	Value []byte `json:"value"`
}

Event is an event sent to the wire. It contains a name and a value that can be unmarshalled later.

type EventHandler

type EventHandler func(Event)

EventHandler is a function that deals with an event.

type EventManager

type EventManager interface {
	// Listen starts listening to events
	Listen()
	// Stop stops the event listener
	Stop()
	// Push sends an event
	Push(name string, value []byte) error
	// On register a callaback for a specific event
	On(name string, f EventHandler)
}

EventManager describes the methods implemented by an event manager.

type MemStore

type MemStore struct {
	sync.Mutex
	// contains filtered or unexported fields
}

MemStore is a KvStore implementation using a simple in memory map.

func NewMemStore

func NewMemStore() *MemStore

NewMemStore returns a MemStore instance.

func (*MemStore) Clear

func (s *MemStore) Clear()

Clear deletes everything in the memory store.

func (*MemStore) Del

func (s *MemStore) Del(key string) error

Del removes the given key.

func (*MemStore) Get

func (s *MemStore) Get(key string) string

Get returns a value for the given key. Returns an empty string when the value does not exist.

func (*MemStore) Set

func (s *MemStore) Set(key, value string, expiration time.Duration) error

Set insert or replace the value for the given key.

type Operation

type Operation struct {
	Name string      `json:"name"`
	ID   interface{} `json:"id"`
}

Operation is the event sent when we launch a task.

type Payload

type Payload struct {
	ID    uuid.UUID `json:"id"`
	Delay int       `json:"delay"`
	Data  []byte    `json:"data"`
}

Payload is the stored content of a task.

type RedisEventManager

type RedisEventManager struct {
	// contains filtered or unexported fields
}

RedisEventManager is an event manager using redis as a message handler.

func NewRedisEventManager

func NewRedisEventManager(rdc *redis.Client) *RedisEventManager

NewRedisEventManager creates a RedisEventManager instance.

func (*RedisEventManager) Listen

func (m *RedisEventManager) Listen()

Listen listens for new events.

func (*RedisEventManager) On

func (m *RedisEventManager) On(name string, f EventHandler)

On registers a event handler for a given event.

func (*RedisEventManager) Push

func (m *RedisEventManager) Push(name string, value []byte) error

Push sends an event to the event channel.

func (*RedisEventManager) Stop

func (m *RedisEventManager) Stop()

Stop stops the event listener.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore implements KvStore with redis.

func NewRedisStore

func NewRedisStore(rdb *redis.Client, prefix string) *RedisStore

NewRedisStore returns a RedisStore instance. The prefix is used for each key operation.

func (*RedisStore) Del

func (s *RedisStore) Del(key string) error

Del removes the given key.

func (*RedisStore) Get

func (s *RedisStore) Get(key string) string

Get returns a value for the given key. Returns an empty string when the value does not exist.

func (*RedisStore) Set

func (s *RedisStore) Set(key, value string, expiration time.Duration) error

Set insert or replace the value for the given key.

type Store

type Store interface {
	Get(string) string
	Set(string, string, time.Duration) error
	Del(string) error
}

Store is a very basic key/value store.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a task shortcut.

func (Task) Cancel

func (t Task) Cancel(id interface{}) error

Cancel removes the task's payload, effectively canceling it.

func (Task) IsRunning

func (t Task) IsRunning(id interface{}) bool

IsRunning returns true if the task is currently running or in the queue.

func (Task) Log

func (t Task) Log() *log.Entry

Log returns a log entry for the task.

func (Task) Run

func (t Task) Run(id interface{}, data interface{}) error

Run launches the task.

type TaskHandler

type TaskHandler func(*Operation, *Payload)

TaskHandler is the function called on a task.

type TaskManager

type TaskManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TaskManager is the task manager.

func NewTaskManager

func NewTaskManager(m EventManager, s Store, options ...TaskManagerOption) *TaskManager

NewTaskManager creates a new TaskManager instance.

func (*TaskManager) Launch

func (tm *TaskManager) Launch(name string, id interface{}, delay int, data interface{}) error

Launch sends a task order for later launch.

func (*TaskManager) NewTask

func (tm *TaskManager) NewTask(name string, options ...TaskOption) Task

NewTask creates a new Task instance.

func (*TaskManager) Register

func (tm *TaskManager) Register(name string, f TaskHandler)

Register registers a task handler.

func (*TaskManager) Start

func (tm *TaskManager) Start()

Start starts the events listener and the process workers.

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the event listener and wait for running tasks to finish.

type TaskManagerOption

type TaskManagerOption func(*TaskManager)

TaskManagerOption is a function that sets TaskManager option upon creation.

func WithNumWorkers

func WithNumWorkers(m int) TaskManagerOption

WithNumWorkers set the number of worker processes that handle operations.

func WithOperationPrefix

func WithOperationPrefix(p string) TaskManagerOption

WithOperationPrefix sets a prefix for all operation payloads.

type TaskOption

type TaskOption func(t *Task)

TaskOption is a function that sets Task option upon creation.

func WithTaskDelay

func WithTaskDelay(d int) TaskOption

WithTaskDelay sets the task's delay.

func WithTaskHandler

func WithTaskHandler(f func(data interface{})) TaskOption

WithTaskHandler adds the given handler to the task.

func WithUnmarshall

func WithUnmarshall(f func(data []byte) interface{}) TaskOption

WithUnmarshall registers a function that is responsible for payload decoding.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL