streams

package
v0.2.31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2025 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FETCH_NO_WAIT   = 1000000
	MAX_ACK_PENDING = -1 //unlimited
	MAX_DELIVERY    = -1 // unlimited
	MAX_BYTES       = -1 // unlimited

	PublishedByHeader = "publishedBy"
)
View Source
const (
	CONN_POOL_SIZE = 100
)

Variables

This section is empty.

Functions

func ConsumeAll added in v0.2.28

func ConsumeAll(nc *nats.Conn, streamName string, subject string, handler HandlerFunc) error

func ConsumeAllMessagesSync added in v0.2.23

func ConsumeAllMessagesSync(nc *nats.Conn, streamName string, subject string, handler HandlerFunc) error

func ConsumeMessages added in v0.2.12

func ConsumeMessages(nc *nats.Conn, streamName string, subject string, durable string, handler HandlerFunc) (jetstream.ConsumeContext, error)

func ConsumeMessagesNoAck added in v0.2.24

func ConsumeMessagesNoAck(nc *nats.Conn, streamName string, subject string, durable string, handler HandlerFuncNoAck) (jetstream.ConsumeContext, error)

func GetHeader added in v0.2.12

func GetHeader(headers map[string][]string, name string) string

func NKeySignatureHandler added in v0.2.12

func NKeySignatureHandler(seed string, b []byte) ([]byte, error)

Types

type HandlerFunc added in v0.2.22

type HandlerFunc func(context.Context, string, map[string][]string, []byte, uint64) error

type HandlerFuncNoAck added in v0.2.24

type HandlerFuncNoAck func(context.Context, *jetstream.Msg) error

type KeyHistory

type KeyHistory struct {
	Created  time.Time `json:"created"`
	Revision uint64    `json:"revision"`
	Value    string    `json:"value"`
}

type NatsConnPool added in v0.2.12

type NatsConnPool struct {
	// contains filtered or unexported fields
}

func NewNatsConnPool added in v0.2.12

func NewNatsConnPool(url string, options ...nats.Option) *NatsConnPool

func (*NatsConnPool) Close added in v0.2.12

func (this *NatsConnPool) Close() error

func (*NatsConnPool) GetConnection added in v0.2.12

func (this *NatsConnPool) GetConnection() (*nats.Conn, error)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream(url string, streamName string) (*Stream, error)

func NewStreamWithConn added in v0.2.12

func NewStreamWithConn(nc *nats.Conn, streamName string) *Stream

func NewStreamWithConnPool added in v0.2.12

func NewStreamWithConnPool(url string, streamName string, options ...nats.Option) *Stream

func (*Stream) AddKeyValue

func (this *Stream) AddKeyValue(bucket string, key string, value []byte) error

func (*Stream) Close added in v0.2.2

func (s *Stream) Close() error

func (*Stream) CreateBucket

func (this *Stream) CreateBucket(bucket string, history int, ttl time.Duration) (jetstream.KeyValue, error)

func (*Stream) CreateConsumer

func (s *Stream) CreateConsumer(stream string, durable string) (jetstream.Consumer, error)

func (*Stream) CreateStream

func (s *Stream) CreateStream(subjects []string) (jetstream.Stream, error)

func (*Stream) CreateStreamWithConfig added in v0.2.9

func (s *Stream) CreateStreamWithConfig(subjects []string, config jetstream.StreamConfig) (jetstream.Stream, error)

func (*Stream) CreateStreamWithDomain added in v0.2.17

func (s *Stream) CreateStreamWithDomain(domain string, subjects []string) (jetstream.Stream, error)

func (*Stream) CreateStreamWithDomainConfig added in v0.2.17

func (s *Stream) CreateStreamWithDomainConfig(domain string, subjects []string, config jetstream.StreamConfig) (jetstream.Stream, error)

func (*Stream) DeleteKeyValue

func (this *Stream) DeleteKeyValue(bucket string, key string) error

func (*Stream) FetchAll added in v0.2.12

func (s *Stream) FetchAll(filters []string, startTime *time.Time) ([][]byte, error)

func (*Stream) GetKVs

func (this *Stream) GetKVs(bucket string) (map[string][]byte, error)

func (*Stream) GetKeyHistory

func (this *Stream) GetKeyHistory(bucket string, key string) error

func (*Stream) GetKeys

func (this *Stream) GetKeys(bucket string) ([]string, error)

func (*Stream) GetMessageByID

func (s *Stream) GetMessageByID(j jetstream.Stream, sequence uint64) ([]byte, error)

func (*Stream) GetMessageBySequence

func (s *Stream) GetMessageBySequence(sequence uint64) ([]byte, error)

func (*Stream) GetName added in v0.2.29

func (s *Stream) GetName() string

func (*Stream) GetStream

func (s *Stream) GetStream(name string) (jetstream.Stream, error)

func (*Stream) GetValueByKey

func (this *Stream) GetValueByKey(bucket string, key string) ([]byte, error)

func (*Stream) KVWatcher added in v0.2.24

func (this *Stream) KVWatcher(bucket string, cfunc func(string, string, []byte)) error

func (*Stream) LastBySubject added in v0.2.12

func (s *Stream) LastBySubject(filters []string) ([]byte, error)

func (*Stream) LastPerSubject added in v0.2.12

func (s *Stream) LastPerSubject(filters []string) (map[string][][]byte, error)

func (*Stream) Publish

func (s *Stream) Publish(subject string, payload []byte) (*jetstream.PubAck, error)

func (*Stream) PublishMsg

func (s *Stream) PublishMsg(subject string, payload []byte, publisher string) (*jetstream.PubAck, error)

func (*Stream) PublishMsgWithHeader added in v0.2.22

func (s *Stream) PublishMsgWithHeader(subject string, payload []byte, header map[string][]string) (*jetstream.PubAck, error)

func (*Stream) PurgeStream added in v0.2.28

func (s *Stream) PurgeStream(name string) error

func (*Stream) PurgeSubject added in v0.2.22

func (s *Stream) PurgeSubject(subject string) error

func (*Stream) PutKeyValue

func (this *Stream) PutKeyValue(bucket string, key string, value []byte) error

func (*Stream) UpdateKeyValue

func (this *Stream) UpdateKeyValue(bucket string, key string, value []byte, revision uint64) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL