nq

package module
v0.1.0-beta.2
Warning: This package is not in the latest version of its module.

Published: Aug 14, 2022 License: MIT Imports: 19 Imported by: 1

README

Reliable, Efficient and Cancellable Distributed Task Queue in Go


NQ (NATS Queue) is a Go package for queuing jobs and processing them in the background with workers. It is backed by nats-server and focuses on the cancellability of enqueued jobs.

Overview of how NQ works:

  • The client puts tasks into streams (NATS JetStream)
  • The server pulls those tasks and executes them in goroutine workers
  • Tasks are processed concurrently by multiple workers
  • Task states are stored in the NATS key-value store (an interface can be implemented to support other stores)

Task streams can be used to distribute work across multiple machines, each of which can run worker servers and brokers, for high availability and horizontal scaling.

NQ requires nats-server with jetstream support. Feedback is appreciated.

How does it work?

[Figure: Task Queue]

This package was designed so that a task is always cancellable by the client. Upon a network partition (e.g. a disconnect from nats-server), workers can be configured to cancel and quit instantly.

For scalable task execution, the client can submit tasks to queues that are load-balanced across the servers subscribed to those queues. To cancel a task, the client sends a cancel request to all servers subscribed to the queue (think multicast), and the server responsible for the task cancels it via its Go context. A task whose state is Completed, Failed or Cancelled cannot be cancelled. A task still waiting in the queue is removed from the queue, while a task in execution is cancelled by calling the cancel function of its context. For cancellation to succeed, the ProcessingFunc executing the task must respect the context passed to it.

To learn more about streams, subjects and other NATS-related terms, refer to the NATS documentation.

Features

  • Multiple task queues
  • Deadline and Timeout for tasks
  • CLI to inspect queues
  • Tasks can be cancelled via context
  • Auto-shutdown of the worker server if at any time it is incapable of honoring a cancel request, e.g. after losing its connection to nats-server
  • Reconnection to nats-server for automatic failover
  • Cancel tasks at any time by ID
  • Horizontally scalable workers

Usage

  • Cancel tasks
  • Fetch task status
  • Automatic failover
  • Monitoring and Alerting

Cancel Tasks

Tasks that are either waiting for execution or being executed on any worker can be cancelled. Cancelling a task requires its task ID.

// Cancel a task by ID
ack, err := client.Enqueue(taskSignature)
if err != nil {
	log.Fatal(err)
}
if err := client.Cancel(ack.ID); err != nil {
	log.Println(err)
}
// Alternatively, if the queue name is known, CancelInQueue issues the cancel request faster:
// err := client.CancelInQueue(ack.ID, "<queue-name>")

Fetch task status

if msg, err := client.Fetch(ack.ID); err == nil {
	log.Println(msg.GetStatus()) // string-formatted status
}

Automatic Failover

The ShutdownOnNatsDisconnect option shuts down the workers and the server if the connection to nats-server is broken. This is useful when it is of utmost importance that tasks remain cancellable at all times. Note: when a disconnect is observed, workers stop processing instantly, and any remaining tasks stay in the message queue (available to other instances of the worker server in a multi-instance deployment).

srv := nq.NewServer(nq.NatsClientOpt{
	Addr: "nats://127.0.0.1:4222",
}, nq.Config{
	ServerName:  nq.GenerateServerName(),
	Concurrency: 2,
	LogLevel:    nq.InfoLevel,
}, nq.NoAuthentcation(), nq.ShutdownOnNatsDisconnect(),
)
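The behaviour of this option can be illustrated, as an assumption about the mechanism rather than nq's actual code, with a single root context shared by all running tasks that is cancelled when a disconnect is signalled. The `disconnected` channel stands in for a connection-closed notification (nq's `NewNatsBroker` does take a `natsConnectionClosed chan struct{}`, but how it is wired internally is not shown here), and `runUntilDisconnect` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runUntilDisconnect starts `workers` goroutines that run until a shared
// root context is cancelled, cancels that context when `disconnected`
// is closed, and returns how many workers observed the cancellation.
func runUntilDisconnect(disconnected <-chan struct{}, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the shared root context as soon as the disconnect signal fires.
	go func() {
		select {
		case <-disconnected:
			cancel()
		case <-ctx.Done():
		}
	}()

	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Stand-in for a long-running task that respects its context.
			<-ctx.Done()
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return stopped
}

func main() {
	disconnected := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(disconnected) // simulate losing the connection to nats-server
	}()
	fmt.Println("workers stopped:", runUntilDisconnect(disconnected, 3)) // workers stopped: 3
}
```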

The server can be configured not to shut down and instead try to reconnect to nats-server.

srv := nq.NewServer(nq.NatsClientOpt{
	Addr:          "nats://127.0.0.1:4222",
	ReconnectWait: time.Second * 5,
	MaxReconnects: 100,
}, nq.Config{ServerName: "local-serv-1"})

Monitoring and Alerting

Refer to the NATS monitoring section and the monitoring tool by nats-io.

Quickstart

Install NQ library

go get -u github.com/dumbmachine/nq

Make sure you have nats-server running locally or in a container. Example:

docker run --rm -p 4222:4222 --name nats-server -ti nats:latest -js

Now create a client to publish jobs.


package main

import (
	"encoding/json"
	"log"

	"github.com/dumbmachine/nq"
)

type UrlPayload struct {
	Url string `json:"url"`
}

// Queue name shared by the publisher and the worker server
const QueueName = "scrap-url-dev"

func main() {
	log.Println("Startup publisher ...")
	client := nq.NewPublishClient(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.NoAuthentcation(),
	// see godoc for more options
	)
	defer client.Close()

	// Payload is JSON-encoded data that the worker's ProcessingFunc expects
	b, err := json.Marshal(UrlPayload{Url: "https://httpstat.us/200?sleep=5000"})
	if err != nil {
		log.Fatal(err)
	}

	task := nq.NewTask(QueueName, b)
	ack, err := client.Enqueue(task)
	if err != nil {
		log.Fatal(err)
	}
	// Keep the ID: it is needed to fetch the status or cancel the task
	log.Println("Acknowledgement: ", ack.ID)
}

Now create a worker server

package main

import (
	"context"
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"time"

	"github.com/dumbmachine/nq"
)

type UrlPayload struct {
	Url string `json:"url"`
}

// An example ProcessingFunc: fetch the URL from the payload.
// Passing ctx into the request lets a cancel request abort it.
func fetchHTML(ctx context.Context, task *nq.TaskPayload) error {
	var payload UrlPayload
	if err := json.Unmarshal(task.Payload, &payload); err != nil {
		return errors.New("invalid payload")
	}
	client := &http.Client{Timeout: 30 * time.Second}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, payload.Url, nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return nil
}

// Queue name shared by the publisher and the worker server
const QueueName = "scrap-url-dev"

func main() {
	log.Println("Startup subscriber ...")
	srv := nq.NewServer(nq.NatsClientOpt{
		Addr: "nats://127.0.0.1:4222",
	}, nq.Config{
		ServerName:  "local",
		Concurrency: 1,
		LogLevel:    nq.InfoLevel,
	}, nq.NoAuthentcation(), nq.ShutdownOnNatsDisconnect())

	srv.Register(QueueName, fetchHTML)

	if err := srv.Run(); err != nil {
		panic(err)
	}
}

For more, check out Getting Started. To learn more about the nq APIs, see godoc.

Acknowledgements

Asynq: Many of the design ideas are taken from asynq.

License

NQ is released under the MIT license. See LICENSE.

Documentation

Overview

nq provides a Go package to publish and process tasks via NATS.

Index

Constants

const (
	// waiting for task to be received by worker
	Pending = iota

	// task is being processed by a worker
	Processing

	// taskFN returns an error
	Failed

	// successfully processed
	Completed

	// cancelled by user
	Cancelled
)

Possible task statuses
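As a sketch of how these integer statuses might be turned into readable strings, the helper below mirrors the constant order above. The strings themselves are assumptions (the exact text returned by `TaskMessage.GetStatus` is not documented here), and `isCancellable` encodes the README's rule that Completed, Failed and Cancelled tasks cannot be cancelled.

```go
package main

import "fmt"

// Status values in the same order as nq's iota constants.
const (
	Pending = iota
	Processing
	Failed
	Completed
	Cancelled
)

// statusString is a hypothetical helper; the exact strings that nq's
// GetStatus returns may differ.
func statusString(s int) string {
	switch s {
	case Pending:
		return "pending"
	case Processing:
		return "processing"
	case Failed:
		return "failed"
	case Completed:
		return "completed"
	case Cancelled:
		return "cancelled"
	default:
		return "unknown"
	}
}

// isCancellable reports whether a task in state s can still be cancelled:
// per the README, Completed, Failed and Cancelled tasks cannot.
func isCancellable(s int) bool {
	return s == Pending || s == Processing
}

func main() {
	for s := Pending; s <= Cancelled; s++ {
		fmt.Printf("%d %-10s cancellable=%v\n", s, statusString(s), isCancellable(s))
	}
}
```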

const (
	KVName = "package"
)

Variables

var ErrFailedToConnect = errors.New("failed to connect to nats client")
var ErrInvalidTaskPayload = errors.New("invalid task payload") // Happens when malformed data is sent to task-stream
var ErrQueueNotFound = errors.New("nq: queue not found")
var ErrServerClosed = errors.New("nq: Server closed")
var ErrServerNameEmpty = errors.New("server name cannot be empty")
var ErrStreamNotCreated = errors.New("nq: stream not created")
var ErrTaskIDEmpty = errors.New("nq: task id cannot be empty")
var ErrTaskNotFound = errors.New("task not found")

Functions

func CancelStreamNameToStreamName

func CancelStreamNameToStreamName(stream, subject string) string

func EncodeTMToJSON

func EncodeTMToJSON(t *TaskMessage) ([]byte, error)

func GenerateServerName

func GenerateServerName() string

Generates a random UUID for server

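For use cases that require long-running workers, using a random name is discouraged, since the server name serves as the durable name needed to re-establish the connection to nats-server.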

func NoAuthentcation

func NoAuthentcation() noAuthentication

Connect to nats-server without any authentication.

This is the default.

func ShutdownOnNatsDisconnect

func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect

Shutdown server and workers if connection with nats-server is broken. Results in any waiting tasks being cancelled. Option is useful when workers should be `cancellable` at all times.

Inactive by default.

func StreamNameToCancelStreamName

func StreamNameToCancelStreamName(subject string) string

StreamNameToCancelStreamName returns the name of the stream responsible for cancelling tasks in the given stream.

func StreamNameToDurableStreamName

func StreamNameToDurableStreamName(srvName, stream string) string

Returns a durable name for a stream.

This helps re-establish the connection to nats-server while maintaining sequence state.

func TokenAuthentication

func TokenAuthentication(username, password string) tokenAuthentication

Connect to nats-server using token authentication

Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens

func UserPassAuthentcation

func UserPassAuthentcation(username, password string) uPassAuthentication

Connect to nats-server using username:password pair

Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password

Types

type CancelPayload

type CancelPayload string

type CancellationStore

type CancellationStore struct {
	// contains filtered or unexported fields
}

TODO: Do we really need a log here?

func NewCancelations

func NewCancelations() CancellationStore

NewCancelations returns a CancellationStore instance.

func (*CancellationStore) Add

func (c *CancellationStore) Add(id string, fn context.CancelFunc)

Add adds a new cancel func to the collection.

func (*CancellationStore) Delete

func (c *CancellationStore) Delete(id string)

Delete deletes a cancel func from the collection given an id.

func (*CancellationStore) Get

func (c *CancellationStore) Get(id string) (fn context.CancelFunc, ok bool)

Get returns a cancel func given an id.

type ClientConnectionOption

type ClientConnectionOption interface {
	String() string
	Type() ClientOptionType
	Value() interface{}
}

type ClientOption

type ClientOption struct {
	Timeout              time.Duration //todo
	AuthenticationType   ClientOptionType
	AuthenticationObject interface{}
	NatsOption           []nats.Option
	// Defaults to false
	ShutdownOnNatsDisconnect bool
}

Internal representation of options for nats-server connection

type ClientOptionType

type ClientOptionType int
const (
	// Authentication types
	// TODO: error on using more than one of the options below
	UPassAuthenticationOpt ClientOptionType = iota
	TokenAuthenticationOpt
	NoAuthenticationOpt

	// General options
	ShutdownOnNatsDisconnectOpt
)

type Config

type Config struct {
	// Durable name for this worker. Required to re-establish a connection
	// to nats-server
	ServerName string

	// Maximum number of concurrent processing of tasks.
	//
	// If set to a zero or negative value, the number of CPUs usable by the current process is picked.
	Concurrency int

	// Predicate function to determine whether the error returned from a task counts as a failure.
	// If the function returns true, the Server will retry the task (bounded by the retry limit set on the task).
	//
	// By default, the function returns true for any non-nil error.
	IsFailureFn func(error) bool

	// Logger specifies the logger used by the server instance.
	//
	// go's logger is used by default.
	Logger ilog.Base

	// LogLevel specifies the minimum log level to enable.
	//
	// InfoLevel is used by default.
	LogLevel LogLevel
}

Server config

type HandlerFunc

type HandlerFunc func(context.Context, *TaskPayload) error

The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func (HandlerFunc) ProcessTask

func (fn HandlerFunc) ProcessTask(ctx context.Context, task *TaskPayload) error

ProcessTask calls fn(ctx, task)

type LogLevel

type LogLevel int32

LogLevel represents a log level.

const (

	// DebugLevel is the lowest level of logging.
	// Debug logs are intended for debugging and development purposes.
	DebugLevel LogLevel

	// InfoLevel is used for general informational log messages.
	InfoLevel

	// WarnLevel is used for undesired but relatively expected events,
	// which may indicate a problem.
	WarnLevel

	// ErrorLevel is used for undesired and unexpected events that
	// the program can recover from.
	ErrorLevel

	// FatalLevel is used for undesired and unexpected events that
	// the program cannot recover from.
	FatalLevel
)

type NatsBroker

type NatsBroker struct {
	// contains filtered or unexported fields
}

func NewNatsBroker

func NewNatsBroker(conf NatsClientOpt, opt ClientOption, natsConnectionClosed chan struct{}, forceReRegister chan struct{}) (*NatsBroker, error)

NewNatsBroker returns a new instance of NatsBroker.

func (*NatsBroker) AddStream

func (n *NatsBroker) AddStream(conf nats.StreamConfig) error

func (*NatsBroker) Cancel

func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)

func (*NatsBroker) Close

func (n *NatsBroker) Close() error

func (*NatsBroker) ConnectoQueue

func (n *NatsBroker) ConnectoQueue(q *Queue) error

Creates the queue (a stream in NATS) if it does not exist

func (*NatsBroker) DeleteStream

func (n *NatsBroker) DeleteStream(name string) error

func (*NatsBroker) Ping

func (n *NatsBroker) Ping() error

func (*NatsBroker) Publish

func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)

func (*NatsBroker) PublishWithMeta

func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)

TODO: This is toxic

func (*NatsBroker) Submit

func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)

type NatsClientOpt

type NatsClientOpt struct {
	// nats server address
	Addr string

	// Name for key-value store used to store task metadata
	//
	// Defaults to nq
	DBName string

	// ReconnectWait is an Option to set the wait time between reconnect attempts.
	//
	// Defaults to 10 seconds
	ReconnectWait time.Duration

	// MaxReconnects is an Option to set the maximum number of reconnect attempts.
	//
	// Defaults to 100
	MaxReconnects int
}

NatsClientOpt represents the NATS connection configuration options.

type PackagePubAck

type PackagePubAck struct {
	// ID assigned to published message
	ID string
	*nats.PubAck
}

type ProcessingFunc

type ProcessingFunc func(context.Context, *TaskPayload) error

Signature of the function executed by a worker. Functions of type `ProcessingFunc` are registered to subjects and process messages published by the client.

type PublishClient

type PublishClient struct {
	// contains filtered or unexported fields
}

func NewPublishClient

func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *PublishClient

func (*PublishClient) Cancel

func (p *PublishClient) Cancel(id string) error

Fetch qname from kv store instead

func (*PublishClient) CancelInQueue

func (p *PublishClient) CancelInQueue(id string, qname string) error

Faster than the `Cancel` method when the queue name is known.

func (*PublishClient) Close

func (p *PublishClient) Close() error

Also delete stream for cleanup

func (*PublishClient) DeleteQueue

func (p *PublishClient) DeleteQueue(qname string)

Cleanup method

func (*PublishClient) Enqueue

func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)

func (*PublishClient) Fetch

func (p *PublishClient) Fetch(id string) (*TaskMessage, error)

func (*PublishClient) PublishToSubject

func (p *PublishClient) PublishToSubject(name string, task *Task, opts ...TaskOption) (*TaskMessage, error)

Publish a TaskMessage into a stream

type PublishClientIFace

type PublishClientIFace interface {
	Ping() error
	Close() error

	Fetch(id string) (*TaskMessage, error)

	// Jetstream related
	AddStream(nats.StreamConfig) error
	DeleteStream(name string) error

	// Task submission related
	Publish(subject string, payload []byte) (*nats.PubAck, error)

	// Task cancellation related
	Cancel(subject string, id string) (*TaskMessage, error)
}

type PullAction

type PullAction struct {
	Q            *Queue
	Subscription *nats.Subscription
	Fn           ProcessingFunc
}

type PullStore

type PullStore struct {
	// contains filtered or unexported fields
}

func NewPullStore

func NewPullStore() PullStore

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Internally, a `Queue` is an abstraction over a NATS stream and its subject.

func NewQueue

func NewQueue(name string) *Queue

func (*Queue) DurableStream

func (q *Queue) DurableStream(prefix string) string

type ResultHandlerIFACE

type ResultHandlerIFACE interface {
	// Get the result of a task in nats kv store
	Get(id string) (*TaskMessage, error)
	// Set the result of a task in nats kv store
	Set(id string, data []byte) error

	GetAllKeys(id string, data []byte) ([]string, error)
}

type ResultHandlerNats

type ResultHandlerNats struct {
	// contains filtered or unexported fields
}

func NewResultHandlerNats

func NewResultHandlerNats(name string, js nats.JetStreamContext) *ResultHandlerNats

func (*ResultHandlerNats) Get

func (rn *ResultHandlerNats) Get(id string) (*TaskMessage, error)

func (*ResultHandlerNats) GetAllKeys

func (rn *ResultHandlerNats) GetAllKeys(id string, data []byte) ([]string, error)

Get all keys from nats key-value store

func (*ResultHandlerNats) Set

func (rn *ResultHandlerNats) Set(id string, data []byte) error

type Server

type Server struct {
	// contains filtered or unexported fields
}

Responsible for task lifecycle management and processing

func NewServer

func NewServer(natsConfig NatsClientOpt, servCfg Config, opts ...ClientConnectionOption) *Server

func (*Server) Register

func (srv *Server) Register(qname string, fn ProcessingFunc)

Register subscribes fn to the queue qname, so that tasks published to it are processed by fn.

func (*Server) Run

func (srv *Server) Run() error

Run starts the task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.

Run returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.

func (*Server) Shutdown

func (srv *Server) Shutdown()

Shutdown gracefully shuts down the server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for the duration specified in Config.ShutdownTimeout. If a worker doesn't finish processing a task within the timeout, the task remains in the queue.

func (*Server) Start

func (srv *Server) Start() error

Start starts the worker server. Once the server has started, it pulls tasks off queues, starts a worker goroutine for each task and calls the registered ProcessingFunc to process it. Tasks are processed concurrently by the workers, up to the limit specified in Config.Concurrency.

Start returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.

func (*Server) Stop

func (srv *Server) Stop()

Stop signals the server to stop pulling new tasks off queues. Stop can be used before shutting down the server to ensure that all currently active tasks are processed before server shutdown.

Stop does not shutdown the server, make sure to call Shutdown before exit.

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(queue string, payload []byte, opts ...TaskOption) *Task

payload is the JSON-encoded data that the ProcessingFunc expects

type TaskCancellationMessage

type TaskCancellationMessage struct {
	// ID corresponds to task's ID
	ID string

	// StreamName is the name of stream whose subject is handled by this task
	StreamName string
}

type TaskMessage

type TaskMessage struct {

	// Sequence indicates sequence number of message in nats jetstream
	Sequence uint64

	// ID is a unique identifier for each task, used for cancellation.
	ID string

	// StreamName is filled in automatically.
	StreamName string

	//
	Queue string

	// Payload holds data needed to process the task.
	Payload []byte

	// Status indicates the status of task execution
	Status int

	// Timeout specifies timeout in seconds.
	// Use zero to indicate no deadline.
	Timeout int64

	// Deadline specifies the deadline for the task in Unix time.
	// Use zero to indicate no deadline.
	Deadline int64

	// CompletedAt is the time the task was processed successfully in Unix time,
	// the number of seconds elapsed since January 1, 1970 UTC.
	//
	// A negative value indicates the task was cancelled.
	// Use zero to indicate no value.
	CompletedAt int64

	// Current retry count
	CurrentRetry int

	// Total number of retries possible for this task
	MaxRetry int
	// contains filtered or unexported fields
}

func DecodeTMFromJSON

func DecodeTMFromJSON(data []byte) (*TaskMessage, error)

func (*TaskMessage) GetStatus

func (msg *TaskMessage) GetStatus() string

type TaskOption

type TaskOption interface {
	String() string
	Type() TaskOptionType
	Value() interface{}
}

func Deadline

func Deadline(t time.Time) TaskOption

Deadline returns an option to specify the deadline for the given task. If the deadline is reached before the ProcessingFunc returns, the task will be retried.

If there's a conflicting Timeout option, whichever comes earliest will be used.

func Retry

func Retry(n int) TaskOption

func TaskID

func TaskID(id string) TaskOption

TaskID returns an option to specify the task ID.

func Timeout

func Timeout(d time.Duration) TaskOption

Timeout returns an option to specify how long a task may run.

A zero duration means no limit (math.MaxInt64 is chosen).

If there's a conflicting Deadline option, whichever comes earliest will be used.

type TaskOptionType

type TaskOptionType int
const (
	MaxRetryOpt TaskOptionType = iota
	TaskIDOpt
	// QueueOpt
	TimeoutOpt
	DeadlineOpt
)

type TaskPayload

type TaskPayload struct {
	ID      string
	Payload []byte
}

Directories

  • internal
  • log
  • tools (module)
