Documentation
¶
Overview ¶
Package goncordia provides a transactional job queue engine for Go. It supports multiple storage backends (Postgres, MySQL, SQLite, MongoDB, Redis, in-memory) through a driver interface parameterized by the native transaction type of each backend.
Transactional usage (shared transaction with business logic):
tx, _ := pool.Begin(ctx)
_, _ = queries.CreateOrder(ctx, tx, orderParams)
_, _ = client.EnqueueTx(ctx, tx, SendConfirmationEmailArgs{OrderID: id}, nil)
tx.Commit(ctx) // both operations are atomic
Non-transactional usage (at-least-once semantics):
client.Enqueue(ctx, SendConfirmationEmailArgs{OrderID: id}, nil)
Index ¶
- type Client
- func (c *Client[TTx]) Cancel(ctx context.Context, id string) error
- func (c *Client[TTx]) Enqueue(ctx context.Context, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)
- func (c *Client[TTx]) EnqueueMany(ctx context.Context, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)
- func (c *Client[TTx]) EnqueueManyTx(ctx context.Context, tx TTx, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)
- func (c *Client[TTx]) EnqueueTx(ctx context.Context, tx TTx, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)
- type ClientConfig
- type CronConfig
- type CronScheduler
- type JobMiddleware
- type PeriodicJob
- type WorkerConfig
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client[TTx any] struct { // contains filtered or unexported fields }
Client enqueues jobs into the job queue. TTx is the transaction type of the chosen backend driver (e.g. *pgx.Tx for pgxv5, *sql.Tx for stdlib, mongo.SessionContext for mongodb).
func NewClient ¶
func NewClient[TTx any](d driver.Driver[TTx], cfg ClientConfig) *Client[TTx]
NewClient creates a Client backed by the given driver.
func (*Client[TTx]) Cancel ¶
Cancel marks a job as cancelled. The job must be in available or scheduled state.
func (*Client[TTx]) Enqueue ¶
func (c *Client[TTx]) Enqueue(ctx context.Context, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)
Enqueue inserts a single job without a transaction (at-least-once semantics). Safe to call for all backends; for SQL/MongoDB backends prefer EnqueueTx for atomicity.
func (*Client[TTx]) EnqueueMany ¶
func (c *Client[TTx]) EnqueueMany(ctx context.Context, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)
EnqueueMany inserts multiple jobs in a single batch (non-transactional).
func (*Client[TTx]) EnqueueManyTx ¶
func (c *Client[TTx]) EnqueueManyTx(ctx context.Context, tx TTx, args []core.JobArgs, opts *core.InsertOpts) ([]driver.JobInsertResult, error)
EnqueueManyTx inserts multiple jobs within an existing transaction.
func (*Client[TTx]) EnqueueTx ¶
func (c *Client[TTx]) EnqueueTx(ctx context.Context, tx TTx, args core.JobArgs, opts *core.InsertOpts) (*driver.JobInsertResult, error)
EnqueueTx inserts a job within an existing transaction. The job becomes visible to workers only when tx is committed. Only available on backends with Capabilities.NativeTx == true.
type ClientConfig ¶
type ClientConfig struct {
// DefaultQueue is used when InsertOpts.Queue is empty. Default: "default".
DefaultQueue string
}
ClientConfig controls optional Client behavior.
type CronConfig ¶
type CronConfig struct {
// TickInterval controls how often the scheduler checks for due jobs.
// Default: 1 second.
TickInterval time.Duration
// Clock overrides the time source. Defaults to clock.Real{}.
Clock clock.Clock
}
CronConfig configures a CronScheduler.
type CronScheduler ¶
type CronScheduler[TTx any] struct { // contains filtered or unexported fields }
CronScheduler enqueues periodic jobs on a configurable tick. It does not process jobs — pair it with a WorkerPool.
Usage:
cs := goncordia.NewCronScheduler(d, []goncordia.PeriodicJob{
{Schedule: core.Every(time.Hour), Args: CleanupArgs{}},
}, goncordia.CronConfig{})
go cs.Start(ctx)
func NewCronScheduler ¶
func NewCronScheduler[TTx any](d driver.Driver[TTx], jobs []PeriodicJob, cfg CronConfig) *CronScheduler[TTx]
NewCronScheduler creates a CronScheduler backed by d. jobs is the list of periodic jobs to manage.
type JobMiddleware ¶
type JobMiddleware func(ctx context.Context, job *core.RawJob, next func(context.Context, *core.RawJob) error) error
JobMiddleware wraps job execution. Call next to continue the chain. Middleware is applied around the actual job handler, inside panic recovery, so err is never nil when the handler panicked — panics are converted to errors.
type PeriodicJob ¶
type PeriodicJob struct {
// Schedule determines when the job runs.
Schedule core.Schedule
// Args is the job to enqueue.
Args core.JobArgs
// Opts are passed through to Enqueue on each tick (optional).
Opts *core.InsertOpts
}
PeriodicJob pairs a Schedule with the job args to enqueue on each tick.
type WorkerConfig ¶
type WorkerConfig struct {
// Queues lists the queues this worker pool processes.
// If empty, only "default" is polled.
Queues []string
// Concurrency is the maximum number of jobs running simultaneously.
// Default: 10.
Concurrency int
// PollInterval is how long to wait between polls when the queue is empty.
// Only used when the backend has no push notification support.
// Default: 1 second.
PollInterval time.Duration
// RetryPolicy controls retry timing. Default: ExponentialRetry.
RetryPolicy core.RetryPolicy
// ShutdownTimeout is the max duration to wait for in-flight jobs during shutdown.
// Default: 30 seconds.
ShutdownTimeout time.Duration
// Clock overrides the time source. Defaults to clock.Real{}.
// Inject clock.NewMock() in tests to control time.
Clock clock.Clock
// Middleware is applied around each job execution in order (outermost first).
// Use it to add tracing, logging, or metrics without modifying job handlers.
Middleware []JobMiddleware
}
WorkerConfig configures the worker pool.
type WorkerPool ¶
type WorkerPool[TTx any] struct { // contains filtered or unexported fields }
WorkerPool processes jobs from the queue using a pool of goroutines. TTx is the driver's transaction type (needed only for type parameter inference; the pool itself does not open user-visible transactions).
func NewWorkerPool ¶
func NewWorkerPool[TTx any](d driver.Driver[TTx], registry *core.Registry, cfg WorkerConfig) *WorkerPool[TTx]
NewWorkerPool creates a WorkerPool. Register workers using core.RegisterWorker before calling Start.
func (*WorkerPool[TTx]) Start ¶
func (p *WorkerPool[TTx]) Start(ctx context.Context) error
Start launches the fetch-and-process loops. Blocks until ctx is cancelled or Stop is called.
func (*WorkerPool[TTx]) Stop ¶
func (p *WorkerPool[TTx]) Stop()
Stop initiates a graceful shutdown, waiting up to ShutdownTimeout for in-flight jobs.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package core contains the backend-agnostic engine logic: job/worker registration, retry policies, scheduling, and middleware.
|
Package core contains the backend-agnostic engine logic: job/worker registration, retry policies, scheduling, and middleware. |
|
Package driver defines the interfaces that all storage backend drivers must implement.
|
Package driver defines the interfaces that all storage backend drivers must implement. |
|
bun
Package bundriver provides a goncordia driver that accepts *bun.Tx transactions.
|
Package bundriver provides a goncordia driver that accepts *bun.Tx transactions. |
|
cassandra
Package cassandradriver provides a goncordia driver backed by Apache Cassandra.
|
Package cassandradriver provides a goncordia driver backed by Apache Cassandra. |
|
clickhouse
Package clickhousedriver provides a goncordia driver backed by ClickHouse.
|
Package clickhousedriver provides a goncordia driver backed by ClickHouse. |
|
dynamodb
Package dynamodbdriver provides a goncordia driver backed by Amazon DynamoDB.
|
Package dynamodbdriver provides a goncordia driver backed by Amazon DynamoDB. |
|
firestore
Package firestoredriver provides a goncordia driver backed by Google Cloud Firestore.
|
Package firestoredriver provides a goncordia driver backed by Google Cloud Firestore. |
|
gorm
Package gormdriver provides a goncordia driver that accepts *gorm.DB transactions.
|
Package gormdriver provides a goncordia driver that accepts *gorm.DB transactions. |
|
memory
Package memory provides an in-memory driver for testing and development.
|
Package memory provides an in-memory driver for testing and development. |
|
mongodb
Package mongodriver provides a goncordia driver backed by MongoDB.
|
Package mongodriver provides a goncordia driver backed by MongoDB. |
|
pgxv5
Package pgxv5 provides a goncordia driver backed by PostgreSQL via pgx/v5.
|
Package pgxv5 provides a goncordia driver backed by PostgreSQL via pgx/v5. |
|
redis
Package redisdriver provides a goncordia driver backed by Redis.
|
Package redisdriver provides a goncordia driver backed by Redis. |
|
stdlib
Package stdlib provides a goncordia driver backed by database/sql.
|
Package stdlib provides a goncordia driver backed by database/sql. |
|
Package gontest provides test helpers for goncordia.
|
Package gontest provides test helpers for goncordia. |
|
internal
|
|
|
clock
Package clock provides a Clock interface and implementations for real and mock time.
|
Package clock provides a Clock interface and implementations for real and mock time. |
|
Package otelgoncordia provides OpenTelemetry instrumentation for goncordia.
|
Package otelgoncordia provides OpenTelemetry instrumentation for goncordia. |