ssm

v0.0.0-...-34f24ca
Warning

This package is not in the latest version of its module.

Published: Aug 11, 2024 License: MIT Imports: 8 Imported by: 3

README

Simple State Machine

This is a very basic API for creating State Machines.

A state is represented by any function which accepts a context as a parameter and returns another state function.

The states are run iteratively until the End state is reached, when the state machine stops.

Error handling is provided through two Error states:

  • The simple error state (ErrorEnd), which stops the execution.
  • The restart error state (ErrorRestart), which restarts the execution from the first received state.

type Fn func(ctx context.Context) Fn

To start the state machine, pass one or more states to either the Run or the RunParallel function. Run executes the received states sequentially, while RunParallel executes them in parallel.
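The state function model described above can be illustrated with a minimal, self-contained sketch. It does not import the package: the Fn type is redeclared locally, and the run and countdown helpers are hypothetical names used only for illustration. The sketch assumes that a nil Fn plays the role of the End state.

```go
package main

import (
	"context"
	"fmt"
)

// Fn mirrors the state signature quoted above: a state receives a context
// and returns the next state. A nil Fn plays the role of the End state.
type Fn func(ctx context.Context) Fn

// run is a hypothetical stand-in for a sequential runner: it keeps calling
// the current state until a nil (End) state is returned, and reports how
// many states were executed.
func run(ctx context.Context, start Fn) int {
	steps := 0
	for state := start; state != nil; steps++ {
		state = state(ctx)
	}
	return steps
}

// countdown builds a chain of n states, each of which hands over to the next.
func countdown(n int) Fn {
	if n == 0 {
		return nil
	}
	return func(ctx context.Context) Fn {
		fmt.Println("remaining:", n)
		return countdown(n - 1)
	}
}

// demo runs a chain of three states and returns the number of executed states.
func demo() int {
	return run(context.Background(), countdown(3))
}

func main() {
	fmt.Println("steps:", demo())
}
```

Because every state only decides what comes next, the runner itself stays a plain loop; all the control flow lives in the states.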

Documentation

Constants

This section is empty.

Variables

var InvalidRateLimitFn = errors.New("invalid rate limit method")

Functions

func Cancel

Cancel retrieves the cancel cause function from the context passed to Run or RunParallel and returns it.
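One way such a lookup could work is sketched here using only the standard library (Go 1.20 or newer, for context.WithCancelCause). The cancelKey type and the withCancel and cancelFrom helpers are assumptions made for this illustration; the package's actual context key and storage mechanism are not shown in this documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// cancelKey is a hypothetical context key used only by this sketch.
type cancelKey struct{}

// withCancel derives a cancellable context and stores its cancel function
// inside the context itself, so that states can retrieve it later.
func withCancel(parent context.Context) context.Context {
	ctx, cancel := context.WithCancelCause(parent)
	return context.WithValue(ctx, cancelKey{}, cancel)
}

// cancelFrom retrieves the stored cancel function, or a no-op function
// when the context does not carry one.
func cancelFrom(ctx context.Context) context.CancelCauseFunc {
	if fn, ok := ctx.Value(cancelKey{}).(context.CancelCauseFunc); ok {
		return fn
	}
	return func(error) {}
}

var errStopped = errors.New("stopped by a state")

// demo cancels the context through the retrieved function and returns
// the cause recorded on the context.
func demo() error {
	ctx := withCancel(context.Background())
	cancelFrom(ctx)(errStopped)
	return context.Cause(ctx)
}

func main() {
	fmt.Println(demo())
}
```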

func FixedWindow

func FixedWindow(count int, d time.Duration) func() (bool, time.Duration)

func IsEnd

func IsEnd(f Fn) bool

func IsError

func IsError(f Fn) bool

IsError is a very grubby API for checking whether a state Fn is an error state.

func Run

func Run(ctx context.Context, states ...Fn) error

Run executes the received states sequentially, in a loop, until they are reduced to a single End or ErrorEnd state, at which point it stops and returns the corresponding error.

func RunParallel

func RunParallel(ctx context.Context, states ...Fn) error

RunParallel executes the received states in parallel, in a loop, until they are reduced to a single End or ErrorEnd state, at which point it stops and returns the corresponding error.

Types

type ErrorFn

type ErrorFn Fn

func (ErrorFn) Error

func (f ErrorFn) Error() string

type Fn

type Fn func(context.Context) Fn
var End Fn = nil

func After

func After(d time.Duration, state Fn) Fn

After runs the received state after d time.Duration has elapsed. This function blocks until the timer elapses, when it returns the next resolved state.
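The blocking behaviour can be sketched with a timer from the standard library. This is not the package's implementation: the after helper and the locally declared Fn type are hypothetical, and returning a nil state when the context is cancelled is an assumption of this sketch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Fn is declared locally so the sketch is self-contained.
type Fn func(ctx context.Context) Fn

// after is a hypothetical helper: the returned state blocks for d and then
// resolves the wrapped state. Stopping early on context cancellation is an
// assumption of this sketch.
func after(d time.Duration, state Fn) Fn {
	return func(ctx context.Context) Fn {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-timer.C:
			return state(ctx)
		case <-ctx.Done():
			return nil
		}
	}
}

// demo reports whether the wrapped state ran, and only after the delay.
func demo() bool {
	ran := false
	start := time.Now()
	delayed := after(20*time.Millisecond, func(context.Context) Fn {
		ran = true
		return nil
	})
	delayed(context.Background())
	return ran && time.Since(start) >= 20*time.Millisecond
}

func main() {
	fmt.Println("ran after delay:", demo())
}
```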

func At

func At(t time.Time, state Fn) Fn

At runs the received state at t time.Time. This function blocks until the time is reached, when it returns the next resolved state.

func BackOff

func BackOff(dur StrategyFn, fn Fn) Fn

BackOff returns an aggregator function which can be used to execute the received state with increasing delays. The function for determining the delay is passed in the StrategyFn "dur" parameter.

There is no end condition, so take care to limit the execution through some external method, usually through the Retry mechanism.

Please note that the first run is executed without any delay.
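The behaviour described above (no delay on the first run, a strategy supplied delay on every later run) might look roughly like the following sketch. The backOff and doubling helpers and the locally declared types are hypothetical stand-ins, not the package's code, and the wrapper is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Fn and StrategyFn are declared locally so the sketch is self-contained.
type Fn func(ctx context.Context) Fn
type StrategyFn func() time.Duration

// backOff is a hypothetical wrapper: the first call runs fn immediately,
// every later call first waits for the duration returned by dur.
// It is not safe for concurrent use.
func backOff(dur StrategyFn, fn Fn) Fn {
	first := true
	return func(ctx context.Context) Fn {
		if first {
			first = false
			return fn(ctx)
		}
		select {
		case <-time.After(dur()):
			return fn(ctx)
		case <-ctx.Done():
			return nil
		}
	}
}

// doubling returns d, 2d, 4d, ... on successive calls.
func doubling(d time.Duration) StrategyFn {
	return func() time.Duration {
		cur := d
		d *= 2
		return cur
	}
}

// demo calls the wrapped state three times and returns the delays that
// were requested from the strategy.
func demo() string {
	var asked []time.Duration
	next := doubling(time.Millisecond)
	recording := func() time.Duration {
		d := next()
		asked = append(asked, d)
		return d
	}
	state := backOff(recording, func(context.Context) Fn { return nil })
	for i := 0; i < 3; i++ {
		state(context.Background())
	}
	return fmt.Sprint(asked)
}

func main() {
	fmt.Println(demo())
}
```

Three calls request only two delays, because the first call never consults the strategy.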

func Batch

func Batch(states ...Fn) Fn

Batch executes the received states sequentially, and accumulates the next states. The resulting next state is returned as a sequential batch of all the non End states resolved.

func Breaker

func Breaker(trip TripStrategyFn, fn Fn) Fn

Breaker is a state machine that disables further execution of the incoming "fn" state once its returned state is an error state and the conditions of the "trip" TripStrategyFn are fulfilled.

Currently, there is no method for closing the Breaker once opened.
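The circuit breaker idea can be sketched independently of the package. To keep the example short, this simplified model replaces the state Fn with a plain func() error; the breaker helper and the errOpen error are hypothetical names. As in the description above, the trip function is consulted only after a failure, and an opened breaker never closes again.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errOpen is a hypothetical error returned once the breaker has tripped.
var errOpen = errors.New("breaker is open")

// breaker wraps fn so that, once fn has failed and trip reports true,
// fn is no longer executed. The mutex also serializes calls to fn, which
// keeps the sketch simple at the cost of concurrency.
func breaker(trip func() bool, fn func() error) func() error {
	var mu sync.Mutex
	open := false
	return func() error {
		mu.Lock()
		defer mu.Unlock()
		if open {
			return errOpen
		}
		err := fn()
		if err != nil && trip() {
			open = true
		}
		return err
	}
}

// demo calls a permanently failing function five times through a breaker
// that trips on the third failure.
func demo() (executed int, last error) {
	failures := 0
	trip := func() bool {
		failures++
		return failures > 2
	}
	guarded := breaker(trip, func() error {
		executed++
		return errors.New("backend unavailable")
	})
	for i := 0; i < 5; i++ {
		last = guarded()
	}
	return executed, last
}

func main() {
	n, err := demo()
	fmt.Println(n, err)
}
```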

func ErrorEnd

func ErrorEnd(err error) Fn

ErrorEnd represents an error state which returns an End state.

func ErrorRestart

func ErrorRestart(err error) Fn

ErrorRestart represents an error state which returns the first iteration passed. This iteration is loaded from the context, and is saved there by the Run and RunParallel functions.

func NonBlocking

func NonBlocking(states ...Fn) Fn

NonBlocking executes the received states in a goroutine and, until they resolve, it returns a wait state.

func OpenBreaker

func OpenBreaker() Fn

OpenBreaker is an error state that is used to trip open the Breaker.

func Parallel

func Parallel(states ...Fn) Fn

Parallel executes the received states in parallel goroutines, and accumulates the next states. The resulting next state is returned as a parallel batch of all the non End states resolved.
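The fan-out and accumulation step can be sketched with goroutines and a sync.WaitGroup. The parallel helper and the locally declared Fn type are assumptions for illustration, with nil standing in for the End state; unlike the function documented above, this sketch returns the pending states as a slice instead of combining them into a single Fn.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Fn is declared locally so the sketch is self-contained; nil means End.
type Fn func(ctx context.Context) Fn

// parallel is a hypothetical helper: it runs every state in its own
// goroutine, waits for all of them, and keeps the non-End next states.
func parallel(ctx context.Context, states ...Fn) []Fn {
	next := make([]Fn, len(states))
	var wg sync.WaitGroup
	for i, st := range states {
		wg.Add(1)
		go func(i int, st Fn) {
			defer wg.Done()
			next[i] = st(ctx) // each goroutine writes to its own slot
		}(i, st)
	}
	wg.Wait()

	pending := next[:0]
	for _, n := range next {
		if n != nil {
			pending = append(pending, n)
		}
	}
	return pending
}

// demo runs three states, one of which returns a follow-up state, and
// reports how many states ran and how many next states are pending.
func demo() (ran int32, pending int) {
	done := func(context.Context) Fn {
		atomic.AddInt32(&ran, 1)
		return nil
	}
	more := func(context.Context) Fn {
		atomic.AddInt32(&ran, 1)
		return done
	}
	next := parallel(context.Background(), done, more, done)
	return atomic.LoadInt32(&ran), len(next)
}

func main() {
	ran, pending := demo()
	fmt.Println("ran:", ran, "pending:", pending)
}
```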

func RateLimit

func RateLimit(limitFn LimitStrategyFn, states ...Fn) Fn

RateLimit is a state machine that executes the received "states" under the constraints of the "limitFn" LimitStrategyFn.

The strategy function reports whether the current execution needs to be stalled in order to fulfill the rate limit logic it corresponds to and, if so, what the corresponding delay should be.
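A strategy with this shape can be sketched as a fixed window counter. The fixedWindow helper here is a hypothetical illustration and not the package's FixedWindow: it takes an extra clock parameter (an assumption of this sketch) so that its behaviour can be demonstrated deterministically.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fixedWindow is a hypothetical limit strategy: it allows count calls per
// window of length d. Once the window is exhausted it reports that the
// caller must stall, together with the time left until the window ends.
// The now parameter is an injected clock, used here for testability.
func fixedWindow(count int, d time.Duration, now func() time.Time) func() (bool, time.Duration) {
	var mu sync.Mutex
	var start time.Time
	used := 0
	return func() (bool, time.Duration) {
		mu.Lock()
		defer mu.Unlock()
		t := now()
		if start.IsZero() || t.Sub(start) >= d {
			start, used = t, 0 // open a new window
		}
		if used < count {
			used++
			return false, 0
		}
		return true, d - t.Sub(start) // stall until the window closes
	}
}

// demo drives the strategy with a fake clock: two calls fit in the window,
// the third must stall, and a call in the next window is allowed again.
func demo() string {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	clock := func() time.Time { return now }
	limit := fixedWindow(2, time.Second, clock)

	limit() // first call in the window
	limit() // second call in the window
	now = now.Add(400 * time.Millisecond)
	stall, wait := limit() // third call exceeds the limit
	now = now.Add(600 * time.Millisecond)
	again, _ := limit() // a new window has started
	return fmt.Sprintf("%v %v %v", stall, wait, again)
}

func main() {
	fmt.Println(demo())
}
```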

func Restart

func Restart(ctx context.Context) Fn

Restart retrieves the initial state passed to Run or RunParallel from the context and returns it. If nothing is found it returns the End state.
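The lookup described above can be sketched with context.WithValue. The startKey type and the withStart and restart helpers are hypothetical; how the package itself stores the initial state in the context is not shown in this documentation.

```go
package main

import (
	"context"
	"fmt"
)

// Fn is declared locally so the sketch is self-contained; nil means End.
type Fn func(ctx context.Context) Fn

// startKey is a hypothetical context key used only by this sketch.
type startKey struct{}

// withStart stores the initial state in the context, as a runner might do
// before executing it.
func withStart(ctx context.Context, start Fn) context.Context {
	return context.WithValue(ctx, startKey{}, start)
}

// restart returns the stored initial state, or nil (End) if none is found.
func restart(ctx context.Context) Fn {
	if start, ok := ctx.Value(startKey{}).(Fn); ok {
		return start
	}
	return nil
}

// demo runs a state that restarts itself until it has been visited three
// times, and returns the number of visits.
func demo() int {
	visits := 0
	first := Fn(func(ctx context.Context) Fn {
		visits++
		if visits < 3 {
			return restart(ctx) // go back to the initial state
		}
		return nil
	})
	ctx := withStart(context.Background(), first)
	for state := first; state != nil; {
		state = state(ctx)
	}
	return visits
}

// demoMissing reports whether restart falls back to End on a bare context.
func demoMissing() bool {
	return restart(context.Background()) == nil
}

func main() {
	fmt.Println(demo(), demoMissing())
}
```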

func Retry

func Retry(count int, fn Fn) Fn

Retry constructs a state machine that repeats the execution of the received state "fn" for as long as it returns an error state (see IsError), until the "count" number of retries has been reached.

The "fn" parameter can be one of the functions that accept a StrategyFn parameter and wrap the original state Fn, such as BackOff, which provide a way to delay the execution between retries.

The Retry state machine is reentrant and can therefore be used from multiple goroutines.

If zero is passed as the retry count, the state machine still gets executed at least once.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

func main() {
	st := time.Now()
	cnt := 0

	fmt.Printf("Retries\n")
	retry := sm.Retry(10, func(_ context.Context) sm.Fn {
		cnt++
		run := time.Now()
		fmt.Printf("%d:%s\n", cnt, run.Sub(st).Truncate(time.Millisecond))
		st = run
		return sm.ErrorEnd(fmt.Errorf("retrying"))
	})

	sm.Run(context.Background(), retry)

}
Output:

Retries
1:0s
2:0s
3:0s
4:0s
5:0s
6:0s
7:0s
8:0s
9:0s
10:0s

func Timeout

func Timeout(max time.Duration, state Fn) Fn

Timeout is a state machine that calls the "state" Fn limiting its execution time to a maximum of "max" time.Duration.

If the timeout is reached, the execution is canceled and an ErrorEnd state wrapping the context.DeadlineExceeded error is returned.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

func main() {
	fmt.Printf("Timeout\n")
	st := time.Now()
	err := sm.Run(context.Background(), sm.Timeout(400*time.Millisecond, func(ctx context.Context) sm.Fn {
		fmt.Println("Sleeping for 200ms")
		time.Sleep(200 * time.Millisecond)
		return sm.ErrorEnd(fmt.Errorf("we reached the end"))
	}))
	fmt.Printf("Error: %s after %s", err, time.Since(st).Truncate(100*time.Millisecond))

}
Output:

Timeout
Sleeping for 200ms
Error: we reached the end after 200ms

func TimeoutExceeded

func TimeoutExceeded() Fn

TimeoutExceeded is an error state that is used when a Timeout is reached.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

func main() {
	fmt.Printf("Timeout\n")
	st := time.Now()
	err := sm.Run(context.Background(), sm.Timeout(400*time.Millisecond, func(ctx context.Context) sm.Fn {
		fmt.Println("Sleeping for 1s")
		time.Sleep(time.Second)
		return sm.End
	}))
	fmt.Printf("Error: %s after %s", err, time.Since(st).Truncate(100*time.Millisecond))

}
Output:

Timeout
Sleeping for 1s
Error: context deadline exceeded after 400ms

func Wrap

func Wrap(fn func(ctx context.Context) error) Fn

func WrapRepeat

func WrapRepeat(fn func(ctx context.Context) error) Fn

type LimitStrategyFn

type LimitStrategyFn func() (bool, time.Duration)

LimitStrategyFn is the type of functions that determine whether successive calls to a RateLimit'ed state require stalling, and how much stalling is required to fulfill the desired rate limit.

type StrategyFn

type StrategyFn func() time.Duration

StrategyFn is the type that returns the desired time.Duration for the BackOff function.

func Constant

func Constant(d time.Duration) StrategyFn

Constant returns a constant time.Duration for every call.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

const delay = 10 * time.Millisecond

func main() {
	st := time.Now()
	cnt := 0

	fmt.Printf("8 retries with constant backoff of 10ms\n")
	con := sm.Retry(8, sm.BackOff(sm.Constant(delay), func(_ context.Context) sm.Fn {
		run := time.Now()
		cnt++
		fmt.Printf("%d:%s\n", cnt, run.Sub(st).Truncate(delay))
		st = run
		return sm.ErrorEnd(fmt.Errorf("keep going"))
	}))

	sm.Run(context.Background(), con)

}
Output:

8 retries with constant backoff of 10ms
1:0s
2:10ms
3:10ms
4:10ms
5:10ms
6:10ms
7:10ms
8:10ms

func Jitter

func Jitter(min time.Duration, fn StrategyFn) StrategyFn

Jitter adds random jitter with a minimum of "min" time.Duration. The maximum is the duration returned by the StrategyFn.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

func main() {
	st := time.Now()
	cnt := 0

	fmt.Printf("2 retries with 200µs jitter over 1ms delay\n")
	jitter := sm.Retry(2, sm.BackOff(sm.Jitter(200*time.Microsecond, sm.Constant(time.Millisecond)), func(_ context.Context) sm.Fn {
		run := time.Now()
		cnt++
		// NOTE(marius): Because our truncation precision (1ms) exceeds the possible jitter-ed duration (500µs to 1ms)
		// this is a mostly useless test.
		fmt.Printf("%d:%s\n", cnt, run.Sub(st).Truncate(time.Millisecond))
		st = run
		return sm.ErrorEnd(fmt.Errorf("never right"))
	}))

	sm.Run(context.Background(), jitter)

}
Output:

2 retries with 200µs jitter over 1ms delay
1:0s
2:2ms

func Linear

func Linear(d time.Duration, m float64) StrategyFn

Linear returns the linear function of the "d" time.Duration multiplied by "m" for every call.

Example
package main

import (
	"context"
	"fmt"
	"time"

	sm "git.sr.ht/~mariusor/ssm"
)

const delay = 10 * time.Millisecond

func main() {
	st := time.Now()
	cnt := 0

	fmt.Printf("5 retries with 1.4x linear backoff of 10ms delay\n")
	linear := sm.Retry(5, sm.BackOff(sm.Linear(delay, 1.4), func(_ context.Context) sm.Fn {
		run := time.Now()
		cnt++
		fmt.Printf("%d:%s\n", cnt, run.Sub(st).Truncate(time.Millisecond))
		st = run
		return sm.ErrorEnd(fmt.Errorf("don't stop"))
	}))

	sm.Run(context.Background(), linear)

}
Output:

5 retries with 1.4x linear backoff of 10ms delay
1:0s
2:10ms
3:14ms
4:20ms
5:28ms

type TripStrategyFn

type TripStrategyFn func() bool

TripStrategyFn is used by the Circuit Breaker state machine to determine whether the current run requires the breaker to trip open.

Our API doesn't allow these functions to do the error check on the returned state of the breaker state. It is assumed that the Breaker itself calls the TripStrategyFn function only for error states.

func MaxTriesTrip

func MaxTriesTrip(max int) TripStrategyFn

MaxTriesTrip returns false for "max" invocations, then returns true. It is the simplest form of count based circuit breaking.

The check function itself can be run safely in parallel, so if multiple checks are needed, MaxTriesTrip must be invoked multiple times.
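A count based check that is safe to call from several goroutines can be sketched with an atomic counter (sync/atomic typed values, Go 1.19 or newer). The maxTries helper is a hypothetical illustration of the behaviour described above, not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxTries is a hypothetical trip strategy: the returned function reports
// false for the first max invocations and true afterwards. The atomic
// counter makes it safe to call from several goroutines at once.
func maxTries(max int) func() bool {
	var calls atomic.Int64
	return func() bool {
		return calls.Add(1) > int64(max)
	}
}

// demo invokes one shared check from ten goroutines and returns how many
// of the invocations reported that the breaker should trip.
func demo() int64 {
	trip := maxTries(3)
	var tripped atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if trip() {
				tripped.Add(1)
			}
		}()
	}
	wg.Wait()
	return tripped.Load()
}

func main() {
	fmt.Println("tripped:", demo())
}
```

Each call to maxTries creates its own counter, so two checks built from separate calls count independently of each other.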

func TimedTrip

func TimedTrip(d time.Duration, fn TripStrategyFn) TripStrategyFn

TimedTrip uses "fn" TripStrategyFn for returning the status of the Breaker, but it resets it every "d" time.Duration.

Directories

Path Synopsis
cmd module
diag Module
