Documentation
¶
Overview ¶
Package core abstracts out the inner workings of CCSMP and provides a few key entities used by various other services. For example, it contains SolClientTransport which wraps the session and context, as well as SolClientPublisher which wraps the session and provides hooks for callbacks. SolClient structs are not responsible for state tracking.
Index ¶
- Variables
- func GetVersion() (version, buildTime, variant string)
- func NewCacheResponseProcessor(callback func(solace.CacheResponse)) cacheResponseProcessor
- func SetLogLevel(level logging.LogLevel)
- func SetNativeLogLevel(level logging.LogLevel)
- func SetVersion(defaultVersion string)
- func ToNativeError(err ErrorInfo, args ...string) error
- type AcknowledgementHandler
- type Acknowledgements
- type AggregatedMetric
- type CacheRequest
- type CacheRequestImpl
- func (cacheRequest *CacheRequestImpl) CacheSession() ccsmp.SolClientCacheSession
- func (cacheRequest *CacheRequestImpl) ID() apimessage.CacheRequestID
- func (cacheRequest *CacheRequestImpl) Index() CacheRequestMapIndex
- func (cacheRequest *CacheRequestImpl) MessageFilter() *ReceivedMessageFilter
- func (cacheRequest *CacheRequestImpl) Processor() CacheResponseProcessor
- func (cacheRequest *CacheRequestImpl) RequestConfig() resource.CachedMessageSubscriptionRequest
- func (cacheRequest *CacheRequestImpl) UsesLocalDispatch() bool
- type CacheRequestMapIndex
- type CacheRequestor
- type CacheResponseProcessor
- type CoreCacheEventCallback
- type CoreCacheEventInfo
- type CoreCacheSession
- type EndpointProvisioner
- type ErrorInfo
- type Event
- type EventHandler
- type EventInfo
- type Events
- type FlowEventInfo
- type MessageFilterConfig
- type MessageID
- type MessageSettlementOutcome
- type Metrics
- type NextGenMetric
- type PersistentEventCallback
- type PersistentReceiver
- type ProvisionCorrelationID
- type ProvisionEvent
- type Publishable
- type Publisher
- type Receivable
- type ReceivedMessageFilter
- type Receiver
- type Repliable
- type Replier
- type ReplyPublishable
- type Requestor
- type RequestorReplyCorrelationEntry
- type RequestorReplyHandler
- type RxCallback
- type SendTask
- type SessionEventInfo
- type SubscriptionCorrelationID
- type SubscriptionEvent
- type Transport
Constants ¶
This section is empty.
Variables ¶
var RequestCorrelationPrefix = ccsmp.SolClientGoPropCorrelationPrefix
Functions ¶
func NewCacheResponseProcessor ¶ added in v1.9.1
func NewCacheResponseProcessor(callback func(solace.CacheResponse)) cacheResponseProcessor
func ToNativeError ¶
ToNativeError wrapps the ErrorInfo in a solace.NativeError with a prefix string if provided.
Types ¶
type AcknowledgementHandler ¶
AcknowledgementHandler defined
type Acknowledgements ¶
type Acknowledgements interface { // Registers a callback for correlation AddAcknowledgementHandler(ackHandler AcknowledgementHandler) (uint64, func() (messageId uint64, correlationTag []byte)) // Deregisters the callback for correlation RemoveAcknowledgementHandler(pubID uint64) }
Acknowledgements interface
type AggregatedMetric ¶ added in v1.9.1
type AggregatedMetric int
AggregatedMetric structure
const ( // CacheRequestsSucceeded initialized CacheRequestsSucceeded AggregatedMetric = iota )
type CacheRequest ¶ added in v1.9.1
type CacheRequest interface { // RequestConfig returns the [resource.CachedMessageSubscriptionRequest] that was configured by the application // for this cache request. RequestConfig() resource.CachedMessageSubscriptionRequest // ID returns the [CacheRequestID] that was specified for this cache request by the application. ID() apimessage.CacheRequestID // Processor returns the method through which the application decided to handle the cache response that will result // from this cache request. Processor() CacheResponseProcessor // CacheSession returns the [CoreCacheSession] that was created to service this cache request. CacheSession() CoreCacheSession // Index returns the [CacheRequestMapIndex] used to associate this cache request with its processor in the // receiver's internal map. Index() CacheRequestMapIndex // UsesLocalDispatch returns whether or not the [CacheRequest] uses local dispatch for subscription management. UsesLocalDispatch() bool // MessageFilter returns the filter used to filter received messages. If the cache request has not configured a filter, // this method returns nil. MessageFilter() *ReceivedMessageFilter }
func NewCacheRequest ¶ added in v1.9.1
func NewCacheRequest(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID, cacheResponseHandler CacheResponseProcessor, cacheSession ccsmp.SolClientCacheSession, dispatchID uintptr) CacheRequest
type CacheRequestImpl ¶ added in v1.9.1
type CacheRequestImpl struct {
// contains filtered or unexported fields
}
func (*CacheRequestImpl) CacheSession ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) CacheSession() ccsmp.SolClientCacheSession
func (*CacheRequestImpl) ID ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) ID() apimessage.CacheRequestID
func (*CacheRequestImpl) Index ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) Index() CacheRequestMapIndex
func (*CacheRequestImpl) MessageFilter ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) MessageFilter() *ReceivedMessageFilter
func (*CacheRequestImpl) Processor ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) Processor() CacheResponseProcessor
func (*CacheRequestImpl) RequestConfig ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) RequestConfig() resource.CachedMessageSubscriptionRequest
func (*CacheRequestImpl) UsesLocalDispatch ¶ added in v1.9.1
func (cacheRequest *CacheRequestImpl) UsesLocalDispatch() bool
type CacheRequestMapIndex ¶ added in v1.9.1
type CacheRequestMapIndex = ccsmp.SolClientCacheSessionPt
CacheRequestMapIndex is used as the index in the map between cache sessions and cache response processors in the receiver.
func GetCacheRequestMapIndexFromCacheSession ¶ added in v1.9.1
func GetCacheRequestMapIndexFromCacheSession(cacheSession ccsmp.SolClientCacheSession) CacheRequestMapIndex
type CacheRequestor ¶ added in v1.9.1
type CacheRequestor interface { // CreateCacheRequest creates a cache session and a CacheRequest object which contains the new session along with // whatever information is required to send the cache request. CreateCacheRequest(resource.CachedMessageSubscriptionRequest, apimessage.CacheRequestID, CacheResponseProcessor, uintptr) (CacheRequest, error) // DestroyCacheRequest is used to clean up a cache request object when SendCacheRequestFails DestroyCacheRequest(CacheRequest) error // SendCacheRequest sends the given cache request object, and configures CCSMP to use the given callback to handle // the resulting cache event/response SendCacheRequest(CacheRequest, CoreCacheEventCallback, uintptr) error // ProcessCacheEvent creates a cache response from the cache event that was asynchronously returned by CCSMP, and // gives this response to the application for post-processing using the method configured by the application during // the call to RequestCachedAsync or RequestCachedAsyncWithCallback. ProcessCacheEvent(*sync.Map, CoreCacheEventInfo) // CleanupCacheRequestSubscriptions cleans up subscriptions that are intended to persist only for the lifetime of // cache request. Currently, this applies only to CachedOnly cache requests, which use only local dispatch to // forward messages to the appropriate consumer callback. CleanupCacheRequestSubscriptions(CacheRequest) error // CancelPendingCacheRequests Cancels all the cache requests for the cache session associated with the given // CacheRequestMapIndex CancelPendingCacheRequests(CacheRequestMapIndex, CacheRequest) *CoreCacheEventInfo }
CacheRequestor interface
type CacheResponseProcessor ¶ added in v1.9.1
type CacheResponseProcessor interface { // ProcessCacheResponse processes the cache response according to the implementation ProcessCacheResponse(solace.CacheResponse) }
CacheResponseProcessor provides an interface through which the information necessary to process a cache response that is passed from CCSMP can be acquired.
type CoreCacheEventCallback ¶ added in v1.9.1
type CoreCacheEventCallback = ccsmp.SolClientCacheEventCallback
CoreCacheEventCallback is a type alias for the callback that CCSMP will call on the context thread to pass the cache event info corresponding to a cache response to the Go API.
type CoreCacheEventInfo ¶ added in v1.9.1
type CoreCacheEventInfo = ccsmp.CacheEventInfo
CoreCacheEventInfo is a type alias for the Go representation of the cache event info returned to the API from CCSMP in response to a cache request concluding.
type CoreCacheSession ¶ added in v1.9.1
type CoreCacheSession = ccsmp.SolClientCacheSession
func GetCacheSessionFromCacheRequestIndex ¶ added in v1.9.1
func GetCacheSessionFromCacheRequestIndex(cacheRequestMapIndex CacheRequestMapIndex) CoreCacheSession
type EndpointProvisioner ¶ added in v1.7.0
type EndpointProvisioner interface { // Events returns SolClientEvents Events() Events // IsRunning checks if the internal provisioner is running IsRunning() bool // Provision the endpoint on the broker from the correlation pointer Provision(properties []string, ignoreExistErrors bool) (ProvisionCorrelationID, <-chan ProvisionEvent, ErrorInfo) // Deprovision an endpoint on the broker from the given correlation pointer Deprovision(properties []string, ignoreMissingErrors bool) (ProvisionCorrelationID, <-chan ProvisionEvent, ErrorInfo) // ClearProvisionCorrelation clears the provison correlation with the given ID ClearProvisionCorrelation(id ProvisionCorrelationID) }
EndpointProvisioner interface
type ErrorInfo ¶
type ErrorInfo = *ccsmp.SolClientErrorInfoWrapper
ErrorInfo reexports *ccsmp.SolClientErrorInfoWrapper. In all cases, core.ErrorInfo should be used instead of *ccsmp.SolClientErrorInfoWrapper
type Event ¶
type Event int
Event type defined
const ( // SolClientEventUp represents ccsmp.SolClientSessionEventUpNotice in sessionEventMapping SolClientEventUp Event = iota // SolClientEventDown represents SolClientSessionEventDownError in sessionEventMapping SolClientEventDown // SolClientEventCanSend represents ccsmp.SolClientSessionEventCanSend in sessionEventMapping SolClientEventCanSend // SolClientEventAcknowledgement represents ccsmp.SolClientSessionEventAcknowledgement in sessionEventMapping SolClientEventAcknowledgement // SolClientEventRejected represents ccsmp.SolClientSessionEventRejectedMsgError in sessionEventMapping SolClientEventRejected // SolClientEventReconnect represents ccsmp.SolClientSessionEventReconnectedNotice in sessionEventMapping SolClientEventReconnect // SolClientEventReconnectAttempt represents ccsmp.SolClientSessionEventReconnectingNotice in sessionEventMapping SolClientEventReconnectAttempt // SolClientSubscriptionOk represents ccsmp.SolClientSessionEventSubscriptionOk in sessionEventMapping SolClientSubscriptionOk // SolClientSubscriptionError represents ccsmp.SolClientSessionEventSubscriptionError in sessionEventMapping SolClientSubscriptionError // SolClientProvisionOk represents ccsmp.SolClientSessionEventProvisionOk in sessionEventMapping SolClientProvisionOk // SolClientProvisionError represents ccsmp.SolClientSessionEventProvisionError in sessionEventMapping SolClientProvisionError )
type Events ¶
type Events interface { AddEventHandler(sessionEvent Event, responseCode EventHandler) uint RemoveEventHandler(id uint) }
Events interface defined
type MessageFilterConfig ¶ added in v1.9.1
type MessageFilterConfig = ccsmp.SolClientMessageFilteringConfigPt
type MessageSettlementOutcome ¶ added in v1.8.0
type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome
MessageSettlementOutcome type defined
type Metrics ¶
type Metrics interface { GetStat(metric metrics.Metric) uint64 IncrementMetric(metric NextGenMetric, amount uint64) ResetStats() }
Metrics interface
type NextGenMetric ¶
type NextGenMetric int
NextGenMetric structure
const ( // MetricReceivedMessagesTerminationDiscarded initialized MetricReceivedMessagesTerminationDiscarded NextGenMetric = iota // MetricReceivedMessagesBackpressureDiscarded initialized MetricReceivedMessagesBackpressureDiscarded NextGenMetric = iota // MetricPublishMessagesTerminationDiscarded initialized MetricPublishMessagesTerminationDiscarded NextGenMetric = iota // MetricPublishMessagesBackpressureDiscarded initialized MetricPublishMessagesBackpressureDiscarded NextGenMetric = iota // MetricInternalDiscardNotifications initialized MetricInternalDiscardNotifications NextGenMetric = iota )
type PersistentEventCallback ¶
type PersistentEventCallback func(event ccsmp.SolClientFlowEvent, eventInfo FlowEventInfo)
PersistentEventCallback type defined
type PersistentReceiver ¶
type PersistentReceiver interface { // Destroy destroys the flow Destroy(freeMemory bool) ErrorInfo // Start will start the receiption of messages // Persistent Receivers are started by default after creation Start() ErrorInfo // Stop will stop the reception of messages Stop() ErrorInfo // Subscribe will add a subscription to the persistent receiver Subscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Unsubscribe will remove the subscription from the persistent receiver Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Ack will acknowledge the given message // This method is equivalent to calling the Settle method with the ACCEPTED outcome. Ack(msgID MessageID) ErrorInfo // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, // the receiver has to have been preconfigured via its builder to support these settlement outcomes. // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises error for FAILED and REJECTED. Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo // Destionation returns the destination if retrievable, or an error if one occurred Destination() (destination string, durable bool, errorInfo ErrorInfo) }
PersistentReceiver interface
type ProvisionCorrelationID ¶ added in v1.7.0
type ProvisionCorrelationID = uintptr
ProvisionCorrelationID defined
type ProvisionEvent ¶ added in v1.7.0
type ProvisionEvent interface { GetID() ProvisionCorrelationID GetError() error }
ProvisionEvent is the event passed to a channel on completion of an endpoint provision
type Publisher ¶
type Publisher interface { // Publish pulishes a message in the form of a SolClientPublishable. Returns any error info from underlying send. Publish(message Publishable) ErrorInfo // Events returns SolClientEvents Events() Events // AwaitWritable awaits a writable message. Throws an error if interrupted for termination AwaitWritable(terminateSignal chan struct{}) error // TaskQueue gets the task queue that can be used to push tasks // TaskQueue may be a 0 length queue such that each task is transactioned, ie. no tasks sit in flux TaskQueue() chan SendTask // checks if the internal publisher is running IsRunning() bool // Increments a core metric IncrementMetric(metric NextGenMetric, amount uint64) // Acknowledgements returns the acknowledgement handler Acknowledgements() Acknowledgements // Requestor returns the reply handler manager Requestor() Requestor }
Publisher interface
type ReceivedMessageFilter ¶ added in v1.9.1
type ReceivedMessageFilter interface { // SetupFiltering allocates and configures any resources associated with filtering received messages. SetupFiltering() // CleanupFiltering cleans any resources associated with filtering received messages. CleanupFiltering() // Filter() returns the configured filter that is applied to received messages Filter() MessageFilterConfig }
type Receiver ¶
type Receiver interface { // checks if the internal receiver is running IsRunning() bool // Replier returns SolClientReplier Replier() Replier // Events returns SolClientEvents Events() Events // Register an RX callback, returns a correlation pointer used when adding and removing subscriptions RegisterRXCallback(msgCallback RxCallback) uintptr // Remove the callback allowing GC to cleanup the function registered UnregisterRXCallback(ptr uintptr) // Add a subscription to the given correlation pointer Subscribe(topic string, ptr uintptr) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Remove a subscription from the given correlation pointer Unsubscribe(topic string, ptr uintptr) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Clears the subscription correlation with the given ID ClearSubscriptionCorrelation(id SubscriptionCorrelationID) // ProvisionEndpoint will provision an endpoint ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo // EndpointUnsubscribe will call endpoint unsubscribe on the endpoint EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // IncrementMetric - Increments receiver metrics IncrementMetric(metric NextGenMetric, amount uint64) // IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks) IncrementDuplicateAckCount() // Creates a new persistent receiver with the given callback NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo) // CacheRequestor() returns the manager that can be used to run cache operations CacheRequestor() CacheRequestor }
Receiver interface
type Repliable ¶ added in v1.6.0
type Repliable = ccsmp.SolClientMessagePt
Repliable interface, this is Repliable alias, repliable are structs that are received as a part of a reply not a struct that can send a reply
type Replier ¶ added in v1.6.0
type Replier interface { // SendReply will send a reply publishable SendReply(replyMsg ReplyPublishable) ErrorInfo }
Replier interface
type ReplyPublishable ¶ added in v1.6.0
type ReplyPublishable = ccsmp.SolClientMessagePt
ReplyPublishable type defined
type Requestor ¶ added in v1.6.0
type Requestor interface { // CreateReplyToTopic return a replyto topic created from a publisher id CreateReplyToTopic(publisherID string) string // registers callback for response message for a returned replyTo topic with correlation id generator AddRequestorReplyHandler(replyHandler RequestorReplyHandler) (string, func() (messageId uint64, correlationId string), ErrorInfo) // deregisters replyHandler from core publisher RemoveRequestorReplyHandler(replyToTopic string) ErrorInfo }
Requestor interface User for making requests in the request reply model
type RequestorReplyCorrelationEntry ¶ added in v1.6.0
type RequestorReplyCorrelationEntry = *ccsmpReplyCorrelation
type RequestorReplyHandler ¶ added in v1.6.0
type SessionEventInfo ¶
SessionEventInfo interface defined
type SubscriptionCorrelationID ¶
type SubscriptionCorrelationID = uintptr
SubscriptionCorrelationID defined
type SubscriptionEvent ¶
type SubscriptionEvent interface { GetID() SubscriptionCorrelationID GetError() error }
SubscriptionEvent is the event passed to a channel on completion of a subscription