Documentation ¶
Overview ¶
nq provides a go package to publish/process tasks via nats
Index ¶
- Constants
- Variables
- func CancelStreamNameToStreamName(stream, subject string) string
- func EncodeTMToJSON(t *TaskMessage) ([]byte, error)
- func GenerateServerName() string
- func NoAuthentcation() noAuthentication
- func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect
- func StreamNameToCancelStreamName(subject string) string
- func StreamNameToDurableStreamName(srvName, stream string) string
- func TokenAuthentication(username, password string) tokenAuthentication
- func UserPassAuthentcation(username, password string) uPassAuthentication
- type CancelPayload
- type CancellationStore
- type ClientConnectionOption
- type ClientOption
- type ClientOptionType
- type Config
- type Inspector
- type ListenUpdates
- type LogLevel
- type NatsBroker
- func (n *NatsBroker) AddStream(conf nats.StreamConfig) error
- func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)
- func (n *NatsBroker) Close() error
- func (n *NatsBroker) ConnectoQueue(q *Queue) error
- func (n *NatsBroker) DeleteStream(name string) error
- func (n *NatsBroker) Ping() error
- func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)
- func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)
- func (n *NatsBroker) Stats(q *Queue) error
- func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)
- type NatsClientOpt
- type ProcessingFunc
- type PublishClient
- func (p *PublishClient) Cancel(id string) error
- func (p *PublishClient) CancelInQueue(id string, qname string) error
- func (p *PublishClient) Close() error
- func (p *PublishClient) DeleteQueue(qname string)
- func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)
- func (p *PublishClient) Fetch(id string) (*TaskMessage, error)
- func (p *PublishClient) GetUpdates(taskID string) (chan *TaskMessage, error)
- func (p *PublishClient) Stats(queue string) error
- type PullAction
- type PullStore
- type Queue
- type ResultHandlerIFACE
- type ResultHandlerNats
- type Server
- type Task
- type TaskCancellationMessage
- type TaskMessage
- type TaskOption
- type TaskOptionType
- type TaskPayload
Constants ¶
const ( // waiting for task to be received by worker Pending = iota // task is being processed by a worker Processing // taskFN returns an error Failed // successfully processed Completed // cancelled by user Cancelled // deleted before being run Deleted )
Possible task statuses
Variables ¶
var ErrCannotCancelDeletedTask = errors.New("deleted task cannot be cancelled") // trying to cancel an already cancelled task
var ErrFailedToConnect = errors.New("failed to connect to nats client")
var ErrInvalidTaskPayload = errors.New("invalid task payload") // Happens when malformed data is sent to task-stream
var ErrNonCancellableState = errors.New("cannot cancel task, in uncancellable state")
var ErrQueueNotFound = errors.New("nq: queue not found")
var ErrServerClosed = errors.New("nq: Server closed")
var ErrServerNameEmpty = errors.New("server name cannot be empty")
var ErrStreamNotCreated = errors.New("nq: stream not created")
var ErrTaskIDEmpty = errors.New("nq: task id cannot be empty")
var ErrTaskNotFound = errors.New("task not found")
Functions ¶
func EncodeTMToJSON ¶
func EncodeTMToJSON(t *TaskMessage) ([]byte, error)
func GenerateServerName ¶
func GenerateServerName() string
Generates a server name, combination of hostname and process id
func NoAuthentcation ¶
func NoAuthentcation() noAuthentication
Connect to nats-server without any authentication
Default
func ShutdownOnNatsDisconnect ¶
func ShutdownOnNatsDisconnect() shutdownOnNatsDisconnect
Shutdown server and workers if connection with nats-server is broken. Results in any executing tasks being cancelled, if not finished in time specified by `shutdownTimeout`. Option is useful when workers should be `cancellable` at all times.
By default, inactive
func StreamNameToCancelStreamName ¶
StreamNameToCancelStreamName returns the name of the stream responsible for cancellation of tasks in the given stream
func StreamNameToDurableStreamName ¶
Returns a durable name for stream
Helps re-establishing connection to nats-server while maintaining sequence state
func TokenAuthentication ¶
func TokenAuthentication(username, password string) tokenAuthentication
Connect to nats-server using token authentication
Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/tokens
func UserPassAuthentcation ¶
func UserPassAuthentcation(username, password string) uPassAuthentication
Connect to nats-server using username:password pair
Read more: https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/username_password
Types ¶
type CancelPayload ¶
type CancelPayload string
type CancellationStore ¶
type CancellationStore struct {
// contains filtered or unexported fields
}
func NewCancelations ¶
func NewCancelations() CancellationStore
NewCancelations returns a CancellationStore instance.
func (*CancellationStore) Add ¶
func (c *CancellationStore) Add(id string, fn context.CancelFunc)
Add adds a new cancel func to the collection.
func (*CancellationStore) Delete ¶
func (c *CancellationStore) Delete(id string)
Delete deletes a cancel func from the collection given an id.
func (*CancellationStore) Get ¶
func (c *CancellationStore) Get(id string) (fn context.CancelFunc, ok bool)
Get returns a cancel func given an id.
type ClientConnectionOption ¶
type ClientConnectionOption interface { String() string Type() ClientOptionType Value() interface{} }
type ClientOption ¶
type ClientOption struct { Timeout time.Duration //todo AuthenticationType ClientOptionType AuthenticationObject interface{} NatsOption []nats.Option // Defaults to false ShutdownOnNatsDisconnect bool }
Internal representation of options for nats-server connection
type ClientOptionType ¶
type ClientOptionType int
const ( // Authentication types // TODO: error on using multiple of below UPassAuthenticationOpt ClientOptionType = iota TokenAuthenticationOpt NoAuthenticationOpt // General options ShutdownOnNatsDisconnectOpt )
type Config ¶
type Config struct { // Durable name for this worker. Required to re-establish connection // to nats-server ServerName string // Maximum number of concurrent processing of tasks. // // If set to a zero or negative value, the number of CPUs usable by the current process is picked. Concurrency int // Predicate function to determine whether the error returned from a task is an error. // If function returns true, Server will retry the task ( bounded by retry-limit set on task ) // // By default, non-nil the function returns true. IsFailureFn func(error) bool // Logger specifies the logger used by the server instance. // // go's logger is used by default. Logger ilog.Base // LogLevel specifies the minimum log level to enable. // // InfoLevel is used by default. LogLevel LogLevel // ShutdownTimeout specifies the duration to wait to let workers finish their tasks // before forcing them to abort when stopping the server. // // Defaults to timeout of 5 seconds. ShutdownTimeout time.Duration }
Server config
type Inspector ¶ added in v0.3.0
type Inspector struct {
// contains filtered or unexported fields
}
func NewInspector ¶ added in v0.3.0
func NewInspector(broker *NatsBroker) *Inspector
func (*Inspector) AddAnother ¶ added in v0.3.0
type ListenUpdates ¶ added in v0.3.0
type ListenUpdates struct {
// contains filtered or unexported fields
}
type LogLevel ¶
type LogLevel int32
LogLevel represents a log level.
const ( // DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes. DebugLevel LogLevel // InfoLevel is used for general informational log messages. InfoLevel // WarnLevel is used for undesired but relatively expected events, // which may indicate a problem. WarnLevel // ErrorLevel is used for undesired and unexpected events that // the program can recover from. ErrorLevel // FatalLevel is used for undesired and unexpected events that // the program cannot recover from. FatalLevel )
type NatsBroker ¶
type NatsBroker struct {
// contains filtered or unexported fields
}
func NewNatsBroker ¶
func NewNatsBroker(conf NatsClientOpt, opt ClientOption, natsConnectionClosed chan struct{}, forceReRegister chan struct{}) (*NatsBroker, error)
TODO: Allow users to specify `forceReRegister` as a boolean. NewNatsBroker returns a new instance of NatsBroker.
func (*NatsBroker) AddStream ¶
func (n *NatsBroker) AddStream(conf nats.StreamConfig) error
func (*NatsBroker) Cancel ¶
func (n *NatsBroker) Cancel(subject string, id string) (*TaskMessage, error)
func (*NatsBroker) Close ¶
func (n *NatsBroker) Close() error
func (*NatsBroker) ConnectoQueue ¶
func (n *NatsBroker) ConnectoQueue(q *Queue) error
Creates queue stream if not exists
Also creates the underlying nats-stream for queue and cancel-queue
func (*NatsBroker) DeleteStream ¶
func (n *NatsBroker) DeleteStream(name string) error
func (*NatsBroker) Ping ¶
func (n *NatsBroker) Ping() error
func (*NatsBroker) Publish ¶
func (n *NatsBroker) Publish(subject string, payload []byte) (*nats.PubAck, error)
func (*NatsBroker) PublishWithMeta ¶
func (n *NatsBroker) PublishWithMeta(msg *TaskMessage) (*TaskMessage, error)
func (*NatsBroker) Stats ¶
func (n *NatsBroker) Stats(q *Queue) error
Temporary function that fulfills statistic demands from nq-cli
func (*NatsBroker) Submit ¶
func (n *NatsBroker) Submit(subject string, payload TaskPayload) (*TaskMessage, error)
type NatsClientOpt ¶
type NatsClientOpt struct { // nats server address Addr string // Name for key-value store used to store task metadata // // Defaults to nq DBName string // ReconnectWait is an Option to set the wait time between reconnect attempts. // // Defaults to 10 seconds ReconnectWait time.Duration // MaxReconnects is an Option to set the maximum number of reconnect attempts. // // Defaults to 100 MaxReconnects int }
NatsClientOpt represents NATS connection configuration options.
type ProcessingFunc ¶
type ProcessingFunc func(context.Context, *TaskPayload) error
Signature for function executed by a worker. `ProcessingFunc` values are registered to subjects and process messages published by the client
type PublishClient ¶
type PublishClient struct {
// contains filtered or unexported fields
}
Client responsible for interaction with nq tasks
Client is used to enqueue / cancel tasks or fetch metadata for tasks
func NewPublishClient ¶
func NewPublishClient(config NatsClientOpt, opts ...ClientConnectionOption) *PublishClient
NewPublishClient returns a new Client instance, given nats connection options, to interact with nq tasks
func (*PublishClient) Cancel ¶
func (p *PublishClient) Cancel(id string) error
Cancel sends `cancel` request for given task to workers
func (*PublishClient) CancelInQueue ¶
func (p *PublishClient) CancelInQueue(id string, qname string) error
Faster than using `Cancel` method, if queue name is known
func (*PublishClient) Close ¶
func (p *PublishClient) Close() error
Close closes the connection with nats
func (*PublishClient) DeleteQueue ¶
func (p *PublishClient) DeleteQueue(qname string)
Delete a queue
Deletes underlying nats stream associated with a queue
func (*PublishClient) Enqueue ¶
func (p *PublishClient) Enqueue(task *Task, opts ...TaskOption) (*TaskMessage, error)
Enqueue can be used to enqueue given task to a queue
Returns TaskMessage and nil error if enqueued successfully, else non-nil error
func (*PublishClient) Fetch ¶
func (p *PublishClient) Fetch(id string) (*TaskMessage, error)
Fetch fetches TaskMessage for given task
func (*PublishClient) GetUpdates ¶ added in v0.3.0
func (p *PublishClient) GetUpdates(taskID string) (chan *TaskMessage, error)
GetUpdates can be used to get changes to a task's metadata
Returns error if failed to start watching for changes. Channel is closed once task reaches terminal state
func (*PublishClient) Stats ¶
func (p *PublishClient) Stats(queue string) error
type PullAction ¶
type PullAction struct { Q *Queue Subscription *nats.Subscription Fn ProcessingFunc }
type PullStore ¶
type PullStore struct {
// contains filtered or unexported fields
}
func NewPullStore ¶
func NewPullStore() PullStore
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Internal `Queue`s represent an abstraction over a nats stream -> subject
func (*Queue) DurableStream ¶
type ResultHandlerIFACE ¶
type ResultHandlerNats ¶
type ResultHandlerNats struct {
// contains filtered or unexported fields
}
func NewResultHandlerNats ¶
func NewResultHandlerNats(name string, js nats.JetStreamContext) *ResultHandlerNats
func (*ResultHandlerNats) Get ¶
func (rn *ResultHandlerNats) Get(id string) (*TaskMessage, error)
func (*ResultHandlerNats) GetAllKeys ¶
func (rn *ResultHandlerNats) GetAllKeys(id string, data []byte) ([]string, error)
Get all keys from nats key-value store
func (*ResultHandlerNats) Watch ¶ added in v0.3.0
func (rn *ResultHandlerNats) Watch(id string) (chan *TaskMessage, error)
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Responsible for task lifecycle management and processing
func NewServer ¶
func NewServer(natsConfig NatsClientOpt, servCfg Config, opts ...ClientConnectionOption) *Server
func (*Server) Register ¶
func (srv *Server) Register(qname string, fn ProcessingFunc)
Subscribe to a stream
func (*Server) Run ¶
Run starts the task processing and blocks until an os signal to exit the program is received. Once it receives a signal, it gracefully shuts down all active workers and other goroutines to process the tasks.
Run returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.
func (*Server) Shutdown ¶
func (srv *Server) Shutdown()
Shutdown gracefully shuts down the server. It gracefully closes all active workers. The server will wait for active workers to finish processing tasks for duration specified in Config.ShutdownTimeout. If worker didn't finish processing a task during the timeout, the task will be pushed back to Redis.
func (*Server) Start ¶
Start starts the worker server. Once the server has started, it pulls tasks off queues and starts a worker goroutine for each task and then call Handler to process it. Tasks are processed concurrently by the workers up to the number of concurrency specified in Config.Concurrency.
Start returns any error encountered at server startup time. If the server has already been shutdown, ErrServerClosed is returned.
func (*Server) Stop ¶
func (srv *Server) Stop()
Stop signals the server to stop pulling new tasks off queues. Stop can be used before shutting down the server to ensure that all currently active tasks are processed before server shutdown.
Stop does not shutdown the server, make sure to call Shutdown before exit.
type Task ¶
type Task struct {
// contains filtered or unexported fields
}
Task is a representation of work to be performed by a worker
type TaskCancellationMessage ¶
type TaskMessage ¶
type TaskMessage struct { // Sequence indicates sequence number of message in nats jetstream Sequence uint64 // ID is a unique identifier for each task, used for cancellation. ID string // . Autofilled StreamName string // Queue string // Payload holds data needed to process the task. Payload []byte // Status indicated status of task execution Status int // Timeout specifies timeout in seconds. // Use zero to indicate no deadline. Timeout int64 // Deadline specifies the deadline for the task in Unix time. // Use zero to indicate no deadline. Deadline int64 // CompletedAt is the time the task was processed successfully in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // // Negative value indicated cancelled. // Use zero to indicate no value. CompletedAt int64 // Current retry count // // autofilled CurrentRetry int // Total number of retries possible for this task MaxRetry int // contains filtered or unexported fields }
func DecodeTMFromJSON ¶
func DecodeTMFromJSON(data []byte) (*TaskMessage, error)
func (*TaskMessage) GetStatus ¶
func (msg *TaskMessage) GetStatus() string
type TaskOption ¶
type TaskOption interface { String() string Type() TaskOptionType Value() interface{} }
func Deadline ¶
func Deadline(t time.Time) TaskOption
Deadline returns an option to specify the deadline for the given task.
If both Deadline and Timeout options are set, whichever comes earliest will be used.
func Retry ¶
func Retry(n int) TaskOption
Returns an option to specify maximum number of times a task will be retried before being marked as failed.
A negative retry count is assigned defaultRetry ( 0 )
func Timeout ¶
func Timeout(d time.Duration) TaskOption
Timeout returns an option to specify how long a task can run before being cancelled.
Zero duration means no limit ( math.MaxInt32 )
If both Deadline and Timeout options are set, whichever comes earliest will be used.
type TaskOptionType ¶
type TaskOptionType int
const ( MaxRetryOpt TaskOptionType = iota TaskIDOpt // QueueOpt TimeoutOpt DeadlineOpt )