Documentation ¶
Index ¶
- func CancelAll(tasks []Task)
- func WaitAll(tasks []Task)
- type Batch
- type PartitionFunc
- type Partitioner
- type State
- type Task
- func Consume(ctx context.Context, concurrency int, tasks chan Task) Task
- func ForkJoin(ctx context.Context, tasks []Task) Task
- func Invoke(ctx context.Context, action Work) Task
- func InvokeAll(ctx context.Context, concurrency int, tasks []Task) Task
- func NewTask(action Work) Task
- func NewTasks(actions ...Work) []Task
- func Repeat(ctx context.Context, interval time.Duration, action Work) Task
- func Spread(ctx context.Context, within time.Duration, tasks []Task) Task
- func Throttle(ctx context.Context, tasks []Task, rateLimit int, every time.Duration) Task
- type Work
Examples ¶
Functions ¶
Types ¶
type Batch ¶
Batch represents a batch of items: callers append items one at a time, and the batch is then processed as a whole.
Example ¶
var wg sync.WaitGroup
wg.Add(2)
r := NewBatch(context.Background(), func(input []interface{}) []interface{} {
	fmt.Println(input)
	return input
})
r.Append(1).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) {
	wg.Done()
	return nil, nil
})
r.Append(2).ContinueWith(context.TODO(), func(result interface{}, err error) (interface{}, error) {
	wg.Done()
	return nil, nil
})
r.Reduce()
wg.Wait()
Output: [1 2]
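The append-then-process idea can be sketched without the package. The following is a minimal, standalone illustration and not the package's implementation: the batch type, its fields, and its methods are hypothetical stand-ins, and the per-item continuations from the example above are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for the Batch type described above.
// It collects items and hands them to process in a single call.
type batch struct {
	mu      sync.Mutex
	pending []interface{}
	process func([]interface{}) []interface{}
}

// Append queues one item for the next Reduce call.
func (b *batch) Append(item interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, item)
}

// Reduce passes every pending item to process at once and clears the queue.
func (b *batch) Reduce() []interface{} {
	b.mu.Lock()
	items := b.pending
	b.pending = nil
	b.mu.Unlock()
	return b.process(items)
}

func main() {
	b := &batch{process: func(in []interface{}) []interface{} {
		fmt.Println(in)
		return in
	}}
	b.Append(1)
	b.Append(2)
	b.Reduce() // prints "[1 2]"
}
```

The mutex only protects the pending slice; process runs outside the lock so that a slow reducer does not block callers of Append.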
type PartitionFunc ¶
PartitionFunc takes in data and outputs a key. If ok is false, the data doesn't fall into any partition.
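A sketch of how such a function might be applied. The signature below is inferred from the Partitioner example on this page rather than copied from the package source, and groupBy and byFirstLetter are hypothetical helpers written for illustration only.

```go
package main

import "fmt"

// PartitionFunc mirrors the shape used in the Partitioner example:
// it returns the partition key and whether the item belongs to any partition.
// (Assumed signature, inferred from the example.)
type PartitionFunc func(data interface{}) (key string, ok bool)

// groupBy is a hypothetical helper that applies fn to each item and
// collects the items by key, skipping items for which ok is false.
func groupBy(items []interface{}, fn PartitionFunc) map[string][]interface{} {
	out := make(map[string][]interface{})
	for _, item := range items {
		if key, ok := fn(item); ok {
			out[key] = append(out[key], item)
		}
	}
	return out
}

// byFirstLetter is a hypothetical PartitionFunc: non-empty strings are keyed
// by their first byte, everything else is rejected.
func byFirstLetter(data interface{}) (string, bool) {
	s, ok := data.(string)
	if !ok || s == "" {
		return "", false
	}
	return s[:1], true
}

func main() {
	groups := groupBy([]interface{}{"apple", "avocado", "banana", 42}, byFirstLetter)
	fmt.Println(len(groups["a"]), len(groups["b"]), len(groups)) // prints "2 1 2"
}
```

The integer 42 is dropped because byFirstLetter returns ok == false for it, which is the "doesn't fall into any partition" case.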
type Partitioner ¶
type Partitioner interface {
	// Append items to the queue which is pending partition
	Append(items interface{}) Task
	// Partition items and output the result
	Partition() map[string][]interface{}
}
Partitioner partitions events
Example ¶
partitionFunc := func(data interface{}) (string, bool) {
	xevent, ok := data.(map[string]string)
	if !ok {
		return "", false
	}
	key, ok := xevent["pre"]
	return key, ok
}
p := NewPartitioner(context.Background(), partitionFunc)
input := []interface{}{
	map[string]string{"pre": "a", "val": "val1"},
	map[string]string{"pre": "b", "val": "val2"},
	map[string]string{"pre": "a", "val": "val4"},
	map[string]string{"pre": "c", "val": "val5"},
}
t := p.Append(input)
_, _ = t.Outcome()
res := p.Partition()
first := res["a"][0].(map[string]string)
fmt.Println(first["pre"])
fmt.Println(first["val"])
Output:
a
val1
func NewPartitioner ¶
func NewPartitioner(ctx context.Context, partition PartitionFunc) Partitioner
NewPartitioner creates a new partitioner
type State ¶
type State byte
State represents the state enumeration for a task.
const (
	IsCreated   State = iota // IsCreated represents a newly created task
	IsRunning                // IsRunning represents a task which is currently running
	IsCompleted              // IsCompleted represents a task which was completed successfully or errored out
	IsCancelled              // IsCancelled represents a task which was cancelled or has timed out
)
Various task states
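A standalone sketch of how the state values could be inspected. The type and constants are redeclared locally so the snippet compiles on its own; describe and isDone are hypothetical helpers and are not part of the package.

```go
package main

import "fmt"

// State and its constants are redeclared here only to keep the sketch
// self-contained; they follow the declaration shown above.
type State byte

const (
	IsCreated State = iota
	IsRunning
	IsCompleted
	IsCancelled
)

// describe is a hypothetical helper that maps a State to a readable name.
func describe(s State) string {
	switch s {
	case IsCreated:
		return "created"
	case IsRunning:
		return "running"
	case IsCompleted:
		return "completed"
	case IsCancelled:
		return "cancelled"
	default:
		return "unknown"
	}
}

// isDone is a hypothetical helper: a task that is completed or cancelled
// will not make further progress.
func isDone(s State) bool {
	return s == IsCompleted || s == IsCancelled
}

func main() {
	for _, s := range []State{IsCreated, IsRunning, IsCompleted, IsCancelled} {
		fmt.Println(byte(s), describe(s), isDone(s))
	}
}
```

Note that IsCompleted covers both success and failure, so the error returned by Outcome is what distinguishes the two.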
type Task ¶
type Task interface {
	Run(ctx context.Context) Task
	Cancel()
	State() State
	Outcome() (interface{}, error)
	ContinueWith(ctx context.Context, nextAction func(interface{}, error) (interface{}, error)) Task
}
Task represents a unit of work to be done
func ForkJoin ¶
ForkJoin executes the input tasks in parallel and waits for ALL outcomes before returning.
Example ¶
first := NewTask(func(context.Context) (interface{}, error) {
	return 1, nil
})
second := NewTask(func(context.Context) (interface{}, error) {
	return nil, errors.New("some error")
})
ForkJoin(context.Background(), []Task{first, second})
fmt.Println(first.Outcome())
fmt.Println(second.Outcome())
Output:
1 <nil>
<nil> some error
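The fork-join pattern itself can be sketched with only the standard library. This is an illustration of the general technique using sync.WaitGroup, not the package's code; forkJoin and result are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// result pairs a value with an error, like the return of Outcome.
type result struct {
	value interface{}
	err   error
}

// forkJoin runs every work function in its own goroutine (fork) and
// blocks until all of them have returned (join). A failing function
// does not stop the others.
func forkJoin(works []func() (interface{}, error)) []result {
	results := make([]result, len(works))
	var wg sync.WaitGroup
	for i, w := range works {
		wg.Add(1)
		go func(i int, w func() (interface{}, error)) {
			defer wg.Done()
			v, err := w()
			results[i] = result{v, err} // each goroutine writes its own slot
		}(i, w)
	}
	wg.Wait()
	return results
}

func main() {
	res := forkJoin([]func() (interface{}, error){
		func() (interface{}, error) { return 1, nil },
		func() (interface{}, error) { return nil, errors.New("some error") },
	})
	for _, r := range res {
		fmt.Println(r.value, r.err)
	}
}
```

Results are stored by index, so the output order matches the input order regardless of which goroutine finishes first.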
func InvokeAll ¶
InvokeAll runs the tasks with a specified maximum concurrency.
Example ¶
works := make([]Work, 6)
for i := range works {
	j := i // capture the loop variable for the closure
	works[j] = func(context.Context) (interface{}, error) {
		fmt.Println(j / 2)
		time.Sleep(time.Millisecond * 10)
		return nil, nil
	}
}
tasks := NewTasks(works...)
// Run at most two tasks at a time, then wait for all of them to finish.
InvokeAll(context.Background(), 2, tasks)
WaitAll(tasks)
Output:
0
0
1
1
2
2
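One common way to cap concurrency is a buffered channel used as a semaphore. The sketch below shows that general technique under the assumption that it is a reasonable mental model; it is not taken from the package, and runBounded is a hypothetical name. It also records the highest number of simultaneously running functions so the limit can be checked.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded runs all works with at most limit of them active at once and
// returns the peak number of functions that were running simultaneously.
func runBounded(limit int, works []func()) int32 {
	sem := make(chan struct{}, limit) // one slot per allowed worker
	var wg sync.WaitGroup
	var running, peak int32
	for _, w := range works {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit workers are active
		go func(w func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot after the counter is lowered
			n := atomic.AddInt32(&running, 1)
			for { // raise peak to n if n is larger
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			w()
			atomic.AddInt32(&running, -1)
		}(w)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	works := make([]func(), 6)
	for i := range works {
		works[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	peak := runBounded(2, works)
	fmt.Println(peak <= 2) // prints "true"
}
```

Acquiring the slot before starting the goroutine keeps the number of live goroutines bounded as well, not just the number doing work.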
func Repeat ¶
Repeat performs an action asynchronously on a predetermined interval.
Example ¶
out := make(chan bool, 1)
task := Repeat(context.TODO(), time.Nanosecond*10, func(context.Context) (interface{}, error) {
	out <- true
	return nil, nil
})
<-out
v := <-out
fmt.Println(v)
task.Cancel()
Output: true
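A periodic action of this kind is often built on time.Ticker and context cancellation. The following standalone sketch assumes that approach for illustration; it is not the package's implementation, and repeat and waitForRuns are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// repeat calls action once per interval in a background goroutine until ctx
// is cancelled. The returned channel is closed when the loop has exited.
func repeat(ctx context.Context, interval time.Duration, action func()) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				action()
			}
		}
	}()
	return done
}

// waitForRuns is a hypothetical demo helper: it starts repeat, waits until the
// action has run n times, cancels it, and returns the number of runs observed.
func waitForRuns(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ticks := make(chan struct{})
	done := repeat(ctx, time.Millisecond, func() {
		select {
		case ticks <- struct{}{}: // report one run
		case <-ctx.Done(): // do not block once cancelled
		}
	})
	seen := 0
	for seen < n {
		<-ticks
		seen++
	}
	cancel()
	<-done // wait for the loop to stop
	return seen
}

func main() {
	fmt.Println(waitForRuns(3)) // prints "3"
}
```

Waiting on the returned channel after cancelling ensures the background goroutine has stopped before the caller moves on.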
func Spread ¶
Spread evenly spreads the work within the specified duration.
Example ¶
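// newTasks is an unexported helper that is not shown on this page;
// judging by the output, it presumably returns five tasks that each yield 1.
tasks := newTasks()
within := 200 * time.Millisecond
// Spread
task := Spread(context.Background(), within, tasks)
_, _ = task.Outcome() // Wait
// Make sure all tasks are done
for _, task := range tasks {
	v, _ := task.Outcome()
	fmt.Println(v)
}
Output:
1
1
1
1
1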
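One plausible reading of "evenly spreads" is that task i starts at i * (within / n). The sketch below works under that assumption and is not taken from the package source; spreadOffsets and spread are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// spreadOffsets returns the start delay for each of n tasks so that they are
// evenly spaced within the given duration (assumed spacing: within / n).
func spreadOffsets(within time.Duration, n int) []time.Duration {
	if n <= 0 {
		return nil
	}
	step := within / time.Duration(n)
	offsets := make([]time.Duration, n)
	for i := range offsets {
		offsets[i] = time.Duration(i) * step
	}
	return offsets
}

// spread starts each work function at its offset and waits for all to finish.
func spread(within time.Duration, works []func()) {
	offsets := spreadOffsets(within, len(works))
	start := time.Now()
	var wg sync.WaitGroup
	for i, w := range works {
		time.Sleep(time.Until(start.Add(offsets[i]))) // no-op if already past
		wg.Add(1)
		go func(w func()) {
			defer wg.Done()
			w()
		}(w)
	}
	wg.Wait()
}

func main() {
	var ran int32
	works := make([]func(), 5)
	for i := range works {
		works[i] = func() { atomic.AddInt32(&ran, 1) }
	}
	spread(50*time.Millisecond, works)
	fmt.Println(spreadOffsets(200*time.Millisecond, 5)) // prints "[0s 40ms 80ms 120ms 160ms]"
	fmt.Println(atomic.LoadInt32(&ran))                 // prints "5"
}
```

Computing each start time from a fixed origin, rather than sleeping a fixed step after each launch, avoids accumulating drift across many tasks.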