task

package
v2.7.0-nightly.20230526 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 25, 2023 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrInvalidLengthTask        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowTask          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupTask = fmt.Errorf("proto: unexpected end of group")
)
View Source
var State_name = map[int32]string{
	0: "STATE_UNKNOWN",
	1: "RUNNING",
	2: "SUCCESS",
	3: "FAILURE",
}
View Source
var State_value = map[string]int32{
	"STATE_UNKNOWN": 0,
	"RUNNING":       1,
	"SUCCESS":       2,
	"FAILURE":       3,
}

Functions

func Count

func Count(ctx context.Context, service Service, namespace, group string) (tasks int64, claims int64, retErr error)

Count returns the number of tasks and claims in the given namespace and group (if nonempty)

func DoBatch

func DoBatch(ctx context.Context, doer Doer, inputs []*types.Any, cb CollectFunc) error

DoBatch executes a batch of tasks.

func DoOne

func DoOne(ctx context.Context, doer Doer, input *types.Any) (*types.Any, error)

DoOne executes one task. NOTE: This interface is much less performant than the stream / batch interfaces for many tasks. Only use this interface for development / a small number of tasks.

func DoOrdered

func DoOrdered(ctx context.Context, doer Doer, inputs chan *types.Any, parallelism int, cb CollectFunc) error

DoOrdered processes tasks in parallel, but returns outputs in order via the provided callback cb.

func List

func List(ctx context.Context, svc Service, req *taskapi.ListTaskRequest, send func(info *taskapi.TaskInfo) error) error

List implements the functionality for an arbitrary service's ListTask gRPC

Types

type Cache

type Cache interface {
	Get(ctx context.Context, key string) (output *types.Any, _ error)
	Put(ctx context.Context, key string, output *types.Any) error
}

type Claim

type Claim struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Claim) Descriptor

func (*Claim) Descriptor() ([]byte, []int)

func (*Claim) Marshal

func (m *Claim) Marshal() (dAtA []byte, err error)

func (*Claim) MarshalLogObject

func (x *Claim) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Claim) MarshalTo

func (m *Claim) MarshalTo(dAtA []byte) (int, error)

func (*Claim) MarshalToSizedBuffer

func (m *Claim) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Claim) ProtoMessage

func (*Claim) ProtoMessage()

func (*Claim) Reset

func (m *Claim) Reset()

func (*Claim) Size

func (m *Claim) Size() (n int)

func (*Claim) String

func (m *Claim) String() string

func (*Claim) Unmarshal

func (m *Claim) Unmarshal(dAtA []byte) error

func (*Claim) XXX_DiscardUnknown

func (m *Claim) XXX_DiscardUnknown()

func (*Claim) XXX_Marshal

func (m *Claim) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Claim) XXX_Merge

func (m *Claim) XXX_Merge(src proto.Message)

func (*Claim) XXX_Size

func (m *Claim) XXX_Size() int

func (*Claim) XXX_Unmarshal

func (m *Claim) XXX_Unmarshal(b []byte) error

type CollectFunc

type CollectFunc = func(index int64, output *types.Any, _ error) error

CollectFunc is the type of a function that is used for collecting the output of a stream / batch of tasks. Index is the index of a task with respect to the order in which the task was created in the stream / batch.

type Doer

type Doer interface {
	// Do creates and returns the results of processing a stream of tasks provided
	// by the input channel. The client should close the input channel when all tasks have
	// been sent (it does not need to be closed if the context is canceled). For each
	// task, the collect function will be called with the results.
	Do(ctx context.Context, inputChan chan *types.Any, cb CollectFunc) error
}

Doer is a doer of tasks. Refer to the DoOne and DoBatch helper functions if a simpler interface is desired.

type Group

type Group struct {
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*Group) Descriptor

func (*Group) Descriptor() ([]byte, []int)

func (*Group) Marshal

func (m *Group) Marshal() (dAtA []byte, err error)

func (*Group) MarshalLogObject

func (x *Group) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Group) MarshalTo

func (m *Group) MarshalTo(dAtA []byte) (int, error)

func (*Group) MarshalToSizedBuffer

func (m *Group) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Group) ProtoMessage

func (*Group) ProtoMessage()

func (*Group) Reset

func (m *Group) Reset()

func (*Group) Size

func (m *Group) Size() (n int)

func (*Group) String

func (m *Group) String() string

func (*Group) Unmarshal

func (m *Group) Unmarshal(dAtA []byte) error

func (*Group) XXX_DiscardUnknown

func (m *Group) XXX_DiscardUnknown()

func (*Group) XXX_Marshal

func (m *Group) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Group) XXX_Merge

func (m *Group) XXX_Merge(src proto.Message)

func (*Group) XXX_Size

func (m *Group) XXX_Size() int

func (*Group) XXX_Unmarshal

func (m *Group) XXX_Unmarshal(b []byte) error

type ProcessFunc

type ProcessFunc = func(ctx context.Context, input *types.Any) (output *types.Any, _ error)

ProcessFunc is the type of a function that is use for processing a task. If an error occurs, then it should be returned. This error will be propagated back to the Doer that created the task.

type Service

type Service interface {
	// NewDoer creates a Doer with the provided namespace and group.
	NewDoer(namespace, group string, cache Cache) Doer
	// NewSource creates a Source with the provided namespace.
	NewSource(namespace string) Source
	// List calls a function on every task under a namespace and group
	List(ctx context.Context, namespace, group string, cb func(namespace, group string, data *Task, claimed bool) error) error
}

Scheduling: A task managed by a Service has a group. The group is used for scheduling purposes. Scheduling is based on maximizing fairness for the groups, and the schedulable unit is a task.

func NewEtcdService

func NewEtcdService(etcdClient *etcd.Client, etcdPrefix string) Service

type Source

type Source interface {
	// Iterate iterates through tasks until the provided context is canceled.
	// For each task, the process function will be called and the results
	// will be returned to the Doer that created the task.
	Iterate(ctx context.Context, cb ProcessFunc) error
}

Source is a source of tasks.

type State

type State int32
const (
	State_STATE_UNKNOWN State = 0
	State_RUNNING       State = 1
	State_SUCCESS       State = 2
	State_FAILURE       State = 3
)

func (State) EnumDescriptor

func (State) EnumDescriptor() ([]byte, []int)

func (State) String

func (x State) String() string

type Task

type Task struct {
	ID                   string     `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	State                State      `protobuf:"varint,2,opt,name=state,proto3,enum=task.State" json:"state,omitempty"`
	Input                *types.Any `protobuf:"bytes,3,opt,name=input,proto3" json:"input,omitempty"`
	Output               *types.Any `protobuf:"bytes,4,opt,name=output,proto3" json:"output,omitempty"`
	Reason               string     `protobuf:"bytes,5,opt,name=reason,proto3" json:"reason,omitempty"`
	Index                int64      `protobuf:"varint,6,opt,name=index,proto3" json:"index,omitempty"`
	XXX_NoUnkeyedLiteral struct{}   `json:"-"`
	XXX_unrecognized     []byte     `json:"-"`
	XXX_sizecache        int32      `json:"-"`
}

TODO: Consider splitting this up into separate structures for each state in a oneof.

func (*Task) Descriptor

func (*Task) Descriptor() ([]byte, []int)

func (*Task) GetID

func (m *Task) GetID() string

func (*Task) GetIndex

func (m *Task) GetIndex() int64

func (*Task) GetInput

func (m *Task) GetInput() *types.Any

func (*Task) GetOutput

func (m *Task) GetOutput() *types.Any

func (*Task) GetReason

func (m *Task) GetReason() string

func (*Task) GetState

func (m *Task) GetState() State

func (*Task) Marshal

func (m *Task) Marshal() (dAtA []byte, err error)

func (*Task) MarshalLogObject

func (x *Task) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Task) MarshalTo

func (m *Task) MarshalTo(dAtA []byte) (int, error)

func (*Task) MarshalToSizedBuffer

func (m *Task) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*Task) ProtoMessage

func (*Task) ProtoMessage()

func (*Task) Reset

func (m *Task) Reset()

func (*Task) Size

func (m *Task) Size() (n int)

func (*Task) String

func (m *Task) String() string

func (*Task) Unmarshal

func (m *Task) Unmarshal(dAtA []byte) error

func (*Task) XXX_DiscardUnknown

func (m *Task) XXX_DiscardUnknown()

func (*Task) XXX_Marshal

func (m *Task) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Task) XXX_Merge

func (m *Task) XXX_Merge(src proto.Message)

func (*Task) XXX_Size

func (m *Task) XXX_Size() int

func (*Task) XXX_Unmarshal

func (m *Task) XXX_Unmarshal(b []byte) error

type TestTask

type TestTask struct {
	ID                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func (*TestTask) Descriptor

func (*TestTask) Descriptor() ([]byte, []int)

func (*TestTask) GetID

func (m *TestTask) GetID() string

func (*TestTask) Marshal

func (m *TestTask) Marshal() (dAtA []byte, err error)

func (*TestTask) MarshalLogObject

func (x *TestTask) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*TestTask) MarshalTo

func (m *TestTask) MarshalTo(dAtA []byte) (int, error)

func (*TestTask) MarshalToSizedBuffer

func (m *TestTask) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*TestTask) ProtoMessage

func (*TestTask) ProtoMessage()

func (*TestTask) Reset

func (m *TestTask) Reset()

func (*TestTask) Size

func (m *TestTask) Size() (n int)

func (*TestTask) String

func (m *TestTask) String() string

func (*TestTask) Unmarshal

func (m *TestTask) Unmarshal(dAtA []byte) error

func (*TestTask) XXX_DiscardUnknown

func (m *TestTask) XXX_DiscardUnknown()

func (*TestTask) XXX_Marshal

func (m *TestTask) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*TestTask) XXX_Merge

func (m *TestTask) XXX_Merge(src proto.Message)

func (*TestTask) XXX_Size

func (m *TestTask) XXX_Size() int

func (*TestTask) XXX_Unmarshal

func (m *TestTask) XXX_Unmarshal(b []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL