middlewares

package
v0.2.0
Published: Dec 2, 2021 License: MIT Imports: 9 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var Logger = taskset.Middleware{
	Run: run,
}

Logger is a basic logging middleware. It logs with log.Println when each task starts and when it finishes.
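To illustrate the idea behind a start/finish logging wrapper, here is a minimal standard-library sketch. It does not use taskset's real types: `task` and `withLogging` are hypothetical names, and the signature of the `Run` hook in taskset.Middleware is not shown in this document, so the sketch only mirrors the described behavior.

```go
package main

import "log"

// task is a hypothetical stand-in for a unit of work.
// It is NOT taskset's task type.
type task func() (interface{}, error)

// withLogging wraps t so that logln is called once before t starts
// and once after it finishes. Passing log.Println as logln gives the
// behavior described for Logger.
func withLogging(logln func(v ...interface{}), name string, t task) task {
	return func() (interface{}, error) {
		logln("task started:", name)
		res, err := t()
		logln("task finished:", name)
		return res, err
	}
}

func main() {
	wrapped := withLogging(log.Println, "answer", func() (interface{}, error) {
		return 42, nil
	})
	res, err := wrapped()
	log.Println("result:", res, "error:", err)
}
```

Taking the log function as a parameter is a choice made for this sketch only; it keeps the wrapper easy to exercise without capturing global log output.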

Functions

func NewConcurrencyLimiter

func NewConcurrencyLimiter(lock sync.Locker) taskset.Middleware

NewConcurrencyLimiter creates a middleware that limits a task set's concurrency. The limiter locks the provided lock before each task runs and unlocks it after the task finishes. It also releases the lock while a task is waiting on another task it depends on.

If lock is nil, the limiter instead uses the lock attached to each individual task with WithLock. A task with no lock attached runs without any locking.

If you want to run all tasks sequentially, use &sync.Mutex{}. If you want to limit the number of parallel tasks, use NewSemaphore. If you want a subset of tasks to be mutually exclusive, use WithLock.
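The lock-around-each-task behavior described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the package's implementation: `runLimited`, `peakConcurrency`, and `sequentialPeak` are hypothetical helpers, and the sketch omits releasing the lock while a task waits on a dependency.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runLimited runs task while holding lock. A nil lock means the task
// runs without any locking. Hypothetical helper, not taskset API.
func runLimited(lock sync.Locker, task func()) {
	if lock != nil {
		lock.Lock()
		defer lock.Unlock()
	}
	task()
}

// peakConcurrency starts n tasks through runLimited and returns the
// highest number of tasks observed running at the same time.
func peakConcurrency(lock sync.Locker, n int) int {
	var (
		mu            sync.Mutex // guards running and peak
		running, peak int
		wg            sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			runLimited(lock, func() {
				mu.Lock()
				running++
				if running > peak {
					peak = running
				}
				mu.Unlock()

				time.Sleep(10 * time.Millisecond) // simulate work

				mu.Lock()
				running--
				mu.Unlock()
			})
		}()
	}
	wg.Wait()
	return peak
}

// sequentialPeak measures the peak when all tasks share one mutex.
func sequentialPeak(n int) int {
	return peakConcurrency(&sync.Mutex{}, n)
}

func main() {
	fmt.Println("peak with a shared mutex:", sequentialPeak(4))
}
```

With a shared sync.Mutex the measured peak is 1, which corresponds to the sequential case. Passing a counting semaphore as the sync.Locker would raise the ceiling to the semaphore's capacity.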

Example (MutualExclusion)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/bennydictor/taskset"
	"github.com/bennydictor/taskset/middlewares"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet(
		middlewares.NewConcurrencyLimiter(nil),
	)

	// A and B can't run concurrently
	var abMu sync.Mutex

	// C and D can't run concurrently
	var cdMu sync.Mutex

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 1, nil
	},
		middlewares.WithLock(&abMu),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	},
		middlewares.WithLock(&abMu),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 3, nil
	},
		middlewares.WithLock(&cdMu),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 4, nil
	},
		middlewares.WithLock(&cdMu),
	)

	start := time.Now()
	taskSet.Start(ctx)
	taskSet.Wait(ctx)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
}
Output:

total time: 4s
Example (Semaphore)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bennydictor/taskset"
	"github.com/bennydictor/taskset/middlewares"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet(
		middlewares.NewConcurrencyLimiter(middlewares.NewSemaphore(2)),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 1, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 3, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	taskSet.Wait(ctx)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
}
Output:

total time: 4s
Example (Sequential)
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/bennydictor/taskset"
	"github.com/bennydictor/taskset/middlewares"
)

func main() {
	ctx := context.Background()

	taskSet := taskset.NewTaskSet(
		middlewares.NewConcurrencyLimiter(&sync.Mutex{}),
	)

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 1, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 2, nil
	})

	taskSet.New(func(ctx context.Context, depend taskset.Depend) (interface{}, error) {
		time.Sleep(2 * time.Second)
		return 3, nil
	})

	start := time.Now()
	taskSet.Start(ctx)
	taskSet.Wait(ctx)
	totalTime := time.Since(start)

	fmt.Printf("total time: %.0fs\n", totalTime.Seconds())
}
Output:

total time: 6s

func NewRecover

func NewRecover(handle func(interface{}) taskset.Result) taskset.Middleware

NewRecover creates a middleware for handling panics. If a task panics, the panic is recovered and the recovered value is passed to handle(). The result returned by handle() becomes the result of the task.
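The recover-and-substitute pattern described above can be sketched as follows. The `result` struct is a hypothetical stand-in for taskset.Result, whose fields are not shown in this document, and `runRecovered` is an illustrative helper rather than part of the package.

```go
package main

import "fmt"

// result is a hypothetical stand-in for taskset.Result.
type result struct {
	Value interface{}
	Err   error
}

// runRecovered runs task. If task panics, the recovered value is
// passed to handle, and handle's return value replaces the result.
func runRecovered(task func() result, handle func(interface{}) result) (res result) {
	defer func() {
		if p := recover(); p != nil {
			res = handle(p)
		}
	}()
	return task()
}

func main() {
	// Turn any panic into an error result.
	handle := func(p interface{}) result {
		return result{Err: fmt.Errorf("task panicked: %v", p)}
	}

	res := runRecovered(func() result { panic("boom") }, handle)
	fmt.Println(res.Err) // prints "task panicked: boom"
}
```

The named return value `res` is what lets the deferred function overwrite the result after the panic has unwound the task.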

func NewSemaphore

func NewSemaphore(n int64) sync.Locker

NewSemaphore returns a sync.Locker that can be locked up to n times concurrently.
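One common way to get a sync.Locker that admits up to n holders is a buffered channel. The sketch below shows that technique; it is an assumption for illustration only, since this document does not show how NewSemaphore is implemented. `chanSemaphore` and `newChanSemaphore` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// chanSemaphore is a counting semaphore backed by a buffered channel.
// Each element in the channel represents one current holder.
type chanSemaphore chan struct{}

// newChanSemaphore returns a sync.Locker that can be held by up to
// n callers at once.
func newChanSemaphore(n int) sync.Locker {
	return make(chanSemaphore, n)
}

// Lock blocks while n holders already exist.
func (s chanSemaphore) Lock() { s <- struct{}{} }

// Unlock releases one slot.
func (s chanSemaphore) Unlock() { <-s }

func main() {
	sem := newChanSemaphore(2)

	sem.Lock()
	sem.Lock() // the second holder is admitted without blocking
	fmt.Println("two holders admitted")

	sem.Unlock()
	sem.Unlock()
}
```

A third Lock call made while both slots are taken would block until one holder calls Unlock.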

func WithLock

func WithLock(lock sync.Locker) taskset.Property

WithLock attaches a lock to a task for use with the concurrency limiter.

Types

type DependGraphviz

type DependGraphviz struct {
	sync.Mutex
	// contains filtered or unexported fields
}

DependGraphviz provides a middleware that records all dependency declarations by all tasks, and makes this information available as a graphviz source file.

func NewDependGraphviz

func NewDependGraphviz() *DependGraphviz

NewDependGraphviz creates a new DependGraphviz.

func (*DependGraphviz) Middleware

func (d *DependGraphviz) Middleware() taskset.Middleware

Middleware returns the taskset.Middleware that records dependency declarations.

func (*DependGraphviz) String

func (d *DependGraphviz) String() string

String returns the generated graphviz source file as a string.

func (*DependGraphviz) Write

func (d *DependGraphviz) Write(w io.Writer) error

Write writes the generated graphviz source file to an io.Writer.
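As a rough illustration of recording dependencies and emitting Graphviz source, here is a standard-library sketch. `depGraph`, `record`, and `write` are hypothetical names, and the edge direction and output layout are assumptions; the exact output of DependGraphviz is not shown in this document.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
)

// depGraph is a hypothetical recorder, not the package's DependGraphviz.
// The embedded mutex makes it safe to record from concurrent tasks.
type depGraph struct {
	sync.Mutex
	edges [][2]string
}

// record notes that task `from` depends on task `to`.
func (g *depGraph) record(from, to string) {
	g.Lock()
	defer g.Unlock()
	g.edges = append(g.edges, [2]string{from, to})
}

// write emits the recorded edges as Graphviz DOT source.
func (g *depGraph) write(w io.Writer) error {
	g.Lock()
	defer g.Unlock()
	if _, err := fmt.Fprintln(w, "digraph {"); err != nil {
		return err
	}
	for _, e := range g.edges {
		if _, err := fmt.Fprintf(w, "\t%q -> %q;\n", e[0], e[1]); err != nil {
			return err
		}
	}
	_, err := fmt.Fprintln(w, "}")
	return err
}

// String returns the DOT source as a string.
func (g *depGraph) String() string {
	var sb strings.Builder
	_ = g.write(&sb) // strings.Builder writes do not fail
	return sb.String()
}

func main() {
	g := &depGraph{}
	g.record("build", "compile")
	g.record("compile", "fetch")
	_ = g.write(os.Stdout)
}
```

The printed text can be saved to a file and rendered with the Graphviz `dot` tool.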

Directories

Path Synopsis
opentracing module
prometheus module
zap module
