core

package
v1.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package core abstracts out the inner workings of CCSMP and provides a few key entities used by various other services. For example, it contains SolClientTransport which wraps the session and context, as well as SolClientPublisher which wraps the session and provides hooks for callbacks. SolClient structs are not responsible for state tracking.

Index

Constants

This section is empty.

Variables

View Source
var RequestCorrelationPrefix = ccsmp.SolClientGoPropCorrelationPrefix

Functions

func GetVersion

func GetVersion() (version, buildTime, variant string)

GetVersion function

func NewCacheResponseProcessor added in v1.9.1

func NewCacheResponseProcessor(callback func(solace.CacheResponse)) cacheResponseProcessor

func SetLogLevel

func SetLogLevel(level logging.LogLevel)

SetLogLevel function

func SetNativeLogLevel

func SetNativeLogLevel(level logging.LogLevel)

SetNativeLogLevel function

func SetVersion

func SetVersion(defaultVersion string)

SetVersion function

func ToNativeError

func ToNativeError(err ErrorInfo, args ...string) error

ToNativeError wraps the ErrorInfo in a solace.NativeError, with a prefix string if provided.

Types

type AcknowledgementHandler

type AcknowledgementHandler func(correlationMsgId uint64, persisted bool, err error)

AcknowledgementHandler defined

type Acknowledgements

type Acknowledgements interface {
	// Registers a callback for correlation
	AddAcknowledgementHandler(ackHandler AcknowledgementHandler) (uint64, func() (messageId uint64, correlationTag []byte))
	// Deregisters the callback for correlation
	RemoveAcknowledgementHandler(pubID uint64)
}

Acknowledgements interface

type AggregatedMetric added in v1.9.1

type AggregatedMetric int

AggregatedMetric structure

const (
	// CacheRequestsSucceeded initialized
	CacheRequestsSucceeded AggregatedMetric = iota
)

type CacheRequest added in v1.9.1

type CacheRequest interface {
	// RequestConfig returns the [resource.CachedMessageSubscriptionRequest] that was configured by the application
	// for this cache request.
	RequestConfig() resource.CachedMessageSubscriptionRequest
	// ID returns the [CacheRequestID] that was specified for this cache request by the application.
	ID() apimessage.CacheRequestID
	// Processor returns the method through which the application decided to handle the cache response that will result
	// from this cache request.
	Processor() CacheResponseProcessor
	// CacheSession returns the [CoreCacheSession] that was created to service this cache request.
	CacheSession() CoreCacheSession
	// Index returns the [CacheRequestMapIndex] used to associate this cache request with its processor in the
	// receiver's internal map.
	Index() CacheRequestMapIndex
	// UsesLocalDispatch returns whether or not the [CacheRequest] uses local dispatch for subscription management.
	UsesLocalDispatch() bool
	// MessageFilter returns the filter used to filter received messages. If the cache request has not configured a filter,
	// this method returns nil.
	MessageFilter() *ReceivedMessageFilter
}

func NewCacheRequest added in v1.9.1

func NewCacheRequest(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID, cacheResponseHandler CacheResponseProcessor, cacheSession ccsmp.SolClientCacheSession, dispatchID uintptr) CacheRequest

type CacheRequestImpl added in v1.9.1

type CacheRequestImpl struct {
	// contains filtered or unexported fields
}

func (*CacheRequestImpl) CacheSession added in v1.9.1

func (cacheRequest *CacheRequestImpl) CacheSession() ccsmp.SolClientCacheSession

func (*CacheRequestImpl) ID added in v1.9.1

func (cacheRequest *CacheRequestImpl) ID() apimessage.CacheRequestID

func (*CacheRequestImpl) Index added in v1.9.1

func (cacheRequest *CacheRequestImpl) Index() CacheRequestMapIndex

func (*CacheRequestImpl) MessageFilter added in v1.9.1

func (cacheRequest *CacheRequestImpl) MessageFilter() *ReceivedMessageFilter

func (*CacheRequestImpl) Processor added in v1.9.1

func (cacheRequest *CacheRequestImpl) Processor() CacheResponseProcessor

func (*CacheRequestImpl) RequestConfig added in v1.9.1

func (cacheRequest *CacheRequestImpl) RequestConfig() resource.CachedMessageSubscriptionRequest

func (*CacheRequestImpl) UsesLocalDispatch added in v1.9.1

func (cacheRequest *CacheRequestImpl) UsesLocalDispatch() bool

type CacheRequestMapIndex added in v1.9.1

type CacheRequestMapIndex = ccsmp.SolClientCacheSessionPt

CacheRequestMapIndex is used as the index in the map between cache sessions and cache response processors in the receiver.

func GetCacheRequestMapIndexFromCacheSession added in v1.9.1

func GetCacheRequestMapIndexFromCacheSession(cacheSession ccsmp.SolClientCacheSession) CacheRequestMapIndex

type CacheRequestor added in v1.9.1

type CacheRequestor interface {
	// CreateCacheRequest creates a cache session and a CacheRequest object which contains the new session along with
	// whatever information is required to send the cache request.
	CreateCacheRequest(resource.CachedMessageSubscriptionRequest, apimessage.CacheRequestID, CacheResponseProcessor, uintptr) (CacheRequest, error)
	// DestroyCacheRequest is used to clean up a cache request object when SendCacheRequest fails.
	DestroyCacheRequest(CacheRequest) error
	// SendCacheRequest sends the given cache request object, and configures CCSMP to use the given callback to handle
	// the resulting cache event/response
	SendCacheRequest(CacheRequest, CoreCacheEventCallback, uintptr) error
	// ProcessCacheEvent creates a cache response from the cache event that was asynchronously returned by CCSMP, and
	// gives this response to the application for post-processing using the method configured by the application during
	// the call to RequestCachedAsync or RequestCachedAsyncWithCallback.
	ProcessCacheEvent(*sync.Map, CoreCacheEventInfo)
	// CleanupCacheRequestSubscriptions cleans up subscriptions that are intended to persist only for the lifetime of a
	// cache request. Currently, this applies only to CachedOnly cache requests, which use only local dispatch to
	// forward messages to the appropriate consumer callback.
	CleanupCacheRequestSubscriptions(CacheRequest) error
	// CancelPendingCacheRequests cancels all the cache requests for the cache session associated with the given
	// CacheRequestMapIndex
	CancelPendingCacheRequests(CacheRequestMapIndex, CacheRequest) *CoreCacheEventInfo
}

CacheRequestor interface

type CacheResponseProcessor added in v1.9.1

type CacheResponseProcessor interface {
	// ProcessCacheResponse processes the cache response according to the implementation
	ProcessCacheResponse(solace.CacheResponse)
}

CacheResponseProcessor provides an interface through which the information necessary to process a cache response that is passed from CCSMP can be acquired.

type CoreCacheEventCallback added in v1.9.1

type CoreCacheEventCallback = ccsmp.SolClientCacheEventCallback

CoreCacheEventCallback is a type alias for the callback that CCSMP will call on the context thread to pass the cache event info corresponding to a cache response to the Go API.

type CoreCacheEventInfo added in v1.9.1

type CoreCacheEventInfo = ccsmp.CacheEventInfo

CoreCacheEventInfo is a type alias for the Go representation of the cache event info returned to the API from CCSMP in response to a cache request concluding.

type CoreCacheSession added in v1.9.1

type CoreCacheSession = ccsmp.SolClientCacheSession

func GetCacheSessionFromCacheRequestIndex added in v1.9.1

func GetCacheSessionFromCacheRequestIndex(cacheRequestMapIndex CacheRequestMapIndex) CoreCacheSession

type EndpointProvisioner added in v1.7.0

type EndpointProvisioner interface {
	// Events returns SolClientEvents
	Events() Events
	// IsRunning checks if the internal provisioner is running
	IsRunning() bool
	// Provision the endpoint on the broker from the correlation pointer
	Provision(properties []string, ignoreExistErrors bool) (ProvisionCorrelationID, <-chan ProvisionEvent, ErrorInfo)
	// Deprovision an endpoint on the broker from the given correlation pointer
	Deprovision(properties []string, ignoreMissingErrors bool) (ProvisionCorrelationID, <-chan ProvisionEvent, ErrorInfo)
	// ClearProvisionCorrelation clears the provision correlation with the given ID
	ClearProvisionCorrelation(id ProvisionCorrelationID)
}

EndpointProvisioner interface

type ErrorInfo

type ErrorInfo = *ccsmp.SolClientErrorInfoWrapper

ErrorInfo reexports *ccsmp.SolClientErrorInfoWrapper. In all cases, core.ErrorInfo should be used instead of *ccsmp.SolClientErrorInfoWrapper

type Event

type Event int

Event type defined

const (

	// SolClientEventUp represents ccsmp.SolClientSessionEventUpNotice in sessionEventMapping
	SolClientEventUp Event = iota

	// SolClientEventDown represents ccsmp.SolClientSessionEventDownError in sessionEventMapping
	SolClientEventDown

	// SolClientEventCanSend represents ccsmp.SolClientSessionEventCanSend in sessionEventMapping
	SolClientEventCanSend

	// SolClientEventAcknowledgement represents ccsmp.SolClientSessionEventAcknowledgement in sessionEventMapping
	SolClientEventAcknowledgement

	// SolClientEventRejected represents ccsmp.SolClientSessionEventRejectedMsgError in sessionEventMapping
	SolClientEventRejected

	// SolClientEventReconnect represents ccsmp.SolClientSessionEventReconnectedNotice in sessionEventMapping
	SolClientEventReconnect

	// SolClientEventReconnectAttempt represents ccsmp.SolClientSessionEventReconnectingNotice in sessionEventMapping
	SolClientEventReconnectAttempt

	// SolClientSubscriptionOk represents ccsmp.SolClientSessionEventSubscriptionOk in sessionEventMapping
	SolClientSubscriptionOk

	// SolClientSubscriptionError represents ccsmp.SolClientSessionEventSubscriptionError in sessionEventMapping
	SolClientSubscriptionError

	// SolClientProvisionOk represents ccsmp.SolClientSessionEventProvisionOk in sessionEventMapping
	SolClientProvisionOk

	// SolClientProvisionError represents ccsmp.SolClientSessionEventProvisionError in sessionEventMapping
	SolClientProvisionError
)

type EventHandler

type EventHandler func(SessionEventInfo)

EventHandler type defined

type EventInfo

type EventInfo interface {
	GetError() error
	GetInfoString() string
}

EventInfo interface defined

type Events

type Events interface {
	AddEventHandler(sessionEvent Event, responseCode EventHandler) uint
	RemoveEventHandler(id uint)
}

Events interface defined

type FlowEventInfo

type FlowEventInfo interface {
	EventInfo
}

FlowEventInfo interface

type MessageFilterConfig added in v1.9.1

type MessageFilterConfig = ccsmp.SolClientMessageFilteringConfigPt

type MessageID

type MessageID = ccsmp.SolClientMessageID

MessageID type defined

type MessageSettlementOutcome added in v1.8.0

type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome

MessageSettlementOutcome type defined

type Metrics

type Metrics interface {
	GetStat(metric metrics.Metric) uint64
	IncrementMetric(metric NextGenMetric, amount uint64)
	ResetStats()
}

Metrics interface

type NextGenMetric

type NextGenMetric int

NextGenMetric structure

const (
	// MetricReceivedMessagesTerminationDiscarded initialized
	MetricReceivedMessagesTerminationDiscarded NextGenMetric = iota

	// MetricReceivedMessagesBackpressureDiscarded initialized
	MetricReceivedMessagesBackpressureDiscarded NextGenMetric = iota

	// MetricPublishMessagesTerminationDiscarded initialized
	MetricPublishMessagesTerminationDiscarded NextGenMetric = iota

	// MetricPublishMessagesBackpressureDiscarded initialized
	MetricPublishMessagesBackpressureDiscarded NextGenMetric = iota

	// MetricInternalDiscardNotifications initialized
	MetricInternalDiscardNotifications NextGenMetric = iota
)

type PersistentEventCallback

type PersistentEventCallback func(event ccsmp.SolClientFlowEvent, eventInfo FlowEventInfo)

PersistentEventCallback type defined

type PersistentReceiver

type PersistentReceiver interface {
	// Destroy destroys the flow
	Destroy(freeMemory bool) ErrorInfo
	// Start will start the reception of messages
	// Persistent Receivers are started by default after creation
	Start() ErrorInfo
	// Stop will stop the reception of messages
	Stop() ErrorInfo
	// Subscribe will add a subscription to the persistent receiver
	Subscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
	// Unsubscribe will remove the subscription from the persistent receiver
	Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
	// Ack will acknowledge the given message
	// This method is equivalent to calling the Settle method with the ACCEPTED outcome.
	Ack(msgID MessageID) ErrorInfo
	// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as
	// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED,
	// the receiver has to have been preconfigured via its builder to support these settlement outcomes.
	// An attempt to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises an error for FAILED and REJECTED.
	Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo
	// Destination returns the destination if retrievable, or an error if one occurred
	Destination() (destination string, durable bool, errorInfo ErrorInfo)
}

PersistentReceiver interface

type ProvisionCorrelationID added in v1.7.0

type ProvisionCorrelationID = uintptr

ProvisionCorrelationID defined

type ProvisionEvent added in v1.7.0

type ProvisionEvent interface {
	GetID() ProvisionCorrelationID
	GetError() error
}

ProvisionEvent is the event passed to a channel on completion of an endpoint provision

type Publishable

type Publishable = ccsmp.SolClientMessagePt

Publishable defined

type Publisher

type Publisher interface {
	// Publish publishes a message in the form of a SolClientPublishable. Returns any error info from underlying send.
	Publish(message Publishable) ErrorInfo
	// Events returns SolClientEvents
	Events() Events
	// AwaitWritable awaits a writable message. Throws an error if interrupted for termination
	AwaitWritable(terminateSignal chan struct{}) error
	// TaskQueue gets the task queue that can be used to push tasks
	// TaskQueue may be a 0 length queue such that each task is handed off directly, i.e. no tasks sit in flux
	TaskQueue() chan SendTask
	// checks if the internal publisher is running
	IsRunning() bool
	// Increments a core metric
	IncrementMetric(metric NextGenMetric, amount uint64)
	// Acknowledgements returns the acknowledgement handler
	Acknowledgements() Acknowledgements
	// Requestor returns the reply handler manager
	Requestor() Requestor
}

Publisher interface

type Receivable

type Receivable = ccsmp.SolClientMessagePt

Receivable type defined

type ReceivedMessageFilter added in v1.9.1

type ReceivedMessageFilter interface {
	// SetupFiltering allocates and configures any resources associated with filtering received messages.
	SetupFiltering()
	// CleanupFiltering cleans any resources associated with filtering received messages.
	CleanupFiltering()
	// Filter() returns the configured filter that is applied to received messages
	Filter() MessageFilterConfig
}

type Receiver

type Receiver interface {
	// checks if the internal receiver is running
	IsRunning() bool
	// Replier returns SolClientReplier
	Replier() Replier
	// Events returns SolClientEvents
	Events() Events
	// Register an RX callback, returns a correlation pointer used when adding and removing subscriptions
	RegisterRXCallback(msgCallback RxCallback) uintptr
	// Remove the callback allowing GC to cleanup the function registered
	UnregisterRXCallback(ptr uintptr)
	// Add a subscription to the given correlation pointer
	Subscribe(topic string, ptr uintptr) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
	// Remove a subscription from the given correlation pointer
	Unsubscribe(topic string, ptr uintptr) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
	// Clears the subscription correlation with the given ID
	ClearSubscriptionCorrelation(id SubscriptionCorrelationID)
	// ProvisionEndpoint will provision an endpoint
	ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo
	// EndpointUnsubscribe will call endpoint unsubscribe on the endpoint
	EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
	// IncrementMetric - Increments receiver metrics
	IncrementMetric(metric NextGenMetric, amount uint64)
	// IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks)
	IncrementDuplicateAckCount()
	// Creates a new persistent receiver with the given callback
	NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
	// CacheRequestor() returns the manager that can be used to run cache operations
	CacheRequestor() CacheRequestor
}

Receiver interface

type Repliable added in v1.6.0

type Repliable = ccsmp.SolClientMessagePt

Repliable is a type alias. Repliables are structs that are received as part of a reply, not structs that can send a reply.

type Replier added in v1.6.0

type Replier interface {
	// SendReply will send a reply publishable
	SendReply(replyMsg ReplyPublishable) ErrorInfo
}

Replier interface

type ReplyPublishable added in v1.6.0

type ReplyPublishable = ccsmp.SolClientMessagePt

ReplyPublishable type defined

type Requestor added in v1.6.0

type Requestor interface {
	// CreateReplyToTopic returns a reply-to topic created from a publisher id
	CreateReplyToTopic(publisherID string) string
	// registers callback for response message for a returned replyTo topic with correlation id generator
	AddRequestorReplyHandler(replyHandler RequestorReplyHandler) (string, func() (messageId uint64, correlationId string), ErrorInfo)
	// deregisters replyHandler from core publisher
	RemoveRequestorReplyHandler(replyToTopic string) ErrorInfo
}

Requestor interface. Used for making requests in the request-reply model.

type RequestorReplyCorrelationEntry added in v1.6.0

type RequestorReplyCorrelationEntry = *ccsmpReplyCorrelation

type RequestorReplyHandler added in v1.6.0

type RequestorReplyHandler func(message Repliable, correlationId string) bool

type RxCallback

type RxCallback func(msg Receivable) bool

RxCallback type defined

type SendTask

type SendTask func()

SendTask defined

type SessionEventInfo

type SessionEventInfo interface {
	EventInfo
	GetCorrelationPointer() unsafe.Pointer
}

SessionEventInfo interface defined

type SubscriptionCorrelationID

type SubscriptionCorrelationID = uintptr

SubscriptionCorrelationID defined

type SubscriptionEvent

type SubscriptionEvent interface {
	GetID() SubscriptionCorrelationID
	GetError() error
}

SubscriptionEvent is the event passed to a channel on completion of a subscription

type Transport

type Transport interface {
	Connect() error
	Disconnect() error
	Close() error
	Publisher() Publisher
	Receiver() Receiver
	Metrics() Metrics
	Events() Events
	EndpointProvisioner() EndpointProvisioner
	ID() string
	Host() string
	ModifySessionProperties([]string) error
}

Transport interface

func NewTransport

func NewTransport(host string, properties []string) (Transport, error)

NewTransport function

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL