Package tasks implements asynchronous invocation processing.



This section is empty.


AllTypes is a slice of all known types of tasks.

View Source
var ErrConflict = fmt.Errorf("the task is already leased")

ErrConflict is returned by Lease if the task does not exist or is already leased.

View Source
var FinalizationTasks = tq.RegisterTaskClass(tq.TaskClass{
	ID:                  "try-finalize-inv",
	Prototype:           &taskspb.TryFinalizeInvocation{},
	Kind:                tq.Transactional,
	InheritTraceContext: true,
	Queue:               "finalizer",
	RoutingPrefix:       "/internal/tasks/finalizer",

FinalizationTasks describes how to route finalization tasks.

The handler is implemented in internal/services/finalizer.

View Source
var PermanentFailure = errors.BoolTag{
	Key: errors.NewTagKey("permanent failure to process invocation task"),

PermanentFailure set in an error indicates that the err is not resolvable by a retry. Such task is doomed.

View Source
var UseFinalizationTQ = experiments.Register("rdb-use-tq-finalization")

UseFinalizationTQ experiment enables using server/tq for finalization tasks.


func Delete

func Delete(ctx context.Context, typ Type, id string) error

Delete deletes a task.

func Enqueue

func Enqueue(typ Type, taskID string, invID invocations.ID, payload interface{}, processAfter time.Time) *spanner.Mutation

Enqueue inserts one row to InvocationTasks.

func EnqueueBQExport

func EnqueueBQExport(invID invocations.ID, payload *pb.BigQueryExport, processAfter time.Time) *spanner.Mutation

EnqueueBQExport inserts one row to InvocationTasks for a bq export task.

func Lease

func Lease(ctx context.Context, typ Type, id string, duration time.Duration) (invID invocations.ID, payload []byte, err error)

Lease leases an invocation task. If the task does not exist or is already leased, returns ErrConflict.

func Peek

func Peek(ctx context.Context, typ Type, f func(id string) error) error

Peek calls f on available tasks of a given type.

func StartInvocationFinalization

func StartInvocationFinalization(ctx context.Context, id invocations.ID)

StartInvocationFinalization changes invocation state to FINALIZING and enqueues a TryFinalizeInvocation task.

The caller is responsible for ensuring that the invocation is active.

TODO(nodir): this package is not a great place for this function, but there is no better package at the moment. Keep it here for now, but consider a new package as the code base grows.


type Dispatcher

type Dispatcher struct {
	// How often to query for tasks. Defaults to 1m.
	QueryInterval time.Duration

	// How long to lease a task for. Defaults to 1m.
	LeaseDuration time.Duration

	// Number of tasks to process concurrently. Defaults to GOMAXPROCS.
	Workers int

Dispatcher queries for available tasks and dispatches them to goroutines.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context, taskType Type, fn TaskFunc)

Run queries tasks and dispatches them to goroutines until ctx is canceled. Logs errors.

type TaskFunc

type TaskFunc func(ctx context.Context, invID invocations.ID, payload []byte) error

TaskFunc can execute a task. If the returned error is tagged with PermanentFailure, then the failed task is deleted.

type Type

type Type string

Type is a value for InvocationTasks.TaskType column. It defines what a task does.

const (
	// BQExport is a type of task that exports an invocation to BigQuery.
	// The task payload is binary-encoded BigQueryExport message.
	BQExport Type = "bq_export"

	// TryFinalizeInvocation is a type of task that tries to finalize an
	// invocation. No payload.
	TryFinalizeInvocation Type = "finalize"

Types of invocation tasks. Used as InvocationTasks.TaskType column value.

func (Type) Key

func (t Type) Key(taskID string) spanner.Key

Key returns a Spanner key for the InvocationTasks row.


Path Synopsis