Documentation ¶
Overview ¶
Package messaging provides the logic and data structures that the services will need to communicate with each other over AMQP (as implemented by RabbitMQ).
Index ¶
- Variables
- func StopQueueName(invID string) string
- func StopRequestKey(invID string) string
- func TimeLimitDeltaQueueName(invID string) string
- func TimeLimitDeltaRequestKey(invID string) string
- func TimeLimitRequestKey(invID string) string
- func TimeLimitRequestQueueName(invID string) string
- func TimeLimitResponsesKey(invID string) string
- func TimeLimitResponsesQueueName(invID string) string
- type Client
- func (c *Client) AddConsumer(exchange, exchangeType, queue, key string, handler MessageHandler, ...)
- func (c *Client) AddConsumerMulti(exchange, exchangeType, queue string, keys []string, handler MessageHandler, ...)
- func (c *Client) AddDeletableConsumer(exchange, exchangeType, queue, key string, handler MessageHandler)
- func (c *Client) Close()
- func (c *Client) CreateQueue(name, exchange, key string, durable, autoDelete bool) (*amqp.Channel, error)
- func (c *Client) DeleteQueue(name string) error
- func (c *Client) Listen()
- func (c *Client) Publish(key string, body []byte) error
- func (c *Client) PublishJobUpdate(u *UpdateMessage) error
- func (c *Client) QueueExists(name string) (bool, error)
- func (c *Client) SendStopRequest(invID, user, reason string) error
- func (c *Client) SendTimeLimitDelta(invID, delta string) error
- func (c *Client) SendTimeLimitRequest(invID string) error
- func (c *Client) SendTimeLimitResponse(invID string, timeRemaining int64) error
- func (c *Client) SetupPublishing(exchange string) error
- type Command
- type JobRequest
- type JobState
- type Logger
- type MessageHandler
- type StatusCode
- type StopRequest
- type TimeLimitDelta
- type TimeLimitRequest
- type TimeLimitResponse
- type UpdateMessage
Constants ¶
This section is empty.
Variables ¶
var (
	// Info level logger. Can be set by other packages. Defaults to writing to
	// os.Stdout.
	Info Logger = log.New(os.Stdout, "", log.Lshortfile)

	// Warn level logger. Can be set by other packages. Defaults to writing to
	// os.Stderr.
	Warn Logger = log.New(os.Stderr, "", log.Lshortfile)

	// Error level logger. Can be set by other packages. Defaults to writing to
	// os.Stderr.
	Error Logger = log.New(os.Stderr, "", log.Lshortfile)

	//LaunchCommand is the string used in LaunchCo
	LaunchCommand = "LAUNCH"

	//ReindexAllKey is the routing/binding key for full reindex messages.
	ReindexAllKey = "index.all"

	//ReindexTemplatesKey is the routing/binding key for templates reindex messages.
	ReindexTemplatesKey = "index.templates"

	//IncrementalKey is the routing/binding key for incremental updates
	IncrementalKey = "metadata.update"

	//LaunchesKey is the routing/binding key for job launch request messages.
	LaunchesKey = "jobs.launches"

	//UpdatesKey is the routing/binding key for job update messages.
	UpdatesKey = "jobs.updates"

	//StopsKey is the routing/binding key for job stop request messages.
	StopsKey = "jobs.stops"

	//CommandsKey is the routing/binding key for job command messages.
	CommandsKey = "jobs.commands"

	// TimeLimitRequestsKey is the routing/binding key for the job time limit messages.
	TimeLimitRequestsKey = "jobs.timelimits.requests"

	//TimeLimitDeltaKey is the routing/binding key for the job time limit delta messages.
	TimeLimitDeltaKey = "jobs.timelimits.deltas"

	//TimeLimitResponseKey is the routing/binding key for the job time limit
	//response messages.
	TimeLimitResponseKey = "jobs.timelimits.responses"

	//QueuedState is when a job is queued.
	QueuedState JobState = "Queued"

	//SubmittedState is when a job has been submitted.
	SubmittedState JobState = "Submitted"

	//RunningState is when a job is running.
	RunningState JobState = "Running"

	//ImpendingCancellationState is when a job is running but the current step is about
	//to reach its expiration time.
	ImpendingCancellationState JobState = "ImpendingCancellation"

	//SucceededState is when a job has successfully completed the required steps.
	SucceededState JobState = "Completed"

	//FailedState is when a job has failed. Duh.
	FailedState JobState = "Failed"
)
Functions ¶
func StopQueueName ¶
StopQueueName returns the formatted queue name for job stop requests. It's based on the passed in string, which is assumed to be the InvocationID for a job, but there's no reason that is required to be the case.
func StopRequestKey ¶
StopRequestKey returns the binding key formatted correctly for the jobs exchange based on the InvocationID passed in.
func TimeLimitDeltaQueueName ¶
TimeLimitDeltaQueueName returns the correctly formatted queue name for time limit delta requests. It's based on the passed in string, which is assumed to be the InvocationID for a job, but there's no reason that is required to be the case.
func TimeLimitDeltaRequestKey ¶
TimeLimitDeltaRequestKey returns the binding key formatted correctly for the jobs exchange based on the InvocationID passed in.
func TimeLimitRequestKey ¶
TimeLimitRequestKey returns the formatted binding key based on the passed in job InvocationID.
func TimeLimitRequestQueueName ¶
TimeLimitRequestQueueName returns the formatted queue name for time limit requests. It is based on the passed in job InvocationID.
func TimeLimitResponsesKey ¶
TimeLimitResponsesKey returns the formatted binding key based on the passed in job InvocationID.
func TimeLimitResponsesQueueName ¶
TimeLimitResponsesQueueName returns the formatted queue name for time limit responses. It is based on the passed in job InvocationID.
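The key helpers above can be sketched as follows. This is a minimal illustration, not the package's source: it assumes each per-job key is the base key from the Variables section joined to the invocation ID with a dot, which is the shape suggested by the jobs.stops.<invocation_id> topic named in the SendStopRequest description. The perJobKey function and the lowercase constant names are hypothetical. Queue name formats are not documented on this page, so they are left out.

```go
package main

import "fmt"

// Base keys copied from the values listed in the Variables section.
const (
	stopsKey             = "jobs.stops"
	timeLimitRequestsKey = "jobs.timelimits.requests"
	timeLimitDeltaKey    = "jobs.timelimits.deltas"
	timeLimitResponseKey = "jobs.timelimits.responses"
)

// perJobKey is a hypothetical helper. It appends the invocation ID to a base
// key with a dot. The real helpers may format their keys differently.
func perJobKey(base, invID string) string {
	return fmt.Sprintf("%s.%s", base, invID)
}

func main() {
	invID := "example-invocation-id" // placeholder value

	for _, base := range []string{
		stopsKey,
		timeLimitRequestsKey,
		timeLimitDeltaKey,
		timeLimitResponseKey,
	} {
		fmt.Println(perJobKey(base, invID))
	}
}
```

Giving every job its own key lets a consumer bind a queue to exactly one invocation instead of filtering every message on the exchange.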
Types ¶
type Client ¶
type Client struct {
	Reconnect bool
	// contains filtered or unexported fields
}
Client encapsulates the information needed to interact via AMQP.
func (*Client) AddConsumer ¶
func (c *Client) AddConsumer(exchange, exchangeType, queue, key string, handler MessageHandler, prefetchCount int)
AddConsumer adds a consumer with only one binding, which is usually what you need.
func (*Client) AddConsumerMulti ¶
func (c *Client) AddConsumerMulti(exchange, exchangeType, queue string, keys []string, handler MessageHandler, prefetchCount int)
AddConsumerMulti adds a consumer to the list of consumers that need to be created each time the client is set up. Note that this just adds the consumers to a list, it doesn't actually start handling messages yet. You need to call Listen() for that.
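The register-first behavior described above can be illustrated with an in-memory stand-in. Everything in this sketch is hypothetical (the client, consumer, and handler types, and the queue names); it only mirrors the documented parameter lists and assumes that the single-key variant delegates to the multi-key one. No AMQP connection is involved.

```go
package main

import "fmt"

// handler is a stand-in for MessageHandler. The real type handles an
// amqp.Delivery rather than a raw byte slice.
type handler func(body []byte)

// consumer records the settings needed to create a consumer later.
type consumer struct {
	exchange      string
	exchangeType  string
	queue         string
	keys          []string
	handler       handler
	prefetchCount int
}

// client is a hypothetical in-memory stand-in for Client.
type client struct {
	consumers []consumer
}

// addConsumerMulti only appends to the list. Nothing is consumed yet.
func (c *client) addConsumerMulti(exchange, exchangeType, queue string, keys []string, h handler, prefetchCount int) {
	c.consumers = append(c.consumers, consumer{
		exchange:      exchange,
		exchangeType:  exchangeType,
		queue:         queue,
		keys:          keys,
		handler:       h,
		prefetchCount: prefetchCount,
	})
}

// addConsumer is the single-key convenience form (assumed to delegate).
func (c *client) addConsumer(exchange, exchangeType, queue, key string, h handler, prefetchCount int) {
	c.addConsumerMulti(exchange, exchangeType, queue, []string{key}, h, prefetchCount)
}

func main() {
	c := &client{}
	show := func(body []byte) { fmt.Printf("got %s\n", body) }

	c.addConsumer("jobs", "topic", "example-stops", "jobs.stops", show, 1)
	c.addConsumerMulti("jobs", "topic", "example-timelimits",
		[]string{"jobs.timelimits.requests", "jobs.timelimits.deltas"}, show, 1)

	for _, con := range c.consumers {
		fmt.Printf("%s registered with %d key(s)\n", con.queue, len(con.keys))
	}
}
```

Keeping the registrations in a list is what would allow the same consumers to be recreated after a reconnect, which fits the "each time the client is set up" wording.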
func (*Client) AddDeletableConsumer ¶
func (c *Client) AddDeletableConsumer(exchange, exchangeType, queue, key string, handler MessageHandler)
AddDeletableConsumer adds a consumer to the list of consumers that need to be created each time the client is set up. Unlike AddConsumer(), the new consumer will have auto-delete set to true and durable set to false. Make sure that Listen() has been called before calling this function. This only supports a single bind key, for now.
func (*Client) CreateQueue ¶
func (c *Client) CreateQueue(name, exchange, key string, durable, autoDelete bool) (*amqp.Channel, error)
CreateQueue creates a queue with the given name, durability, and auto-delete settings. It then binds it to the given exchange with the provided key. This function does not declare the exchange.
func (*Client) DeleteQueue ¶
DeleteQueue deletes the queue with the given name without regard to safety.
func (*Client) Listen ¶
func (c *Client) Listen()
Listen will wait for messages and pass them off to handlers, which run in their own goroutine.
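The goroutine-per-message dispatch that Listen describes can be sketched with a plain channel. This is an assumption-laden illustration: the dispatch function is hypothetical, messages are plain strings instead of AMQP deliveries, and the sketch returns once the channel is closed, whereas the real Listen presumably keeps running.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatch hands every message to handle in its own goroutine and returns
// after the channel is closed and all handlers have finished.
func dispatch(messages <-chan string, handle func(string)) {
	var wg sync.WaitGroup
	for m := range messages {
		wg.Add(1)
		go func(msg string) {
			defer wg.Done()
			handle(msg)
		}(m)
	}
	wg.Wait()
}

func main() {
	messages := make(chan string, 3)
	for _, m := range []string{"first", "second", "third"} {
		messages <- m
	}
	close(messages)

	// Handlers run concurrently, so shared state needs a lock.
	var mu sync.Mutex
	handled := 0
	dispatch(messages, func(msg string) {
		mu.Lock()
		defer mu.Unlock()
		handled++
	})

	fmt.Println("handled", handled, "messages")
}
```

Because each handler runs in its own goroutine, a slow handler does not block delivery of later messages, but handlers must guard any state they share.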
func (*Client) Publish ¶
Publish sends a message to the configured exchange with a routing key set to the value of 'key'.
func (*Client) PublishJobUpdate ¶
func (c *Client) PublishJobUpdate(u *UpdateMessage) error
PublishJobUpdate sends a message to the configured exchange with a routing key of "jobs.updates".
func (*Client) QueueExists ¶
QueueExists reports whether a queue with the given name exists. It returns false if the queue does not exist, or an error if the check itself fails.
func (*Client) SendStopRequest ¶
SendStopRequest sends out a message to the jobs.stops.<invocation_id> topic telling listeners to stop their job.
func (*Client) SendTimeLimitDelta ¶
SendTimeLimitDelta sends out a message to the jobs.timelimits.deltas.<invocationID> topic describing how the job should adjust its time limit.
func (*Client) SendTimeLimitRequest ¶
SendTimeLimitRequest sends out a message to the job on the "jobs.timelimits.requests.<invocationID>" topic. This should trigger the job to emit a TimeLimitResponse.
func (*Client) SendTimeLimitResponse ¶
SendTimeLimitResponse sends out a message to the jobs.timelimits.responses.<invocationID> topic containing the remaining time for the job.
func (*Client) SetupPublishing ¶
SetupPublishing initializes the publishing functionality of the client. Call this before calling Publish.
type Command ¶
type Command int
Command tells the receiver of a JobRequest which action to perform.
type JobRequest ¶
JobRequest is a generic request type for job related requests.
func NewLaunchRequest ¶
func NewLaunchRequest(j *model.Job) *JobRequest
NewLaunchRequest returns a *JobRequest that has been constructed to be a launch request for the provided job.
type Logger ¶
type Logger interface {
	Print(args ...interface{})
	Printf(format string, args ...interface{})
	Println(args ...interface{})
}
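Since the Info, Warn, and Error loggers "can be set by other packages", any value with these three methods can be substituted. The sketch below copies the interface locally so that it compiles on its own, and shows that the standard library's *log.Logger satisfies it. The logLine helper and the "INFO " prefix are hypothetical, chosen only to make the output easy to inspect.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger is a local copy of the interface documented above.
type Logger interface {
	Print(args ...interface{})
	Printf(format string, args ...interface{})
	Println(args ...interface{})
}

// Compile-time check that *log.Logger satisfies Logger.
var _ Logger = (*log.Logger)(nil)

// logLine is a hypothetical helper. It writes one formatted line through a
// Logger backed by an in-memory buffer and returns what was written.
func logLine(prefix, format string, args ...interface{}) string {
	var buf bytes.Buffer
	var l Logger = log.New(&buf, prefix, 0) // no date or file flags
	l.Printf(format, args...)
	return buf.String()
}

func main() {
	fmt.Print(logLine("INFO ", "bound queue %s", "example-queue"))
}
```

With the real package, a logger built this way would be assigned to one of the package-level variables before the client is used.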
type MessageHandler ¶
MessageHandler defines a type for amqp.Delivery handlers.
type StatusCode ¶
type StatusCode int
StatusCode defines a valid exit code for a job.
const (
	// Success is the exit code used when the required commands execute correctly.
	Success StatusCode = iota

	// StatusDockerPullFailed is the exit code when a 'docker pull' fails.
	StatusDockerPullFailed

	// StatusDockerCreateFailed is the exit code when a 'docker create' fails.
	StatusDockerCreateFailed

	// StatusInputFailed is the exit code when an input download fails.
	StatusInputFailed

	// StatusStepFailed is the exit code when a step in the job fails.
	StatusStepFailed

	// StatusOutputFailed is the exit code when the output upload fails.
	StatusOutputFailed

	// StatusKilled is the exit code when the job is killed.
	StatusKilled

	// StatusTimeLimit is the exit code when the job is killed due to the time
	// limit being reached.
	StatusTimeLimit

	// StatusBadDuration is the exit code when the job is killed because an
	// unparseable job duration was sent to it.
	StatusBadDuration
)
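Because the constants are declared with iota, their numeric values follow declaration order, starting at 0 for Success. The sketch below repeats the declarations locally to make those values visible. The failed method is hypothetical and is not part of the documented API.

```go
package main

import "fmt"

// StatusCode and its constants are copied from the declarations above.
type StatusCode int

const (
	Success StatusCode = iota
	StatusDockerPullFailed
	StatusDockerCreateFailed
	StatusInputFailed
	StatusStepFailed
	StatusOutputFailed
	StatusKilled
	StatusTimeLimit
	StatusBadDuration
)

// failed is a hypothetical convenience: every code other than Success
// indicates some kind of failure.
func (s StatusCode) failed() bool {
	return s != Success
}

func main() {
	// prints "0 6 8"
	fmt.Println(int(Success), int(StatusKilled), int(StatusBadDuration))
	fmt.Println(StatusTimeLimit.failed())
}
```

One consequence of iota numbering is that inserting a new constant in the middle of the block would shift every later exit code, so new codes are safest appended at the end.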
type StopRequest ¶
StopRequest contains the information needed to stop a job.
func NewStopRequest ¶
func NewStopRequest() *StopRequest
NewStopRequest returns a *StopRequest that has been constructed to be a stop request for a running job.
type TimeLimitDelta ¶
TimeLimitDelta is the message that is sent to get road-runner to change its time limit. The 'Delta' field contains a string in Go's Duration string format. More info on the format is available here: https://golang.org/pkg/time/#ParseDuration
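A receiver of such a delta might handle it along these lines. The applyDelta function is hypothetical and only demonstrates time.ParseDuration on the documented format. How road-runner actually applies the delta is not described here.

```go
package main

import (
	"fmt"
	"time"
)

// applyDelta parses delta in Go's duration string format and adds it to the
// current limit. A negative delta such as "-15m" shortens the limit.
func applyDelta(limit time.Duration, delta string) (time.Duration, error) {
	d, err := time.ParseDuration(delta)
	if err != nil {
		return limit, fmt.Errorf("unparseable duration %q: %w", delta, err)
	}
	return limit + d, nil
}

func main() {
	limit := 2 * time.Hour

	for _, delta := range []string{"1h30m", "-15m", "soon"} {
		updated, err := applyDelta(limit, delta)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s applied to %s gives %s\n", delta, limit, updated)
	}
}
```

The error path is the kind of situation the StatusBadDuration exit code appears to cover, since it is documented as the code for an unparseable job duration.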
type TimeLimitRequest ¶
type TimeLimitRequest struct {
InvocationID string
}
TimeLimitRequest is the message that is sent to road-runner to get it to broadcast its current time limit.
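As a rough illustration of what such a message body could look like, the sketch below serializes the struct with encoding/json. That the package encodes its messages as JSON is an assumption, since this page does not state the wire format. The encodeRequest helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TimeLimitRequest is copied from the declaration above.
type TimeLimitRequest struct {
	InvocationID string
}

// encodeRequest is a hypothetical helper that serializes a request as JSON.
// The real package may encode its messages differently.
func encodeRequest(invID string) (string, error) {
	body, err := json.Marshal(TimeLimitRequest{InvocationID: invID})
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	body, err := encodeRequest("example-invocation-id") // placeholder ID
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(body)
}
```

With no struct tags on the field, encoding/json uses the Go field name as the JSON key.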
type TimeLimitResponse ¶
TimeLimitResponse is the message that is sent by road-runner in response to a TimeLimitRequest. It contains the current time limit from road-runner.
type UpdateMessage ¶
type UpdateMessage struct {
	Job     *model.Job
	Version int
	State   JobState
	Message string
	SentOn  string // Should be the milliseconds since the epoch
	Sender  string // Should be the hostname of the box sending the message.
}
UpdateMessage contains the information needed to broadcast a change in state for a job.
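The comments on SentOn and Sender say what the fields should hold but not how to produce the values. One possible way is sketched below. The sentOn, sender, and exampleTime functions are hypothetical, as is the "unknown" fallback.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// sentOn formats t as the number of milliseconds since the Unix epoch.
func sentOn(t time.Time) string {
	return strconv.FormatInt(t.UnixNano()/int64(time.Millisecond), 10)
}

// sender returns the local hostname, or "unknown" if it cannot be read.
func sender() string {
	host, err := os.Hostname()
	if err != nil {
		return "unknown"
	}
	return host
}

// exampleTime returns a fixed instant, 1.5 seconds after the epoch, so that
// the first line of output is reproducible.
func exampleTime() time.Time {
	return time.Unix(1, 500*int64(time.Millisecond))
}

func main() {
	fmt.Println("SentOn:", sentOn(exampleTime())) // SentOn: 1500
	fmt.Println("SentOn now:", sentOn(time.Now()))
	fmt.Println("Sender:", sender())
}
```

Storing the timestamp as a string of milliseconds keeps the field independent of time zones and of any particular date layout.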