Documentation
¶
Overview ¶
nq provides a go package to publish/process tasks via nats
Index ¶
- Constants
- Variables
- func CancelStreamNameToStreamName(stream, subject string) string
- func EncodeTMToJSON(t *TaskMessage) ([]byte, error)
- func GenerateServerName() string
- func NoAuthentcation() noAuthentication
- func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect
- func StreamNameToCancelStreamName(subject string) string
- func StreamNameToDurableStreamName(srvName, stream string) string
- func TokenAuthentication(username, password string) tokenAuthentication
- func UserPassAuthentcation(username, password string) uPassAuthentication
- type CancelPayload
- type CancellationStore
- type ClientConnectionOption
- type ClientOption
- type ClientOptionType
- type Config
- type HandlerFunc
- type LogLevel
- type NatsBroker
- func (n *NatsBroker) AddStream(conf nats.StreamConfig) error
- func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)
- func (n *NatsBroker) Close() error
- func (n *NatsBroker) ConnectoQueue(q *Queue) error
- func (n *NatsBroker) DeleteStream(name string) error
- func (n *NatsBroker) Ping() error
- func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)
- func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)
- func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)
- type NatsClientOpt
- type PackagePubAck
- type ProcessingFunc
- type PublishClient
- func (p *PublishClient) Cancel(id string) error
- func (p *PublishClient) CancelInQueue(id string, qname string) error
- func (p *PublishClient) Close() error
- func (p *PublishClient) DeleteQueue(qname string)
- func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)
- func (p *PublishClient) Fetch(id string) (*TaskMessage, error)
- func (p *PublishClient) PublishToSubject(name string, task *Task, opts ...TaskOption) (*TaskMessage, error)
- type PublishClientIFace
- type PullAction
- type PullStore
- type Queue
- type ResultHandlerIFACE
- type ResultHandlerNats
- type Server
- type Task
- type TaskCancellationMessage
- type TaskMessage
- type TaskOption
- type TaskOptionType
- type TaskPayload
Constants ¶
const ( // waiting for task to be received by worker Pending = iota // task is being processed by a worker Processing // taskFN returns an error Failed // successfully processed Completed // cancelled by user Cancelled )
Possible task statuses
const (
KVName = "package"
)
Variables ¶
var ErrFailedToConnect = errors.New("failed to connect to nats client")
var ErrInvalidTaskPayload = errors.New("invalid task payload") // Happens when malformed data is sent to task-stream
var ErrQueueNotFound = errors.New("nq: queue not found")
var ErrServerClosed = errors.New("nq: Server closed")
var ErrServerNameEmpty = errors.New("server name cannot be empty")
var ErrStreamNotCreated = errors.New("nq: stream not created")
var ErrTaskIDEmpty = errors.New("nq: task id cannot be empty")
var ErrTaskNotFound = errors.New("task not found")
Functions ¶
func EncodeTMToJSON ¶
func EncodeTMToJSON(t *TaskMessage) ([]byte, error)
func GenerateServerName ¶
func GenerateServerName() string
Generates a random UUID for server
For use-cases that require long-running workers, usage of a random name is discouraged
func NoAuthentcation ¶
func NoAuthentcation() noAuthentication
Connect to nats-server without any authentication
Default
func ShutdownOnNatsDisconnect ¶
func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect
Shutdown server and workers if connection with nats-server is broken. Results in any waiting tasks being cancelled. Option is useful when workers should be `cancellable` at all times.
By default, inactive
func StreamNameToCancelStreamName ¶
StreamNameToCancelStreamName returns the name of the stream responsible for cancellation of tasks in the given stream
func StreamNameToDurableStreamName ¶
Returns a durable name for stream
Helps re-establishing connection to nats-server while maintaining sequence state
func TokenAuthentication ¶
func TokenAuthentication(username, password string) tokenAuthentication
Connect to nats-server using token authentication
Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens
func UserPassAuthentcation ¶
func UserPassAuthentcation(username, password string) uPassAuthentication
Connect to nats-server using username:password pair
Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password
Types ¶
type CancelPayload ¶
type CancelPayload string
type CancellationStore ¶
type CancellationStore struct {
// contains filtered or unexported fields
}
TODO: Do we really need a log here?
func NewCancelations ¶
func NewCancelations() CancellationStore
NewCancelations returns a CancellationStore instance.
func (*CancellationStore) Add ¶
func (c *CancellationStore) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*CancellationStore) Delete ¶
func (c *CancellationStore) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*CancellationStore) Get ¶
func (c *CancellationStore) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type ClientConnectionOption ¶
type ClientConnectionOption interface {
String() string
Type() ClientOptionType
Value() interface{}
}
type ClientOption ¶
type ClientOption struct {
Timeout time.Duration //todo
AuthenticationType ClientOptionType
AuthenticationObject interface{}
NatsOption []nats.Option
// Defaults to false
ShutdownOnNatsDisconnect bool
}
Internal representation of options for nats-server connection
type ClientOptionType ¶
type ClientOptionType int
const ( // Authentication types // TODO: error on using multiple of below UPassAuthenticationOpt ClientOptionType = iota TokenAuthenticationOpt NoAuthenticationOpt // General options ShutdownOnNatsDisconnectOpt )
type Config ¶
type Config struct {
// Durable name for this worker. Required to re-establish connection
// to nats-server
ServerName string
// Maximum number of concurrent processing of tasks.
//
// If set to a zero or negative value, the number of CPUs usable by the current process is picked.
Concurrency int
// Predicate function to determine whether the error returned from a task is a failure.
// If function returns true, Server will retry the task ( bounded by retry-limit set on task )
//
// By default, the function returns true for any non-nil error.
IsFailureFn func(error) bool
// Logger specifies the logger used by the server instance.
//
// go's logger is used by default.
Logger ilog.Base
// LogLevel specifies the minimum log level to enable.
//
// InfoLevel is used by default.
LogLevel LogLevel
}
Server config
type HandlerFunc ¶
type HandlerFunc func(context.Context, *TaskPayload) error
The HandlerFunc type is an adapter to allow the use of ordinary functions as a Handler. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.
func (HandlerFunc) ProcessTask ¶
func (fn HandlerFunc) ProcessTask(ctx context.Context, task *TaskPayload) error
ProcessTask calls fn(ctx, task)
type LogLevel ¶
type LogLevel int32
LogLevel represents a log level.
const ( // DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes. DebugLevel LogLevel // InfoLevel is used for general informational log messages. InfoLevel // WarnLevel is used for undesired but relatively expected events, // which may indicate a problem. WarnLevel // ErrorLevel is used for undesired and unexpected events that // the program can recover from. ErrorLevel // FatalLevel is used for undesired and unexpected events that // the program cannot recover from. FatalLevel )
type NatsBroker ¶
type NatsBroker struct {
// contains filtered or unexported fields
}
func NewNatsBroker ¶
func NewNatsBroker(conf NatsClientOpt, opt ClientOption, natsConnectionClosed chan struct{}, forceReRegister chan struct{}) (*NatsBroker, error)
NewNatsBroker returns a new instance of NatsBroker.
func (*NatsBroker) AddStream ¶
func (n *NatsBroker) AddStream(conf nats.StreamConfig) error
func (*NatsBroker) Cancel ¶
func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)
func (*NatsBroker) Close ¶
func (n *NatsBroker) Close() error
func (*NatsBroker) ConnectoQueue ¶
func (n *NatsBroker) ConnectoQueue(q *Queue) error
Creates the queue (a stream in NATS) if it does not already exist
func (*NatsBroker) DeleteStream ¶
func (n *NatsBroker) DeleteStream(name string) error
func (*NatsBroker) Ping ¶
func (n *NatsBroker) Ping() error
func (*NatsBroker) Publish ¶
func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)
func (*NatsBroker) PublishWithMeta ¶
func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)
TODO: This is toxic
func (*NatsBroker) Submit ¶
func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)
type NatsClientOpt ¶
type NatsClientOpt struct {
// nats server address
Addr string
// Name for key-value store used to store task metadata
//
// Defaults to nq
DBName string
// ReconnectWait is an Option to set the wait time between reconnect attempts.
//
// Defaults to 10 seconds
ReconnectWait time.Duration
// MaxReconnects is an Option to set the maximum number of reconnect attempts.
//
// Defaults to 100
MaxReconnects int
}
NatsClientOpt represent NATS connection configuration option.
type PackagePubAck ¶
type PackagePubAck struct {
// ID assigned to published message
ID string
*nats.PubAck
}
type ProcessingFunc ¶
type ProcessingFunc func(context.Context, *TaskPayload) error
Signature for a function executed by a worker. `ProcessingFunc` values are registered to subjects and process messages published by a client
type PublishClient ¶
type PublishClient struct {
// contains filtered or unexported fields
}
func NewPublishClient ¶
func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *PublishClient
func (*PublishClient) Cancel ¶
func (p *PublishClient) Cancel(id string) error
Fetch qname from kv store instead
func (*PublishClient) CancelInQueue ¶
func (p *PublishClient) CancelInQueue(id string, qname string) error
Faster than using `Cancel` method, if queue name is known
func (*PublishClient) DeleteQueue ¶
func (p *PublishClient) DeleteQueue(qname string)
Cleanup method
func (*PublishClient) Enqueue ¶
func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)
func (*PublishClient) Fetch ¶
func (p *PublishClient) Fetch(id string) (*TaskMessage, error)
func (*PublishClient) PublishToSubject ¶
func (p *PublishClient) PublishToSubject(name string, task *Task, opts ...TaskOption) (*TaskMessage, error)
Publish a TaskMessage into a stream
type PublishClientIFace ¶
type PublishClientIFace interface {
Ping() error
Close() error
Fetch(id string) (*TaskMessage, error)
// Jetstream related
AddStream(nats.StreamConfig) error
DeleteStream(name string) error
// Task submission related
Publish(subject string, payload []byte) (*nats.PubAck, error)
// Task cancellation related
Cancel(subject string, id string) (*TaskMessage, error)
}
type PullAction ¶
type PullAction struct {
Q *Queue
Subscription *nats.Subscription
Fn ProcessingFunc
}
type PullStore ¶
type PullStore struct {
// contains filtered or unexported fields
}
func NewPullStore ¶
func NewPullStore() PullStore
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Internally `Queue`s represent an abstraction over a nats stream -> subject
func (*Queue) DurableStream ¶
type ResultHandlerIFACE ¶
type ResultHandlerNats ¶
type ResultHandlerNats struct {
// contains filtered or unexported fields
}
func NewResultHandlerNats ¶
func NewResultHandlerNats(name string, js nats.JetStreamContext) *ResultHandlerNats
func (*ResultHandlerNats) Get ¶
func (rn *ResultHandlerNats) Get(id string) (*TaskMessage, error)
func (*ResultHandlerNats) GetAllKeys ¶
func (rn *ResultHandlerNats) GetAllKeys(id string, data []byte) ([]string, error)
Get all keys from nats key-value store
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Responsible for task lifecycle management and processing
func NewServer ¶
func NewServer(natsConfig NatsClientOpt, servCfg Config, opts ...ClientConnectionOption) *Server
func (*Server) Register ¶
func (srv *Server) Register(qname string, fn ProcessingFunc)
Subscribe to a stream
func (*Server) Run ¶
Run starts the task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.
Run returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.
func (*Server) Shutdown ¶
func (srv *Server) Shutdown()
Shutdown gracefully shuts down the server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
func (*Server) Start ¶
Start starts the worker server. Once the server has started, it pulls tasks off queues, starts a worker goroutine for each task, and then calls Handler to process it. Tasks are processed concurrently by the workers up to the number of concurrency specified in Config.Concurrency.
Start returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.
func (*Server) Stop ¶
func (srv *Server) Stop()
Stop signals the server to stop pulling new tasks off queues. Stop can be used before shutting down the server to ensure that all currently active tasks are processed before server shutdown.
Stop does not shutdown the server, make sure to call Shutdown before exit.
type TaskCancellationMessage ¶
type TaskMessage ¶
type TaskMessage struct {
// Sequence indicates sequence number of message in nats jetstream
Sequence uint64
// ID is a unique identifier for each task, used for cancellation.
ID string
// . Autofilled
StreamName string
//
Queue string
// Payload holds data needed to process the task.
Payload []byte
// Status indicates the status of task execution
Status int
// Timeout specifies timeout in seconds.
// Use zero to indicate no deadline.
Timeout int64
// Deadline specifies the deadline for the task in Unix time.
// Use zero to indicate no deadline.
Deadline int64
// CompletedAt is the time the task was processed successfully in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
//
// A negative value indicates the task was cancelled.
// Use zero to indicate no value.
CompletedAt int64
// Current
CurrentRetry int
// Total number of retries possible for this task
MaxRetry int
// contains filtered or unexported fields
}
func DecodeTMFromJSON ¶
func DecodeTMFromJSON(data []byte) (*TaskMessage, error)
func (*TaskMessage) GetStatus ¶
func (msg *TaskMessage) GetStatus() string
type TaskOption ¶
type TaskOption interface {
String() string
Type() TaskOptionType
Value() interface{}
}
func Deadline ¶
func Deadline(t time.Time) TaskOption
Deadline returns an option to specify the deadline for the given task. If it reaches the deadline before the Handler returns, then the task will be retried.
If there's a conflicting Timeout option, whichever comes earliest will be used.
func Retry ¶
func Retry(n int) TaskOption
func Timeout ¶
func Timeout(d time.Duration) TaskOption
Timeout returns an option to specify how long a task may run.
Zero duration means no limit ( math.MaxInt64 is chosen )
If there's a conflicting Deadline option, whichever comes earliest will be used.
type TaskOptionType ¶
type TaskOptionType int
const ( MaxRetryOpt TaskOptionType = iota TaskIDOpt // QueueOpt TimeoutOpt DeadlineOpt )