queue

Version: v0.1.0-alpha.16
Note: this package is not in the latest version of its module.
Published: Jan 27, 2026 | License: Apache-2.0 | Imports: 7 | Imported by: 0

Documentation

Overview

Package queue provides a channel-based queue implementation and job processing utilities.

Constants

This section is empty.

Variables

var ErrClosedQueue = errors.New("queue is closed")

ErrClosedQueue is returned when attempting to operate on a closed queue.

var ErrTimeout = errors.New("timeout")

ErrTimeout is returned when an enqueue operation times out.

Functions

This section is empty.

Types

type ChanQueue

type ChanQueue[T any] struct {
	// contains filtered or unexported fields
}

ChanQueue is a thread-safe channel-based queue implementation.

func NewChanQueue

func NewChanQueue[T any](bufferSize int, enqueueTimeout time.Duration) *ChanQueue[T]

NewChanQueue creates a new channel-based queue with the specified buffer size and enqueue timeout.

func (*ChanQueue[T]) Close

func (q *ChanQueue[T]) Close(_ context.Context) error

Close closes the queue and prevents further operations.

func (*ChanQueue[T]) EnqueueJob

func (q *ChanQueue[T]) EnqueueJob(ctx context.Context, job T) error

EnqueueJob adds a job to the queue with timeout support.

func (*ChanQueue[T]) GetJobChan

func (q *ChanQueue[T]) GetJobChan(_ context.Context) (chan T, error)

GetJobChan returns the underlying channel for reading jobs.

func (*ChanQueue[T]) Open

func (q *ChanQueue[T]) Open(_ context.Context) error

Open initializes the queue and makes it ready to accept jobs.

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, job T)
}

Handler defines the interface for processing jobs.

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, job T)

HandlerFunc is an adapter to allow the use of ordinary functions as Handlers.

func (HandlerFunc[T]) Handle

func (f HandlerFunc[T]) Handle(ctx context.Context, job T)

Handle calls f(ctx, job).

type Processor

type Processor[T any] struct {
	// contains filtered or unexported fields
}

Processor manages a pool of workers to process jobs from a queue.

func New

func New[T any](handler Handler[T], queue Provider[T], workersAmount int, shutdownTimeout time.Duration) *Processor[T]

New creates a new Processor with the specified handler, queue, and configuration.

func (*Processor[T]) Enqueue

func (p *Processor[T]) Enqueue(ctx context.Context, job T) error

Enqueue adds a job to the queue for processing.

func (*Processor[T]) Run

func (p *Processor[T]) Run(ctx context.Context) error

Run starts the queue processor and blocks until all workers complete.

type Provider

type Provider[T any] interface {
	Open(ctx context.Context) error
	Close(ctx context.Context) error
	EnqueueJob(ctx context.Context, job T) error
	GetJobChan(ctx context.Context) (chan T, error)
}

Provider defines the interface for queue implementations.
