job

Version: v0.0.2 (this package is not in the latest version of its module)
Published: Jan 15, 2021 License: MIT Imports: 5 Imported by: 7

README

Introduction

Go has lots of synchronization mechanisms, ranging from language primitives such as mutexes, channels, and atomic functions to more complicated components such as context.Context. Unfortunately, using them to solve problems arising in day-to-day programming can be quite challenging, especially for novices.

The goal of the Job design pattern and this implementation is to provide an easy-to-use and solid foundation for solving problems that involve concurrent executions and their control. In many cases the Job pattern can be viewed as an alternative to context.Context, though it is not meant to replace it completely.

Documentation

Why?

I know, this is probably the most important question a skeptical programmer might ask: "Why should I learn how to use it when we already have context.Context? Your solution looks over-engineered." Well, because I have a weakness for Gimp, allow me to retort to such criticism in a sarcastic way:

[image: Balancer Schema]

But seriously, in software engineering simplicity often goes hand in hand with productivity. Never underestimate the benefits of going one abstraction level up.

What is a Job?

A job is a set of concurrently running tasks whose execution depends on each other. If one task fails, the whole job fails too.

    myjob := job.NewJob(nil)
    myjob.AddTask(task1)
    myjob.AddTask(task2)
    <-myjob.Run()
    // Let's process the result

What is a Task?

A single task consists of three routines: an initialization routine, a recurrent routine, and a finalization routine:

func (stream *stream) ReadOnStreamTask(j job.Job) (job.Init, job.Run, job.Finalize) {
    init := func(task job.Task){
        // Do some initialization
    }
    run := func(task job.Task) {
        read(stream, task)
        task.Tick()
    }
    fin := func(task job.Task) {
        readCancel(stream, task)
    }
    return init, run, fin
}

The recurrent routine runs in an indefinite loop, playing the role of the well-known for { select { } } statement in Go. It signals how execution should continue by calling one of three special methods:

  • .Tick() - to proceed task execution.
  • .Done() - to finish task execution (or .FinishJob() to finish job execution as well).
  • .Idle() - to tell that a task has nothing to do, and as a result it might be finished by reaching the idle timeout.
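The loop described above can be modelled with the standard library alone. The following is a minimal sketch, not the package's implementation: `loopTask`, `runLoop`, `countTo`, and `errIdleTimeout` are hypothetical names, and the way the idle timeout is measured here is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// signal records which of the three methods the recurrent routine called last.
type signal int

const (
	none signal = iota
	tick
	done
	idle
)

// loopTask is a hypothetical stand-in for job.Task; it is not the library's type.
type loopTask struct{ last signal }

func (t *loopTask) Tick() { t.last = tick }
func (t *loopTask) Done() { t.last = done }
func (t *loopTask) Idle() { t.last = idle }

var errIdleTimeout = errors.New("task idling timed out")

// runLoop calls run repeatedly until it reports Done or stays idle
// for longer than idleTimeout.
func runLoop(run func(*loopTask), idleTimeout time.Duration) error {
	t := &loopTask{}
	var idleSince time.Time
	for {
		t.last = none
		run(t)
		switch t.last {
		case done:
			return nil
		case tick:
			idleSince = time.Time{} // progress was made, reset the idle clock
		case idle:
			if idleSince.IsZero() {
				idleSince = time.Now()
			} else if time.Since(idleSince) > idleTimeout {
				return errIdleTimeout
			}
			time.Sleep(time.Millisecond) // avoid spinning while idle
		default:
			return errors.New("run returned without calling Tick, Done or Idle")
		}
	}
}

// countTo returns a run routine that ticks n times and then finishes.
func countTo(n int, counter *int) func(*loopTask) {
	return func(t *loopTask) {
		if *counter == n {
			t.Done()
			return
		}
		*counter++
		t.Tick()
	}
}

func main() {
	n := 0
	err := runLoop(countTo(3, &n), time.Second)
	fmt.Println(n, err) // 3 <nil>

	err = runLoop(func(t *loopTask) { t.Idle() }, 10*time.Millisecond)
	fmt.Println(err) // task idling timed out
}
```

In this sketch a Tick resets the idle clock, so only an uninterrupted run of Idle calls can trigger the timeout.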

Tasks can assert conditions, replacing if err != nil { panic(err) } with a terser form:

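func (m *MyTask) MyTask(j job.Job) (job.Init, job.Run, job.Finalize) {
    init := func(task job.Task) {}
    run := func(task job.Task) {
        _, err := os.Open("badfile")
        // A non-nil err fails the assertion and cancels the job.
        task.Assert(err)
    }
    fin := func(task job.Task) {}
    return init, run, fin
}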

Every failed assertion will result in the cancellation of job execution, and invocation of the finalization routines of all tasks of the job being cancelled.
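One way to get this behavior is to let a failed assertion panic and to recover from it at the top of each task's goroutine. The sketch below illustrates that idea with the standard library only; it is an assumption about how such a mechanism could work, not the package's code, and `sketchTask`, `runAll`, `assert`, and `errBadFile` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBadFile is a hypothetical error used to trigger a failed assertion.
var errBadFile = errors.New("badfile: no such file")

// assertFailure wraps the error carried by a failed assertion.
type assertFailure struct{ err error }

// assert panics with an assertFailure when err is non-nil.
func assert(err error) {
	if err != nil {
		panic(assertFailure{err})
	}
}

// sketchTask is a hypothetical task: a run body plus a finalization routine.
type sketchTask struct {
	run func(cancelled <-chan struct{})
	fin func()
}

// runAll runs every task concurrently. The first failed assertion closes the
// cancelled channel, every task's finalizer is invoked, and the error of the
// first failure is returned.
func runAll(tasks ...sketchTask) error {
	var (
		wg        sync.WaitGroup
		once      sync.Once
		firstErr  error
		cancelled = make(chan struct{})
	)
	for _, t := range tasks {
		wg.Add(1)
		go func(t sketchTask) {
			defer wg.Done()
			defer t.fin() // runs whether the task finished or was cancelled
			defer func() {
				if r := recover(); r != nil {
					f, ok := r.(assertFailure)
					if !ok {
						panic(r) // not an assertion failure, re-raise
					}
					once.Do(func() {
						firstErr = f.err
						close(cancelled)
					})
				}
			}()
			t.run(cancelled)
		}(t)
	}
	wg.Wait()
	return firstErr
}

func main() {
	finalized := make([]bool, 2)
	failing := sketchTask{
		run: func(<-chan struct{}) { assert(errBadFile) },
		fin: func() { finalized[0] = true },
	}
	waiting := sketchTask{
		run: func(cancelled <-chan struct{}) { <-cancelled }, // blocks until the job is cancelled
		fin: func() { finalized[1] = true },
	}
	fmt.Println(runAll(failing, waiting), finalized) // badfile: no such file [true true]
}
```

The second task never fails on its own; it returns only because the first task's failed assertion cancelled the whole group, and both finalizers still run.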

There are two types of tasks: an ordinary (recurrent) task and a oneshot task. The main purpose of a oneshot task is to start off the execution of the recurrent tasks, which wait for the oneshot to finish. You have probably heard of the software design approach known as design by contract. Here we have a similar approach: in the example below, the recurrent tasks assume that they will be running once a network connection is established, and the goal of the oneshot task (mngr.ConnectTask) is to fulfill that precondition, providing a reference to the connection in the job value.

    mainJob := j.NewJob(nil)
    mainJob.AddOneshotTask(mngr.ConnectTask)
    mainJob.AddTask(netmanager.ReadTask)
    mainJob.AddTask(netmanager.WriteTask)
    mainJob.AddTask(imgResizer.ScanForImagesTask)
    mainJob.AddTask(imgResizer.SaveResizedImageTask)
    <-mainJob.Run()

A job can have only one oneshot task.

To share data, tasks should employ (although it is not an obligation) ping/pong synchronization using two channels: the first one is used to receive data, and the second one to notify the sender that data processing is completed.

    run := func(task job.Task) {
        select {
        case data := <- p.conn.Upstream().RecvRaw(): // Receive data from upstream server
            p.conn.Downstream().Write() <- data // Write data to downstream server
            p.conn.Downstream().WriteSync() // sync with downstream data receiver
            p.conn.Upstream().RecvRawSync() // sync with upstream data sender
        }
        task.Tick()
    }

To prevent a task from being blocked for good, be sure to close all channels it's using in its finalization routine.
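The ping/pong scheme can be shown in isolation with two unbuffered channels. The following is a minimal sketch using only the standard library; `pingPong`, `send`, `consume`, and `transfer` are hypothetical names and do not correspond to the connection methods used in the example above.

```go
package main

import "fmt"

// pingPong pairs a data channel with an acknowledgement channel.
type pingPong struct {
	data chan []byte
	ack  chan struct{}
}

func newPingPong() *pingPong {
	return &pingPong{data: make(chan []byte), ack: make(chan struct{})}
}

// send hands buf to the receiver and blocks until the receiver reports
// that it has finished processing it.
func (p *pingPong) send(buf []byte) {
	p.data <- buf
	<-p.ack
}

// consume processes every message and acknowledges it. It returns once the
// data channel is closed, so the receiver is never blocked for good.
func (p *pingPong) consume(process func([]byte)) {
	for buf := range p.data {
		process(buf)
		p.ack <- struct{}{}
	}
}

// transfer sends each message through a single reused buffer and returns
// what the receiver saw.
func transfer(msgs []string) []string {
	p := newPingPong()
	var got []string
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		p.consume(func(b []byte) { got = append(got, string(b)) })
	}()
	buf := make([]byte, 0, 16)
	for _, m := range msgs {
		// Safe to overwrite: the previous message has been acknowledged.
		buf = append(buf[:0], m...)
		p.send(buf)
	}
	close(p.data) // plays the role of the finalization routine
	<-finished
	return got
}

func main() {
	fmt.Println(transfer([]string{"ping", "pong", "done"})) // [ping pong done]
}
```

The acknowledgement is what allows the sender to reuse its buffer: without it, the next write could overwrite data the receiver is still reading.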

Real life example

Now that you have a basic understanding, let's put the pattern to use and look at a real-life example: an implementation of a reverse proxy working as a layer 4 load balancer, a backend server that resizes images, and a simple client that scans a specified directory for images and sends them through the proxy server for resizing. The code speaks for itself, so you will quickly get the idea of how to use the pattern.

API reference

Public functions
  • NewJob(value interface{}) *job - creates a new job with the given job value.
  • RegisterDefaultLogger(logger Logger) - registers a fallback logger for all jobs.

Documentation

Constants

const (
	New jobState = iota
	WaitingForPrereq
	OneshotRunning
	RecurrentRunning
	Finalizing
	Cancelled
	Done
)

const (
	PendingTask taskState = iota
	RunningTask
	FailedTask
	FinishedTask
)

const (
	Oneshot taskType = iota
	Recurrent
)

Variables

var (
	DefaultLogLevel    int
	NotifySig          = struct{}{}
	ErrTaskIdleTimeout = errors.New("go-work: task idling timed out")
	ErrAssertZeroValue = errors.New("go-work.Assert: zero value")
	ErrJobExecTimeout  = errors.New("go-work: job execution timed out")
)

Functions

func NewJob

func NewJob(value interface{}) *job

func RegisterDefaultLogger

func RegisterDefaultLogger(logger Logger)

Types

type Finalize

type Finalize func(Task)

type Init

type Init func(Task)

Task main routines

type Job

type Job interface {
	AddTask(job JobTask) *task
	GetTaskByIndex(index int) *task
	AddOneshotTask(job JobTask)
	AddTaskWithIdleTimeout(job JobTask, timeout time.Duration) *task
	WithPrerequisites(sigs ...<-chan struct{})
	WithTimeout(duration time.Duration)
	Run() chan struct{}
	RunInBackground() <-chan struct{}
	Cancel(err interface{})
	Finish()
	Log(level int) chan<- interface{}
	RegisterLogger(logger Logger)
	GetValue() interface{}
	SetValue(v interface{})
	GetState() jobState
	GetInterruptedBy() (*task, interface{})
	TaskDoneNotify() <-chan *task
	JobDoneNotify() chan struct{}
}

type JobTask

type JobTask func(j Job) (Init, Run, Finalize)

type LogLevelMap

type LogLevelMap map[int]*LogLevelMapItem

type LogLevelMapItem

type LogLevelMapItem struct {
	// contains filtered or unexported fields
}

func NewLogLevelMapItem

func NewLogLevelMapItem(ch chan interface{}, handler LogRecordHandler) *LogLevelMapItem

type LogRecordHandler

type LogRecordHandler func(entry interface{}, level int)

type Logger

type Logger func() LogLevelMap

type OneshotTask

type OneshotTask JobTask

type Run

type Run func(Task)

type Task

type Task interface {
	GetIndex() int
	GetJob() Job
	GetState() taskState
	GetResult() interface{}
	SetResult(result interface{})
	Tick()
	Done()
	Idle()
	FinishJob()
	Assert(err interface{})
	AssertTrue(cond bool, err string)
	AssertNotNil(value interface{})
}
