kubemq

package module
v0.0.0-...-3837e7c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2022 License: Apache-2.0 Imports: 19 Imported by: 0

README

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoTransportDefined    = errors.New("no transport layer defined, create object with client instance")
	ErrNoTransportConnection = errors.New("no transport layer established, aborting")
)

Functions

This section is empty.

Types

type AckAllQueueMessagesRequest

type AckAllQueueMessagesRequest struct {
	RequestID       string
	ClientID        string
	Channel         string
	WaitTimeSeconds int32
	// contains filtered or unexported fields
}

func (*AckAllQueueMessagesRequest) AddTrace

func (req *AckAllQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to ack all receive queue message request

func (*AckAllQueueMessagesRequest) Complete

func (*AckAllQueueMessagesRequest) Send

Send - sending ack all queue messages request, waiting for response or timeout

func (*AckAllQueueMessagesRequest) SetChannel

SetChannel - set ack all queue message request channel - mandatory if default channel was not set

func (*AckAllQueueMessagesRequest) SetClientId

func (req *AckAllQueueMessagesRequest) SetClientId(clientId string) *AckAllQueueMessagesRequest

SetClientId - set ack all queue message request ClientId - mandatory if default client was not set

func (*AckAllQueueMessagesRequest) SetId

SetId - set ack all queue message request id, otherwise new random uuid will be set

func (*AckAllQueueMessagesRequest) SetWaitTimeSeconds

func (req *AckAllQueueMessagesRequest) SetWaitTimeSeconds(wait int) *AckAllQueueMessagesRequest

SetWaitTimeSeconds - set ack all queue message request wait timeout

func (*AckAllQueueMessagesRequest) Validate

func (req *AckAllQueueMessagesRequest) Validate() error

type AckAllQueueMessagesResponse

type AckAllQueueMessagesResponse struct {
	RequestID        string
	AffectedMessages uint64
	IsError          bool
	Error            string
}

type Client

type Client struct {
	ServerInfo *ServerInfo
	// contains filtered or unexported fields
}

func NewClient

func NewClient(ctx context.Context, op ...Option) (*Client, error)

NewClient - create client instance to be used to communicate with KubeMQ server

func (*Client) AQM

AQM - create an empty ack all receive queue messages request object

func (*Client) AckAllQueueMessages

func (c *Client) AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)

AckAllQueueMessages - send ack all messages in queue

func (*Client) C

func (c *Client) C() *Command

C - create an empty command object

func (*Client) Close

func (c *Client) Close() error

Close - closing client connection. Any ongoing transactions will be aborted

func (*Client) E

func (c *Client) E() *Event

E - create an empty event object

func (*Client) ES

func (c *Client) ES() *EventStore

ES - create an empty event store object

func (*Client) NewAckAllQueueMessagesRequest

func (c *Client) NewAckAllQueueMessagesRequest() *AckAllQueueMessagesRequest

NewAckAllQueueMessagesRequest - create an empty ack all receive queue messages request object

func (*Client) NewCommand

func (c *Client) NewCommand() *Command

NewCommand - create an empty command

func (*Client) NewEvent

func (c *Client) NewEvent() *Event

NewEvent - create an empty event

func (*Client) NewEventStore

func (c *Client) NewEventStore() *EventStore

NewEventStore - create an empty event store

func (*Client) NewQuery

func (c *Client) NewQuery() *Query

NewQuery - create an empty query

func (*Client) NewQueueMessage

func (c *Client) NewQueueMessage() *QueueMessage

NewQueueMessage - create an empty queue message

func (*Client) NewQueueMessages

func (c *Client) NewQueueMessages() *QueueMessages

NewQueueMessages - create an empty queue messages array

func (*Client) NewReceiveQueueMessagesRequest

func (c *Client) NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

NewReceiveQueueMessagesRequest - create an empty receive queue message request object

func (*Client) NewResponse

func (c *Client) NewResponse() *Response

NewResponse - create an empty response

func (*Client) NewStreamQueueMessage

func (c *Client) NewStreamQueueMessage() *StreamQueueMessage

NewStreamQueueMessage - create an empty stream receive queue message object

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) (*ServerInfo, error)

Ping - get status of current connection

func (*Client) Q

func (c *Client) Q() *Query

Q - create an empty query object

func (*Client) QM

func (c *Client) QM() *QueueMessage

QM - create an empty queue message object

func (*Client) QMB

func (c *Client) QMB() *QueueMessages

QMB - create an empty queue message array object

func (*Client) QueuesInfo

func (c *Client) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

QueuesInfo - get queues detailed information

func (*Client) R

func (c *Client) R() *Response

R - create an empty response object for command or query responses

func (*Client) RQM

RQM - create an empty receive queue message request object

func (*Client) ReceiveQueueMessages

ReceiveQueueMessages - call to receive messages from a queue

func (*Client) SQM

func (c *Client) SQM() *StreamQueueMessage

SQM - create an empty stream receive queue message object

func (*Client) SendQueueMessage

func (c *Client) SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)

SendQueueMessage - send single queue message

func (*Client) SendQueueMessages

func (c *Client) SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)

SendQueueMessages - send multiple queue messages

func (*Client) SetCommand

func (c *Client) SetCommand(cmd *Command) *Command

func (*Client) SetEvent

func (c *Client) SetEvent(e *Event) *Event

func (*Client) SetEventStore

func (c *Client) SetEventStore(es *EventStore) *EventStore

func (*Client) SetQuery

func (c *Client) SetQuery(query *Query) *Query

func (*Client) SetQueueMessage

func (c *Client) SetQueueMessage(qm *QueueMessage) *QueueMessage

func (*Client) SetResponse

func (c *Client) SetResponse(response *Response) *Response

func (*Client) StreamEvents

func (c *Client) StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)

StreamEvents - send stream of events in a single call

func (*Client) StreamEventsStore

func (c *Client) StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)

StreamEventsStore - send stream of events store in a single call

func (*Client) SubscribeToCommands

func (c *Client) SubscribeToCommands(ctx context.Context, channel, group string, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommands - subscribe to commands requests by channel and group. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToCommandsWithRequest

func (c *Client) SubscribeToCommandsWithRequest(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)

SubscribeToCommandsWithRequest - subscribe to commands requests by channel and group, as given in the subscription request. Returns a channel of CommandReceive or an error

func (*Client) SubscribeToEvents

func (c *Client) SubscribeToEvents(ctx context.Context, channel, group string, errCh chan error) (<-chan *Event, error)

SubscribeToEvents - subscribe to events by channel and group. Returns a channel of events or an error

func (*Client) SubscribeToEventsStore

func (c *Client) SubscribeToEventsStore(ctx context.Context, channel, group string, errCh chan error, opt SubscriptionOption) (<-chan *EventStoreReceive, error)

SubscribeToEventsStore - subscribe to events store by channel and group with subscription option. Returns a channel of events or an error

func (*Client) SubscribeToEventsStoreWithRequest

func (c *Client) SubscribeToEventsStoreWithRequest(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)

SubscribeToEventsStoreWithRequest - subscribe to events store by channel and group with subscription option, as given in the subscription request. Returns a channel of events or an error

func (*Client) SubscribeToEventsWithRequest

func (c *Client) SubscribeToEventsWithRequest(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)

SubscribeToEventsWithRequest - subscribe to events by channel and group, as given in the subscription request. Returns a channel of events or an error

func (*Client) SubscribeToQueries

func (c *Client) SubscribeToQueries(ctx context.Context, channel, group string, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueries - subscribe to queries requests by channel and group. Returns a channel of QueryReceive or an error

func (*Client) SubscribeToQueriesWithRequest

func (c *Client) SubscribeToQueriesWithRequest(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)

SubscribeToQueriesWithRequest - subscribe to queries requests by channel and group, as given in the subscription request. Returns a channel of QueryReceive or an error

type Command

type Command struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewCommand

func NewCommand() *Command

func (*Command) AddTag

func (c *Command) AddTag(key, value string) *Command

AddTag - add key value tags to command message

func (*Command) AddTrace

func (c *Command) AddTrace(name string) *Trace

AddTrace - add tracing support to command

func (*Command) Send

func (c *Command) Send(ctx context.Context) (*CommandResponse, error)

Send - sending command, waiting for response or timeout

func (*Command) SetBody

func (c *Command) SetBody(body []byte) *Command

SetBody - set command body - mandatory if metadata field is empty

func (*Command) SetChannel

func (c *Command) SetChannel(channel string) *Command

SetChannel - set command channel - mandatory if default channel was not set

func (*Command) SetClientId

func (c *Command) SetClientId(clientId string) *Command

SetClientId - set command ClientId - mandatory if default client was not set

func (*Command) SetId

func (c *Command) SetId(id string) *Command

SetId - set command requestId, otherwise new random uuid will be set

func (*Command) SetMetadata

func (c *Command) SetMetadata(metadata string) *Command

SetMetadata - set command metadata - mandatory if body field is empty

func (*Command) SetTags

func (c *Command) SetTags(tags map[string]string) *Command

SetTags - set key value tags to command message

func (*Command) SetTimeout

func (c *Command) SetTimeout(timeout time.Duration) *Command

SetTimeout - set timeout for command to be returned. If the timeout expires, sending the command will result in an error

type CommandReceive

type CommandReceive struct {
	Id         string
	ClientId   string
	Channel    string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type CommandResponse

type CommandResponse struct {
	CommandId        string
	ResponseClientId string
	Executed         bool
	ExecutedAt       time.Time
	Error            string
	Tags             map[string]string
}

type CommandsClient

type CommandsClient struct {
	// contains filtered or unexported fields
}

func NewCommandsClient

func NewCommandsClient(ctx context.Context, op ...Option) (*CommandsClient, error)

func (*CommandsClient) Close

func (c *CommandsClient) Close() error

func (*CommandsClient) Response

func (c *CommandsClient) Response(ctx context.Context, response *Response) error

func (*CommandsClient) Send

func (c *CommandsClient) Send(ctx context.Context, request *Command) (*CommandResponse, error)

func (*CommandsClient) Subscribe

func (c *CommandsClient) Subscribe(ctx context.Context, request *CommandsSubscription, onCommandReceive func(cmd *CommandReceive, err error)) error

type CommandsSubscription

type CommandsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*CommandsSubscription) Complete

func (cs *CommandsSubscription) Complete(opts *Options) *CommandsSubscription

func (*CommandsSubscription) Validate

func (cs *CommandsSubscription) Validate() error

type Event

type Event struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent() *Event

func (*Event) AddTag

func (e *Event) AddTag(key, value string) *Event

AddTag - add key value tags to event message

func (*Event) Send

func (e *Event) Send(ctx context.Context) error

func (*Event) SetBody

func (e *Event) SetBody(body []byte) *Event

SetBody - set event body - mandatory if metadata field was not set

func (*Event) SetChannel

func (e *Event) SetChannel(channel string) *Event

SetChannel - set event channel - mandatory if default channel was not set

func (*Event) SetClientId

func (e *Event) SetClientId(clientId string) *Event

SetClientId - set event ClientId - mandatory if default client was not set

func (*Event) SetId

func (e *Event) SetId(id string) *Event

SetId - set event id otherwise new random uuid will be set

func (*Event) SetMetadata

func (e *Event) SetMetadata(metadata string) *Event

SetMetadata - set event metadata - mandatory if body field was not set

func (*Event) SetTags

func (e *Event) SetTags(tags map[string]string) *Event

SetTags - set key value tags to event message

type EventStore

type EventStore struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	ClientId string
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewEventStore

func NewEventStore() *EventStore

func (*EventStore) AddTag

func (es *EventStore) AddTag(key, value string) *EventStore

AddTag - add key value tags to event store message

func (*EventStore) Send

func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error)

Send - sending events store message

func (*EventStore) SetBody

func (es *EventStore) SetBody(body []byte) *EventStore

SetBody - set event store body - mandatory if metadata field was not set

func (*EventStore) SetChannel

func (es *EventStore) SetChannel(channel string) *EventStore

SetChannel - set event store channel - mandatory if default channel was not set

func (*EventStore) SetClientId

func (es *EventStore) SetClientId(clientId string) *EventStore

SetClientId - set event store ClientId - mandatory if default client was not set

func (*EventStore) SetId

func (es *EventStore) SetId(id string) *EventStore

SetId - set event store id otherwise new random uuid will be set

func (*EventStore) SetMetadata

func (es *EventStore) SetMetadata(metadata string) *EventStore

SetMetadata - set event store metadata - mandatory if body field was not set

func (*EventStore) SetTags

func (es *EventStore) SetTags(tags map[string]string) *EventStore

SetTags - set key value tags to event store message

type EventStoreReceive

type EventStoreReceive struct {
	Id        string
	Sequence  uint64
	Timestamp time.Time
	Channel   string
	Metadata  string
	Body      []byte
	ClientId  string
	Tags      map[string]string
}

type EventStoreResult

type EventStoreResult struct {
	Id   string
	Sent bool
	Err  error
}

type EventsClient

type EventsClient struct {
	// contains filtered or unexported fields
}

func NewEventsClient

func NewEventsClient(ctx context.Context, op ...Option) (*EventsClient, error)

func (*EventsClient) Close

func (e *EventsClient) Close() error

func (*EventsClient) Send

func (e *EventsClient) Send(ctx context.Context, message *Event) error

func (*EventsClient) Stream

func (e *EventsClient) Stream(ctx context.Context, onError func(err error)) (func(msg *Event) error, error)

func (*EventsClient) Subscribe

func (e *EventsClient) Subscribe(ctx context.Context, request *EventsSubscription, onEvent func(msg *Event, err error)) error

type EventsErrorsHandler

type EventsErrorsHandler func(error)

type EventsMessageHandler

type EventsMessageHandler func(*Event)

type EventsStoreClient

type EventsStoreClient struct {
	// contains filtered or unexported fields
}

func NewEventsStoreClient

func NewEventsStoreClient(ctx context.Context, op ...Option) (*EventsStoreClient, error)

func (*EventsStoreClient) Close

func (es *EventsStoreClient) Close() error

func (*EventsStoreClient) Send

func (es *EventsStoreClient) Send(ctx context.Context, message *EventStore) (*EventStoreResult, error)

func (*EventsStoreClient) Stream

func (es *EventsStoreClient) Stream(ctx context.Context, onResult func(result *EventStoreResult, err error)) (func(msg *EventStore) error, error)

func (*EventsStoreClient) Subscribe

func (es *EventsStoreClient) Subscribe(ctx context.Context, request *EventsStoreSubscription, onEvent func(msg *EventStoreReceive, err error)) error

type EventsStoreSubscription

type EventsStoreSubscription struct {
	Channel          string
	Group            string
	ClientId         string
	SubscriptionType SubscriptionOption
}

func (*EventsStoreSubscription) Complete

func (*EventsStoreSubscription) Validate

func (es *EventsStoreSubscription) Validate() error

type EventsSubscription

type EventsSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*EventsSubscription) Complete

func (es *EventsSubscription) Complete(opts *Options) *EventsSubscription

func (*EventsSubscription) Validate

func (es *EventsSubscription) Validate() error

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithAddress

func WithAddress(host string, port int) Option

WithAddress - set host and port address of KubeMQ server

func WithAuthToken

func WithAuthToken(token string) Option

WithAuthToken - set KubeMQ JWT Auth token to be used for KubeMQ connection

func WithAutoReconnect

func WithAutoReconnect(value bool) Option

WithAutoReconnect - set automatic reconnection in case of lost connectivity to server

func WithCertificate

func WithCertificate(certData, serverOverrideDomain string) Option

WithCertificate - set secured TLS credentials from the input certificate data for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithCheckConnection

func WithCheckConnection(value bool) Option

WithCheckConnection - set whether to check server connectivity on client creation

func WithClientId

func WithClientId(id string) Option

WithClientId - set client id to be used in all functions call with this client - mandatory

func WithCredentials

func WithCredentials(certFile, serverOverrideDomain string) Option

WithCredentials - set secured TLS credentials from the input certificate file for client. serverOverrideDomain is for testing only. If set to a non-empty string, it will override the virtual host name of authority (e.g. :authority header field) in requests.

func WithDefaultCacheTTL

func WithDefaultCacheTTL(ttl time.Duration) Option

WithDefaultCacheTTL - set default cache time to live for any query requests with any CacheKey set value

func WithDefaultChannel

func WithDefaultChannel(channel string) Option

WithDefaultChannel - set default channel for any outbound requests

func WithMaxReconnects

func WithMaxReconnects(value int) Option

WithMaxReconnects - set max reconnects before returning an error, default is 0 (never).

func WithReceiveBufferSize

func WithReceiveBufferSize(size int) Option

WithReceiveBufferSize - set length of buffered channel to be set in all subscriptions

func WithReconnectInterval

func WithReconnectInterval(duration time.Duration) Option

WithReconnectInterval - set reconnection interval duration, default is 5 seconds

func WithTransportType

func WithTransportType(transportType TransportType) Option

WithTransportType - set client transport type, currently GRPC or Rest

func WithUri

func WithUri(uri string) Option

WithUri - set uri address of KubeMQ server

type Options

type Options struct {
	// contains filtered or unexported fields
}

func GetDefaultOptions

func GetDefaultOptions() *Options

func (*Options) Validate

func (o *Options) Validate() error

type QueriesClient

type QueriesClient struct {
	// contains filtered or unexported fields
}

func NewQueriesClient

func NewQueriesClient(ctx context.Context, op ...Option) (*QueriesClient, error)

func (*QueriesClient) Close

func (q *QueriesClient) Close() error

func (*QueriesClient) Response

func (q *QueriesClient) Response(ctx context.Context, response *Response) error

func (*QueriesClient) Send

func (q *QueriesClient) Send(ctx context.Context, request *Query) (*QueryResponse, error)

func (*QueriesClient) Subscribe

func (q *QueriesClient) Subscribe(ctx context.Context, request *QueriesSubscription, onQueryReceive func(query *QueryReceive, err error)) error

type QueriesSubscription

type QueriesSubscription struct {
	Channel  string
	Group    string
	ClientId string
}

func (*QueriesSubscription) Complete

func (qs *QueriesSubscription) Complete(opts *Options) *QueriesSubscription

func (*QueriesSubscription) Validate

func (qs *QueriesSubscription) Validate() error

type Query

type Query struct {
	Id       string
	Channel  string
	Metadata string
	Body     []byte
	Timeout  time.Duration
	ClientId string
	CacheKey string
	CacheTTL time.Duration
	Tags     map[string]string
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery() *Query

func (*Query) AddTag

func (q *Query) AddTag(key, value string) *Query

AddTag - add key value tags to query message

func (*Query) AddTrace

func (q *Query) AddTrace(name string) *Trace

AddTrace - add tracing support to query

func (*Query) Send

func (q *Query) Send(ctx context.Context) (*QueryResponse, error)

Send - sending query request, waiting for response or timeout

func (*Query) SetBody

func (q *Query) SetBody(body []byte) *Query

SetBody - set query body - mandatory if metadata field is empty

func (*Query) SetCacheKey

func (q *Query) SetCacheKey(cacheKey string) *Query

SetCacheKey - set cache key to retrieve already stored query response, otherwise the response for this query will be stored in cache for future query requests

func (*Query) SetCacheTTL

func (q *Query) SetCacheTTL(ttl time.Duration) *Query

SetCacheTTL - set cache time to live for the stored response of this query's cache key; if not set, the default cacheTTL will be used

func (*Query) SetChannel

func (q *Query) SetChannel(channel string) *Query

SetChannel - set query channel - mandatory if default channel was not set

func (*Query) SetClientId

func (q *Query) SetClientId(clientId string) *Query

SetClientId - set query ClientId - mandatory if default client was not set

func (*Query) SetId

func (q *Query) SetId(id string) *Query

SetId - set query requestId, otherwise new random uuid will be set

func (*Query) SetMetadata

func (q *Query) SetMetadata(metadata string) *Query

SetMetadata - set query metadata - mandatory if body field is empty

func (*Query) SetTags

func (q *Query) SetTags(tags map[string]string) *Query

SetTags - set key value tags to query message

func (*Query) SetTimeout

func (q *Query) SetTimeout(timeout time.Duration) *Query

SetTimeout - set timeout for query to be returned. If the timeout expires, sending the query will result in an error

type QueryReceive

type QueryReceive struct {
	Id         string
	Channel    string
	ClientId   string
	Metadata   string
	Body       []byte
	ResponseTo string
	Tags       map[string]string
}

type QueryResponse

type QueryResponse struct {
	QueryId          string
	Executed         bool
	ExecutedAt       time.Time
	Metadata         string
	ResponseClientId string
	Body             []byte
	CacheHit         bool
	Error            string
	Tags             map[string]string
}

type QueueInfo

type QueueInfo struct {
	Name          string `json:"name"`
	Messages      int64  `json:"messages"`
	Bytes         int64  `json:"bytes"`
	FirstSequence int64  `json:"first_sequence"`
	LastSequence  int64  `json:"last_sequence"`
	Sent          int64  `json:"sent"`
	Subscribers   int    `json:"subscribers"`
	Waiting       int64  `json:"waiting"`
	Delivered     int64  `json:"delivered"`
}

type QueueMessage

type QueueMessage struct {
	*pb.QueueMessage
	// contains filtered or unexported fields
}

func NewQueueMessage

func NewQueueMessage() *QueueMessage

func (*QueueMessage) Ack

func (qm *QueueMessage) Ack() error

Ack - sending ack queue message in stream queue message mode

func (*QueueMessage) AddTag

func (qm *QueueMessage) AddTag(key, value string) *QueueMessage

AddTag - add key value tags to queue message

func (*QueueMessage) AddTrace

func (qm *QueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to queue message

func (*QueueMessage) ExtendVisibility

func (qm *QueueMessage) ExtendVisibility(value int32) error

ExtendVisibility - extend the visibility time for the current receive message

func (*QueueMessage) Reject

func (qm *QueueMessage) Reject() error

Reject - sending reject queue message in stream queue message mode

func (*QueueMessage) Resend

func (qm *QueueMessage) Resend(channel string) error

Resend - resend the queue message to the given channel

func (*QueueMessage) Send

Send - sending queue message request, waiting for response or timeout

func (*QueueMessage) SetBody

func (qm *QueueMessage) SetBody(body []byte) *QueueMessage

SetBody - set queue message body - mandatory if metadata field is empty

func (*QueueMessage) SetChannel

func (qm *QueueMessage) SetChannel(channel string) *QueueMessage

SetChannel - set queue message channel - mandatory if default channel was not set

func (*QueueMessage) SetClientId

func (qm *QueueMessage) SetClientId(clientId string) *QueueMessage

SetClientId - set queue message ClientId - mandatory if default client was not set

func (*QueueMessage) SetId

func (qm *QueueMessage) SetId(id string) *QueueMessage

SetId - set queue message id, otherwise new random uuid will be set

func (*QueueMessage) SetMetadata

func (qm *QueueMessage) SetMetadata(metadata string) *QueueMessage

SetMetadata - set queue message metadata - mandatory if body field is empty

func (*QueueMessage) SetPolicyDelaySeconds

func (qm *QueueMessage) SetPolicyDelaySeconds(sec int) *QueueMessage

SetPolicyDelaySeconds - set queue message delivery delay in seconds; 0 means immediate delivery

func (*QueueMessage) SetPolicyExpirationSeconds

func (qm *QueueMessage) SetPolicyExpirationSeconds(sec int) *QueueMessage

SetPolicyExpirationSeconds - set queue message expiration in seconds; 0 means the message never expires

func (*QueueMessage) SetPolicyMaxReceiveCount

func (qm *QueueMessage) SetPolicyMaxReceiveCount(max int) *QueueMessage

SetPolicyMaxReceiveCount - set max delivery attempts before the message is discarded or re-routed to a new queue

func (*QueueMessage) SetPolicyMaxReceiveQueue

func (qm *QueueMessage) SetPolicyMaxReceiveQueue(channel string) *QueueMessage

SetPolicyMaxReceiveQueue - set the name of the queue the message is routed to once MaxReceiveCount is reached; if empty, the message is discarded

func (*QueueMessage) SetTags

func (qm *QueueMessage) SetTags(tags map[string]string) *QueueMessage

SetTags - set key value tags to queue message

type QueueMessageAttributes

type QueueMessageAttributes struct {
	Timestamp         int64
	Sequence          uint64
	MD5OfBody         string
	ReceiveCount      int32
	ReRouted          bool
	ReRoutedFromQueue string
	ExpirationAt      int64
	DelayedTo         int64
}

type QueueMessagePolicy

type QueueMessagePolicy struct {
	ExpirationSeconds int32
	DelaySeconds      int32
	MaxReceiveCount   int32
	MaxReceiveQueue   string
}

type QueueMessages

type QueueMessages struct {
	Messages []*QueueMessage
	// contains filtered or unexported fields
}

func (*QueueMessages) Add

func (qma *QueueMessages) Add(msg *QueueMessage) *QueueMessages

Add - adding new queue message to array of messages

func (*QueueMessages) Send

Send - sending queue messages array request, waiting for response or timeout

type QueueTransactionMessageRequest

type QueueTransactionMessageRequest struct {
	RequestID         string
	ClientID          string
	Channel           string
	VisibilitySeconds int32
	WaitTimeSeconds   int32
}

func NewQueueTransactionMessageRequest

func NewQueueTransactionMessageRequest() *QueueTransactionMessageRequest

func (*QueueTransactionMessageRequest) Complete

func (*QueueTransactionMessageRequest) SetChannel

SetChannel - set receive queue transaction message request channel - mandatory if default channel was not set

func (*QueueTransactionMessageRequest) SetClientId

SetClientId - set receive queue transaction message request ClientId - mandatory if default client was not set

func (*QueueTransactionMessageRequest) SetId

SetId - set receive queue transaction message request id, otherwise new random uuid will be set

func (*QueueTransactionMessageRequest) SetVisibilitySeconds

func (req *QueueTransactionMessageRequest) SetVisibilitySeconds(visibility int) *QueueTransactionMessageRequest

SetVisibilitySeconds - set receive queue transaction message visibility seconds for hiding message from other clients during processing

func (*QueueTransactionMessageRequest) SetWaitTimeSeconds

func (req *QueueTransactionMessageRequest) SetWaitTimeSeconds(wait int) *QueueTransactionMessageRequest

SetWaitTimeSeconds - set receive queue transaction message request wait timeout for receiving queue message

func (*QueueTransactionMessageRequest) Validate

func (req *QueueTransactionMessageRequest) Validate() error

type QueueTransactionMessageResponse

type QueueTransactionMessageResponse struct {
	Message *QueueMessage
	// contains filtered or unexported fields
}

func (*QueueTransactionMessageResponse) Ack

func (*QueueTransactionMessageResponse) ExtendVisibilitySeconds

func (qt *QueueTransactionMessageResponse) ExtendVisibilitySeconds(value int) error

func (*QueueTransactionMessageResponse) Reject

func (*QueueTransactionMessageResponse) Resend

func (qt *QueueTransactionMessageResponse) Resend(channel string) error

func (*QueueTransactionMessageResponse) ResendNewMessage

func (qt *QueueTransactionMessageResponse) ResendNewMessage(msg *QueueMessage) error

type QueuesClient

type QueuesClient struct {
	// contains filtered or unexported fields
}

func NewQueuesStreamClient

func NewQueuesStreamClient(ctx context.Context, op ...Option) (*QueuesClient, error)

func (*QueuesClient) AckAll

func (*QueuesClient) Batch

func (q *QueuesClient) Batch(ctx context.Context, messages []*QueueMessage) ([]*SendQueueMessageResult, error)

func (*QueuesClient) Close

func (q *QueuesClient) Close() error

func (*QueuesClient) Peek

func (*QueuesClient) Pull

func (*QueuesClient) QueuesInfo

func (q *QueuesClient) QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)

func (*QueuesClient) Send

func (*QueuesClient) Subscribe

func (q *QueuesClient) Subscribe(ctx context.Context, request *ReceiveQueueMessagesRequest, onReceive func(response *ReceiveQueueMessagesResponse, err error)) (chan struct{}, error)

func (*QueuesClient) Transaction

func (*QueuesClient) TransactionStream

func (q *QueuesClient) TransactionStream(ctx context.Context, request *QueueTransactionMessageRequest, onReceive func(response *QueueTransactionMessageResponse, err error)) (chan struct{}, error)

type QueuesInfo

type QueuesInfo struct {
	TotalQueues int          `json:"total_queues"`
	Sent        int64        `json:"sent"`
	Waiting     int64        `json:"waiting"`
	Delivered   int64        `json:"delivered"`
	Queues      []*QueueInfo `json:"queues"`
}

type ReceiveQueueMessagesRequest

type ReceiveQueueMessagesRequest struct {
	RequestID           string
	ClientID            string
	Channel             string
	MaxNumberOfMessages int32
	WaitTimeSeconds     int32
	IsPeak              bool
	// contains filtered or unexported fields
}

func NewReceiveQueueMessagesRequest

func NewReceiveQueueMessagesRequest() *ReceiveQueueMessagesRequest

func (*ReceiveQueueMessagesRequest) AddTrace

func (req *ReceiveQueueMessagesRequest) AddTrace(name string) *Trace

AddTrace - add tracing support to receive queue message request

func (*ReceiveQueueMessagesRequest) Complete

func (*ReceiveQueueMessagesRequest) Send

Send - sending receive queue messages request, waiting for response or timeout

func (*ReceiveQueueMessagesRequest) SetChannel

SetChannel - set receive queue message request channel - mandatory if default channel was not set

func (*ReceiveQueueMessagesRequest) SetClientId

SetClientId - set receive queue message request ClientId - mandatory if default client was not set

func (*ReceiveQueueMessagesRequest) SetId

SetId - set receive queue message request id, otherwise new random uuid will be set

func (*ReceiveQueueMessagesRequest) SetIsPeak

SetIsPeak - set receive queue message request type: true - peek at the queue without actually dequeuing, false - dequeue from the queue

func (*ReceiveQueueMessagesRequest) SetMaxNumberOfMessages

func (req *ReceiveQueueMessagesRequest) SetMaxNumberOfMessages(max int) *ReceiveQueueMessagesRequest

SetMaxNumberOfMessages - set receive queue message request max number of messages to receive in single call

func (*ReceiveQueueMessagesRequest) SetWaitTimeSeconds

func (req *ReceiveQueueMessagesRequest) SetWaitTimeSeconds(wait int) *ReceiveQueueMessagesRequest

SetWaitTimeSeconds - set receive queue message request wait timeout for receiving all requested messages

func (*ReceiveQueueMessagesRequest) Validate

func (req *ReceiveQueueMessagesRequest) Validate() error

type ReceiveQueueMessagesResponse

type ReceiveQueueMessagesResponse struct {
	RequestID        string
	Messages         []*QueueMessage
	MessagesReceived int32
	MessagesExpired  int32
	IsPeak           bool
	IsError          bool
	Error            string
}

type Response

type Response struct {
	RequestId  string
	ResponseTo string
	Metadata   string
	Body       []byte
	ClientId   string
	ExecutedAt time.Time
	Err        error
	Tags       map[string]string
	// contains filtered or unexported fields
}

func NewResponse

func NewResponse() *Response

func (*Response) AddTrace

func (r *Response) AddTrace(name string) *Trace

AddTrace - add tracing support to response

func (*Response) Send

func (r *Response) Send(ctx context.Context) error

Send - sending response to command or query request

func (*Response) SetBody

func (r *Response) SetBody(body []byte) *Response

SetBody - set body response, for query only

func (*Response) SetClientId

func (r *Response) SetClientId(clientId string) *Response

SetClientId - set clientId response, if not set default clientId will be used

func (*Response) SetError

func (r *Response) SetError(err error) *Response

SetError - set query or command execution error

func (*Response) SetExecutedAt

func (r *Response) SetExecutedAt(executedAt time.Time) *Response

SetExecutedAt - set query or command execution time

func (*Response) SetMetadata

func (r *Response) SetMetadata(metadata string) *Response

SetMetadata - set metadata response, for query only

func (*Response) SetRequestId

func (r *Response) SetRequestId(id string) *Response

SetRequestId - set the requestId of the request this response corresponds to - mandatory

func (*Response) SetResponseTo

func (r *Response) SetResponseTo(channel string) *Response

SetResponseTo - set response channel as received in CommandReceived or QueryReceived object - mandatory

func (*Response) SetTags

func (r *Response) SetTags(tags map[string]string) *Response

SetTags - set response tags

type SendQueueMessageResult

type SendQueueMessageResult struct {
	MessageID    string
	SentAt       int64
	ExpirationAt int64
	DelayedTo    int64
	IsError      bool
	Error        string
}

type ServerInfo

type ServerInfo struct {
	Host                string
	Version             string
	ServerStartTime     int64
	ServerUpTimeSeconds int64
}

type StreamQueueMessage

type StreamQueueMessage struct {
	RequestID string
	ClientID  string
	Channel   string
	// contains filtered or unexported fields
}

func (*StreamQueueMessage) AddTrace

func (req *StreamQueueMessage) AddTrace(name string) *Trace

AddTrace - add tracing support to stream receive queue message request

func (*StreamQueueMessage) Close

func (req *StreamQueueMessage) Close()

Close - end stream of queue messages and cancel all pending operations

func (*StreamQueueMessage) Next

func (req *StreamQueueMessage) Next(ctx context.Context, visibility, wait int32) (*QueueMessage, error)

Next - receive queue messages request, waiting for response or timeout

func (*StreamQueueMessage) ResendWithNewMessage

func (req *StreamQueueMessage) ResendWithNewMessage(msg *QueueMessage) error

ResendWithNewMessage - resend the current received message to a new channel

func (*StreamQueueMessage) SetChannel

func (req *StreamQueueMessage) SetChannel(channel string) *StreamQueueMessage

SetChannel - set stream queue message request channel - mandatory if default channel was not set

func (*StreamQueueMessage) SetClientId

func (req *StreamQueueMessage) SetClientId(clientId string) *StreamQueueMessage

SetClientId - set stream queue message request ClientId - mandatory if default client was not set

func (*StreamQueueMessage) SetId

SetId - set stream queue message request id, otherwise new random uuid will be set

type SubscriptionOption

type SubscriptionOption interface {
	// contains filtered or unexported methods
}

func StartFromFirstEvent

func StartFromFirstEvent() SubscriptionOption

StartFromFirstEvent - replay all the stored events from the first available sequence and continue streaming new events from this point

func StartFromLastEvent

func StartFromLastEvent() SubscriptionOption

StartFromLastEvent - replay the last event and continue streaming new events from this point

func StartFromNewEvents

func StartFromNewEvents() SubscriptionOption

StartFromNewEvents - start event store subscription with only new events

func StartFromSequence

func StartFromSequence(sequence int) SubscriptionOption

StartFromSequence - replay events from a specific event sequence number and continue streaming new events from this point

func StartFromTime

func StartFromTime(since time.Time) SubscriptionOption

StartFromTime - replay events from a specific time and continue streaming new events from this point

func StartFromTimeDelta

func StartFromTimeDelta(delta time.Duration) SubscriptionOption

StartFromTimeDelta - replay events starting from the current time minus the delta duration (in seconds), and continue streaming new events from this point

type Trace

type Trace struct {
	Name string
	// contains filtered or unexported fields
}

func CreateTrace

func CreateTrace(name string) *Trace

func (*Trace) AddAnnotation

func (t *Trace) AddAnnotation(timestamp time.Time, message string) *Trace

func (*Trace) AddBoolAttribute

func (t *Trace) AddBoolAttribute(key string, value bool) *Trace

func (*Trace) AddInt64Attribute

func (t *Trace) AddInt64Attribute(key string, value int64) *Trace

func (*Trace) AddStringAttribute

func (t *Trace) AddStringAttribute(key string, value string) *Trace

type Transport

type Transport interface {
	Ping(ctx context.Context) (*ServerInfo, error)
	SendEvent(ctx context.Context, event *Event) error
	StreamEvents(ctx context.Context, eventsCh chan *Event, errCh chan error)
	SubscribeToEvents(ctx context.Context, request *EventsSubscription, errCh chan error) (<-chan *Event, error)
	SendEventStore(ctx context.Context, eventStore *EventStore) (*EventStoreResult, error)
	StreamEventsStore(ctx context.Context, eventsCh chan *EventStore, eventsResultCh chan *EventStoreResult, errCh chan error)
	SubscribeToEventsStore(ctx context.Context, request *EventsStoreSubscription, errCh chan error) (<-chan *EventStoreReceive, error)
	SendCommand(ctx context.Context, command *Command) (*CommandResponse, error)
	SubscribeToCommands(ctx context.Context, request *CommandsSubscription, errCh chan error) (<-chan *CommandReceive, error)
	SendQuery(ctx context.Context, query *Query) (*QueryResponse, error)
	SubscribeToQueries(ctx context.Context, request *QueriesSubscription, errCh chan error) (<-chan *QueryReceive, error)
	SendResponse(ctx context.Context, response *Response) error
	SendQueueMessage(ctx context.Context, msg *QueueMessage) (*SendQueueMessageResult, error)
	SendQueueMessages(ctx context.Context, msg []*QueueMessage) ([]*SendQueueMessageResult, error)
	ReceiveQueueMessages(ctx context.Context, req *ReceiveQueueMessagesRequest) (*ReceiveQueueMessagesResponse, error)
	AckAllQueueMessages(ctx context.Context, req *AckAllQueueMessagesRequest) (*AckAllQueueMessagesResponse, error)
	StreamQueueMessage(ctx context.Context, reqCh chan *pb.StreamQueueMessagesRequest, resCh chan *pb.StreamQueueMessagesResponse, errCh chan error, doneCh chan bool)
	QueuesInfo(ctx context.Context, filter string) (*QueuesInfo, error)
	GetGRPCRawClient() (pb.KubemqClient, error)
	Close() error
}

type TransportType

type TransportType int
const (
	TransportTypeGRPC TransportType = iota
	TransportTypeRest
)

Directories

Path Synopsis
examples
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL