Documentation

Overview

    Package tqtesting contains helpers for running server/tq in tests and on localhost.

    Index

    Constants

    View Source
    const ClockTag = "tq-scheduler-sleep"

      ClockTag tags the clock used in scheduler's sleep.

      Variables

      This section is empty.

      Functions

      func TasksCollector

      func TasksCollector(tl *TaskList) func(context.Context, *Task)

        TasksCollector returns a callback that adds tasks to the given list.

        Can be passed as TaskSucceeded or TaskFailed callback to the Scheduler.

        Synchronizes access to the list internally, but the list should be read from only when the Scheduler is paused.

        Types

        type Executor

        type Executor interface {
        	// Execute is called from Run to execute the task.
        	//
        	// The executor may execute the task right away in a blocking way or dispatch
        	// it to some other goroutine. Either way it must call `done` callback when it
        	// is done executing the task, indicating whether the task should be
        	// reenqueued for a retry.
        	//
        	// It is safe to call Scheduler's Submit from inside Execute.
        	//
        	// Receives the exact same context as Run(...), in particular this context
        	// is canceled when Run is done.
        	Execute(ctx context.Context, t *Task, done func(retry bool))
        }

          Executor knows how to execute tasks when their ETA arrives.

          type LoopbackHTTPExecutor

          type LoopbackHTTPExecutor struct {
          	Handler http.Handler
          }

            LoopbackHTTPExecutor is an Executor that executes tasks by calling the given HTTP handler.

            func (*LoopbackHTTPExecutor) Execute

            func (e *LoopbackHTTPExecutor) Execute(ctx context.Context, t *Task, done func(retry bool))

              Execute dispatches the task to the HTTP handler in a dedicated goroutine.

              Marks the task as failed if the response status code is outside of range [200-299].

              type RunOption

              type RunOption interface {
              	// contains filtered or unexported methods
              }

                RunOption influences behavior of Run call.

                func ParallelExecute

                func ParallelExecute() RunOption

                  ParallelExecute instructs the scheduler to call executor's Execute method in a separate goroutine instead of serially in Run.

                  This more closely resembles real-life behavior but may introduce more unpredictability into tests due to races.

                  func StopWhenDrained

                  func StopWhenDrained() RunOption

                    StopWhenDrained will stop the scheduler after it finishes executing the last task and there are no more tasks scheduled.

                    It is naturally racy if there are other goroutines that submit tasks concurrently. In this situation there may be a pending queue of tasks even if Run stops.

                    type Scheduler

                    type Scheduler struct {
                    	// Executor knows how to execute tasks when their ETA arrives.
                    	Executor Executor
                    
                    	// MaxAttempts is the maximum number of attempts for a task, including the
                    	// first attempt.
                    	//
                    	// If negative the number of attempts is unlimited.
                    	//
                    	// Default is 20.
                    	MaxAttempts int
                    
                    	// MinBackoff is an initial retry delay for failed tasks.
                    	//
                    	// It is doubled after each failed attempt until it reaches MaxBackoff after
                    	// which it stays constant.
                    	//
                    	// Default is 1 sec.
                    	MinBackoff time.Duration
                    
                    	// MaxBackoff is an upper limit on a retry delay.
                    	//
                    	// Default is 5 min.
                    	MaxBackoff time.Duration
                    
                    	// TaskSucceeded is called from within the executor's `done` callback whenever
                    	// a task finishes successfully, perhaps after a bunch of retries.
                    	//
                    	// Receives the same context as passed to Run.
                    	TaskSucceeded func(ctx context.Context, task *Task)
                    
                    	// TaskFailed is called from within the executor's `done` callback whenever
                    	// a task fails after being attempted MaxAttempts times.
                    	//
                    	// Receives the same context as passed to Run.
                    	TaskFailed func(ctx context.Context, task *Task)
                    	// contains filtered or unexported fields
                    }

                      Scheduler knows how to execute submitted tasks when they are due.

                      This is a very primitive in-memory unholy hybrid of Cloud Tasks and PubSub services that can be used in tests and on localhost.

                      Must be configured before the first Run call.Can be reconfigured between Run calls, but changing the configuration while Run is running is not allowed.

                      Scheduler implements tq.Submitter interface.

                      func (*Scheduler) Run

                      func (s *Scheduler) Run(ctx context.Context, opts ...RunOption)

                        Run executes the scheduler's loop until the context is canceled or one of the stop conditions are hit.

                        By default executes tasks serially. Pass ParallelExecute() option to execute them asynchronously.

                        Upon exit all executing tasks has finished, there still may be pending tasks.

                        Panics if Run is already running (perhaps in another goroutine).

                        func (*Scheduler) Submit

                        func (s *Scheduler) Submit(ctx context.Context, p *reminder.Payload) error

                          Submit schedules a task for later execution.

                          func (*Scheduler) Tasks

                          func (s *Scheduler) Tasks() TaskList

                            Tasks returns a snapshot of the scheduler state.

                            Recalculates it from scratch, so it is a pretty expensive call.

                            Tasks are ordered by ETA: currently executing tasks first, then scheduled tasks.

                            type Task

                            type Task struct {
                            	Payload proto.Message // a clone of the original AddTask payload, if available
                            
                            	Task    *taskspb.Task           // a clone of the Cloud Tasks task as passed to Submit
                            	Message *pubsubpb.PubsubMessage // a clone of the PubSub message as passed to Submit
                            
                            	Name string    // full task name (perhaps generated)
                            	ETA  time.Time // when the task is due, always set at now or in future
                            
                            	Finished  time.Time // when the task finished last execution attempt
                            	Attempts  int       // 0 initially, incremented before each execution attempt
                            	Executing bool      // true if executing right now
                            	// contains filtered or unexported fields
                            }

                              Task represents an enqueued or executing task.

                              func (*Task) Copy

                              func (t *Task) Copy() *Task

                                Copy makes a shallow copy of the task.

                                type TaskList

                                type TaskList []*Task

                                  TaskList is a collection of tasks.

                                  func (TaskList) Executing

                                  func (tl TaskList) Executing() TaskList

                                    Executing returns a list of tasks executing right now.

                                    func (TaskList) Filter

                                    func (tl TaskList) Filter(cb func(*Task) bool) TaskList

                                      Filter returns a new task list with tasks matching the filter.

                                      func (TaskList) Payloads

                                      func (tl TaskList) Payloads() []proto.Message

                                        Payloads returns a list with individual task payloads.

                                        func (TaskList) Pending

                                        func (tl TaskList) Pending() TaskList

                                          Pending returns a list of tasks waiting execution.

                                          func (TaskList) SortByETA

                                          func (tl TaskList) SortByETA() TaskList

                                            SortByETA sorts the list in-place by ETA.

                                            Returns it to allow chaining calls.