concurrency

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2021 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package concurrency contains code that helps build multi-threaded applications

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher interface {
	Start()
	Stop()
	Submit(Task) error
	SubmitWork(fn func() error) error
	ProcessedJobs() uint64
}

Dispatcher defines an interface. There is only one implementation of the dispatcher available, but by using an interface it forces the user to institate the dispatcher by using the NewDispatcher function

The dispatcher can process work that is submitted either as a struct that implements the Task interface or as an anonymous function. If the work to be processed requires state to be maintained then creating a struct that implements the interface is the best approach. If the work is stateless then passing an anonymous function is more suitable.

func NewDispatcher

func NewDispatcher(id string, workers, queueSize int) Dispatcher

NewDispatcher create a new Dispatcher. The ID is used to identify the dispatcher in log messages. workers is the number of go routines that this dispatcher will create to process work. queueSize is the size of the channel used to store tasks.

Example

Create a new dispatcher with the name "foo". After creating a dispatcher, it must be started. You can submit jobs via any of the Submit methods. Once your done you should call Stop. This will block and wait for all the worker go routines to complete.

package main

import (
	"github.com/puppetlabs/go-libs/pkg/concurrency"
)

func main() {
	dispatcher := concurrency.NewDispatcher("foo", 10, 5)
	dispatcher.Start()

	dispatcher.SubmitWork(func() error {
		// do some work
		return nil
	})

	dispatcher.Stop() // blocks and waits for all workers to complete
}
Example (Second)

Create a new dispatcher with the name "foo". Create an instance of MockWork and submit that to the dispatcher.

package main

import (
	"encoding/json"
	"time"

	"github.com/puppetlabs/go-libs/pkg/concurrency"
)

// MockWork implements the Task interface, and can be submitted to the dispatcher to be processed by
// worker threads.
type MockWork struct {
	id        string
	sleepTime time.Duration
}

// Execute performs the actual work
func (w MockWork) Execute() error {
	mockMessage := MockMessage{Name: w.id}
	_, err := json.Marshal(mockMessage)
	time.Sleep(w.sleepTime)
	return err
}

// MockMessage ...
type MockMessage struct {
	Name string
}

func main() {
	dispatcher := concurrency.NewDispatcher("foo", 10, 5)
	dispatcher.Start()

	w1 := MockWork{id: "bob", sleepTime: 5 * time.Second}
	dispatcher.Submit(w1)

	dispatcher.Stop() // blocks and waits for all workers to complete
}

type Task

type Task interface {
	Execute() error
}

Task is the interface a struct must implement, so that it can be submitted to the dispatcher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL