superbus

package
v0.0.0-...-5a89564 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 16, 2026 License: AGPL-3.0 Imports: 9 Imported by: 0

Documentation

Overview

Package superbus provides an asynchronous task handler.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Marshal

func Marshal(v any) ([]byte, error)

Marshal returns the encoded v value.

func Unmarshal

func Unmarshal(data []byte, v any) error

Unmarshal decodes data into v.

func WithUnmarshall

func WithUnmarshall[T any](t *Task)

WithUnmarshall registers a function that is responsible for payload decoding.

Types

type EagerEventManager

type EagerEventManager struct {
	// contains filtered or unexported fields
}

EagerEventManager is a simple event manager using channels for event management.

func NewEagerEventManager

func NewEagerEventManager() *EagerEventManager

NewEagerEventManager creates an EagerEventManager instance.

func (*EagerEventManager) Listen

func (m *EagerEventManager) Listen()

Listen listens for new events.

func (*EagerEventManager) On

func (m *EagerEventManager) On(name string, f EventHandler)

On registers an event handler for a given event.

func (*EagerEventManager) Push

func (m *EagerEventManager) Push(name string, value []byte) error

Push sends an event to the event channel.

func (*EagerEventManager) Stop

func (m *EagerEventManager) Stop()

Stop stops the event listener.

type Event

type Event struct {
	Name  string `json:"name"`
	Value []byte `json:"value"`
}

Event is an event sent to the wire. It contains a name and a value that can be unmarshalled later.
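The struct tags suggest that events are serialized as JSON, although this page does not say so. Assuming encoding/json is used, the sketch below shows what an Event looks like once encoded. Note that encoding/json writes a []byte field as a base64 string. The helper encodeEvent is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is copied from the documented type definition.
type Event struct {
	Name  string `json:"name"`
	Value []byte `json:"value"`
}

// encodeEvent serializes an event with encoding/json. Whether the package
// uses encoding/json on the wire is an assumption based on the struct tags.
func encodeEvent(e Event) (string, error) {
	b, err := json.Marshal(e)
	return string(b), err
}

func main() {
	s, err := encodeEvent(Event{Name: "task", Value: []byte("hi")})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // {"name":"task","value":"aGk="}
}
```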

type EventHandler

type EventHandler func(Event)

EventHandler is a function that deals with an event.

type EventManager

type EventManager interface {
	// Listen starts listening to events
	Listen()
	// Stop stops the event listener
	Stop()
	// Push sends an event
	Push(name string, value []byte) error
	// On registers a callback for a specific event
	On(name string, f EventHandler)
}

EventManager describes the methods implemented by an event manager.
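To illustrate what satisfying this interface involves, here is a minimal channel-based sketch. It is not the package's EagerEventManager or RedisEventManager: the type chanManager, its fields, the buffer size and the choice to make Listen block are all assumptions made for the example, and the interface and related types are redeclared locally so the block is self-contained.

```go
package main

import (
	"fmt"
	"sync"
)

// Local copies of the documented types.
type Event struct {
	Name  string
	Value []byte
}

type EventHandler func(Event)

type EventManager interface {
	Listen()
	Stop()
	Push(name string, value []byte) error
	On(name string, f EventHandler)
}

// chanManager is a hypothetical implementation backed by a buffered channel.
type chanManager struct {
	mu       sync.RWMutex
	handlers map[string][]EventHandler
	events   chan Event
	done     chan struct{}
}

func newChanManager() *chanManager {
	return &chanManager{
		handlers: map[string][]EventHandler{},
		events:   make(chan Event, 16),
		done:     make(chan struct{}),
	}
}

// On appends a handler for the given event name.
func (m *chanManager) On(name string, f EventHandler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[name] = append(m.handlers[name], f)
}

// Push queues an event. In this sketch it must not be called after Stop.
func (m *chanManager) Push(name string, value []byte) error {
	m.events <- Event{Name: name, Value: value}
	return nil
}

// Listen blocks and dispatches events until Stop closes the channel.
func (m *chanManager) Listen() {
	defer close(m.done)
	for e := range m.events {
		m.mu.RLock()
		hs := m.handlers[e.Name]
		m.mu.RUnlock()
		for _, h := range hs {
			h(e)
		}
	}
}

// Stop closes the queue and waits for Listen to drain it. Listen must have
// been started (or be about to start), otherwise Stop blocks forever.
func (m *chanManager) Stop() {
	close(m.events)
	<-m.done
}

func main() {
	var m EventManager = newChanManager()
	got := make(chan string, 1)
	m.On("greet", func(e Event) { got <- string(e.Value) })
	go m.Listen()
	_ = m.Push("greet", []byte("hello"))
	m.Stop()
	fmt.Println(<-got) // hello
}
```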

type MemStore

type MemStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

MemStore is a Store implementation using a simple in-memory map.

func NewMemStore

func NewMemStore() *MemStore

NewMemStore returns a MemStore instance.

func (*MemStore) Clear

func (s *MemStore) Clear()

Clear deletes everything in the memory store.

func (*MemStore) Del

func (s *MemStore) Del(key string) error

Del removes the given key.

func (*MemStore) Get

func (s *MemStore) Get(key string) []byte

Get returns the value for the given key. It returns an empty value when the key does not exist.

func (*MemStore) Set

func (s *MemStore) Set(key string, value []byte, expiration time.Duration) error

Set inserts or replaces the value for the given key.

type Operation

type Operation struct {
	Name string `json:"name"`
	ID   any    `json:"id"`
}

Operation is the event sent when we launch a task.
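Because ID is declared as any, its dynamic type can change when an Operation is encoded and decoded. The sketch below assumes the operation travels as JSON through encoding/json, which the struct tags suggest but this page does not confirm. Under that assumption an integer ID comes back as a float64. The helper roundTrip is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Operation is copied from the documented type definition.
type Operation struct {
	Name string `json:"name"`
	ID   any    `json:"id"`
}

// roundTrip encodes op to JSON and decodes it into a fresh value.
// Using encoding/json here is an assumption made for this sketch.
func roundTrip(op Operation) (Operation, error) {
	var out Operation
	b, err := json.Marshal(op)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	out, err := roundTrip(Operation{Name: "thumbnail", ID: 42})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// encoding/json decodes JSON numbers into float64 when the target is any.
	fmt.Printf("%T %v\n", out.ID, out.ID) // float64 42
}
```

If this applies, a handler that reads the ID should use a type switch or rely on string identifiers rather than asserting the original Go type.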

type Payload

type Payload struct {
	ID    uuid.UUID `json:"id"`
	Delay int       `json:"delay"`
	Data  []byte    `json:"data"`
}

Payload is the stored content of a task.

type RedisEventManager

type RedisEventManager struct {
	// contains filtered or unexported fields
}

RedisEventManager is an event manager using redis as a message handler.

func NewRedisEventManager

func NewRedisEventManager(rdc *redis.Client) *RedisEventManager

NewRedisEventManager creates a RedisEventManager instance.

func (*RedisEventManager) Listen

func (m *RedisEventManager) Listen()

Listen listens for new events.

func (*RedisEventManager) On

func (m *RedisEventManager) On(name string, f EventHandler)

On registers an event handler for a given event.

func (*RedisEventManager) Push

func (m *RedisEventManager) Push(name string, value []byte) error

Push sends an event to the event channel.

func (*RedisEventManager) Stop

func (m *RedisEventManager) Stop()

Stop stops the event listener.

type RedisStore

type RedisStore struct {
	// contains filtered or unexported fields
}

RedisStore implements Store with Redis.

func NewRedisStore

func NewRedisStore(rdb *redis.Client, prefix string) *RedisStore

NewRedisStore returns a RedisStore instance. The prefix is used for each key operation.

func (*RedisStore) Del

func (s *RedisStore) Del(key string) error

Del removes the given key.

func (*RedisStore) Get

func (s *RedisStore) Get(key string) []byte

Get returns the value for the given key. It returns an empty value when the key does not exist.

func (*RedisStore) Set

func (s *RedisStore) Set(key string, value []byte, expiration time.Duration) error

Set inserts or replaces the value for the given key.

type Store

type Store interface {
	Get(string) []byte
	Set(string, []byte, time.Duration) error
	Del(string) error
}

Store is a very basic key/value store.
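Since Store is a three-method interface, a custom backend is easy to sketch. The example below is a hypothetical map-backed implementation, not the package's MemStore. The type mapStore, the entry struct, and the rule that an expiration of zero or less means "never expires" are assumptions made for illustration, and the interface is redeclared locally so the block compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Store is a local copy of the documented interface.
type Store interface {
	Get(string) []byte
	Set(string, []byte, time.Duration) error
	Del(string) error
}

// entry holds a value and its deadline; a zero deadline means no expiration.
type entry struct {
	value   []byte
	expires time.Time
}

// mapStore is a hypothetical Store backed by a map guarded by a mutex.
type mapStore struct {
	mu   sync.RWMutex
	data map[string]entry
}

func newMapStore() *mapStore { return &mapStore{data: map[string]entry{}} }

// Get returns nil for a missing or expired key.
func (s *mapStore) Get(key string) []byte {
	s.mu.RLock()
	defer s.mu.RUnlock()
	e, ok := s.data[key]
	if !ok || (!e.expires.IsZero() && time.Now().After(e.expires)) {
		return nil
	}
	return e.value
}

// Set stores the value; an expiration <= 0 is treated as "never expires"
// (an assumption of this sketch).
func (s *mapStore) Set(key string, value []byte, expiration time.Duration) error {
	e := entry{value: value}
	if expiration > 0 {
		e.expires = time.Now().Add(expiration)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = e
	return nil
}

// Del removes the key; deleting a missing key is not an error here.
func (s *mapStore) Del(key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, key)
	return nil
}

func main() {
	var s Store = newMapStore()
	_ = s.Set("job:1", []byte("payload"), 0)
	fmt.Printf("%q\n", s.Get("job:1")) // "payload"
	_ = s.Del("job:1")
	fmt.Println(s.Get("job:1") == nil) // true
}
```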

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task is a task shortcut.

func (Task) Cancel

func (t Task) Cancel(id any) error

Cancel removes the task's payload, effectively canceling it.

func (Task) IsRunning

func (t Task) IsRunning(id any) bool

IsRunning returns true if the task is currently running or in the queue.

func (Task) Log

func (t Task) Log() *slog.Logger

Log returns a log entry for the task.

func (Task) Run

func (t Task) Run(id any, data any) error

Run launches the task.

func (Task) Unmarshal

func (t Task) Unmarshal(data []byte) any

Unmarshal returns the task's decoded payload.

type TaskHandler

type TaskHandler func(*Operation, *Payload)

TaskHandler is the function called on a task.

type TaskManager

type TaskManager struct {
	sync.Mutex
	// contains filtered or unexported fields
}

TaskManager is the task manager.

func NewTaskManager

func NewTaskManager(m EventManager, s Store, options ...TaskManagerOption) *TaskManager

NewTaskManager creates a new TaskManager instance.

func (*TaskManager) Launch

func (tm *TaskManager) Launch(name string, id any, delay int, data any) error

Launch sends a task order for later launch.

func (*TaskManager) NewTask

func (tm *TaskManager) NewTask(name string, options ...TaskOption) Task

NewTask creates a new Task instance.

func (*TaskManager) Register

func (tm *TaskManager) Register(name string, f TaskHandler)

Register registers a task handler.

func (*TaskManager) Start

func (tm *TaskManager) Start()

Start starts the events listener and the process workers.

func (*TaskManager) Stop

func (tm *TaskManager) Stop()

Stop stops the event listener and waits for running tasks to finish.
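One common way to get "stop and wait for running tasks" semantics is a worker pool guarded by a sync.WaitGroup. The sketch below illustrates that general pattern only. It is not the TaskManager implementation, and the names pool, newPool, submit, stop and runJobs are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for the worker side of a task manager.
type pool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// newPool starts the given number of workers, each consuming from jobs.
func newPool(workers int) *pool {
	p := &pool{jobs: make(chan func())}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job()
			}
		}()
	}
	return p
}

// submit hands a job to the next free worker, blocking until one is free.
func (p *pool) submit(job func()) { p.jobs <- job }

// stop closes the queue and blocks until every worker has returned, so any
// job that was already picked up runs to completion.
func (p *pool) stop() {
	close(p.jobs)
	p.wg.Wait()
}

// runJobs submits n counting jobs to a pool and returns how many ran.
func runJobs(workers, n int) int {
	var mu sync.Mutex
	done := 0
	p := newPool(workers)
	for i := 0; i < n; i++ {
		p.submit(func() {
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	p.stop()
	return done
}

func main() {
	fmt.Println(runJobs(2, 5)) // 5
}
```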

type TaskManagerOption

type TaskManagerOption func(*TaskManager)

TaskManagerOption is a function that sets a TaskManager option upon creation.

func WithNumWorkers

func WithNumWorkers(m int) TaskManagerOption

WithNumWorkers sets the number of worker processes that handle operations.

func WithOperationPrefix

func WithOperationPrefix(p string) TaskManagerOption

WithOperationPrefix sets a prefix for all operation payloads.
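TaskManagerOption follows the functional options pattern: each With... function returns a closure that mutates the manager while it is being constructed. Here is a minimal sketch of the pattern with local stand-in types. The manager struct, its field names and the default of one worker are assumptions, since the real fields are unexported and the defaults are not documented on this page.

```go
package main

import "fmt"

// manager is a hypothetical stand-in for TaskManager.
type manager struct {
	numWorkers int
	prefix     string
}

// option mirrors the shape of TaskManagerOption.
type option func(*manager)

// withNumWorkers returns an option that sets the worker count.
func withNumWorkers(n int) option {
	return func(m *manager) { m.numWorkers = n }
}

// withPrefix returns an option that sets the operation prefix.
func withPrefix(p string) option {
	return func(m *manager) { m.prefix = p }
}

// newManager sets defaults first, then applies the options in order, so a
// later option overrides an earlier one.
func newManager(options ...option) *manager {
	m := &manager{numWorkers: 1} // default chosen for this sketch only
	for _, o := range options {
		o(m)
	}
	return m
}

func main() {
	m := newManager(withNumWorkers(4), withPrefix("tasks"))
	fmt.Println(m.numWorkers, m.prefix) // 4 tasks
}
```

The pattern keeps the constructor signature stable: new settings can be added as new With... functions without breaking existing callers.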

type TaskOption

type TaskOption func(t *Task)

TaskOption is a function that sets a Task option upon creation.

func WithTaskDelay

func WithTaskDelay(d int) TaskOption

WithTaskDelay sets the task's delay.

func WithTaskHandler

func WithTaskHandler(f func(data any)) TaskOption

WithTaskHandler adds the given handler to the task.
