Documentation
¶
Overview ¶
Package nats_jetstream implements the CloudEvents transport using NATS JetStream.
Index ¶
- Variables
- func NatsOptions(opts ...nats.Option) []nats.Option
- func WriteMsg(ctx context.Context, m binding.Message, writer io.ReaderFrom, ...) error
- type Consumer
- type ConsumerOption
- type Message
- type Protocol
- type ProtocolOption
- type QueueSubscriber
- type Receiver
- type RegularSubscriber
- type Sender
- type SenderOption
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")
Functions ¶
func NatsOptions ¶
func NatsOptions(opts ...nats.Option) []nats.Option
NatsOptions is a helper function that groups variadic nats.Option values into a []nats.Option that can be used by a Sender, Consumer, or Protocol.
Types ¶
type Consumer ¶
type Consumer struct { Receiver Conn *nats.Conn Jsm nats.JetStreamContext Subject string Subscriber Subscriber SubOpt []nats.SubOpt // contains filtered or unexported fields }
func NewConsumer ¶
func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)
func NewConsumerFromConn ¶
func NewConsumerFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error)
type ConsumerOption ¶
func WithQueueSubscriber ¶
func WithQueueSubscriber(queue string) ConsumerOption
WithQueueSubscriber configures the Consumer to join a queue group when subscribing
type Message ¶
type Message struct { Msg *nats.Msg // contains filtered or unexported fields }
Message implements binding.Message by wrapping an *nats.Msg. This message *can* be read several times safely
func NewMessage ¶
func NewMessage(msg *nats.Msg) *Message
NewMessage wraps an *nats.Msg in a binding.Message. The returned message *can* be read several times safely
func (*Message) Finish ¶
Finish *must* be called when a message from a Receiver can be forgotten by the receiver.
func (*Message) ReadBinary ¶
ReadBinary transfers a binary-mode event to a BinaryWriter.
func (*Message) ReadEncoding ¶
ReadEncoding returns the type of the message Encoding.
func (*Message) ReadStructured ¶
ReadStructured transfers a structured-mode event to a StructuredWriter.
type Protocol ¶
type Protocol struct { Conn *nats.Conn Consumer *Consumer Sender *Sender // contains filtered or unexported fields }
Protocol is a reference implementation for using the CloudEvents binding integration. Protocol acts as both a NATS client and a NATS handler.
func NewProtocol ¶
func NewProtocol(url, stream, sendSubject, receiveSubject string, natsOpts []nats.Option, jsOps []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)
NewProtocol creates a new NATS protocol.
func NewProtocolFromConn ¶
func NewProtocolFromConn(conn *nats.Conn, stream, sendSubject, receiveSubject string, jsOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ProtocolOption) (*Protocol, error)
type ProtocolOption ¶
ProtocolOption is the function signature required to be considered a ProtocolOption.
type QueueSubscriber ¶
type QueueSubscriber struct {
Queue string
}
QueueSubscriber creates queue subscriptions
func (*QueueSubscriber) Subscribe ¶
func (s *QueueSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
func NewReceiver ¶
func NewReceiver() *Receiver
NewReceiver creates a new protocol.Receiver responsible for receiving messages.
func (*Receiver) MsgHandler ¶
func (r *Receiver) MsgHandler(msg *nats.Msg)
MsgHandler implements nats.MsgHandler and publishes messages onto our internal incoming channel to be delivered via r.Receive(ctx)
type RegularSubscriber ¶
type RegularSubscriber struct { }
RegularSubscriber creates regular subscriptions
func (*RegularSubscriber) Subscribe ¶
func (s *RegularSubscriber) Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Subscribe implements Subscriber.Subscribe
type Sender ¶
type Sender struct { Jsm nats.JetStreamContext Conn *nats.Conn Subject string Stream string // contains filtered or unexported fields }
func NewSender ¶
func NewSender(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)
NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection
func NewSenderFromConn ¶
func NewSenderFromConn(conn *nats.Conn, stream, subject string, jsmOpts []nats.JSOpt, opts ...SenderOption) (*Sender, error)
NewSenderFromConn creates a new protocol.Sender which leaves responsibility for opening and closing the NATS connection to the caller
type SenderOption ¶
type Subscriber ¶
type Subscriber interface {
Subscribe(jsm nats.JetStreamContext, subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
}
The Subscriber interface allows us to configure how the subscription is created