kubemq

package module
v1.7.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: Apache-2.0 Imports: 19 Imported by: 197

README

KubeMQ Go SDK

KubeMQ is an enterprise-grade message queue and broker for containers, designed for any workload and architecture running in Kubernetes. This library is Go implementation of KubeMQ client connection.

Install KubeMQ Community Edition

Please visit KubeMQ Community for intallation steps.

Install KubeMQ Go SDK

go get github.com/kubemq-io/kubemq-go

Learn KubeMQ

Visit our Extensive KubeMQ Documentation.

Examples - Cookbook Recipes

Please visit our cookbook repository

Support

if you encounter any issues, please open an issue here, In addition, you can reach us for support by:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoTransportDefined    = errors.New("no transport layer defined, create object with client instance")
	ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

Functions

This section is empty.

Types

type AckAllQueueMessagesRequest added in v1.2.0

type AckAllQueueMessagesRequest struct {
	RequestID       string
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func (*AckAllQueueMessagesRequest) AddTrace added in v1.2.0

func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to ack all receive queue message request

func (*AckAllQueueMessagesRequest) Complete added in v1.5.0

func (*AckAllQueueMessagesRequest) Send added in v1.2.0

Send - sending receive queue messages request , waiting for response or timeout

func (*AckAllQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllQueueMessagesRequest) SetClientId added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest

SetClientId - set ack all queue message request ClientId - mandatory if default client was not set

func (*AckAllQueueMessagesRequest) SetId added in v1.2.0

SetId - set ack all queue message request id, otherwise new random uuid will be set

func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest

SetWaitTimeSeconds - set ack all queue message request wait timout

func (*AckAllQueueMessagesRequest) Validate added in v1.5.0

func (req *AckAllQueueMessagesRequest) Validate() error

type AckAllQueueMessagesResponse added in v1.2.0

type AckAllQueueMessagesResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type Client

type Client struct {
	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, op ...Option) (*Client, error)

NewClient - create client instance to be use to communicate with KubeMQ server

func (*Client) AQM added in v1.2.0

AQM - create an empty ack all receive queue messages request object

func (*Client) AckAllQueueMessages added in v1.2.0

func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)

AckAllQueueMessages - send ack all messages in queue

func (*Client) C

func (c *Client) C() *Command

C - create an empty command object

func (*Client) Close

func (c *Client) Close() error

Close - closing client connection. any on going transactions will be aborted

func (*Client) E

func (c *Client) E() *Event

E - create an empty event object

func (*Client) ES

func (c *Client) ES() *EventStore

ES - create an empty event store object

func (*Client) NewAckAllQueueMessagesRequest added in v1.2.0

func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest

NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object

func (*Client) NewCommand added in v1.2.0

func (c *Client) NewCommand() *Command

NewCommand - create an empty command

func (*Client) NewEvent added in v1.2.0

func (c *Client) NewEvent() *Event

NewEvent - create an empty event

func (*Client) NewEventStore added in v1.2.0

func (c *Client) NewEventStore() *EventStore

NewEventStore- create an empty event store

func (*Client) NewQuery added in v1.2.0

func (c *Client) NewQuery() *Query

NewQuery - create an empty query

func (*Client) NewQueueMessage added in v1.2.0

func (c *Client) NewQueueMessage() *QueueMessage

NewQueueMessage - create an empty queue messages

func (*Client) NewQueueMessages added in v1.2.0

func (c *Client) NewQueueMessages() *QueueMessages

NewQueueMessages - create an empty queue messages array

func (*Client) NewReceiveQueueMessagesRequest added in v1.2.0

func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

NewReceiveQueueMessagesRequest - create an empty receive queue message request object

func (*Client) NewResponse added in v1.2.0

func (c *Client) NewResponse() *Response

NewResponse - create an empty response

func (*Client) NewStreamQueueMessage added in v1.2.0

func (c *Client) NewStreamQueueMessage() *StreamQueueMessage

NewStreamQueueMessage - create an empty stream receive queue message object

func (*Client) Ping added in v1.3.5

func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)

Ping - get status of current connection

func (*Client) Q

func (c *Client) Q() *Query

Q - create an empty query object

func (*Client) QM added in v1.2.0

func (c *Client) QM() *QueueMessage

QM - create an empty queue message object

func (*Client) QMB added in v1.2.0

func (c *Client) QMB() *QueueMessages

QMB - create an empty queue message array object

func (*Client) QueuesInfo added in v1.7.0

func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo - get queues detailed information

func (*Client) R

func (c *Client) R() *Response

R - create an empty response object for command or query responses

func (*Client) RQM added in v1.2.0

RQM - create an empty receive queue message request object

func (*Client) ReceiveQueueMessages added in v1.2.0

ReceiveQueueMessages - call to receive messages from a queue

func (*Client) SQM added in v1.2.0

func (c *Client) SQM() *StreamQueueMessage

SQM - create an empty stream receive queue message object

func (*Client) SendQueueMessage added in v1.2.0

func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)

SendQueueMessage - send single queue message

func (*Client) SendQueueMessages added in v1.2.0

func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)

SendQueueMessages - send multiple queue messages

func (*Client) SetCommand added in v1.4.0

func (c *Client) SetCommand(cmd *Command) *Command

func (*Client) SetEvent added in v1.4.0

func (c *Client) SetEvent(e *Event) *Event

func (*Client) SetEventStore added in v1.4.0

func (c *Client) SetEventStore(es *EventStore) *EventStore

func (*Client) SetQuery added in v1.4.0

func (c *Client) SetQuery(query *Query) *Query

func (*Client) SetQueueMessage added in v1.4.0

func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage

func (*Client) SetResponse added in v1.4.0

func (c *Client) SetResponse(response *Response) *Response

func (*Client) StreamEvents

func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)

StreamEvents - send stream of events in a single call

func (*Client) StreamEventsStore

func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)

StreamEventsStore - send stream of events store in a single call

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to commands requests by channel and group. return channel of CommandReceived or en error

func (*Client) SubscribeToCommandsWithRequest added in v1.5.0

func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to commands requests by channel and group. return channel of CommandReceived or en error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. return channel of events or en error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)

SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. return channel of events or en error

func (*Client) SubscribeToEventsStoreWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

SubscribeToEventsStoreWithRequest - subscribe to events store by channel and group with subscription option. return channel of events or en error

func (*Client) SubscribeToEventsWithRequest added in v1.5.0

func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. return channel of events or en error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to queries requests by channel and group. return channel of QueryReceived or en error

func (*Client) SubscribeToQueriesWithRequest added in v1.5.0

func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to queries requests by channel and group. return channel of QueryReceived or en error

type Command

type Command struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewCommand added in v1.4.0

func NewCommand() *Command

func (*Command) AddTag added in v1.2.0

func (c *Command) AddTag(key, value string) *Command

AddTag - add key value tags to command message

func (*Command) AddTrace added in v1.2.0

func (c *Command) AddTrace(name string) *Trace

AddTrace - add tracing support to command

func (*Command) Send

func (c *Command) Send(ctx context.Context) (*CommandResponse, error)

Send - sending command , waiting for response or timeout

func (*Command) SetBody

func (c *Command) SetBody(body []byte) *Command

SetBody - set command body - mandatory if metadata field is empty

func (*Command) SetChannel

func (c *Command) SetChannel(channel string) *Command

SetChannel - set command channel - mandatory if default channel was not set

func (*Command) SetClientId

func (c *Command) SetClientId(clientId string) *Command

SetClientId - set command ClientId - mandatory if default client was not set

func (*Command) SetId

func (c *Command) SetId(id string) *Command

SetId - set command requestId, otherwise new random uuid will be set

func (*Command) SetMetadata

func (c *Command) SetMetadata(metadata string) *Command

SetMetadata - set command metadata - mandatory if body field is empty

func (*Command) SetTags added in v1.4.1

func (c *Command) SetTags(tags map[string]string) *Command

SetTags - set key value tags to command message

func (*Command) SetTimeout

func (c *Command) SetTimeout(timeout time.Duration) *Command

SetTimeout - set timeout for command to be returned. if timeout expired , send command will result with an error

type CommandReceive

type CommandReceive struct {
	Id         string
	ClientId   string
	Channel    string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type CommandResponse

type CommandResponse struct {
	CommandId        string
	ResponseClientId string
	Executed         bool
	ExecutedAt       time.Time
	Error            string
	Tags             map[string]string
}

type CommandsClient added in v1.5.0

type CommandsClient struct {
	// contains filtered or unexported fields
}

func NewCommandsClient added in v1.5.0

func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)

func (*CommandsClient) Close added in v1.5.0

func (c *CommandsClient) Close() error

func (*CommandsClient) Response added in v1.5.0

func (c *CommandsClient) Response(ctx context.Context, response *Response) error

func (*CommandsClient) Send added in v1.5.0

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)

func (*CommandsClient) Subscribe added in v1.5.0

func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error

type CommandsSubscription added in v1.5.0

type CommandsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*CommandsSubscription) Complete added in v1.5.0

func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription

func (*CommandsSubscription) Validate added in v1.5.0

func (cs *CommandsSubscription) Validate() error

type Event

type Event struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEvent added in v1.4.0

func NewEvent() *Event

func (*Event) AddTag added in v1.2.0

func (e *Event) AddTag(key, value string) *Event

AddTag - add key value tags to event message

func (*Event) Send

func (e *Event) Send(ctx context.Context) error

func (*Event) SetBody

func (e *Event) SetBody(body []byte) *Event

SetBody - set event body - mandatory if metadata field was not set

func (*Event) SetChannel

func (e *Event) SetChannel(channel string) *Event

SetChannel - set event channel - mandatory if default channel was not set

func (*Event) SetClientId

func (e *Event) SetClientId(clientId string) *Event

SetClientId - set event ClientId - mandatory if default client was not set

func (*Event) SetId

func (e *Event) SetId(id string) *Event

SetId - set event id otherwise new random uuid will be set

func (*Event) SetMetadata

func (e *Event) SetMetadata(metadata string) *Event

SetMetadata - set event metadata - mandatory if body field was not set

func (*Event) SetTags added in v1.4.1

func (e *Event) SetTags(tags map[string]string) *Event

SetTags - set key value tags to event message

type EventStore

type EventStore struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEventStore added in v1.4.0

func NewEventStore() *EventStore

func (*EventStore) AddTag added in v1.2.0

func (es *EventStore) AddTag(key, value string) *EventStore

AddTag - add key value tags to event store message

func (*EventStore) Send

func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)

Send - sending events store message

func (*EventStore) SetBody

func (es *EventStore) SetBody(body []byte) *EventStore

SetBody - set event store body - mandatory if metadata field was not set

func (*EventStore) SetChannel

func (es *EventStore) SetChannel(channel string) *EventStore

SetChannel - set event store channel - mandatory if default channel was not set

func (*EventStore) SetClientId

func (es *EventStore) SetClientId(clientId string) *EventStore

SetClientId - set event store ClientId - mandatory if default client was not set

func (*EventStore) SetId

func (es *EventStore) SetId(id string) *EventStore

SetId - set event store id otherwise new random uuid will be set

func (*EventStore) SetMetadata

func (es *EventStore) SetMetadata(metadata string) *EventStore

SetMetadata - set event store metadata - mandatory if body field was not set

func (*EventStore) SetTags added in v1.4.1

func (es *EventStore) SetTags(tags map[string]string) *EventStore

SetTags - set key value tags to event store message

type EventStoreReceive

type EventStoreReceive struct {
	Id        string
	Sequence  uint64
	Timestamp time.Time
	Channel   string
	Metadata  string
	Body      []byte
	ClientId  string
	Tags      map[string]string
}

type EventStoreResult

type EventStoreResult struct {
	Id   string
	Sent bool
	Err  error
}

type EventsClient added in v1.5.0

type EventsClient struct {
	// contains filtered or unexported fields
}

func NewEventsClient added in v1.5.0

func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)

func (*EventsClient) Close added in v1.5.0

func (e *EventsClient) Close() error

func (*EventsClient) Send added in v1.5.0

func (e *EventsClient) Send(ctx context.Context, message *Event) error

func (*EventsClient) Stream added in v1.5.0

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)

func (*EventsClient) Subscribe added in v1.5.0

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error

type EventsErrorsHandler added in v1.5.0

type EventsErrorsHandler func(error)

type EventsMessageHandler added in v1.5.0

type EventsMessageHandler func(*Event)

type EventsStoreClient added in v1.5.0

type EventsStoreClient struct {
	// contains filtered or unexported fields
}

func NewEventsStoreClient added in v1.5.0

func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)

func (*EventsStoreClient) Close added in v1.5.0

func (es *EventsStoreClient) Close() error

func (*EventsStoreClient) Send added in v1.5.0

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)

func (*EventsStoreClient) Stream added in v1.5.0

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)

func (*EventsStoreClient) Subscribe added in v1.5.0

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error

type EventsStoreSubscription added in v1.5.0

type EventsStoreSubscription struct {
	Channel          string
	Group            string
	ClientId         string
	SubscriptionType SubscriptionOption
}

func (*EventsStoreSubscription) Complete added in v1.5.0

func (*EventsStoreSubscription) Validate added in v1.5.0

func (es *EventsStoreSubscription) Validate() error

type EventsSubscription added in v1.5.0

type EventsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*EventsSubscription) Complete added in v1.5.0

func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription

func (*EventsSubscription) Validate added in v1.5.0

func (es *EventsSubscription) Validate() error

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken added in v1.3.2

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect added in v1.4.0

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate added in v1.3.1

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for client. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection added in v1.4.0

func WithCheckConnection(value bool) Option

WithCheckConnection - set server connectivity on client create

func WithClientId

func WithClientId(id string) Option

WithClientId - set client id to be used in all functions call with this client - mandatory

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for client. serverNameOverride is for testing only. If set to a non empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel added in v1.3.1

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects added in v1.4.0

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before return error, default 0, never.

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval added in v1.4.0

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

func WithTransportType

func WithTransportType(transportType TransportType) Option

WithTransportType - set client transport type, currently GRPC or Rest

func WithUri added in v1.2.0

func WithUri(uri string) Option

WithUriAddress - set uri address of KubeMQ server

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate added in v1.2.0

func (o *Options) Validate() error

type QueriesClient added in v1.5.0

type QueriesClient struct {
	// contains filtered or unexported fields
}

func NewQueriesClient added in v1.5.0

func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)

func (*QueriesClient) Close added in v1.5.0

func (q *QueriesClient) Close() error

func (*QueriesClient) Response added in v1.5.0

func (q *QueriesClient) Response(ctx context.Context, response *Response) error

func (*QueriesClient) Send added in v1.5.0

func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)

func (*QueriesClient) Subscribe added in v1.5.0

func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error

type QueriesSubscription added in v1.5.0

type QueriesSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*QueriesSubscription) Complete added in v1.5.0

func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription

func (*QueriesSubscription) Validate added in v1.5.0

func (qs *QueriesSubscription) Validate() error

type Query

type Query struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	CacheKey string
	CacheTTL time.Duration
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewQuery added in v1.4.0

func NewQuery() *Query

func (*Query) AddTag added in v1.2.0

func (q *Query) AddTag(key, value string) *Query

AddTag - add key value tags to query message

func (*Query) AddTrace added in v1.2.0

func (q *Query) AddTrace(name string) *Trace

AddTrace - add tracing support to query

func (*Query) Send

func (q *Query) Send(ctx context.Context) (*QueryResponse, error)

Send - sending query request , waiting for response or timeout

func (*Query) SetBody

func (q *Query) SetBody(body []byte) *Query

SetBody - set query body - mandatory if metadata field is empty

func (*Query) SetCacheKey

func (q *Query) SetCacheKey(cacheKey string) *Query

SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests

func (*Query) SetCacheTTL

func (q *Query) SetCacheTTL(ttl time.Duration) *Query

SetCacheTTL - set cache time to live for the this query cache key response to be retrieved already stored query response, if not set default cacheTTL will be set

func (*Query) SetChannel

func (q *Query) SetChannel(channel string) *Query

SetChannel - set query channel - mandatory if default channel was not set

func (*Query) SetClientId

func (q *Query) SetClientId(clientId string) *Query

SetClientId - set query ClientId - mandatory if default client was not set

func (*Query) SetId

func (q *Query) SetId(id string) *Query

SetId - set query requestId, otherwise new random uuid will be set

func (*Query) SetMetadata

func (q *Query) SetMetadata(metadata string) *Query

SetMetadata - set query metadata - mandatory if body field is empty

func (*Query) SetTags added in v1.4.1

func (q *Query) SetTags(tags map[string]string) *Query

SetTags - set key value tags to query message

func (*Query) SetTimeout

func (q *Query) SetTimeout(timeout time.Duration) *Query

SetTimeout - set timeout for query to be returned. if timeout expired , send query will result with an error

type QueryReceive

type QueryReceive struct {
	Id         string
	Channel    string
	ClientId   string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type QueryResponse

type QueryResponse struct {
	QueryId          string
	Executed         bool
	ExecutedAt       time.Time
	Metadata         string
	ResponseClientId string
	Body             []byte
	CacheHit         bool
	Error            string
	Tags             map[string]string
}

type QueueInfo added in v1.7.0

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage added in v1.2.0

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage added in v1.4.0

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack added in v1.2.0

func (qm *QueueMessage) Ack() error

ack - sending ack queue message in stream queue message mode

func (*QueueMessage) AddTag added in v1.2.0

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to query message

func (*QueueMessage) AddTrace added in v1.2.0

func (qm *QueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to queue message

func (*QueueMessage) ExtendVisibility added in v1.2.0

func (qm *QueueMessage) ExtendVisibility(value int32) error

ExtendVisibility - extend the visibility time for the current receive message

func (*QueueMessage) Reject added in v1.2.0

func (qm *QueueMessage) Reject() error

reject - sending reject queue message in stream queue message mode

func (*QueueMessage) Resend added in v1.2.0

func (qm *QueueMessage) Resend(channel string) error

Resend - sending resend

func (*QueueMessage) Send added in v1.2.0

Send - sending queue message request , waiting for response or timeout

func (*QueueMessage) SetBody added in v1.2.0

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel added in v1.2.0

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message channel - mandatory if default channel was not set

func (*QueueMessage) SetClientId added in v1.2.0

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default client was not set

func (*QueueMessage) SetId added in v1.2.0

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata added in v1.2.0

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds, 0 , immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds added in v1.2.0

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration seconds, 0 never expires

func (*QueueMessage) SetPolicyMaxReceiveCount added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before message will discard or re-route to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue added in v1.2.0

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set queue name to be routed once MaxReceiveCount is triggered, empty will discard the message

func (*QueueMessage) SetTags added in v1.4.1

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueueMessageAttributes added in v1.2.0

type QueueMessageAttributes struct {
	Timestamp         int64
	Sequence          uint64
	MD5OfBody         string
	ReceiveCount      int32
	ReRouted          bool
	ReRoutedFromQueue string
	ExpirationAt      int64
	DelayedTo         int64
}

type QueueMessagePolicy added in v1.2.0

type QueueMessagePolicy struct {
	ExpirationSeconds int32
	DelaySeconds      int32
	MaxReceiveCount   int32
	MaxReceiveQueue   string
}

type QueueMessages added in v1.2.0

type QueueMessages struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (*QueueMessages) Add added in v1.2.0

func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages

Add - adding new queue message to array of messages

func (*QueueMessages) Send added in v1.2.0

Send - sending queue messages array request , waiting for response or timeout

type QueueTransactionMessageRequest added in v1.5.0

type QueueTransactionMessageRequest struct {
	RequestID         string
	ClientID          string
	Channel           string
	VisibilitySeconds int32
	WaitTimeSeconds   int32
}

func NewQueueTransactionMessageRequest added in v1.5.0

func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest

func (*QueueTransactionMessageRequest) Complete added in v1.5.0

func (*QueueTransactionMessageRequest) SetChannel added in v1.5.0

SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set

func (*QueueTransactionMessageRequest) SetClientId added in v1.5.0

SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set

func (*QueueTransactionMessageRequest) SetId added in v1.5.0

SetId - set receive queue transaction message request id, otherwise new random uuid will be set

func (*QueueTransactionMessageRequest) SetVisibilitySeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest

SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing

func (*QueueTransactionMessageRequest) SetWaitTimeSeconds added in v1.5.0

func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest

SetWaitTimeSeconds - set receive queue transaction message request wait timout for receiving queue message

func (*QueueTransactionMessageRequest) Validate added in v1.5.0

func (req *QueueTransactionMessageRequest) Validate() error

type QueueTransactionMessageResponse added in v1.5.0

type QueueTransactionMessageResponse struct {
	Message *QueueMessage
	// contains filtered or unexported fields
}

func (*QueueTransactionMessageResponse) Ack added in v1.5.0

func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds added in v1.5.0

func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error

func (*QueueTransactionMessageResponse) Reject added in v1.5.0

func (*QueueTransactionMessageResponse) Resend added in v1.5.0

func (qt *QueueTransactionMessageResponse) Resend(channel string) error

func (*QueueTransactionMessageResponse) ResendNewMessage added in v1.5.0

func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error

type QueuesClient added in v1.5.0

type QueuesClient struct {
	// contains filtered or unexported fields
}

func NewQueuesStreamClient added in v1.7.1

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)

func (*QueuesClient) AckAll added in v1.5.0

func (*QueuesClient) Batch added in v1.5.0

func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)

func (*QueuesClient) Close added in v1.5.0

func (q *QueuesClient) Close() error

func (*QueuesClient) Peek added in v1.5.0

func (*QueuesClient) Pull added in v1.5.0

func (*QueuesClient) QueuesInfo added in v1.7.0

func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesClient) Send added in v1.5.0

func (*QueuesClient) Subscribe added in v1.5.0

func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)

func (*QueuesClient) Transaction added in v1.5.0

func (*QueuesClient) TransactionStream added in v1.5.0

func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)

type QueuesInfo added in v1.7.0

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type ReceiveQueueMessagesRequest added in v1.2.0

type ReceiveQueueMessagesRequest struct {
	RequestID           string
	ClientID            string
	Channel             string
	MaxNumberOfMessages int32
	WaitTimeSeconds     int32
	IsPeak              bool
	// contains filtered or unexported fields
}

func NewReceiveQueueMessagesRequest added in v1.5.0

func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

func (*ReceiveQueueMessagesRequest) AddTrace added in v1.2.0

func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to receive queue message request

func (*ReceiveQueueMessagesRequest) Complete added in v1.5.0

func (*ReceiveQueueMessagesRequest) Send added in v1.2.0

Send - sending receive queue messages request , waiting for response or timeout

func (*ReceiveQueueMessagesRequest) SetChannel added in v1.2.0

SetChannel - set receive queue message request channel - mandatory if default channel was not set

func (*ReceiveQueueMessagesRequest) SetClientId added in v1.2.0

SetClientId - set receive queue message request ClientId - mandatory if default client was not set

func (*ReceiveQueueMessagesRequest) SetId added in v1.2.0

SetId - set receive queue message request id, otherwise new random uuid will be set

func (*ReceiveQueueMessagesRequest) SetIsPeak added in v1.2.0

SetIsPeak - set receive queue message request type, true - peaking at the queue and not actual dequeue , false - dequeue the queue

func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest

SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call

func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds added in v1.2.0

func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest

SetWaitTimeSeconds - set receive queue message request wait timout for receiving all requested messages

func (*ReceiveQueueMessagesRequest) Validate added in v1.5.0

func (req *ReceiveQueueMessagesRequest) Validate() error

type ReceiveQueueMessagesResponse added in v1.2.0

type ReceiveQueueMessagesResponse struct {
	RequestID        string
	Messages         []*QueueMessage
	MessagesReceived int32
	MessagesExpired  int32
	IsPeak           bool
	IsError          bool
	Error            string
}

type Response

type Response struct {
	RequestId  string
	ResponseTo string
	Metadata   string
	Body       []byte
	ClientId   string
	ExecutedAt time.Time
	Err        error
	Tags       map[string]string
	// contains filtered or unexported fields
}

func NewResponse added in v1.4.0

func NewResponse() *Response

func (*Response) AddTrace added in v1.2.0

func (r *Response) AddTrace(name string) *Trace

AddTrace - add tracing support to response

func (*Response) Send

func (r *Response) Send(ctx context.Context) error

Send - sending response to command or query request

func (*Response) SetBody

func (r *Response) SetBody(body []byte) *Response

SetMetadata - set body response, for query only

func (*Response) SetClientId

func (r *Response) SetClientId(clientId string) *Response

SetClientID - set clientId response, if not set default clientId will be used

func (*Response) SetError

func (r *Response) SetError(err error) *Response

SetError - set query or command execution error

func (*Response) SetExecutedAt

func (r *Response) SetExecutedAt(executedAt time.Time) *Response

SetExecutedAt - set query or command execution time

func (*Response) SetMetadata

func (r *Response) SetMetadata(metadata string) *Response

SetMetadata - set metadata response, for query only

func (*Response) SetRequestId

func (r *Response) SetRequestId(id string) *Response

SetId - set response corresponded requestId - mandatory

func (*Response) SetResponseTo

func (r *Response) SetResponseTo(channel string) *Response

SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory

func (*Response) SetTags added in v1.2.0

func (r *Response) SetTags(tags map[string]string) *Response

SetTags - set response tags

type SendQueueMessageResult added in v1.2.0

type SendQueueMessageResult struct {
	MessageID    string
	SentAt       int64
	ExpirationAt int64
	DelayedTo    int64
	IsError      bool
	Error        string
}

type ServerInfo added in v1.2.0

type ServerInfo struct {
	Host                string
	Version             string
	ServerStartTime     int64
	ServerUpTimeSeconds int64
}

type StreamQueueMessage added in v1.2.0

type StreamQueueMessage struct {
	RequestID string
	ClientID  string
	Channel   string
	// contains filtered or unexported fields
}

func (*StreamQueueMessage) AddTrace added in v1.2.0

func (req *StreamQueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to stream receive queue message request

func (*StreamQueueMessage) Close added in v1.2.0

func (req *StreamQueueMessage) Close()

Close - end stream of queue messages and cancel all pending operations

func (*StreamQueueMessage) Next added in v1.2.0

func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)

Next - receive queue messages request , waiting for response or timeout

func (*StreamQueueMessage) ResendWithNewMessage added in v1.2.0

func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error

ResendWithNewMessage - resend the current received message to a new channel

func (*StreamQueueMessage) SetChannel added in v1.2.0

func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage

SetChannel - set stream queue message request channel - mandatory if default channel was not set

func (*StreamQueueMessage) SetClientId added in v1.2.0

func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage

SetClientId - set stream queue message request ClientId - mandatory if default client was not set

func (*StreamQueueMessage) SetId added in v1.2.0

SetId - set stream queue message request id, otherwise new random uuid will be set

type SubscriptionOption

type SubscriptionOption interface {
	// contains filtered or unexported methods
}

func StartFromFirstEvent

func StartFromFirstEvent() SubscriptionOption

StartFromFirstEvent - replay all the stored events from the first available sequence and continue stream new events from this point

func StartFromLastEvent

func StartFromLastEvent() SubscriptionOption

StartFromLastEvent - replay last event and continue stream new events from this point

func StartFromNewEvents

func StartFromNewEvents() SubscriptionOption

StartFromNewEvents - start event store subscription with only new events

func StartFromSequence

func StartFromSequence(sequence int) SubscriptionOption

StartFromSequence - replay events from specific event sequence number and continue stream new events from this point

func StartFromTime

func StartFromTime(since time.Time) SubscriptionOption

StartFromTime - replay events from specific time continue stream new events from this point

func StartFromTimeDelta

func StartFromTimeDelta(delta time.Duration) SubscriptionOption

StartFromTimeDelta - replay events from specific current time - delta duration in seconds, continue stream new events from this point

type Trace added in v1.2.0

type Trace struct {
	Name string
	// contains filtered or unexported fields
}

func CreateTrace added in v1.2.0

func CreateTrace(name string) *Trace

func (*Trace) AddAnnotation added in v1.2.0

func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace

func (*Trace) AddBoolAttribute added in v1.2.0

func (t *Trace) AddBoolAttribute(key string, value bool) *Trace

func (*Trace) AddInt64Attribute added in v1.2.0

func (t *Trace) AddInt64Attribute(key string, value int64) *Trace

func (*Trace) AddStringAttribute added in v1.2.0

func (t *Trace) AddStringAttribute(key string, value string) *Trace

type Transport

type Transport interface {
	Ping(ctx context.Context) (*ServerInfo, error)
	SendEvent(ctx context.Context, event *Event) error
	StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
	SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
	SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error)
	StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
	SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
	SendCommand(ctx context.Context, command *Command) (*CommandResponse, error)
	SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
	SendQuery(ctx context.Context, query *Query) (*QueryResponse, error)
	SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
	SendResponse(ctx context.Context, response *Response) error
	SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
	SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
	ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
	AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
	StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool)
	QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
	GetGRPCRawClient() (pb.KubemqClient, error)
	Close() error
}

type TransportType

type TransportType int
const (
	TransportTypeGRPC TransportType = iota
	TransportTypeRest
)

Directories

Path Synopsis
examples
pkg
v2

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL