Documentation
¶
Overview ¶
Package concurrency contains code that helps build multi-threaded applications
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Dispatcher ¶
type Dispatcher interface { Start() Stop() Submit(Task) error SubmitWork(fn func() error) error ProcessedJobs() uint64 }
Dispatcher defines an interface. There is only one implementation of the dispatcher available, but by using an interface it forces the user to institate the dispatcher by using the NewDispatcher function
The dispatcher can process work that is submitted either as a struct that implements the Task interface or as an anonymous function. If the work to be processed requires state to be maintained then creating a struct that implements the interface is the best approach. If the work is stateless then passing an anonymous function is more suitable.
func NewDispatcher ¶
func NewDispatcher(id string, workers, queueSize int) Dispatcher
NewDispatcher create a new Dispatcher. The ID is used to identify the dispatcher in log messages. workers is the number of go routines that this dispatcher will create to process work. queueSize is the size of the channel used to store tasks.
Example ¶
Create a new dispatcher with the name "foo". After creating a dispatcher, it must be started. You can submit jobs via any of the Submit methods. Once your done you should call Stop. This will block and wait for all the worker go routines to complete.
package main import ( "github.com/puppetlabs/go-libs/pkg/concurrency" ) func main() { dispatcher := concurrency.NewDispatcher("foo", 10, 5) dispatcher.Start() dispatcher.SubmitWork(func() error { // do some work return nil }) dispatcher.Stop() // blocks and waits for all workers to complete }
Example (Second) ¶
Create a new dispatcher with the name "foo". Create an instance of MockWork and submit that to the dispatcher.
package main import ( "encoding/json" "time" "github.com/puppetlabs/go-libs/pkg/concurrency" ) // MockWork implements the Task interface, and can be submitted to the dispatcher to be processed by // worker threads. type MockWork struct { id string sleepTime time.Duration } // Execute performs the actual work func (w MockWork) Execute() error { mockMessage := MockMessage{Name: w.id} _, err := json.Marshal(mockMessage) time.Sleep(w.sleepTime) return err } // MockMessage ... type MockMessage struct { Name string } func main() { dispatcher := concurrency.NewDispatcher("foo", 10, 5) dispatcher.Start() w1 := MockWork{id: "bob", sleepTime: 5 * time.Second} dispatcher.Submit(w1) dispatcher.Stop() // blocks and waits for all workers to complete }