Documentation ¶
Overview ¶
Package queue provides multiple patterns which implement the app.Runtime interface.
Constants ¶
This section is empty.
Variables ¶
var ErrNoItem = errors.New("queue: no item")
ErrNoItem should be returned by Consumers when no item has been consumed.
Functions ¶
This section is empty.
Types ¶
type CommonOption ¶
type CommonOption interface {
	SequentialOption
	ConcurrentOption
}
CommonOption represents options that are common to all queue-based runtimes.
func LogHandler ¶
func LogHandler(h slog.Handler) CommonOption
LogHandler configures the underlying slog.Handler.
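Since LogHandler accepts any slog.Handler, a handler built with the standard library can be supplied. The following is a minimal sketch, not part of the queue package: newJSONHandler and run are hypothetical helpers, and passing the handler to LogHandler is shown only in a comment because this page does not give the package's import path.

```go
package main

import (
	"bytes"
	"fmt"
	"log/slog"
)

// newJSONHandler is a hypothetical helper (not part of the queue package).
// It builds a slog.Handler that writes JSON records at Info level and above.
func newJSONHandler(buf *bytes.Buffer) slog.Handler {
	return slog.NewJSONHandler(buf, &slog.HandlerOptions{Level: slog.LevelInfo})
}

// run logs one Debug and one Info record through the handler and returns
// everything that was written to the buffer.
func run() string {
	var buf bytes.Buffer
	h := newJSONHandler(&buf)

	// With the queue package imported, h would be passed as an option,
	// for example: Sequential(c, p, LogHandler(h)).
	log := slog.New(h)
	log.Debug("filtered out: below the handler's level")
	log.Info("item processed", slog.Int("item", 1))
	return buf.String()
}

func main() {
	// Only the Info record is printed; the Debug record is dropped.
	fmt.Print(run())
}
```

Writing to a buffer keeps the sketch easy to inspect; in a real service the handler would more likely write to os.Stderr or os.Stdout.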
type ConcurrentOption ¶
type ConcurrentOption interface {
// contains filtered or unexported methods
}
ConcurrentOption are options for configuring the ConcurrentRuntime.
func MaxConcurrentProcessors ¶
func MaxConcurrentProcessors(n uint) ConcurrentOption
MaxConcurrentProcessors configures a limit for the number of processor goroutines actively running.
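To illustrate what such a limit means in practice, here is a sketch of one common way to bound the number of running goroutines: a buffered channel used as a counting semaphore. This is an assumption about the general technique, not the package's actual implementation, and processAll is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll is a hypothetical helper, not code from the queue package.
// It runs process for every item in its own goroutine while allowing at
// most limit goroutines to run at once (limit must be at least 1).
// It returns the highest number of goroutines observed running together.
func processAll(items []int, limit int, process func(int)) int64 {
	sem := make(chan struct{}, limit) // counting semaphore with limit slots
	var wg sync.WaitGroup
	var active, peak atomic.Int64

	for _, item := range items {
		sem <- struct{}{} // blocks while limit goroutines are already running
		wg.Add(1)
		go func(item int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when finished

			cur := active.Add(1)
			// Record the peak concurrency with a compare-and-swap loop.
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			process(item)
			active.Add(-1)
		}(item)
	}

	wg.Wait()
	return peak.Load()
}

func main() {
	var sum atomic.Int64
	peak := processAll([]int{1, 2, 3, 4, 5}, 2, func(n int) {
		sum.Add(int64(n))
	})
	fmt.Println(sum.Load(), peak <= 2) // prints "15 true"
}
```

With a limit of 1 the items are processed one at a time, which is consistent with the ordered result shown in the MaxConcurrentProcessors example.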
type ConcurrentRuntime ¶
type ConcurrentRuntime[T any] struct {
	// contains filtered or unexported fields
}
ConcurrentRuntime is a bedrock.Runtime for concurrently processing items from a queue.
func Concurrent ¶
func Concurrent[T any](c Consumer[T], p Processor[T], opts ...ConcurrentOption) *ConcurrentRuntime[T]
Concurrent returns a fully initialized ConcurrentRuntime.
Concurrent will consume and process items as concurrent processes. For every item returned by the Consumer, c, the Processor, p, is called in a separate goroutine to process the item. Due to the concurrent execution of the Consumer and Processor, new items will be consumed before the current item has been completely processed.
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n++
	return n, nil
})

var processed atomic.Int64
var mu sync.Mutex
var nums []int
p := processorFunc[int](func(_ context.Context, n int) error {
	// Add returns the updated count, so the check cannot interleave
	// with increments made by other processor goroutines.
	if processed.Add(1) > 5 {
		cancel()
		return nil
	}

	// items are processed concurrently, so we can't print them here
	// since the order is not guaranteed
	mu.Lock()
	nums = append(nums, n)
	mu.Unlock()
	return nil
})

rt := Concurrent[int](c, p, LogHandler(slog.Default().Handler()))

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}

// since the numbers are processed concurrently
// there's no guarantee that the list only contains
// 1, 2, 3, 4, 5.
fmt.Println(sum(nums) >= 15)
Output: true
Example (MaxConcurrentProcessors) ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n++
	return n, nil
})

var processed atomic.Int64
var mu sync.Mutex
var nums []int
p := processorFunc[int](func(_ context.Context, n int) error {
	if processed.Add(1) > 5 {
		cancel()
		return nil
	}

	// collect the items instead of printing them so the
	// result can be sorted and compared below
	mu.Lock()
	nums = append(nums, n)
	mu.Unlock()
	return nil
})

rt := Concurrent[int](
	c,
	p,
	LogHandler(slog.Default().Handler()),
	MaxConcurrentProcessors(1),
)

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}

// Since there's only 1 processor goroutine and the
// nums are consumed sequentially, the nums slice
// should be guaranteed to be 1 through 5.
slices.Sort(nums)
fmt.Println(nums)
Output: [1 2 3 4 5]
type Consumer ¶
Consumer consumes items from a queue.
If no item is consumed, then the Consumer should return ErrNoItem.
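As an illustration of this contract, here is a sketch of an in-memory consumer that reports an empty queue with ErrNoItem. The Consumer type definition is not shown on this page, so the Consume method name and signature are assumptions modeled on the function signatures used in the examples; ErrNoItem is redeclared locally as a stand-in so the sketch compiles on its own, and sliceConsumer and drain are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// ErrNoItem is a local stand-in for the queue package's ErrNoItem.
var ErrNoItem = errors.New("queue: no item")

// sliceConsumer is a hypothetical in-memory queue backed by a slice.
type sliceConsumer struct {
	mu    sync.Mutex
	items []string
}

// Consume returns the next item, or ErrNoItem when the queue is empty.
// The method name and signature are assumptions, not the package's API.
func (c *sliceConsumer) Consume(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.items) == 0 {
		return "", ErrNoItem
	}
	item := c.items[0]
	c.items = c.items[1:]
	return item, nil
}

// drain consumes until the first error and returns the items seen so far
// together with that error.
func drain(c *sliceConsumer) ([]string, error) {
	var got []string
	for {
		item, err := c.Consume(context.Background())
		if err != nil {
			return got, err
		}
		got = append(got, item)
	}
}

func main() {
	c := &sliceConsumer{items: []string{"a", "b"}}
	items, err := drain(c)
	fmt.Println(items, errors.Is(err, ErrNoItem)) // prints "[a b] true"
}
```

Returning a sentinel error rather than a zero value lets the caller distinguish "nothing available right now" from a genuine item or a failure.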
type SequentialOption ¶
type SequentialOption interface {
// contains filtered or unexported methods
}
SequentialOption are options for configuring the SequentialRuntime.
type SequentialRuntime ¶
type SequentialRuntime[T any] struct {
	// contains filtered or unexported fields
}
SequentialRuntime is a bedrock.Runtime for sequentially processing items from a queue.
func Sequential ¶
func Sequential[T any](c Consumer[T], p Processor[T], opts ...SequentialOption) *SequentialRuntime[T]
Sequential returns a fully initialized SequentialRuntime.
Sequential first consumes an item from the Consumer, c, and then processes that item with the given Processor, p. After the item has been processed, the sequence repeats. Thus, no new item is consumed from the queue until the current item has been processed.
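The consume-then-process ordering described above can be sketched as a simple loop. This is a hypothetical illustration, not the package's implementation: runSequential and trace are invented names, ErrNoItem is a local stand-in, and treating context cancellation as a clean stop and skipping ErrNoItem are assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ErrNoItem is a local stand-in for the queue package's ErrNoItem.
var ErrNoItem = errors.New("queue: no item")

// runSequential is a hypothetical loop: it consumes one item, processes it,
// and only then consumes the next one.
func runSequential[T any](
	ctx context.Context,
	consume func(context.Context) (T, error),
	process func(context.Context, T) error,
) error {
	for {
		if ctx.Err() != nil {
			return nil // assumption: cancellation is a clean stop
		}
		item, err := consume(ctx)
		if errors.Is(err, ErrNoItem) {
			continue // assumption: nothing to process, so poll again
		}
		if err != nil {
			return err
		}
		if err := process(ctx, item); err != nil {
			return err
		}
	}
}

// trace records the order of consume and process calls for two items.
func trace() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var events []string
	n := 0
	consume := func(context.Context) (int, error) {
		n++
		events = append(events, fmt.Sprintf("consume %d", n))
		return n, nil
	}
	process := func(_ context.Context, item int) error {
		events = append(events, fmt.Sprintf("process %d", item))
		if item == 2 {
			cancel() // stop after the second item
		}
		return nil
	}

	_ = runSequential(ctx, consume, process)
	return events
}

func main() {
	// Prints: consume 1, process 1, consume 2, process 2 (one per line).
	for _, e := range trace() {
		fmt.Println(e)
	}
}
```

Because each consume call is strictly followed by its process call, the trace never shows two consecutive consume events.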
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var n int
c := consumerFunc[int](func(_ context.Context) (int, error) {
	n++
	return n, nil
})

p := processorFunc[int](func(_ context.Context, n int) error {
	if n > 5 {
		cancel()
		return nil
	}

	// items are processed sequentially in this case so we can
	// compare based on the printed lines
	fmt.Println(n)
	return nil
})

rt := Sequential[int](c, p, LogHandler(slog.Default().Handler()))

err := rt.Run(ctx)
if err != nil {
	fmt.Println(err)
	return
}
Output: 1 2 3 4 5
Directories ¶
Path | Synopsis |
---|---|
pubsub | Package pubsub provides default implementations for using Google Cloud PubSub with the runtimes in the queue package. |
sqs | Package sqs provides default implementations for using AWS SQS with the runtimes in the queue package. |
sqsslog | Package sqsslog provides slog helpers for SQS related data fields. |