Documentation
¶
Index ¶
- Constants
- func AnnotationSubjectConvention(ctx context.Context, m proto.Message) (string, error)
- func ExecuteSubjectTemplate(template string, m proto.Message) (string, error)
- func GetContextHeader(ctx context.Context) nats.Header
- func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata
- func IsPersistentError(err error) bool
- func MessageType(m proto.Message) string
- func NewPersistentError(err error) error
- func SetContextHeader(ctx context.Context, h nats.Header) context.Context
- func SetContextMetadata(ctx context.Context, md *jetstream.MsgMetadata) context.Context
- func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)
- func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)
- func WithLogger(l *slog.Logger) func(*ConsumerOptions)
- func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)
- func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)
- type Consumer
- type ConsumerOptions
- type ErrorFunc
- type Handler
- type HandlerFunc
- type MiddlewareFunc
- type Publisher
- type PublisherOptions
- type SubjectConventionFunc
Examples ¶
Constants ¶
const ( HdrNatsMsgID = nats.MsgIdHdr HdrNatsTimestamp = "Nats-Time-Stamp" HdrMsgType = "Msg-Type" HdrContentType = "Content-Type" HdrCorrelationID = "Correlation-Id" HdrSubject = "Subject" )
Variables ¶
This section is empty.
Functions ¶
func AnnotationSubjectConvention ¶ added in v0.2.0
AnnotationSubjectConvention returns a subject for the message based on proto annotations. If the required annotation is not present on the message then an error will be returned.
func ExecuteSubjectTemplate ¶ added in v0.2.0
ExecuteSubjectTemplate replaces field identifiers with their message values
func GetContextHeader ¶
GetContextHeader returns the nats.Header from the supplied context. If the context does not have a header set, an empty one is returned.
func GetContextMetadata ¶
func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata
GetContextMetadata returns the jetstream.MsgMetadata from the supplied context. If the context does not have metadata set, an empty one is returned.
func IsPersistentError ¶ added in v0.2.0
IsPersistentError returns true if the supplied error is persistent
func MessageType ¶
MessageType returns the type for the supplied message
func NewPersistentError ¶ added in v0.2.0
NewPersistentError wraps the supplied error to indicate that it is persistent
func SetContextHeader ¶
SetContextHeader returns a context with the nats.Header set
func SetContextMetadata ¶
SetContextMetadata returns a context with the jetstream.MsgMetadata set
func WithConsumerMiddleware ¶
func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)
WithConsumerMiddleware specifies the consumer middleware
func WithErrorHandler ¶
func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)
WithErrorHandler specifies the error handler func
func WithLogger ¶
func WithLogger(l *slog.Logger) func(*ConsumerOptions)
WithLogger specifies the consumer logger
func WithPublisherMiddleware ¶
func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)
WithPublisherMiddleware specifies the publisher middleware
func WithSubjectConvention ¶
func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)
WithSubjectConvention specifies the publisher subject convention
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a JetStream consumer
func NewConsumer ¶
func NewConsumer(jc jetstream.Consumer, h Handler, optFns ...func(*ConsumerOptions)) (*Consumer, error)
NewConsumer returns a new consumer
Example ¶
js, ok := getJS()
if !ok {
fmt.Println("value") // skip the example if jetstream is not available
return
}
ctx := context.Background()
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "example_stream",
Subjects: []string{"testpb.>"},
})
if err != nil {
log.Fatal(err)
}
jsc, err := js.CreateOrUpdateConsumer(ctx, "example_stream", jetstream.ConsumerConfig{
Name: "example_consumer",
Durable: "example_consumer",
})
if err != nil {
log.Fatal(err)
}
var value string
ch := make(chan struct{})
con, err := pbjs.NewConsumer(jsc,
pbjs.NewHandler(func(ctx context.Context, in *testpb.OrderDispatchedEvent) error {
defer close(ch)
value = in.GetId()
return nil
}),
)
if err != nil {
log.Fatal(err)
}
defer con.Close()
pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.OrderDispatchedEvent{Id: "abc123"})
if err != nil {
log.Fatal(err)
}
select {
case <-ch:
case <-time.After(time.Second):
log.Fatal("timeout waiting for channel")
}
fmt.Println(value)
Output: abc123
func (*Consumer) Close ¶
Close drains the underlying jetstream.ConsumeContext
type ConsumerOptions ¶
type ConsumerOptions struct {
Middleware []MiddlewareFunc
Logger *slog.Logger
Timeout time.Duration
ErrorFn ErrorFunc
}
ConsumerOptions represents consumer options
type ErrorFunc ¶
ErrorFunc represents a consumer error handler. By default all persistent errors result in Term, while all others result in Nak.
type Handler ¶
Handler represents an untyped message handler
func ApplyMiddleware ¶
func ApplyMiddleware(h Handler, m ...MiddlewareFunc) Handler
ApplyMiddleware returns a handler with the supplied middleware applied
type HandlerFunc ¶
HandlerFunc represents an untyped message handler func
type MiddlewareFunc ¶
MiddlewareFunc represents a publisher/consumer middleware func
func LoggingMiddleware ¶
func LoggingMiddleware(l *slog.Logger) MiddlewareFunc
LoggingMiddleware returns a slog.Logger middleware funct
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher represents a JetStream publisher
func NewPublisher ¶
func NewPublisher(js jetstream.JetStream, optFns ...func(*PublisherOptions)) *Publisher
NewPublisher returns a new publisher
type PublisherOptions ¶
type PublisherOptions struct {
Middleware []MiddlewareFunc
SubjectConvention SubjectConventionFunc
}
PublisherOptions represents publisher options
type SubjectConventionFunc ¶
SubjectConventionFunc represents a subjection convention func. By default publishers use the value of MessageType as the subject.