Documentation
¶
Overview ¶
Package dispatcher is an asynchronous task queue/job queue based on distributed message passing.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Log ¶
type Log interface { Info(args ...interface{}) Infof(format string, args ...interface{}) Debug(args ...interface{}) Debugf(format string, args ...interface{}) Error(args ...interface{}) Errorf(format string, args ...interface{}) }
Log is an interface of logger which is used in dispatcher. By default dispatcher uses logrus. You can pass your own logger which fits this interface to dispatcher in server config.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server contains AMQP connection and creates publisher. Server is a parent of workers and publisher.
func NewServer ¶
func NewServer(cfg *ServerConfig) (*Server, chan struct{}, error)
NewServer creates new server from config and connects to AMQP.
func (*Server) Close ¶
func (s *Server) Close()
Close is a complicated function which handles graceful quit of everything which dispatcher has (workers, publisher and connection). At first it stops reconnection process, then it closes publisher, after this it closes all workers and waits until all of them will finish their tasks and closes their channels. After all of this it closes AMQP connection.
func (*Server) GetWorkerByName ¶
GetWorkerByName returns a pointer to a Worker by its name.
func (*Server) NewWorker ¶
func (s *Server) NewWorker(cfg *WorkerConfig, tasks map[string]TaskConfig) (*Worker, error)
NewWorker creates new worker instance. Takes WorkerConfig and map of TaskConfigs. Map of TaskConfigs needs for task registration inside of this worker.
type ServerConfig ¶
type ServerConfig struct { AMQPConnectionString string ReconnectionRetriesForever bool ReconnectionRetries int ReconnectionIntervalSeconds int64 TLSConfig *tls.Config SecureConnection bool DebugMode bool // for default logger only InitQueues []Queue Exchange string // required DefaultRoutingKey string // required Logger Log }
ServerConfig is a configuration which needs for server creation.
AMQPConnectionString example: amqp://guest:guest@localhost:5672/
ReconnectionRetries - number of reconnection retries, when all retries exceed, server will be closed.
ReconnectionIntervalSeconds - interval in seconds between every retry.
SecureConnection - if true, uses TLSConfig with param InsecureSkipVerify.
DebugMode - if true, enables debug level in logger (by default dispatcher uses logrus and this option enables debug level in it, if you use your own logger, omit this option).
InitQueues - pass queues and binding keys to this field and server will create all of them if they don't exists.
DefaultRoutingKey - default routing key for publishing messages.
Logger - custom logger if you don't want to use dispatcher's default logrus.
type Task ¶
type Task struct { UUID string `json:"uuid"` Name string `json:"name"` RoutingKey string `json:"-"` Args []TaskArgument `json:"args"` Headers map[string]interface{} `json:"-"` }
Task is a task which can be send to AMQP. Workers receive this tasks and handles them via parsing their arguments. You can pass exchange and routing key to task if you want, they will be used in publish function.
type TaskArgument ¶
type TaskArgument struct { Type string `json:"type"` Value interface{} `json:"value"` }
TaskArgument is an argument which will be passed to function. For example, task with such arguments will call the following function:
Arguments:
t := []TaskArgument{ TaskArgument{ Type: "int", Value: 3, }, TaskArgument{ Type: "string", Value: "I am a string", }, }
Function:
func (myInt int, myAwesomeString string) error {}
Types which can be used: bool, string, int int8 int16 int32 int64, uint uint8 uint16 uint32 uint64, float32 float64
type TaskConfig ¶
TaskConfig is task configuration which is needed for task registration in worker. Contains function which will be called by worker and timeout. Timeout is needed in case your task executing for about half an hour but you expected only 1 minute. When timeout exceeded next task will be taken, but that old task will not be stopped. TaskUUIDAsFirstArg - makes task UUID as first argument of all tasks which this worker calls.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker instance. Consists of channel which consume queue.
func (*Worker) Close ¶
func (w *Worker) Close()
Close function gracefully closes worker. At first this function stops worker consuming, then waits until all started by this worker tasks will be finished after all of this it closes channel. This function is also used by server close function for graceful quit of all workers.
type WorkerConfig ¶
type WorkerConfig struct { Limit int Queue string // required BindingKeys []string Name string // required }
WorkerConfig is a configuration for new worker which you want to create.
Limit - number of parallel tasks which will be executed.
Queue - name of queue which worker will consume.
Binding keys - biding keys for queue which will be created.
Name - worker name