pipe

package
v0.4.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New[T any](pub goflux.Publisher[T], opts ...Option[T]) goflux.Handler[T]

New returns a goflux.Handler that forwards every accepted message to pub. The handler:

  1. Runs the middleware chain (if any)
  2. Applies the filter (if set) — false means skip (return nil)
  3. Forwards msg.Header into the publish context via goflux.WithHeader
  4. Publishes msg.Payload to msg.Subject

Publish errors are returned to the caller, so the transport decides whether to retry or nak. The dead-letter observer is also called on a publish error, for logging/alerting.

Example
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

// Event is the example payload; in this example both the source and the
// destination bus carry it.
type Event struct {
	// ID is the event identifier, serialized as "id".
	ID   string `json:"id"`
	// Name is the event name, serialized as "name".
	Name string `json:"name"`
}

// main demonstrates pipe.New: an Event published on the source bus under
// "events" is forwarded to the destination bus, where it is printed.
func main() {
	ctx, stop := context.WithCancel(context.Background())

	// Source side: the bus the example publishes to and the pipe consumes from.
	inBus := channel.NewBus[Event]()
	inPub := channel.NewPublisher(inBus)

	inSub, err := channel.NewSubscriber(inBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side: the bus the pipe publishes to.
	outBus := channel.NewBus[Event]()
	outPub := channel.NewPublisher(outBus)

	outSub, err := channel.NewSubscriber(outBus, 1)
	if err != nil {
		panic(err)
	}

	// printAndStop prints the forwarded message, then cancels ctx so that
	// main can return.
	printAndStop := func(_ context.Context, msg goflux.Message[Event]) error {
		fmt.Println(msg.Subject, msg.Payload)
		stop()

		return nil
	}

	// The destination subscriber is started before the pipe, and both are
	// started before the publish below.
	// NOTE(review): presumably StartWithReady returns only after ready() has
	// been called — confirm against the gofuncy documentation.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		return outSub.Subscribe(ctx, "events", printAndStop)
	}, gofuncy.WithName("dst-subscriber"))

	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		// The pipe handler republishes every source message via outPub.
		forward := pipe.New[Event](outPub)

		return inSub.Subscribe(ctx, "events", forward)
	}, gofuncy.WithName("pipe"))

	err = inPub.Publish(ctx, "events", Event{ID: "1", Name: "hello"})
	if err != nil {
		panic(err)
	}

	// Block until the destination handler has cancelled ctx.
	<-ctx.Done()
}
Output:
events {1 hello}

func NewFlatMap

func NewFlatMap[T, U any](pub goflux.Publisher[U], fn FlatMapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]

NewFlatMap returns a goflux.Handler that expands each message from T into zero or more U values, publishing each one individually.

If the FlatMapFunc fails, the error is returned immediately. If a publish fails mid-batch, items already published are NOT rolled back — downstream consumers must be idempotent or deduplicate.

Example
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

// LineItem is the destination payload of the flat-map example: one entry
// per item of an Order.
type LineItem struct {
	// OrderID is the ID of the Order this item was expanded from.
	OrderID string `json:"order_id"`
	// Item is the item name taken from Order.Items.
	Item    string `json:"item"`
}

// Order is the source payload of the flat-map example.
type Order struct {
	// ID is the order identifier, serialized as "id".
	ID    string   `json:"id"`
	// Items lists the item names; each one is expanded into its own LineItem.
	Items []string `json:"items"`
}

// main demonstrates pipe.NewFlatMap: one Order published on the source bus
// is expanded into one LineItem per order item, and each LineItem is
// published on the destination bus, where it is printed.
func main() {
	ctx, stop := context.WithCancel(context.Background())

	// Source side carries Order values.
	inBus := channel.NewBus[Order]()
	inPub := channel.NewPublisher(inBus)

	inSub, err := channel.NewSubscriber(inBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side carries LineItem values.
	outBus := channel.NewBus[LineItem]()
	outPub := channel.NewPublisher(outBus)

	outSub, err := channel.NewSubscriber(outBus, 1)
	if err != nil {
		panic(err)
	}

	// explode turns an Order into one LineItem per entry in Order.Items.
	explode := func(_ context.Context, msg goflux.Message[Order]) ([]LineItem, error) {
		order := msg.Payload

		lines := make([]LineItem, 0, len(order.Items))
		for _, name := range order.Items {
			lines = append(lines, LineItem{OrderID: order.ID, Item: name})
		}

		return lines, nil
	}

	// seen counts the line items received so far; the example stops after
	// the second one, matching the two items of the order published below.
	var seen int

	printLine := func(_ context.Context, msg goflux.Message[LineItem]) error {
		fmt.Println(msg.Payload.OrderID, msg.Payload.Item)

		seen++
		if seen == 2 {
			stop()
		}

		return nil
	}

	// The destination subscriber is started before the pipe, and both are
	// started before the publish below.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		return outSub.Subscribe(ctx, "orders", printLine)
	}, gofuncy.WithName("dst-subscriber"))

	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		expand := pipe.NewFlatMap[Order, LineItem](outPub, explode)

		return inSub.Subscribe(ctx, "orders", expand)
	}, gofuncy.WithName("pipe-flatmap"))

	err = inPub.Publish(ctx, "orders", Order{ID: "o1", Items: []string{"widget", "gadget"}})
	if err != nil {
		panic(err)
	}

	// Block until the destination handler has cancelled ctx.
	<-ctx.Done()
}
Output:
o1 widget
o1 gadget

func NewMap

func NewMap[T, U any](pub goflux.Publisher[U], mapFn MapFunc[T, U], opts ...MapOption[T, U]) goflux.Handler[T]

NewMap returns a goflux.Handler that transforms each message from T to U before publishing. The filter runs on the original Message[T] before the map.

Map errors and publish errors are returned to the caller. The dead-letter observer is called on either failure.

Example
package main

import (
	"context"
	"fmt"

	"github.com/foomo/goflux"
	"github.com/foomo/goflux/pipe"
	"github.com/foomo/goflux/transport/channel"
	"github.com/foomo/gofuncy"
)

// Event is the source payload of the map example.
type Event struct {
	// ID is the event identifier, serialized as "id".
	ID   string `json:"id"`
	// Name is the event name; the example's map function copies it into
	// Summary.Label.
	Name string `json:"name"`
}

// Summary is the destination payload of the map example.
type Summary struct {
	// Label is the summary text, serialized as "label".
	Label string `json:"label"`
}
// main demonstrates pipe.NewMap: an Event published on the source bus is
// converted to a Summary and published on the destination bus, where it is
// printed.
func main() {
	ctx, stop := context.WithCancel(context.Background())

	// Source side carries Event values.
	inBus := channel.NewBus[Event]()
	inPub := channel.NewPublisher(inBus)

	inSub, err := channel.NewSubscriber(inBus, 1)
	if err != nil {
		panic(err)
	}

	// Destination side carries Summary values.
	outBus := channel.NewBus[Summary]()
	outPub := channel.NewPublisher(outBus)

	outSub, err := channel.NewSubscriber(outBus, 1)
	if err != nil {
		panic(err)
	}

	// summarize builds a Summary whose Label is the event's Name.
	summarize := func(_ context.Context, msg goflux.Message[Event]) (Summary, error) {
		out := Summary{Label: msg.Payload.Name}

		return out, nil
	}

	// printAndStop prints the mapped message, then cancels ctx so that main
	// can return.
	printAndStop := func(_ context.Context, msg goflux.Message[Summary]) error {
		fmt.Println(msg.Subject, msg.Payload)
		stop()

		return nil
	}

	// The destination subscriber is started before the pipe, and both are
	// started before the publish below.
	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		return outSub.Subscribe(ctx, "events", printAndStop)
	}, gofuncy.WithName("dst-subscriber"))

	gofuncy.StartWithReady(ctx, func(ctx context.Context, ready gofuncy.ReadyFunc) error {
		ready()

		convert := pipe.NewMap[Event, Summary](outPub, summarize)

		return inSub.Subscribe(ctx, "events", convert)
	}, gofuncy.WithName("pipe-map"))

	err = inPub.Publish(ctx, "events", Event{ID: "1", Name: "hello"})
	if err != nil {
		panic(err)
	}

	// Block until the destination handler has cancelled ctx.
	<-ctx.Done()
}
Output:
events {hello}

Types

type DeadLetterFunc

type DeadLetterFunc[T any] func(ctx context.Context, msg goflux.Message[T], err error)

DeadLetterFunc is an observer called when a map/flatmap/publish operation fails. It receives the original message and the error for logging/alerting. It does NOT swallow the error — the error is still returned to the transport.

type Filter

type Filter[T any] func(ctx context.Context, msg goflux.Message[T]) bool

Filter decides whether a message should be forwarded. Returning false skips the message: the handler returns nil to the transport, which counts as an ack.

type FlatMapFunc

type FlatMapFunc[T, U any] func(ctx context.Context, msg goflux.Message[T]) ([]U, error)

FlatMapFunc expands a Message[T] into zero or more U values. A non-nil error is returned to the transport and is also passed to the dead-letter observer.

type MapFunc

type MapFunc[T, U any] func(ctx context.Context, msg goflux.Message[T]) (U, error)

MapFunc transforms a Message[T] payload into a U value. A non-nil error is returned to the transport and is also passed to the dead-letter observer.

type MapOption

type MapOption[T, U any] func(*mapConfig[T, U])

MapOption configures a NewMap or NewFlatMap pipe.

func WithMapDeadLetter

func WithMapDeadLetter[T, U any](fn DeadLetterFunc[T]) MapOption[T, U]

WithMapDeadLetter sets an observer called when map/flatmap or publish fails.

func WithMapFilter

func WithMapFilter[T, U any](f Filter[T]) MapOption[T, U]

WithMapFilter sets a filter that runs before map/flatmap.

func WithMapMiddleware

func WithMapMiddleware[T, U any](mw ...goflux.Middleware[T]) MapOption[T, U]

WithMapMiddleware registers middleware for a map/flatmap pipe.

type Option

type Option[T any] func(*config[T])

Option configures a New pipe.

func WithDeadLetter

func WithDeadLetter[T any](fn DeadLetterFunc[T]) Option[T]

WithDeadLetter sets an observer called when publish fails. The observer receives the original message and the error.

func WithFilter

func WithFilter[T any](f Filter[T]) Option[T]

WithFilter sets a filter that runs before publish. Messages for which the filter returns false are skipped (handler returns nil).

func WithMiddleware

func WithMiddleware[T any](mw ...goflux.Middleware[T]) Option[T]

WithMiddleware registers middleware that wraps the pipe's internal handler. Middleware runs before filter/map/publish — it sees the original message and can enrich the context that flows into subsequent stages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL