Package distributor contains all the adaptors for the various supported distributor protocols. At a high level, it works like this:

* Quests specify a distributor configuration by name as part of their
* When an Execution for that Quest NeedsExecution, DM reads configuration
  (distributor.proto) from luci-config. This configuration is stored
  as part of the Execution so that for the duration of a given Execution,
  DM always interacts with the same distributor in the same way (barring
  code changes in DM's adapter logic itself).
* DM uses the selected distributor implementation to start a task and
  record its Token. Additionally, the distributor SHOULD publish on DM's
  pubsub topic to update DM's state. When publishing updates, the
  distributor MUST include the token returned from PrepareTopic (or else
  the published update will be ignored).
* When DM gets a hit on pubsub, it will load the Execution, load its cached
  distributor configuration, and then call HandleNotification for the
  adapter to parse the notification body and return the state of the task.

To add a new distributor:

* Add a new subdir of protos with the configuration proto for the new
  distributor. Each distributor implementation must have its own unique
  Config message.
* Add a matching subdir of this package for the implementation of the
  distributor.
* In the implementation, add a Register method that registers the
  implementation with this package appropriately.
* In the DM frontend, import your new package implementation and run its
  Register method.




func InstallHandlers

func InstallHandlers(r *router.Router, base router.MiddlewareChain)

InstallHandlers installs the taskqueue callback handler.

The `base` middleware must have a registry installed with WithRegistry.

func PubsubReceiver

func PubsubReceiver(ctx *router.Context)

PubsubReceiver is the HTTP handler that processes incoming pubsub events delivered to topics prepared with TaskDescription.PrepareTopic, and routes them to the appropriate distributor implementation's HandleNotification method.

It requires that a Registry be installed in ctx.Context via WithRegistry.

func TaskQueueHandler

func TaskQueueHandler(ctx *router.Context)

TaskQueueHandler is the HTTP handler that routes taskqueue tasks made with Config.EnqueueTask to a distributor's HandleTaskQueueTask method.

This requires that ctx.Context already have a Registry installed via the WithRegistry method.

func WithRegistry

func WithRegistry(c context.Context, r Registry) context.Context

WithRegistry adds the registry to the Context.


type Config

type Config struct {
	// DMHost is the host for the DM API. This may be used by the distributor
	// implementation to pass to jobs so that they can call back into DM's api.
	DMHost string

	// Name is the name of this distributor configuration. This is always the
	// fully-resolved name of the configuration (i.e. aliases are dereferenced).
	Name string

	// Version is the version of the distributor configuration retrieved from
	// luci-config.
	Version string

	// Content is the actual parsed implementation-specific configuration.
	Content proto.Message
}

Config represents the configuration for a single instance of a given distributor implementation at a given point in time (e.g. version).

func (*Config) EnqueueTask

func (cfg *Config) EnqueueTask(c context.Context, tsk *tq.Task) error

EnqueueTask allows a Distributor to enqueue a TaskQueue task that will be handled by the Distributor's HandleTaskQueueTask method.

func (*Config) PrepareTopic

func (cfg *Config) PrepareTopic(c context.Context, eid *dm.Execution_ID) (topic pubsub.Topic, token string, err error)

PrepareTopic returns a pubsub topic that notifications should be sent to, and is meant to be called from the D.Run method.

It returns the full name of the topic and a token that will be used to route PubSub messages back to the Distributor. The publisher to the topic must be instructed to put the token into the 'auth_token' attribute of PubSub messages. DM will know how to route such messages to D.HandleNotification.
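As an illustration of the token requirement, the sketch below shows how a publisher might attach the token to a message's attributes and how a receiver might separate it again before passing the remaining attributes on. Only the 'auth_token' attribute name comes from the text above; buildAttrs and splitToken are hypothetical helpers, and no real PubSub client is involved.

```go
package main

import "fmt"

// authTokenAttr is the attribute name given in the PrepareTopic documentation.
const authTokenAttr = "auth_token"

// buildAttrs (hypothetical) copies extra and adds the token returned by
// PrepareTopic under the 'auth_token' attribute.
func buildAttrs(token string, extra map[string]string) map[string]string {
	attrs := make(map[string]string, len(extra)+1)
	for k, v := range extra {
		attrs[k] = v
	}
	attrs[authTokenAttr] = token
	return attrs
}

// splitToken (hypothetical) extracts the token and returns the remaining
// attributes. ok is false when the token is absent, in which case the
// message cannot be routed.
func splitToken(attrs map[string]string) (token string, rest map[string]string, ok bool) {
	token, ok = attrs[authTokenAttr]
	if !ok {
		return "", nil, false
	}
	rest = make(map[string]string, len(attrs)-1)
	for k, v := range attrs {
		if k != authTokenAttr {
			rest[k] = v
		}
	}
	return token, rest, true
}

func main() {
	attrs := buildAttrs("token-from-PrepareTopic", map[string]string{"state": "done"})
	tok, rest, ok := splitToken(attrs)
	fmt.Println(tok, rest, ok)
}
```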

type D

type D interface {
	// Run prepares and runs a new Task from the given parameters.
	// Scheduling the same execution ID multiple times SHOULD return the same
	// Token. It's OK if this doesn't happen, but only one of the scheduled tasks
	// will be able to invoke ActivateExecution; the other one(s) will
	// early-abort and/or timeout.
	// If this returns a non-Transient error, the Execution will be marked as
	// Rejected with the returned error message as the 'Reason'.
	// The various time durations, if non-zero, will be used verbatim for DM to
	// timeout that phase of the task's execution. If the task's execution times
	// out in the 'STOPPING' phase, DM will poll the distributor's GetStatus
	// method up to 3 times with a 30-second gap to attempt to retrieve the final
	// information. After more than 3 times, DM will give up and mark the task as
	// expired.
	// If the distributor doesn't intend to use Pubsub for notifying DM about the
	// final status of the job, set pollTimeout to the amount of time you want DM
	// to wait before polling GetStatus. e.g. if after calling FinishAttempt or
	// EnsureGraphData your distributor needs 10 seconds before it can correctly
	// respond to a GetStatus request, you should set pollTimeout to >= 10s.
	// Otherwise pollTimeout should be set fairly high (e.g. 12 hours) as a hedge
	// against a broken pubsub notification pipeline.
	// If you have the choice between pubsub or not, prefer to use pubsub as it
	// allows DM to more proactively update the graph state (and unblock waiting
	// Attempts, etc.)
	Run(qst *dm.Quest_Desc, auth *dm.Execution_Auth, prevResult *dm.JsonResult) (tok Token, pollTimeout time.Duration, err error)

	// Cancel attempts to cancel a running task. If a task is canceled more than
	// once, this should return nil.
	Cancel(*dm.Quest_Desc, Token) error

	// GetStatus retrieves the current state of the task from the distributor.
	// If this returns a non-Transient error more than 30 seconds after the task
	// was Run(), the execution will be marked Missing with the returned error
	// message as the 'Reason'. If it returns a non-Transient error within 30
	// seconds of being run, DM will automatically treat that as Transient.
	GetStatus(*dm.Quest_Desc, Token) (*dm.Result, error)

	// InfoURL calculates a user-presentable information url for the task
	// identified by Token. This should be a local operation, so it is not the
	// implementation's responsibility to validate the token in this method (e.g.
	// it could point to a non-existent job, etc.)
	InfoURL(Token) string

	// HandleNotification is called whenever DM receives a PubSub message sent to
	// a topic created with Config.PrepareTopic. The Attrs map will omit
	// the 'auth_token' field.
	// Returning (nil, nil) will indicate that DM should ignore this notification.
	// DM will convert pubsub Messages to a delayed GetStatus if a pubsub message
	// is delivered which refers to an Attempt whose status is NeedsExecution,
	// which could happen in the event of a not-fully-settled transaction.
	// DM will ignore any notifications for executions which it doesn't know
	// about.
	HandleNotification(qst *dm.Quest_Desc, notification *Notification) (*dm.Result, error)

	// HandleTaskQueueTask is called if the distributor used Config.EnqueueTask.
	// It may return zero or more Notifications for DM about arbitrary Executions.
	// These notifications will be handled 'later' by the HandleNotification
	// implementation.
	HandleTaskQueueTask(*http.Request) ([]*Notification, error)

	// Validate should return a non-nil error if the given distributor parameters
	// are not appropriate for this Distributor. parameters is guaranteed to be
	// a valid JSON object. This should validate that the content of that JSON
	// object is what the distributor expects.
	Validate(parameters string) error
}

D is the interface for all distributor implementations.


Unless otherwise noted, DM will retry methods here if they return an error marked as Transient, up to some internal limit. If they return a non-Transient error (or nil) DM will make a best effort not to duplicate calls, but it can't guarantee that.

type Factory

type Factory func(c context.Context, dist *Config) (D, error)

Factory is a function which produces a new distributor instance with the provided configuration proto.

c is guaranteed to be non-transactional.

type FactoryMap

type FactoryMap map[proto.Message]Factory

FactoryMap maps nil proto.Message instances (e.g. (*MyMessage)(nil)) to the factory which knows how to turn a Message of that type into a distributor.

type FinishExecutionFn

type FinishExecutionFn func(c context.Context, eid *dm.Execution_ID, rslt *dm.Result) ([]tumble.Mutation, error)

FinishExecutionFn is required to eliminate a circular dependency between mutate <-> distributor. Essentially this just makes a new mutate.FinishExecution.

See mutate.FinishExecutionFn for the only actual implementation of this.

type Notification

type Notification struct {
	ID    *dm.Execution_ID
	Data  []byte
	Attrs map[string]string
}

Notification represents a notification from the distributor to DM that a particular execution has a status update. Data and Attrs are interpreted purely by the distributor implementation.
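Since Data and Attrs are opaque to DM, each distributor decides on its own encoding. The following sketch assumes a made-up convention (a "kind" attribute and a JSON body with a "state" field) purely for illustration. Notification is simplified to use a string ID, Result stands in for *dm.Result, and handleNotification is a hypothetical free function rather than the real interface method.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Notification mirrors the struct above, with ID simplified to a string.
type Notification struct {
	ID    string
	Data  []byte
	Attrs map[string]string
}

// Result is a simplified stand-in for *dm.Result.
type Result struct{ State string }

// handleNotification (hypothetical) interprets a notification using an
// invented convention: only messages with kind=status are considered, and
// their body is JSON of the form {"state": "..."}.
func handleNotification(n *Notification) (*Result, error) {
	if n.Attrs["kind"] != "status" {
		// (nil, nil) signals that the notification should be ignored.
		return nil, nil
	}
	var body struct {
		State string `json:"state"`
	}
	if err := json.Unmarshal(n.Data, &body); err != nil {
		return nil, fmt.Errorf("bad notification body: %w", err)
	}
	return &Result{State: body.State}, nil
}

func main() {
	res, err := handleNotification(&Notification{
		ID:    "example-execution",
		Data:  []byte(`{"state": "FINISHED"}`),
		Attrs: map[string]string{"kind": "status"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(res.State)
}
```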

type NotifyExecution

type NotifyExecution struct {
	CfgName      string
	Notification *Notification
}

NotifyExecution is used to finish an execution. Specifically it allows the appropriate distributor to HandleNotification, and then when that concludes, invokes DM's FinishExecution (see mutate.FinishExecution).

func (*NotifyExecution) RollForward

func (f *NotifyExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

RollForward implements tumble.Mutation.

func (*NotifyExecution) Root

func (f *NotifyExecution) Root(c context.Context) *ds.Key

Root implements tumble.Mutation.

type Registry

type Registry interface {
	FinishExecution(c context.Context, eid *dm.Execution_ID, rslt *dm.Result) ([]tumble.Mutation, error)

	// MakeDistributor builds a distributor instance that's configured with the
	// provided config.
	// The configuration for this distributor is obtained from luci-config at the
	// time an Execution is started.
	MakeDistributor(c context.Context, cfgName string) (d D, ver string, err error)
}

Registry holds a collection of all of the available distributor types.

func GetRegistry

func GetRegistry(c context.Context) Registry

GetRegistry gets the registry from the Context. This will return nil if the Context does not contain a Registry.

func NewRegistry

func NewRegistry(mapping FactoryMap, fFn FinishExecutionFn) Registry

NewRegistry builds a new implementation of Registry configured to load configuration data from luci-config.

The mapping should map nil pointers of the various config proto types to their respective Factory. When a message of a given type is loaded from luci-config, the distributor instance is constructed using the corresponding Factory.

func NewTestingRegistry

func NewTestingRegistry(mocks TestFactoryMap, fFn FinishExecutionFn) Registry

NewTestingRegistry returns a new testing registry.

The mocks dictionary maps from cfgName to a mock implementation of the distributor.

type TestFactoryFn

type TestFactoryFn func(context.Context, *Config) D

type TestFactoryMap

type TestFactoryMap map[string]TestFactoryFn

type Token

type Token string

Token is an opaque token that a distributor should use to uniquely identify a single DM execution.

