pbjs

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2025 License: MIT Imports: 12 Imported by: 0

README

pbjs

build codecov Go Report Card

pbjs provides an opinionated wrapper for publishing/consuming Protobuf messages with NATS JetStream. It was created with the aim of bootstrapping stream interactions for a number of projects in a more lightweight manner than Watermill. That said, for production use I would recommend watermill-nats with a protobuf marshaler.

Getting Started

go get github.com/stevecallear/pbjs@latest
// jetstream client, stream and consumer have been created prior
jsc, err := js.Consumer(ctx, "mystream", "myconsumer")
if err != nil {
    log.Fatal(err)
}

con, err := pbjs.NewConsumer(jsc, pbjs.NewHandler(func(ctx context.Context, m *testpb.MessageA) error {
    log.Println(m.GetValue())
    return nil
}))
if err != nil {
    log.Fatal(err)
}
defer con.Close()

pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.MessageA{Value: "hello"})
if err != nil {
    log.Fatal(err)
}

Handlers

Consumers accept a Handler implementation. HandlerFunc provides a convenience function to simplify handler creation. To avoid casts, NewHandler accepts a typed handler function and returns a TypeHandler.

Middleware

Both Publisher and Consumer accept middleware on creation. A simple logging middleware is provided:

pub := pbjs.NewPublisher(js, pbjs.WithPublisherMiddleware(pbjs.LoggingMiddleware(logger)))

Subject Convention

Publisher accepts a SubjectConventionFunc that allows the subject to be defined based on the publisher context and message. By default this simply returns the message type, but it can be customised for more complex situations:

pub := pbjs.NewPublisher(js, pbjs.WithSubjectConvention(func(ctx context.Context, m proto.Message) (string, error) {
    // convention logic
}))

A future goal of the module is to provide a protoc-gen implementation to allow subject conventions to be defined as part of the .proto definition.

Handler Errors

Message acknowledgement is based on the handler result. Handlers can use NewError to distinguish between transient and persistent errors:

h := pbjs.HandlerFunc(func(ctx context.Context, m *testpb.MessageB) error {
    state, err := store.Get(ctx, m.GetId())
    if err != nil {
        return pbjs.NewError(err, pbjs.ErrorTypeTransient)
    }

    seq := pbjs.GetContextMetadata(ctx).Sequence.Stream
    if state.Version > seq {
        return pbjs.NewError(errors.New("stale message", pbjs.ErrorTypePersistent))
    }
    
    // logic

    return nil
})

By default transient errors result in message Nak while persistent errors result in message Term. This behaviour can be customised using the WithErrorHandler option on consumer creation.

Header Propagation

NATS headers and message metadata are stored in the publish/consume contexts. This allows header propagation when publishing from a consumer handler. By default HdrCorrelationID is not overriden in this scenario. All Nats-Expect-* headers are removed, however, to avoid unexpected behaviour.

Documentation

Index

Examples

Constants

View Source
const (
	HdrNatsMsgID     = nats.MsgIdHdr
	HdrNatsTimestamp = "Nats-Time-Stamp"
	HdrMsgType       = "Msg-Type"
	HdrContentType   = "Content-Type"
	HdrCorrelationID = "Correlation-Id"
	HdrSubject       = "Subject"
)

Variables

This section is empty.

Functions

func GetContextHeader

func GetContextHeader(ctx context.Context) nats.Header

GetContextHeader returns the nats.Header from the supplied context. If the context does not have a header set, an empty one is returned.

func GetContextMetadata

func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata

GetContextMetadata returns the jetstream.MsgMetadata from the supplied context. If the context does not have metadata set, an empty one is returned.

func MessageType

func MessageType(m proto.Message) string

MessageType returns the type for the supplied message

func NewError

func NewError(err error, typ ErrorType) error

NewError wraps the supplied error with the specified types

func SetContextHeader

func SetContextHeader(ctx context.Context, h nats.Header) context.Context

SetContextHeader returns a context with the nats.Header set

func SetContextMetadata

func SetContextMetadata(ctx context.Context, md *jetstream.MsgMetadata) context.Context

SetContextMetadata returns a context with the jetstream.MsgMetadata set

func WithConsumerMiddleware

func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)

WithConsumerMiddleware specifies the consumer middleware

func WithErrorHandler

func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)

WithErrorHandler specifies the error handler func

func WithLogger

func WithLogger(l *slog.Logger) func(*ConsumerOptions)

WithLogger specifies the consumer logger

func WithPublisherMiddleware

func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)

WithPublisherMiddleware specifies the publisher middleware

func WithSubjectConvention

func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)

WithSubjectConvention specifies the publisher subject convention

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a JetStream consumer

func NewConsumer

func NewConsumer(jc jetstream.Consumer, h Handler, optFns ...func(*ConsumerOptions)) (*Consumer, error)

NewConsumer returns a new consumer

Example
//hello
js, ok := getJS()
if !ok {
	fmt.Println("value") // skip the example if jetstream is not available
	return
}

ctx := context.Background()

_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
	Name:     "example_stream",
	Subjects: []string{"testpb.>"},
})
if err != nil {
	log.Fatal(err)
}

jsc, err := js.CreateOrUpdateConsumer(ctx, "example_stream", jetstream.ConsumerConfig{
	Name:    "example_consumer",
	Durable: "example_consumer",
})
if err != nil {
	log.Fatal(err)
}

var value string
ch := make(chan struct{})

con, err := pbjs.NewConsumer(jsc,
	pbjs.NewHandler(func(ctx context.Context, in *testpb.MessageA) error {
		defer close(ch)
		value = in.GetValue()
		return nil
	}),
)
if err != nil {
	log.Fatal(err)
}
defer con.Close()

pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.MessageA{Value: "value"})
if err != nil {
	log.Fatal(err)
}

select {
case <-ch:
case <-time.After(time.Second):
	log.Fatal("timeout waiting for channel")
}

fmt.Println(value)
Output:
value

func (*Consumer) Close

func (c *Consumer) Close() error

Close drains the underlying jetstream.ConsumeContext

type ConsumerOptions

type ConsumerOptions struct {
	Middleware []MiddlewareFunc
	Logger     *slog.Logger
	Timeout    time.Duration
	ErrorFn    ErrorFunc
}

ConsumerOptions represents consumer options

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error represents a handler error

func (*Error) Error

func (e *Error) Error() string

Error returns the inner error message

func (*Error) Unwrap

func (e *Error) Unwrap() error

Unwrap returns the inner error

type ErrorFunc

type ErrorFunc func(ctx context.Context, m jetstream.Msg, err error)

ErrorFunc represents a consumer error handler. By default all persistent errors result in Term, while all others result in Nak.

type ErrorType

type ErrorType uint8

ErrorType indicates an error type

const (
	// ErrorTypeNone indicates that there is no error.
	// It is returned only when a nil error is supplied to [ErrorTypeOf].
	ErrorTypeNone ErrorType = iota

	// ErrorTypeUnknown indicates an unknown error.
	// It is returned only when the error supplied to [ErrorTypeOf] does not have a type.
	ErrorTypeUnknown

	// ErrorTypeTransient indicates that the handler error was transient.
	// By default transient errors result in Nak.
	ErrorTypeTransient

	// ErrorTypePersistent indicates that the handler error was persistent.
	// By default persistent errors result in Term.
	ErrorTypePersistent
)

func ErrorTypeOf

func ErrorTypeOf(err error) ErrorType

ErrorTypeOf returns the ErrorType of the supplied error

type Handler

type Handler interface {
	Handle(ctx context.Context, m proto.Message) error
}

Handler represents an untyped message handler

func ApplyMiddleware

func ApplyMiddleware(h Handler, m ...MiddlewareFunc) Handler

ApplyMiddleware returns a handler with the supplied middleware applied

type HandlerFunc

type HandlerFunc func(ctx context.Context, m proto.Message) error

HandlerFunc represents an untyped message handler func

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, m proto.Message) error

Handle handles the message

type MiddlewareFunc

type MiddlewareFunc func(h Handler) Handler

MiddlewareFunc represents a publisher/consumer middleware func

func LoggingMiddleware

func LoggingMiddleware(l *slog.Logger) MiddlewareFunc

LoggingMiddleware returns a slog.Logger middleware funct

type MsgPtr

type MsgPtr[T any] interface {
	*T
	proto.Message
}

MsgPtr represents a pointer to a proto.Message implementation

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher represents a JetStream publisher

func NewPublisher

func NewPublisher(js jetstream.JetStream, optFns ...func(*PublisherOptions)) *Publisher

NewPublisher returns a new publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, m proto.Message) error

Publish publishes the message

type PublisherOptions

type PublisherOptions struct {
	Middleware        []MiddlewareFunc
	SubjectConvention SubjectConventionFunc
}

PublisherOptions represents publisher options

type SubjectConventionFunc

type SubjectConventionFunc func(ctx context.Context, m proto.Message) (string, error)

SubjectConventionFunc represents a subjection convention func. By default publishers use the value of MessageType as the subject.

type TypeHandler

type TypeHandler interface {
	Handler
	Type() string
}

TypeHandler represents a typed message handler

func NewHandler

func NewHandler[T any, PT MsgPtr[T]](fn func(ctx context.Context, m PT) error) TypeHandler

NewHandler returns a new typed message handler for the specified func

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL