task

package
v2.2.4-0...-0ca215b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TaskSuccess and TaskFail are the status strings "success" and "fail".
	// NOTE(review): presumably the values stored in ResultMessage.Status —
	// confirm against the worker implementation.
	TaskSuccess = "success"
	TaskFail    = "fail"

	// RetryGap is a fixed 10ms duration.
	// NOTE(review): by its name, the pause between retry attempts — confirm
	// which loop consumes it.
	RetryGap = 10 * time.Millisecond
)
View Source
const (
	// RatePeriod is a fixed 100ms duration.
	// NOTE(review): by its name, the period of a rate limiter or polling
	// loop — confirm at the use site.
	RatePeriod = 100 * time.Millisecond
)

Variables

View Source
var (
	// SendMsgTimeout and GetMsgTimeout are sentinel errors whose messages
	// are "failed to send message" and "failed to get message".
	// NOTE(review): the names suggest they are returned when a broker
	// send/receive times out, but the messages do not mention a timeout —
	// confirm. Go convention would name these ErrXxx; they are exported, so
	// renaming would break callers.
	SendMsgTimeout = errors.New("failed to send message")
	GetMsgTimeout  = errors.New("failed to get message")
)
View Source
var (
	// ErrInvalidArgs is a sentinel error reporting that a task could not be
	// executed because its arguments were invalid.
	ErrInvalidArgs = errors.New("failed to exec task, due to invalid args")
)
View Source
var (
	// ErrResultNotFound is a sentinel error reporting that a result could
	// not be found.
	// NOTE(review): presumably returned by TaskBackend.GetResult — confirm.
	ErrResultNotFound = errors.New("failed to find result")
)

Functions

func GetRealValue

func GetRealValue(val *reflect.Value) interface{}

Types

type AsyncTask

// AsyncTask is the interface for a task that takes keyword arguments and can
// then be executed.
type AsyncTask interface {
	// ParseKwargs parses the keyword arguments supplied as a map and
	// returns an error if parsing fails.
	ParseKwargs(map[string]interface{}) error

	// RunTask executes the task and returns its result, or an error.
	RunTask() (interface{}, error)
}

type BrokerMessage

// BrokerMessage is the message type exchanged through a TaskBroker
// (SendMessage / GetMessage). It serializes to JSON with the keys "id" and
// "value".
// NOTE(review): Value presumably holds the string produced by
// TaskMessage.Encode, since Decode returns a *TaskMessage — confirm.
type BrokerMessage struct {
	ID    string `json:"id"`
	Value string `json:"value"`
}

func (*BrokerMessage) Decode

func (bm *BrokerMessage) Decode() (*TaskMessage, error)

Decode decodes the broker message and returns the resulting TaskMessage.

type ResultMessage

// ResultMessage is the result record read and written through a TaskBackend.
// It serializes to JSON with the keys "id", "status", "traceback" and
// "result"; Result is untyped and may hold any value.
// NOTE(review): Status presumably holds TaskSuccess or TaskFail, and
// Traceback the failure detail — confirm against the worker.
type ResultMessage struct {
	ID        string      `json:"id"`
	Status    string      `json:"status"`
	Traceback string      `json:"traceback"`
	Result    interface{} `json:"result"`
}

type TaskBackend

// TaskBackend stores and retrieves task results keyed by task ID.
type TaskBackend interface {
	// GetResult returns the ResultMessage associated with taskID, or an
	// error.
	// NOTE(review): presumably ErrResultNotFound when nothing is stored for
	// taskID — confirm per implementation.
	GetResult(taskID string) (*ResultMessage, error)
	// SetResult associates result with taskID and returns an error on
	// failure.
	SetResult(taskID string, result *ResultMessage) error
}

func NewMapBackend

func NewMapBackend() TaskBackend

type TaskBroker

// TaskBroker is the transport for BrokerMessage values. It embeds io.Closer,
// so every implementation also provides Close() error.
type TaskBroker interface {
	// SendMessage hands a message to the broker and returns an error on
	// failure.
	SendMessage(*BrokerMessage) error
	// GetMessage returns a message from the broker, or an error.
	// NOTE(review): whether this blocks when no message is available is
	// not visible here — confirm per implementation.
	GetMessage() (*BrokerMessage, error)
	io.Closer
}

func NewChannelBroker

func NewChannelBroker(cache int) TaskBroker

type TaskMessage

// TaskMessage describes one task invocation. It serializes to JSON with the
// keys "id", "task" (the Name field), "args", "kwargs", "retries" and
// "expires". Expires is a pointer, so a nil value is encoded as JSON null.
// NOTE(review): Args and Kwargs presumably carry the positional and keyword
// arguments given to TaskProducer.AddTask / AddTaskWithKey — confirm.
type TaskMessage struct {
	ID      string                 `json:"id"`
	Name    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	Expires *time.Time             `json:"expires"`
}

func (*TaskMessage) Encode

func (tm *TaskMessage) Encode() (string, error)

Encode returns the task message as a base64-encoded JSON string.

type TaskProducer

// TaskProducer submits tasks by name and returns a TaskResult handle for
// each submission.
type TaskProducer interface {
	// AddTask submits the task called name with positional arguments.
	AddTask(name string, args ...interface{}) (*TaskResult, error)
	// AddTaskWithKey submits the task called name with arguments keyed by
	// string.
	// NOTE(review): presumably these become TaskMessage.Kwargs — confirm.
	AddTaskWithKey(name string, args map[string]interface{}) (*TaskResult, error)
}

func NewTaskProducer

func NewTaskProducer(broker TaskBroker, backend TaskBackend) TaskProducer

type TaskResult

// TaskResult is the handle returned by TaskProducer.AddTask and
// AddTaskWithKey; the result is fetched through its Get and AsyncGet methods.
// NOTE(review): ID is presumably the ID of the submitted task — confirm.
type TaskResult struct {
	ID string
	// contains filtered or unexported fields
}

func (*TaskResult) AsyncGet

func (tr *TaskResult) AsyncGet() (*ResultMessage, error)

AsyncGet gets the result asynchronously.

func (*TaskResult) Get

func (tr *TaskResult) Get(timeout time.Duration) (*ResultMessage, error)

Get retrieves the result synchronously, given a timeout.

type TaskWorker

// TaskWorker is the interface for a worker that can be started, stopped, and
// given tasks registered under a name.
type TaskWorker interface {
	// StartWorker starts the worker with the given context.
	// NOTE(review): whether this call blocks, and whether cancelling ctx
	// stops the worker, is not visible here — confirm.
	StartWorker(ctx context.Context)
	// StopWorker stops the worker.
	StopWorker()
	// Register associates task with name. The task parameter is untyped.
	// NOTE(review): presumably an AsyncTask or a plain function is
	// expected — confirm against the implementation.
	Register(name string, task interface{})
}

func NewTaskWorker

func NewTaskWorker(broker TaskBroker, backend TaskBackend) TaskWorker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL