Documentation ¶
Index ¶
- Variables
- func CleanupRequest(r Request, success bool)
- func GetRequestStartTime(r Request) time.Time
- type DefaultMetricsThrottler
- type FinalOutput
- type Input
- type InputChannel
- type InputGenerator
- type Metric
- type MetricsHandler
- type MetricsInput
- type MetricsKey
- type MetricsOutput
- type MetricsWorker
- type Output
- type OutputChannel
- type Request
- type Tract
- func NewFanOutGroupTract(name string, tract Tract, tracts ...Tract) Tract
- func NewParalellGroupTract(name string, tract Tract, tracts ...Tract) Tract
- func NewSerialGroupTract(name string, tract Tract, tracts ...Tract) Tract
- func NewWorkerTract(name string, size int, workerFactory WorkerFactory, ...) Tract
- type Worker
- type WorkerFactory
- type WorkerTractOption
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrFanOutAsHead = errors.New("fan out tract detected with no set input")
ErrFanOutAsHead is an error returned when a fanout group doesn't have its input set, i.e. there should be another Tract feeding into it.
var ErrNoGroupMember = errors.New("group tract detected with no inner tracts")
ErrNoGroupMember is an error returned when a group tract doesn't have enough members.
Functions ¶
func CleanupRequest ¶
func CleanupRequest(r Request, success bool)
CleanupRequest manually calls all the cleanup functions attached to the request. This does not remove the cleanups.
func GetRequestStartTime ¶
func GetRequestStartTime(r Request) time.Time
GetRequestStartTime gets the time the request was generated. If there is no start time, the zero value of time.Time is returned.
Types ¶
type DefaultMetricsThrottler ¶
type DefaultMetricsThrottler struct {
// contains filtered or unexported fields
}
DefaultMetricsThrottler is a provided implementation of ShouldHandle() that can be composed into any struct trying to implement a MetricsHandler.
func NewDefaultMetricsThrottler ¶
func NewDefaultMetricsThrottler(frequency time.Duration) DefaultMetricsThrottler
NewDefaultMetricsThrottler makes a DefaultMetricsThrottler ready to use.
func (DefaultMetricsThrottler) ShouldHandle ¶
func (d DefaultMetricsThrottler) ShouldHandle() bool
ShouldHandle determines when we should handle metrics based on a frequency:
Frequency | ShouldHandle() logic
--------------------------------------------
0 | Always handle
< 0 | Never handle
> 0 | Handle once per frequency cycle
type FinalOutput ¶
type FinalOutput struct{}
FinalOutput is the last output for requests. Requests that are outputted here have reached the end of their life. It is the default output of a Tract.
type Input ¶
type Input interface {
// Get gets the next request. The bool return value is true if a request was gotten.
// It's false when there are no requests and never will be any more.
Get() (Request, bool)
}
Input specifies a way for a Tract to get requests.
type InputChannel ¶
type InputChannel <-chan Request
InputChannel is a channel of requests.
func (InputChannel) Get ¶
func (c InputChannel) Get() (Request, bool)
Get gets the next request from the channel.
type InputGenerator ¶
type InputGenerator struct{}
InputGenerator generates request objects. It is the default input of a Tract.
func (InputGenerator) Get ¶
func (c InputGenerator) Get() (Request, bool)
Get generates the next request. The current time is stored in the request at generation time; it can be retrieved using GetRequestStartTime().
type Metric ¶
type Metric struct {
Key MetricsKey
Value time.Duration
}
Metric is a tuple of a latency measurement and a key specifying what the metric is measuring.
type MetricsHandler ¶
type MetricsHandler interface {
// HandleMetrics is the method where all metrics are passed to for handling. It should compare the
// Key of each metric against the list of metric key constants to determine what each metric means.
HandleMetrics(...Metric)
// ShouldHandle should return true if the MetricsHandler is ready for more metrics, otherwise false.
// This is used by the tract to determine if metrics should even be generated. This function should
// return as fast as possible as it will be called for every request within every tract this
// MetricsHandler is used in.
ShouldHandle() bool
}
MetricsHandler handles metrics that a tract produces.
Example ¶
ExampleMetricsHandler shows an example of using a metrics handler in a Tract. This provides a way for the user to gather metrics around each worker tract.
package main
import (
"fmt"
"time"
"git.dev.kochava.com/ccurrin/tract"
)
var _ tract.MetricsHandler = &exampleMetricsHandler{}
func NewExampleMetricsHandler(name string, frequency time.Duration) tract.MetricsHandler {
return &exampleMetricsHandler{
sourceTractName: name,
DefaultMetricsThrottler: tract.NewDefaultMetricsThrottler(frequency),
}
}
type exampleMetricsHandler struct {
sourceTractName string
tract.DefaultMetricsThrottler
}
func (h *exampleMetricsHandler) HandleMetrics(metrics ...tract.Metric) {
// Handle the metrics
// Send them to influx or something.
for _, metric := range metrics {
// check the metrics key to determine what this metric is for.
// You don't have to handle every key type.
var metricsKey string
switch metric.Key {
// How long did we spend waiting for the next request?
case tract.MetricsKeyIn:
metricsKey = "in"
// How long did we spend working on the request?
case tract.MetricsKeyDuring:
metricsKey = "during"
// How long did we spend waiting to output the request?
case tract.MetricsKeyOut:
metricsKey = "out"
// The request has reached the end of the line. How long since the request was created?
case tract.MetricsKeyTract:
metricsKey = "tract"
// Either an invalid metrics key, one we don't know about, or one we don't care about.
default:
metricsKey = "unknown"
}
fmt.Printf("%s :: %s :: %v\n", h.sourceTractName, metricsKey, metric.Value)
}
}
// ExampleMetricsHandler shows an example of using a metrics handler in a Tract.
// This provides a way for the user to gather metrics around each worker tract.
func main() {
// Each worker tract using a metrics handler should use their own metrics handler
// if using the tract.DefaultMetricsThrottler, so they each throttle separately.
// In this case, our handler is given context of what Tract it's handling in its
// constructor, so separate handlers for each Tract are needed for that as well.
myTract := tract.NewSerialGroupTract("my tract",
// ...
tract.NewWorkerTract("square root", 4,
tract.NewFactoryFromWorker(SquareRootWorker{}),
tract.WithMetricsHandler(NewExampleMetricsHandler("squareroot", 10*time.Second)),
),
tract.NewWorkerTract("some other worker", 2,
tract.NewFactoryFromWorker(SquareRootWorker{}),
tract.WithMetricsHandler(NewExampleMetricsHandler("somethingelse", 10*time.Second)),
),
// ...
)
err := myTract.Init()
if err != nil {
// Handle error
return
}
myTract.Start()()
}
type MetricsInput ¶
type MetricsInput struct {
Input
// contains filtered or unexported fields
}
MetricsInput is a wrapper around an Input that automatically generates input latency metrics.
func (MetricsInput) Get ¶
func (i MetricsInput) Get() (Request, bool)
Get gets from the inner input while gathering metrics.
type MetricsKey ¶
type MetricsKey int
MetricsKey is an enum key that specifies a kind of metric.
const (
	// MetricsKeyIn specifies the metric for the amount of time a tract spent waiting for the next request from its input.
	MetricsKeyIn MetricsKey = iota + 1
	// MetricsKeyDuring specifies the metric for the amount of time a tract spent waiting for its worker to process a request.
	MetricsKeyDuring
	// MetricsKeyOut specifies the metric for the amount of time a tract spent waiting to output a request to its output.
	MetricsKeyOut
	// MetricsKeyTract specifies the metric for the amount of time from when a request was generated
	// until it hit the end of the tract (was outputted from a tract that had no user-specified output).
	MetricsKeyTract
)
type MetricsOutput ¶
type MetricsOutput struct {
Output
// contains filtered or unexported fields
}
MetricsOutput is a wrapper around an Output that automatically generates output latency metrics.
func (MetricsOutput) Put ¶
func (o MetricsOutput) Put(r Request)
Put outputs to the inner output while gathering metrics.
type MetricsWorker ¶
type MetricsWorker struct {
Worker
// contains filtered or unexported fields
}
MetricsWorker is a wrapper around a Worker that will automatically generate during latency metrics.
type Output ¶
type Output interface {
// Put outputs the request.
// Should never be called once Close has been called.
Put(Request)
// Close closes the output. No more requests should be outputted.
// Put should not be called once Close has been called.
// If there is something on the other side of this output receiving
// requests, it should be notified that there are no more requests.
Close()
}
Output specifies a way for a Tract to pass requests along.
type OutputChannel ¶
type OutputChannel chan<- Request
OutputChannel is a channel of requests.
func (OutputChannel) Put ¶
func (c OutputChannel) Put(r Request)
Put puts the request onto the channel.
type Request ¶
Request is the object that is passed along the tract. It keeps track of state by storing data via context values.
func AddRequestCleanup ¶
AddRequestCleanup adds a function to the request that will be run when the request dies. This happens either when it reaches the end of a pool with no user-set output, or when a worker specifies that the request should no longer continue.
func RemoveAllRequestCleanups ¶
RemoveAllRequestCleanups removes all of the cleanups attached to the request. This does not run the cleanups.
type Tract ¶
type Tract interface {
// Name of the Tract: used for logging and instrumentation.
Name() string
// Init initializes the Tract. Must be called before calling Start().
// Once Start has been called, Init should not be called.
Init() error
// Start starts the Tract. Returns a callback that waits for the Tract to finish processing.
// Callback must be called to close resources and close output.
Start() func()
// SetInput sets the input of the tract.
// Users should generally use group Tracts instead of using SetInput directly.
// Tracts used as sub-tracts in a tract group will have their inputs set by the group's Init(),
// in which case the group's SetInput should be used instead.
SetInput(Input)
// SetOutput sets the output of the tract.
// Users should generally use group Tracts instead of using SetOutput directly.
// Tracts used as sub-tracts in a tract group will have their outputs set by the group's Init(),
// in which case the group's SetOutput should be used instead.
SetOutput(Output)
}
Tract is a highly concurrent, scalable design pattern. Tracts receive Requests from and pass them to other Tracts. Tracts can be combined to form a single group Tract. Each sub-Tract in a group has a job it does, with the base sub-Tract being a Worker Tract. A Worker Tract performs operations on a request before passing it along the overarching group Tract. All other Tracts manage other Tracts, manage the flow of Requests, or are advanced, user-implemented Tracts (users will generally just implement Workers).
A Tract lifecycle is as follows:
- myTract is constructed by one of the Tract constructors in this package.
- myTract is initialized by calling myTract.Init(). If Init() returns an error, it is not safe to proceed.
- myTract is started by calling myTract.Start().
- myTract is closed by calling the callback returned from Start().
- myTract can be used again by looping back to step 2 (by default): Init() -> Start()() -> Init() ...
A tract will close when its input specifies there are no more requests to process:
- The base case: the first Tract is a Worker Tract. Its Worker can be viewed as the Request generator. When that Worker returns a "should not send" from Work(), there are no more Requests, and the Tract will shut down.
- The Tract's input has been manually set by the user. The user controls Tract shutdown using that input.
Usage:
myTract := tract.NewXYZTract(...)
err := myTract.Init()
if err != nil {
// Handle error
return
}
waitForTract := myTract.Start()
waitForTract()
// Let's start again!
err = myTract.Init()
...
Example (SerialGroupTract) ¶
Prepare a few numbers to be square rooted and do it using a tract!
package main
import (
"context"
"fmt"
"math"
"sort"
"sync"
"git.dev.kochava.com/ccurrin/tract"
)
// SquareRootWorkerArg is the key on the tract Request where our square root worker will look for its argument for its work
type SquareRootWorkerArg struct{}
// SquareRootWorkerResult is the key on the tract Request where our square root worker will store the result of its work
type SquareRootWorkerResult struct{}
// These are compiler checks to make sure our implementations satisfy the tract Workers interface
var (
_ tract.Worker = SquareRootWorker{}
_ tract.Worker = &SliceArgReaderWorker{}
_ tract.Worker = &SliceResultsWriterWorker{}
)
// SquareRootWorker is a middle stage worker in a tract.
// Middle stage workers typically will get arguments from the request using:
//
// `request.Value(<package-level-struct>).(<expected-type>)`
//
// then perform an operation, and store the results back to the same request using:
//
// `context.WithValue(request, <package-level-struct>, <result>)`
//
// SquareRootWorker performs `math.Sqrt` on its argument.
// SquareRootWorker gets its argument from `SquareRootWorkerArg{}` and stores the result to `SquareRootWorkerResult{}`
type SquareRootWorker struct{}
func (w SquareRootWorker) Work(r tract.Request) (tract.Request, bool) {
arg, ok := r.Value(SquareRootWorkerArg{}).(float64)
if !ok {
return context.WithValue(r, SquareRootWorkerResult{}, math.NaN()), true
}
result := math.Sqrt(arg)
return context.WithValue(r, SquareRootWorkerResult{}, result), true
}
func (w SquareRootWorker) Close() {}
// SliceArgReaderWorker is first stage of the tract.
// First stage workers typically will generate or retrieve data from some sort of source,
// such as a queue, file, or user provided data, and store that data on the request using:
//
// `context.WithValue(request, <package-level-struct>, <data>)`
//
// From here, later stages of the tract can use this data. The first stage is also responsible
// for commencing shutdown of the entire tract. When its call to `Work()` returns a false bool,
// it will signal a shutdown of the entire tract. Thus head workers should return false when
// they are done pulling in data from the source.
// SliceArgReaderWorker uses its user populated `arguments` field as a queue. Each time `Work`
// is called it pops an item off of `arguments` and stores it to `SquareRootWorkerArg{}` on the
// tract Request. When there are no more items, false is returned.
// Since SliceArgReaderWorker is being used as its own factory, calls to `Work()` must be thread
// safe, thus a mutex is being used.
type SliceArgReaderWorker struct {
arguments []float64
mutex sync.Mutex
}
func (w *SliceArgReaderWorker) Work(r tract.Request) (tract.Request, bool) {
w.mutex.Lock()
defer w.mutex.Unlock()
if len(w.arguments) == 0 {
return r, false
}
var arg float64
arg, w.arguments = w.arguments[0], w.arguments[1:]
return context.WithValue(r, SquareRootWorkerArg{}, arg), true
}
func (w *SliceArgReaderWorker) Close() {}
// SliceResultsWriterWorker is the last stage of the tract.
// Last stage workers typically perform the final operations on a Request. Once a request has passed
// through the final worker, the Request will go through cleanup, send its tract latency metric, and then
// it is gone. During this stage all data on the request can be retrieved the same way as in the middle
// stages using:
//
// `request.Value(<package-level-struct>).(<expected-type>)`
//
// SliceResultsWriterWorker gets the data from `SquareRootWorkerResult{}` and pushes it onto a list of
// results.
// Since SliceResultsWriterWorker is being used as its own factory, calls to `Work()` must be thread
// safe, thus a mutex is being used.
type SliceResultsWriterWorker struct {
results []float64
mutex sync.Mutex
}
func (w *SliceResultsWriterWorker) Work(r tract.Request) (tract.Request, bool) {
result, ok := r.Value(SquareRootWorkerResult{}).(float64)
if !ok {
return r, false
}
w.mutex.Lock()
w.results = append(w.results, result)
w.mutex.Unlock()
return r, true
}
func (w *SliceResultsWriterWorker) Close() {}
// Prepare a few numbers to be square rooted and do it using a tract!
func main() {
var resultsWorker SliceResultsWriterWorker
wholeTract := tract.NewSerialGroupTract("my tract",
tract.NewWorkerTract("argument reader", 1,
tract.NewFactoryFromWorker(&SliceArgReaderWorker{
arguments: []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100},
}),
),
tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{})),
tract.NewWorkerTract("result reader", 1, tract.NewFactoryFromWorker(&resultsWorker)),
)
err := wholeTract.Init()
if err != nil {
// Handle error
return
}
wait := wholeTract.Start()
wait()
sort.Sort(sort.Float64Slice(resultsWorker.results))
for _, result := range resultsWorker.results {
fmt.Println(result)
}
}
Output:

0
1
2
3
4
5
6
7
8
9
10
func NewFanOutGroupTract ¶
func NewFanOutGroupTract(name string, tract Tract, tracts ...Tract) Tract
NewFanOutGroupTract makes a new tract that consists of multiple other tracts. Each request this tract receives is routed to all of its inner tracts. All requests processed by the inner tracts are routed to the same output. This Tract should not be the first tract in a group, as it has no mechanism for closing on its own; its input must be set to something.
   ------------------
   | / ( Tract0 ) \ |
-> | - ( Tract1 ) - | ->
   | \ ( Tract2 ) / |
   |      ...       |
   ------------------
func NewParalellGroupTract ¶
func NewParalellGroupTract(name string, tract Tract, tracts ...Tract) Tract
NewParalellGroupTract makes a new tract that consists of multiple other tracts. Each request this tract receives is routed to one of its inner tracts. All requests processed by the inner tracts are routed to the same output.
   ------------------
   | / ( Tract0 ) \ |
-> | - ( Tract1 ) - | ->
   | \ ( Tract2 ) / |
   |      ...       |
   ------------------
func NewSerialGroupTract ¶
func NewSerialGroupTract(name string, tract Tract, tracts ...Tract) Tract
NewSerialGroupTract makes a new tract that consists of multiple other tracts. This accomplishes the same thing as chaining other tracts together manually, but has the benefit of being able to treat that chain of tracts as a single tract.
   ----------------------------------------------
-> | ( Tract0 ) -> ( Tract1 ) -> ( Tract2 ) ... | ->
   ----------------------------------------------
func NewWorkerTract ¶
func NewWorkerTract(name string, size int, workerFactory WorkerFactory, options ...WorkerTractOption) Tract
NewWorkerTract makes a new tract that will spin up @size workers generated from @workerFactory, which get from the input and put to the output of the tract.
type Worker ¶
type Worker interface {
// Work takes a request, performs an operation, and returns that request and a success flag.
// If the returned bool is false, that specifies that the returned request should be discarded.
// The expected pattern is to retrieve any needed arguments from the request using request.Value(...)
// then apply the results of the work to the same request using context.WithValue(request, ...).
// When designing workers keep the keys for the request values you will be using in mind.
Work(Request) (Request, bool)
// Close closes worker resources
Close()
}
Worker is an object that performs work, potentially using its own resources and/or factory resources.
type WorkerFactory ¶
type WorkerFactory interface {
// MakeWorker makes a worker expected to run in a tract.
// This Worker constructor will be called once per worker needed for a Worker Tract.
// Any resources that a single worker will need (and not share with other Workers) should be
// instantiated here, and closed by the Worker's Close() method. Any resources the Workers will
// share should be instantiated in this WorkerFactory's constructor and closed by its Close()
// method, or should be instantiated and closed in a higher scope.
MakeWorker() (Worker, error)
// Close closes factory resources
Close()
}
WorkerFactory makes potentially many Worker objects that may use resources managed by the factory.
Example ¶
package main
import (
"context"
"database/sql"
"fmt"
"git.dev.kochava.com/ccurrin/tract"
)
var (
_ tract.WorkerFactory = databaseWorkerFactory{}
_ tract.Worker = &databaseWorker{}
)
type DatabaseResultsKey struct{}
func NewDatabaseWorkerFactory(
driverName1, dataSourceName1, query1 string, resultCount1 int,
driverName2, dataSourceName2, query2 string, resultCount2 int,
) (tract.WorkerFactory, error) {
db, err := sql.Open(driverName1, dataSourceName1)
if err != nil {
return nil, err
}
return databaseWorkerFactory{
db: db,
query1: query1,
query2: query2,
resultCount1: resultCount1,
resultCount2: resultCount2,
workerDriverName: driverName2,
workerDataSourceName: dataSourceName2,
}, nil
}
type databaseWorkerFactory struct {
db *sql.DB
query1, query2 string
resultCount1, resultCount2 int
workerDriverName, workerDataSourceName string
}
func (f databaseWorkerFactory) MakeWorker() (tract.Worker, error) {
db, err := sql.Open(f.workerDriverName, f.workerDataSourceName)
if err != nil {
return nil, err
}
results := make([]interface{}, f.resultCount1)
resultsPtrs := make([]interface{}, len(results))
for i := range results {
resultsPtrs[i] = &results[i]
}
return &databaseWorker{
db: f.db,
query1: f.query1,
localDB: db,
query2: f.query2,
resultCount2: f.resultCount2,
results: results,
resultsPtrs: resultsPtrs,
}, nil
}
func (f databaseWorkerFactory) Close() {
f.db.Close()
}
type databaseWorker struct {
// resources from factory
db *sql.DB
query1 string
query2 string
resultCount2 int
// local resources
localDB *sql.DB
results, resultsPtrs []interface{}
}
func (w *databaseWorker) Work(r tract.Request) (tract.Request, bool) {
err := w.db.QueryRow(w.query1).Scan(w.resultsPtrs...)
if err != nil {
// Handle error
return r, false
}
results := make([]interface{}, w.resultCount2)
resultsPtrs := make([]interface{}, len(results))
for i := range results {
resultsPtrs[i] = &results[i]
}
err = w.localDB.QueryRow(w.query2, w.results...).Scan(resultsPtrs...)
if err != nil {
// Handle error
return r, false
}
return context.WithValue(r, DatabaseResultsKey{}, results), true
}
func (w *databaseWorker) Close() {
w.localDB.Close()
}
func main() {
dbWorkerFactory, err := NewDatabaseWorkerFactory(
"mysql", "mydatabase.internal", "SELECT value1, value2 FROM myTable1 LIMIT 1;", 2,
"mysql", "mydatabase.internal", "SELECT value1, value2, value3 FROM myTable2 WHERE value1 = ? AND value2 = ? LIMIT 1;", 3,
)
if err != nil {
// Handle error
return
}
defer dbWorkerFactory.Close()
dbWorker, err := dbWorkerFactory.MakeWorker()
if err != nil {
// Handle error
return
}
defer dbWorker.Close()
resultRequest, ok := dbWorker.Work(context.Background())
if !ok {
// Handle problem
return
}
results, ok := resultRequest.Value(DatabaseResultsKey{}).([]interface{})
if !ok {
// Handle problem
return
}
for _, result := range results {
fmt.Println(result)
}
}
Example (TractWorkerFactoryAsync) ¶
squareRootTract := tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{}))
factory := tract.NewTractWorkerFactory(squareRootTract)
worker, err := factory.MakeWorker()
if err != nil {
// Handle error
return
}
args := []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100}
results := make([]float64, len(args))
wg := sync.WaitGroup{}
for i := range args {
wg.Add(1)
go func(j int) {
defer wg.Done()
resultReq, success := worker.Work(context.WithValue(context.Background(), SquareRootWorkerArg{}, args[j]))
if !success {
fmt.Println("not successful")
}
results[j], _ = resultReq.Value(SquareRootWorkerResult{}).(float64)
}(i)
}
wg.Wait()
for _, result := range results {
fmt.Println(result)
}
worker.Close()
factory.Close()
Output:

0
1
2
3
4
5
6
7
8
9
10
Example (TractWorkerFactorySync) ¶
squareRootTract := tract.NewWorkerTract("square root", 4, tract.NewFactoryFromWorker(SquareRootWorker{}))
factory := tract.NewTractWorkerFactory(squareRootTract)
worker, err := factory.MakeWorker()
if err != nil {
// Handle error
return
}
args := []float64{0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100}
for _, arg := range args {
resultReq, success := worker.Work(context.WithValue(context.Background(), SquareRootWorkerArg{}, arg))
if !success {
fmt.Println("not successful")
}
result, _ := resultReq.Value(SquareRootWorkerResult{}).(float64)
fmt.Println(result)
}
worker.Close()
factory.Close()
Output:

0
1
2
3
4
5
6
7
8
9
10
func NewFactoryFromWorker ¶
func NewFactoryFromWorker(worker Worker) WorkerFactory
NewFactoryFromWorker makes a WorkerFactory from a provided Worker. Whenever the WorkerFactory makes a worker, it just returns the same worker it started with. This is useful for the common case of making a tract that uses workers whose Work() function is already thread safe, without having to make a specific factory object. The worker's call to Close is deferred until the factory is closed.
func NewTractWorkerFactory ¶
func NewTractWorkerFactory(tract Tract) WorkerFactory
NewTractWorkerFactory turns a Tract into a WorkerFactory. When it makes its first worker, it initializes and starts the tract. Its workers' work consists of passing requests into the tract, waiting for the request to reach the end of the tract, and returning the resulting request. Any cleanups put on the request within this tract are kept entirely separate from cleanups put on the request before it reaches this worker. This tract's cleanups occur at the end of just itself; cleanups put on the request beforehand occur when they normally would have.
type WorkerTractOption ¶
type WorkerTractOption func(*workerTract)
WorkerTractOption is a function option applicable to worker tracts.
func WithFactoryClosure ¶
func WithFactoryClosure(shouldClose bool) WorkerTractOption
WithFactoryClosure creates a WorkerTractOption that specifies whether the Tract should close its WorkerFactory when the tract finishes running (when its Start closure is called). By default, factories are not closed when a Tract is closed. This allows a Tract to be restarted, but forces the user to close their own factories. If the factory is specified to close, then the tract cannot safely be restarted, but the user won't have to manually close their factory.
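WorkerTractOption follows Go's functional-options pattern: each option is a function that mutates the tract during construction. A minimal sketch, assuming a stand-in `workerTract` struct with a hypothetical `shouldCloseFactory` field (the real struct's fields are unexported):

```go
package main

import "fmt"

// workerTract is a stand-in for the package's unexported workerTract
// struct; the field name here is an assumption for illustration.
type workerTract struct {
	shouldCloseFactory bool
}

// workerTractOption mirrors WorkerTractOption's shape: a function
// that mutates the tract while it is being constructed.
type workerTractOption func(*workerTract)

// withFactoryClosure mirrors WithFactoryClosure: it returns an option
// that records whether the factory should be closed on shutdown.
func withFactoryClosure(shouldClose bool) workerTractOption {
	return func(t *workerTract) { t.shouldCloseFactory = shouldClose }
}

// newWorkerTract applies each option in order, as NewWorkerTract does
// with its variadic options parameter.
func newWorkerTract(options ...workerTractOption) *workerTract {
	t := &workerTract{}
	for _, opt := range options {
		opt(t)
	}
	return t
}

func main() {
	t := newWorkerTract(withFactoryClosure(true))
	fmt.Println(t.shouldCloseFactory) // true
}
```

This pattern is why options like WithFactoryClosure and WithMetricsHandler can be passed in any combination to NewWorkerTract without changing its signature.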
func WithMetricsHandler ¶
func WithMetricsHandler(mh MetricsHandler) WorkerTractOption
WithMetricsHandler creates a WorkerTractOption that will set the tract's metrics handler to the provided one. By default no metrics handler is used, and thus no metrics are gathered.