command

package module
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 18, 2023 License: MIT Imports: 6 Imported by: 0

README

Go Command Bus

A command bus to demand all the things.

Maintainability Test Coverage GoDoc

Installation

go get github.com/io-da/command

Overview

  1. Commands
  2. Handlers
  3. Error Handlers
  4. Middlewares
  5. The Bus
    1. Handling Commands
    2. Tweaking Performance
    3. Shutting Down
    4. Available Errors
    5. Scheduled Commands
  6. Benchmarks
  7. Examples

Introduction

This library is intended for anyone looking to trigger application commands in a decoupled architecture.
The Bus provides the option to use workers (goroutines) to attempt handling the commands in non-blocking manner.
Clean and simple codebase.

Getting Started

Commands

Commands are any type that implements the Command interface. Ideally they should contain immutable data.
It is also possible to provide a closure to the bus.

type Identifier int64

type Command interface {
    Identifier() Identifier
}
Handlers

Handlers are any type that implements the Handler interface. Handlers must be instantiated and provided to the Bus on initialization.
The Bus is initialized using the function bus.Initialize. The Bus will then use the Identifier of the handlers to know which Command to process.

type Handler interface {
    Handle(cmd Command) (any, error)
    Handles() Identifier
}
Error Handlers

Error handlers are any type that implements the ErrorHandler interface. Error handlers are optional (but advised) and provided to the Bus using the bus.SetErrorHandlers function.

type ErrorHandler interface {
    Handle(cmd Command, err error)
}

Any time an error occurs within the bus, it will be passed on to the error handlers. This strategy can be used for decoupled error handling.

Middlewares

Middlewares are any type that implements the InwardMiddleware or the OutwardMiddleware interface.
Middlewares are optional and provided to the Bus using the bus.SetInwardMiddlewares and bus.SetOutwardMiddlewares functions.
An InwardMiddleware handles every command before it is provided to its respective handler.
An OutwardMiddleware handles every command that was successfully processed by its respective handler. These middlewares are also provided with the data or error returned by the command handler. Allowing potential data/error handling, such as transformations.

The order in which the middlewares are provided to the Bus is always respected. Additionally, if a middleware returns an error, it interrupts the flow and the command is no longer passed along to the next step.

type InwardMiddleware interface {
    HandleInward(cmd Command) error
}

type OutwardMiddleware interface {
    HandleOutward(cmd Command, data any, err error) error
}
The Bus

Bus is the struct that will be used to trigger all the application's commands.
The Bus should be instantiated and initialized on application startup. The initialization is separated from the instantiation for dependency injection purposes.
The application should instantiate the Bus once and then use it's reference to trigger all the commands.
There can only be one Handler per Command.

Handling Commands

The Bus provides multiple ways to handle commands.

Synchronous. The bus processes the command immediately and returns the result from the handler.

data, err := bus.Handle(&FooBar{})

Asynchronous. The bus processes the command using workers. It is no-blocking.
It is possible however to Await for the command to finish being processed.

as, _ := bus.HandleAsync(&FooBar{})
// do something
data, err := as.Await()

Closures. The bus also accepts a closure to be provided.
It will be handled in an asynchronous manner using workers. These also support Await.

as, _ := bus.HandleClosure(func() (data any, err error) {
  return "foo bar", nil
})
// do something
data, err := as.Await()

Schedule. The bus will use a schedule processor to handle the provided command according to a *Schedule struct.
More information about *Schedule can be found here.

uuid, err := bus.Schedule(&FooBar{}, schedule.At(time.Now())))
// if the scheduled command needs to be removed during runtime.
bus.RemoveScheduled(uuid)
Tweaking Performance

The number of workers for async commands can be adjusted.

bus.SetWorkerPoolSize(10)

If used, this function must be called before the Bus is initialized. And it specifies the number of goroutines used to handle async commands.
In some scenarios increasing the value can drastically improve performance.
It defaults to the value returned by runtime.GOMAXPROCS(0).

The buffer size of the async commands queue can also be adjusted.
Depending on the use case, this value may greatly impact performance.

bus.SetQueueBuffer(100)

If used, this function must be called before the Bus is initialized.
It defaults to 100.

Shutting Down

The Bus also provides a shutdown function that attempts to gracefully stop the command bus and all its routines.

bus.Shutdown()

This function will block until the bus is fully stopped.

Available Errors

Below is a list of errors that can occur when calling bus.Initialize, bus.Handle, bus.HandleAsync and bus.Schedule.

command.InvalidCommandError
command.BusNotInitializedError
command.BusIsShuttingDownError
command.OneHandlerPerCommandError
command.HandlerNotFoundError
Scheduled Commands

Since 1.2, the bus also has built in support for github.com/io-da/schedule.
Using bus.Schedule, one may schedule a command to be processed at certain times or even following a cron like pattern.

Benchmarks

All the benchmarks are performed with command handlers calculating the fibonacci of 1000.
CPU: 11th Gen Intel(R) Core(TM) i9-11950H @ 2.60GHz

Benchmark Type Time
Sync Commands 15828 ns/op
Async Commands 2808 ns/op

Examples

An optional constants list of Command identifiers (idiomatic enum) for consistency

const (
   Unidentified Identifier = iota
   FooCommand
   BarCommand
)
Example Commands

A simple struct command.


type fooCommand struct {
    bar string
}
func (*fooCommand) Identifier() Identifier {
    return FooCommand
}

A string command.

type barCommand string
func (barCommand) Identifier() Identifier {
    return BarCommand
}
Example Handlers

A couple of empty respective handlers.


type fooHandler struct{}

func (hdl *fooHandler) Handles() Identifier {
    return FooCommand
}

func (hdl *fooHandler) Handle(cmd Command) (data any, err error) {
    // handle FooCommand
    return
}

type barHandler struct{}

func (hdl *barHandler) Handles() Identifier {
    return BarCommand
}

func (hdl *barHandler) Handle(cmd Command) (data any, err error) {
    // handle BarCommand
    return
}
Putting it together

Initialization and usage of the exemplified commands and handlers

import (
    "github.com/io-da/command"
)

func main() {
    // instantiate the bus (returns *command.Bus)
    bus := command.NewBus()
    
    // initialize the bus with all the application's command handlers
    bus.Initialize(
        &fooHandler{},
        &barHandler{},
    )
    
    // trigger commands!
    // sync
    bus.Handle(&fooCommand{})
    // async
    bus.HandleAsync(barCommand{})
    // async await
    res, _ := bus.HandleAsync(&fooCommand{}) 
    // do something
    res.Await()
    // scheduled to run every day
    sch := schedule.As(schedule.Cron().EveryDay())
    bus.Schedule(&fooCommand{}, sch)
}

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Index

Constants

View Source
const (
	// InvalidCommandError will be returned when attempting to handle an invalid command.
	InvalidCommandError = BusError("command: invalid command")
	// BusNotInitializedError will be returned when attempting to handle a command before the bus is initialized.
	BusNotInitializedError = BusError("command: the bus is not initialized")
	// BusIsShuttingDownError will be returned when attempting to handle a command while the bus is shutting down.
	BusIsShuttingDownError = BusError("command: the bus is shutting down")
	// OneHandlerPerCommandError will be returned when attempting to initialize the bus with more than one handler listening to the same command.
	OneHandlerPerCommandError = BusError("command: there can only be one handler per command")
	// HandlerNotFoundError will be returned when no handler is found to the provided command.
	HandlerNotFoundError = BusError("command: no handler found for the command provided")
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Async added in v1.3.0

type Async struct {
	// contains filtered or unexported fields
}

Async is the struct returned from async commands.

func (*Async) Await added in v1.3.0

func (res *Async) Await() (any, error)

Await for the command to be processed.

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the only struct exported and required for the command bus usage. The Bus should be instantiated using the NewBus function.

func NewBus

func NewBus() *Bus

NewBus instantiates the Bus struct. The Initialization of the Bus is performed separately (Initialize function) for dependency injection purposes.

func (*Bus) Handle

func (bus *Bus) Handle(cmd Command) (any, error)

Handle processes the command synchronously through their respective handler.

func (*Bus) HandleAsync

func (bus *Bus) HandleAsync(cmd Command) (*Async, error)

HandleAsync processes the command asynchronously using workers through their respective handler. It also returns an *Async struct which allows clients to optionally ```Await``` for the command to be processed.

func (*Bus) HandleClosure added in v1.4.0

func (bus *Bus) HandleClosure(cmd ClosureCommand) *Async

HandleClosure processes a closure command asynchronously using workers. It also returns an *Async struct which allows clients to optionally ```Await``` for the command to be processed.

func (*Bus) Initialize

func (bus *Bus) Initialize(hdls ...Handler) error

Initialize the command bus by providing the list of handlers. There can only be one handler per command.

func (*Bus) RemoveScheduled added in v1.2.0

func (bus *Bus) RemoveScheduled(keys ...uuid.UUID)

RemoveScheduled removes previously scheduled commands.

func (*Bus) Schedule added in v1.2.0

func (bus *Bus) Schedule(cmd Command, sch *schedule.Schedule) (*uuid.UUID, error)

Schedule allows commands to be scheduled to be executed asynchronously. Check https://github.com/io-da/schedule for ```*Schedule``` usage.

func (*Bus) SetErrorHandlers added in v1.3.0

func (bus *Bus) SetErrorHandlers(hdls ...ErrorHandler)

SetErrorHandlers may optionally be used to provide a list of error handlers. They will receive any error thrown during the command process. Error handlers may only be provided *before* the bus is initialized.

func (*Bus) SetInwardMiddlewares added in v1.3.0

func (bus *Bus) SetInwardMiddlewares(mdls ...InwardMiddleware)

SetInwardMiddlewares may optionally be used to provide a list of inward middlewares. They will receive and process every command that is about to be handled. *The order the middlewares are provided is always respected*. Middlewares may only be provided *before* the bus is initialized.

func (*Bus) SetOutwardMiddlewares added in v1.3.0

func (bus *Bus) SetOutwardMiddlewares(mdls ...OutwardMiddleware)

SetOutwardMiddlewares may optionally be used to provide a list of outward middlewares. They will receive and process every command that was handled. *The order the middlewares are provided is always respected*. Middlewares may only be provided *before* the bus is initialized.

func (*Bus) SetQueueBuffer added in v1.3.0

func (bus *Bus) SetQueueBuffer(queueBuffer int)

SetQueueBuffer may optionally be used to tweak the buffer size of the async commands queue. This value may have high impact on performance depending on the use case. It can only be adjusted *before* the bus is initialized. It defaults to 100.

func (*Bus) SetWorkerPoolSize added in v1.3.0

func (bus *Bus) SetWorkerPoolSize(workerPoolSize int)

SetWorkerPoolSize may optionally be used to tweak the worker pool size for async commands. It can only be adjusted *before* the bus is initialized. It defaults to the value returned by runtime.GOMAXPROCS(0).

func (*Bus) Shutdown

func (bus *Bus) Shutdown()

Shutdown the command bus gracefully. *Async commands accessed while shutting down will be disregarded*.

type BusError added in v1.3.0

type BusError string

BusError is used to create errors originating from the command bus

func (BusError) Error added in v1.3.0

func (e BusError) Error() string

Error returns the string message of the error.

type ClosureCommand added in v1.4.0

type ClosureCommand func() (data any, err error)

ClosureCommand is the type used by the bus to handle closures

type Command

type Command interface {
	Identifier() Identifier
}

Command is the interface that must be implemented by any type to be considered a command.

type ErrorHandler

type ErrorHandler interface {
	Handle(cmd Command, err error)
}

ErrorHandler must be implemented for a type to qualify as an error handler.

type Handler

type Handler interface {
	Handle(cmd Command) (any, error)
	Handles() Identifier
}

Handler must be implemented for a type to qualify as a command handler.

type Identifier added in v1.3.0

type Identifier int64

Identifier is used to create a consistent identity solution for commands

type InwardMiddleware added in v1.3.0

type InwardMiddleware interface {
	HandleInward(cmd Command) error
}

InwardMiddleware must be implemented for a type to qualify as an inward command middleware. An inward middleware process a command before being provided to the respective command handler.

type OutwardMiddleware added in v1.3.0

type OutwardMiddleware interface {
	HandleOutward(cmd Command, data any, err error) (any, error)
}

OutwardMiddleware must be implemented for a type to qualify as an outward command middleware. An outward middleware process the command after being provided to the respective command handler.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL