broker

package
v0.0.0-...-4355fa8
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelBroker

type ChannelBroker[T task.TaskOrResult] struct {
	// contains filtered or unexported fields
}

ChannelBroker is a task broker implementation that wraps a buffered Go channel. It manages task submission and retrieval for the GoFlow framework.

func NewChannelBroker

func NewChannelBroker[T task.TaskOrResult](bufferSize int) *ChannelBroker[T]

NewChannelBroker creates a new ChannelBroker with a buffered channel of the specified size. The buffer size determines how many tasks can be queued before further submissions block.

func (*ChannelBroker[T]) AwaitShutdown

func (cb *ChannelBroker[T]) AwaitShutdown()

func (*ChannelBroker[T]) Dequeue

func (cb *ChannelBroker[T]) Dequeue(_ context.Context) <-chan T

Dequeue returns a read-only channel of tasks, allowing workers to retrieve tasks for processing.

func (*ChannelBroker[T]) Submit

func (cb *ChannelBroker[T]) Submit(ctx context.Context, t T) error

Submit adds a task to the ChannelBroker's queue. If the queue is full, it will block until space is available.
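The ChannelBroker behaviour described above can be illustrated with a small stand-alone sketch. The package's import path and the task.TaskOrResult constraint are not shown on this page, so the sketch uses a hypothetical sketchBroker type constrained by any. It is not the package's implementation, and returning ctx.Err() when the context is canceled during a blocked Submit is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// sketchBroker is a hypothetical stand-in for ChannelBroker.
// It wraps a buffered channel of tasks.
type sketchBroker[T any] struct {
	ch chan T
}

func newSketchBroker[T any](bufferSize int) *sketchBroker[T] {
	return &sketchBroker[T]{ch: make(chan T, bufferSize)}
}

// Submit blocks while the buffer is full.
// Assumption: a canceled context unblocks the call with ctx.Err().
func (b *sketchBroker[T]) Submit(ctx context.Context, t T) error {
	select {
	case b.ch <- t:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Dequeue exposes the same channel as receive-only.
func (b *sketchBroker[T]) Dequeue(_ context.Context) <-chan T {
	return b.ch
}

// drainTwo submits two tasks into a buffer of size 2 and reads them back.
func drainTwo() []string {
	ctx := context.Background()
	b := newSketchBroker[string](2)
	_ = b.Submit(ctx, "first")
	_ = b.Submit(ctx, "second")
	out := b.Dequeue(ctx)
	return []string{<-out, <-out}
}

// submitWhenFull fills a buffer of size 1, then submits again with an
// already canceled context so the second call cannot block forever.
func submitWhenFull() error {
	ctx, cancel := context.WithCancel(context.Background())
	b := newSketchBroker[string](1)
	_ = b.Submit(ctx, "fills the buffer")
	cancel()
	return b.Submit(ctx, "no room")
}

func main() {
	fmt.Println(drainTwo())       // tasks come back in submission order
	fmt.Println(submitWhenFull()) // non-nil: the buffer was full and ctx was canceled
}
```

A larger bufferSize lets producers run further ahead of workers before Submit starts to block.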

type Encoder

type Encoder[T task.TaskOrResult] interface {
	Serialise(toSerialise T) ([]byte, error)
	Deserialise(toDeserialise []byte) (T, error)
}

Encoder defines methods for serializing and deserializing tasks of type T, where T satisfies task.TaskOrResult. This allows tasks and results to be encoded for sending over Redis and decoded when retrieved.
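As an illustration of the Encoder method set, the following sketch shows one possible JSON-based implementation. The names jsonEncoder and demoTask are hypothetical, and the type parameter is constrained by any because the definition of task.TaskOrResult is not shown on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// demoTask is a hypothetical payload type used only for this sketch.
type demoTask struct {
	ID   string `json:"id"`
	Body string `json:"body"`
}

// jsonEncoder is a hypothetical type with the same method set as Encoder.
type jsonEncoder[T any] struct{}

// Serialise encodes the value as JSON.
func (jsonEncoder[T]) Serialise(toSerialise T) ([]byte, error) {
	return json.Marshal(toSerialise)
}

// Deserialise decodes JSON bytes back into a value of type T.
func (jsonEncoder[T]) Deserialise(toDeserialise []byte) (T, error) {
	var out T
	err := json.Unmarshal(toDeserialise, &out)
	return out, err
}

// roundTrip encodes a task and decodes it again.
func roundTrip(in demoTask) (demoTask, error) {
	enc := jsonEncoder[demoTask]{}
	raw, err := enc.Serialise(in)
	if err != nil {
		return demoTask{}, err
	}
	return enc.Deserialise(raw)
}

func main() {
	out, err := roundTrip(demoTask{ID: "42", Body: "resize image"})
	fmt.Println(out, err)
}
```

Any encoding works as long as Deserialise can read what Serialise produced.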

type RedisBroker

type RedisBroker[T task.TaskOrResult] struct {
	// contains filtered or unexported fields
}

RedisBroker is a Redis-backed message broker that supports submitting and asynchronously retrieving tasks of type T. It provides a channel-based interface for consuming tasks and includes options for configuring logging.

func NewRedisBroker

func NewRedisBroker[T task.TaskOrResult](
	client redisClient,
	key string,
	encoder Encoder[T],
	opt ...RedisBrokerOption,
) *RedisBroker[T]

NewRedisBroker creates a new RedisBroker instance with the specified Redis client, queue key, and Encoder.

func (*RedisBroker[T]) AwaitShutdown

func (rb *RedisBroker[T]) AwaitShutdown()

AwaitShutdown waits for the background polling goroutine to finish. This method should be called during shutdown to ensure all resources are released.

func (*RedisBroker[T]) Dequeue

func (rb *RedisBroker[T]) Dequeue(ctx context.Context) <-chan T

Dequeue returns a receive-only channel that emits tasks as they are retrieved from the Redis queue. It starts a background goroutine that polls the queue. Errors encountered while polling are logged but do not stop the loop; polling stops only when the provided context is canceled.
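The polling behaviour can be sketched without Redis. In the example below, poller, popFunc, and fakePop are hypothetical names. A plain function stands in for the Redis pop, and closing the output channel on exit is an assumption rather than documented behaviour.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
)

// popFunc is a hypothetical stand-in for a blocking pop from Redis.
type popFunc func(ctx context.Context) (string, error)

type poller struct {
	pop popFunc
	wg  sync.WaitGroup
}

// Dequeue starts a goroutine that polls until ctx is canceled.
func (p *poller) Dequeue(ctx context.Context) <-chan string {
	out := make(chan string)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer close(out) // assumption: the channel is closed on exit
		for ctx.Err() == nil {
			item, err := p.pop(ctx)
			if err != nil {
				if ctx.Err() == nil {
					// Errors are logged; the loop keeps going.
					log.Printf("poll failed: %v", err)
				}
				continue
			}
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// AwaitShutdown blocks until the polling goroutine has returned.
func (p *poller) AwaitShutdown() { p.wg.Wait() }

// fakePop fails once, then yields the items, then blocks until cancellation.
func fakePop(items []string) popFunc {
	calls := 0
	return func(ctx context.Context) (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("transient failure")
		}
		if idx := calls - 2; idx < len(items) {
			return items[idx], nil
		}
		<-ctx.Done()
		return "", ctx.Err()
	}
}

// runDemo reads two items, cancels the context, and waits for shutdown.
func runDemo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	p := &poller{pop: fakePop([]string{"a", "b"})}
	out := p.Dequeue(ctx)
	got := []string{<-out, <-out}
	cancel()
	p.AwaitShutdown()
	return got
}

func main() {
	fmt.Println(runDemo())
}
```

The first pop fails and is only logged, so both items still arrive. Calling AwaitShutdown after cancel ensures the goroutine has exited before the program moves on.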

func (*RedisBroker[T]) Submit

func (rb *RedisBroker[T]) Submit(ctx context.Context, submission T) error

Submit serializes a task and pushes it to the Redis queue. If serialization or pushing fails, Submit returns an error.
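The two failure paths of Submit can be sketched as follows. The pusher interface, memoryQueue, and failingQueue are hypothetical stand-ins for the Redis client, and JSON is used in place of an Encoder. How the real method wraps its errors is not documented here, so the wrapping shown is an assumption.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// pusher is a hypothetical stand-in for the Redis client used by the broker.
type pusher interface {
	Push(ctx context.Context, key string, value []byte) error
}

// memoryQueue stores pushed values in memory, keyed by queue name.
type memoryQueue struct{ items map[string][][]byte }

func (m *memoryQueue) Push(_ context.Context, key string, value []byte) error {
	m.items[key] = append(m.items[key], value)
	return nil
}

// failingQueue simulates a Redis connection error.
type failingQueue struct{}

func (failingQueue) Push(context.Context, string, []byte) error {
	return errors.New("connection refused")
}

// submit serialises the task, then pushes it; either step can fail.
func submit(ctx context.Context, q pusher, key string, task any) error {
	raw, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("serialise task: %w", err)
	}
	if err := q.Push(ctx, key, raw); err != nil {
		return fmt.Errorf("push task to %q: %w", key, err)
	}
	return nil
}

// queuedAfterSubmit returns the queue length after one successful submit.
func queuedAfterSubmit() (int, error) {
	q := &memoryQueue{items: map[string][][]byte{}}
	err := submit(context.Background(), q, "tasks", map[string]string{"id": "1"})
	return len(q.items["tasks"]), err
}

// submitUnserialisable passes a value that encoding/json cannot encode.
func submitUnserialisable() error {
	q := &memoryQueue{items: map[string][][]byte{}}
	return submit(context.Background(), q, "tasks", make(chan int))
}

// submitToFailingQueue exercises the push failure path.
func submitToFailingQueue() error {
	return submit(context.Background(), failingQueue{}, "tasks", "payload")
}

func main() {
	fmt.Println(queuedAfterSubmit())
	fmt.Println(submitUnserialisable())
	fmt.Println(submitToFailingQueue())
}
```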

type RedisBrokerOption

type RedisBrokerOption interface {
	// contains filtered or unexported methods
}

A RedisBrokerOption configures optional RedisBroker settings, such as the logger.

func WithLogger

func WithLogger(logger log.Logger) RedisBrokerOption

WithLogger sets the logger that the broker uses to report basic warnings when interacting with Redis.
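An option interface with only unexported methods, as RedisBrokerOption has, is a common Go pattern: callers outside the package can pass options but cannot define their own. The sketch below shows the general shape. All names are hypothetical, and the logger interface is a stand-in because the log.Logger type used by this package is not shown on this page.

```go
package main

import "fmt"

// logger is a hypothetical stand-in for the package's log.Logger.
type logger interface {
	Warn(msg string)
}

// brokerConfig holds the settings that options may change.
type brokerConfig struct {
	logger logger
}

// brokerOption has only an unexported method, so it can be
// implemented only inside the package that declares it.
type brokerOption interface {
	apply(*brokerConfig)
}

// optionFunc adapts a plain function to the brokerOption interface.
type optionFunc func(*brokerConfig)

func (f optionFunc) apply(c *brokerConfig) { f(c) }

// withLogger returns an option that replaces the default logger.
func withLogger(l logger) brokerOption {
	return optionFunc(func(c *brokerConfig) { c.logger = l })
}

// discardLogger is the default and drops every message.
type discardLogger struct{}

func (discardLogger) Warn(string) {}

// recordingLogger keeps messages so the example can inspect them.
type recordingLogger struct{ msgs []string }

func (r *recordingLogger) Warn(msg string) { r.msgs = append(r.msgs, msg) }

// newConfig starts from defaults and applies each option in order.
func newConfig(opts ...brokerOption) brokerConfig {
	cfg := brokerConfig{logger: discardLogger{}}
	for _, o := range opts {
		o.apply(&cfg)
	}
	return cfg
}

func main() {
	rec := &recordingLogger{}
	cfg := newConfig(withLogger(rec))
	cfg.logger.Warn("redis pop failed")
	fmt.Println(rec.msgs)
}
```

Starting from a default logger means a constructor called with no options still has somewhere safe to send warnings.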
