fxworker

v1.1.0
Published: Mar 15, 2024 License: MIT Imports: 12 Imported by: 0

README

Fx Worker Module

Fx module for the worker module (github.com/ankorstore/yokai/worker).

Installation

go get github.com/ankorstore/yokai/fxworker

Features

This module provides a worker pool for your Fx application, with:

  • automatic panic recovery
  • automatic logging
  • automatic metrics
  • optional deferred start of workers
  • optional limit on the maximum execution attempts of each worker
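
To make the panic recovery, deferred start and max attempts features concrete, here is a minimal standard-library sketch of what such supervision can look like. It is not the module's implementation: runFunc, runWithRecovery, supervise and flakyDemo are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runFunc is a hypothetical stand-in for a worker's Run method.
type runFunc func(ctx context.Context) error

// runWithRecovery calls run once and converts a panic into an error,
// so that a crashing worker does not take the whole application down.
func runWithRecovery(ctx context.Context, run runFunc) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic: %v", r)
		}
	}()

	return run(ctx)
}

// supervise waits for the start delay, then runs the worker until it
// succeeds or maxAttempts executions have failed. It returns the number
// of executions performed and the last error (nil on success).
func supervise(ctx context.Context, run runFunc, startDelay time.Duration, maxAttempts int) (int, error) {
	select {
	case <-time.After(startDelay): // deferred start
	case <-ctx.Done():
		return 0, ctx.Err()
	}

	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = runWithRecovery(ctx, run); err == nil {
			return attempt, nil
		}
	}

	return maxAttempts, err
}

// flakyDemo supervises a worker that panics once, fails once, then succeeds.
func flakyDemo() (int, error) {
	calls := 0
	run := func(ctx context.Context) error {
		calls++
		switch calls {
		case 1:
			panic("boom")
		case 2:
			return errors.New("temporary failure")
		default:
			return nil
		}
	}

	return supervise(context.Background(), run, 10*time.Millisecond, 3)
}

func main() {
	attempts, err := flakyDemo()
	fmt.Println(attempts, err)
}
```

In this sketch a panic counts as a failed attempt like any other error, which is one possible reading of the feature list; the module itself may account for panics differently.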

Documentation

Dependencies

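This module is intended to be used alongside:

  • the fxconfig module
  • the fxlog module
  • the fxtrace module
  • the fxmetrics module
  • the fxgenerate module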

Loading

To load the module in your Fx application:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

func main() {
  fx.New(
    fxconfig.FxConfigModule,                    // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                    // load the module
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())          // start the workers pool
    }),
  ).Run()
}

Configuration

Configuration reference:

# ./configs/config.yaml
app:
  name: app
  env: dev
  version: 0.1.0
  debug: true
modules:
  worker:
    defer: 0.1             # threshold in seconds to wait before starting all workers, immediate start by default
    attempts: 3            # max execution attempts in case of failures for all workers, no restart by default
    metrics:
      collect:
        enabled: true      # to collect metrics about workers executions
        namespace: foo     # workers metrics namespace (empty by default)
        subsystem: bar     # workers metrics subsystem (empty by default)

Notes:

  • worker logging follows the fxlog module configuration
  • worker tracing follows the fxtrace module configuration
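
Since the defer threshold is expressed in seconds and may be fractional, it can help to see how such a value maps to a Go duration. The sketch below is an assumption-laden illustration: workerSettings, startDelay and referenceSettings are hypothetical names that only mirror the keys of the configuration reference, and they say nothing about how the module reads its configuration internally.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// workerSettings is a hypothetical, illustration-only mirror of the
// modules.worker configuration keys documented above.
type workerSettings struct {
	DeferSeconds     float64 // modules.worker.defer
	Attempts         int     // modules.worker.attempts
	MetricsEnabled   bool    // modules.worker.metrics.collect.enabled
	MetricsNamespace string  // modules.worker.metrics.collect.namespace
	MetricsSubsystem string  // modules.worker.metrics.collect.subsystem
}

// startDelay converts the defer threshold, expressed in seconds,
// into a time.Duration rounded to the nearest nanosecond.
func (s workerSettings) startDelay() time.Duration {
	return time.Duration(math.Round(s.DeferSeconds * float64(time.Second)))
}

// referenceSettings returns the values used in the configuration reference.
func referenceSettings() workerSettings {
	return workerSettings{
		DeferSeconds:     0.1,
		Attempts:         3,
		MetricsEnabled:   true,
		MetricsNamespace: "foo",
		MetricsSubsystem: "bar",
	}
}

func main() {
	fmt.Println(referenceSettings().startDelay()) // defer: 0.1
	fmt.Println(workerSettings{}.startDelay())    // zero value: immediate start
}
```

The zero value of the struct corresponds to the documented defaults: no start delay and empty metrics namespace and subsystem.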

Registration

This module lets you register several Worker implementations, each with optional WorkerExecutionOption values.

Fx then collects them and passes them to the WorkerPool, which is made available in the Fx container.

Registration is done with the AsWorker() function:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

type ExampleWorker struct{}

func NewExampleWorker() *ExampleWorker {
  return &ExampleWorker{}
}

func (w *ExampleWorker) Name() string {
  return "example-worker"
}

func (w *ExampleWorker) Run(ctx context.Context) error {
  worker.CtxLogger(ctx).Info().Msg("run")

  return nil
}

func main() {
  fx.New(
    fxconfig.FxConfigModule,                      // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                      // load the module
    fx.Provide(
      fxworker.AsWorker(
        NewExampleWorker,                         // register the ExampleWorker
        worker.WithDeferredStartThreshold(1),     // with a deferred start threshold of 1 second
        worker.WithMaxExecutionsAttempts(2),      // and 2 max execution attempts
      ),
    ),
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())            // start the workers pool
    }),
  ).Run()
}

For more details about the features available to your workers (contextual logging, tracing, etc.), see the worker module documentation.
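
Workers are started with a context, so a long-running worker will usually want to stop when that context is cancelled. The following standard-library sketch shows one way to write such a Run method. TickWorker and tickDemo are hypothetical names, and returning nil on cancellation is a design choice of this sketch, not documented behavior of the pool.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// TickWorker is a hypothetical long-running worker exposing the same
// Name and Run methods as ExampleWorker above.
type TickWorker struct {
	interval time.Duration
	ticks    int
}

// Name returns the worker name.
func (w *TickWorker) Name() string {
	return "tick-worker"
}

// Run performs one unit of work per interval and returns nil
// as soon as the context is cancelled or expires.
func (w *TickWorker) Run(ctx context.Context) error {
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil // treated here as a clean stop
		case <-ticker.C:
			w.ticks++ // placeholder for real work
		}
	}
}

// tickDemo runs the worker with a context that expires after 100ms
// and reports how many ticks were processed.
func tickDemo() (int, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	w := &TickWorker{interval: 5 * time.Millisecond}
	err := w.Run(ctx)

	return w.ticks, err
}

func main() {
	ticks, err := tickDemo()
	fmt.Println(ticks > 0, err)
}
```

Returning an error on cancellation instead would, under a max attempts setting, possibly count as a failed execution, which is why this sketch returns nil.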

Override

By default, the worker.WorkerPool is created by the DefaultWorkerPoolFactory.

If needed, you can provide your own factory and override the module:

package main

import (
  "context"

  "github.com/ankorstore/yokai/fxconfig"
  "github.com/ankorstore/yokai/fxgenerate"
  "github.com/ankorstore/yokai/fxlog"
  "github.com/ankorstore/yokai/fxmetrics"
  "github.com/ankorstore/yokai/fxtrace"
  "github.com/ankorstore/yokai/fxworker"
  "github.com/ankorstore/yokai/worker"
  "go.uber.org/fx"
)

type CustomWorkerPoolFactory struct{}

func NewCustomWorkerPoolFactory() worker.WorkerPoolFactory {
  return &CustomWorkerPoolFactory{}
}

func (f *CustomWorkerPoolFactory) Create(options ...worker.WorkerPoolOption) (*worker.WorkerPool, error) {
  return &worker.WorkerPool{...}, nil
}

func main() {
  fx.New(
    fxconfig.FxConfigModule,                     // load the module dependencies
    fxlog.FxLogModule,
    fxtrace.FxTraceModule,
    fxmetrics.FxMetricsModule,
    fxgenerate.FxGenerateModule,
    fxworker.FxWorkerModule,                     // load the module
    fx.Decorate(NewCustomWorkerPoolFactory),     // override the module with a custom factory
    fx.Invoke(func(pool *worker.WorkerPool) {
      pool.Start(context.Background())           // start the custom worker pool
    }),
  ).Run()
}
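
A custom factory does not have to build the pool from scratch: it can delegate to another factory and adjust the result. The sketch below illustrates that wrapping idea with standard-library stand-ins only. pool, poolOption, poolFactory, withMaxAttempts, defaultFactory and cappedFactory are all hypothetical types and functions, not part of the worker module.

```go
package main

import "fmt"

// pool, poolOption and poolFactory are hypothetical stand-ins that only
// mimic the shape of a pool built from variadic options.
type pool struct {
	maxAttempts int
}

type poolOption func(*pool)

type poolFactory interface {
	Create(options ...poolOption) (*pool, error)
}

// withMaxAttempts is a hypothetical option setting the attempts limit.
func withMaxAttempts(n int) poolOption {
	return func(p *pool) { p.maxAttempts = n }
}

// defaultFactory builds a pool and applies the options in order.
type defaultFactory struct{}

func (defaultFactory) Create(options ...poolOption) (*pool, error) {
	p := &pool{maxAttempts: 1}
	for _, apply := range options {
		apply(p)
	}

	return p, nil
}

// cappedFactory delegates to another factory, then enforces a ceiling
// on the attempts limit regardless of the options passed in.
type cappedFactory struct {
	next    poolFactory
	ceiling int
}

func (f cappedFactory) Create(options ...poolOption) (*pool, error) {
	p, err := f.next.Create(options...)
	if err != nil {
		return nil, fmt.Errorf("create pool: %w", err)
	}
	if p.maxAttempts > f.ceiling {
		p.maxAttempts = f.ceiling
	}

	return p, nil
}

// cappedDemo returns the effective attempts limit for a requested value.
func cappedDemo(requested, ceiling int) int {
	factory := cappedFactory{next: defaultFactory{}, ceiling: ceiling}
	p, err := factory.Create(withMaxAttempts(requested))
	if err != nil {
		return -1
	}

	return p.maxAttempts
}

func main() {
	fmt.Println(cappedDemo(10, 3))
}
```

Wrapping keeps the default behavior intact and limits the custom code to the one policy being changed.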

Documentation

Constants

const ModuleName = "worker"

ModuleName is the module name.

Variables

FxWorkerModule is the Fx worker module.

Functions

func AsWorker

func AsWorker(w any, options ...worker.WorkerExecutionOption) fx.Option

AsWorker registers a worker.Worker into Fx, with an optional list of worker.WorkerExecutionOption.

func GetReturnType

func GetReturnType(target any) string

GetReturnType returns the return type of a target.

func GetType

func GetType(target any) string

GetType returns the type of a target.
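
As an illustration of what helpers of this kind can do, the sketch below derives a type string from a value and from a constructor's first result using the standard reflect package. typeName and returnTypeName are hypothetical functions written for this example; they are assumptions about the general technique, not the source of GetType and GetReturnType.

```go
package main

import (
	"fmt"
	"reflect"
)

// ExampleWorker and NewExampleWorker mirror the registration example.
type ExampleWorker struct{}

func NewExampleWorker() *ExampleWorker {
	return &ExampleWorker{}
}

// typeName returns the type of target as a string,
// or an empty string for an untyped nil.
func typeName(target any) string {
	t := reflect.TypeOf(target)
	if t == nil {
		return ""
	}

	return t.String()
}

// returnTypeName returns the type of the first result of a function,
// or an empty string when target is not a function with results.
func returnTypeName(target any) string {
	t := reflect.TypeOf(target)
	if t == nil || t.Kind() != reflect.Func || t.NumOut() == 0 {
		return ""
	}

	return t.Out(0).String()
}

func main() {
	// Both lines print the same type string: the constructor's return type
	// matches the type of the value it builds.
	fmt.Println(typeName(NewExampleWorker()))
	fmt.Println(returnTypeName(NewExampleWorker))
}
```

Comparing the two strings is one way a constructor passed to a registration function could later be paired with the instance it produced.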

func NewFxWorkerPool

func NewFxWorkerPool(p FxWorkerPoolParam) (*worker.WorkerPool, error)

NewFxWorkerPool returns a new worker.WorkerPool.

Types

type FxWorkerModuleInfo

type FxWorkerModuleInfo struct {
	// contains filtered or unexported fields
}

FxWorkerModuleInfo is a module info collector for fxcore.

func NewFxWorkerModuleInfo

func NewFxWorkerModuleInfo(pool *worker.WorkerPool) *FxWorkerModuleInfo

NewFxWorkerModuleInfo returns a new FxWorkerModuleInfo.

func (*FxWorkerModuleInfo) Data

func (i *FxWorkerModuleInfo) Data() map[string]interface{}

Data returns the data of the module info.

func (*FxWorkerModuleInfo) Name

func (i *FxWorkerModuleInfo) Name() string

Name returns the name of the module info.

type FxWorkerPoolParam

type FxWorkerPoolParam struct {
	fx.In
	LifeCycle       fx.Lifecycle
	Generator       uuid.UuidGenerator
	TracerProvider  oteltrace.TracerProvider
	Factory         worker.WorkerPoolFactory
	Config          *config.Config
	Registry        *WorkerRegistry
	Logger          *log.Logger
	MetricsRegistry *prometheus.Registry
}

FxWorkerPoolParam allows injection of the required dependencies in NewFxWorkerPool.

type FxWorkerRegistryParam

type FxWorkerRegistryParam struct {
	fx.In
	Workers     []worker.Worker    `group:"workers"`
	Definitions []WorkerDefinition `group:"workers-definitions"`
}

FxWorkerRegistryParam allows injection of the required dependencies in NewFxWorkerRegistry.

type WorkerDefinition

type WorkerDefinition interface {
	ReturnType() string
	Options() []worker.WorkerExecutionOption
}

WorkerDefinition is the interface for workers definitions.

func NewWorkerDefinition

func NewWorkerDefinition(returnType string, options ...worker.WorkerExecutionOption) WorkerDefinition

NewWorkerDefinition returns a new WorkerDefinition.

type WorkerRegistry

type WorkerRegistry struct {
	// contains filtered or unexported fields
}

WorkerRegistry is the registry collecting workers and their definitions.

func NewFxWorkerRegistry

func NewFxWorkerRegistry(p FxWorkerRegistryParam) *WorkerRegistry

NewFxWorkerRegistry returns a new WorkerRegistry.

func (*WorkerRegistry) ResolveWorkersRegistrations

func (r *WorkerRegistry) ResolveWorkersRegistrations() ([]*worker.WorkerRegistration, error)

ResolveWorkersRegistrations resolves a list of worker.WorkerRegistration from their definitions.
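
The registry receives workers and definitions as two separate groups, and a definition exposes a ReturnType() string. One plausible way to pair them, shown below as a standard-library sketch, is to match each worker's type string against the definitions. This matching strategy is an assumption inferred from the signatures above; namedWorker, definition, registration, resolve and resolveDemo are hypothetical names.

```go
package main

import (
	"fmt"
	"reflect"
)

// namedWorker is a hypothetical, reduced worker interface.
type namedWorker interface {
	Name() string
}

type mailWorker struct{}

func (*mailWorker) Name() string { return "mail" }

type cleanupWorker struct{}

func (*cleanupWorker) Name() string { return "cleanup" }

// definition pairs a constructor's return type with per-worker settings.
type definition struct {
	returnType  string
	maxAttempts int
}

// registration is a worker resolved together with its settings.
type registration struct {
	worker      namedWorker
	maxAttempts int
}

// resolve pairs each worker with the definition whose return type matches
// the worker's type, and fails when a worker has no definition.
func resolve(workers []namedWorker, definitions []definition) ([]registration, error) {
	byType := make(map[string]definition, len(definitions))
	for _, d := range definitions {
		byType[d.returnType] = d
	}

	regs := make([]registration, 0, len(workers))
	for _, w := range workers {
		typ := reflect.TypeOf(w).String()
		d, ok := byType[typ]
		if !ok {
			return nil, fmt.Errorf("no definition for worker type %s", typ)
		}
		regs = append(regs, registration{worker: w, maxAttempts: d.maxAttempts})
	}

	return regs, nil
}

// resolveDemo resolves two workers; when complete is false, the definition
// of the second worker is left out to trigger the error path.
func resolveDemo(complete bool) ([]registration, error) {
	workers := []namedWorker{&mailWorker{}, &cleanupWorker{}}
	definitions := []definition{
		{returnType: reflect.TypeOf(&mailWorker{}).String(), maxAttempts: 2},
	}
	if complete {
		definitions = append(definitions, definition{
			returnType:  reflect.TypeOf(&cleanupWorker{}).String(),
			maxAttempts: 5,
		})
	}

	return resolve(workers, definitions)
}

func main() {
	regs, err := resolveDemo(true)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, r := range regs {
		fmt.Println(r.worker.Name(), r.maxAttempts)
	}
}
```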
