parallelism

package
v1.0.7
Published: Feb 19, 2023 License: Apache-2.0 Imports: 4 Imported by: 1

README

Dispatcher

The dispatcher is a worker pattern for running n things in parallel with a blocking submit. Calls to Submit() block if all workers are busy. This is intentional and helps to avoid imbalanced load. For example, imagine a scenario with two dispatchers, read and write. The read dispatcher has n workers reading from an API and submitting work to the write dispatcher, which has n workers writing to a database. If reading is much faster than writing, the read side could run unbounded and read far too much data into memory, resulting in a crash. Because Submit() blocks, the read workers wait until the write dispatcher has an available worker, which prevents the faster read side from running unbounded.
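The blocking behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the package's actual implementation: miniDispatcher, submit, wait, and maxConcurrent are hypothetical names, and the sketch assumes an unbuffered channel shared by all workers, so a send only completes when a worker is free to receive.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// miniDispatcher is a hypothetical stand-in for the package's Dispatcher.
type miniDispatcher struct {
	jobs chan func()    // unbuffered: a send blocks until a worker receives
	wg   sync.WaitGroup // counts submitted jobs that have not finished yet
}

// newMiniDispatcher starts `workers` goroutines that all receive from one channel.
func newMiniDispatcher(workers int) *miniDispatcher {
	d := &miniDispatcher{jobs: make(chan func())}
	for i := 0; i < workers; i++ {
		go func() {
			for job := range d.jobs {
				job()
				d.wg.Done()
			}
		}()
	}
	return d
}

// submit blocks while every worker is busy, which gives the caller backpressure.
func (d *miniDispatcher) submit(job func()) {
	d.wg.Add(1)
	d.jobs <- job
}

// wait blocks until every submitted job has finished.
func (d *miniDispatcher) wait() { d.wg.Wait() }

// maxConcurrent submits `jobs` short tasks and reports the highest number
// of tasks that were running at the same moment.
func maxConcurrent(workers, jobs int) int32 {
	var running, peak int32
	d := newMiniDispatcher(workers)
	for i := 0; i < jobs; i++ {
		d.submit(func() {
			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this task raised it
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond)
			atomic.AddInt32(&running, -1)
		})
	}
	d.wait()
	close(d.jobs) // lets the worker goroutines exit
	return atomic.LoadInt32(&peak)
}

func main() {
	// The peak never exceeds the worker count, however many jobs are submitted.
	fmt.Println("peak concurrency:", maxConcurrent(3, 12))
}
```

Because the channel has no buffer, the submitting goroutine can never get more than one job ahead of the workers. Chaining two such dispatchers gives the read/write scenario: a read job that calls submit on the write side stalls until a write worker is free.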

Usage

Implement the WorkHandler and Job interfaces on your own structs, which can hold any data you need, and pass your handler struct to NewDispatcher() when creating a new dispatcher.
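Since the handler is an ordinary struct, it can carry state of its own. The sketch below is one possible shape, under two assumptions: the Job and WorkHandler interfaces are redeclared locally (copied from the reference section of this page) so the code compiles without the module, and HandleJob may be called from several workers at once on the same handler value, so shared fields are guarded by a mutex. wordJob, countingHandler, and countWords are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Job and WorkHandler mirror the interfaces documented for this package.
// They are redeclared here only to keep the sketch self-contained.
type Job interface {
	GetData() interface{}
}

type WorkHandler interface {
	HandleJob(job Job)
}

// wordJob is a hypothetical job that carries a single string.
type wordJob struct{ word string }

func (j wordJob) GetData() interface{} { return j.word }

// countingHandler is a hypothetical handler that keeps shared state.
type countingHandler struct {
	mu     sync.Mutex
	counts map[string]int
}

// HandleJob counts how often each word was seen.
func (h *countingHandler) HandleJob(job Job) {
	// The comma-ok form avoids a panic when the data has an unexpected type.
	word, ok := job.GetData().(string)
	if !ok {
		return
	}
	h.mu.Lock()
	defer h.mu.Unlock()
	h.counts[word]++
}

// Compile-time check that the handler satisfies the interface.
var _ WorkHandler = (*countingHandler)(nil)

// countWords calls the handler from one goroutine per word to mimic
// concurrent workers, then returns the collected counts.
func countWords(words []string) map[string]int {
	h := &countingHandler{counts: map[string]int{}}
	var wg sync.WaitGroup
	for _, w := range words {
		wg.Add(1)
		go func(w string) {
			defer wg.Done()
			h.HandleJob(wordJob{word: w})
		}(w)
	}
	wg.Wait()
	return h.counts
}

func main() {
	fmt.Println(countWords([]string{"a", "b", "a"})) // prints map[a:2 b:1]
}
```

The handler uses a pointer receiver so that every call shares the same mutex and map. A value receiver would copy the mutex on each call.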

Example Dispatcher usage

The example below generates 15 jobs and submits them to the dispatcher with a parallelism of 5. You'll see 5 jobs running at once, then the next 5 when those are done, and so on. The sleep is there to demonstrate that the jobs don't run until there are free workers to run them. The "queued work" log line is there to demonstrate that the call to Submit() does indeed block as expected. You can copy and paste this into https://go.dev/play/ to experiment.

package main

import (
	"fmt"
	"time"

	"github.com/brianvoe/gofakeit/v6"
	"github.com/catalystsquad/app-utils-go/logging"
	"github.com/catalystsquad/app-utils-go/parallelism"
)

// MyHandler implements the WorkHandler interface. It gets the job data, prints the phrase, and then sleeps.
type MyHandler struct{}

// HandleJob prints the job's phrase, then sleeps to simulate work.
func (h MyHandler) HandleJob(job parallelism.Job) {
	// assert type to my job data struct type
	data := job.GetData().(MyJobData)
	// do work
	fmt.Println(data.Phrase)
	// sleep to show parallel blocking work
	time.Sleep(2 * time.Second)
}

// MyJob implements the Job interface and has a custom struct for the data my job needs to run, in this case `MyJobData` which has a single string field
type MyJob struct {
	JobData MyJobData
}

// GetData returns the job data.
func (j MyJob) GetData() interface{} {
	return j.JobData
}

// MyJobData is a custom struct for job data
type MyJobData struct {
	Phrase string
}

func main() {
	// generate jobs
	phrases := []string{}
	for i := 0; i < 15; i++ {
		phrases = append(phrases, gofakeit.HackerPhrase())
	}
	// instantiate my handler
	handler := MyHandler{}
	// instantiate and start the dispatcher
	inParallel := 5
	dispatcher := parallelism.NewDispatcher(inParallel, handler).Start()
	// queue work
	for _, phrase := range phrases {
		dispatcher.Submit(MyJob{JobData: MyJobData{Phrase: phrase}})
		logging.Log.Info("queued work")
	}
	// wait for work to complete
	dispatcher.Wait()
	logging.Log.Info("work complete")
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ExampleRunInParallel

func ExampleRunInParallel()

func RunInParallel

func RunInParallel(num, parallelism int, theFunc interface{})

RunInParallel runs a given function in parallel and blocks until all executions are complete. num is the number of times to execute the function. parallelism is how many concurrent executions you want. This function divides num by parallelism and gives the remainder to the first worker.
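The division rule can be illustrated with a small stand-alone helper. splitWork is a hypothetical function written for this example, not part of the package, and its handling of edge cases (non-positive arguments, num smaller than parallelism) is an assumption, since the documentation does not describe them.

```go
package main

import "fmt"

// splitWork is a hypothetical helper that returns how many executions each
// worker would run: num / parallelism each, with the remainder added to the
// first worker.
func splitWork(num, parallelism int) []int {
	if num <= 0 || parallelism <= 0 {
		return nil // assumption: nothing to do for non-positive input
	}
	counts := make([]int, parallelism)
	base := num / parallelism
	for i := range counts {
		counts[i] = base
	}
	counts[0] += num % parallelism
	return counts
}

func main() {
	fmt.Println(splitWork(17, 5)) // prints [5 3 3 3 3]
}
```

With 17 executions and a parallelism of 5, each worker gets 3 and the first worker takes the 2 left over, so the load is slightly uneven whenever num is not a multiple of parallelism.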

Types

type Dispatcher added in v1.0.4

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher is the link between the client and the workers

func NewDispatcher added in v1.0.3

func NewDispatcher(parallelism int, workHandler WorkHandler) *Dispatcher

NewDispatcher returns a new Dispatcher. Its main job is to receive a job and share it on the WorkPool. The WorkPool is the link between the Dispatcher and all the workers: the Dispatcher's WorkPool is the common JobPool for all the workers.

func (*Dispatcher) Start added in v1.0.4

func (d *Dispatcher) Start() *Dispatcher

Start creates a pool of workers and starts each worker.

func (*Dispatcher) Submit added in v1.0.4

func (d *Dispatcher) Submit(job Job)

Submit submits a job to the Dispatcher, where a worker handles it. As described in the README, the call blocks while all workers are busy.

func (*Dispatcher) Wait added in v1.0.4

func (d *Dispatcher) Wait()

Wait will wait until all work is completed. This is accomplished by sharing the Dispatcher's waitgroup with workers.

type Job added in v1.0.3

type Job interface {
	GetData() interface{}
}

Job is the user facing interface that describes the work to be done

type WorkHandler added in v1.0.3

type WorkHandler interface {
	HandleJob(job Job)
}

WorkHandler is the user facing interface that does the work
