queue

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 26, 2024 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package queue provides multiple patterns which implements the app.Runtime interface.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrNoItem = errors.New("queue: no item")

ErrNoItem should be returned by Consumers when no item has been consumed.

Functions

This section is empty.

Types

type CommonOption

type CommonOption interface {
	SequentialOption
	ConcurrentOption
}

CommonOption are options which are common to all queue based runtimes.

func LogHandler

func LogHandler(h slog.Handler) CommonOption

LogHandler configures the underlying slog.Handler.

type ConcurrentOption

type ConcurrentOption interface {
	// contains filtered or unexported methods
}

ConcurrentOption are options for configuring the ConcurrentRuntime.

func MaxConcurrentProcessors

func MaxConcurrentProcessors(n uint) ConcurrentOption

MaxConcurrentProcessors configures a limit for the number of processor goroutines actively running.

type ConcurrentRuntime

type ConcurrentRuntime[T any] struct {
	// contains filtered or unexported fields
}

ConcurrentRuntime is a bedrock.Runtime for concurrently processing items from a queue.

func Concurrent

func Concurrent[T any](c Consumer[T], p Processor[T], opts ...ConcurrentOption) *ConcurrentRuntime[T]

Concurrent returns a fully initialized ConcurrentRuntime.

Concurrent will consume and process items as concurrent processes. For every item returned by the Consumer, c, the Processor, p, is called in a separate goroutine to process the item. Due to the concurrent execution of the Consumer and Processor, new items will be consumed before the current item has been completely processed.

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n += 1
	return n, nil
})

var processed atomic.Int64
var mu sync.Mutex
var nums []int
p := processorFunc[int](func(_ context.Context, n int) error {
	processed.Add(1)
	if processed.Load() > 5 {
		cancel()
		return nil
	}
	// items are processed concurrently so we can print them here
	// since the order is not gauranteed
	mu.Lock()
	nums = append(nums, n)
	mu.Unlock()
	return nil
})

rt := Concurrent[int](c, p, LogHandler(slog.Default().Handler()))

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}

// since the numbers are processed concurrently
// there's no gaurantee that the list only contains
// 1, 2, 3, 4, 5.
fmt.Println(sum(nums) >= 15)
Output:

true
Example (MaxConcurrentProcessors)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n += 1
	return n, nil
})

var processed atomic.Int64
var mu sync.Mutex
var nums []int
p := processorFunc[int](func(_ context.Context, n int) error {
	processed.Add(1)
	if processed.Load() > 5 {
		cancel()
		return nil
	}
	// items are processed concurrently so we can print them here
	// since the order is not gauranteed
	mu.Lock()
	nums = append(nums, n)
	mu.Unlock()
	return nil
})

rt := Concurrent[int](
	c,
	p,
	LogHandler(slog.Default().Handler()),
	MaxConcurrentProcessors(1),
)

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}

// Since there's only 1 processor goroutine and the
// nums are consumed sequentially the nums slice
// should be gauranteed to be 1 thru 5.
slices.Sort(nums)
fmt.Println(nums)
Output:

[1 2 3 4 5]

func (*ConcurrentRuntime[T]) Run

func (rt *ConcurrentRuntime[T]) Run(ctx context.Context) error

Run implements the app.Runtime interface

type Consumer

type Consumer[T any] interface {
	Consume(context.Context) (T, error)
}

Consumer consumes items from a queue.

If no item is consumed, then the Consumer should return ErrNoItem.

type Processor

type Processor[T any] interface {
	Process(context.Context, T) error
}

Processor processes items that are retrieved from a queue.

type SequentialOption

type SequentialOption interface {
	// contains filtered or unexported methods
}

SequentialOption are options for configuring the SequentialRuntime.

type SequentialRuntime

type SequentialRuntime[T any] struct {
	// contains filtered or unexported fields
}

SequentialRuntime is a bedrock.Runtime for sequentially processing items from a queue.

func Sequential

func Sequential[T any](c Consumer[T], p Processor[T], opts ...SequentialOption) *SequentialRuntime[T]

Sequential returns a fully initialized SequentialRuntime.

Sequential will first consume an item from the Consumer, c. Then, process that item with the given Processor, p. After, processing the item, this sequence repeats. Thus, no new item will be consumed from the queue until the current item has been processed.

Example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n += 1
	return n, nil
})
p := processorFunc[int](func(_ context.Context, n int) error {
	if n > 5 {
		cancel()
		return nil
	}
	// items are processed sequentially in this case so we can
	// compare based on the printed lines
	fmt.Println(n)
	return nil
})

rt := Sequential[int](c, p, LogHandler(slog.Default().Handler()))

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}
Output:

1
2
3
4
5

func (*SequentialRuntime[T]) Run

func (rt *SequentialRuntime[T]) Run(ctx context.Context) error

Run implements the app.Runtime interface.

Directories

Path Synopsis
Package pubsub provides default implementations for using Google Cloud PubSub with the runtimes in the queue package.
Package pubsub provides default implementations for using Google Cloud PubSub with the runtimes in the queue package.
sqs
Package sqs provides default implementations for using AWS SQS with the runtimes in the queue package.
Package sqs provides default implementations for using AWS SQS with the runtimes in the queue package.
sqsslog
Package sqsslog provides slog helpers for SQS related data fields.
Package sqsslog provides slog helpers for SQS related data fields.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL