tq

package
v0.0.0-...-4a11b79
Warning

This package is not in the latest version of its module.

Published: Sep 2, 2020 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package tq implements a simple routing layer for task queue tasks.

Index

Constants

This section is empty.

Variables

var Retry = errors.BoolTag{Key: errors.NewTagKey("the task should be retried")}

Retry is an error tag used to indicate that the handler wants the task to be redelivered later.

See the Handler doc for more details.

Functions

func RequestHeaders

func RequestHeaders(ctx context.Context) (*taskqueue.RequestHeaders, error)

RequestHeaders returns the special task-queue HTTP request headers for the current task handler.

Returns an error if called from outside of a task handler.

Types

type Dispatcher

type Dispatcher struct {
	BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default
	// contains filtered or unexported fields
}

Dispatcher submits and handles task queue tasks.

func (*Dispatcher) AddTask

func (d *Dispatcher) AddTask(ctx context.Context, tasks ...*Task) error

AddTask submits the given tasks to an appropriate task queue.

This means that at some later time, in some other GAE process, the callbacks registered as handlers for the corresponding proto types will be called.

If the given context is transactional or namespaced, AddTask inherits the transaction/namespace. Note that when running outside of a transaction with multiple tasks, the operation is not atomic: it returns an error if at least one enqueue operation failed (there's no way to figure out which one exactly).

Returns only transient errors. Unlike regular Task Queue's Add, ErrTaskAlreadyAdded is not considered an error.

func (*Dispatcher) DeleteTask

func (d *Dispatcher) DeleteTask(ctx context.Context, tasks ...*Task) error

DeleteTask deletes the specified tasks from their queues.

If the given context is transactional or namespaced, DeleteTask inherits the transaction/namespace. Note that when running outside of a transaction with multiple tasks, the operation is not atomic: it returns an error if at least one delete operation failed (there's no way to figure out which one exactly).

Returns only transient errors. Unlike regular Task Queue's Delete, attempts to delete an unknown or tombstoned task are not considered errors.

func (*Dispatcher) GetQueues

func (d *Dispatcher) GetQueues() []string

GetQueues returns the names of task queues known to the dispatcher.

func (*Dispatcher) InstallRoutes

func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain)

InstallRoutes installs the appropriate HTTP routes in the router.

Must be called only after all task handlers are registered.

func (*Dispatcher) Internals

func (d *Dispatcher) Internals() interface{}

Internals is used by the tqtesting package and must not be used directly.

For that reason it returns an opaque interface type, to curb curiosity.

We do this to avoid linking the testing implementation into production binaries. The testing code lives in a different package and uses this back door API to talk to Dispatcher.

func (*Dispatcher) RegisterTask

func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue string, opts *taskqueue.RetryOptions)

RegisterTask tells the dispatcher that tasks of the given proto type should be handled by the given handler and routed through the given task queue.

'prototype' should be a pointer to some concrete proto message. It is used only for its type signature.

Intended to be called during process startup. Panics if a message of this type has already been registered.
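To make the "used only for its type signature" and "panics on duplicates" points concrete, here is a minimal sketch of type-keyed routing using the reflect package. The registry, handler and sendEmail names are hypothetical, plain Go structs stand in for proto messages, and queues and retry options are left out; it is not the Dispatcher's actual implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// handler is a simplified stand-in for tq.Handler (no context, no proto.Message).
type handler func(payload interface{}) error

// registry routes payloads to handlers by their concrete Go type.
type registry struct {
	handlers map[reflect.Type]handler
}

// register records cb for the type of prototype. The prototype's field values
// are ignored; only its type matters. Registering the same type twice panics.
func (r *registry) register(prototype interface{}, cb handler) {
	t := reflect.TypeOf(prototype)
	if r.handlers == nil {
		r.handlers = make(map[reflect.Type]handler)
	}
	if _, dup := r.handlers[t]; dup {
		panic(fmt.Sprintf("handler for %v is already registered", t))
	}
	r.handlers[t] = cb
}

// dispatch looks up the handler by the payload's type and calls it.
func (r *registry) dispatch(payload interface{}) error {
	cb, ok := r.handlers[reflect.TypeOf(payload)]
	if !ok {
		return fmt.Errorf("no handler registered for %T", payload)
	}
	return cb(payload)
}

// sendEmail is a hypothetical payload type standing in for a proto message.
type sendEmail struct{ To string }

func main() {
	r := &registry{}
	r.register(&sendEmail{}, func(p interface{}) error {
		fmt.Println("sending to", p.(*sendEmail).To)
		return nil
	})
	if err := r.dispatch(&sendEmail{To: "user@example.com"}); err != nil {
		fmt.Println("dispatch failed:", err)
	}
}
```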

type Handler

type Handler func(ctx context.Context, payload proto.Message) error

Handler is called to handle one enqueued task.

The passed context is produced by the middleware chain installed with InstallRoutes. In addition, it carries the task queue request headers, accessible through the RequestHeaders(ctx) function. They are passed implicitly via the context to avoid complicating the Handler signature for a feature that most callers aren't going to use.

If the returned error is tagged with the Retry tag, the request finishes with HTTP status 409, indicating to the task queue that it should redeliver the task (which it may or may not do, depending on its RetryOptions). The same happens if the error is transient (i.e. tagged with transient.Tag), except that the request finishes with HTTP status 500. This difference makes it possible to distinguish "expected" retry requests (errors tagged with Retry) from "unexpected" ones (errors tagged with transient.Tag). The Retry tag should be used **only** if the handler is fully aware of Task Queue semantics and **explicitly** wants the task to be retried because it can't be processed right now and the handler expects that a retry will help.

For a contrived example, if the handler can process the task only after 2 PM, but it is 1:55 PM now, the handler should return an error tagged with Retry to indicate this. On the other hand, if the handler failed to process the task due to an RPC timeout or some other exceptional transient situation, it should return an error tagged with transient.Tag.

Note that it is OK (and often desirable) to tag an error with both Retry and transient.Tag. Such errors propagate through the call stack as transient until they reach the Dispatcher, which treats them as retriable.

An untagged error (or success) marks the task as "done"; it won't be retried.

type Task

type Task struct {
	// Payload is task's payload as well as indicator of its type.
	//
	// Tasks are routed based on type of the payload message, see RegisterTask.
	Payload proto.Message

	// NamePrefix, if not empty, is a string that will be prefixed to the task's
	// name. Characters in NamePrefix must be appropriate task queue name
	// characters. NamePrefix can be useful because the Task Queue system allows
	// users to search for tasks by prefix.
	//
	// Lexicographically close names can cause hot spots in the Task Queues
	// backend. If NamePrefix is specified, users should try and ensure that
	// it is friendly to sharding (e.g., begins with a hash string).
	//
	// Setting NamePrefix and/or DeduplicationKey will result in a named task
	// being generated. This task can be cancelled using DeleteTask.
	NamePrefix string

	// DeduplicationKey is optional unique key of the task.
	//
	// If a task of a given proto type with a given key has already been enqueued
	// recently, this task will be silently ignored.
	//
	// Such tasks can only be used outside of transactions.
	//
	// Setting NamePrefix and/or DeduplicationKey will result in a named task
	// being generated. This task can be cancelled using DeleteTask.
	DeduplicationKey string

	// Title is optional string that identifies the task in HTTP logs.
	//
	// It will show up as a suffix in task handler URL. It exists exclusively to
	// simplify reading HTTP logs. It serves no other purpose! In particular,
	// it is NOT a task name.
	//
	// Handlers won't ever see it. Pass all information through the task body.
	Title string

	// Delay specifies the duration the task queue service must wait before
	// executing the task.
	//
	// Either Delay or ETA may be set, but not both.
	Delay time.Duration

	// ETA specifies the earliest time a task may be executed.
	//
	// Either Delay or ETA may be set, but not both.
	ETA time.Time

	// Retry options for this task.
	//
	// If given, overrides default options set when this task was registered.
	RetryOptions *taskqueue.RetryOptions
}

Task contains the task body and additional parameters that influence how it is routed.

func (*Task) Name

func (task *Task) Name() string

Name generates and returns the task's name.

If the task is not a named task (it has neither NamePrefix nor DeduplicationKey set), Name returns an empty string.

Directories

Path Synopsis
Package tqtesting can be used in unit tests to simulate task queue calls produced by tq.Dispatcher.