Versions in this module Expand all Collapse all v0 v0.0.2 Jun 15, 2026 Changes in this version + const ACCOUNT_AUTHENTICATION_EXPIRED_ERR + const AUTHENTICATION_EXPIRED_ERR + const AUTHENTICATION_REVOKED_ERR + const AUTHORIZATION_ERR + const AllKeys + const AsyncSubscription + const CLOSED + const CONNECTED + const CONNECTING + const ChanSubscription + const DEFAULT_ENCODER + const DISCONNECTED + const DRAINING_PUBS + const DRAINING_SUBS + const DefaultDrainTimeout + const DefaultFlusherTimeout + const DefaultMaxChanLen + const DefaultMaxPingOut + const DefaultMaxReconnect + const DefaultPingInterval + const DefaultPort + const DefaultPubRetryAttempts + const DefaultPubRetryWait + const DefaultReconnectBufSize + const DefaultReconnectJitter + const DefaultReconnectJitterTLS + const DefaultReconnectWait + const DefaultSubPendingBytesLimit + const DefaultSubPendingMsgsLimit + const DefaultTimeout + const DefaultURL + const DefaultWriteBufSize + const ExpectedLastMsgIdHdr + const ExpectedLastSeqHdr + const ExpectedLastSubjSeqHdr + const ExpectedStreamHdr + const GOB_ENCODER + const INFO_ARG + const InboxPrefix + const JSLastSequence + const JSON_ENCODER + const JSSequence + const JSStream + const JSSubject + const JSTimeStamp + const KeyValueMaxHistory + const LangString + const MAX_ACCOUNT_CONNECTIONS_ERR + const MAX_CONNECTIONS_ERR + const MAX_CONTROL_LINE_SIZE + const MAX_SUBSCRIPTIONS_ERR + const MINUS_ERR_ARG + const MSG_ARG + const MSG_END + const MSG_PAYLOAD + const MsgIdHdr + const MsgRollup + const MsgRollupAll + const MsgRollupSubject + const MsgSize + const MsgTTLHdr + const NilSubscription + const OP_H + const OP_I + const OP_IN + const OP_INF + const OP_INFO + const OP_INFO_SPC + const OP_M + const OP_MINUS + const OP_MINUS_E + const OP_MINUS_ER + const OP_MINUS_ERR + const OP_MINUS_ERR_SPC + const OP_MS + const OP_MSG + const OP_MSG_SPC + const OP_P + const OP_PI + const OP_PIN + const OP_PING + const OP_PLUS + const OP_PLUS_O + const OP_PLUS_OK + const OP_PO + const OP_PON + const OP_PONG + const OP_START + const PERMISSIONS_ERR + const PullSubscription + const RECONNECTING + const RequestChanLen + const STALE_CONNECTION + const SubscriptionActive + const SubscriptionClosed + const SubscriptionDraining + const SubscriptionSlowConsumer + const SyncSubscription + const Version + var DefaultOptions = GetDefaultOptions() + var ErrAccountAuthExpired = errors.New("nats: account authentication expired") + var ErrAuthExpired = errors.New("nats: authentication expired") + var ErrAuthRevoked = errors.New("nats: authentication revoked") + var ErrAuthorization = errors.New("nats: authorization violation") + var ErrBadBucket = errors.New("nats: bucket not valid key-value store") + var ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + var ErrBadObjectMeta = errors.New("nats: object-store meta information invalid") + var ErrBadQueueName = errors.New("nats: invalid queue name") + var ErrBadSubject = errors.New("nats: invalid subject") + var ErrBadSubscription = errors.New("nats: invalid subscription") + var ErrBadTimeout = errors.New("nats: timeout invalid") + var ErrBucketMalformed = errors.New("nats: bucket malformed") + var ErrBucketNotFound = errors.New("nats: bucket not found") + var ErrBucketRequired = errors.New("nats: bucket required") + var ErrCantGetBucket = errors.New("nats: invalid Get, object is a link to a bucket") + var ErrChanArg = errors.New("nats: argument needs to be a channel type") + var ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set") + var ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") + var ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + var ErrConnectionClosed = errors.New("nats: connection closed") + var ErrConnectionDraining = errors.New("nats: connection draining") + var ErrConnectionNotTLS = errors.New("nats: connection is not tls") + var ErrConnectionReconnecting = errors.New("nats: connection reconnecting") + var ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match") + var ErrDisconnected = errors.New("nats: server is disconnected") + var ErrDrainTimeout = errors.New("nats: draining connection timed out") + var ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + var ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") + var ErrInvalidArg = errors.New("nats: invalid argument") + var ErrInvalidBucketName = errors.New("nats: invalid bucket name") + var ErrInvalidConnection = errors.New("nats: invalid connection") + var ErrInvalidContext = errors.New("nats: invalid context") + var ErrInvalidDigestFormat = errors.New("nats: object digest hash has invalid format") + var ErrInvalidKey = errors.New("nats: invalid key") + var ErrInvalidMsg = errors.New("nats: invalid message or message nil") + var ErrInvalidStoreName = errors.New("nats: invalid object-store name") + var ErrJsonParse = errors.New("nats: connect message, json parse error") + var ErrKeyDeleted = errors.New("nats: key was deleted") + var ErrKeyNotFound = errors.New("nats: key not found") + var ErrKeyValueConfigRequired = errors.New("nats: config required") + var ErrKeyWatcherTimeout = errors.New("nats: key watcher timed out waiting for initial keys") + var ErrLinkNotAllowed = errors.New("nats: link cannot be set when putting the object in bucket") + var ErrMaxAccountConnectionsExceeded = errors.New("nats: maximum account active connections exceeded") + var ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") + var ErrMaxMessages = errors.New("nats: maximum messages delivered") + var ErrMaxPayload = errors.New("nats: maximum payload exceeded") + var ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") + var ErrMixingWebsocketSchemes = errors.New("nats: mixing of websocket and non websocket URLs is not allowed") + var ErrMsgNoReply = errors.New("nats: message does not have a reply") + var ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") + var ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") + var ErrNameRequired = errors.New("nats: name is required") + var ErrNeeds262 = errors.New("nats: object-store requires at least server version 2.6.2") + var ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") + var ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") + var ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") + var ErrNoDeadlineContext = errors.New("nats: context requires a deadline") + var ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") + var ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") + var ErrNoKeysFound = errors.New("nats: no keys found") + var ErrNoLinkToDeleted = errors.New("nats: not allowed to link to a deleted object") + var ErrNoLinkToLink = errors.New("nats: not allowed to link to another link") + var ErrNoObjectsFound = errors.New("nats: no objects found") + var ErrNoResponders = errors.New("nats: no responders available for request") + var ErrNoServers = errors.New("nats: no servers available for connection") + var ErrNoUserCB = errors.New("nats: user callback not defined") + var ErrObjectAlreadyExists = errors.New("nats: an object already exists with that name") + var ErrObjectConfigRequired = errors.New("nats: object-store config required") + var ErrObjectNotFound = errors.New("nats: object not found") + var ErrObjectRequired = errors.New("nats: object required") + var ErrPermissionViolation = errors.New("nats: permissions violation") + var ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") + var ErrSecureConnRequired = errors.New("nats: secure connection required") + var ErrSecureConnWanted = errors.New("nats: secure connection not available") + var ErrServerNotInPool = errors.New("nats: selected server is not in the pool") + var ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") + var ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) + var ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") + var ErrTLS = errors.New("nats: tls error") + var ErrTimeout = errors.New("nats: timeout") + var ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + var ErrTypeSubscription = errors.New("nats: invalid subscription type") + var ErrUpdateMetaDeleted = errors.New("nats: cannot update meta for a deleted object") + var ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") + var ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass") + var ErrWebSocketHeadersAlreadySet = errors.New("nats: websocket connection headers already set") + func DecodeObjectDigest(data string) ([]byte, error) + func GetObjectDigestValue(data hash.Hash) string + func NewInbox() string + func RegisterEncoder(encType string, enc Encoder) + type APIError struct + Code int + Description string + ErrorCode ErrorCode + func (e *APIError) APIError() *APIError + func (e *APIError) Error() string + func (e *APIError) Is(err error) bool + type APIStats struct + Errors uint64 + Inflight uint64 + Level int + Total uint64 + type AccountInfo struct + API APIStats + Domain string + Tiers map[string]Tier + type AccountLimits struct + MaxAckPending int + MaxBytesRequired bool + MaxConsumers int + MaxMemory int64 + MaxStore int64 + MaxStreams int + MemoryMaxStreamBytes int64 + StoreMaxStreamBytes int64 + type AckOpt interface + type AckPolicy int + const AckAllPolicy + const AckExplicitPolicy + const AckFlowControlPolicy + const AckNonePolicy + func (p *AckPolicy) UnmarshalJSON(data []byte) error + func (p AckPolicy) MarshalJSON() ([]byte, error) + func (p AckPolicy) String() string + type AckWait time.Duration + type AuthTokenHandler func() string + type ClientTrace struct + RequestSent func(subj string, payload []byte) + ResponseReceived func(subj string, payload []byte, hdr Header) + type ClusterInfo struct + Leader string + LeaderSince *time.Time + Name string + RaftGroup string + Replicas []*PeerInfo + SystemAcc bool + TrafficAcc string + type Conn struct + Opts Options + func Connect(url string, options ...Option) (*Conn, error) + func (nc *Conn) AuthRequired() bool + func (nc *Conn) Barrier(f func()) error + func (nc *Conn) Buffered() (int, error) + func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) + func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) + func (nc *Conn) Close() + func (nc *Conn) ClosedHandler() ConnHandler + func (nc *Conn) ConnectedAddr() string + func (nc *Conn) ConnectedClusterName() string + func (nc *Conn) ConnectedServerId() string + func (nc *Conn) ConnectedServerJetStream() (bool, int) + func (nc *Conn) ConnectedServerName() string + func (nc *Conn) ConnectedServerVersion() string + func (nc *Conn) ConnectedUrl() string + func (nc *Conn) ConnectedUrlRedacted() string + func (nc *Conn) DisconnectErrHandler() ConnErrHandler + func (nc *Conn) DiscoveredServers() []string + func (nc *Conn) DiscoveredServersHandler() ConnHandler + func (nc *Conn) Drain() error + func (nc *Conn) ErrorHandler() ErrHandler + func (nc *Conn) Flush() error + func (nc *Conn) FlushTimeout(timeout time.Duration) (err error) + func (nc *Conn) FlushWithContext(ctx context.Context) error + func (nc *Conn) ForceReconnect() error + func (nc *Conn) GetClientID() (uint64, error) + func (nc *Conn) GetClientIP() (net.IP, error) + func (nc *Conn) HeadersSupported() bool + func (nc *Conn) IsClosed() bool + func (nc *Conn) IsConnected() bool + func (nc *Conn) IsDraining() bool + func (nc *Conn) IsReconnecting() bool + func (nc *Conn) IsSystemAccount() bool + func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) + func (nc *Conn) LastError() error + func (nc *Conn) LocalAddr() string + func (nc *Conn) MaxPayload() int64 + func (nc *Conn) NewInbox() string + func (nc *Conn) NewRespInbox() string + func (nc *Conn) NumSubscriptions() int + func (nc *Conn) Publish(subj string, data []byte) error + func (nc *Conn) PublishMsg(m *Msg) error + func (nc *Conn) PublishRequest(subj, reply string, data []byte) error + func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) + func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) + func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) + func (nc *Conn) RTT() (time.Duration, error) + func (nc *Conn) ReconnectHandler() ConnHandler + func (nc *Conn) RemoveStatusListener(ch chan (Status)) + func (nc *Conn) Request(subj string, data []byte, timeout time.Duration) (*Msg, error) + func (nc *Conn) RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) + func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) + func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) + func (nc *Conn) ServerPool() []Server + func (nc *Conn) Servers() []string + func (nc *Conn) SetClosedHandler(cb ConnHandler) + func (nc *Conn) SetDisconnectErrHandler(dcb ConnErrHandler) + func (nc *Conn) SetDisconnectHandler(dcb ConnHandler) + func (nc *Conn) SetDiscoveredServersHandler(dscb ConnHandler) + func (nc *Conn) SetErrorHandler(cb ErrHandler) + func (nc *Conn) SetReconnectHandler(rcb ConnHandler) + func (nc *Conn) SetServerPool(servers []string) error + func (nc *Conn) Stats() Statistics + func (nc *Conn) Status() Status + func (nc *Conn) StatusChanged(statuses ...Status) chan Status + func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) + func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) + func (nc *Conn) TLSConnectionState() (tls.ConnectionState, error) + func (nc *Conn) TLSRequired() bool + type ConnErrHandler func(*Conn, error) + type ConnHandler func(*Conn) + type ConsumerConfig struct + AckPolicy AckPolicy + AckWait time.Duration + BackOff []time.Duration + DeliverGroup string + DeliverPolicy DeliverPolicy + DeliverSubject string + Description string + Durable string + FilterSubject string + FilterSubjects []string + FlowControl bool + HeadersOnly bool + Heartbeat time.Duration + InactiveThreshold time.Duration + MaxAckPending int + MaxDeliver int + MaxRequestBatch int + MaxRequestExpires time.Duration + MaxRequestMaxBytes int + MaxWaiting int + MemoryStorage bool + Metadata map[string]string + Name string + OptStartSeq uint64 + OptStartTime *time.Time + RateLimit uint64 + ReplayPolicy ReplayPolicy + Replicas int + SampleFrequency string + type ConsumerInfo struct + AckFloor SequenceInfo + Cluster *ClusterInfo + Config ConsumerConfig + Created time.Time + Delivered SequenceInfo + Name string + NumAckPending int + NumPending uint64 + NumRedelivered int + NumWaiting int + PushBound bool + Stream string + type ContextOpt struct + func Context(ctx context.Context) ContextOpt + type CustomDialer interface + Dial func(network, address string) (net.Conn, error) + type DeleteMarkersOlderThan time.Duration + type DeleteOpt interface + func LastRevision(revision uint64) DeleteOpt + type DeliverPolicy int + const DeliverAllPolicy + const DeliverByStartSequencePolicy + const DeliverByStartTimePolicy + const DeliverLastPerSubjectPolicy + const DeliverLastPolicy + const DeliverNewPolicy + func (p *DeliverPolicy) UnmarshalJSON(data []byte) error + func (p DeliverPolicy) MarshalJSON() ([]byte, error) + type DiscardPolicy int + const DiscardNew + const DiscardOld + func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error + func (dp DiscardPolicy) MarshalJSON() ([]byte, error) + func (dp DiscardPolicy) String() string + type EncodedConn struct + Conn *Conn + Enc Encoder + func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) + func (c *EncodedConn) BindRecvChan(subject string, channel any) (*Subscription, error) + func (c *EncodedConn) BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) + func (c *EncodedConn) BindSendChan(subject string, channel any) error + func (c *EncodedConn) Close() + func (c *EncodedConn) Drain() error + func (c *EncodedConn) Flush() error + func (c *EncodedConn) FlushTimeout(timeout time.Duration) (err error) + func (c *EncodedConn) LastError() error + func (c *EncodedConn) Publish(subject string, v any) error + func (c *EncodedConn) PublishRequest(subject, reply string, v any) error + func (c *EncodedConn) QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) + func (c *EncodedConn) Request(subject string, v any, vPtr any, timeout time.Duration) error + func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error + func (c *EncodedConn) Subscribe(subject string, cb Handler) (*Subscription, error) + type Encoder interface + Decode func(subject string, data []byte, vPtr any) error + Encode func(subject string, v any) ([]byte, error) + func EncoderForType(encType string) Encoder + type ErrConsumerSequenceMismatch struct + ConsumerSequence uint64 + LastConsumerSequence uint64 + StreamResumeSequence uint64 + func (ecs *ErrConsumerSequenceMismatch) Error() string + type ErrHandler func(*Conn, *Subscription, error) + type ErrorCode uint16 + const JSErrCodeBadRequest + const JSErrCodeConsumerAlreadyExists + const JSErrCodeConsumerEmptyFilter + const JSErrCodeConsumerNameExists + const JSErrCodeConsumerNotFound + const JSErrCodeDuplicateFilterSubjects + const JSErrCodeInsufficientResourcesErr + const JSErrCodeJetStreamNotAvailable + const JSErrCodeJetStreamNotEnabled + const JSErrCodeJetStreamNotEnabledForAccount + const JSErrCodeMessageNotFound + const JSErrCodeOverlappingFilterSubjects + const JSErrCodeStreamNameInUse + const JSErrCodeStreamNotFound + const JSErrCodeStreamWrongLastSequence + const JSStreamInvalidConfig + type ExternalStream struct + APIPrefix string + DeliverPrefix string + type GetObjectInfoOpt interface + func GetObjectInfoShowDeleted() GetObjectInfoOpt + type GetObjectOpt interface + func GetObjectShowDeleted() GetObjectOpt + type Handler any + type Header map[string][]string + func DecodeHeadersMsg(data []byte) (Header, error) + func (h Header) Add(key, value string) + func (h Header) Del(key string) + func (h Header) Get(key string) string + func (h Header) Set(key, value string) + func (h Header) Values(key string) []string + type InProcessConnProvider interface + InProcessConn func() (net.Conn, error) + type JSOpt interface + func APIPrefix(pre string) JSOpt + func DirectGet() JSOpt + func DirectGetNext(subject string) JSOpt + func Domain(domain string) JSOpt + func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt + func PublishAsyncMaxPending(max int) JSOpt + func PublishAsyncTimeout(dur time.Duration) JSOpt + func StreamListFilter(subject string) JSOpt + func UseLegacyDurableConsumers() JSOpt + type JetStream interface + ChanQueueSubscribe func(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + ChanSubscribe func(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) + CleanupPublisher func() + Publish func(subj string, data []byte, opts ...PubOpt) (*PubAck, error) + PublishAsync func(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) + PublishAsyncComplete func() <-chan struct{} + PublishAsyncPending func() int + PublishMsg func(m *Msg, opts ...PubOpt) (*PubAck, error) + PublishMsgAsync func(m *Msg, opts ...PubOpt) (PubAckFuture, error) + PullSubscribe func(subj, durable string, opts ...SubOpt) (*Subscription, error) + QueueSubscribe func(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) + QueueSubscribeSync func(subj, queue string, opts ...SubOpt) (*Subscription, error) + Subscribe func(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) + SubscribeSync func(subj string, opts ...SubOpt) (*Subscription, error) + type JetStreamContext interface + type JetStreamError interface + APIError func() *APIError + var ErrAsyncPublishTimeout JetStreamError = &jsError{ ... } + var ErrBadRequest JetStreamError = &jsError{ ... } + var ErrCantAckIfConsumerAckNone JetStreamError = &jsError{ ... } + var ErrConsumerConfigRequired JetStreamError = &jsError{ ... } + var ErrConsumerCreationResponseEmpty JetStreamError = &jsError{ ... } + var ErrConsumerDeleted JetStreamError = &jsError{ ... } + var ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{ ... } + var ErrConsumerLeadershipChanged JetStreamError = &jsError{ ... } + var ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{ ... } + var ErrConsumerNameAlreadyInUse JetStreamError = &jsError{ ... } + var ErrConsumerNameRequired JetStreamError = &jsError{ ... } + var ErrConsumerNotActive JetStreamError = &jsError{ ... } + var ErrConsumerNotFound JetStreamError = &jsError{ ... } + var ErrContextAndTimeout JetStreamError = &jsError{ ... } + var ErrDuplicateFilterSubjects JetStreamError = &jsError{ ... } + var ErrEmptyFilter JetStreamError = &jsError{ ... } + var ErrFetchDisconnected = &jsError{ ... } + var ErrInvalidConsumerName JetStreamError = &jsError{ ... } + var ErrInvalidDurableName = errors.New("nats: invalid durable name") + var ErrInvalidFilterSubject JetStreamError = &jsError{ ... } + var ErrInvalidJSAck JetStreamError = &jsError{ ... } + var ErrInvalidStreamName JetStreamError = &jsError{ ... } + var ErrJetStreamNotEnabled JetStreamError = &jsError{ ... } + var ErrJetStreamNotEnabledForAccount JetStreamError = &jsError{ ... } + var ErrJetStreamPublisherClosed JetStreamError = &jsError{ ... } + var ErrKeyExists JetStreamError = &jsError{ ... } + var ErrMsgAlreadyAckd JetStreamError = &jsError{ ... } + var ErrMsgNotFound JetStreamError = &jsError{ ... } + var ErrNoHeartbeat JetStreamError = &jsError{ ... } + var ErrNoMatchingStream JetStreamError = &jsError{ ... } + var ErrNoStreamResponse JetStreamError = &jsError{ ... } + var ErrNotJSMessage JetStreamError = &jsError{ ... } + var ErrOverlappingFilterSubjects JetStreamError = &jsError{ ... } + var ErrPullSubscribeRequired JetStreamError = &jsError{ ... } + var ErrPullSubscribeToPushConsumer JetStreamError = &jsError{ ... } + var ErrStreamConfigRequired JetStreamError = &jsError{ ... } + var ErrStreamNameAlreadyInUse JetStreamError = &jsError{ ... } + var ErrStreamNameRequired JetStreamError = &jsError{ ... } + var ErrStreamNotFound JetStreamError = &jsError{ ... } + var ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{ ... } + var ErrStreamSourceNotSupported JetStreamError = &jsError{ ... } + var ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{ ... } + var ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{ ... } + var ErrSubjectMismatch JetStreamError = &jsError{ ... } + var ErrSubscriptionClosed JetStreamError = &jsError{ ... } + var ErrTooManyStalledMsgs JetStreamError = &jsError{ ... } + type JetStreamManager interface + AccountInfo func(opts ...JSOpt) (*AccountInfo, error) + AddConsumer func(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) + AddStream func(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) + ConsumerInfo func(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) + ConsumerNames func(stream string, opts ...JSOpt) <-chan string + Consumers func(stream string, opts ...JSOpt) <-chan *ConsumerInfo + ConsumersInfo func(stream string, opts ...JSOpt) <-chan *ConsumerInfo + DeleteConsumer func(stream, consumer string, opts ...JSOpt) error + DeleteMsg func(name string, seq uint64, opts ...JSOpt) error + DeleteStream func(name string, opts ...JSOpt) error + GetLastMsg func(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) + GetMsg func(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) + PurgeStream func(name string, opts ...JSOpt) error + SecureDeleteMsg func(name string, seq uint64, opts ...JSOpt) error + StreamInfo func(stream string, opts ...JSOpt) (*StreamInfo, error) + StreamNameBySubject func(string, ...JSOpt) (string, error) + StreamNames func(opts ...JSOpt) <-chan string + Streams func(opts ...JSOpt) <-chan *StreamInfo + StreamsInfo func(opts ...JSOpt) <-chan *StreamInfo + UpdateConsumer func(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) + UpdateStream func(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) + type KeyLister interface + Error func() <-chan error + Keys func() <-chan string + Stop func() error + type KeyValue interface + Bucket func() string + Create func(key string, value []byte) (revision uint64, err error) + Delete func(key string, opts ...DeleteOpt) error + Get func(key string) (entry KeyValueEntry, err error) + GetRevision func(key string, revision uint64) (entry KeyValueEntry, err error) + History func(key string, opts ...WatchOpt) ([]KeyValueEntry, error) + Keys func(opts ...WatchOpt) ([]string, error) + ListKeys func(opts ...WatchOpt) (KeyLister, error) + Purge func(key string, opts ...DeleteOpt) error + PurgeDeletes func(opts ...PurgeOpt) error + Put func(key string, value []byte) (revision uint64, err error) + PutString func(key string, value string) (revision uint64, err error) + Status func() (KeyValueStatus, error) + Update func(key string, value []byte, last uint64) (revision uint64, err error) + Watch func(keys string, opts ...WatchOpt) (KeyWatcher, error) + WatchAll func(opts ...WatchOpt) (KeyWatcher, error) + WatchFiltered func(keys []string, opts ...WatchOpt) (KeyWatcher, error) + type KeyValueBucketStatus struct + func (s *KeyValueBucketStatus) BackingStore() string + func (s *KeyValueBucketStatus) Bucket() string + func (s *KeyValueBucketStatus) Bytes() uint64 + func (s *KeyValueBucketStatus) Config() KeyValueConfig + func (s *KeyValueBucketStatus) History() int64 + func (s *KeyValueBucketStatus) IsCompressed() bool + func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo + func (s *KeyValueBucketStatus) TTL() time.Duration + func (s *KeyValueBucketStatus) Values() uint64 + type KeyValueConfig struct + Bucket string + Compression bool + Description string + History uint8 + MaxBytes int64 + MaxValueSize int32 + Mirror *StreamSource + Placement *Placement + RePublish *RePublish + Replicas int + Sources []*StreamSource + Storage StorageType + TTL time.Duration + type KeyValueEntry interface + Bucket func() string + Created func() time.Time + Delta func() uint64 + Key func() string + Operation func() KeyValueOp + Revision func() uint64 + Value func() []byte + type KeyValueManager interface + CreateKeyValue func(cfg *KeyValueConfig) (KeyValue, error) + DeleteKeyValue func(bucket string) error + KeyValue func(bucket string) (KeyValue, error) + KeyValueStoreNames func() <-chan string + KeyValueStores func() <-chan KeyValueStatus + type KeyValueOp uint8 + const KeyValueDelete + const KeyValuePurge + const KeyValuePut + func (op KeyValueOp) String() string + type KeyValueStatus interface + BackingStore func() string + Bucket func() string + Bytes func() uint64 + Config func() KeyValueConfig + History func() int64 + IsCompressed func() bool + TTL func() time.Duration + Values func() uint64 + type KeyWatcher interface + Context func() context.Context + Error func() <-chan error + Stop func() error + Updates func() <-chan KeyValueEntry + type ListObjectsOpt interface + func ListObjectsShowDeleted() ListObjectsOpt + type MaxWait time.Duration + type MessageBatch interface + Done func() <-chan struct{} + Error func() error + Messages func() <-chan *Msg + type Msg struct + Data []byte + Header Header + Reply string + Sub *Subscription + Subject string + func NewMsg(subject string) *Msg + func (m *Msg) Ack(opts ...AckOpt) error + func (m *Msg) AckSync(opts ...AckOpt) error + func (m *Msg) Equal(msg *Msg) bool + func (m *Msg) InProgress(opts ...AckOpt) error + func (m *Msg) Metadata() (*MsgMetadata, error) + func (m *Msg) Nak(opts ...AckOpt) error + func (m *Msg) NakWithDelay(delay time.Duration, opts ...AckOpt) error + func (m *Msg) Respond(data []byte) error + func (m *Msg) RespondMsg(msg *Msg) error + func (m *Msg) Size() int + func (m *Msg) Term(opts ...AckOpt) error + type MsgErrHandler func(JetStream, *Msg, error) + type MsgHandler func(msg *Msg) + type MsgMetadata struct + Consumer string + Domain string + NumDelivered uint64 + NumPending uint64 + Sequence SequencePair + Stream string + Timestamp time.Time + type ObjectBucketStatus struct + func (s *ObjectBucketStatus) BackingStore() string + func (s *ObjectBucketStatus) Bucket() string + func (s *ObjectBucketStatus) Description() string + func (s *ObjectBucketStatus) IsCompressed() bool + func (s *ObjectBucketStatus) Metadata() map[string]string + func (s *ObjectBucketStatus) Replicas() int + func (s *ObjectBucketStatus) Sealed() bool + func (s *ObjectBucketStatus) Size() uint64 + func (s *ObjectBucketStatus) Storage() StorageType + func (s *ObjectBucketStatus) StreamInfo() *StreamInfo + func (s *ObjectBucketStatus) TTL() time.Duration + type ObjectInfo struct + Bucket string + Chunks uint32 + Deleted bool + Digest string + ModTime time.Time + NUID string + Size uint64 + type ObjectLink struct + Bucket string + Name string + type ObjectMeta struct + Description string + Headers Header + Metadata map[string]string + Name string + Opts *ObjectMetaOptions + type ObjectMetaOptions struct + ChunkSize uint32 + Link *ObjectLink + type ObjectOpt interface + type ObjectResult interface + Error func() error + Info func() (*ObjectInfo, error) + type ObjectStore interface + AddBucketLink func(name string, bucket ObjectStore) (*ObjectInfo, error) + AddLink func(name string, obj *ObjectInfo) (*ObjectInfo, error) + Delete func(name string) error + Get func(name string, opts ...GetObjectOpt) (ObjectResult, error) + GetBytes func(name string, opts ...GetObjectOpt) ([]byte, error) + GetFile func(name, file string, opts ...GetObjectOpt) error + GetInfo func(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) + GetString func(name string, opts ...GetObjectOpt) (string, error) + List func(opts ...ListObjectsOpt) ([]*ObjectInfo, error) + Put func(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) + PutBytes func(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) + PutFile func(file string, opts ...ObjectOpt) (*ObjectInfo, error) + PutString func(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) + Seal func() error + Status func() (ObjectStoreStatus, error) + UpdateMeta func(name string, meta *ObjectMeta) error + Watch func(opts ...WatchOpt) (ObjectWatcher, error) + type ObjectStoreConfig struct + Bucket string + Compression bool + Description string + MaxBytes int64 + Metadata map[string]string + Placement *Placement + Replicas int + Storage StorageType + TTL time.Duration + type ObjectStoreManager interface + CreateObjectStore func(cfg *ObjectStoreConfig) (ObjectStore, error) + DeleteObjectStore func(bucket string) error + ObjectStore func(bucket string) (ObjectStore, error) + ObjectStoreNames func(opts ...ObjectOpt) <-chan string + ObjectStores func(opts ...ObjectOpt) <-chan ObjectStoreStatus + type ObjectStoreStatus interface + BackingStore func() string + Bucket func() string + Description func() string + IsCompressed func() bool + Metadata func() map[string]string + Replicas func() int + Sealed func() bool + Size func() uint64 + Storage func() StorageType + TTL func() time.Duration + type ObjectWatcher interface + Stop func() error + Updates func() <-chan *ObjectInfo + type Option func(*Options) error + func ClientCert(certFile, keyFile string) Option + func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option + func ClosedHandler(cb ConnHandler) Option + func Compression(enabled bool) Option + func ConnectHandler(cb ConnHandler) Option + func CustomInboxPrefix(p string) Option + func CustomReconnectDelay(cb ReconnectDelayHandler) Option + func Dialer(dialer *net.Dialer) Option + func DisconnectErrHandler(cb ConnErrHandler) Option + func DisconnectHandler(cb ConnHandler) Option + func DiscoveredServersHandler(cb ConnHandler) Option + func DontRandomize() Option + func DrainTimeout(t time.Duration) Option + func ErrorHandler(cb ErrHandler) Option + func FlusherTimeout(t time.Duration) Option + func IgnoreAuthErrorAbort() Option + func IgnoreDiscoveredServers() Option + func InProcessServer(server InProcessConnProvider) Option + func LameDuckModeHandler(cb ConnHandler) Option + func MaxPingsOutstanding(max int) Option + func MaxReconnects(max int) Option + func Name(name string) Option + func Nkey(pubKey string, sigCB SignatureHandler) Option + func NkeyOptionFromSeed(seedFile string) (Option, error) + func NoCallbacksAfterClientClose() Option + func NoEcho() Option + func NoReconnect() Option + func PermissionErrOnSubscribe(enabled bool) Option + func PingInterval(t time.Duration) Option + func ProxyPath(path string) Option + func ReconnectBufSize(size int) Option + func ReconnectErrHandler(cb ConnErrHandler) Option + func ReconnectHandler(cb ConnHandler) Option + func ReconnectJitter(jitter, jitterForTLS time.Duration) Option + func ReconnectOnFlusherError() Option + func ReconnectToServer(cb ReconnectToServerHandler) Option + func ReconnectWait(t time.Duration) Option + func RetryOnFailedConnect(retry bool) Option + func RootCAs(file ...string) Option + func Secure(tls ...*tls.Config) Option + func SetCustomDialer(dialer CustomDialer) Option + func SkipHostLookup() Option + func SkipSubjectValidation() Option + func SyncQueueLen(max int) Option + func TLSHandshakeFirst() Option + func Timeout(t time.Duration) Option + func Token(token string) Option + func TokenHandler(cb AuthTokenHandler) Option + func UseOldRequestStyle() Option + func UserCredentialBytes(userOrChainedFileBytes []byte, seedFiles ...[]byte) Option + func UserCredentials(userOrChainedFile string, seedFiles ...string) Option + func UserInfo(user, password string) Option + func UserInfoHandler(cb UserInfoCB) Option + func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option + func UserJWTAndSeed(jwt string, seed string) Option + func WebSocketConnectionHeaders(headers http.Header) Option + func WebSocketConnectionHeadersHandler(cb WebSocketHeadersHandler) Option + func WriteBufferSize(size int) Option + type Options struct + AllowReconnect bool + AsyncErrorCB ErrHandler + ClosedCB ConnHandler + Compression bool + ConnectedCB ConnHandler + CustomDialer CustomDialer + CustomReconnectDelayCB ReconnectDelayHandler + Dialer *net.Dialer + DisconnectedCB ConnHandler + DisconnectedErrCB ConnErrHandler + DiscoveredServersCB ConnHandler + DrainTimeout time.Duration + FlusherTimeout time.Duration + IgnoreAuthErrorAbort bool + IgnoreDiscoveredServers bool + InProcessServer InProcessConnProvider + InboxPrefix string + LameDuckModeHandler ConnHandler + MaxPingsOut int + MaxReconnect int + Name string + Nkey string + NoCallbacksAfterClientClose bool + NoEcho bool + NoRandomize bool + Password string + Pedantic bool + PermissionErrOnSubscribe bool + PingInterval time.Duration + ProxyPath string + ReconnectBufSize int + ReconnectErrCB ConnErrHandler + ReconnectJitter time.Duration + ReconnectJitterTLS time.Duration + ReconnectOnFlusherError bool + ReconnectToServerCB ReconnectToServerHandler + ReconnectWait time.Duration + ReconnectedCB ConnHandler + RetryOnFailedConnect bool + RootCAsCB RootCAsHandler + Secure bool + Servers []string + SignatureCB SignatureHandler + SkipHostLookup bool + SkipSubjectValidation bool + SubChanLen int + TLSCertCB TLSCertHandler + TLSConfig *tls.Config + TLSHandshakeFirst bool + Timeout time.Duration + Token string + TokenHandler AuthTokenHandler + Url string + UseOldRequestStyle bool + User string + UserInfo UserInfoCB + UserJWT UserJWTHandler + Verbose bool + WebSocketConnectionHeaders http.Header + WebSocketConnectionHeadersHandler WebSocketHeadersHandler + WriteBufferSize int + func GetDefaultOptions() Options + func (o Options) Connect() (*Conn, error) + type PeerInfo struct + Active time.Duration + Current bool + Lag uint64 + Name string + Offline bool + type Placement struct + Cluster string + Tags []string + type PubAck struct + Domain string + Duplicate bool + Sequence uint64 + Stream string + type PubAckFuture interface + Err func() <-chan error + Msg func() *Msg + Ok func() <-chan *PubAck + type PubOpt interface + func ExpectLastMsgId(id string) PubOpt + func ExpectLastSequence(seq uint64) PubOpt + func ExpectLastSequencePerSubject(seq uint64) PubOpt + func ExpectStream(stream string) PubOpt + func MsgId(id string) PubOpt + func MsgTTL(dur time.Duration) PubOpt + func RetryAttempts(num int) PubOpt + func RetryWait(dur time.Duration) PubOpt + func StallWait(ttl time.Duration) PubOpt + type PullHeartbeat time.Duration + type PullMaxBytes int + type PullOpt interface + type PurgeOpt interface + type RawStreamMsg struct + Data []byte + Header Header + Sequence uint64 + Subject string + Time time.Time + type RePublish struct + Destination string + HeadersOnly bool + Source string + type ReconnectDelayHandler func(attempts int) time.Duration + type ReconnectToServerHandler func([]Server, ServerInfo) (*Server, time.Duration) + type ReplayPolicy int + const ReplayInstantPolicy + const ReplayOriginalPolicy + func (p *ReplayPolicy) UnmarshalJSON(data []byte) error + func (p ReplayPolicy) MarshalJSON() ([]byte, error) + type RetentionPolicy int + const InterestPolicy + const LimitsPolicy + const WorkQueuePolicy + func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error + func (rp RetentionPolicy) MarshalJSON() ([]byte, error) + func (rp RetentionPolicy) String() string + type RootCAsHandler func() (*x509.CertPool, error) + type SequenceInfo struct + Consumer uint64 + Last *time.Time + Stream uint64 + type SequencePair struct + Consumer uint64 + Stream uint64 + type Server struct + Reconnects int + URL *url.URL + type ServerInfo struct + AuthRequired bool + CID uint64 + ClientIP string + Cluster string + ConnectURLs []string + Headers bool + Host string + ID string + IsSystemAccount bool + JSApiLevel int + JetStream bool + LameDuckMode bool + MaxPayload int64 + Name string + Nonce string + Port int + Proto int + TLSAvailable bool + TLSRequired bool + Version string + type SignatureHandler func([]byte) ([]byte, error) + type Statistics struct + InBytes uint64 + InMsgs uint64 + OutBytes uint64 + OutMsgs uint64 + Reconnects uint64 + type Status int + func (s Status) String() string + type StorageType int + const FileStorage + const MemoryStorage + func (st *StorageType) UnmarshalJSON(data []byte) error + func (st StorageType) MarshalJSON() ([]byte, error) + func (st StorageType) String() string + type StoreCompression uint8 + const NoCompression + const S2Compression + func (alg *StoreCompression) UnmarshalJSON(b []byte) error + func (alg StoreCompression) MarshalJSON() ([]byte, error) + func (alg StoreCompression) String() string + type StreamAlternate struct + Cluster string + Domain string + Name string + type StreamConfig struct + AllowDirect bool + AllowMsgTTL bool + AllowRollup bool + Compression StoreCompression + ConsumerLimits StreamConsumerLimits + DenyDelete bool + DenyPurge bool + Description string + Discard DiscardPolicy + DiscardNewPerSubject bool + Duplicates time.Duration + FirstSeq uint64 + MaxAge time.Duration + MaxBytes int64 + MaxConsumers int + MaxMsgSize int32 + MaxMsgs int64 + MaxMsgsPerSubject int64 + Metadata map[string]string + Mirror *StreamSource + MirrorDirect bool + Name string + NoAck bool + Placement *Placement + RePublish *RePublish + Replicas int + Retention RetentionPolicy + Sealed bool + Sources []*StreamSource + Storage StorageType + SubjectDeleteMarkerTTL time.Duration + SubjectTransform *SubjectTransformConfig + Subjects []string + Template string + type StreamConsumerLimits struct + InactiveThreshold time.Duration + MaxAckPending int + type StreamInfo struct + Alternates []*StreamAlternate + Cluster *ClusterInfo + Config StreamConfig + Created time.Time + Mirror *StreamSourceInfo + Sources []*StreamSourceInfo + State StreamState + type StreamInfoRequest struct + DeletedDetails bool + SubjectsFilter string + type StreamPurgeRequest struct + Keep uint64 + Sequence uint64 + Subject string + type StreamSource struct + Domain string + External *ExternalStream + FilterSubject string + Name string + OptStartSeq uint64 + OptStartTime *time.Time + SubjectTransforms []SubjectTransformConfig + type StreamSourceInfo struct + Active time.Duration + Error *APIError + External *ExternalStream + FilterSubject string + Lag uint64 + Name string + SubjectTransforms []SubjectTransformConfig + type StreamState struct + Bytes uint64 + Consumers int + Deleted []uint64 + FirstSeq uint64 + FirstTime time.Time + LastSeq uint64 + LastTime time.Time + Msgs uint64 + NumDeleted int + NumSubjects uint64 + Subjects map[string]uint64 + type SubOpt interface + func AckAll() SubOpt + func AckExplicit() SubOpt + func AckNone() SubOpt + func BackOff(backOff []time.Duration) SubOpt + func Bind(stream, consumer string) SubOpt + func BindStream(stream string) SubOpt + func ConsumerFilterSubjects(subjects ...string) SubOpt + func ConsumerMemoryStorage() SubOpt + func ConsumerName(name string) SubOpt + func ConsumerReplicas(replicas int) SubOpt + func DeliverAll() SubOpt + func DeliverLast() SubOpt + func DeliverLastPerSubject() SubOpt + func DeliverNew() SubOpt + func DeliverSubject(subject string) SubOpt + func Description(description string) SubOpt + func Durable(consumer string) SubOpt + func EnableFlowControl() SubOpt + func HeadersOnly() SubOpt + func IdleHeartbeat(duration time.Duration) SubOpt + func InactiveThreshold(threshold time.Duration) SubOpt + func ManualAck() SubOpt + func MaxAckPending(n int) SubOpt + func MaxDeliver(n int) SubOpt + func MaxRequestBatch(max int) SubOpt + func MaxRequestExpires(max time.Duration) SubOpt + func MaxRequestMaxBytes(bytes int) SubOpt + func OrderedConsumer() SubOpt + func PullMaxWaiting(n int) SubOpt + func RateLimit(n uint64) SubOpt + func ReplayInstant() SubOpt + func ReplayOriginal() SubOpt + func SkipConsumerLookup() SubOpt + func StartSequence(seq uint64) SubOpt + func StartTime(startTime time.Time) SubOpt + type SubStatus int + func (s SubStatus) String() string + type SubjectTransformConfig struct + Destination string + Source string + type Subscription struct + Queue string + Subject string + func (s *Subscription) AutoUnsubscribe(max int) error + func (s *Subscription) ClearMaxPending() error + func (s *Subscription) Delivered() (int64, error) + func (s *Subscription) Drain() error + func (s *Subscription) Dropped() (int, error) + func (s *Subscription) IsDraining() bool + func (s *Subscription) IsValid() bool + func (s *Subscription) MaxPending() (int, int, error) + func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) + func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) + func (s *Subscription) Pending() (int, int, error) + func (s *Subscription) PendingLimits() (int, int, error) + func (s *Subscription) QueuedMsgs() (int, error) + func (s *Subscription) SetClosedHandler(handler func(subject string)) + func (s *Subscription) SetPendingLimits(msgLimit, bytesLimit int) error + func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus + func (s *Subscription) Type() SubscriptionType + func (s *Subscription) Unsubscribe() error + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) + func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) + func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error) + func (sub *Subscription) InitialConsumerPending() (uint64, error) + func (sub *Subscription) Msgs() iter.Seq2[*Msg, error] + func (sub *Subscription) MsgsTimeout(timeout time.Duration) iter.Seq2[*Msg, error] + type SubscriptionType int + type TLSCertHandler func() (tls.Certificate, error) + type Tier struct + Consumers int + Limits AccountLimits + Memory uint64 + ReservedMemory uint64 + ReservedStore uint64 + Store uint64 + Streams int + type UserInfoCB func() (string, string) + type UserJWTHandler func() (string, error) + type WatchOpt interface + func IgnoreDeletes() WatchOpt + func IncludeHistory() WatchOpt + func MetaOnly() WatchOpt + func UpdatesOnly() WatchOpt + type WebSocketHeadersHandler func() (http.Header, error)