Documentation ¶
Index ¶
- func New[T any](pub goflux.Publisher[T], opts ...Option[T]) goflux.Handler[T]
- func NewFlatMap[T, U any](pub goflux.Publisher[U], fn FlatMapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]
- func NewMap[T, U any](pub goflux.Publisher[U], mapFn MapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]
- type DeadLetterFunc
- type Filter
- type FlatMapFunc
- type MapFunc
- type MapOption
- type Option
Examples ¶
- New
- NewFlatMap
- NewMap
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New[T any](pub goflux.Publisher[T], opts ...Option[T]) goflux.Handler[T]
New returns a goflux.Handler that forwards every accepted message to pub. The handler:
- Runs the middleware chain (if any)
- Applies the filter (if set); a false result skips the message and the handler returns nil
- Forwards msg.Header into the publish context via goflux.WithHeader
- Publishes msg.Payload to msg.Subject
Publish errors are returned to the caller, so the transport decides whether to retry or nak. If a dead-letter observer is set, it is also called on a publish error, for logging or alerting.
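The filter, header, and publish steps above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: `Message`, `publishFunc`, `headerKey`, `forward`, and `demo` are hypothetical stand-ins, the context key only approximates what goflux.WithHeader might do, and the middleware step is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a hypothetical stand-in for goflux.Message[T]; the field names
// follow the description above, not the library source.
type Message[T any] struct {
	Subject string
	Header  map[string]string
	Payload T
}

// headerKey is an illustrative context key (assumption, not a goflux type).
type headerKey struct{}

// publishFunc stands in for a publisher's Publish method.
type publishFunc[T any] func(ctx context.Context, subject string, v T) error

var errDown = errors.New("broker down")

// forward sketches the described order: filter, header forwarding, publish.
// A filtered message returns nil; a publish error is observed and returned.
func forward[T any](
	pub publishFunc[T],
	filter func(Message[T]) bool,
	onDead func(Message[T], error),
) func(context.Context, Message[T]) error {
	return func(ctx context.Context, msg Message[T]) error {
		if filter != nil && !filter(msg) {
			return nil // skipped, nothing is published
		}
		ctx = context.WithValue(ctx, headerKey{}, msg.Header)
		if err := pub(ctx, msg.Subject, msg.Payload); err != nil {
			if onDead != nil {
				onDead(msg, err) // observe only; the error is still returned
			}
			return err
		}
		return nil
	}
}

// demo sends one accepted, one filtered, and one failing message.
func demo() (published []string, observed int, err error) {
	pub := func(ctx context.Context, subject string, v string) error {
		if v == "fail" {
			return errDown
		}
		got, _ := ctx.Value(headerKey{}).(map[string]string)
		published = append(published, subject+":"+v+":"+got["trace"])
		return nil
	}
	h := forward[string](pub,
		func(m Message[string]) bool { return m.Payload != "skip" },
		func(Message[string], error) { observed++ },
	)
	ctx := context.Background()
	header := map[string]string{"trace": "t1"}
	_ = h(ctx, Message[string]{Subject: "events", Header: header, Payload: "hello"})
	_ = h(ctx, Message[string]{Subject: "events", Header: header, Payload: "skip"})
	err = h(ctx, Message[string]{Subject: "events", Header: header, Payload: "fail"})
	return published, observed, err
}

func main() {
	published, observed, err := demo()
	fmt.Println(published, observed, err) // [events:hello:t1] 1 broker down
}
```

The observer only counts failures here; because the handler still returns the error, the caller keeps control over retries.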
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

type Event struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func main() {
	// Source side: messages are published here and consumed by the pipe.
	srcBus := channel.NewBus[Event]()
	srcPub := channel.NewPublisher(srcBus)
	srcSub, err := channel.NewSubscriber(srcBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side: the pipe publishes here.
	dstBus := channel.NewBus[Event]()
	dstPub := channel.NewPublisher(dstBus)
	dstSub, err := channel.NewSubscriber(dstBus, 1)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Destination subscriber: print the forwarded message, then cancel ctx.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return dstSub.Subscribe(ctx, "events", func(_ context.Context, msg goflux.Message[Event]) error {
			fmt.Println(msg.Subject, msg.Payload)
			cancel()
			return nil
		})
	}, gofuncy.WithName("dst-subscriber"))

	// Pipe: forward every message from the source to dstPub.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return srcSub.Subscribe(ctx, "events", pipe.New[Event](dstPub))
	}, gofuncy.WithName("pipe"))

	if err := srcPub.Publish(ctx, "events", Event{ID: "1", Name: "hello"}); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
Output: events {1 hello}
func NewFlatMap ¶
func NewFlatMap[T, U any](pub goflux.Publisher[U], fn FlatMapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]
NewFlatMap returns a goflux.Handler that expands each message from T into zero or more U values, publishing each one individually.
If the FlatMapFunc fails, the error is returned immediately and nothing is published. If a publish fails mid-batch, items already published are NOT rolled back, so downstream consumers must be idempotent or deduplicate.
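To see why a mid-batch failure calls for idempotent consumers, the following standard-library sketch replays a batch after a failed publish. Everything in it is illustrative: `publishAll`, `dedupe`, and `demo` are hypothetical names, and the retry loop assumes a transport that redelivers the source message after the handler returns an error.

```go
package main

import (
	"errors"
	"fmt"
)

// LineItem mirrors the example type used for NewFlatMap in this section.
type LineItem struct {
	OrderID string
	Item    string
}

// publishAll publishes items one at a time and stops at the first error.
// Items published before the failure stay published.
func publishAll(items []LineItem, publish func(LineItem) error) error {
	for _, it := range items {
		if err := publish(it); err != nil {
			return err
		}
	}
	return nil
}

// dedupe is a hypothetical idempotent consumer: it drops keys it has seen.
type dedupe struct {
	seen     map[string]bool
	accepted []LineItem
}

func (d *dedupe) handle(it LineItem) {
	key := it.OrderID + "/" + it.Item
	if d.seen[key] {
		return // duplicate from a redelivered batch
	}
	d.seen[key] = true
	d.accepted = append(d.accepted, it)
}

// demo fails the second publish once, then lets the retry succeed.
func demo() (attempts int, accepted []LineItem) {
	consumer := &dedupe{seen: map[string]bool{}}
	items := []LineItem{
		{OrderID: "o1", Item: "widget"},
		{OrderID: "o1", Item: "gadget"},
	}
	failOnce := true
	publish := func(it LineItem) error {
		if it.Item == "gadget" && failOnce {
			failOnce = false
			return errors.New("transient publish failure")
		}
		consumer.handle(it)
		return nil
	}
	// Assumed transport behavior: redeliver until the handler returns nil.
	for {
		attempts++
		if err := publishAll(items, publish); err == nil {
			break
		}
	}
	return attempts, consumer.accepted
}

func main() {
	attempts, accepted := demo()
	fmt.Println(attempts, accepted) // 2 [{o1 widget} {o1 gadget}]
}
```

On the second attempt "widget" is published again, and only the consumer's key check keeps it from being processed twice.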
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

type LineItem struct {
	OrderID string `json:"order_id"`
	Item    string `json:"item"`
}

type Order struct {
	ID    string   `json:"id"`
	Items []string `json:"items"`
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Source side carries Order values.
	srcBus := channel.NewBus[Order]()
	srcPub := channel.NewPublisher(srcBus)
	srcSub, err := channel.NewSubscriber(srcBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side carries LineItem values.
	dstBus := channel.NewBus[LineItem]()
	dstPub := channel.NewPublisher(dstBus)
	dstSub, err := channel.NewSubscriber(dstBus, 1)
	if err != nil {
		panic(err)
	}

	// Expand one Order into one LineItem per entry in Items.
	flatMapFn := func(_ context.Context, msg goflux.Message[Order]) ([]LineItem, error) {
		items := make([]LineItem, len(msg.Payload.Items))
		for i, item := range msg.Payload.Items {
			items[i] = LineItem{OrderID: msg.Payload.ID, Item: item}
		}
		return items, nil
	}

	// Destination subscriber: print each item and cancel after the second.
	var count int
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return dstSub.Subscribe(ctx, "orders", func(_ context.Context, msg goflux.Message[LineItem]) error {
			fmt.Println(msg.Payload.OrderID, msg.Payload.Item)
			count++
			if count == 2 {
				cancel()
			}
			return nil
		})
	}, gofuncy.WithName("dst-subscriber"))

	// Pipe: expand each Order and publish the resulting items to dstPub.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return srcSub.Subscribe(ctx, "orders", pipe.NewFlatMap[Order, LineItem](dstPub, flatMapFn))
	}, gofuncy.WithName("pipe-flatmap"))

	if err := srcPub.Publish(ctx, "orders", Order{ID: "o1", Items: []string{"widget", "gadget"}}); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
Output:
o1 widget
o1 gadget
func NewMap ¶
func NewMap[T, U any](pub goflux.Publisher[U], mapFn MapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]
NewMap returns a goflux.Handler that transforms each message from T to U before publishing. The filter runs on the original Message[T] before the map.
Map errors and publish errors are returned to the caller. The dead-letter observer is called on either failure.
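The ordering described above (filter on the original value, then map, then publish, with either failure reaching the observer) can be sketched as follows. This is a simplified illustration using bare payloads instead of Message[T]; `mapPipe`, `errEmptyName`, and `demo` are hypothetical names and do not reflect the package's internals.

```go
package main

import (
	"errors"
	"fmt"
)

type Event struct{ ID, Name string }

type Summary struct{ Label string }

var errEmptyName = errors.New("empty name")

// mapPipe sketches the described order: filter the original T, map T to U,
// publish U. A map or publish error reaches the observer and is returned.
func mapPipe[T, U any](
	filter func(T) bool,
	mapFn func(T) (U, error),
	publish func(U) error,
	onDead func(T, error),
) func(T) error {
	return func(in T) error {
		if filter != nil && !filter(in) {
			return nil // skipped before the map runs
		}
		out, err := mapFn(in)
		if err == nil {
			err = publish(out)
		}
		if err != nil && onDead != nil {
			onDead(in, err) // the observer sees the original input
		}
		return err
	}
}

// demo sends one good event, one filtered event, and one that fails to map.
func demo() (out []Summary, dead []string, mapped int) {
	h := mapPipe[Event, Summary](
		func(e Event) bool { return e.ID != "" },
		func(e Event) (Summary, error) {
			mapped++
			if e.Name == "" {
				return Summary{}, errEmptyName
			}
			return Summary{Label: e.Name}, nil
		},
		func(s Summary) error { out = append(out, s); return nil },
		func(e Event, err error) { dead = append(dead, e.ID+": "+err.Error()) },
	)
	_ = h(Event{ID: "1", Name: "hello"}) // mapped and published
	_ = h(Event{Name: "ignored"})        // filtered out, map never runs
	_ = h(Event{ID: "2"})                // map fails, observer is called
	return out, dead, mapped
}

func main() {
	out, dead, mapped := demo()
	fmt.Println(out, dead, mapped) // [{hello}] [2: empty name] 2
}
```

The map function runs twice for three inputs, which shows that the filtered event never reaches it.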
Example ¶
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

type Event struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

type Summary struct {
	Label string `json:"label"`
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Source side carries Event values.
	srcBus := channel.NewBus[Event]()
	srcPub := channel.NewPublisher(srcBus)
	srcSub, err := channel.NewSubscriber(srcBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side carries Summary values.
	dstBus := channel.NewBus[Summary]()
	dstPub := channel.NewPublisher(dstBus)
	dstSub, err := channel.NewSubscriber(dstBus, 1)
	if err != nil {
		panic(err)
	}

	// Transform each Event into a Summary labelled with the event name.
	mapFn := func(_ context.Context, msg goflux.Message[Event]) (Summary, error) {
		return Summary{Label: msg.Payload.Name}, nil
	}

	// Destination subscriber: print the mapped message, then cancel ctx.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return dstSub.Subscribe(ctx, "events", func(_ context.Context, msg goflux.Message[Summary]) error {
			fmt.Println(msg.Subject, msg.Payload)
			cancel()
			return nil
		})
	}, gofuncy.WithName("dst-subscriber"))

	// Pipe: map each Event and publish the Summary to dstPub.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()
		return srcSub.Subscribe(ctx, "events", pipe.NewMap[Event, Summary](dstPub, mapFn))
	}, gofuncy.WithName("pipe-map"))

	if err := srcPub.Publish(ctx, "events", Event{ID: "1", Name: "hello"}); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
Output: events {hello}
Types ¶
type DeadLetterFunc ¶
DeadLetterFunc is an observer called when a map, flatmap, or publish operation fails. It receives the original message and the error, for logging or alerting. It does NOT swallow the error: the error is still returned to the transport.
type Filter ¶
Filter decides whether a message should be forwarded. Returning false skips the message: the handler returns nil, which the transport treats as an ack.
type FlatMapFunc ¶
FlatMapFunc expands a Message[T] into zero or more U values. A non-nil error is returned to the transport and is also passed to the dead-letter observer.
type MapFunc ¶
MapFunc transforms a Message[T] payload into a U value. A non-nil error is returned to the transport and is also passed to the dead-letter observer.
type MapOption ¶
type MapOption[T, U any] func(*mapConfig[T, U])
MapOption configures a NewMap or NewFlatMap pipe.
func WithMapDeadLetter ¶
func WithMapDeadLetter[T, U any](fn DeadLetterFunc[T]) MapOption[T, U]
WithMapDeadLetter sets an observer called when map/flatmap or publish fails.
func WithMapFilter ¶
WithMapFilter sets a filter that runs before map/flatmap.
func WithMapMiddleware ¶
func WithMapMiddleware[T, U any](mw ...goflux.Middleware[T]) MapOption[T, U]
WithMapMiddleware registers middleware for a map/flatmap pipe.
type Option ¶
type Option[T any] func(*config[T])
Option configures a New pipe.
func WithDeadLetter ¶
func WithDeadLetter[T any](fn DeadLetterFunc[T]) Option[T]
WithDeadLetter sets an observer called when publish fails. The observer receives the original message and the error.
func WithFilter ¶
WithFilter sets a filter that runs before publish. Messages for which the filter returns false are skipped (handler returns nil).
func WithMiddleware ¶
func WithMiddleware[T any](mw ...goflux.Middleware[T]) Option[T]
WithMiddleware registers middleware that wraps the pipe's internal handler. Middleware runs before filter, map, and publish, so it sees the original message and can enrich the context that flows into the later stages.
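How middleware can enrich the context for later stages is sketched below with local types. `Handler`, `Middleware`, `chain`, `withTenant`, and `trace` are hypothetical stand-ins whose shapes are assumed rather than taken from goflux, and running the first registered middleware outermost is likewise an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// Handler and Middleware are local stand-ins with assumed shapes; they are
// not the goflux types.
type Handler[T any] func(ctx context.Context, msg T) error

type Middleware[T any] func(next Handler[T]) Handler[T]

// chain wraps h so that the first middleware in mw runs outermost.
func chain[T any](h Handler[T], mw ...Middleware[T]) Handler[T] {
	for i := len(mw) - 1; i >= 0; i-- {
		h = mw[i](h)
	}
	return h
}

type tenantKey struct{}

// withTenant stores a value in the context before calling the next stage.
func withTenant[T any](tenant string) Middleware[T] {
	return func(next Handler[T]) Handler[T] {
		return func(ctx context.Context, msg T) error {
			return next(context.WithValue(ctx, tenantKey{}, tenant), msg)
		}
	}
}

// trace records that it ran, then calls the next stage unchanged.
func trace[T any](name string, steps *[]string) Middleware[T] {
	return func(next Handler[T]) Handler[T] {
		return func(ctx context.Context, msg T) error {
			*steps = append(*steps, name)
			return next(ctx, msg)
		}
	}
}

// demo runs one message through two middlewares and an inner handler that
// plays the role of the filter/map/publish stages.
func demo() []string {
	var steps []string
	inner := Handler[string](func(ctx context.Context, msg string) error {
		tenant, _ := ctx.Value(tenantKey{}).(string)
		steps = append(steps, "publish "+msg+" for "+tenant)
		return nil
	})
	h := chain(inner, trace[string]("trace", &steps), withTenant[string]("acme"))
	_ = h(context.Background(), "hello")
	return steps
}

func main() {
	fmt.Printf("%q\n", demo()) // ["trace" "publish hello for acme"]
}
```

The inner handler reads the value that `withTenant` stored, which is the sense in which middleware enriches the context seen by the stages that follow it.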