bridge

package module
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: MIT Imports: 3 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func FromStream

func FromStream[T any](stream goflow.Stream[goflux.Message[T]], pub goflux.Publisher[T]) error

FromStream consumes a goflow.Stream of messages and publishes each one via the provided goflux.Publisher. Each message is published under its own original subject.

Example
// Cancel the context when the example returns so that everything started
// with ctx below is signalled to stop.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Destination side: a channel bus with one publisher and one subscriber.
dstBus := channel.NewBus[Event]()
dstPub := channel.NewPublisher(dstBus)
dstSub, _ := channel.NewSubscriber(dstBus, 1) // second return value is discarded for brevity

// Expose what dstSub receives on "events" as a plain Go channel so the
// published messages can be read back at the end of the example.
dstCh := goflux.ToChan[Event](ctx, dstSub, "events", 4)

// NOTE(review): presumably gives the subscription started by ToChan time to
// become active before anything is published — confirm.
time.Sleep(50 * time.Millisecond)

// Source side: a stream built from two messages, both created for "events".
msgs := []goflux.Message[Event]{
	goflux.NewMessage("events", Event{ID: "1", Name: "alpha"}),
	goflux.NewMessage("events", Event{ID: "2", Name: "bravo"}),
}
stream := goflow.Of(ctx, msgs...)

// Publish every message of the stream through dstPub. The returned error is
// ignored here; real code should check it.
_ = bridge.FromStream(stream, dstPub)

// Read two messages back from the destination and print their names.
fmt.Println((<-dstCh).Payload.Name)
fmt.Println((<-dstCh).Payload.Name)
Output:
alpha
bravo

func ToStream

func ToStream[T any](ctx context.Context, sub goflux.Subscriber[T], subject string, bufSize int) goflow.Stream[goflux.Message[T]]

ToStream bridges a goflux.Subscriber into a goflow.Stream. It launches Subscribe in a goroutine via goflux.ToChan and wraps the resulting channel as a Stream.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/foomo/goflux/bridge"
	"github.com/foomo/goflux/transport/channel"
)

// Event is the payload type carried over the bus in this example.
type Event struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// main connects a channel-backed publisher and subscriber, exposes the
// subscriber as a stream via bridge.ToStream, and prints the names of the two
// events that arrive on it.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	eventBus := channel.NewBus[Event]()
	publisher := channel.NewPublisher(eventBus)

	subscriber, _ := channel.NewSubscriber(eventBus, 1)

	incoming := bridge.ToStream[Event](ctx, subscriber, "events", 4)

	// Publish from a separate goroutine, after a short pause, while main
	// blocks on the receives below.
	go func() {
		time.Sleep(50 * time.Millisecond)

		for _, ev := range []Event{
			{ID: "1", Name: "alpha"},
			{ID: "2", Name: "bravo"},
		} {
			_ = publisher.Publish(ctx, "events", ev)
		}
	}()

	received := incoming.Chan()

	// Exactly two events are published, so read exactly two.
	for i := 0; i < 2; i++ {
		msg := <-received
		fmt.Println(msg.Payload.Name)
	}
}
Output:
alpha
bravo

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL