queues_stream

package
v0.0.0-...-3837e7c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AckAllRequest

type AckAllRequest struct {
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func NewAckAllRequest

func NewAckAllRequest() *AckAllRequest

func (*AckAllRequest) SetChannel

func (req *AckAllRequest) SetChannel(channel string) *AckAllRequest

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllRequest) SetClientId

func (req *AckAllRequest) SetClientId(clientId string) *AckAllRequest

func (*AckAllRequest) SetWaitTimeSeconds

func (req *AckAllRequest) SetWaitTimeSeconds(wait int) *AckAllRequest

SetWaitTimeSeconds - set ack all queue message request wait timeout

type AckAllResponse

type AckAllResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type GrpcClient

type GrpcClient struct {
	pb.KubemqClient
	// contains filtered or unexported fields
}

func NewGrpcClient

func NewGrpcClient(ctx context.Context, op ...Option) (*GrpcClient, error)

func (*GrpcClient) Close

func (g *GrpcClient) Close() error

func (*GrpcClient) GlobalClientId

func (g *GrpcClient) GlobalClientId() string

func (*GrpcClient) Ping

func (g *GrpcClient) Ping(ctx context.Context) (*pb.PingResult, error)

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for grpcClient. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection

func WithCheckConnection(value bool) Option

WithCheckConnection - set whether to check server connectivity when the grpcClient is created

func WithClientId

func WithClientId(id string) Option

WithClientId - set grpcClient id to be used in all function calls with this grpcClient - mandatory

func WithConnectionNotificationFunc

func WithConnectionNotificationFunc(fn func(msg string)) Option

WithConnectionNotificationFunc - set a callback function that receives connection activity messages

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for grpcClient. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before returning an error, default is 0 (never return an error).

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set the length of the buffered channel used in all subscriptions

func WithReconnectInterval

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate

func (o *Options) Validate() error

type PollRequest

type PollRequest struct {
	Channel     string `json:"Channel"`
	MaxItems    int    `json:"max_items"`
	WaitTimeout int    `json:"wait_timeout"`
	AutoAck     bool   `json:"auto_ack"`
	OnErrorFunc func(err error)
	OnComplete  func()
}

PollRequest - Request parameters for Poll function

func NewPollRequest

func NewPollRequest() *PollRequest

func (*PollRequest) SetAutoAck

func (p *PollRequest) SetAutoAck(autoAck bool) *PollRequest

func (*PollRequest) SetChannel

func (p *PollRequest) SetChannel(channel string) *PollRequest

func (*PollRequest) SetMaxItems

func (p *PollRequest) SetMaxItems(maxItems int) *PollRequest

func (*PollRequest) SetOnComplete

func (p *PollRequest) SetOnComplete(onComplete func()) *PollRequest

func (*PollRequest) SetOnErrorFunc

func (p *PollRequest) SetOnErrorFunc(onErrorFunc func(err error)) *PollRequest

func (*PollRequest) SetWaitTimeout

func (p *PollRequest) SetWaitTimeout(waitTimeout int) *PollRequest

type PollResponse

type PollResponse struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (PollResponse) AckAll

func (r PollResponse) AckAll() error

func (PollResponse) AckOffsets

func (r PollResponse) AckOffsets(offsets ...int64) error

func (PollResponse) Close

func (r PollResponse) Close() error

func (*PollResponse) HasMessages

func (p *PollResponse) HasMessages() bool

func (PollResponse) NAckAll

func (r PollResponse) NAckAll() error

func (PollResponse) NAckOffsets

func (r PollResponse) NAckOffsets(offsets ...int64) error

func (PollResponse) ReQueueAll

func (r PollResponse) ReQueueAll(channel string) error

func (PollResponse) ReQueueOffsets

func (r PollResponse) ReQueueOffsets(channel string, offsets ...int64) error

type QueueInfo

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack

func (qm *QueueMessage) Ack() error

func (QueueMessage) AckAll

func (r QueueMessage) AckAll() error

func (QueueMessage) AckOffsets

func (r QueueMessage) AckOffsets(offsets ...int64) error

func (*QueueMessage) AddTag

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to queue message

func (QueueMessage) Close

func (r QueueMessage) Close() error

func (*QueueMessage) NAck

func (qm *QueueMessage) NAck() error

func (QueueMessage) NAckAll

func (r QueueMessage) NAckAll() error

func (QueueMessage) NAckOffsets

func (r QueueMessage) NAckOffsets(offsets ...int64) error

func (*QueueMessage) ReQueue

func (qm *QueueMessage) ReQueue(channel string) error

func (QueueMessage) ReQueueAll

func (r QueueMessage) ReQueueAll(channel string) error

func (QueueMessage) ReQueueOffsets

func (r QueueMessage) ReQueueOffsets(channel string, offsets ...int64) error

func (*QueueMessage) SetBody

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message Channel - mandatory if default Channel was not set

func (*QueueMessage) SetClientId

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default grpcClient was not set

func (*QueueMessage) SetId

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 means immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration in seconds, 0 means the message never expires

func (*QueueMessage) SetPolicyMaxReceiveCount

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before the message is discarded or re-routed to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueuesInfo

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type QueuesStreamClient

type QueuesStreamClient struct {
	// contains filtered or unexported fields
}

func NewQueuesStreamClient

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesStreamClient, error)

func (*QueuesStreamClient) AckAll

func (q *QueuesStreamClient) AckAll(ctx context.Context, request *AckAllRequest) (*AckAllResponse, error)

func (*QueuesStreamClient) Close

func (q *QueuesStreamClient) Close() error

func (*QueuesStreamClient) Poll

func (q *QueuesStreamClient) Poll(ctx context.Context, request *PollRequest) (*PollResponse, error)

func (*QueuesStreamClient) QueuesInfo

func (q *QueuesStreamClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesStreamClient) Send

func (q *QueuesStreamClient) Send(ctx context.Context, messages ...*QueueMessage) (*SendResult, error)

type SendResult

type SendResult struct {
	Results []*pb.SendQueueMessageResult
}

type SubscribeRequest

type SubscribeRequest struct {
	Channels    []string
	MaxItems    int  `json:"max_items"`
	WaitTimeout int  `json:"wait_timeout"`
	AutoAck     bool `json:"auto_ack"`
}

func NewSubscribeRequest

func NewSubscribeRequest() *SubscribeRequest

func (*SubscribeRequest) SetAutoAck

func (s *SubscribeRequest) SetAutoAck(autoAck bool) *SubscribeRequest

func (*SubscribeRequest) SetChannels

func (s *SubscribeRequest) SetChannels(channels ...string) *SubscribeRequest

func (*SubscribeRequest) SetMaxItems

func (s *SubscribeRequest) SetMaxItems(maxItems int) *SubscribeRequest

func (*SubscribeRequest) SetWaitTimeout

func (s *SubscribeRequest) SetWaitTimeout(waitTimeout int) *SubscribeRequest

type SubscribeResponse

type SubscribeResponse struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func NewSubscribeResponse

func NewSubscribeResponse() *SubscribeResponse

func (SubscribeResponse) AckAll

func (r SubscribeResponse) AckAll() error

func (SubscribeResponse) AckOffsets

func (r SubscribeResponse) AckOffsets(offsets ...int64) error

func (SubscribeResponse) Close

func (r SubscribeResponse) Close() error

func (SubscribeResponse) NAckAll

func (r SubscribeResponse) NAckAll() error

func (SubscribeResponse) NAckOffsets

func (r SubscribeResponse) NAckOffsets(offsets ...int64) error

func (SubscribeResponse) ReQueueAll

func (r SubscribeResponse) ReQueueAll(channel string) error

func (SubscribeResponse) ReQueueOffsets

func (r SubscribeResponse) ReQueueOffsets(channel string, offsets ...int64) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL