pbjs

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 28, 2025 License: MIT Imports: 15 Imported by: 0

README

pbjs

build codecov Go Report Card

pbjs provides an opinionated wrapper for publishing/consuming Protobuf messages with NATS JetStream. It was created with the aim of bootstrapping stream interactions for a number of projects in a more lightweight manner than Watermill. That said, for production use I would recommend watermill-nats with a protobuf marshaler.

Getting Started

go get github.com/stevecallear/pbjs@latest
// jetstream client, stream and consumer have been created prior
jsc, err := js.Consumer(ctx, "mystream", "myconsumer")
if err != nil {
    log.Fatal(err)
}

con, err := pbjs.NewConsumer(jsc, pbjs.NewHandler(func(ctx context.Context, m *testpb.OrderDispatchedEvent) error {
    log.Println(m.GetValue())
    return nil
}))
if err != nil {
    log.Fatal(err)
}
defer con.Close()

pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.OrderDispatchedEvent{Id: uuid.NewString()})
if err != nil {
    log.Fatal(err)
}

Subject Convention

Publisher accepts a SubjectConventionFunc that allows the subject to be defined based on the publisher context and message. By default this simply returns the message type, but it can be customised for more complex situations:

pub := pbjs.NewPublisher(js, pbjs.WithSubjectConvention(func(ctx context.Context, m proto.Message) (string, error) {
    // convention logic
}))
Message Annotations

It is possible to specify a subject as a message annotation to avoid complex convention logic. The subject can optionally include templated elements referring to the proto fields.

message OrderDispatchedEvent {
    option (pbjs.message).subject = "ORDER.{id}.dispatched";
    string id = 1;
}
pub := pbjs.NewPublisher(js, 
    pbjs.WithSubjectConvention(pbjs.AnnotationSubjectConvention),
    pbjs.WithMiddleware(ValidationMiddleware()), // if message fields are used for the subject convention, they should be validated prior    
)

pub.Publish(ctx, &testpb.OrderDispatchedEvent{Id: "abc123"})
// subject: ORDER.abc123.dispatched

The implementation currently uses ProtoReflect to extract the subject template and apply the field values. This can result in runtime errors due to configuration so unit testing is recommended to ensure correct outputs.

Consumers

Consumers accept a Handler implementation. HandlerFunc provides a convenience function to simplify creation. To avoid casts, NewHandler accepts a typed handler function.

Error Handling

Message acknowledgement can be controlled based on the handler error. By default nil errors result in Ack, errors wrapped using NewPersistentError result in Term and all other errors result in Nak.

h := pbjs.HandlerFunc(func(ctx context.Context, m *testpb.MessageB) error {
    state, err := store.Get(ctx, m.GetId())
    if err != nil {
        return err // transient error, re-deliver
    }

    seq := pbjs.GetContextMetadata(ctx).Sequence.Stream
    if state.Version > seq {
        return pbjs.NewPersistentError(errors.New("stale message"))
    }
    
    // logic

    return nil
})

The default behaviour can be customised using the WithErrorHandler option on consumer creation.

Message resolution and unmarshaling both result in persistent errors. Both errors occur outside of the handler chain and are therefore not addressable by middleware. Such errors are logged to slog.Default() by default with logger customisation achievable using WithLogger on consumer creation.

Middleware

Both Publisher and Consumer accept middleware on creation. A simple logging middleware is provided:

pub := pbjs.NewPublisher(js, pbjs.WithPublisherMiddleware(pbjs.LoggingMiddleware(logger)))
Validation

Validation middleware can be used to prevent the publish of invalid messages or to allow consumers to immediately reject invalid messages. A middleware function to achieve this with protovalidate would look like the following.

func ValidationMiddleware() MiddlewareFunc {
	return func(h Handler) Handler {
		return HandlerFunc(func(ctx context.Context, m proto.Message) error {
			if err := protovalidate.Validate(m); err != nil {
				return pbjs.NewError(err, pbjs.ErrTypePersistent) // return persistent to avoid re-delivery in consumers
			}
			return h.Handle(ctx, m)
		})
	}
}

Header Propagation

NATS headers and message metadata are stored in the publish/consume contexts. This allows header propagation when publishing from a consumer handler. By default HdrCorrelationID is not overriden in this scenario. All Nats-Expect-* headers are removed, however, to avoid unexpected behaviour.

Documentation

Index

Examples

Constants

View Source
const (
	HdrNatsMsgID     = nats.MsgIdHdr
	HdrNatsTimestamp = "Nats-Time-Stamp"
	HdrMsgType       = "Msg-Type"
	HdrContentType   = "Content-Type"
	HdrCorrelationID = "Correlation-Id"
	HdrSubject       = "Subject"
)

Variables

This section is empty.

Functions

func AnnotationSubjectConvention added in v0.2.0

func AnnotationSubjectConvention(ctx context.Context, m proto.Message) (string, error)

AnnotationSubjectConvention returns a subject for the message based on proto annotations. If the required annotation is not present on the message then an error will be returned.

func ExecuteSubjectTemplate added in v0.2.0

func ExecuteSubjectTemplate(template string, m proto.Message) (string, error)

ExecuteSubjectTemplate replaces field identifiers with their message values

func GetContextHeader

func GetContextHeader(ctx context.Context) nats.Header

GetContextHeader returns the nats.Header from the supplied context. If the context does not have a header set, an empty one is returned.

func GetContextMetadata

func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata

GetContextMetadata returns the jetstream.MsgMetadata from the supplied context. If the context does not have metadata set, an empty one is returned.

func IsPersistentError added in v0.2.0

func IsPersistentError(err error) bool

IsPersistentError returns true if the supplied error is persistent

func MessageType

func MessageType(m proto.Message) string

MessageType returns the type for the supplied message

func NewPersistentError added in v0.2.0

func NewPersistentError(err error) error

NewPersistentError wraps the supplied error to indicate that it is persistent

func SetContextHeader

func SetContextHeader(ctx context.Context, h nats.Header) context.Context

SetContextHeader returns a context with the nats.Header set

func SetContextMetadata

func SetContextMetadata(ctx context.Context, md *jetstream.MsgMetadata) context.Context

SetContextMetadata returns a context with the jetstream.MsgMetadata set

func WithConsumerMiddleware

func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)

WithConsumerMiddleware specifies the consumer middleware

func WithErrorHandler

func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)

WithErrorHandler specifies the error handler func

func WithLogger

func WithLogger(l *slog.Logger) func(*ConsumerOptions)

WithLogger specifies the consumer logger

func WithPublisherMiddleware

func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)

WithPublisherMiddleware specifies the publisher middleware

func WithSubjectConvention

func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)

WithSubjectConvention specifies the publisher subject convention

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a JetStream consumer

func NewConsumer

func NewConsumer(jc jetstream.Consumer, h Handler, optFns ...func(*ConsumerOptions)) (*Consumer, error)

NewConsumer returns a new consumer

Example
js, ok := getJS()
if !ok {
	fmt.Println("value") // skip the example if jetstream is not available
	return
}

ctx := context.Background()

_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
	Name:     "example_stream",
	Subjects: []string{"testpb.>"},
})
if err != nil {
	log.Fatal(err)
}

jsc, err := js.CreateOrUpdateConsumer(ctx, "example_stream", jetstream.ConsumerConfig{
	Name:    "example_consumer",
	Durable: "example_consumer",
})
if err != nil {
	log.Fatal(err)
}

var value string
ch := make(chan struct{})

con, err := pbjs.NewConsumer(jsc,
	pbjs.NewHandler(func(ctx context.Context, in *testpb.OrderDispatchedEvent) error {
		defer close(ch)
		value = in.GetId()
		return nil
	}),
)
if err != nil {
	log.Fatal(err)
}
defer con.Close()

pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.OrderDispatchedEvent{Id: "abc123"})
if err != nil {
	log.Fatal(err)
}

select {
case <-ch:
case <-time.After(time.Second):
	log.Fatal("timeout waiting for channel")
}

fmt.Println(value)
Output:
abc123

func (*Consumer) Close

func (c *Consumer) Close() error

Close drains the underlying jetstream.ConsumeContext

type ConsumerOptions

type ConsumerOptions struct {
	Middleware []MiddlewareFunc
	Logger     *slog.Logger
	Timeout    time.Duration
	ErrorFn    ErrorFunc
}

ConsumerOptions represents consumer options

type ErrorFunc

type ErrorFunc func(ctx context.Context, m jetstream.Msg, err error)

ErrorFunc represents a consumer error handler. By default all persistent errors result in Term, while all others result in Nak.

type Handler

type Handler interface {
	Handle(ctx context.Context, m proto.Message) error
}

Handler represents an untyped message handler

func ApplyMiddleware

func ApplyMiddleware(h Handler, m ...MiddlewareFunc) Handler

ApplyMiddleware returns a handler with the supplied middleware applied

func NewHandler

func NewHandler[T proto.Message](fn func(ctx context.Context, m T) error) Handler

NewHandler returns a new typed message handler for the specified func

type HandlerFunc

type HandlerFunc func(ctx context.Context, m proto.Message) error

HandlerFunc represents an untyped message handler func

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, m proto.Message) error

Handle handles the message

type MiddlewareFunc

type MiddlewareFunc func(h Handler) Handler

MiddlewareFunc represents a publisher/consumer middleware func

func LoggingMiddleware

func LoggingMiddleware(l *slog.Logger) MiddlewareFunc

LoggingMiddleware returns a slog.Logger middleware funct

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher represents a JetStream publisher

func NewPublisher

func NewPublisher(js jetstream.JetStream, optFns ...func(*PublisherOptions)) *Publisher

NewPublisher returns a new publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, m proto.Message) error

Publish publishes the message

type PublisherOptions

type PublisherOptions struct {
	Middleware        []MiddlewareFunc
	SubjectConvention SubjectConventionFunc
}

PublisherOptions represents publisher options

type SubjectConventionFunc

type SubjectConventionFunc func(ctx context.Context, m proto.Message) (string, error)

SubjectConventionFunc represents a subjection convention func. By default publishers use the value of MessageType as the subject.

Directories

Path Synopsis
internal
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL