tract

package module
v0.0.0-...-3bbdb69 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2020 License: MIT Imports: 4 Imported by: 0

README

Tract

Tract is a highly concurrent, scalable design pattern. It has no need for you to keep track of and maintain wait groups nor channels, so you can just focus on the business logic.

Tracts automatically initialize and close all their resources when initialized and closed as a whole: no need keep track of what order to initialize and close each resource yourself.

Tracts encourage the proper use of and closure of shared and non-shared resources via Workers and Worker Factories.

Tract support automatic metric gathering, giving you the full picture of any bottlenecks in the program. Knowing exactly where in your program latency is being incurred facilitates quick performance debugging.

Tract Types

There are different types of tracts:

Worker Tract

A worker tract has a worker factory that creates N workers when initialized. When this tract receives a request on its input, one of the workers will pull that request, process it, then pass it along to the worker tract's output.

Serial Tract

A serial tract has multiple tracts that are linked together serially when initialized. When this tract receives a request on its input, it is pulled from the first tract and passed to each tract in the group sequentially until it reaches the last tract's output where it is available from the serial tract output.

Paralell Tract

A paralell tract has multiple independent tracts. When this tract receives a request on its input, it is pulled by one of the inner tracts, processed by that tract, and passed along to the paralell tract's output.

Fan Out Tract

A fanout tract has multiple independent tracts. When this tract receives a request on its input, it is multiplied and passed to every inner tract. Each of these requests processes through its tract, and passed along to the fanout tract's output.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrFanOutAsHead = errors.New("fan out tract detected with no set input")

ErrFanOutAsHead is en error returned when a fanout group doesn't have its input set. Aka htere should be another Tract feeding into it.

View Source
var ErrNoGroupMember = errors.New("group tract detected with no inner tracts")

ErrNoGroupMember is an error returned when a group tract doesn't have enough members.

Functions

func CleanupRequest

func CleanupRequest(r Request, success bool)

CleanupRequest manually calls all the cleanup functions attached to the request. This does not remove the cleanups.

func GetRequestStartTime

func GetRequestStartTime(r Request) time.Time

GetRequestStartTime get the time the request was generated. If there is no start time, the zero value of time.Time is returned.

Types

type DefaultMetricsThrottler

type DefaultMetricsThrottler struct {
	// contains filtered or unexported fields
}

DefaultMetricsThrottler is a provided implementation of ShouldHandle() that can be composed into any struct trying to implement a MetricsHandler.

func NewDefaultMetricsThrottler

func NewDefaultMetricsThrottler(frequency time.Duration) DefaultMetricsThrottler

NewDefaultMetricsThrottler makes a DefaultMetricsThrottle ready to use

func (DefaultMetricsThrottler) ShouldHandle

func (d DefaultMetricsThrottler) ShouldHandle() bool

ShouldHandle determines when we should handle metrics based off a frequency

Frequency |            ShouldHandle() logic
--------------------------------------------
        0 |                   Always handle
      < 0 |                    Never handle
      > 0 | Handle once per frequency cycle

type FinalOutput

type FinalOutput struct{}

FinalOutput is the last output for requests. Requests that are outputted here have reached the end of their life. It is the default output of a Tract.

func (FinalOutput) Close

func (c FinalOutput) Close()

Close is a noop.

func (FinalOutput) Put

func (c FinalOutput) Put(r Request)

Put sinks the request (noop).

type Input

type Input interface {
	// Get gets the next request. The bool return value is true if a request was gotten.
	// It's false when there is no requests and never will be any more.
	Get() (Request, bool)
}

Input specifies a way for a Tract to get requests.

type InputChannel

type InputChannel <-chan Request

InputChannel is a channel of requests.

func (InputChannel) Get

func (c InputChannel) Get() (Request, bool)

Get gets the next request from the channel.

type InputGenerator

type InputGenerator struct{}

InputGenerator generates request objects. It is the default input of a Tract.

func (InputGenerator) Get

func (c InputGenerator) Get() (Request, bool)

Get generates the next request. The current time is stored in the request at this generation time. It can be retrieved by using GetRequestStartTime().

type Metric

type Metric struct {
	Key   MetricsKey
	Value time.Duration
}

Metric is a tuple of a metric time latency and a key specifying what the metric is measuring.

type MetricsHandler

type MetricsHandler interface {
	// HandleMetrics is the method where all metrics are passed to for handling. It should compare the
	// Key of each metric against the list of metric key constants to determine what each metric means.
	HandleMetrics(...Metric)
	// ShouldHandle should return true if the MetricsHandler is ready for more metrics, otherwise false.
	// This is used by the tract to determine if metrics should even be generated. This function should
	// return as fast as possible as it will be called for every request within every tract this
	// MetricsHandler is used.
	ShouldHandle() bool
}

MetricsHandler handles metrics that a tract produces.

Example

ExampleMetricsHandler shows an example of using a metrics handler in a Tract. This provides a way for the user to gather metrics around each worker tract.

package main

import (
	"fmt"
	"time"

	"git.dev.kochava.com/ccurrin/tract"
)

var _ tract.MetricsHandler = &exampleMetricsHandler{}

func NewExampleMetricsHandler(name string, frequency time.Duration) tract.MetricsHandler {
	return &exampleMetricsHandler{
		sourceTractName:         name,
		DefaultMetricsThrottler: tract.NewDefaultMetricsThrottler(frequency),
	}
}

type exampleMetricsHandler struct {
	sourceTractName string
	tract.DefaultMetricsThrottler
}

func (h *exampleMetricsHandler) HandleMetrics(metrics ...tract.Metric) {
	// Handle the metrics
	// Send them to influx or something.
	for _, metric := range metrics {
		// check the metrics key to determine what this metric is for.
		// You don't have to handle every key type.
		var metricsKey string
		switch metric.Key {
		// How long did we spend waiting for the next request?
		case tract.MetricsKeyIn:
			metricsKey = "in"
		// How long did we spend working on the request?
		case tract.MetricsKeyDuring:
			metricsKey = "during"
		// How long did we spend waiting to output the request?
		case tract.MetricsKeyOut:
			metricsKey = "out"
		// The request has reached the end of the line. How long since the request was created?
		case tract.MetricsKeyTract:
			metricsKey = "tract"
		// Either an invalid metrics key, one we don't know about, or one we don't care about.
		default:
			metricsKey = "unknown"
		}
		fmt.Printf("%s :: %s :: %v\n", h.sourceTractName, metricsKey, metric.Value)
	}
}

// ExampleMetricsHandler shows an example of using a metrics handler in a Tract.
// This provides a way for the user to gather metrics around each worker tract.
func main() {
	// Each worker tract using a metrics handler should use their own metrics handler
	// if using the tract.DefaultMetricsThrottler, so they each throttle separately.
	// In this case, our handler is given context of what Tract it's handling in its
	// contructor, so separate handlers for each Tract is needed for that as well.
	myTract := tract.NewSerialGroupTract("my tract",
		// ...
		tract.NewWorkerTract("square root", 4,
			tract.NewFactoryFromWorker(SquareRootWorker{}),
			tract.WithMetricsHandler(NewExampleMetricsHandler("squareroot", 10*time.Second)),
		),
		tract.NewWorkerTract("some other worker", 2,
			tract.NewFactoryFromWorker(SquareRootWorker{}),
			tract.WithMetricsHandler(NewExampleMetricsHandler("somethingelse", 10*time.Second)),
		),
		// ...
	)

	err := myTract.Init()
	if err != nil {
		// Handle error
		return
	}

	myTract.Start()()
}

type MetricsInput

type MetricsInput struct {
	Input
	// contains filtered or unexported fields
}

MetricsInput is a wrapper around an Input that will automatically generate input latency metrics

func (MetricsInput) Get

func (i MetricsInput) Get() (Request, bool)

Get gets from the inner input while gathering metrics.

type MetricsKey

type MetricsKey int

MetricsKey is an enum key that specifies a kind of metric

const (
	// MetricsKeyIn specifiies metric for the amount of time a tract spent waiting for the next request from its input.
	MetricsKeyIn MetricsKey = iota + 1
	// MetricsKeyDuring specifiies metric for the amount of time a tract spent waiting for its worker to process a request.
	MetricsKeyDuring
	// MetricsKeyOut specifiies metric for the amount of time a tract spent waiting to output a request to its output.
	MetricsKeyOut
	// MetricsKeyTract specifiies metric for the amount of time from when a request was generated,
	// until it hit the end of the tract (was outputted from a tract that had no user specified output).
	MetricsKeyTract
)

type MetricsOutput

type MetricsOutput struct {
	Output
	// contains filtered or unexported fields
}

MetricsOutput is a wrapper around an Output that will automatically generate output latency metrics

func (MetricsOutput) Put

func (o MetricsOutput) Put(r Request)

Put outputs to the inner output while gathering metrics.

type MetricsWorker

type MetricsWorker struct {
	Worker
	// contains filtered or unexported fields
}

MetricsWorker is a wrapper around a Worker that will automatically generate during latency metrics.

func (MetricsWorker) Work

func (w MetricsWorker) Work(r Request) (Request, bool)

Work works using the inner Worker while gathering metrics.

type Output

type Output interface {
	// Put outputs the the request.
	// Should never be called once Close has been called.
	Put(Request)
	// Close closes the output. No more requests should be outputted.
	// Put should not be called once Close has been called.
	// If there is something on the other side of this output receiving
	// requests, it should be notified that there are no more requests.
	Close()
}

Output specifies a way for a Tract pass requests along.

type OutputChannel

type OutputChannel chan<- Request

OutputChannel is a channel of requests.

func (OutputChannel) Close

func (c OutputChannel) Close()

Close closes the channel.

func (OutputChannel) Put

func (c OutputChannel) Put(r Request)

Put puts the request onto the channel.

type Request

type Request context.Context

Request is the object that is passed along the tract. It keeps track of state by storing data via context values

func AddRequestCleanup

func AddRequestCleanup(r Request, f func(Request, bool)) Request

AddRequestCleanup add a function to the request that will be run when the request dies. This happens either when it reaches the end of a pool with no user set output, or a worker specified that the request should no longer continue.

func RemoveAllRequestCleanups

func RemoveAllRequestCleanups(r Request) Request

RemoveAllRequestCleanups removes all of the cleanups attached to the request. This does not run the cleanups.

type Tract

type Tract interface {
	// Name of the Tract: used for logging and instrementation.
	Name() string
	// Init initializes the Tract. Must be called before calling Start().
	// Once Start has been called, Init should not be called.
	Init() error
	// Start starts the Tract. Returns a callback that waits for the Tract to finish processing.
	// Callback must be called to close resources and close output.
	Start() func()
	// SetInput sets the input of the tract.
	// Users should generally use group Tracts instead of using SetInput directly.
	// Tracts used as sub-tracts in a tract group will have thier inputs set by the group's Init()
	// in which case the groups SetInput should be used instead.
	SetInput(Input)
	// SetOutput sets the output of the tract.
	// Users should generally use group Tracts instead of using SetOutput directly.
	// Tracts used as sub-tracts in a tract group will have thier outputs set by the group's Init()
	// in which case the groups SetOutput should be used instead.
	SetOutput(Output)
}

Tract is a highly concurrent, scalable design pattern. Tracts receive and pass Requests from/to other Tracts. Tracts can be combined to form a single group Tract. Each sub-Tract in a group has a job it does with the base sub-Tract being a Worker Tract. A Worker Tract performs operations on a request before passing it along the overarching group Tract. Other than Worker Tracts, all other Tracts manage other Tracts, manage the flow of Requests, or are advanced user implemented Tracts (user will generally just implement workers).

A Tract lifecycle is as follows:

  1. myTract is constructed by one of the Tract contructors in this package.
  2. myTract is initialized by calling myTract.Init(). * if Init() returns an error, it is not safe to proceed.
  3. myTract is started by calling myTract.Start().
  4. myTract is closed by calling the callback returned from Start().
  5. myTract can be used again by looping back to step 2 (by default). * Init() -> Start()() -> Init() ...

A tract will close when its input specifies there are no more requests to process:

  1. The base case first Tract is a Worker Tract. It's Worker can be viewed as the Request generator. When that Worker returns a "should not send" from Work(), there are no more Request, and the Tract will shutdown.
  2. The Tract's input has been manually set by the user. The user contols Tract shutdown using that input.

Usage:

myTract := tract.NewXYZTract(...)
err := myTract.Init()
if err != nil {
    // Handle error
    return
}
waitForTract := myTract.Start()
waitForTract()

// Let's start again!
err = myTract.Init()
...
Example (SerialGroupTract)

Perpare a few numbers to be square rooted and do it using a tract!

package main

import (
	"context"
	"fmt"
	"math"
	"sort"
	"sync"

	"git.dev.kochava.com/ccurrin/tract"
)

// SquareRootWorkerArg is the key on the tract Request where our square root worker will look for its argument for its work
type SquareRootWorkerArg struct{}

// SquareRootWorkerResult is the key on the tract Request where our square root worker will store the result of its work
type SquareRootWorkerResult struct{}

// These are compiler checks to make sure our implementations satisfy the tract Workers interface
var (
	_ tract.Worker = SquareRootWorker{}
	_ tract.Worker = &SliceArgReaderWorker{}
	_ tract.Worker = &SliceResultsWriterWorker{}
)

// SquareRootWorker is a middle stage worker in a tract.
// Middle stage workers typically will get arguments from the request using:
//
//	`request.Value(<package-level-struct>).(<expected-type>)`
//
// then perfrom an operation, and store the results back to the same request using:
//
//	`context.WithValue(request, <package-level-struct>, <result>)`
//
// SquareRootWorker performs `math.Sqrt` on its argument.
// SquareRootWorker gets its argument from `SquareRootWorkerArg{}` and stores the result to `SquareRootWorkerResult{}`
type SquareRootWorker struct{}

func (w SquareRootWorker) Work(r tract.Request) (tract.Request, bool) {
	arg, ok := r.Value(SquareRootWorkerArg{}).(float64)
	if !ok {
		return context.WithValue(r, SquareRootWorkerResult{}, math.NaN()), true
	}
	result := math.Sqrt(arg)
	return context.WithValue(r, SquareRootWorkerResult{}, result), true
}

func (w SquareRootWorker) Close() {}

// SliceArgReaderWorker is first stage of the tract.
// First stage workers typically will generate or retrive data from some sort of source,
// such as a queue, file, or user provided data, and store that data on the request using:
//
//	`context.WithValue(request, <package-level-struct>, <data>)`
//
// from here, later stages of the tract can use this data. The first stage is also responsible
// for comencing shutdown of the entire tract. When its call to `Work()` returns a false bool,
// it will signal a shutdown of the entire tract. Thus head workers should return false when
// they are done pulling in data from the source.
// SliceArgReaderWorker uses its user populated `arguments` field as a queue. Each time `Work`
// is called it pops an item off of `arguments` and stores it to `SquareRootWorkerArg{}` on the
// tract Request. When there are no more items, false is returned.
// Since SliceArgReaderWorker is being used as its own factory, calls to `Work()` must be thread
// safe, thus a mutex is being used.
type SliceArgReaderWorker struct {
	arguments []float64
	mutex     sync.Mutex
}

func (w *SliceArgReaderWorker) Work(r tract.Request) (tract.Request, bool) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if len(w.arguments) == 0 {
		return r, false
	}
	var arg float64
	arg, w.arguments = w.arguments[0], w.arguments[1:]
	return context.WithValue(r, SquareRootWorkerArg{}, arg), true
}

func (w *SliceArgReaderWorker) Close() {}

// SliceResultsWriterWorker is the last stage of the tract.
// Last stage workers typically perform the final operations on a Request. Once a request has passed
// though the final worker, the Request will go thorugh cleanup, send its tract latency metric, then
// it is gone. During this stage all data on the request can be retrieved the same way as the middle
// stages using:
//
//	`request.Value(<package-level-struct>).(<expected-type>)`
//
// SliceResultsWriterWorker gets the data from `SquareRootWorkerResult{}` and pushes it onto a list of
// results.
// Since SliceResultsWriterWorker is being used as its own factory, calls to `Work()` must be thread
// safe, thus a mutex is being used.
type SliceResultsWriterWorker struct {
	results []float64
	mutex   sync.Mutex
}

func (w *SliceResultsWriterWorker) Work(r tract.Request) (tract.Request, bool) {
	result, ok := r.Value(SquareRootWorkerResult{}).(float64)
	if !ok {
		return r, false
	}
	w.mutex.Lock()
	w.results = append(w.results, result)
	w.mutex.Unlock()
	return r, true
}

func (w *SliceResultsWriterWorker) Close() {}

// Perpare a few numbers to be square rooted and do it using a tract!
func main() {
	var resultsWorker SliceResultsWriterWorker
	wholeTract := tract.NewSerialGroupTract("my tract",
		tract.NewWorkerTract("argment reader", 1,
			tract.NewFactoryFromWorker(&SliceArgReaderWorker{
				arguments: []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100},
			}),
		),
		tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{})),
		tract.NewWorkerTract("result reader", 1, tract.NewFactoryFromWorker(&resultsWorker)),
	)

	err := wholeTract.Init()
	if err != nil {
		//  Handle error
	}

	wait := wholeTract.Start()
	wait()

	sort.Sort(sort.Float64Slice(resultsWorker.results))
	for _, result := range resultsWorker.results {
		fmt.Println(result)
	}

}
Output:

0
1
2
3
4
5
6
7
8
9
10

func NewFanOutGroupTract

func NewFanOutGroupTract(name string, tract Tract, tracts ...Tract) Tract

NewFanOutGroupTract makes a new tract that consists muliple other tracts. Each request this tract receives is routed to all of its inner tracts. All requests proccessed by the inner tracts are routed to the same output. This Tract should not be the first tract in a group as it has no machanism of closing on it's own. Aka it's input must be set to something.

   ------------------
   | / ( Tract0 ) \ |
-> | - ( Tract1 ) - | ->
   | \ ( Tract2 ) / |
   |     ...        |
   ------------------

func NewParalellGroupTract

func NewParalellGroupTract(name string, tract Tract, tracts ...Tract) Tract

NewParalellGroupTract makes a new tract that consists of muliple other tracts. Each request this tract receives is routed to 1 of its inner tracts. All requests proccessed by the inner tracts are routed to the same output.

   ------------------
   | / ( Tract0 ) \ |
-> | - ( Tract1 ) - | ->
   | \ ( Tract2 ) / |
   |     ...        |
   ------------------

func NewSerialGroupTract

func NewSerialGroupTract(name string, tract Tract, tracts ...Tract) Tract

NewSerialGroupTract makes a new tract that consists muliple other tracts. This accomplishes the same thing as chaining other tracts together manually, but has the benefit of being able to treat that chain of tracts as a single tract.

   ----------------------------------------------
-> | ( Tract0 ) -> ( Tract1 ) -> ( Tract2 ) ... | ->
   ----------------------------------------------

func NewWorkerTract

func NewWorkerTract(name string, size int, workerFactory WorkerFactory, options ...WorkerTractOption) Tract

NewWorkerTract makes a new tract that will spin up @size number of workers generated from @workerFactory that get from the input and put to the output of the tract.

type Worker

type Worker interface {
	// Work takes a request, performs an operation, and returns that request and a success flag.
	// If the returned bool is false, that specifies that the returned request should be discarded.
	// The expected pattern is to retrieve any needed arguments from the request using request.Value(...)
	// then apply the results of the work to the same request using context.WithValue(request, ...).
	// When designing workers keep the keys for the request values you will be using in mind.
	Work(Request) (Request, bool)
	// Close closes worker resources
	Close()
}

Worker is an object that performs work potentially using it own resources and/or factory resources.

type WorkerFactory

type WorkerFactory interface {
	// MakeWorker makes a worker expected to run in a tract.
	// This Worker contructor will be called once per worker needed for a Worker Tract.
	// Any resources that a single worker will need (and not share with other Workers) should be
	// instanciated here, and closed by the Worker's Close() method. Any resources the Workers will
	// share should be instantiated in this WorkerFactory's Contructor and closed by its Close()
	// method, or should be instaniated and closed in a higher scope.
	MakeWorker() (Worker, error)
	// Close closes factory resources
	Close()
}

WorkerFactory makes potentially many Worker objects that may use resources managed by the factory.

Example
package main

import (
	"context"
	"database/sql"
	"fmt"

	"git.dev.kochava.com/ccurrin/tract"
)

var (
	_ tract.WorkerFactory = databaseWorkerFactory{}
	_ tract.Worker        = &databaseWorker{}
)

type DatabaseResultsKey struct{}

func NewDatabaseWorkerFactory(
	driverName1, dataSourceName1, query1 string, resultCount1 int,
	driverName2, dataSourceName2, query2 string, resultCount2 int,
) (tract.WorkerFactory, error) {
	db, err := sql.Open(driverName1, dataSourceName1)
	if err != nil {
		return nil, err
	}
	return databaseWorkerFactory{
		db:                   db,
		query1:               query1,
		query2:               query2,
		resultCount1:         resultCount1,
		resultCount2:         resultCount2,
		workerDriverName:     driverName2,
		workerDataSourceName: dataSourceName2,
	}, nil
}

type databaseWorkerFactory struct {
	db                                     *sql.DB
	query1, query2                         string
	resultCount1, resultCount2             int
	workerDriverName, workerDataSourceName string
}

func (f databaseWorkerFactory) MakeWorker() (tract.Worker, error) {
	db, err := sql.Open(f.workerDriverName, f.workerDataSourceName)
	if err != nil {
		return nil, err
	}
	results := make([]interface{}, f.resultCount1)
	resultsPtrs := make([]interface{}, len(results))
	for i := range results {
		resultsPtrs[i] = &results[i]
	}
	return &databaseWorker{
		db:           f.db,
		query1:       f.query1,
		localDB:      db,
		query2:       f.query2,
		resultCount2: f.resultCount2,
		results:      results,
		resultsPtrs:  resultsPtrs,
	}, nil
}

func (f databaseWorkerFactory) Close() {
	f.db.Close()
}

type databaseWorker struct {
	// resources from factory
	db           *sql.DB
	query1       string
	query2       string
	resultCount2 int
	// local resources
	localDB              *sql.DB
	results, resultsPtrs []interface{}
}

func (w *databaseWorker) Work(r tract.Request) (tract.Request, bool) {
	err := w.db.QueryRow(w.query1).Scan(w.resultsPtrs...)
	if err != nil {
		// Handle error
		return r, false
	}

	results := make([]interface{}, w.resultCount2)
	resultsPtrs := make([]interface{}, len(results))
	for i := range results {
		resultsPtrs[i] = &results[i]
	}

	err = w.localDB.QueryRow(w.query2, w.results...).Scan(resultsPtrs...)
	if err != nil {
		// Handle error
		return r, false
	}
	return context.WithValue(r, DatabaseResultsKey{}, results), true
}

func (w *databaseWorker) Close() {
	w.localDB.Close()
}

func main() {
	dbWorkerFactory, err := NewDatabaseWorkerFactory(
		"mysql", "mydatabase.internal", "SELECT value1, value2 FROM myTable1 LIMIT 1;", 2,
		"mysql", "mydatabase.internal", "SELECT value1, value2, value3 FROM myTable2 WHERE value1 = ? AND value2 = ? LIMIT 1;", 3,
	)
	if err != nil {
		// Handle error
		return
	}
	defer dbWorkerFactory.Close()

	dbWorker, err := dbWorkerFactory.MakeWorker()
	if err != nil {
		// Handle error
		return
	}
	defer dbWorker.Close()

	resultRequest, ok := dbWorker.Work(context.Background())
	if !ok {
		// Handle problem
		return
	}

	results, ok := resultRequest.Value(DatabaseResultsKey{}).([]interface{})
	if !ok {
		// Handle problem
		return
	}
	for _, result := range results {
		fmt.Println(result)
	}
}
Example (TractWorkerFactoryAsync)
squareRootTract := tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{}))

factory := tract.NewTractWorkerFactory(squareRootTract)

worker, err := factory.MakeWorker()
if err != nil {
	//  Handle error
	return
}

args := []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100}
results := make([]float64, len(args))

wg := sync.WaitGroup{}
for i := range args {
	wg.Add(1)
	go func(j int) {
		defer wg.Done()
		resultReq, success := worker.Work(context.WithValue(context.Background(), SquareRootWorkerArg{}, args[j]))
		if !success {
			fmt.Println("not successful")
		}
		results[j], _ = resultReq.Value(SquareRootWorkerResult{}).(float64)
	}(i)
}
wg.Wait()
for _, result := range results {
	fmt.Println(result)
}

worker.Close()
factory.Close()
Output:

0
1
2
3
4
5
6
7
8
9
10
Example (TractWorkerFactorySync)
squareRootTract := tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{}))

factory := tract.NewTractWorkerFactory(squareRootTract)

worker, err := factory.MakeWorker()
if err != nil {
	//  Handle error
	return
}

args := []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100}

for _, arg := range args {
	resultReq, success := worker.Work(context.WithValue(context.Background(), SquareRootWorkerArg{}, arg))
	if !success {
		fmt.Println("not successful")
	}
	result, _ := resultReq.Value(SquareRootWorkerResult{}).(float64)
	fmt.Println(result)
}

worker.Close()
factory.Close()
Output:

0
1
2
3
4
5
6
7
8
9
10

func NewFactoryFromWorker

func NewFactoryFromWorker(worker Worker) WorkerFactory

NewFactoryFromWorker makes a WorkerFactory from a provided Worker. Whenever the WorkerFactory makes a worker, it just returns same worker it started with. This is useful for the common case of making a tract that uses workers who's Work() function is already thred safe. without having to make a specific factory object. The worker's call to close is defered until the factory is closed.

func NewTractWorkerFactory

func NewTractWorkerFactory(tract Tract) WorkerFactory

NewTractWorkerFactory turns a Tract into a WorkerFactory. When it makes it's first worker, it initializes and starts the tract. Its workers work consist of passing requests into the tract, waiting for the request to reach the end of the tract, and returning the resulting request. Any cleanups put on the request in the tract, versus cleanups put on the request called before it reaches this worker are kept entirely separated. This tract's cleanups occur at the end of just itself; cleanups put on the request before hand will occur when they normally would have.

type WorkerTractOption

type WorkerTractOption func(*workerTract)

WorkerTractOption is a function option applyable to worker tracts.

func WithFactoryClosure

func WithFactoryClosure(shouldClose bool) WorkerTractOption

WithFactoryClosure creates a WorkerTractOption that will specify if the Tract should close its WorkerFactory when the tract is finished running when its Start closure is called. By default factories are not closed when a Tract is closed. This allows a Tract to be restarted, but forces the user to close their own factories. If specified that the factory should close, then the tract cannot safely be restarted, but the user won't have to manually close their factory.

func WithMetricsHandler

func WithMetricsHandler(mh MetricsHandler) WorkerTractOption

WithMetricsHandler creates a WorkerTractOption that will set the tract's metrics handler to the provided one. By default no metrics handler is used, and thus no metrics are gathered.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL