Documentation
¶
Index ¶
- Constants
- Variables
- type AblyError
- type BaseMessage
- type BaseUpdate
- type ControlUpdate
- type Event
- type FeedbackLoop
- type Manager
- type ManagerImpl
- type Message
- type NotificationParser
- type NotificationParserImpl
- type OccupancyMessage
- type Processor
- type ProcessorImpl
- func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error
- func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *SplitChangeUpdate) error
- func (p *ProcessorImpl) ProcessSplitKillUpdate(update *SplitKillUpdate) error
- func (p *ProcessorImpl) StartWorkers()
- func (p *ProcessorImpl) StopWorkers()
- type SSESyncEvent
- type SegmentChangeUpdate
- type SegmentUpdateWorker
- type SplitChangeUpdate
- type SplitKillUpdate
- type SplitUpdateWorker
- type StatusTracker
- type StatusTrackerImpl
- func (p *StatusTrackerImpl) HandleAblyError(errorEvent *AblyError) (newStatus *int64)
- func (p *StatusTrackerImpl) HandleControl(controlUpdate *ControlUpdate) *int64
- func (p *StatusTrackerImpl) HandleDisconnection() *int64
- func (p *StatusTrackerImpl) HandleOccupancy(message *OccupancyMessage) (newStatus *int64)
- func (p *StatusTrackerImpl) NotifySSEShutdownExpected()
- func (p *StatusTrackerImpl) Reset()
- type Update
Constants ¶
const ( StatusUp = iota StatusDown StatusRetryableError StatusNonRetryableError )
Status update constants that will be propagated to the push manager's user
const ( SSEEventTypeSync = "sync" SSEEventTypeMessage = "message" SSEEventTypeError = "error" )
SSE event type constants
const ( MessageTypeUpdate = iota MessageTypeControl MessageTypeOccupancy )
Message type constants
const ( UpdateTypeSplitChange = "SPLIT_UPDATE" UpdateTypeSplitKill = "SPLIT_KILL" UpdateTypeSegmentChange = "SEGMENT_UPDATE" UpdateTypeContol = "CONTROL" )
Update type constants
const ( ControlTypeStreamingEnabled = "STREAMING_ENABLED" ControlTypeStreamingPaused = "STREAMING_PAUSED" ControlTypeStreamingDisabled = "STREAMING_DISABLED" )
Control type constants
Variables ¶
var ErrAlreadyRunning = errors.New("push manager already running")
ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance
var ErrEmptyEvent = errors.New("empty incoming event")
ErrEmptyEvent indicates an event without message and event fields
var ErrNotRunning = errors.New("push manager not running")
ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance
Functions ¶
This section is empty.
Types ¶
type AblyError ¶
type AblyError struct {
// contains filtered or unexported fields
}
AblyError struct
func (*AblyError) IsRetryable ¶
IsRetryable returns whether the error is recoverable via a push subsystem restart
func (*AblyError) StatusCode ¶
StatusCode returns the status code
type BaseMessage ¶
type BaseMessage struct {
// contains filtered or unexported fields
}
BaseMessage contains the basic message-specific fields and methods
func (*BaseMessage) Channel ¶
func (m *BaseMessage) Channel() string
Channel returns which channel the message was received in
func (*BaseMessage) EventType ¶
func (m *BaseMessage) EventType() string
EventType always returns SSEEventTypeMessage for BaseMessage and embedding types
func (*BaseMessage) Timestamp ¶
func (m *BaseMessage) Timestamp() int64
Timestamp returns the timestamp of the message reception
type BaseUpdate ¶
type BaseUpdate struct { BaseMessage // contains filtered or unexported fields }
BaseUpdate contains fields & methods related to update-based messages
func (*BaseUpdate) ChangeNumber ¶
func (b *BaseUpdate) ChangeNumber() int64
ChangeNumber returns the changeNumber of the update
func (*BaseUpdate) MessageType ¶
func (b *BaseUpdate) MessageType() int64
MessageType always returns MessageTypeUpdate for Update messages
type ControlUpdate ¶
type ControlUpdate struct { BaseMessage // contains filtered or unexported fields }
ControlUpdate represents a control notification generated by the split push subsystem
func (*ControlUpdate) ControlType ¶
func (u *ControlUpdate) ControlType() string
ControlType returns the type of control notification received
func (*ControlUpdate) MessageType ¶
func (u *ControlUpdate) MessageType() int64
MessageType always returns MessageTypeControl for Control messages
func (*ControlUpdate) String ¶
func (u *ControlUpdate) String() string
String returns a string representation of this notification
type FeedbackLoop ¶
type FeedbackLoop = chan<- int64
FeedbackLoop is a type alias for the type of chan that must be supplied for push status to be propagated
type Manager ¶
type Manager interface { Start() error Stop() error StopWorkers() StartWorkers() NextRefresh() time.Time }
Manager interface contains public methods for push manager
type ManagerImpl ¶
type ManagerImpl struct {
// contains filtered or unexported fields
}
ManagerImpl implements the manager interface
func NewManager ¶
func NewManager( logger logging.LoggerInterface, synchronizer synchronizerInterface, cfg *conf.AdvancedConfig, feedbackLoop chan<- int64, authAPI service.AuthClient, runtimeTelemetry storage.TelemetryRuntimeProducer, metadata dtos.Metadata, clientKey *string, hcMonitor application.MonitorProducerInterface, ) (*ManagerImpl, error)
NewManager constructs a new push manager
func (*ManagerImpl) NextRefresh ¶ added in v4.0.2
func (m *ManagerImpl) NextRefresh() time.Time
NextRefresh returns the time when the next token refresh will happen
func (*ManagerImpl) Start ¶
func (m *ManagerImpl) Start() error
Start initiates the authentication flow and if successful initiates a connection
func (*ManagerImpl) StartWorkers ¶
func (m *ManagerImpl) StartWorkers()
StartWorkers starts the splits & segments workers
func (*ManagerImpl) Stop ¶
func (m *ManagerImpl) Stop() error
Stop method stops the SSE client and its status monitoring goroutine
func (*ManagerImpl) StopWorkers ¶
func (m *ManagerImpl) StopWorkers()
StopWorkers stops the splits & segments workers
type NotificationParser ¶
type NotificationParser interface {
ParseAndForward(sse.IncomingMessage) (*int64, error)
}
NotificationParser interface
type NotificationParserImpl ¶
type NotificationParserImpl struct {
// contains filtered or unexported fields
}
NotificationParserImpl implements the NotificationParser interface
func NewNotificationParserImpl ¶ added in v4.1.2
func NewNotificationParserImpl( loggerInterface logging.LoggerInterface, onSplitUpdate func(update *SplitChangeUpdate) error, onSplitKill func(*SplitKillUpdate) error, onSegmentUpdate func(*SegmentChangeUpdate) error, onControlUpdate func(*ControlUpdate) *int64, onOccupancyMessage func(*OccupancyMessage) *int64, onAblyError func(*AblyError) *int64) *NotificationParserImpl
func (*NotificationParserImpl) ParseAndForward ¶
func (p *NotificationParserImpl) ParseAndForward(raw sse.IncomingMessage) (*int64, error)
ParseAndForward accepts an incoming RAW event and returns a properly parsed & typed event
type OccupancyMessage ¶
type OccupancyMessage struct { BaseMessage // contains filtered or unexported fields }
OccupancyMessage contains fields & methods related to occupancy messages
func (*OccupancyMessage) ChannelWithoutPrefix ¶
func (o *OccupancyMessage) ChannelWithoutPrefix() string
ChannelWithoutPrefix returns the original channel name without the metadata prefix
func (*OccupancyMessage) MessageType ¶
func (o *OccupancyMessage) MessageType() int64
MessageType always returns MessageTypeOccupancy for Occupancy messages
func (*OccupancyMessage) Publishers ¶
func (o *OccupancyMessage) Publishers() int64
Publishers returns the number of publishers in the current channel
func (*OccupancyMessage) String ¶
func (o *OccupancyMessage) String() string
String returns the string representation of an occupancy message
type Processor ¶
type Processor interface { ProcessSplitChangeUpdate(update *SplitChangeUpdate) error ProcessSplitKillUpdate(update *SplitKillUpdate) error ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error StartWorkers() StopWorkers() }
Processor provides the interface for an update-message processor
type ProcessorImpl ¶
type ProcessorImpl struct {
// contains filtered or unexported fields
}
ProcessorImpl struct for notification processor
func NewProcessor ¶
func NewProcessor( splitQueueSize int64, segmentQueueSize int64, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*ProcessorImpl, error)
NewProcessor creates new processor
func (*ProcessorImpl) ProcessSegmentChangeUpdate ¶
func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error
ProcessSegmentChangeUpdate accepts a segment change notification and schedules a fetch
func (*ProcessorImpl) ProcessSplitChangeUpdate ¶
func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *SplitChangeUpdate) error
ProcessSplitChangeUpdate accepts a split change notification and schedules a fetch
func (*ProcessorImpl) ProcessSplitKillUpdate ¶
func (p *ProcessorImpl) ProcessSplitKillUpdate(update *SplitKillUpdate) error
ProcessSplitKillUpdate accepts a split kill notification, issues a local kill and schedules a fetch
func (*ProcessorImpl) StartWorkers ¶
func (p *ProcessorImpl) StartWorkers()
StartWorkers enables split & segments workers
func (*ProcessorImpl) StopWorkers ¶
func (p *ProcessorImpl) StopWorkers()
StopWorkers pauses split & segments workers
type SSESyncEvent ¶
type SSESyncEvent struct {
// contains filtered or unexported fields
}
SSESyncEvent represents an SSE Sync event with only id (used for resuming connections)
func (*SSESyncEvent) EventType ¶
func (e *SSESyncEvent) EventType() string
EventType always returns SSEEventTypeSync for SSESyncEvents
func (*SSESyncEvent) String ¶
func (e *SSESyncEvent) String() string
String returns the string representation of the event
func (*SSESyncEvent) Timestamp ¶
func (e *SSESyncEvent) Timestamp() int64
Timestamp returns the timestamp of the event parsing
type SegmentChangeUpdate ¶
type SegmentChangeUpdate struct { BaseUpdate // contains filtered or unexported fields }
SegmentChangeUpdate represents a segment change notification generated in the split servers.
func (*SegmentChangeUpdate) SegmentName ¶
func (u *SegmentChangeUpdate) SegmentName() string
SegmentName returns the name of the updated segment
func (*SegmentChangeUpdate) String ¶
func (u *SegmentChangeUpdate) String() string
String returns the string representation of a segment update notification
func (*SegmentChangeUpdate) UpdateType ¶
func (u *SegmentChangeUpdate) UpdateType() string
UpdateType is always UpdateTypeSegmentChange for segment updates
type SegmentUpdateWorker ¶
type SegmentUpdateWorker struct {
// contains filtered or unexported fields
}
SegmentUpdateWorker struct
func NewSegmentUpdateWorker ¶
func NewSegmentUpdateWorker( segmentQueue chan SegmentChangeUpdate, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*SegmentUpdateWorker, error)
NewSegmentUpdateWorker creates SegmentUpdateWorker
func (*SegmentUpdateWorker) IsRunning ¶
func (s *SegmentUpdateWorker) IsRunning() bool
IsRunning indicates if worker is running or not
type SplitChangeUpdate ¶
type SplitChangeUpdate struct {
BaseUpdate
}
SplitChangeUpdate represents a SplitChange notification generated in the split servers
func (*SplitChangeUpdate) String ¶
func (u *SplitChangeUpdate) String() string
String returns the String representation of a split change notification
func (*SplitChangeUpdate) UpdateType ¶
func (u *SplitChangeUpdate) UpdateType() string
UpdateType always returns UpdateTypeSplitChange for SplitChangeUpdate messages
type SplitKillUpdate ¶
type SplitKillUpdate struct { BaseUpdate // contains filtered or unexported fields }
SplitKillUpdate represents a SplitKill notification generated in the split servers
func (*SplitKillUpdate) DefaultTreatment ¶
func (u *SplitKillUpdate) DefaultTreatment() string
DefaultTreatment returns the last default treatment seen in the split servers for this split
func (*SplitKillUpdate) SplitName ¶
func (u *SplitKillUpdate) SplitName() string
SplitName returns the name of the killed split
func (*SplitKillUpdate) String ¶
func (u *SplitKillUpdate) String() string
String returns the string representation of this update
func (*SplitKillUpdate) ToSplitChangeUpdate ¶
func (u *SplitKillUpdate) ToSplitChangeUpdate() *SplitChangeUpdate
ToSplitChangeUpdate Maps this kill notification to a split change one
func (*SplitKillUpdate) UpdateType ¶
func (u *SplitKillUpdate) UpdateType() string
UpdateType always returns UpdateTypeSplitKill for SplitKillUpdate messages
type SplitUpdateWorker ¶
type SplitUpdateWorker struct {
// contains filtered or unexported fields
}
SplitUpdateWorker struct
func NewSplitUpdateWorker ¶
func NewSplitUpdateWorker( splitQueue chan SplitChangeUpdate, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*SplitUpdateWorker, error)
NewSplitUpdateWorker creates SplitUpdateWorker
func (*SplitUpdateWorker) IsRunning ¶
func (s *SplitUpdateWorker) IsRunning() bool
IsRunning indicates if worker is running or not
type StatusTracker ¶
type StatusTracker interface { HandleOccupancy(*OccupancyMessage) *int64 HandleControl(*ControlUpdate) *int64 HandleAblyError(*AblyError) *int64 HandleDisconnection() *int64 NotifySSEShutdownExpected() Reset() }
StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.
type StatusTrackerImpl ¶
type StatusTrackerImpl struct {
// contains filtered or unexported fields
}
StatusTrackerImpl is a concrete implementation of the StatusTracker interface
func NewStatusTracker ¶
func NewStatusTracker(logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *StatusTrackerImpl
NewStatusTracker returns a new StatusTracker
func (*StatusTrackerImpl) HandleAblyError ¶
func (p *StatusTrackerImpl) HandleAblyError(errorEvent *AblyError) (newStatus *int64)
HandleAblyError should be called whenever an ably error is received
func (*StatusTrackerImpl) HandleControl ¶
func (p *StatusTrackerImpl) HandleControl(controlUpdate *ControlUpdate) *int64
HandleControl should be called whenever a control notification is received
func (*StatusTrackerImpl) HandleDisconnection ¶
func (p *StatusTrackerImpl) HandleDisconnection() *int64
HandleDisconnection should be called whenever the SSE client gets disconnected
func (*StatusTrackerImpl) HandleOccupancy ¶
func (p *StatusTrackerImpl) HandleOccupancy(message *OccupancyMessage) (newStatus *int64)
HandleOccupancy should be called for every occupancy notification received
func (*StatusTrackerImpl) NotifySSEShutdownExpected ¶
func (p *StatusTrackerImpl) NotifySSEShutdownExpected()
NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client
func (*StatusTrackerImpl) Reset ¶
func (p *StatusTrackerImpl) Reset()
Reset should be called on initialization and when a new connection is being established (to start from scratch)