Documentation ¶
Index ¶
- Variables
- func Count(ctx context.Context, service Service, namespace, group string) (tasks int64, claims int64, retErr error)
- func DoBatch(ctx context.Context, doer Doer, inputs []*types.Any, cb CollectFunc) error
- func DoOne(ctx context.Context, doer Doer, input *types.Any) (*types.Any, error)
- func DoOrdered(ctx context.Context, doer Doer, inputs chan *types.Any, parallelism int, ...) error
- func List(ctx context.Context, svc Service, req *taskapi.ListTaskRequest, ...) error
- type Cache
- type Claim
- func (*Claim) Descriptor() ([]byte, []int)
- func (m *Claim) Marshal() (dAtA []byte, err error)
- func (x *Claim) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Claim) MarshalTo(dAtA []byte) (int, error)
- func (m *Claim) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Claim) ProtoMessage()
- func (m *Claim) Reset()
- func (m *Claim) Size() (n int)
- func (m *Claim) String() string
- func (m *Claim) Unmarshal(dAtA []byte) error
- func (m *Claim) XXX_DiscardUnknown()
- func (m *Claim) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Claim) XXX_Merge(src proto.Message)
- func (m *Claim) XXX_Size() int
- func (m *Claim) XXX_Unmarshal(b []byte) error
- type CollectFunc
- type Doer
- type Group
- func (*Group) Descriptor() ([]byte, []int)
- func (m *Group) Marshal() (dAtA []byte, err error)
- func (x *Group) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Group) MarshalTo(dAtA []byte) (int, error)
- func (m *Group) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Group) ProtoMessage()
- func (m *Group) Reset()
- func (m *Group) Size() (n int)
- func (m *Group) String() string
- func (m *Group) Unmarshal(dAtA []byte) error
- func (m *Group) XXX_DiscardUnknown()
- func (m *Group) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Group) XXX_Merge(src proto.Message)
- func (m *Group) XXX_Size() int
- func (m *Group) XXX_Unmarshal(b []byte) error
- type ProcessFunc
- type Service
- type Source
- type State
- type Task
- func (*Task) Descriptor() ([]byte, []int)
- func (m *Task) GetID() string
- func (m *Task) GetIndex() int64
- func (m *Task) GetInput() *types.Any
- func (m *Task) GetOutput() *types.Any
- func (m *Task) GetReason() string
- func (m *Task) GetState() State
- func (m *Task) Marshal() (dAtA []byte, err error)
- func (x *Task) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *Task) MarshalTo(dAtA []byte) (int, error)
- func (m *Task) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*Task) ProtoMessage()
- func (m *Task) Reset()
- func (m *Task) Size() (n int)
- func (m *Task) String() string
- func (m *Task) Unmarshal(dAtA []byte) error
- func (m *Task) XXX_DiscardUnknown()
- func (m *Task) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Task) XXX_Merge(src proto.Message)
- func (m *Task) XXX_Size() int
- func (m *Task) XXX_Unmarshal(b []byte) error
- type TestTask
- func (*TestTask) Descriptor() ([]byte, []int)
- func (m *TestTask) GetID() string
- func (m *TestTask) Marshal() (dAtA []byte, err error)
- func (x *TestTask) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (m *TestTask) MarshalTo(dAtA []byte) (int, error)
- func (m *TestTask) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TestTask) ProtoMessage()
- func (m *TestTask) Reset()
- func (m *TestTask) Size() (n int)
- func (m *TestTask) String() string
- func (m *TestTask) Unmarshal(dAtA []byte) error
- func (m *TestTask) XXX_DiscardUnknown()
- func (m *TestTask) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TestTask) XXX_Merge(src proto.Message)
- func (m *TestTask) XXX_Size() int
- func (m *TestTask) XXX_Unmarshal(b []byte) error
Constants ¶
This section is empty.
Variables ¶
var ( ErrInvalidLengthTask = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowTask = fmt.Errorf("proto: integer overflow") ErrUnexpectedEndOfGroupTask = fmt.Errorf("proto: unexpected end of group") )
var State_name = map[int32]string{
0: "STATE_UNKNOWN",
1: "RUNNING",
2: "SUCCESS",
3: "FAILURE",
}
var State_value = map[string]int32{
"STATE_UNKNOWN": 0,
"RUNNING": 1,
"SUCCESS": 2,
"FAILURE": 3,
}
Functions ¶
func Count ¶
func Count(ctx context.Context, service Service, namespace, group string) (tasks int64, claims int64, retErr error)
Count returns the number of tasks and claims in the given namespace and group (if nonempty).
func DoOne ¶
DoOne executes one task. NOTE: This interface is much less performant than the stream / batch interfaces for many tasks. Only use this interface for development / a small number of tasks.
Types ¶
type Claim ¶
type Claim struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Claim) Descriptor ¶
func (*Claim) MarshalLogObject ¶
func (x *Claim) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Claim) ProtoMessage ¶
func (*Claim) ProtoMessage()
func (*Claim) XXX_DiscardUnknown ¶
func (m *Claim) XXX_DiscardUnknown()
func (*Claim) XXX_Marshal ¶
func (*Claim) XXX_Unmarshal ¶
type CollectFunc ¶
CollectFunc is the type of a function that is used for collecting the output of a stream / batch of tasks. Index is the index of a task with respect to the order in which the task was created in the stream / batch.
type Doer ¶
type Doer interface { // Do creates and returns the results of processing a stream of tasks provided // by the input channel. The client should close the input channel when all tasks have // been sent (it does not need to be closed if the context is canceled). For each // task, the collect function will be called with the results. Do(ctx context.Context, inputChan chan *types.Any, cb CollectFunc) error }
Doer is a doer of tasks. Refer to the DoOne and DoBatch helper functions if a simpler interface is desired.
type Group ¶
type Group struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Group) Descriptor ¶
func (*Group) MarshalLogObject ¶
func (x *Group) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Group) ProtoMessage ¶
func (*Group) ProtoMessage()
func (*Group) XXX_DiscardUnknown ¶
func (m *Group) XXX_DiscardUnknown()
func (*Group) XXX_Marshal ¶
func (*Group) XXX_Unmarshal ¶
type ProcessFunc ¶
ProcessFunc is the type of a function that is used for processing a task. If an error occurs, then it should be returned. This error will be propagated back to the Doer that created the task.
type Service ¶
type Service interface { // NewDoer creates a Doer with the provided namespace and group. NewDoer(namespace, group string, cache Cache) Doer // NewSource creates a Source with the provided namespace. NewSource(namespace string) Source // List calls a function on every task under a namespace and group List(ctx context.Context, namespace, group string, cb func(namespace, group string, data *Task, claimed bool) error) error }
Scheduling: A task managed by a Service has a group. The group is used for scheduling purposes. Scheduling is based on maximizing fairness for the groups, and the schedulable unit is a task.
type Source ¶
type Source interface { // Iterate iterates through tasks until the provided context is canceled. // For each task, the process function will be called and the results // will be returned to the Doer that created the task. Iterate(ctx context.Context, cb ProcessFunc) error }
Source is a source of tasks.
type Task ¶
type Task struct { ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` State State `protobuf:"varint,2,opt,name=state,proto3,enum=task.State" json:"state,omitempty"` Input *types.Any `protobuf:"bytes,3,opt,name=input,proto3" json:"input,omitempty"` Output *types.Any `protobuf:"bytes,4,opt,name=output,proto3" json:"output,omitempty"` Reason string `protobuf:"bytes,5,opt,name=reason,proto3" json:"reason,omitempty"` Index int64 `protobuf:"varint,6,opt,name=index,proto3" json:"index,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
TODO: Consider splitting this up into separate structures for each state in a oneof.
func (*Task) Descriptor ¶
func (*Task) MarshalLogObject ¶
func (x *Task) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Task) ProtoMessage ¶
func (*Task) ProtoMessage()
func (*Task) XXX_DiscardUnknown ¶
func (m *Task) XXX_DiscardUnknown()
func (*Task) XXX_Unmarshal ¶
type TestTask ¶
type TestTask struct { ID string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*TestTask) Descriptor ¶
func (*TestTask) MarshalLogObject ¶
func (x *TestTask) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*TestTask) MarshalToSizedBuffer ¶
func (*TestTask) ProtoMessage ¶
func (*TestTask) ProtoMessage()
func (*TestTask) XXX_DiscardUnknown ¶
func (m *TestTask) XXX_DiscardUnknown()