Versions in this module Expand all Collapse all v1 v1.7.8 Nov 10, 2022 Changes in this version + var ErrNoTransportConnection = errors.New("no transport layer established, aborting") + var ErrNoTransportDefined = errors.New("no transport layer defined, create object with client instance") + type AckAllQueueMessagesRequest struct + Channel string + ClientID string + RequestID string + WaitTimeSeconds int32 + func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace + func (req *AckAllQueueMessagesRequest) Complete(opts *Options) *AckAllQueueMessagesRequest + func (req *AckAllQueueMessagesRequest) Send(ctx context.Context) (*AckAllQueueMessagesResponse, error) + func (req *AckAllQueueMessagesRequest) SetChannel(channel string) *AckAllQueueMessagesRequest + func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest + func (req *AckAllQueueMessagesRequest) SetId(id string) *AckAllQueueMessagesRequest + func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest + func (req *AckAllQueueMessagesRequest) Validate() error + type AckAllQueueMessagesResponse struct + AffectedMessages uint64 + Error string + IsError bool + RequestID string + type Client struct + ServerInfo *ServerInfo + func NewClient(ctx context.Context, op ...Option) (*Client, error) + func (c *Client) AQM() *AckAllQueueMessagesRequest + func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error) + func (c *Client) C() *Command + func (c *Client) Close() error + func (c *Client) E() *Event + func (c *Client) ES() *EventStore + func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest + func (c *Client) NewCommand() *Command + func (c *Client) NewEvent() *Event + func (c *Client) NewEventStore() *EventStore + func (c *Client) NewQuery() *Query + func (c *Client) NewQueueMessage() *QueueMessage + func (c *Client) NewQueueMessages() *QueueMessages + func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest + func (c *Client) NewResponse() *Response + func (c *Client) NewStreamQueueMessage() *StreamQueueMessage + func (c *Client) Ping(ctx context.Context) (*ServerInfo, error) + func (c *Client) Q() *Query + func (c *Client) QM() *QueueMessage + func (c *Client) QMB() *QueueMessages + func (c *Client) QueuesClient() *QueuesClient + func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error) + func (c *Client) R() *Response + func (c *Client) RQM() *ReceiveQueueMessagesRequest + func (c *Client) ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error) + func (c *Client) SQM() *StreamQueueMessage + func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error) + func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error) + func (c *Client) SetCommand(cmd *Command) *Command + func (c *Client) SetEvent(e *Event) *Event + func (c *Client) SetEventStore(es *EventStore) *EventStore + func (c *Client) SetQuery(query *Query) *Query + func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage + func (c *Client) SetResponse(response *Response) *Response + func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error) + func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, ...) + func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error) + func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error) + func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error) + func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, ...) (<-chan *EventStoreReceive, error) + func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error) + func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error) + func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error) + func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error) + type Command struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + Tags map[string]string + Timeout time.Duration + func NewCommand() *Command + func (c *Command) AddTag(key, value string) *Command + func (c *Command) AddTrace(name string) *Trace + func (c *Command) Send(ctx context.Context) (*CommandResponse, error) + func (c *Command) SetBody(body []byte) *Command + func (c *Command) SetChannel(channel string) *Command + func (c *Command) SetClientId(clientId string) *Command + func (c *Command) SetId(id string) *Command + func (c *Command) SetMetadata(metadata string) *Command + func (c *Command) SetTags(tags map[string]string) *Command + func (c *Command) SetTimeout(timeout time.Duration) *Command + type CommandReceive struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + ResponseTo string + Tags map[string]string + type CommandResponse struct + CommandId string + Error string + Executed bool + ExecutedAt time.Time + ResponseClientId string + Tags map[string]string + type CommandsClient struct + func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error) + func (c *CommandsClient) Close() error + func (c *CommandsClient) Response(ctx context.Context, response *Response) error + func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error) + func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, ...) error + type CommandsSubscription struct + Channel string + ClientId string + Group string + func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription + func (cs *CommandsSubscription) Validate() error + type Event struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + Tags map[string]string + func NewEvent() *Event + func (e *Event) AddTag(key, value string) *Event + func (e *Event) Send(ctx context.Context) error + func (e *Event) SetBody(body []byte) *Event + func (e *Event) SetChannel(channel string) *Event + func (e *Event) SetClientId(clientId string) *Event + func (e *Event) SetId(id string) *Event + func (e *Event) SetMetadata(metadata string) *Event + func (e *Event) SetTags(tags map[string]string) *Event + type EventStore struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + Tags map[string]string + func NewEventStore() *EventStore + func (es *EventStore) AddTag(key, value string) *EventStore + func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error) + func (es *EventStore) SetBody(body []byte) *EventStore + func (es *EventStore) SetChannel(channel string) *EventStore + func (es *EventStore) SetClientId(clientId string) *EventStore + func (es *EventStore) SetId(id string) *EventStore + func (es *EventStore) SetMetadata(metadata string) *EventStore + func (es *EventStore) SetTags(tags map[string]string) *EventStore + type EventStoreReceive struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + Sequence uint64 + Tags map[string]string + Timestamp time.Time + type EventStoreResult struct + Err error + Id string + Sent bool + type EventsClient struct + func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error) + func (e *EventsClient) Close() error + func (e *EventsClient) Send(ctx context.Context, message *Event) error + func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error) + func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, ...) error + type EventsErrorsHandler func(error) + type EventsMessageHandler func(*Event) + type EventsStoreClient struct + func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error) + func (es *EventsStoreClient) Close() error + func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error) + func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error) + func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, ...) error + type EventsStoreSubscription struct + Channel string + ClientId string + Group string + SubscriptionType SubscriptionOption + func (es *EventsStoreSubscription) Complete(opts *Options) *EventsStoreSubscription + func (es *EventsStoreSubscription) Validate() error + type EventsSubscription struct + Channel string + ClientId string + Group string + func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription + func (es *EventsSubscription) Validate() error + type Option interface + func WithAddress(host string, port int) Option + func WithAuthToken(token string) Option + func WithAutoReconnect(value bool) Option + func WithCertificate(certData, serverOverrideDomain string) Option + func WithCheckConnection(value bool) Option + func WithClientId(id string) Option + func WithCredentials(certFile, serverOverrideDomain string) Option + func WithDefaultCacheTTL(ttl time.Duration) Option + func WithDefaultChannel(channel string) Option + func WithMaxReconnects(value int) Option + func WithReceiveBufferSize(size int) Option + func WithReconnectInterval(duration time.Duration) Option + func WithTransportType(transportType TransportType) Option + func WithUri(uri string) Option + type Options struct + func GetDefaultOptions() *Options + func (o *Options) Validate() error + type QueriesClient struct + func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error) + func (q *QueriesClient) Close() error + func (q *QueriesClient) Response(ctx context.Context, response *Response) error + func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error) + func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, ...) error + type QueriesSubscription struct + Channel string + ClientId string + Group string + func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription + func (qs *QueriesSubscription) Validate() error + type Query struct + Body []byte + CacheKey string + CacheTTL time.Duration + Channel string + ClientId string + Id string + Metadata string + Tags map[string]string + Timeout time.Duration + func NewQuery() *Query + func (q *Query) AddTag(key, value string) *Query + func (q *Query) AddTrace(name string) *Trace + func (q *Query) Send(ctx context.Context) (*QueryResponse, error) + func (q *Query) SetBody(body []byte) *Query + func (q *Query) SetCacheKey(cacheKey string) *Query + func (q *Query) SetCacheTTL(ttl time.Duration) *Query + func (q *Query) SetChannel(channel string) *Query + func (q *Query) SetClientId(clientId string) *Query + func (q *Query) SetId(id string) *Query + func (q *Query) SetMetadata(metadata string) *Query + func (q *Query) SetTags(tags map[string]string) *Query + func (q *Query) SetTimeout(timeout time.Duration) *Query + type QueryReceive struct + Body []byte + Channel string + ClientId string + Id string + Metadata string + ResponseTo string + Tags map[string]string + type QueryResponse struct + Body []byte + CacheHit bool + Error string + Executed bool + ExecutedAt time.Time + Metadata string + QueryId string + ResponseClientId string + Tags map[string]string + type QueueInfo struct + Bytes int64 + Delivered int64 + FirstSequence int64 + LastSequence int64 + Messages int64 + Name string + Sent int64 + Subscribers int + Waiting int64 + type QueueMessage struct + func NewQueueMessage() *QueueMessage + func (qm *QueueMessage) Ack() error + func (qm *QueueMessage) AddTag(key, value string) *QueueMessage + func (qm *QueueMessage) AddTrace(name string) *Trace + func (qm *QueueMessage) ExtendVisibility(value int32) error + func (qm *QueueMessage) Reject() error + func (qm *QueueMessage) Resend(channel string) error + func (qm *QueueMessage) Send(ctx context.Context) (*SendQueueMessageResult, error) + func (qm *QueueMessage) SetBody(body []byte) *QueueMessage + func (qm *QueueMessage) SetChannel(channel string) *QueueMessage + func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage + func (qm *QueueMessage) SetId(id string) *QueueMessage + func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage + func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage + func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage + func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage + func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage + func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage + type QueueMessageAttributes struct + DelayedTo int64 + ExpirationAt int64 + MD5OfBody string + ReRouted bool + ReRoutedFromQueue string + ReceiveCount int32 + Sequence uint64 + Timestamp int64 + type QueueMessagePolicy struct + DelaySeconds int32 + ExpirationSeconds int32 + MaxReceiveCount int32 + MaxReceiveQueue string + type QueueMessages struct + Messages []*QueueMessage + func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages + func (qma *QueueMessages) Send(ctx context.Context) ([]*SendQueueMessageResult, error) + type QueueTransactionMessageRequest struct + Channel string + ClientID string + RequestID string + VisibilitySeconds int32 + WaitTimeSeconds int32 + func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) Complete(opts *Options) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) SetChannel(channel string) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) SetClientId(clientId string) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) SetId(id string) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest + func (req *QueueTransactionMessageRequest) Validate() error + type QueueTransactionMessageResponse struct + Message *QueueMessage + func (qt *QueueTransactionMessageResponse) Ack() error + func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error + func (qt *QueueTransactionMessageResponse) Reject() error + func (qt *QueueTransactionMessageResponse) Resend(channel string) error + func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error + type QueuesClient struct + func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error) + func (q *QueuesClient) AckAll(ctx context.Context, request *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error) + func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error) + func (q *QueuesClient) Close() error + func (q *QueuesClient) Peek(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error) + func (q *QueuesClient) Pull(ctx context.Context, request *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error) + func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error) + func (q *QueuesClient) Send(ctx context.Context, message *QueueMessage) (*SendQueueMessageResult, error) + func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, ...) (chan struct{}, error) + func (q *QueuesClient) Transaction(ctx context.Context, request *QueueTransactionMessageRequest) (*QueueTransactionMessageResponse, error) + func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, ...) (chan struct{}, error) + type QueuesInfo struct + Delivered int64 + Queues []*QueueInfo + Sent int64 + TotalQueues int + Waiting int64 + type ReceiveQueueMessagesRequest struct + Channel string + ClientID string + IsPeak bool + MaxNumberOfMessages int32 + RequestID string + WaitTimeSeconds int32 + func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace + func (req *ReceiveQueueMessagesRequest) Complete(opts *Options) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) Send(ctx context.Context) (*ReceiveQueueMessagesResponse, error) + func (req *ReceiveQueueMessagesRequest) SetChannel(channel string) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) SetClientId(clientId string) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) SetId(id string) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) SetIsPeak(value bool) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest + func (req *ReceiveQueueMessagesRequest) Validate() error + type ReceiveQueueMessagesResponse struct + Error string + IsError bool + IsPeak bool + Messages []*QueueMessage + MessagesExpired int32 + MessagesReceived int32 + RequestID string + type Response struct + Body []byte + ClientId string + Err error + ExecutedAt time.Time + Metadata string + RequestId string + ResponseTo string + Tags map[string]string + func NewResponse() *Response + func (r *Response) AddTrace(name string) *Trace + func (r *Response) Send(ctx context.Context) error + func (r *Response) SetBody(body []byte) *Response + func (r *Response) SetClientId(clientId string) *Response + func (r *Response) SetError(err error) *Response + func (r *Response) SetExecutedAt(executedAt time.Time) *Response + func (r *Response) SetMetadata(metadata string) *Response + func (r *Response) SetRequestId(id string) *Response + func (r *Response) SetResponseTo(channel string) *Response + func (r *Response) SetTags(tags map[string]string) *Response + type SendQueueMessageResult struct + DelayedTo int64 + Error string + ExpirationAt int64 + IsError bool + MessageID string + SentAt int64 + type ServerInfo struct + Host string + ServerStartTime int64 + ServerUpTimeSeconds int64 + Version string + type StreamQueueMessage struct + Channel string + ClientID string + RequestID string + func (req *StreamQueueMessage) AddTrace(name string) *Trace + func (req *StreamQueueMessage) Close() + func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error) + func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error + func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage + func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage + func (req *StreamQueueMessage) SetId(id string) *StreamQueueMessage + type SubscriptionOption interface + func StartFromFirstEvent() SubscriptionOption + func StartFromLastEvent() SubscriptionOption + func StartFromNewEvents() SubscriptionOption + func StartFromSequence(sequence int) SubscriptionOption + func StartFromTime(since time.Time) SubscriptionOption + func StartFromTimeDelta(delta time.Duration) SubscriptionOption + type Trace struct + Name string + func CreateTrace(name string) *Trace + func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace + func (t *Trace) AddBoolAttribute(key string, value bool) *Trace + func (t *Trace) AddInt64Attribute(key string, value int64) *Trace + func (t *Trace) AddStringAttribute(key string, value string) *Trace + type Transport interface + AckAllQueueMessages func(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error) + Close func() error + GetGRPCRawClient func() (pb.KubemqClient, error) + Ping func(ctx context.Context) (*ServerInfo, error) + QueuesInfo func(ctx context.Context, filter string) (*QueuesInfo, error) + ReceiveQueueMessages func(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error) + SendCommand func(ctx context.Context, command *Command) (*CommandResponse, error) + SendEvent func(ctx context.Context, event *Event) error + SendEventStore func(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error) + SendQuery func(ctx context.Context, query *Query) (*QueryResponse, error) + SendQueueMessage func(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error) + SendQueueMessages func(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error) + SendResponse func(ctx context.Context, response *Response) error + StreamEvents func(ctx context.Context, eventsCh chan *Event, errCh chan error) + StreamEventsStore func(ctx context.Context, eventsCh chan *EventStore, ...) + StreamQueueMessage func(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, ...) + SubscribeToCommands func(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error) + SubscribeToEvents func(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error) + SubscribeToEventsStore func(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error) + SubscribeToQueries func(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error) + type TransportType int + const TransportTypeGRPC + const TransportTypeRest