Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FromStream ¶
FromStream consumes a goflow.Stream of messages and publishes each one via the provided goflux.Publisher. The original message nats is used for publishing.
Example ¶
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dstBus := channel.NewBus[Event]()
dstPub := channel.NewPublisher(dstBus)
dstSub, _ := channel.NewSubscriber(dstBus, 1)
dstCh := goflux.ToChan[Event](ctx, dstSub, "events", 4)
time.Sleep(50 * time.Millisecond)
msgs := []goflux.Message[Event]{
goflux.NewMessage("events", Event{ID: "1", Name: "alpha"}),
goflux.NewMessage("events", Event{ID: "2", Name: "bravo"}),
}
stream := goflow.Of(ctx, msgs...)
_ = bridge.FromStream(stream, dstPub)
fmt.Println((<-dstCh).Payload.Name)
fmt.Println((<-dstCh).Payload.Name)
Output: alpha bravo
func ToStream ¶
func ToStream[T any](ctx context.Context, sub goflux.Subscriber[T], subject string, bufSize int) goflow.Stream[goflux.Message[T]]
ToStream bridges a goflux.Subscriber into a goflow.Stream. It launches Subscribe in a goroutine via goflux.ToChan and wraps the resulting channel as a Stream.
Example ¶
package main
import (
"context"
"fmt"
"time"
"github.com/foomo/goflux/bridge"
"github.com/foomo/goflux/transport/channel"
)
type Event struct {
ID string `json:"id"`
Name string `json:"name"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
bus := channel.NewBus[Event]()
pub := channel.NewPublisher(bus)
sub, _ := channel.NewSubscriber(bus, 1)
stream := bridge.ToStream[Event](ctx, sub, "events", 4)
go func() {
time.Sleep(50 * time.Millisecond)
_ = pub.Publish(ctx, "events", Event{ID: "1", Name: "alpha"})
_ = pub.Publish(ctx, "events", Event{ID: "2", Name: "bravo"})
}()
ch := stream.Chan()
fmt.Println((<-ch).Payload.Name)
fmt.Println((<-ch).Payload.Name)
}
Output: alpha bravo
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.