archer

package module
v1.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

README

Archer - Simple Job Queue for Go (PostgreSQL)

Archer is a simple, lightweight queue/job worker that uses PostgreSQL as its backing store. It aims to provide an easy-to-use API for defining, enqueuing, and executing background jobs in your Go applications.

For detailed documentation, see the docs.

Features

  • Simple API – Define and register your jobs easily.
  • Backed by PostgreSQL – Reliable storage and concurrency control.
  • Flexible Worker Pool – Customize worker concurrency (instances) and job timeouts as needed.
  • Context Awareness – Each job runs with a context, enabling you to cancel or time out jobs gracefully.
  • Custom Table Name – Store jobs in any table name you prefer.
  • DAG Package – Build complex workflows with directed acyclic graphs.

Installation

go get github.com/dyaksa/archer
Requirements
  • Go 1.20+ (or newer)
  • PostgreSQL 12+ (or newer)
Database Setup

Make sure you have a PostgreSQL database up and running. You’ll need to provide the connection details (address, user, password, etc.) in the archer.Options.

Archer relies on a table structure to manage jobs. Ensure you have run the migration script (if provided), or set up your table accordingly.

How It Works

archer under the hood uses a table like the following.

CREATE TABLE jobs (
  id varchar primary key,
  queue_name varchar not null,
  status varchar not null,
  arguments jsonb not null default '{}'::jsonb,
  result jsonb not null default '{}'::jsonb,
  last_error varchar,
  retry_count integer not null default 0,
  max_retry integer not null default 0,
  retry_interval integer not null default 0,
  scheduled_at timestamptz default now(),
  started_at timestamptz,
  created_at timestamptz not null default now(),
  updated_at timestamptz not null default now()
);

CREATE INDEX ON jobs (queue_name);
CREATE INDEX ON jobs (scheduled_at);
CREATE INDEX ON jobs (status);
CREATE INDEX ON jobs (started_at);

Usage

Importing the package

This package can be used by adding the following import statement to your .go files.

import "github.com/dyaksa/archer"
Worker Example

Below is a complete example of a worker that processes jobs named call_api. The job is expected to have CallApiArgs as arguments and returns a status code.

type CallApiResults struct {
	StatusCode int `json:"status_code"`
}

type CallApiArgs struct {
	URL    string
	Method string
	Body   any
}

func CallClient(ctx context.Context, job job.Job) (any, error) {
	args := CallApiArgs{}
	if err := job.ParseArguments(&args); err != nil {
		return nil, err
	}

	slog.Info("started job request id: " + job.ID)
	defer func() {
		slog.Info("finished job request id: " + job.ID)
	}()

	client := &http.Client{
		Timeout: 10 * time.Second,
	}

	b, err := json.Marshal(args.Body)
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest(args.Method, args.URL, bytes.NewBuffer(b))
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}

	res := CallApiResults{StatusCode: resp.StatusCode}

	return res, nil
}

func main() {
	c := archer.NewClient(&archer.Options{
		Addr:     "localhost:5432",
		Password: "password",
		User:     "admin",
		DBName:   "webhooks",
	})

	c.Register("call_api",
		CallClient,
		archer.WithInstances(1),
		archer.WithTimeout(1*time.Second),
	)

	if err := c.Start(); err != nil {
		panic(err)
	}
}
Client Example (Enqueuing Jobs)

To enqueue a job for processing, create or import the same archer.Client in a different part of your code or even a different service. Then call something like:

type CallApiArgs struct {
	URL    string
	Method string
	Body   any
}

type DataDto struct {
	Name     string `json:"name"`
	Email    string `json:"email"`
	Password string `json:"password"`
}

func main() {
	c := archer.NewClient(&archer.Options{
		Addr:     "localhost:5432",
		Password: "password",
		User:     "admin",
		DBName:   "webhooks",
	})

	dto := DataDto{
		Name:     "John",
		Email:    "sample@mail.com",
		Password: "sample123",
	}


	_, err := c.Schedule(
		context.Background(),
		uuid.NewString(),
		"call_api",
		CallApiArgs{URL: "http://localhost:3001/v4/upsert", Method: "POST", Body: dto},
		archer.WithMaxRetries(3),
	)

	if err != nil {
		return err
	}

slog.Info("done")
}
Running the examples

The example directory contains a sample worker and client. Start the worker with

go run ./example/worker

Then in another terminal, enqueue jobs using

go run ./example/client

These examples demonstrate how to register jobs and schedule them for execution.

Options

You can configure the Archer client with various options:

  • WithInstances(n int) Sets the number of concurrent workers that will process a particular job type.
  • WithTimeout(d time.Duration) Sets a timeout for each job. If the job does not complete within this duration, it is considered failed/cancelled.
  • WithRetryInterval(d time.Duration) Wait duration before retrying a failed job.
  • WithMaxRetries(n int) Maximum number of retry attempts for a job.
  • WithSetTableName(name string) Store jobs in a custom table name.
  • WithSleepInterval(d time.Duration) Delay between polling cycles for new jobs.
  • WithReaperInterval(d time.Duration) Interval for cleaning up finished or dead jobs.
  • WithErrHandler(func(error)) Custom handler for worker errors.
  • archer.NewClient(&archer.Options{ ... })
    • Addr: PostgreSQL host and port (e.g., localhost:5432)
    • User: DB user
    • Password: DB user’s password
    • DBName: Database name

Contributing

  • Fork the repository.
  • Create a new branch for your feature or bugfix.
  • Commit your changes and push them to your fork.
  • Create a Pull Request describing your changes.

We appreciate all contributions, whether they are documentation improvements, bug fixes, or new features!

License

Copyright 2024

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opt *Options, options ...ClientOptionFunc) *Client

func (*Client) Cancel

func (c *Client) Cancel(ctx context.Context, id string) (any, error)

func (*Client) Get

func (c *Client) Get(ctx context.Context, id string) (any, error)

func (Client) Register

func (r Client) Register(name string, w WorkerFn, opts ...WorkerOptionFunc)

func (*Client) Schedule

func (c *Client) Schedule(ctx context.Context, id string, queueName string, arguments interface{}, options ...FnOptions) (any, error)

func (*Client) ScheduleNow

func (c *Client) ScheduleNow(ctx context.Context, id string) (any, error)

func (*Client) Start

func (c *Client) Start() error

func (*Client) Stop

func (c *Client) Stop()

func (*Client) WithTx

func (c *Client) WithTx(tx *sql.Tx) Tx

type ClientOptionFunc

type ClientOptionFunc func(*Client) *Client

func WithErrHandler

func WithErrHandler(fn func(error)) ClientOptionFunc

func WithReaperInterval

func WithReaperInterval(t time.Duration) ClientOptionFunc

func WithSetTableName added in v1.1.3

func WithSetTableName(table string) ClientOptionFunc

func WithSleepInterval

func WithSleepInterval(t time.Duration) ClientOptionFunc

type FnOptions

type FnOptions func(j job.Job) job.Job

func WithMaxRetries

func WithMaxRetries(max int) FnOptions

func WithRetryInterval

func WithRetryInterval(interval time.Duration) FnOptions

func WithScheduleTime

func WithScheduleTime(t time.Time) FnOptions

type Handler

type Handler interface {
	Handle(ctx context.Context, job job.Job) error
}

type Mutate

type Mutate struct {
	store.WrapperTx
	// contains filtered or unexported fields
}

func (*Mutate) Update

func (m *Mutate) Update(ctx context.Context, j job.Job) error

Update updates the given job in the database within a transaction. It wraps the update operation in a transaction context and ensures that the transaction is properly managed.

Parameters:

  • ctx: The context for the update operation.
  • j: The job to be updated.

Returns:

  • error: An error if the update operation fails, otherwise nil.

type Options

type Options struct {
	Addr     string
	User     string
	Password string
	SSL      string
	DBName   string

	MaxIdleConns int
	MaxOpenConns int
}

type Queue

type Queue struct {
	store.WrapperTx
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(db *sql.DB, name string, tableName string) *Queue

func (*Queue) Poll

func (q *Queue) Poll(ctx context.Context) (*job.Job, error)

func (*Queue) RequeueTimeout

func (q *Queue) RequeueTimeout(ctx context.Context, timeout time.Duration) error

type Spawner

type Spawner interface {
	Spawn(runner)
	Wait()
	Shutdown()
	Done() <-chan struct{}
}

type Tx

type Tx interface {
	Schedule(ctx context.Context, id string, queueName string, arguments interface{}, options ...FnOptions) error
	Cancel(ctx context.Context, id string) error
	ScheduleNow(ctx context.Context, id string) error
	Get(ctx context.Context, id string) (*job.Job, error)
	Poll(ctx context.Context, queueName string) (*job.Job, error)
	Update(ctx context.Context, job job.Job) error
	RequeueTimeout(ctx context.Context, queueName string, timeout time.Time) error
}

type Worker

type Worker interface {
	Execute(ctx context.Context, job job.Job) (any, error)
	OnFailure(ctx context.Context, job job.Job) error
}

type WorkerFn

type WorkerFn func(ctx context.Context, job job.Job) (any, error)

type WorkerOptionFunc

type WorkerOptionFunc func(registerConfig) registerConfig

func WithCallbackFailed added in v1.1.5

func WithCallbackFailed(fn func(ctx context.Context, job job.Job, err error) (any, error)) WorkerOptionFunc

func WithCallbackSuccess added in v1.1.5

func WithCallbackSuccess(fn func(ctx context.Context, job job.Job, res any) (any, error)) WorkerOptionFunc

func WithInstances

func WithInstances(i int) WorkerOptionFunc

func WithTimeout

func WithTimeout(t time.Duration) WorkerOptionFunc

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL