xstream

package
v0.0.0-...-a136232 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2022 | License: Apache-2.0 | Imports: 13 | Imported by: 0

Documentation

Index

Constants

View Source
const CTXKEY_CFG ctxkey = "ackstream.xstream.configs"
View Source
const CTXKEY_CONN ctxkey = "ackstream.xstream.connection"

Variables

View Source
var (
	CONSUMER_POLICY_ALL = 0
	CONSUMER_POLICY_NEW = 1
)
View Source
var ErrCfgNotFound = errors.New("no configs was set")
View Source
var ErrConnNotFound = errors.New("no connection was initialized")
View Source
var ErrMsgInvalidEvent = errors.New("could not construct event from message")
View Source
var ErrSubNoQueue = errors.New("could not subscribe a topic without queue name")

Functions

func CfgWithContext

func CfgWithContext(ctx context.Context, cfg *Configs) context.Context

func ConnFromContext

func ConnFromContext(ctx context.Context) (*nats.Conn, error)

func ConnWithContext

func ConnWithContext(ctx context.Context, conn *nats.Conn) context.Context

func NewConnection

func NewConnection(ctx context.Context) (*nats.Conn, error)

func NewEvent

func NewEvent(msg *nats.Msg) (*entities.Event, error)

func NewJetStream

func NewJetStream(ctx context.Context) (nats.JetStreamContext, error)

func NewMsg

func NewMsg(cfg *Configs, event *entities.Event) (*nats.Msg, error)

func NewSubject

func NewSubject(cfg *Configs, sample *entities.Event) string

func UseSub

func UseSub(ctx context.Context, fn SubscribeFn) nats.MsgHandler

Types

type Configs

type Configs struct {
	Debug  bool
	Uri    string `json:"uri" mapstructure:"ACKSTREAM_XSTREAM_URI"`
	Region string `json:"region" mapstructure:"ACKSTREAM_XSTREAM_REGION"`
	Name   string `json:"name" mapstructure:"ACKSTREAM_XSTREAM_NAME"`
	Topic  string `json:"topic" mapstructure:"ACKSTREAM_XSTREAM_TOPIC"`

	MaxMsgs        int64 `json:"max_msg" mapstructure:"ACKSTREAM_XSTREAM_MAX_MSGS"`
	MaxBytes       int64 `json:"max_bytes" mapstructure:"ACKSTREAM_XSTREAM_MAX_BYTES"`
	MaxAge         int32 `json:"max_age" mapstructure:"ACKSTREAM_XSTREAM_MAX_AGE"`
	ConsumerPolicy int   `json:"consumer_policy" mapstructure:"ACKSTREAM_XSTREAM_CONSUMER_POLICY"`
}

func CfgFromContext

func CfgFromContext(ctx context.Context) (*Configs, error)

type Pub

type Pub func(event *entities.Event) (*string, error)

func NewPub

func NewPub(ctx context.Context) (Pub, error)

type Sub

type Sub func(sample *entities.Event, queue string, fn SubscribeFn) error

func NewSub

func NewSub(ctx context.Context) (Sub, error)

type SubscribeFn

type SubscribeFn func(event *entities.Event) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL