Documentation

Overview

    Package tqtesting contains helpers for running server/tq in tests and on localhost.

    Index

    Constants

    View Source
    const ClockTag = "tq-scheduler-sleep"

      ClockTag tags the clock used in scheduler's sleep.

      Variables

      This section is empty.

      Functions

      func TasksCollector

      func TasksCollector(tl *TaskList) func(context.Context, *Task)

        TasksCollector returns a callback that adds tasks to the given list.

        Can be passed as TaskSucceeded or TaskFailed callback to the Scheduler.

        Synchronizes access to the list internally, but the list should be read from only when the Scheduler is paused.

        Types

        type Executor

        type Executor interface {
        	// Execute is called from Run to execute the task.
        	//
        	// The executor may execute the task right away in a blocking way or dispatch
        	// it to some other goroutine. Either way it must call `done` callback when it
        	// is done executing the task, indicating whether the task should be
        	// reenqueued for a retry.
        	//
        	// It is safe to call Scheduler's Submit from inside Execute.
        	//
        	// Receives the exact same context as Run(...), in particular this context
        	// is canceled when Run is done.
        	Execute(ctx context.Context, t *Task, done func(retry bool))
        }

          Executor knows how to execute tasks when their ETA arrives.

          type LoopbackHTTPExecutor

          type LoopbackHTTPExecutor struct {
          	Handler http.Handler
          }

            LoopbackHTTPExecutor is an Executor that executes tasks by calling the given HTTP handler.

            func (*LoopbackHTTPExecutor) Execute

            func (e *LoopbackHTTPExecutor) Execute(ctx context.Context, t *Task, done func(retry bool))

              Execute dispatches the task to the HTTP handler in a dedicated goroutine.

              Marks the task as failed if the response status code is outside of range [200-299].

              type RunOption

              type RunOption interface {
              	// contains filtered or unexported methods
              }

                RunOption influences behavior of Run call.

                func ParallelExecute

                func ParallelExecute() RunOption

                  ParallelExecute instructs the scheduler to call executor's Execute method in a separate goroutine instead of serially in Run.

                  This more closely resembles real-life behavior but may introduce more unpredictability into tests due to races.

                  func StopAfter

                  func StopAfter(f func(t *Task) bool) RunOption

                    StopAfter will stop the scheduler if the given function returns true, given the just finished task.

                    func StopAfterTask

                    func StopAfterTask(taskClassID string) RunOption

                      StopAfterTask will stop the scheduler after it finishes executing a task of the given task class ID.

                      func StopBefore

                      func StopBefore(f func(t *Task) bool) RunOption

                        StopBefore will stop the scheduler if the given function returns true, given the next task to be executed.

                        If such next task has specified ETA, StopBeforeTask does NOT provide any guarantee about what `clock.Now` returns by the time Run stops.

                        It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a different next task (by ETA) when Run stops.

                        func StopBeforeTask

                        func StopBeforeTask(taskClassID string) RunOption

                          StopBeforeTask will stop the scheduler if the next task to be executed has the given task class ID.

                          The same caveats of StopBefore also apply for StopBeforeTask.

                          func StopWhenDrained

                          func StopWhenDrained() RunOption

                            StopWhenDrained will stop the scheduler after it finishes executing the last task and there are no more tasks scheduled.

                            It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a pending queue of tasks even if Run stops.

                            type Scheduler

                            type Scheduler struct {
                            	// Executor knows how to execute tasks when their ETA arrives.
                            	Executor Executor
                            
                            	// MaxAttempts is the maximum number of attempts for a task, including the
                            	// first attempt.
                            	//
                            	// If negative the number of attempts is unlimited.
                            	//
                            	// Default is 20.
                            	MaxAttempts int
                            
                            	// MinBackoff is an initial retry delay for failed tasks.
                            	//
                            	// It is doubled after each failed attempt until it reaches MaxBackoff after
                            	// which it stays constant.
                            	//
                            	// Default is 1 sec.
                            	MinBackoff time.Duration
                            
                            	// MaxBackoff is an upper limit on a retry delay.
                            	//
                            	// Default is 5 min.
                            	MaxBackoff time.Duration
                            
                            	// TaskSucceeded is called from within the executor's `done` callback whenever
                            	// a task finishes successfully, perhaps after a bunch of retries.
                            	//
                            	// Receives the same context as passed to Run.
                            	TaskSucceeded func(ctx context.Context, task *Task)
                            
                            	// TaskFailed is called from within the executor's `done` callback whenever
                            	// a task fails after being attempted MaxAttempts times.
                            	//
                            	// Receives the same context as passed to Run.
                            	TaskFailed func(ctx context.Context, task *Task)
                            	// contains filtered or unexported fields
                            }

                              Scheduler knows how to execute submitted tasks when they are due.

                              This is a very primitive in-memory unholy hybrid of Cloud Tasks and PubSub services that can be used in tests and on localhost.

                              Must be configured before the first Run call.Can be reconfigured between Run calls, but changing the configuration while Run is running is not allowed.

                              Scheduler implements tq.Submitter interface.

                              func (*Scheduler) Run

                              func (s *Scheduler) Run(ctx context.Context, opts ...RunOption)

                                Run executes the scheduler's loop until the context is canceled or one of the stop conditions are hit.

                                By default executes tasks serially. Pass ParallelExecute() option to execute them asynchronously.

                                Upon exit all executing tasks has finished, there still may be pending tasks.

                                Panics if Run is already running (perhaps in another goroutine).

                                func (*Scheduler) Submit

                                func (s *Scheduler) Submit(ctx context.Context, p *reminder.Payload) error

                                  Submit schedules a task for later execution.

                                  func (*Scheduler) Tasks

                                  func (s *Scheduler) Tasks() TaskList

                                    Tasks returns a snapshot of the scheduler state.

                                    Recalculates it from scratch, so it is a pretty expensive call.

                                    Tasks are ordered by ETA: currently executing tasks first, then scheduled tasks.

                                    type Task

                                    type Task struct {
                                    	Payload proto.Message // a clone of the original AddTask payload, if available
                                    
                                    	Task    *taskspb.Task           // a clone of the Cloud Tasks task as passed to Submit
                                    	Message *pubsubpb.PubsubMessage // a clone of the PubSub message as passed to Submit
                                    
                                    	Name  string    // full task name (perhaps generated)
                                    	Class string    // TaskClass.ID passed in RegisterTaskClass.
                                    	ETA   time.Time // when the task is due, always set at now or in future
                                    
                                    	Finished  time.Time // when the task finished last execution attempt
                                    	Attempts  int       // 0 initially, incremented before each execution attempt
                                    	Executing bool      // true if executing right now
                                    	// contains filtered or unexported fields
                                    }

                                      Task represents an enqueued or executing task.

                                      func (*Task) Copy

                                      func (t *Task) Copy() *Task

                                        Copy makes a shallow copy of the task.

                                        type TaskList

                                        type TaskList []*Task

                                          TaskList is a collection of tasks.

                                          func (TaskList) Executing

                                          func (tl TaskList) Executing() TaskList

                                            Executing returns a list of tasks executing right now.

                                            func (TaskList) Filter

                                            func (tl TaskList) Filter(cb func(*Task) bool) TaskList

                                              Filter returns a new task list with tasks matching the filter.

                                              func (TaskList) Payloads

                                              func (tl TaskList) Payloads() []proto.Message

                                                Payloads returns a list with individual task payloads.

                                                func (TaskList) Pending

                                                func (tl TaskList) Pending() TaskList

                                                  Pending returns a list of tasks waiting execution.

                                                  func (TaskList) SortByETA

                                                  func (tl TaskList) SortByETA() TaskList

                                                    SortByETA sorts the list in-place by ETA.

                                                    The full sorting key is (!task.Executing, task.ETA, task.Class, task.Name)

                                                    Returns it to allow chaining calls.