Documentation

Overview

    Package mutate includes the main logic of DM's state machine. The package is a series of "go.chromium.org/luci/tumble".Mutation implementations. Each mutation operates on a single entity group in DM's datastore model, advancing the state machine for the dependency graph by one edge.
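The shape shared by every mutation in this package can be sketched as follows. This is a minimal illustration, not DM's or tumble's code: the `Mutation` interface below is a simplified stand-in for tumble.Mutation (string roots replace `*ds.Key`, and the `context.Context` parameter is dropped), and `step` and `Drain` are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// Mutation is a simplified stand-in for tumble.Mutation. The real interface
// takes a context.Context and returns a *ds.Key from Root; strings are used
// here so that the sketch has no external dependencies.
type Mutation interface {
	// Root names the single entity group this mutation operates on.
	Root() string
	// RollForward applies the mutation and returns any follow-up mutations.
	RollForward() ([]Mutation, error)
}

// step is a hypothetical mutation that advances one edge of the graph and
// then hands off to the mutations in next.
type step struct {
	root string
	next []Mutation
}

func (s *step) Root() string                     { return s.root }
func (s *step) RollForward() ([]Mutation, error) { return s.next, nil }

// Drain applies mutations in FIFO order until none remain, and returns the
// roots that were visited. It is a hypothetical driver, much simpler than
// tumble's real processing loop.
func Drain(start Mutation) ([]string, error) {
	var visited []string
	queue := []Mutation{start}
	for len(queue) > 0 {
		m := queue[0]
		queue = queue[1:]
		visited = append(visited, m.Root())
		next, err := m.RollForward()
		if err != nil {
			return visited, err
		}
		queue = append(queue, next...)
	}
	return visited, nil
}

func main() {
	c := &step{root: "attempt/C"}
	b := &step{root: "attempt/B", next: []Mutation{c}}
	a := &step{root: "attempt/A", next: []Mutation{b}}
	roots, err := Drain(a)
	fmt.Println(roots, err)
}
```

Each call to RollForward touches one root and returns the next edges to follow, which is the "one edge at a time" behaviour described above.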

    Constants

    const MaxEnsureAttempts = 10

      MaxEnsureAttempts caps the number of EnsureAttempt mutations that a single EnsureQuestAttempts mutation will emit. If there are more AttemptIDs than this maximum, EnsureQuestAttempts processes the remainder by tail recursion.
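The batching rule can be sketched as below. This is an illustration under assumptions, not the package's implementation: `splitBatch` and `countRounds` are hypothetical helpers, and the follow-up mutation is modeled as a plain loop rather than a real tail call through tumble.

```go
package main

import "fmt"

// MaxEnsureAttempts mirrors the constant documented above.
const MaxEnsureAttempts = 10

// splitBatch returns the attempt IDs to handle in this round and the
// remainder that a follow-up round would receive. Hypothetical helper.
func splitBatch(aids []uint32) (now, rest []uint32) {
	if len(aids) <= MaxEnsureAttempts {
		return aids, nil
	}
	return aids[:MaxEnsureAttempts], aids[MaxEnsureAttempts:]
}

// countRounds reports how many rounds are needed to consume all IDs when
// each round handles at most MaxEnsureAttempts of them.
func countRounds(aids []uint32) int {
	rounds := 0
	for len(aids) > 0 {
		_, aids = splitBatch(aids)
		rounds++
	}
	return rounds
}

func main() {
	aids := make([]uint32, 25)
	for i := range aids {
		aids[i] = uint32(i + 1)
	}
	now, rest := splitBatch(aids)
	fmt.Println(len(now), len(rest), countRounds(aids)) // prints "10 15 3"
}
```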

      Variables

      This section is empty.

      Functions

      func FinishExecutionFn

      func FinishExecutionFn(c context.Context, eid *dm.Execution_ID, rslt *dm.Result) ([]tumble.Mutation, error)

        FinishExecutionFn is the implementation of distributor.FinishExecutionFn. It's defined here to avoid a circular dependency.

        func ResetExecutionTimeout

        func ResetExecutionTimeout(c context.Context, e *model.Execution) error

          ResetExecutionTimeout schedules a Timeout for this Execution. It inspects the Execution's State to determine which timeout should be set, if any. If no timeout should be active, this will cancel any existing timeouts for this Execution.

          Types

          type AckFwdDep

          type AckFwdDep struct {
          	Dep *model.FwdEdge
          }

            AckFwdDep records the fact that a dependency was completed.

            func (*AckFwdDep) RollForward

            func (f *AckFwdDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

              RollForward implements tumble.Mutation.

              func (*AckFwdDep) Root

              func (f *AckFwdDep) Root(c context.Context) *ds.Key

                Root implements tumble.Mutation.

                type ActivateExecution

                type ActivateExecution struct {
                	Auth   *dm.Execution_Auth
                	NewTok []byte
                }

                  ActivateExecution activates an execution, moving it from the SCHEDULING state to the RUNNING state and resetting the execution timeout (if any).

                  func (*ActivateExecution) RollForward

                  func (a *ActivateExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                    RollForward implements tumble.Mutation

                    func (*ActivateExecution) Root

                    func (a *ActivateExecution) Root(c context.Context) *ds.Key

                      Root implements tumble.Mutation.

                      type AddBackDep

                      type AddBackDep struct {
                      	Dep      *model.FwdEdge
                      	NeedsAck bool // make AckFwdDep iff true
                      }

                        AddBackDep adds a BackDep (and possibly a BackDepGroup). If NeedsAck is true, this mutation will chain to an AckFwdDep. It should only be false if this AddBackDep is spawned from an AddFinishedDeps, where the originating Attempt already knows that this dependency is Finished.

                        func (*AddBackDep) RollForward

                        func (a *AddBackDep) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                          RollForward implements tumble.Mutation.

                          func (*AddBackDep) Root

                          func (a *AddBackDep) Root(c context.Context) *ds.Key

                            Root implements tumble.Mutation.

                            type AddDeps

                            type AddDeps struct {
                            	Auth   *dm.Execution_Auth
                            	Quests []*model.Quest
                            
                            	// Attempts is attempts we think are missing from the global graph.
                            	Attempts *dm.AttemptList
                            
                            	// Deps are fwddeps we think are missing from the auth'd attempt.
                            	Deps *dm.AttemptList
                            }

                              AddDeps transactionally stops the current execution and adds one or more dependencies.

                              func (*AddDeps) RollForward

                              func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                RollForward implements tumble.Mutation

                                This mutation is called directly.

                                func (*AddDeps) Root

                                func (a *AddDeps) Root(c context.Context) *ds.Key

                                  Root implements tumble.Mutation

                                  type AddFinishedDeps

                                  type AddFinishedDeps struct {
                                  	Auth *dm.Execution_Auth
                                  
                                  	// MergeQuests lists quests which need their BuiltBy lists merged. The Quests
                                  	// here must be a subset of the quests mentioned in FinishedAttempts.
                                  	MergeQuests []*model.Quest
                                  
                                  	// FinishedAttempts are a list of attempts that we already know are in the
                                  	// Finished state.
                                  	FinishedAttempts *dm.AttemptList
                                  }

                                    AddFinishedDeps adds a bunch of dependencies which are known in advance to already be in the Finished state.

                                    func (*AddFinishedDeps) RollForward

                                    func (f *AddFinishedDeps) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                      RollForward implements tumble.Mutation

                                      func (*AddFinishedDeps) Root

                                      func (f *AddFinishedDeps) Root(c context.Context) *ds.Key

                                        Root implements tumble.Mutation

                                        type EnsureAttempt

                                        type EnsureAttempt struct {
                                        	ID *dm.Attempt_ID
                                        }

                                          EnsureAttempt ensures that the given Attempt exists. If it doesn't, it's created in a NeedsExecution state.

                                          func (*EnsureAttempt) RollForward

                                          func (e *EnsureAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                            RollForward implements tumble.Mutation.

                                            func (*EnsureAttempt) Root

                                            func (e *EnsureAttempt) Root(c context.Context) *ds.Key

                                              Root implements tumble.Mutation.

                                              type EnsureQuestAttempts

                                              type EnsureQuestAttempts struct {
                                              	Quest *model.Quest
                                              	AIDs  []uint32
                                              
                                              	// DoNotMergeQuest causes this mutation to not attempt to merge the BuiltBy of
                                              	// Quest.
                                              	DoNotMergeQuest bool
                                              }

                                                EnsureQuestAttempts ensures that the Attempts listed in AIDs exist for the given Quest. Any Attempt that doesn't exist is created in a NeedsExecution state.

                                                func (*EnsureQuestAttempts) RollForward

                                                func (e *EnsureQuestAttempts) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                  RollForward implements tumble.Mutation.

                                                  func (*EnsureQuestAttempts) Root

                                                  func (e *EnsureQuestAttempts) Root(c context.Context) *ds.Key

                                                    Root implements tumble.Mutation.

                                                    type FinishAttempt

                                                    type FinishAttempt struct {
                                                    	dm.FinishAttemptReq
                                                    }

                                                      FinishAttempt does four things:

                                                      Invalidates the current Execution.
                                                      Moves the state to Finished.
                                                      Creates a new AttemptResult.
                                                      Starts the RecordCompletion state machine.

                                                      func (*FinishAttempt) RollForward

                                                      func (f *FinishAttempt) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                        RollForward implements tumble.Mutation

                                                        This mutation is called directly from FinishAttempt.

                                                        func (*FinishAttempt) Root

                                                        func (f *FinishAttempt) Root(c context.Context) *ds.Key

                                                          Root implements tumble.Mutation

                                                          type FinishExecution

                                                          type FinishExecution struct {
                                                          	EID    *dm.Execution_ID
                                                          	Result *dm.Result
                                                          }

                                                            FinishExecution records the final state of the Execution, and advances the Attempt state machine.

                                                            func NewFinishExecutionAbnormal

                                                            func NewFinishExecutionAbnormal(eid *dm.Execution_ID, status dm.AbnormalFinish_Status, reason string) *FinishExecution

                                                              NewFinishExecutionAbnormal is a shorthand to make a FinishExecution mutation with some abnormal result.

                                                              func (*FinishExecution) RollForward

                                                              func (f *FinishExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                                RollForward implements tumble.Mutation

                                                                func (*FinishExecution) Root

                                                                func (f *FinishExecution) Root(c context.Context) *ds.Key

                                                                  Root implements tumble.Mutation

                                                                  type MergeQuest

                                                                  type MergeQuest struct {
                                                                  	Quest   *model.Quest
                                                                  	AndThen []tumble.Mutation
                                                                  }

                                                                    MergeQuest ensures that the given Quest exists and contains the merged set of BuiltBy entries.
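The merge step amounts to a set union. The sketch below illustrates that idea only: plain strings stand in for DM's real BuiltBy entries (an assumption), `mergeBuiltBy` is a hypothetical helper, and the AndThen follow-up mutations are left out.

```go
package main

import (
	"fmt"
	"sort"
)

// mergeBuiltBy returns the sorted union of two BuiltBy lists with duplicates
// removed. Strings are a stand-in for the real entry type (an assumption).
func mergeBuiltBy(existing, incoming []string) []string {
	seen := make(map[string]struct{}, len(existing)+len(incoming))
	for _, list := range [][]string{existing, incoming} {
		for _, entry := range list {
			seen[entry] = struct{}{}
		}
	}
	merged := make([]string, 0, len(seen))
	for entry := range seen {
		merged = append(merged, entry)
	}
	// Sort so that the result is deterministic regardless of map order.
	sort.Strings(merged)
	return merged
}

func main() {
	stored := []string{"tmpl-a", "tmpl-b"}
	proposed := []string{"tmpl-b", "tmpl-c"}
	fmt.Println(mergeBuiltBy(stored, proposed)) // prints "[tmpl-a tmpl-b tmpl-c]"
}
```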

                                                                    func (*MergeQuest) RollForward

                                                                    func (m *MergeQuest) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                                      RollForward implements tumble.Mutation.

                                                                      func (*MergeQuest) Root

                                                                      func (m *MergeQuest) Root(c context.Context) *ds.Key

                                                                        Root implements tumble.Mutation.

                                                                        type RecordCompletion

                                                                        type RecordCompletion struct {
                                                                        	For *dm.Attempt_ID
                                                                        }

                                                                          RecordCompletion marks the fact that an Attempt is completed (Finished) on its corresponding BackDepGroup, and fires off additional AckFwdDep mutations for each incoming dependency that is blocked.

                                                                          In the case where an Attempt has hundreds or thousands of incoming dependencies, the naive implementation of this mutation could easily overfill a single datastore transaction. For that reason, the implementation here unblocks things 64 edges at a time, and keeps returning itself as a mutation until it unblocks fewer than 64 things (i.e. it does a tail call).

                                                                          This relies on tumble's tail-call optimization to keep the number of transactions low; without it, this would take one transaction per 64 dependencies. With the TCO, it can process hundreds or thousands of dependencies while remaining fair to other work (e.g. other Attempts can still take dependencies on this Attempt while RecordCompletion is in between tail calls).
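The 64-at-a-time loop can be sketched as follows. The sketch makes several assumptions: `recordCompletion` is a hypothetical in-memory stand-in for the real mutation, pending edges are plain strings, and the tail call is modeled by the caller looping while `again` is true.

```go
package main

import "fmt"

// batchSize is the number of edges unblocked per round, as described above.
const batchSize = 64

// recordCompletion is a hypothetical stand-in for the real mutation; pending
// holds the incoming dependencies that are still blocked.
type recordCompletion struct {
	pending []string
}

// rollForward unblocks up to batchSize edges. It reports again == true when
// a full batch was processed, meaning the mutation should return itself.
func (r *recordCompletion) rollForward() (acked []string, again bool) {
	n := len(r.pending)
	if n > batchSize {
		n = batchSize
	}
	acked = r.pending[:n]
	r.pending = r.pending[n:]
	return acked, n == batchSize
}

// run drives the mutation to completion and reports how many rounds it took
// and how many edges were unblocked in total.
func run(total int) (rounds, acked int) {
	r := &recordCompletion{pending: make([]string, total)}
	for {
		batch, again := r.rollForward()
		rounds++
		acked += len(batch)
		if !again {
			return rounds, acked
		}
	}
}

func main() {
	rounds, acked := run(150)
	fmt.Println(rounds, acked) // prints "3 150"
}
```

Note that an exact multiple of 64 costs one extra, empty round under this rule, because only a batch of fewer than 64 edges signals that the work is done.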

                                                                          func (*RecordCompletion) RollForward

                                                                          func (r *RecordCompletion) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                                            RollForward implements tumble.Mutation.

                                                                            func (*RecordCompletion) Root

                                                                            func (r *RecordCompletion) Root(c context.Context) *ds.Key

                                                                              Root implements tumble.Mutation.

                                                                              type ScheduleExecution

                                                                              type ScheduleExecution struct {
                                                                              	For *dm.Attempt_ID
                                                                              }

                                                                                ScheduleExecution is a placeholder mutation that will be an entry into the Distributor scheduling state-machine.

                                                                                func (*ScheduleExecution) RollForward

                                                                                func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                                                  RollForward implements tumble.Mutation

                                                                                  func (*ScheduleExecution) Root

                                                                                  func (s *ScheduleExecution) Root(c context.Context) *ds.Key

                                                                                    Root implements tumble.Mutation

                                                                                    type TimeoutExecution

                                                                                    type TimeoutExecution struct {
                                                                                    	For   *dm.Execution_ID
                                                                                    	State dm.Execution_State
                                                                                    	// TimeoutAttempt is the number of attempts to stop a STOPPING execution,
                                                                                    	// since this potentially requires an RPC to the distributor to enact.
                                                                                    	TimeoutAttempt uint
                                                                                    	Deadline       time.Time
                                                                                    }

                                                                                      TimeoutExecution is a named mutation which triggers on a delay. If the execution is in the noted state when the trigger hits, this sets the Execution to have an AbnormalFinish status of TIMED_OUT.
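The state check can be sketched as below. The types here are hypothetical stand-ins rather than DM's model: the `state` enum is reduced to three values, the TIMED_OUT AbnormalFinish status is modeled as a boolean, and `fire` is an invented name for what RollForward would do when the delay elapses.

```go
package main

import (
	"fmt"
	"time"
)

// state is a reduced, hypothetical version of dm.Execution_State.
type state int

const (
	scheduling state = iota
	running
	finished
)

// execution is a hypothetical stand-in for the stored Execution.
type execution struct {
	State    state
	TimedOut bool // stands in for an AbnormalFinish status of TIMED_OUT
}

// timeoutExecution mirrors the State and Deadline fields documented above.
type timeoutExecution struct {
	State    state
	Deadline time.Time
}

// ProcessAfter reports the earliest time at which the timeout may trigger.
func (t timeoutExecution) ProcessAfter() time.Time { return t.Deadline }

// fire applies the timeout only if the execution is still in the state that
// was recorded when the timeout was scheduled; otherwise it is a no-op.
func (t timeoutExecution) fire(e *execution) bool {
	if e.State != t.State {
		return false
	}
	e.State = finished
	e.TimedOut = true
	return true
}

func main() {
	e := &execution{State: running}
	stale := timeoutExecution{State: scheduling, Deadline: time.Now()}
	live := timeoutExecution{State: running, Deadline: time.Now().Add(time.Minute)}
	fmt.Println(stale.fire(e), live.fire(e), e.TimedOut) // prints "false true true"
}
```

Recording the expected state in the mutation means a timeout scheduled for an earlier state does nothing once the execution has moved on.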

                                                                                      func (*TimeoutExecution) HighPriority

                                                                                      func (t *TimeoutExecution) HighPriority() bool

                                                                                        HighPriority implements tumble.DelayedMutation

                                                                                        func (*TimeoutExecution) ProcessAfter

                                                                                        func (t *TimeoutExecution) ProcessAfter() time.Time

                                                                                          ProcessAfter implements tumble.DelayedMutation

                                                                                          func (*TimeoutExecution) RollForward

                                                                                          func (t *TimeoutExecution) RollForward(c context.Context) (muts []tumble.Mutation, err error)

                                                                                            RollForward implements tumble.Mutation

                                                                                            func (*TimeoutExecution) Root

                                                                                            func (t *TimeoutExecution) Root(c context.Context) *ds.Key

                                                                                              Root implements tumble.Mutation