Documentation
¶
Overview ¶
blocks/flow/block.go
Package flow provides a pipeline-style request processing block for go-code-blocks. It composes independent steps — validation, enrichment, CEL decisions, transformation and response — into a single server.Handler that is plugged directly into any server block (HTTP, Lambda, TCP).
A Flow replaces the imperative code inside a route handler with a declarative sequence of named, testable steps:
f := flow.New("order-flow",
flow.NewStep("validate-customer",
flow.Validate(validator, "is-pj", func(req *server.Request, _ *flow.State) map[string]any {
return map[string]any{"customer_type": req.Header("X-Customer-Type")}
})),
flow.NewStep("load-customer",
flow.Enrich(func(ctx context.Context, req *server.Request, _ *flow.State) (any, error) {
return customersDB.GetItem(ctx, req.PathParam("id"), nil)
})),
flow.NewStep("check-limit",
flow.Decide(validator, func(req *server.Request, s *flow.State) map[string]any {
var c Customer
s.Bind("load-customer", &c)
return map[string]any{"amount": c.CreditLimit}
})),
flow.NewStep("respond",
flow.Respond(func(ctx context.Context, req *server.Request, s *flow.State) (*server.Response, error) {
var c Customer
s.Bind("load-customer", &c)
return server.JSON(200, map[string]any{
"customer": c,
"approved": s.Passed("check-limit", "high-value"),
}), nil
})),
)
app.MustRegister(f)
router.POST("/orders/:id", f.Handler())
blocks/flow/state.go
blocks/flow/steps.go
Index ¶
- type Block
- type State
- func (s *State) Abort(resp *server.Response)
- func (s *State) Bind(key string, dest any) error
- func (s *State) Decision(key string) *decision.Result
- func (s *State) Failed(decisionKey, ruleName string) bool
- func (s *State) Get(key string) any
- func (s *State) Has(key string) bool
- func (s *State) Keys() []string
- func (s *State) Passed(decisionKey, ruleName string) bool
- func (s *State) Respond(resp *server.Response)
- func (s *State) Set(key string, value any)
- type Step
- type StepFn
- func AbortIf(condition func(state *State) bool, respFn func(state *State) *server.Response) StepFn
- func Decide(d *decision.Block, ...) StepFn
- func DecideFrom(d *decision.Block, inputFn func(req *server.Request, state *State) any) StepFn
- func Enrich(fn func(ctx context.Context, req *server.Request, state *State) (any, error)) StepFn
- func Respond(...) StepFn
- func Transform(fn func(ctx context.Context, req *server.Request, state *State) error) StepFn
- func Validate(d *decision.Block, ruleName string, ...) StepFn
- func ValidateFrom(d *decision.Block, ruleName string, ...) StepFn
- type ValidateOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Block ¶
type Block struct {
// contains filtered or unexported fields
}
Block is a flow pipeline block. It implements core.Block and produces a server.Handler via Handler().
func (*Block) Handler ¶
Handler returns a server.Handler that executes the flow for every request. The returned handler is safe to register on multiple routes.
router.POST("/orders/:id", orderFlow.Handler())
router.PUT("/orders/:id", orderFlow.Handler())
type State ¶
type State struct {
// contains filtered or unexported fields
}
State is the shared execution context threaded through every step of a Flow.
It accumulates enrichment data, CEL decision results and the final response, allowing each step to build on the work of previous steps without coupling them directly to each other.
State is safe for concurrent reads from the same goroutine during sequential step execution. Do not share a State across goroutines.
func (*State) Abort ¶
Abort short-circuits the flow immediately. The provided response is returned to the caller; no further steps are executed. Calling Abort from within a step function stops the pipeline cleanly — it is not an error.
// Inside a Validate step: state.Abort(server.Error(http.StatusForbidden, "access denied"))
func (*State) Bind ¶
Bind unmarshals the value stored under key into dest via JSON roundtrip. This allows retrieving typed structs from enrichment results without a manual type assertion.
var user User
if err := state.Bind("load-user", &user); err != nil { ... }
func (*State) Decision ¶
Decision returns the full *decision.Result stored under key, or nil. Key matches the name passed to flow.Step when using a Decide step.
func (*State) Passed ¶
Passed reports whether the named rule passed in the decision stored under decisionKey. Returns false when the decision or rule is not found.
// In a Transform or Respond step:
if state.Passed("route-decision", "is-pj") { ... }
type Step ¶
type Step struct {
// contains filtered or unexported fields
}
Step associates a name with a StepFn. The name is used for logging, error messages, and as the automatic storage key for Enrich and Decide steps.
func DecideFromStep ¶
func DecideFromStep(name string, d *decision.Block, inputFn func(req *server.Request, state *State) any) Step
DecideFromStep creates a Step that evaluates rules from a struct.
func DecideStep ¶
func DecideStep(name string, d *decision.Block, inputFn func(req *server.Request, state *State) map[string]any) Step
DecideStep creates a Step that evaluates all CEL rules and stores the result. This is the preferred form — it captures the step name automatically.
flow.DecideStep("route-decision", router, func(req, state) map[string]any {
return map[string]any{"customer_type": req.Header("X-Type")}
})
func EnrichStep ¶
func EnrichStep(name string, fn func(ctx context.Context, req *server.Request, state *State) (any, error)) Step
EnrichStep creates a Step that fetches data and stores it under name. This is the preferred form — it captures the step name automatically.
flow.EnrichStep("load-user", usersDB, func(ctx, req, state) (any, error) {
return usersDB.GetItem(ctx, req.PathParam("id"), nil)
})
func NewStep ¶
NewStep creates a named Step. It is the primary way to build generic steps inline within flow.New. For typed steps prefer EnrichStep and DecideStep, which capture the name automatically as the state storage key.
flow.New("checkout-flow",
flow.NewStep("validate", flow.Validate(...)),
flow.EnrichStep("load-user", fn),
flow.DecideStep("check-limit", rules, fn),
flow.NewStep("respond", flow.Respond(...)),
)
type StepFn ¶
StepFn is the function signature for every flow step.
It receives the incoming request and the shared State accumulated so far. A step can:
- Read enriched data: state.Get("step-name") / state.Bind("step-name", &dest)
- Store enriched data: state.Set("key", value)
- Read CEL results: state.Decision("step-name") / state.Passed("step", "rule")
- Abort the flow: state.Abort(server.Error(422, "invalid"))
- Set final response: state.Respond(server.JSON(200, result))
- Return a fatal error: return err → caller receives HTTP 500
func AbortIf ¶
func AbortIf( condition func(state *State) bool, respFn func(state *State) *server.Response, ) StepFn
AbortIf conditionally aborts the flow based on the current state. If condition returns true, the response from respFn is set and the flow stops. No-op when condition returns false.
// Abort if the decision said "not approved"
flow.Step("gate-approval",
flow.AbortIf(
func(s *flow.State) bool { return s.Failed("check-limit", "approved") },
func(s *flow.State) *server.Response {
return server.Error(http.StatusForbidden, "credit limit not approved")
},
))
func Decide ¶
func Decide( d *decision.Block, inputFn func(req *server.Request, state *State) map[string]any, ) StepFn
Decide runs all CEL rules in the decision block and stores the *decision.Result in state under the step name. Unlike Validate, a failing rule does NOT abort the flow — it is recorded for later steps to inspect.
flow.Step("route-decision",
flow.Decide(router, func(req *server.Request, s *flow.State) map[string]any {
var customer Customer
s.Bind("load-customer", &customer)
return map[string]any{"customer_type": customer.Type, "amount": customer.Total}
}))
In a later step:
if state.Passed("route-decision", "is-pj") { ... }
if state.Decision("route-decision").Any() { ... }
func DecideFrom ¶
DecideFrom runs all CEL rules against a struct with `decision:` tags. The result is stored under the step name.
func Enrich ¶
Enrich calls an external source (database, REST API, cache, etc.) and stores the result in state under the step name. Subsequent steps retrieve it via state.Get("step-name") or state.Bind("step-name", &dest).
If fn returns an error, the flow aborts with HTTP 500. If fn returns nil as the value, nothing is stored (step is a no-op).
flow.Step("load-user",
flow.Enrich(func(ctx context.Context, req *server.Request, _ *flow.State) (any, error) {
return usersDB.GetItem(ctx, req.PathParam("id"), nil)
}))
The result is stored under "load-user" and retrieved downstream:
var u User
state.Bind("load-user", &u)
func Respond ¶
func Respond(fn func(ctx context.Context, req *server.Request, state *State) (*server.Response, error)) StepFn
Respond builds and sets the final HTTP response for the flow. After a Respond step, subsequent steps still run but their ability to change the response depends on them calling state.Respond again. Use state.Abort if you want to stop all further processing.
flow.Step("send-response",
flow.Respond(func(ctx context.Context, req *server.Request, s *flow.State) (*server.Response, error) {
var payload map[string]any
s.Bind("payload", &payload)
return server.JSON(http.StatusOK, payload), nil
}))
func Transform ¶
Transform applies a pure data transformation to state without making any external calls. Use it to map, rename, filter or compute values from previously enriched data before building the response.
flow.Step("build-payload",
flow.Transform(func(ctx context.Context, req *server.Request, s *flow.State) error {
var user User
s.Bind("load-user", &user)
var order Order
s.Bind("load-order", &order)
s.Set("payload", map[string]any{
"user_name": user.Name,
"order_total": order.Total,
"order_status": order.Status,
})
return nil
}))
func Validate ¶
func Validate( d *decision.Block, ruleName string, inputFn func(req *server.Request, state *State) map[string]any, opts ...ValidateOption, ) StepFn
Validate runs a single CEL rule from the decision block. If the rule fails, the flow is aborted with HTTP 422 (or the status set via WithFailStatus). If it passes, the flow continues normally.
inputFn builds the map of CEL variables from the current request and state.
flow.Step("check-type",
flow.Validate(validator, "is-pj",
func(req *server.Request, _ *flow.State) map[string]any {
return map[string]any{"customer_type": req.Header("X-Customer-Type")}
},
flow.WithFailStatus(http.StatusForbidden),
))
func ValidateFrom ¶
func ValidateFrom( d *decision.Block, ruleName string, inputFn func(req *server.Request, state *State) any, opts ...ValidateOption, ) StepFn
ValidateFrom runs a single CEL rule against a struct with `decision:` tags. The struct is built from the current request and state via inputFn.
flow.Step("check-customer",
flow.ValidateFrom(validator, "is-pj",
func(req *server.Request, s *flow.State) any {
var c Customer
s.Bind("load-customer", &c)
return c
}))
type ValidateOption ¶
type ValidateOption func(*validateConfig)
ValidateOption configures the behaviour of a Validate step.
func WithFailMessage ¶
func WithFailMessage(fn func(ruleName string) string) ValidateOption
WithFailMessage sets a custom message formatter for validation failures. The rule name that failed is passed as the argument.
flow.WithFailMessage(func(rule string) string {
return fmt.Sprintf("business rule %q not satisfied", rule)
})
func WithFailStatus ¶
func WithFailStatus(code int) ValidateOption
WithFailStatus overrides the HTTP status code returned when validation fails. Defaults to 422 Unprocessable Entity.