work

package module
v0.0.0-...-0a9c861 Latest Latest
Published: Aug 20, 2020 License: BSD-3-Clause Imports: 28 Imported by: 0

README

go-work

go-work is a framework that takes many of the HTTP framework abstractions (requests, status, middleware, etc.) and applies them to generic workloads.

Why go-work?

It seems like there's always work to do in non-trivial systems. Sometimes that work can easily be wrapped into a gRPC or HTTP service, but there are many problems that just don't fit that pattern:

  • deferred background Jobs from a gRPC/HTTP request
  • adding/consuming Jobs from a queue (Redis, Pulsar, S3, etc)
  • adding/consuming Jobs from a database table (just another type of queue)
  • scheduling Jobs in the future

Whenever we tackle these work problems, we need to solve the same issues for every execution of a Job:

  • logging
  • metrics
  • tracing
  • concurrency

If we do have concurrent work, then we also need to manage things like:

  • on-ramping the load
  • max concurrency
  • cancelling Jobs

Using go-work

Jobs

Jobs define the work to be done in a common interface. Think of them like an http.Request with an attached context (Ctx). Jobs have Args, a Context, and an optional Timeout. A Job.Ctx holds key/values that can be used to pass data down the chain of adapters (aka middleware). The Job.Ctx status is set by a Handler to represent the state of the Job's execution (Success, Error, NoResponse, etc.). Jobs are passed to Handlers by a Worker for each request.

type Job struct {
    // Queue the job should be placed into
    Queue string

    // ctx related to the execution of a job - Perform(job) gets a new ctx every time
    Ctx *Context

    // Args that will be passed to the Handler as the 2nd parameter when run
    Args Args

    // Handler that will be run by the worker
    Handler string

    // Timeout for every execution of job
    Timeout time.Duration
}

Handlers

Handlers define the func to be executed for a Job by a Worker. Handlers also represent an interface that can be chained together to create middleware (aka Adapters).

type Handler func(j *Job) error

Example that "handles" a publishing Job request. You can imagine how easy it will be to define a new handler that handles publishing the next message from a Redis source. Notice the handler sets the Job's status before returning.

func DefaultPublishNextMessageCRDB(j *work.Job) error {
	box := j.Args["box"].(outbox.EventOutbox)
	publisher := j.Args["publisher"].(pulsar.Producer)
	connFactory := j.Args["outboxConnectionFactory"].(OutboxConnectionFactory)

	l, ok := j.Ctx.Get(work.AggregateLogger)
	if !ok {
		err := fmt.Errorf("PublishNextMessage: no logger")
		logrus.Error(err)
		j.Ctx.SetStatus(work.StatusInternalError)
		return err
	}
	log := l.(*logrus.Logger)
	logger := log.WithFields(logrus.Fields{})

	db, err := connFactory(logger)
	if err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		logger.Error(err)
		return err
	}
	logger.Infof("PublishNextMessage: checking new messages...")
	toPub, err := box.NextMessage(db, logger)
	if err != nil {
		logger.Infof("PublishNextMessage: no message: %s", err.Error())
		j.Ctx.SetStatus(work.StatusNoResponse)
		return nil
	}

	event := createEvent(toPub)
	if err := pubsub.PublishEvent(context.Background(), publisher, event); err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		err := fmt.Errorf("PublishNextMessage: pubsub.PublishEvent error == %s", err.Error())
		logger.Error(err)
		return err
	}
	logger.Infof("PublishNextMessage: published to topic %s", publisher.Topic())

	if err := box.MarkAsPublished(db, toPub, logger, outbox.WithPublishedToTopics(publisher.Topic())); err != nil {
		j.Ctx.SetStatus(work.StatusInternalError)
		err := fmt.Errorf("PublishNextMessage: box.MarkAsPublished error == %s", err.Error())
		logger.Error(err)
		return err
	}
	j.Ctx.SetStatus(work.StatusSuccess)
	return nil
}
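
For comparison, a much smaller handler can be sketched without any external dependencies. The types below are simplified local stand-ins for the package's Job, Context, Args, and Status (declared here only so the snippet compiles on its own), and greet is a hypothetical handler that is not part of go-work. It follows the same pattern as the example above: validate the Args, do the work, and set a status before every return.

```go
package main

import (
	"errors"
	"fmt"
)

// Status, Context, Args, Job and Handler are simplified local stand-ins
// for the package's types; they are assumptions made for this sketch.
type Status int

const (
	StatusSuccess    Status = 200
	StatusBadRequest Status = 400
)

type Context struct{ status Status }

func (c *Context) SetStatus(s Status) { c.status = s }
func (c *Context) Status() Status     { return c.status }

type Args map[string]interface{}

type Job struct {
	Ctx     *Context
	Args    Args
	Handler string
}

type Handler func(j *Job) error

// greet is a hypothetical handler: it checks its Args with a safe type
// assertion and sets a status on the Job's context before returning.
func greet(j *Job) error {
	name, ok := j.Args["name"].(string)
	if !ok || name == "" {
		j.Ctx.SetStatus(StatusBadRequest)
		return errors.New("greet: missing \"name\" arg")
	}
	fmt.Printf("hello, %s\n", name)
	j.Ctx.SetStatus(StatusSuccess)
	return nil
}

func main() {
	j := &Job{Ctx: &Context{}, Args: Args{"name": "gopher"}, Handler: "greet"}
	if err := greet(j); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("status:", j.Ctx.Status())
}
```

Using the two-value form of the type assertion avoids a panic when an expected argument is missing or has the wrong type.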

Concurrent Job

Concurrent jobs define a job to be performed by workers concurrently.

type ConcurrentJob struct {

    // PerformWith defines whether the job will be performed as a recurring job (PerformEveryWithSync)
    // or just once (PerformWithSync)
    PerformWith PerformWith

    // PerformEvery defines the duration between executions of PerformEveryWithSync jobs
    PerformEvery time.Duration

    // MaxWorkers for the concurrent Job
    MaxWorkers int64

    // Job to run concurrently
    Job Job
    // contains filtered or unexported fields
}

ConcurrentJob API

func (j *ConcurrentJob) Start() error
func (j *ConcurrentJob) Stop()
func (j *ConcurrentJob) Register(name string, h Handler) error
func (j *ConcurrentJob) RunningWorkers() int64
func NewConcurrentJob(
    job Job,
    workerFactory WorkerFactory,
    performWith PerformWith,
    performEvery time.Duration,
    maxWorker int64,
    startInterval time.Duration,
    logger Logger,
    opt ...Option,
) (ConcurrentJob, error)

Be sure to call ConcurrentJob.Stop() or your program will leak resources.

defer w.Stop()

WorkerFactory API

type WorkerFactory func(context.Context, Logger) Worker
func NewCommonWorkerFactory(ctx context.Context, l Logger) Worker

Job.Ctx Status

Handlers set the status for every Job request (execution) using Job.Ctx.SetStatus(). This allows middleware to take action based on the Job's status for things like metrics and logging.

const (
    // StatusUnknown was a job with an unknown status
    StatusUnknown Status = -1

    // StatusSuccess was a successful job
    StatusSuccess = 200

    // StatusBadRequest was a job with a bad request
    StatusBadRequest = 400

    // StatusForbidden was a forbidden job
    StatusForbidden = 403

    // StatusUnauthorized was an unauthorized job
    StatusUnauthorized = 401

    // StatusTimeout was a job that timed out
    StatusTimeout = 408

    // StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job)
    StatusNoResponse = 444

    // StatusInternalError was a job with an internal error
    StatusInternalError = 500

    // StatusUnavailable was a job that was unavailable
    StatusUnavailable = 503
)

Worker

A Worker implements an interface that defines how a Job will be executed:

  • now (sync and async)
  • at a time in the future (only async)
  • after waiting a specific time (only async)
  • on every occurrence of a specified time interval (sync and async)

Be sure to call Worker.Stop() or your program could leak resources.

defer w.Stop()

Official implementations:

  • CommonWorker: backed by the standard library and goroutines.

type Worker interface {
	// Start the worker
	Start(context.Context) error
	// Stop the worker
	Stop() error
	// PerformEvery a job every interval (loop)
	// if WithSync(true) Option, then the operation blocks until it's done which means only one instance can be executed at a time
	// the default is WithSync(false), so there's not blocking and you could get multiple instances running at a time if the latency is longer than the interval
	PerformEvery(*Job, time.Duration, ...Option) error
	// Perform a job as soon as possible. If WithSync(true) Option then it's a blocking call, the default is false (so async)
	Perform(*Job, ...Option) error
	// PerformAt performs a job at a particular time and always async
	PerformAt(*Job, time.Time) error
	// PerformIn performs a job after waiting for a specified amount of time and always async
	PerformIn(*Job, time.Duration) error
	// PerformReceive performs a job for every value received from channel
	PerformReceive(*Job, interface{}, ...Option) error
	// Register a Handler
	Register(string, Handler) error
	// GetContext returns the worker context
	GetContext() context.Context
	// SetContext sets the worker context
	SetContext(context.Context)
}

Adapter (Middleware)

Adapter defines a common func interface so middleware can be chained together for a Job. Currently, there is adapter middleware for things like: metrics, tracing, logging, and healthchecks. More adapters will be added as well.

type Adapter func(Handler) Handler

Example opentracing Adapter

// WithOpenTracing is an adapter middleware that adds opentracing
func WithOpenTracing(operationPrefix []byte) Adapter {
	if operationPrefix == nil {
		operationPrefix = []byte("api-request-")
	}
	return func(h Handler) Handler {
		return Handler(func(job *Job) error {
			// all before request is handled
			var span opentracing.Span
			if cspan := job.Ctx.Value("tracing-context"); cspan != nil {
				span = StartSpanWithParent(cspan.(opentracing.Span).Context(), string(operationPrefix)+job.Handler)
			} else {
				span = StartSpan(string(operationPrefix) + job.Handler)
			}
			defer span.Finish()                  // after all the other defers are completed.. finish the span
			job.Ctx.Set("tracing-context", span) // add the span to the context so it can be used for the duration of the request.
			// read the status inside a closure so it is evaluated after the handler has run
			// (the arguments of a plain deferred call are evaluated when the defer statement executes)
			defer func() { span.SetTag("status-code", job.Ctx.Status()) }()
			err := h(job)
			return err
		})
	}
}
Acknowledgements

I need to acknowledge that many of the ideas implemented in this library are not my own. I've been inspired by, and shamelessly borrowed from, the following individuals/projects:


Complete API reference: docs.md

Documentation

Constants

const (

	// RequestSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the request approximate size in bytes
	// WithPrometheus optionally will report this info when RequestSize is found in the Job.Ctx
	RequestSize = "keyRequestSize"

	// ResponseSize is the index used by Job.Ctx.Set(string, interface{}) or Job.Ctx.Get(string) to communicate the response approximate size in bytes
	// WithPrometheus optionally will report this info when ResponseSize is found in the Job.Ctx
	ResponseSize = "keyResponseSize"
)
const (
	// StatusUnknown was a job with an unknown status
	StatusUnknown Status = -1

	// StatusSuccess was a successful job
	StatusSuccess = 200

	// StatusBadRequest was a job with a bad request
	StatusBadRequest = 400

	// StatusForbidden was a forbidden job
	StatusForbidden = 403

	// StatusUnauthorized was an unauthorized job
	StatusUnauthorized = 401

	// StatusTimeout was a job that timed out
	StatusTimeout = 408

	// StatusNoResponse was a job that intentionally created no response (basically the conditions were met for a noop by the Job)
	StatusNoResponse = 444

	// StatusInternalError was a job with an internal error
	StatusInternalError = 500

	// StatusUnavailable was a job that was unavailable
	StatusUnavailable = 503
)
const AggregateLogger = "aggregateLogger"

AggregateLogger defines the const string for getting the logger from a Job context

const DefaultHealthTickerDuration = 1 * time.Minute

DefaultHealthTickerDuration is the time duration between the recalculation of the status returned by HealthCheck.GetStatus()

Variables

var ContextTraceIDField string

ContextTraceIDField - used to find the trace id in the context - optional

Functions

func InitTracing

func InitTracing(serviceName string, tracingAgentHostPort string, opt ...Option) (
	tracer opentracing.Tracer,
	reporter jaeger.Reporter,
	closer io.Closer,
	err error)

InitTracing initializes opentracing. It accepts the WithSampleProbability option; the default is constant sampling.

func RunE

func RunE(fn func() error, opt ...Option) (err error)

RunE runs the function safely: if it panics, the panic is caught and returned as an error. It produces a stack trace and, if WithJob(job) is passed, sets the job's status to StatusInternalError.
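
The general technique (turning a panic into an error with recover) can be sketched as follows. runSafely is a hypothetical stand-in written for this example; it does not reproduce RunE's options or its job status handling.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// runSafely is a hypothetical helper illustrating panic-to-error conversion.
// The named result lets the deferred function replace the returned error
// after a panic has been recovered.
func runSafely(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic recovered: %v\n%s", r, debug.Stack())
		}
	}()
	return fn()
}

func main() {
	err := runSafely(func() error {
		panic("boom")
	})
	fmt.Println("got error:", err != nil)
}
```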

func StartSpan

func StartSpan(operationName string) opentracing.Span

StartSpan will start a new span with no parent span.

func StartSpanWithParent

func StartSpanWithParent(parent opentracing.SpanContext, operationName string) opentracing.Span

StartSpanWithParent will start a new span with a parent span. example:

span:= StartSpanWithParent(c.Get("tracing-context"),

func WrapChannel

func WrapChannel(chanToWrap interface{}) (<-chan interface{}, error)

WrapChannel takes a concrete receiving chan passed in as an interface{} and wraps it with an interface{} chan, so you can treat all receiving channels the same way.
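
One way such a wrapper could be written is with the reflect package, as sketched below. wrapChan is a hypothetical function for illustration; whether WrapChannel itself uses reflection is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// wrapChan is a hypothetical sketch: it accepts any receivable channel as an
// interface{} and forwards its values on a <-chan interface{}. The output
// channel is closed when the input channel is closed.
func wrapChan(in interface{}) (<-chan interface{}, error) {
	v := reflect.ValueOf(in)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.RecvDir == 0 {
		return nil, errors.New("wrapChan: not a receivable channel")
	}
	out := make(chan interface{})
	go func() {
		defer close(out)
		for {
			x, ok := v.Recv() // blocks until a value arrives or the channel closes
			if !ok {
				return
			}
			out <- x.Interface()
		}
	}()
	return out, nil
}

func main() {
	ints := make(chan int, 3)
	ints <- 1
	ints <- 2
	ints <- 3
	close(ints)

	out, err := wrapChan(ints)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for v := range out {
		fmt.Println(v)
	}
}
```

Note that the forwarding goroutine in this sketch only exits when the input channel is closed and its values are drained by the reader.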

Types

type Adapter

type Adapter func(Handler) Handler

Adapter defines the adapter middleware type

func WithAggregateLogger

func WithAggregateLogger(
	useBanner bool,
	timeFormat string,
	utc bool,
	logrusFieldNameForTraceID string,
	contextTraceIDField []byte,
	opt ...Option) Adapter

WithAggregateLogger is a middleware adapter for aggregated logging (see go-gin-logrus)

func WithHealthCheck

func WithHealthCheck(health *HealthCheck, opt ...Option) Adapter

WithHealthCheck is an adapter middleware for healthchecks. It also adds a health HTTP GET endpoint. It supports the Option WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int), which allows you to override the default of 1.0 (100%) errors in the last 5 minutes.

func WithOpenTracing

func WithOpenTracing(operationPrefix []byte) Adapter

WithOpenTracing is an adapter middleware that adds opentracing

func WithPrometheus

func WithPrometheus(p *Prometheus) Adapter

WithPrometheus is an adapter middleware that can be used to generate metrics for a single handler.

type Args

type Args map[string]interface{}

Args is how parameters are passed to jobs

type CommonWorker

type CommonWorker struct {

	// Logger for the worker
	Logger Logger
	// contains filtered or unexported fields
}

CommonWorker defines the typical common worker

func NewCommonWorker

func NewCommonWorker(l Logger) *CommonWorker

NewCommonWorker creates a new CommonWorker

func NewCommonWorkerWithContext

func NewCommonWorkerWithContext(ctx context.Context, l Logger) *CommonWorker

NewCommonWorkerWithContext creates a new CommonWorker with the provided context

func (*CommonWorker) GetContext

func (w *CommonWorker) GetContext() context.Context

GetContext from the worker

func (*CommonWorker) Perform

func (w *CommonWorker) Perform(job *Job, opt ...Option) error

Perform executes the job. With the WithSync(true) Option it is a blocking call; the default is false (async).

func (*CommonWorker) PerformAt

func (w *CommonWorker) PerformAt(job *Job, t time.Time) error

PerformAt performs a job at a particular time using a goroutine.

func (*CommonWorker) PerformEvery

func (w *CommonWorker) PerformEvery(job *Job, interval time.Duration, opt ...Option) error

PerformEvery executes the job on the interval. With the WithSync(true) Option, the operation blocks until it's done, which means only one instance can be executed at a time. The default is WithSync(false), so there's no blocking and you could get multiple instances running at a time if the latency is longer than the interval.

func (*CommonWorker) PerformIn

func (w *CommonWorker) PerformIn(job *Job, d time.Duration) error

PerformIn performs a job after the "in" time has expired.

func (*CommonWorker) PerformReceive

func (w *CommonWorker) PerformReceive(job *Job, readChan interface{}, opt ...Option) error

PerformReceive will loop on receiving data from the readChan (passed as a simple interface{}). This uses the Job ctx for timeouts and cancellation (just like the rest of the framework)

func (*CommonWorker) Register

func (w *CommonWorker) Register(name string, h Handler) error

Register Handler with the worker

func (*CommonWorker) SetContext

func (w *CommonWorker) SetContext(ctx context.Context)

SetContext for the worker

func (*CommonWorker) Start

func (w *CommonWorker) Start(ctx context.Context) error

Start the worker

func (*CommonWorker) Stop

func (w *CommonWorker) Stop() error

Stop stops the worker. You must call Stop() to clean up the CommonWorker's internal Ctx (or it will leak memory).

type ConcurrentJob

type ConcurrentJob struct {

	// PerformWith defines whether the job will be performed as a recurring job (PerformEveryWithSync)
	// or just once (PerformWithSync)
	PerformWith PerformWith

	// PerformEvery defines the duration between executions of PerformEveryWithSync jobs
	PerformEvery time.Duration

	// MaxWorkers for the concurrent Job
	MaxWorkers int64

	// Job to run concurrently
	Job Job

	PerformReceiveChan interface{}
	// contains filtered or unexported fields
}

ConcurrentJob represents a job to be run concurrently

func NewConcurrentJob

func NewConcurrentJob(
	job Job,
	workerFactory WorkerFactory,
	performWith PerformWith,
	performEvery time.Duration,
	maxWorker int64,
	startInterval time.Duration,
	logger Logger,
	opt ...Option,
) (ConcurrentJob, error)

NewConcurrentJob makes a new job

func (*ConcurrentJob) Register

func (j *ConcurrentJob) Register(name string, h Handler) error

Register a handler for the Job's workers

func (*ConcurrentJob) RunningWorkers

func (j *ConcurrentJob) RunningWorkers() int64

func (*ConcurrentJob) Start

func (j *ConcurrentJob) Start() error

Start all the work

func (*ConcurrentJob) Stop

func (j *ConcurrentJob) Stop()

Stop stops all the work. You must call Stop() to clean up the ConcurrentJob Ctx (or it will leak memory).

type Context

type Context struct {
	// context.Context allows it to be a compatible context
	context.Context
	// contains filtered or unexported fields
}

Context is the context for the Job; it is reset for every execution.

func NewContext

func NewContext(ctx context.Context) Context

NewContext factory

func (*Context) Get

func (c *Context) Get(k string) (interface{}, bool)

Get a key/value

func (*Context) Set

func (c *Context) Set(k string, v interface{})

Set a key/value

func (*Context) SetStatus

func (c *Context) SetStatus(s Status)

SetStatus for the context

func (*Context) Status

func (c *Context) Status() Status

Status retrieves the context's status

type Handler

type Handler func(j *Job) error

Handler is executed by a Worker for a given Job. It also defines the interface for handlers that can be used by middleware adapters.

func Adapt

func Adapt(h Handler, adapters ...Adapter) Handler

Adapt wraps a handler with the provided middleware adapters.

type HealthCheck

type HealthCheck struct {

	// HealthPath is the GET url path for the endpoint
	HealthPath string

	// Engine is the gin.Engine that should serve the endpoint
	Engine *gin.Engine

	// Handler is the gin Handler to use for the endpoint
	Handler gin.HandlerFunc
	// contains filtered or unexported fields
}

HealthCheck provides a healthcheck endpoint for the work

func NewHealthCheck

func NewHealthCheck(opt ...Option) *HealthCheck

NewHealthCheck creates a new HealthCheck with the options provided. Options: WithEngine(*gin.Engine), WithHealthPath(string), WithHealthHandler(gin.HandlerFunc), WithHealthTicker(*time.Ticker)

func (*HealthCheck) Close

func (h *HealthCheck) Close()

Close cleans up all the HealthCheck resources

func (*HealthCheck) DefaultHealthHandler

func (h *HealthCheck) DefaultHealthHandler() gin.HandlerFunc

func (*HealthCheck) GetStatus

func (h *HealthCheck) GetStatus() int

GetStatus returns the current health status

func (*HealthCheck) SetStatus

func (h *HealthCheck) SetStatus(s int)

SetStatus sets the current health status

func (*HealthCheck) WithEngine

func (h *HealthCheck) WithEngine(e *gin.Engine)

WithEngine lets you set the *gin.Engine if it's created after you've created the *HealthCheck

type Job

type Job struct {
	// Queue the job should be placed into
	Queue string

	// ctx related to the execution of a job - Perform(job) gets a new ctx every time
	Ctx *Context

	// Args that will be passed to the Handler as the 2nd parameter when run
	Args Args

	// Handler that will be run by the worker
	Handler string

	// Timeout for every execution of job
	Timeout time.Duration
}

Job to be processed by a Worker

func (*Job) Copy

func (j *Job) Copy() Job

Copy provides a deep copy of the Job

type Logger

type Logger interface {
	Debugf(string, ...interface{})
	Infof(string, ...interface{})
	Errorf(string, ...interface{})
	Debug(...interface{})
	Info(...interface{})
	Error(...interface{})
}

Logger is used by worker to write logs

type Metric

type Metric interface {
	Add(n float64)
	String() string
	Value() float64
}

Metric is a single meter (a counter for now, but in the future: gauge or histogram, optionally - with history)

func NewMetricCounter

func NewMetricCounter(frames ...string) Metric

NewMetricCounter returns a counter metric that increments the value with each incoming number.

func NewMetricStatusGauge

func NewMetricStatusGauge(min int, max int, frames ...string) Metric

NewMetricStatusGauge is a factory for statusGauge Metrics

type Option

type Option func(Options)

Option - how Options are passed as arguments

func WithBanner

func WithBanner(useBanner bool) Option

WithBanner specifies whether a banner should be used (see the useBanner parameter of WithAggregateLogger)

func WithChannel

func WithChannel(ch interface{}) Option

WithChannel optional channel parameter

func WithEngine

func WithEngine(e *gin.Engine) Option

WithEngine is an option that sets the gin engine when initializing with NewPrometheus. Example: r := gin.Default(); p := work.NewPrometheus(work.WithEngine(r))

func WithErrorPercentage

func WithErrorPercentage(percentageOfErrors float64, lastNumOfMinutes int) Option

WithErrorPercentage allows you to override the default of 1.0 (100%) with the % you want for error rate.

func WithErrorPercentageByRequestCount

func WithErrorPercentageByRequestCount(percentageOfErrors float64, minNumOfRequests, lastNumOfRequests int) Option

func WithHealthHandler

func WithHealthHandler(h gin.HandlerFunc) Option

WithHealthHandler override the default health endpoint handler

func WithHealthPath

func WithHealthPath(path string) Option

WithHealthPath override the default path for the health endpoint

func WithHealthTicker

func WithHealthTicker(ticker *time.Ticker) Option

func WithIgnore

func WithIgnore(handlers ...string) Option

WithIgnore is used to disable instrumentation on some routes

func WithJob

func WithJob(j *Job) Option

WithJob optional Job parameter

func WithLogLevel

func WithLogLevel(level logrus.Level) Option

WithLogLevel will set the logrus log level for the job handler

func WithMetricsPath

func WithMetricsPath(path string) Option

WithMetricsPath is an option that sets the metrics path when initializing with NewPrometheus. Example: work.NewPrometheus(work.WithMetricsPath("/mymetrics"))

func WithNamespace

func WithNamespace(ns string) Option

WithNamespace is an option that sets the namespace when initializing with NewPrometheus. Example: work.NewPrometheus(work.WithNamespace("my_namespace"))

func WithReducedLoggingFunc

func WithReducedLoggingFunc(a ReducedLoggingFunc) Option

WithReducedLoggingFunc specifies the function used to set custom logic around when to print logs

func WithSampleProbability

func WithSampleProbability(sampleProbability float64) Option

WithSampleProbability - optional sample probability

func WithSilentNoResponse

func WithSilentNoResponse(silent bool) Option

WithSilentNoResponse specifies that StatusNoResponse requests should be silent (no logging)

func WithSilentSuccess

func WithSilentSuccess(silent bool) Option

WithSilentSuccess specifies that StatusSuccess requests should be silent (no logging)

func WithSubSystem

func WithSubSystem(sub string) Option

WithSubSystem is an option that sets the subsystem when initializing with NewPrometheus. Example: work.NewPrometheus(work.WithSubSystem("my_system"))

func WithSync

func WithSync(sync bool) Option

WithSync optional synchronous execution

type Options

type Options map[string]interface{}

Options - how options are represented

func GetOpts

func GetOpts(opt ...Option) Options

GetOpts - iterates the inbound Options and returns the resulting Options map

func (*Options) Get

func (o *Options) Get(name string) (interface{}, bool)

Get a specific option by name
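
Given the declarations above (Option is a func(Options) and Options is a map), the pattern can be sketched as follows. withSync and getOpts are lowercase, hypothetical re-implementations for illustration, and the "sync" map key is an assumption; the keys used by the real package are not documented here.

```go
package main

import "fmt"

// Local copies of the documented type shapes.
type Options map[string]interface{}
type Option func(Options)

// withSync is a hypothetical option; the "sync" key is an assumption.
func withSync(sync bool) Option {
	return func(o Options) { o["sync"] = sync }
}

// getOpts applies each Option to a fresh Options map and returns it.
func getOpts(opt ...Option) Options {
	opts := Options{}
	for _, o := range opt {
		o(opts)
	}
	return opts
}

// Get looks up a single option by name.
func (o *Options) Get(name string) (interface{}, bool) {
	v, ok := (*o)[name]
	return v, ok
}

func main() {
	opts := getOpts(withSync(true))
	if v, ok := opts.Get("sync"); ok {
		fmt.Println("sync:", v)
	}
	if _, ok := opts.Get("missing"); !ok {
		fmt.Println("missing option not set")
	}
}
```

Because every Option only writes into the map, callers can pass any subset of options in any order, and code that reads them can fall back to a default when Get reports that a key is absent.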

type PerformWith

type PerformWith int
const (
	PerformWithUnknown PerformWith = iota
	PerformWithSync
	PerformWithAsync
	PerformEveryWithSync
	PerformEveryWithAsync
	PerformReceiveWithSync
	PerformReceiveWithAsync
)

func (PerformWith) String

func (p PerformWith) String() string

type Prometheus

type Prometheus struct {
	MetricsPath string
	Namespace   string
	Subsystem   string
	Ignored     isPresentMap
	Engine      *gin.Engine
	// contains filtered or unexported fields
}

Prometheus contains the metrics gathered by the instance and its path

func NewPrometheus

func NewPrometheus(opt ...Option) *Prometheus

NewPrometheus will initialize a new Prometheus instance with the given options. If no options are passed, sane defaults are used. If a router is passed using the WithEngine() option, this instance will automatically bind to it.

func (*Prometheus) WithEngine

func (p *Prometheus) WithEngine(e *gin.Engine)

WithEngine is a method that should be used if the engine is set after middleware initialization

type ReducedLoggingFunc

type ReducedLoggingFunc func(workStatus Status, logBufferLength int) bool

ReducedLoggingFunc defines a function type used for custom logic on when to print logs

var DefaultReducedLoggingFunc ReducedLoggingFunc = func(s Status, l int) bool { return false }

type Session

type Session struct {
	// contains filtered or unexported fields
}

Session is used to pass session info to a Job. This is a good spot to put things like *redis.Pool or *sqlx.DB for outbox connection pools.

func NewSession

func NewSession() Session

NewSession factory

func (*Session) Get

func (c *Session) Get(k string) (interface{}, bool)

Get a key/value

func (*Session) Set

func (c *Session) Set(k string, v interface{})

Set a key/value

type Status

type Status int

Status for a job's execution (Perform)

type Worker

type Worker interface {
	// Start the worker
	Start(context.Context) error
	// Stop the worker
	Stop() error
	// PerformEvery a job every interval (loop)
	// if WithSync(true) Option, then the operation blocks until it's done which means only one instance can be executed at a time
	// the default is WithSync(false), so there's not blocking and you could get multiple instances running at a time if the latency is longer than the interval
	PerformEvery(*Job, time.Duration, ...Option) error
	// Perform a job as soon as possible. If WithSync(true) Option then it's a blocking call, the default is false (so async)
	Perform(*Job, ...Option) error
	// PerformAt performs a job at a particular time and always async
	PerformAt(*Job, time.Time) error
	// PerformIn performs a job after waiting for a specified amount of time and always async
	PerformIn(*Job, time.Duration) error
	// PerformReceive performs a job for every value received from channel
	PerformReceive(*Job, interface{}, ...Option) error
	// Register a Handler
	Register(string, Handler) error
	// GetContext returns the worker context
	GetContext() context.Context
	// SetContext sets the worker context
	SetContext(context.Context)
}

func NewCommonWorkerFactory

func NewCommonWorkerFactory(ctx context.Context, l Logger) Worker

NewCommonWorkerFactory is a simple adapter that allows the factory to conform to the WorkerFactory type

type WorkerFactory

type WorkerFactory func(context.Context, Logger) Worker
