taskmanager

v0.0.0-...-de5ebdd
Published: Dec 20, 2021 License: BSD-3-Clause Imports: 7 Imported by: 9

README

taskmanager

Async task manager. Tasks can easily be customized and executed asynchronously on the next available worker.

The manager keeps workers ready to multiplex tasks. The maximum number of workers is configurable.

This package was created mainly to abstract all async functionality out of the app. It provides a consistent context interface for managing goroutine lifecycles from a single place.

Install

taskmanager works like a regular Go module:

> go get github.com/plexsysio/taskmanager

Usage

import (
   "context"
   "fmt"
   "time"

   "github.com/plexsysio/taskmanager"
)

type exampleTask struct {}

func (e *exampleTask) Name() string {
   return "exampleTask"
}

func (e *exampleTask) Execute(ctx context.Context) error {
   for {
      select {
      case <-ctx.Done():
         // taskmanager stopped
         return nil
      default:
        // Do work. For long running tasks use ctx or move to next iteration
      }
   }
}

func main() {

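   // New takes a Logger as its fourth argument. logger is any value that
   // implements taskmanager.Logger (see Types); it is not defined here.
   tm := taskmanager.New(1, 100, time.Second*15, logger)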
   t := &exampleTask{}

   sched, err := tm.Go(t)
   if err != nil {
      fmt.Println(err)
      return
   }

   // Task scheduled
   <-sched
   

Closures can also be scheduled

   // GoFunc takes a unique key as its first argument
   fSched, err := tm.GoFunc("exampleFunc", func(ctx context.Context) error {
      for {
         select {
         case <-ctx.Done():
            // taskmanager stopped
            return nil
         default:
            // Do work
         }
      }
   })
   if err != nil {
      fmt.Println(err)
      return
   }

   // Closure scheduled
   <-fSched

   // Stop will wait for all routines to stop. Context can be passed here to
   // ensure timeout in Stop
   ctx, cancel := context.WithTimeout(context.Background(), time.Second)
   defer cancel()
   err = tm.Stop(ctx)
   if err != nil {
      fmt.Printf("failed stopping %s\n", err.Error())
   }
}

Documentation

Overview

Package taskmanager implements an async task manager. Structured tasks can be executed asynchronously on pre-provisioned workers. Alternatively, plain functions can be enqueued. When the taskmanager is stopped, all workers/functions receive a `Done` signal on the context passed to them. Task logic needs to handle this signal in order to clean up properly.
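As a minimal sketch of handling the `Done` signal, the hypothetical cleanupTask below checks its context between units of work and runs its cleanup before returning. The names cleanupTask and runAndCancel are illustrative assumptions, and a plain cancellable context stands in for the one the manager would pass, so the snippet runs without the module.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cleanupTask is a hypothetical task. Its method set matches the Task
// interface documented below (Execute and Name).
type cleanupTask struct {
	cleaned bool // set once cleanup has run
}

func (c *cleanupTask) Name() string { return "cleanupTask" }

// Execute works in small steps and checks ctx between them, so a
// cancellation is noticed promptly and cleanup runs before returning.
func (c *cleanupTask) Execute(ctx context.Context) error {
	defer func() { c.cleaned = true }() // release resources here
	ticker := time.NewTicker(time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// one unit of work
		}
	}
}

// runAndCancel starts the task, cancels its context (simulating the
// manager being stopped), and reports the task's error and cleanup flag.
func runAndCancel() (error, bool) {
	ctx, cancel := context.WithCancel(context.Background())
	t := &cleanupTask{}
	done := make(chan error, 1)
	go func() { done <- t.Execute(ctx) }()

	cancel()
	err := <-done
	return err, t.cleaned
}

func main() {
	fmt.Println(runAndCancel())
}
```

Whether to return ctx.Err() or nil on cancellation is a design choice; the README example returns nil.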

Constants

This section is empty.

Variables

var ErrAlreadyExists = errors.New("task with same name already exists")

Functions

This section is empty.

Types

type Logger

type Logger interface {
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Errorf(format string, args ...interface{})
	Error(args ...interface{})
}
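
One way to satisfy Logger is a thin adapter over the standard library's log package. The sketch below is an illustration only: stdLogger and render are hypothetical names, and the interface is redeclared locally (mirroring the definition above) so the snippet compiles without the module.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Logger mirrors the interface documented above so that this sketch
// compiles on its own.
type Logger interface {
	Infof(format string, args ...interface{})
	Info(args ...interface{})
	Errorf(format string, args ...interface{})
	Error(args ...interface{})
}

// stdLogger is a hypothetical adapter over the standard library logger.
type stdLogger struct {
	l *log.Logger
}

// Compile-time check that stdLogger implements Logger.
var _ Logger = stdLogger{}

func (s stdLogger) Infof(format string, args ...interface{}) {
	s.l.Printf("INFO "+format, args...)
}

func (s stdLogger) Info(args ...interface{}) {
	s.l.Print("INFO " + fmt.Sprint(args...))
}

func (s stdLogger) Errorf(format string, args ...interface{}) {
	s.l.Printf("ERROR "+format, args...)
}

func (s stdLogger) Error(args ...interface{}) {
	s.l.Print("ERROR " + fmt.Sprint(args...))
}

// render logs two messages into a buffer and returns the captured text.
func render() string {
	var buf bytes.Buffer
	lg := stdLogger{l: log.New(&buf, "", 0)}
	lg.Infof("workers=%d", 2)
	lg.Error("boom")
	return buf.String()
}

func main() {
	fmt.Print(render())
}
```

A value like this could be passed as the log argument of New, assuming the real taskmanager.Logger has the method set shown above.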

type RestartableTask

type RestartableTask interface {
	Task

	Restart(context.Context, error) bool
}

The RestartableTask interface can be implemented to restart tasks on failure. The error passed to Restart is the error with which the task previously failed, so handling can be tailored to it. If Restart returns true, the task is enqueued with the manager again.
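
The following sketch shows one possible Restart policy: retry only on a known transient error, and only a bounded number of times. Everything here is an assumption for illustration: flakyTask, errTransient, and demo are hypothetical, the interfaces are redeclared locally to mirror the ones documented here, and runWithRestarts is a stand-in loop, not the manager's actual restart logic.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Task and RestartableTask mirror the documented interfaces so the
// sketch compiles without importing the module.
type Task interface {
	Execute(context.Context) error
	Name() string
}

type RestartableTask interface {
	Task
	Restart(context.Context, error) bool
}

var errTransient = errors.New("transient failure")

// flakyTask is hypothetical: its first failUntil executions fail.
type flakyTask struct {
	attempts    int // number of Execute calls so far
	failUntil   int // executions that fail before one succeeds
	restarts    int // restarts granted so far
	maxRestarts int // upper bound on restarts
}

func (f *flakyTask) Name() string { return "flakyTask" }

func (f *flakyTask) Execute(ctx context.Context) error {
	f.attempts++
	if f.attempts <= f.failUntil {
		return errTransient
	}
	return nil
}

// Restart asks for another run only for transient errors, only while
// the context is live, and only up to maxRestarts times.
func (f *flakyTask) Restart(ctx context.Context, err error) bool {
	if ctx.Err() != nil || !errors.Is(err, errTransient) || f.restarts >= f.maxRestarts {
		return false
	}
	f.restarts++
	return true
}

// runWithRestarts is a hypothetical stand-in for the manager: it
// re-runs the task for as long as Restart returns true.
func runWithRestarts(ctx context.Context, t RestartableTask) error {
	for {
		err := t.Execute(ctx)
		if err == nil || !t.Restart(ctx, err) {
			return err
		}
	}
}

// demo runs a flakyTask and reports how often it executed and how it ended.
func demo(failUntil, maxRestarts int) (int, error) {
	t := &flakyTask{failUntil: failUntil, maxRestarts: maxRestarts}
	err := runWithRestarts(context.Background(), t)
	return t.attempts, err
}

func main() {
	fmt.Println(demo(2, 5)) // succeeds on the third execution
	fmt.Println(demo(5, 1)) // gives up after one restart
}
```

Bounding restarts inside Restart keeps a permanently failing task from being re-enqueued forever.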

type Task

type Task interface {
	Execute(context.Context) error
	Name() string
}

Task defines the interface that users implement to enqueue tasks to the TaskManager.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager implements the public API for taskmanager

func New

func New(minCount, maxCount int, timeout time.Duration, log Logger) *TaskManager

New creates a new taskmanager instance. minCount sets the minimum number of workers and maxCount sets the maximum. timeout determines how long a worker may stay idle before it is timed out.

func (*TaskManager) Go

func (m *TaskManager) Go(newTask Task) (<-chan struct{}, error)

Go enqueues the task to the taskmanager. It returns an error if a task with the same name is already enqueued. The returned channel is closed when the task is actually assigned a worker.
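
Because the returned channel is only closed once a worker picks up the task, a caller may want to bound how long it waits. A minimal sketch, assuming a hypothetical helper named waitScheduled, with locally created channels standing in for the one Go returns:

```go
package main

import (
	"fmt"
	"time"
)

// waitScheduled reports whether sched was closed within d.
// It is a hypothetical helper, not part of the package.
func waitScheduled(sched <-chan struct{}, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-sched:
		return true // the task has been assigned a worker
	case <-timer.C:
		return false // still queued when the wait expired
	}
}

// demo exercises both outcomes: a closed channel models a task that
// was assigned a worker, an open one models a task still in the queue.
func demo() (assigned, pending bool) {
	closed := make(chan struct{})
	close(closed)
	open := make(chan struct{})
	return waitScheduled(closed, time.Second), waitScheduled(open, 10*time.Millisecond)
}

func main() {
	fmt.Println(demo())
}
```

With the real package, the first argument would be the channel returned by Go or GoFunc.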

func (*TaskManager) GoFunc

func (m *TaskManager) GoFunc(key string, closure func(ctx context.Context) error) (<-chan struct{}, error)

GoFunc enqueues a closure to the taskmanager. key should be unique for each closure; enqueuing another closure with the same key returns an error.

func (*TaskManager) Status

func (m *TaskManager) Status() (res map[int32]WorkerInfo)

Status returns the status of the workers in the taskmanager.

func (*TaskManager) Stop

func (m *TaskManager) Stop(ctx context.Context) error

Stop stops all running routines and waits for them to exit. The ctx passed in can carry a timeout to bound that wait.

func (*TaskManager) TaskStatus

func (m *TaskManager) TaskStatus() (res map[string]TaskStatus)

TaskStatus returns the status of the enqueued tasks.

type TaskStatus

type TaskStatus struct {
	Name        string
	Worker      int32
	Status      WorkerStatus
	Description string
	Progress    float64
	Restarts    int
	// contains filtered or unexported fields
}

TaskStatus is the status saved for a task. It can be accessed using the TaskStatus() method.
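
To show how the returned map might be consumed, the sketch below formats one line per task in a stable order. It is illustrative only: summarize and demo are hypothetical, the types are local mirrors of the exported fields listed here, and it assumes the map is keyed by task name, which the documentation does not state.

```go
package main

import (
	"fmt"
	"sort"
)

// WorkerStatus and TaskStatus mirror the exported parts of the
// documented types so the sketch compiles without the module.
type WorkerStatus string

const (
	NotAssigned WorkerStatus = "not assigned"
	Waiting     WorkerStatus = "waiting"
	Running     WorkerStatus = "running"
	Restarted   WorkerStatus = "restarted"
)

type TaskStatus struct {
	Name        string
	Worker      int32
	Status      WorkerStatus
	Description string
	Progress    float64
	Restarts    int
}

// summarize returns one line per task, sorted by map key so that the
// output does not depend on map iteration order.
func summarize(res map[string]TaskStatus) []string {
	keys := make([]string, 0, len(res))
	for k := range res {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	lines := make([]string, 0, len(keys))
	for _, k := range keys {
		st := res[k]
		lines = append(lines, fmt.Sprintf("%s: %s (restarts=%d)", k, st.Status, st.Restarts))
	}
	return lines
}

// demo feeds summarize a hand-built map in place of a real
// TaskStatus() result.
func demo() []string {
	return summarize(map[string]TaskStatus{
		"b": {Name: "b", Status: Running, Restarts: 1},
		"a": {Name: "a", Status: Waiting},
	})
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```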

type TaskWithProgress

type TaskWithProgress interface {
	Task

	Progress() (float64, error)
	Description() string
}

TaskWithProgress defines additional methods to be implemented in order to query progress using the built-in taskmanager status.
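
A task can report progress by tracking completed units of work. In the sketch below, copyTask and demo are hypothetical, the interfaces are local mirrors of the documented ones, and Progress is assumed to be a percentage from 0 to 100; the documentation does not say whether a fraction or a percentage is expected.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
)

// Task and TaskWithProgress mirror the documented interfaces so the
// sketch compiles without importing the module.
type Task interface {
	Execute(context.Context) error
	Name() string
}

type TaskWithProgress interface {
	Task

	Progress() (float64, error)
	Description() string
}

// copyTask is a hypothetical task that processes total items.
type copyTask struct {
	total int64
	done  int64 // accessed atomically; Progress may run concurrently
}

// Compile-time check that copyTask implements TaskWithProgress.
var _ TaskWithProgress = (*copyTask)(nil)

func (c *copyTask) Name() string { return "copyTask" }

func (c *copyTask) Execute(ctx context.Context) error {
	for atomic.LoadInt64(&c.done) < c.total {
		if err := ctx.Err(); err != nil {
			return err
		}
		atomic.AddInt64(&c.done, 1) // one unit of work finished
	}
	return nil
}

// Progress returns the completed share as a percentage (an assumption).
func (c *copyTask) Progress() (float64, error) {
	if c.total <= 0 {
		return 0, errors.New("total is unknown")
	}
	return float64(atomic.LoadInt64(&c.done)) / float64(c.total) * 100, nil
}

func (c *copyTask) Description() string {
	return fmt.Sprintf("copied %d of %d items", atomic.LoadInt64(&c.done), c.total)
}

// demo reports progress before and after running the task to completion.
func demo() (before, after float64) {
	t := &copyTask{total: 4}
	before, _ = t.Progress()
	_ = t.Execute(context.Background())
	after, _ = t.Progress()
	return before, after
}

func main() {
	fmt.Println(demo())
}
```

The counter is updated atomically because the status query may run on a different goroutine than Execute.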

type WorkerInfo

type WorkerInfo struct {
	TaskName string
	Status   WorkerStatus
}

WorkerInfo shows information on the worker

type WorkerStatus

type WorkerStatus string

WorkerStatus is a helper type for restricting the string values of worker status to known constants.

const (
	NotAssigned WorkerStatus = "not assigned"
	Waiting     WorkerStatus = "waiting"
	Running     WorkerStatus = "running"
	Restarted   WorkerStatus = "restarted"
)

func (WorkerStatus) String

func (w WorkerStatus) String() string
