Documentation ¶
Index ¶
Constants ¶
View Source
const ( TaskSuccess = "success" TaskFail = "fail" RetryGap = 10 * time.Millisecond )
View Source
const (
RatePeriod = 100 * time.Millisecond
)
Variables ¶
View Source
var ( SendMsgTimeout = errors.New("failed to send message") GetMsgTimeout = errors.New("failed to get message") )
View Source
var (
ErrInvalidArgs = errors.New("failed to exec task, due to invalid args")
)
View Source
var (
ErrResultNotFound = errors.New("failed to find result")
)
Functions ¶
func GetRealValue ¶
Types ¶
type BrokerMessage ¶
func (*BrokerMessage) Decode ¶
func (bm *BrokerMessage) Decode() (*TaskMessage, error)
Decode return taskMessage
type ResultMessage ¶
type TaskBackend ¶
type TaskBackend interface { GetResult(taskId string) (*ResultMessage, error) SetResult(taskID string, result *ResultMessage) error }
func NewMapBackend ¶
func NewMapBackend() TaskBackend
type TaskBroker ¶
type TaskBroker interface { SendMessage(*BrokerMessage) error GetMessage() (*BrokerMessage, error) io.Closer }
func NewChannelBroker ¶
func NewChannelBroker(cache int) TaskBroker
type TaskMessage ¶
type TaskMessage struct { ID string `json:"id"` Name string `json:"task"` Args []interface{} `json:"args"` Kwargs map[string]interface{} `json:"kwargs"` Retries int `json:"retries"` Expires *time.Time `json:"expires"` }
func (*TaskMessage) Encode ¶
func (tm *TaskMessage) Encode() (string, error)
Encode returns base64 json encoded string
type TaskProducer ¶
type TaskProducer interface { AddTask(name string, args ...interface{}) (*TaskResult, error) AddTaskWithKey(name string, args map[string]interface{}) (*TaskResult, error) }
func NewTaskProducer ¶
func NewTaskProducer(broker TaskBroker, backend TaskBackend) TaskProducer
type TaskResult ¶
type TaskResult struct { ID string // contains filtered or unexported fields }
func (*TaskResult) AsyncGet ¶
func (tr *TaskResult) AsyncGet() (*ResultMessage, error)
AsyncGet result
func (*TaskResult) Get ¶
func (tr *TaskResult) Get(timeout time.Duration) (*ResultMessage, error)
Get result synchronize
type TaskWorker ¶
type TaskWorker interface { StartWorker(ctx context.Context) StopWorker() Register(name string, task interface{}) }
func NewTaskWorker ¶
func NewTaskWorker(broker TaskBroker, backend TaskBackend) TaskWorker
Click to show internal directories.
Click to hide internal directories.