ratus

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Ratus

Go codecov Go Reference Swagger Validator Go Report Card Status

Ratus is a RESTful asynchronous task queue server. It translated concepts of distributed task queues into a set of resources that conform to REST principles and provides a consistent HTTP API for various backends.

The key features of Ratus are:

  • Self-contained binary with a fast in-memory storage.
  • Support multiple embedded or external storage engines.
  • Guaranteed at-least-once execution of tasks.
  • Unified model for prioritized and time-based scheduling.
  • Task-level timeout control with automatic recovery.
  • Language agnostic RESTful API with built-in Swagger UI.
  • Load balancing across a dynamic number of consumers.
  • Horizontal scaling through replication and partitioning.
  • Native support for Prometheus and Kubernetes.

Terminal screenshot

Quick Start

Installation

Ratus offers a variety of installation options:

  • Docker images are available on Docker Hub and GitHub Packages.
  • Kubernetes and Docker Compose examples can be found in the deployments directory.
  • Pre-built binaries for all major platforms are available on the GitHub releases page.
  • Build from source with go install github.com/hyperonym/ratus/cmd/ratus@latest.

Running Ratus from the command line is as simple as typing:

$ ratus

The above command will start an ephemeral Ratus instance using the default in-memory storage engine memdb and listen on the default HTTP port of 80.

To use another port and enable on-disk snapshot for persistence, start Ratus with:

$ ratus --port 8000 --engine memdb --memdb-snapshot-path ratus.db

Depending on the storage engine you choose, you may also need to deploy the corresponding database or broker. Using the mongodb engine as an example, assuming the database is already running locally, then start Ratus with:

$ ratus --port 8000 --engine mongodb --mongodb-uri mongodb://127.0.0.1:27017
Basic Usage

Concepts introduced by Ratus will be bolded below, see Concepts (a.k.a cheat sheet) to learn more.

cURL

A producer creates a new task and pushes it to the example topic:

$ curl -X POST -d '{"payload": "hello world"}' "http://127.0.0.1:8000/v1/topics/example/tasks/1"
Example response
{
	"created": 1,
	"updated": 0
}

A consumer can then make a promise to claim and execute the next task in the example topic:

$ curl -X POST "http://127.0.0.1:8000/v1/topics/example/promises?timeout=30s"
Example response
{
	"_id": "1",
	"topic": "example",
	"state": 1,
	"nonce": "e4SN6Si1nOnE53ou",
	"produced": "2022-07-29T20:00:00.0Z",
	"scheduled": "2022-07-29T20:00:00.0Z",
	"consumed": "2022-07-29T20:00:10.0Z",
	"deadline": "2022-07-29T20:00:40.0Z",
	"payload": "hello world"
}

After executing the task, remember to acknowledge Ratus that the task is completed using a commit:

$ curl -X PATCH "http://127.0.0.1:8000/v1/topics/example/tasks/1"
Example response
{
	"_id": "1",
	"topic": "example",
	"state": 2,
	"nonce": "",
	"produced": "2022-07-29T20:00:00.0Z",
	"scheduled": "2022-07-29T20:00:00.0Z",
	"consumed": "2022-07-29T20:00:10.0Z",
	"deadline": "2022-07-29T20:00:40.0Z",
	"payload": "hello world"
}

If a commit is not received before the promised deadline, the state of the task will be set back to pending, which in turn allows consumers to try to execute it again.

Go Client

Ratus comes with a Go client library that not only encapsulates all API calls, but also provides idiomatic poll-execute-commit workflows like Client.Poll and Client.Subscribe. The examples directory contains ready-to-run examples for using the library:

  • The hello world example demonstrated the basic usage of the client library.
  • The crawl frontier example implemented a simple URL frontier for distributed web crawlers. It utilized advanced features like concurrent subscribers and time-based task scheduling.

Concepts

Data Model
  • Task references an idempotent unit of work that should be executed asynchronously.
  • Topic refers to an ordered subset of tasks with the same topic name property.
  • Promise represents a claim on the ownership of an active task.
  • Commit contains a set of updates to be applied to a task.
Workflow
  • Producer client pushes tasks with their desired date-of-execution (scheduled times) to a topic.
  • Consumer client makes a promise to execute a task polled from a topic and acknowledges with a commit upon completion.
Topology
  • Both producer and consumer clients can have multiple instances running simultaneously.
  • Consumer instances can be added dynamically to increase throughput, and tasks will be naturally load balanced among consumers.
  • Consumer instances can be removed (or crash) at any time without risking to lose the task being executing: a task that has not received a commit after the promised deadline will be picked up and executed again by other consumers.
Task States
  • pending (0): The task is ready to be executed or is waiting to be executed in the future.
  • active (1): The task is being processed by a consumer. Active tasks that have timed out will be automatically reset to the pending state. Consumer code should handle failure and set the state to pending to retry later if necessary.
  • completed (2): The task has completed its execution. If the storage engine implementation supports TTL, completed tasks will be automatically deleted after the retention period has expired.
  • archived (3): The task is stored as an archive. Archived tasks will never be deleted due to expiration.
Behavior
  • Task IDs across all topics share the same namespace (ADR). Topics are simply subsets generated based on the topic properties of the tasks, so topics do not need to be created explicitly.
  • Ratus is a task scheduler when consumers can keep up with the task generation speed, or a priority queue when consumers cannot keep up with the task generation speed.
  • Tasks will not be executed until the scheduled time arrives. After the scheduled time, excessive tasks will be executed in the order of the scheduled time.

Engines

Ratus provides a consistent API for various backends, allowing users to choose a specific engine based on their needs without having to modify client-side code.

To use a specific engine, set the --engine flag or ENGINE environment variable to one of the following names:

Name Persistence Replication Partitioning Expiration
memdb ○/●
mongodb
MemDB

MemDB

MemDB is the default storage engine for Ratus. It is implemented on top of go-memdb, which is built on immutable radix trees. MemDB is suitable for development and production environments where durability is not critical.

Persistence

The MemDB storage engine is ephemeral by default, but it also provides snapshot-based persistence options. By setting the --memdb-snapshot-path flag or MEMDB_SNAPSHOT_PATH environment variable to a non-empty file path, Ratus will write on-disk snapshots at an interval specified by MEMDB_SNAPSHOT_INTERVAL.

MemDB does not write Append-Only Files (AOF), which means in case of Ratus stopping working without a graceful shutdown for any reason you should be prepared to lose the latest minutes of data. If durability is critical to your workflow, switch to an external storage engine like mongodb.

Implementation Details
  • List operations are relatively expensive as they require scanning the entire database or index until the required number of results are collected. Fortunately, these operations are not used in most scenarios.
  • Snapshotting is performed along with the periodic background jobs when appropriate. Writing snapshot files may delay the execution of background jobs if the amount of data is large.
  • Since the resolution of the scheduled time in MemDB is in millisecond level and is affected by the instance's own clock, the order in which consumers receive tasks is not strictly guaranteed.
  • TTL cannot be disabled for completed tasks, in order to preserve a task forever, set it to the archived state.
MongoDB

MongoDB

Ratus works best with MongoDB version ~4.4. MongoDB 5.0+ is also supported but requires additional considerations, see Implementation Details to learn more.

💭 TL;DR set MONGODB_DISABLE_ATOMIC_POLL=true when using Ratus with MongoDB 5.0+.

Replication

When using the MongoDB storage engine, the Ratus instance itself is stateless. For high availability, start multiple instances of Ratus and connect them to the same MongoDB replica set.

All Ratus instances should run behind load balancers configured with health checks. Producer and consumer clients should connect to the load balancer, not directly to the instances.

Partitioning

Horizontal scaling could be achieved through sharding the task collection. However, with the help of the TTL mechanism, partitioning is not necessary in most cases. The best performance and the strongest atomicity can only be obtained without sharding.

If the amount of data exceeds the capacity of a single node or replica set, choose from the following sharding options:

  • If there is a large number of topics, use a hashed index on the topic field as the shard key, this will also enable the best polling performance on a sharded cluster.
  • If there is a huge amount of tasks in a few topics, use a hashed index on the _id field as the shard key, this will also result in a more balanced data distribution.
Implementation Details
  • When using the MongoDB storage engine, tasks across all topics are stored in the same collection.
  • Task is the only concrete data model in the MongoDB storage engine, while topics and promises are just conceptual entities for enforcing the RESTful design principles.
  • Since the resolution of the scheduled time in MongoDB is in millisecond level and is affected by the instance's own clock, the order in which consumers receive tasks is not strictly guaranteed.
  • TTL cannot be disabled for completed tasks, in order to preserve a task forever, set it to the archived state.
  • It is not recommended to upsert tasks on sharded collections using the topic field as the shard key. Due to MongoDB's own limitations, atomic operations cannot be used in this case, and only a fallback scheme equivalent to delete before insert can be used, so atomicity and performance cannot be guaranteed. This problem can be circumvented by using simple inserts in conjunction with fine-tuned TTL settings.
  • By default, polling is implemented through findAndModify. In the event of a conflict, MongoDB's native optimistic concurrency control (OCC) will transparently retry the operation. But in MongoDB 5.0 and above, the retry will report a WriteConflict error in the database server's log (although the operation is still successful from the client's perspective). You can choose to ignore this error, or circumvent the problem by setting MONGODB_DISABLE_ATOMIC_POLL=true when using MongoDB 5.0+. This option will make Ratus to not use findAndModify for polling and instead rely on the application-level OCC layer to ensure atomicity.
Index Models

The following indexes will be created on startup, unless MONGODB_DISABLE_INDEX_CREATION is set to true:

Key Patterns Partial Filter Expression TTL
{"topic": "hashed"} - -
{"topic": 1, "scheduled": 1} {"state": 0} -
{"deadline": 1} {"state": 1} -
{"topic": 1} {"state": 1} -
{"consumed": 1} {"state": 2} MONGODB_RETENTION_PERIOD

Observability

Metrics and Labels

Ratus exposes the following Prometheus metrics on the /metrics endpoint:

Name Type Labels
ratus_request_duration_seconds histogram topic, method, endpoint, status_code
ratus_chore_duration_seconds histogram -
ratus_task_schedule_delay_seconds gauge topic, producer, consumer
ratus_task_execution_duration_seconds gauge topic, producer, consumer
ratus_task_produced_count_total counter topic, producer
ratus_task_consumed_count_total counter topic, producer, consumer
ratus_task_committed_count_total counter topic, producer, consumer
Liveness and Readiness

Ratus supports liveness and readiness probes via HTTP GET requests:

  • The /livez endpoint returns a status code of 200 if the instance is running.
  • The /readyz endpoint returns a status code of 200 if the instance is ready to accept traffic.

Caveats

  • 🚨 Topic names and task IDs must not contain plus signs ('+') due to gin-gonic/gin#2633.
  • It is not recommended to use Ratus as the primary storage of tasks. Instead, consider storing the complete task record in a database, and use a minimal descriptor as the payload for Ratus.
  • Ratus is a simple and efficient alternative to task queues like Celery. Consider to use RabbitMQ or Kafka if you need high-throughput message passing without task management.

Frequently Asked Questions

For more details, see Architectural Decision Records.

Why HTTP API?

Asynchronous task queues are typically used for long-running background tasks, so the overhead of HTTP is not significant compared to the time spent by the tasks themselves. On the other hand, the HTTP-based RESTful API can be easily accessed by all languages without using dedicated client libraries.

How to poll from multiple topics?

If the number of topics is limited and you don't care about the priority between them, you can choose to create multiple threads/goroutines to listen to them simultaneously. Alternatively, you can create a topic of topics to get the topic names in turn and then get the next task from the corresponding topic.

Roadmap

  • Storage engine options
    • MemDB
      • Ephemeral
      • Persistence with snapshots
      • Persistence with AOF
    • MongoDB
      • Standalone
      • Replica set
      • Sharded cluster
    • Redis
      • Standalone
      • Sentinel
      • Cluster
    • RDBMS
      • MySQL
      • PostgreSQL
    • Message broker
      • RabbitMQ
      • Amazon SQS
  • Multi-language documents
    • English
    • Chinese

See the open issues for a full list of proposed features.

Contributing

This project is open-source. If you have any ideas or questions, please feel free to reach out by creating an issue!

Contributions are greatly appreciated, please refer to CONTRIBUTING.md for more information.

License

Ratus is available under the Apache License 2.0.


© 2022-2024 Hyperonym

Documentation

Overview

Package ratus contains data models and a client library for Go applications.

Index

Constants

View Source
const DefaultConcurrencyDelay = 1 * time.Second

DefaultConcurrencyDelay is the default value of SubscribeOptions's ConcurrencyDelay.

View Source
const DefaultDrainInterval = 5 * time.Second

DefaultDrainInterval is the default value of SubscribeOptions's DrainInterval.

View Source
const DefaultErrorInterval = 30 * time.Second

DefaultErrorInterval is the default value of SubscribeOptions's ErrorInterval.

View Source
const DefaultLimit = 10

DefaultLimit is the default number of resources to return in pagination.

View Source
const DefaultTimeout = "10m"

DefaultTimeout is the default timeout duration for task execution.

View Source
const NonceLength = 16

NonceLength is the length of the randomly generated nonce strings.

View Source
const StatusClientClosedRequest = 499

StatusClientClosedRequest is the code for client closed request errors.

Variables

View Source
var (
	// ErrBadRequest is returned when the request is malformed.
	ErrBadRequest = errors.New("bad request")

	// ErrNotFound is returned when the requested resource is not found.
	ErrNotFound = errors.New("not found")

	// ErrConflict is returned when the resource conflicts with existing ones.
	ErrConflict = errors.New("conflict")

	// ErrClientClosedRequest is returned when the client closed the request.
	ErrClientClosedRequest = errors.New("client closed request")

	// ErrInternalServerError is returned when the server encountered a
	// situation it does not know how to handle.
	ErrInternalServerError = errors.New("internal server error")

	// ErrServiceUnavailable is returned when the service is unavailable.
	ErrServiceUnavailable = errors.New("service unavailable")
)

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is an HTTP client that talks to Ratus.

func NewClient

func NewClient(o *ClientOptions) (*Client, error)

NewClient creates a new Ratus client instance.

func (*Client) DeletePromise

func (c *Client) DeletePromise(ctx context.Context, id string) (*Deleted, error)

DeletePromise deletes a promise by the unique ID of its target task.

func (*Client) DeletePromises

func (c *Client) DeletePromises(ctx context.Context, topic string) (*Deleted, error)

DeletePromises deletes all promises in a topic.

func (*Client) DeleteTask

func (c *Client) DeleteTask(ctx context.Context, id string) (*Deleted, error)

DeleteTask deletes a task by its unique ID.

func (*Client) DeleteTasks

func (c *Client) DeleteTasks(ctx context.Context, topic string) (*Deleted, error)

DeleteTasks deletes all tasks in a topic.

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(ctx context.Context, topic string) (*Deleted, error)

DeleteTopic deletes a topic and its tasks.

func (*Client) DeleteTopics

func (c *Client) DeleteTopics(ctx context.Context) (*Deleted, error)

DeleteTopics deletes all topics and tasks.

func (*Client) GetLiveness

func (c *Client) GetLiveness(ctx context.Context) error

GetLiveness checks the liveness of the instance.

func (*Client) GetPromise

func (c *Client) GetPromise(ctx context.Context, id string) (*Promise, error)

GetPromise gets a promise by the unique ID of its target task.

func (*Client) GetReadiness

func (c *Client) GetReadiness(ctx context.Context) error

GetReadiness checks the readiness of the instance.

func (*Client) GetTask

func (c *Client) GetTask(ctx context.Context, id string) (*Task, error)

GetTask gets a task by its unique ID.

func (*Client) GetTopic

func (c *Client) GetTopic(ctx context.Context, topic string) (*Topic, error)

GetTopic gets information about a topic.

func (*Client) InsertPromise

func (c *Client) InsertPromise(ctx context.Context, p *Promise) (*Task, error)

InsertPromise makes a promise to claim and execute a task if it is in pending state.

func (*Client) InsertTask

func (c *Client) InsertTask(ctx context.Context, t *Task) (*Updated, error)

InsertTask inserts a new task.

func (*Client) InsertTasks

func (c *Client) InsertTasks(ctx context.Context, ts []*Task) (*Updated, error)

InsertTasks inserts a batch of tasks while ignoring existing ones.

func (*Client) ListPromises

func (c *Client) ListPromises(ctx context.Context, topic string, limit, offset int) ([]*Promise, error)

ListPromises lists all promises in a topic.

func (*Client) ListTasks

func (c *Client) ListTasks(ctx context.Context, topic string, limit, offset int) ([]*Task, error)

ListTasks lists all tasks in a topic.

func (*Client) ListTopics

func (c *Client) ListTopics(ctx context.Context, limit, offset int) ([]*Topic, error)

ListTopics lists all topics.

func (*Client) PatchTask

func (c *Client) PatchTask(ctx context.Context, id string, m *Commit) (*Task, error)

PatchTask applies a set of updates to a task and returns the updated task.

func (*Client) Poll

func (c *Client) Poll(ctx context.Context, topic string, p *Promise) (*Context, error)

Poll claims and returns the next available task in a topic. An error wrapping ErrNotFound is returned if the topic is empty, or if no task in the topic has reached its scheduled time of execution.

func (*Client) PostPromises

func (c *Client) PostPromises(ctx context.Context, topic string, p *Promise) (*Task, error)

PostPromises makes a promise to claim and execute the next available task in a topic.

func (*Client) Request

func (c *Client) Request(ctx context.Context, method, endpoint string, body, result any) error

Request calls an API endpoint and stores the response body in the value pointed to by result. Error messages from Ratus will be translated into errors and returned.

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, o *SubscribeOptions, f SubscribeHandler) error

Subscribe to a topic and attach a handler function to listen for new tasks and errors. Calling Subscribe will block the calling goroutine indefinitely unless the context times out or gets canceled.

func (*Client) UpsertPromise

func (c *Client) UpsertPromise(ctx context.Context, p *Promise) (*Task, error)

UpsertPromise makes a promise to claim and execute a task regardless of its current state.

func (*Client) UpsertTask

func (c *Client) UpsertTask(ctx context.Context, t *Task) (*Updated, error)

UpsertTask inserts or updates a task.

func (*Client) UpsertTasks

func (c *Client) UpsertTasks(ctx context.Context, ts []*Task) (*Updated, error)

UpsertTasks inserts or updates a batch of tasks.

type ClientOptions

type ClientOptions struct {

	// Origin of the Ratus instance or load balancer to connect to.
	// An origin is a combination of a scheme, hostname, and port.
	// Reference: https://web.dev/same-site-same-origin/#origin
	Origin string

	// Common header key-value pairs for every outgoing request.
	Headers map[string]string

	// Timeout specifies a time limit for requests made by this client.
	// This is not related to the timeout for task execution.
	// A Timeout of zero means no timeout.
	Timeout time.Duration
}

ClientOptions contains options to configure a Ratus client.

type Commit

type Commit struct {

	// If not empty, the commit will be accepted only if the value matches the
	// corresponding nonce of the target task.
	Nonce string `json:"nonce,omitempty" bson:"nonce,omitempty"`

	// If not empty, transfer the task to the specified topic.
	Topic string `json:"topic,omitempty" bson:"topic,omitempty"`

	// If not nil, set the state of the task to the specified value.
	// If nil, the state of the task will be set to "completed" by default.
	State *TaskState `json:"state,omitempty" bson:"state,omitempty"`

	// If not nil, set the scheduled time of the task to the specified value.
	Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`

	// If not nil, use this value to replace the payload of the task.
	Payload any `json:"payload,omitempty" bson:"payload,omitempty"`

	// A duration relative to the time the commit is accepted, indicating that
	// the task will be scheduled to execute after this duration. When the
	// absolute scheduled time is specified, the scheduled time will take
	// precedence. It is recommended to use relative durations whenever
	// possible to avoid clock synchronization issues. The value must be a
	// valid duration string parsable by time.ParseDuration. This field is only
	// used when creating a commit and will be cleared after converting to an
	// absolute scheduled time.
	Defer string `json:"defer,omitempty" bson:"-"`
}

Commit contains a set of updates to be applied to a task.

type Context

type Context struct {
	context.Context

	// Task acquired by the polling operation.
	Task *Task
	// contains filtered or unexported fields
}

Context wraps around context.Context to carry scoped values throughout the poll-execute-commit workflow. Its deadline will be automatically set based on the execution deadline of the acquired task. It also provides chainable methods for setting up commits. Since the custom context is only used in parameters and return values, it is not considered anti-pattern. Reference: https://github.com/golang/go/issues/22602

func (*Context) Abstain

func (ctx *Context) Abstain() *Context

Abstain is equivalent to calling SetState(TaskStatePending).

func (*Context) Archive

func (ctx *Context) Archive() *Context

Archive is equivalent to calling SetState(TaskStateArchived).

func (*Context) Commit

func (ctx *Context) Commit() error

Commit applies updates to the acquired task.

func (*Context) Force

func (ctx *Context) Force() *Context

Force sets the Nonce field of the commit to empty to allow force commits.

func (*Context) Reschedule

func (ctx *Context) Reschedule(t time.Time) *Context

Reschedule is equivalent to calling Abstain followed by SetScheduled(t).

func (*Context) Reset added in v0.2.0

func (ctx *Context) Reset() *Context

Reset discards all uncommitted updates.

func (*Context) Retry

func (ctx *Context) Retry(duration string) *Context

Retry is equivalent to calling Abstain followed by SetDefer(duration).

func (*Context) SetDefer

func (ctx *Context) SetDefer(duration string) *Context

SetDefer sets the value for the Defer field of the commit.

func (*Context) SetNonce

func (ctx *Context) SetNonce(nonce string) *Context

SetNonce sets the value for the Nonce field of the commit.

func (*Context) SetPayload

func (ctx *Context) SetPayload(v any) *Context

SetPayload sets the value for the Payload field of the commit.

func (*Context) SetScheduled

func (ctx *Context) SetScheduled(t time.Time) *Context

SetScheduled sets the value for the Scheduled field of the commit.

func (*Context) SetState

func (ctx *Context) SetState(s TaskState) *Context

SetState sets the value for the State field of the commit.

func (*Context) SetTopic

func (ctx *Context) SetTopic(topic string) *Context

SetTopic sets the value for the Topic field of the commit.

type Deleted

type Deleted struct {

	// Number of resources deleted by the operation.
	Deleted int64 `json:"deleted"`
}

Deleted contains result of a delete operation.

type Error

type Error struct {

	// The error object.
	Error struct {

		// Code of the error.
		Code int `json:"code"`

		// Message of the error.
		Message string `json:"message"`
	} `json:"error"`
}

Error contains an error message.

func NewError

func NewError(err error) *Error

NewError creates an error message from an error type.

func (*Error) Err

func (e *Error) Err() error

Err returns an error type from the error message. It will automatically wrap sentinel error types based on the code and remove duplicates in the message.

type Promise

type Promise struct {

	// Unique ID of the promise, which is the same as the target task ID.
	// A promise with an empty ID is considered an "wildcard promise", and
	// Ratus will assign an appropriate task based on the status of the queue.
	// A task can only be owned by a single promise at a given time.
	ID string `json:"_id,omitempty" bson:"_id" form:"_id"`

	// Identifier of the consumer instance who consumed the task.
	Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty" form:"consumer"`

	// The deadline for the completion of execution promised by the consumer.
	// Consumer code needs to commit the task before this deadline, otherwise
	// the task is determined to have timed out and will be reset to the
	// "pending" state, allowing other consumers to retry.
	Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty" form:"deadline"`

	// Timeout duration for task execution promised by the consumer. When the
	// absolute deadline time is specified, the deadline will take precedence.
	// It is recommended to use relative durations whenever possible to avoid
	// clock synchronization issues. The value must be a valid duration string
	// parsable by time.ParseDuration. This field is only used when creating a
	// promise and will be cleared after converting to an absolute deadline.
	Timeout string `json:"timeout,omitempty" bson:"-" form:"timeout"`
}

Promise represents a claim on the ownership of an active task.

type Promises

type Promises struct {
	Data []*Promise `json:"data"`
}

Promises contains a list of promise resources.

type SubscribeHandler

type SubscribeHandler func(ctx *Context, err error)

SubscribeHandler defines the signature of handler functions for the Subscribe method.

type SubscribeOptions

type SubscribeOptions struct {

	// A wildcard promise containing consumer and timeout settings.
	// The promise will be reused for all polling operations, thus the ID and
	// deadline fields will be ignored.
	Promise *Promise
	// Name of the topic to subscribe to.
	Topic string

	// Maximum number of tasks to be executed concurrently.
	Concurrency int
	// Delay added before starting each polling goroutine to avoid spikes.
	// If zero, DefaultConcurrencyDelay is used.
	ConcurrencyDelay time.Duration

	// Pause duration after successful polls.
	// By default will proceed to the next poll immediately without pausing.
	PollInterval time.Duration
	// Pause duration when the topic has been emptied.
	// If zero, DefaultDrainInterval is used.
	DrainInterval time.Duration
	// Pause duration when an error occurs.
	// If zero, DefaultErrorInterval is used.
	ErrorInterval time.Duration
}

SubscribeOptions contains options for subscribing to a topic.

type Task

type Task struct {

	// User-defined unique ID of the task.
	// Task IDs across all topics share the same namespace.
	ID string `json:"_id" bson:"_id"`

	// Topic that the task currently belongs to. Tasks under the same topic
	// will be executed according to the scheduled time.
	Topic string `json:"topic" bson:"topic"`

	// Current state of the task. At a given moment, the state of a task may be
	// either "pending", "active", "completed" or "archived".
	State TaskState `json:"state" bson:"state"`

	// The nonce field stores a random string for implementing an optimistic
	// concurrency control (OCC) layer outside of the storage engine. Ratus
	// ensures consumers can only commit to tasks that have not changed since
	// the promise was made by verifying the nonce field.
	Nonce string `json:"nonce" bson:"nonce"`

	// Identifier of the producer instance who produced the task.
	Producer string `json:"producer,omitempty" bson:"producer,omitempty"`
	// Identifier of the consumer instance who consumed the task.
	Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty"`

	// The time the task was created.
	// Timestamps are generated by the instance running Ratus, remember to
	// perform clock synchronization before running multiple instances.
	Produced *time.Time `json:"produced,omitempty" bson:"produced,omitempty"`
	// The time the task is scheduled to be executed. Tasks will not be
	// executed until the scheduled time arrives. After the scheduled time,
	// excessive tasks will be executed in the order of the scheduled time.
	Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`
	// The time the task was claimed by a consumer.
	// Not to confuse this with the time of commit, which is not recorded.
	Consumed *time.Time `json:"consumed,omitempty" bson:"consumed,omitempty"`
	// The deadline for the completion of execution promised by the consumer.
	// Consumer code needs to commit the task before this deadline, otherwise
	// the task is determined to have timed out and will be reset to the
	// "pending" state, allowing other consumers to retry.
	Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty"`

	// A minimal descriptor of the task to be executed.
	// It is not recommended to rely on Ratus as the main storage of tasks.
	// Instead, consider storing the complete task record in a database, and
	// use a minimal descriptor as the payload to reference the task.
	Payload any `json:"payload,omitempty" bson:"payload,omitempty"`

	// A duration relative to the time the task is accepted, indicating that
	// the task will be scheduled to execute after this duration. When the
	// absolute scheduled time is specified, the scheduled time will take
	// precedence. It is recommended to use relative durations whenever
	// possible to avoid clock synchronization issues. The value must be a
	// valid duration string parsable by time.ParseDuration. This field is only
	// used when creating a task and will be cleared after converting to an
	// absolute scheduled time.
	Defer string `json:"defer,omitempty" bson:"-"`
}

Task references an idempotent unit of work that should be executed asynchronously.

func (*Task) Decode added in v0.4.0

func (t *Task) Decode(v any) error

Decode parses the payload of the task and stores the result in the value pointed by the specified pointer.

type TaskState

type TaskState int32

TaskState indicates the state of a task.

const (
	// The "pending" state indicates that the task is ready to be executed or
	// is waiting to be executed in the future.
	TaskStatePending TaskState = iota

	// The "active" state indicates that the task is being processed by a
	// consumer. Active tasks that have timed out will be automatically reset
	// to the "pending" state. Consumer code should handle failure and set the
	// state to "pending" to retry later if necessary.
	TaskStateActive

	// The "completed" state indicates that the task has completed its execution.
	// If the storage engine implementation supports TTL, completed tasks will
	// be automatically deleted after the retention period has expired.
	TaskStateCompleted

	// The "archived" state indicates that the task is stored as an archive.
	// Archived tasks will never be deleted due to expiration.
	TaskStateArchived
)

type Tasks

type Tasks struct {
	Data []*Task `json:"data"`
}

Tasks contains a list of task resources.

type Topic

type Topic struct {

	// User-defined unique name of the topic.
	Name string `json:"name" bson:"_id"`

	// The number of tasks that belong to the topic.
	Count int64 `json:"count,omitempty" bson:"count,omitempty"`
}

Topic refers to an ordered subset of tasks with the same topic name property.

type Topics

type Topics struct {
	Data []*Topic `json:"data"`
}

Topics contains a list of topic resources.

type Updated

type Updated struct {

	// Number of resources created by the operation.
	Created int64 `json:"created"`

	// Number of resources updated by the operation.
	Updated int64 `json:"updated"`
}

Updated contains result of an update operation.

Directories

Path Synopsis
cmd
Package docs embeds documentation files.
Package docs embeds documentation files.
examples
internal
config
Package config contains configurations and command line arguments.
Package config contains configurations and command line arguments.
controller
Package controller implements controllers for API endpoints.
Package controller implements controllers for API endpoints.
engine
Package engine defines the interface for storage engine implementations.
Package engine defines the interface for storage engine implementations.
engine/memdb
Package memdb implements the storage engine interface for MemDB.
Package memdb implements the storage engine interface for MemDB.
engine/mongodb
Package mongodb implements the storage engine interface for MongoDB.
Package mongodb implements the storage engine interface for MongoDB.
engine/stub
Package stub provides a stub engine that returns canned data for testing.
Package stub provides a stub engine that returns canned data for testing.
metrics
Package metrics registers Prometheus metrics.
Package metrics registers Prometheus metrics.
middleware
Package middleware provides functions to inspect and transform requests.
Package middleware provides functions to inspect and transform requests.
nonce
Package nonce generates random alphanumeric strings of fixed length.
Package nonce generates random alphanumeric strings of fixed length.
reqtest
Package reqtest provides utilities for testing requests and responses.
Package reqtest provides utilities for testing requests and responses.
router
Package router provides API endpoint routing.
Package router provides API endpoint routing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL