Documentation
¶
Index ¶
- Constants
- func GetContextHeader(ctx context.Context) nats.Header
- func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata
- func MessageType(m proto.Message) string
- func NewError(err error, typ ErrorType) error
- func SetContextHeader(ctx context.Context, h nats.Header) context.Context
- func SetContextMetadata(ctx context.Context, md *jetstream.MsgMetadata) context.Context
- func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)
- func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)
- func WithLogger(l *slog.Logger) func(*ConsumerOptions)
- func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)
- func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)
- type Consumer
- type ConsumerOptions
- type Error
- type ErrorFunc
- type ErrorType
- type Handler
- type HandlerFunc
- type MiddlewareFunc
- type MsgPtr
- type Publisher
- type PublisherOptions
- type SubjectConventionFunc
- type TypeHandler
Examples ¶
Constants ¶
const (
	HdrNatsMsgID     = nats.MsgIdHdr
	HdrNatsTimestamp = "Nats-Time-Stamp"
	HdrMsgType       = "Msg-Type"
	HdrContentType   = "Content-Type"
	HdrCorrelationID = "Correlation-Id"
	HdrSubject       = "Subject"
)
Variables ¶
This section is empty.
Functions ¶
func GetContextHeader ¶
GetContextHeader returns the nats.Header from the supplied context. If the context does not have a header set, an empty one is returned.
func GetContextMetadata ¶
func GetContextMetadata(ctx context.Context) *jetstream.MsgMetadata
GetContextMetadata returns the jetstream.MsgMetadata from the supplied context. If the context does not have metadata set, an empty one is returned.
func MessageType ¶
MessageType returns the type for the supplied message
func SetContextHeader ¶
SetContextHeader returns a context with the nats.Header set
func SetContextMetadata ¶
SetContextMetadata returns a context with the jetstream.MsgMetadata set
func WithConsumerMiddleware ¶
func WithConsumerMiddleware(m ...MiddlewareFunc) func(*ConsumerOptions)
WithConsumerMiddleware specifies the consumer middleware
func WithErrorHandler ¶
func WithErrorHandler(fn ErrorFunc) func(*ConsumerOptions)
WithErrorHandler specifies the error handler func
func WithLogger ¶
func WithLogger(l *slog.Logger) func(*ConsumerOptions)
WithLogger specifies the consumer logger
func WithPublisherMiddleware ¶
func WithPublisherMiddleware(m ...MiddlewareFunc) func(o *PublisherOptions)
WithPublisherMiddleware specifies the publisher middleware
func WithSubjectConvention ¶
func WithSubjectConvention(fn SubjectConventionFunc) func(o *PublisherOptions)
WithSubjectConvention specifies the publisher subject convention
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a JetStream consumer
func NewConsumer ¶
func NewConsumer(jc jetstream.Consumer, h Handler, optFns ...func(*ConsumerOptions)) (*Consumer, error)
NewConsumer returns a new consumer
Example ¶
//hello
js, ok := getJS()
if !ok {
fmt.Println("value") // skip the example if jetstream is not available
return
}
ctx := context.Background()
_, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "example_stream",
Subjects: []string{"testpb.>"},
})
if err != nil {
log.Fatal(err)
}
jsc, err := js.CreateOrUpdateConsumer(ctx, "example_stream", jetstream.ConsumerConfig{
Name: "example_consumer",
Durable: "example_consumer",
})
if err != nil {
log.Fatal(err)
}
var value string
ch := make(chan struct{})
con, err := pbjs.NewConsumer(jsc,
pbjs.NewHandler(func(ctx context.Context, in *testpb.MessageA) error {
defer close(ch)
value = in.GetValue()
return nil
}),
)
if err != nil {
log.Fatal(err)
}
defer con.Close()
pub := pbjs.NewPublisher(js)
err = pub.Publish(ctx, &testpb.MessageA{Value: "value"})
if err != nil {
log.Fatal(err)
}
select {
case <-ch:
case <-time.After(time.Second):
log.Fatal("timeout waiting for channel")
}
fmt.Println(value)
Output: value
func (*Consumer) Close ¶
Close drains the underlying jetstream.ConsumeContext
type ConsumerOptions ¶
type ConsumerOptions struct {
Middleware []MiddlewareFunc
Logger *slog.Logger
Timeout time.Duration
ErrorFn ErrorFunc
}
ConsumerOptions represents consumer options
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error represents a handler error
type ErrorFunc ¶
ErrorFunc represents a consumer error handler. By default all persistent errors result in Term, while all others result in Nak.
type ErrorType ¶
type ErrorType uint8
ErrorType indicates an error type
const (
	// ErrorTypeNone indicates that there is no error.
	// It is returned only when a nil error is supplied to [ErrorTypeOf].
	ErrorTypeNone ErrorType = iota
	// ErrorTypeUnknown indicates an unknown error.
	// It is returned only when the error supplied to [ErrorTypeOf] does not have a type.
	ErrorTypeUnknown
	// ErrorTypeTransient indicates that the handler error was transient.
	// By default transient errors result in Nak.
	ErrorTypeTransient
	// ErrorTypePersistent indicates that the handler error was persistent.
	// By default persistent errors result in Term.
	ErrorTypePersistent
)
func ErrorTypeOf ¶
ErrorTypeOf returns the ErrorType of the supplied error
type Handler ¶
Handler represents an untyped message handler
func ApplyMiddleware ¶
func ApplyMiddleware(h Handler, m ...MiddlewareFunc) Handler
ApplyMiddleware returns a handler with the supplied middleware applied
type HandlerFunc ¶
HandlerFunc represents an untyped message handler func
type MiddlewareFunc ¶
type MiddlewareFunc func(h Handler) Handler
MiddlewareFunc represents a publisher/consumer middleware func
func LoggingMiddleware ¶
func LoggingMiddleware(l *slog.Logger) MiddlewareFunc
LoggingMiddleware returns a slog.Logger middleware func
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher represents a JetStream publisher
func NewPublisher ¶
func NewPublisher(js jetstream.JetStream, optFns ...func(*PublisherOptions)) *Publisher
NewPublisher returns a new publisher
type PublisherOptions ¶
type PublisherOptions struct {
Middleware []MiddlewareFunc
SubjectConvention SubjectConventionFunc
}
PublisherOptions represents publisher options
type SubjectConventionFunc ¶
SubjectConventionFunc represents a subject convention func. By default publishers use the value of MessageType as the subject.