push

package
v4.3.5
Published: Jun 12, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

const (
	StatusUp = iota
	StatusDown
	StatusRetryableError
	StatusNonRetryableError
)

Status update constants that will be propagated to the push manager's user

const (
	SSEEventTypeSync    = "sync"
	SSEEventTypeMessage = "message"
	SSEEventTypeError   = "error"
)

SSE event type constants

const (
	MessageTypeUpdate = iota
	MessageTypeControl
	MessageTypeOccupancy
)

Message type constants

const (
	UpdateTypeSplitChange   = "SPLIT_UPDATE"
	UpdateTypeSplitKill     = "SPLIT_KILL"
	UpdateTypeSegmentChange = "SEGMENT_UPDATE"
	UpdateTypeContol        = "CONTROL"
)

Update type constants

const (
	ControlTypeStreamingEnabled  = "STREAMING_ENABLED"
	ControlTypeStreamingPaused   = "STREAMING_PAUSED"
	ControlTypeStreamingDisabled = "STREAMING_DISABLED"
)

Control type constants
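
As an illustration of how the control type strings might be consumed, the following minimal sketch classifies an incoming control type. The constants are local copies of the ones listed above so the example compiles on its own; `describeControl` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Local copies of the documented control type constants, so this sketch
// compiles without importing the push package.
const (
	ControlTypeStreamingEnabled  = "STREAMING_ENABLED"
	ControlTypeStreamingPaused   = "STREAMING_PAUSED"
	ControlTypeStreamingDisabled = "STREAMING_DISABLED"
)

// describeControl is a hypothetical helper. It maps a control type string to
// a short description and reports whether the control type is a known one.
func describeControl(controlType string) (string, bool) {
	switch controlType {
	case ControlTypeStreamingEnabled:
		return "streaming enabled", true
	case ControlTypeStreamingPaused:
		return "streaming paused", true
	case ControlTypeStreamingDisabled:
		return "streaming disabled", true
	default:
		return "unknown control type", false
	}
}

func main() {
	for _, ct := range []string{ControlTypeStreamingPaused, "SOMETHING_ELSE"} {
		desc, known := describeControl(ct)
		fmt.Println(ct, "->", desc, known)
	}
}
```

Returning a second boolean keeps unknown control types distinguishable from known ones without resorting to errors or panics.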

Variables

var ErrAlreadyRunning = errors.New("push manager already running")

ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance

var ErrEmptyEvent = errors.New("empty incoming event")

ErrEmptyEvent indicates an event without message and event fields

var ErrNotRunning = errors.New("push manager not running")

ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance

Functions

This section is empty.

Types

type AblyError

type AblyError struct {
	// contains filtered or unexported fields
}

AblyError struct

func (*AblyError) Code

func (a *AblyError) Code() int

Code returns the error code

func (*AblyError) EventType

func (a *AblyError) EventType() string

EventType always returns SSEEventTypeError for AblyError

func (*AblyError) Href

func (a *AblyError) Href() string

Href returns the documentation link

func (*AblyError) IsRetryable

func (a *AblyError) IsRetryable() bool

IsRetryable returns whether the error is recoverable via a push subsystem restart

func (*AblyError) Message

func (a *AblyError) Message() string

Message returns the error message

func (*AblyError) StatusCode

func (a *AblyError) StatusCode() int

StatusCode returns the status code

func (*AblyError) String

func (a *AblyError) String() string

String returns the string representation of the ably error

func (*AblyError) Timestamp

func (a *AblyError) Timestamp() int64

Timestamp returns the error timestamp

type BaseMessage

type BaseMessage struct {
	// contains filtered or unexported fields
}

BaseMessage contains the basic message-specific fields and methods

func (*BaseMessage) Channel

func (m *BaseMessage) Channel() string

Channel returns which channel the message was received in

func (*BaseMessage) EventType

func (m *BaseMessage) EventType() string

EventType always returns SSEEventTypeMessage for BaseMessage and embedding types

func (*BaseMessage) Timestamp

func (m *BaseMessage) Timestamp() int64

Timestamp returns the timestamp of the message reception

type BaseUpdate

type BaseUpdate struct {
	BaseMessage
	// contains filtered or unexported fields
}

BaseUpdate contains fields & methods related to update-based messages

func (*BaseUpdate) ChangeNumber

func (b *BaseUpdate) ChangeNumber() int64

ChangeNumber returns the changeNumber of the update

func (*BaseUpdate) MessageType

func (b *BaseUpdate) MessageType() int64

MessageType always returns MessageTypeUpdate for Update messages

type ControlUpdate

type ControlUpdate struct {
	BaseMessage
	// contains filtered or unexported fields
}

ControlUpdate represents a control notification generated by the split push subsystem

func (*ControlUpdate) ControlType

func (u *ControlUpdate) ControlType() string

ControlType returns the type of control notification received

func (*ControlUpdate) MessageType

func (u *ControlUpdate) MessageType() int64

MessageType always returns MessageTypeControl for Control messages

func (*ControlUpdate) String

func (u *ControlUpdate) String() string

String returns a string representation of this notification

type Event

type Event interface {
	fmt.Stringer
	EventType() string
	Timestamp() int64
}

Event basic interface
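
The sketch below shows one way a type could satisfy Event and how a caller might branch on EventType(). The interface and the SSE event type constants are local copies of the documented ones; `fakeEvent` and `route` are hypothetical names used only for illustration.

```go
package main

import "fmt"

// Event mirrors the interface documented above (local copy).
type Event interface {
	fmt.Stringer
	EventType() string
	Timestamp() int64
}

// Local copies of the documented SSE event type constants.
const (
	SSEEventTypeSync    = "sync"
	SSEEventTypeMessage = "message"
	SSEEventTypeError   = "error"
)

// fakeEvent is a hypothetical stand-in; the real event types in this package
// keep their fields unexported.
type fakeEvent struct {
	kind string
	ts   int64
}

func (e *fakeEvent) EventType() string { return e.kind }
func (e *fakeEvent) Timestamp() int64  { return e.ts }
func (e *fakeEvent) String() string    { return fmt.Sprintf("fakeEvent(%s, %d)", e.kind, e.ts) }

// Compile-time check that *fakeEvent satisfies Event.
var _ Event = (*fakeEvent)(nil)

// route is a hypothetical dispatcher that picks a handler name by event type.
func route(e Event) string {
	switch e.EventType() {
	case SSEEventTypeSync:
		return "handle-sync"
	case SSEEventTypeMessage:
		return "handle-message"
	case SSEEventTypeError:
		return "handle-error"
	default:
		return "ignore"
	}
}

func main() {
	e := &fakeEvent{kind: SSEEventTypeMessage, ts: 1686528000000}
	fmt.Println(e, "->", route(e))
}
```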

type FeedbackLoop

type FeedbackLoop = chan<- int64

FeedbackLoop is a type alias for the type of chan that must be supplied for push status to be propagated
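
Because FeedbackLoop is a send-only channel, the code that creates the channel keeps the receiving side and hands the send-only view to the producer. The following minimal sketch assumes the values sent are the status constants listed earlier; `publish` and `statusName` are hypothetical helpers, and the alias and constants are local copies.

```go
package main

import "fmt"

// FeedbackLoop mirrors the documented alias (local copy).
type FeedbackLoop = chan<- int64

// Local copies of the documented status constants.
const (
	StatusUp = iota
	StatusDown
	StatusRetryableError
	StatusNonRetryableError
)

// statusName is a hypothetical helper that renders a status for logging.
func statusName(status int64) string {
	switch status {
	case StatusUp:
		return "up"
	case StatusDown:
		return "down"
	case StatusRetryableError:
		return "retryable error"
	case StatusNonRetryableError:
		return "non-retryable error"
	default:
		return "unknown"
	}
}

// publish stands in for the producer side: it can only send on the channel.
func publish(loop FeedbackLoop, statuses ...int64) {
	for _, s := range statuses {
		loop <- s
	}
}

func main() {
	// The consumer owns the bidirectional channel; a chan int64 is
	// assignable to the send-only FeedbackLoop type.
	statuses := make(chan int64, 2)
	publish(statuses, StatusUp, StatusRetryableError)
	close(statuses)

	for s := range statuses {
		fmt.Println("push status:", statusName(s))
	}
}
```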

type Manager

type Manager interface {
	Start() error
	Stop() error
	StopWorkers()
	StartWorkers()
	NextRefresh() time.Time
}

Manager interface contains public methods for push manager

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

ManagerImpl implements the manager interface

func NewManager

func NewManager(
	logger logging.LoggerInterface,
	synchronizer synchronizerInterface,
	cfg *conf.AdvancedConfig,
	feedbackLoop chan<- int64,
	authAPI service.AuthClient,
	runtimeTelemetry storage.TelemetryRuntimeProducer,
	metadata dtos.Metadata,
	clientKey *string,
	hcMonitor application.MonitorProducerInterface,
) (*ManagerImpl, error)

NewManager constructs a new push manager

func (*ManagerImpl) NextRefresh added in v4.0.2

func (m *ManagerImpl) NextRefresh() time.Time

NextRefresh returns the time when the next token refresh will happen

func (*ManagerImpl) Start

func (m *ManagerImpl) Start() error

Start initiates the authentication flow and if successful initiates a connection

func (*ManagerImpl) StartWorkers

func (m *ManagerImpl) StartWorkers()

StartWorkers starts the splits & segments workers

func (*ManagerImpl) Stop

func (m *ManagerImpl) Stop() error

Stop method stops the sse client and its status monitoring goroutine

func (*ManagerImpl) StopWorkers

func (m *ManagerImpl) StopWorkers()

StopWorkers stops the splits & segments workers

type Message

type Message interface {
	Event
	MessageType() int64
	Channel() string
}

Message basic interface

type NotificationParser

type NotificationParser interface {
	ParseAndForward(sse.IncomingMessage) (*int64, error)
}

NotificationParser interface

type NotificationParserImpl

type NotificationParserImpl struct {
	// contains filtered or unexported fields
}

NotificationParserImpl implements the NotificationParser interface

func NewNotificationParserImpl added in v4.1.2

func NewNotificationParserImpl(
	loggerInterface logging.LoggerInterface,
	onSplitUpdate func(update *SplitChangeUpdate) error,
	onSplitKill func(*SplitKillUpdate) error,
	onSegmentUpdate func(*SegmentChangeUpdate) error,
	onControlUpdate func(*ControlUpdate) *int64,
	onOccupancyMessage func(*OccupancyMessage) *int64,
	onAblyError func(*AblyError) *int64) *NotificationParserImpl

func (*NotificationParserImpl) ParseAndForward

func (p *NotificationParserImpl) ParseAndForward(raw sse.IncomingMessage) (*int64, error)

ParseAndForward accepts an incoming raw event, parses it into a properly typed event and forwards it to the matching handler

type OccupancyMessage

type OccupancyMessage struct {
	BaseMessage
	// contains filtered or unexported fields
}

OccupancyMessage contains fields & methods related to occupancy messages

func (*OccupancyMessage) ChannelWithoutPrefix

func (o *OccupancyMessage) ChannelWithoutPrefix() string

ChannelWithoutPrefix returns the original channel name without the metadata prefix

func (*OccupancyMessage) MessageType

func (o *OccupancyMessage) MessageType() int64

MessageType always returns MessageTypeOccupancy for Occupancy messages

func (*OccupancyMessage) Publishers

func (o *OccupancyMessage) Publishers() int64

Publishers returns the number of publishers in the current channel

func (*OccupancyMessage) String

func (o *OccupancyMessage) String() string

String returns the string representation of an occupancy message

type Processor

type Processor interface {
	ProcessSplitChangeUpdate(update *SplitChangeUpdate) error
	ProcessSplitKillUpdate(update *SplitKillUpdate) error
	ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error
	StartWorkers()
	StopWorkers()
}

Processor provides the interface for an update-message processor

type ProcessorImpl

type ProcessorImpl struct {
	// contains filtered or unexported fields
}

ProcessorImpl struct for notification processor

func NewProcessor

func NewProcessor(
	splitQueueSize int64,
	segmentQueueSize int64,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*ProcessorImpl, error)

NewProcessor creates new processor

func (*ProcessorImpl) ProcessSegmentChangeUpdate

func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *SegmentChangeUpdate) error

ProcessSegmentChangeUpdate accepts a segment change notification and schedules a fetch

func (*ProcessorImpl) ProcessSplitChangeUpdate

func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *SplitChangeUpdate) error

ProcessSplitChangeUpdate accepts a split change notification and schedules a fetch

func (*ProcessorImpl) ProcessSplitKillUpdate

func (p *ProcessorImpl) ProcessSplitKillUpdate(update *SplitKillUpdate) error

ProcessSplitKillUpdate accepts a split kill notification, issues a local kill and schedules a fetch

func (*ProcessorImpl) StartWorkers

func (p *ProcessorImpl) StartWorkers()

StartWorkers enables split & segments workers

func (*ProcessorImpl) StopWorkers

func (p *ProcessorImpl) StopWorkers()

StopWorkers pauses split & segments workers

type SSESyncEvent

type SSESyncEvent struct {
	// contains filtered or unexported fields
}

SSESyncEvent represents an SSE Sync event with only id (used for resuming connections)

func (*SSESyncEvent) EventType

func (e *SSESyncEvent) EventType() string

EventType always returns SSEEventTypeSync for SSESyncEvents

func (*SSESyncEvent) String

func (e *SSESyncEvent) String() string

String returns the string representation of the event

func (*SSESyncEvent) Timestamp

func (e *SSESyncEvent) Timestamp() int64

Timestamp returns the timestamp of the event parsing

type SegmentChangeUpdate

type SegmentChangeUpdate struct {
	BaseUpdate
	// contains filtered or unexported fields
}

SegmentChangeUpdate represents a segment change notification generated in the split servers.

func (*SegmentChangeUpdate) SegmentName

func (u *SegmentChangeUpdate) SegmentName() string

SegmentName returns the name of the updated segment

func (*SegmentChangeUpdate) String

func (u *SegmentChangeUpdate) String() string

String returns the string representation of a segment update notification

func (*SegmentChangeUpdate) UpdateType

func (u *SegmentChangeUpdate) UpdateType() string

UpdateType is always UpdateTypeSegmentChange for Segment Updates

type SegmentUpdateWorker

type SegmentUpdateWorker struct {
	// contains filtered or unexported fields
}

SegmentUpdateWorker struct

func NewSegmentUpdateWorker

func NewSegmentUpdateWorker(
	segmentQueue chan SegmentChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SegmentUpdateWorker, error)

NewSegmentUpdateWorker creates SegmentUpdateWorker

func (*SegmentUpdateWorker) IsRunning

func (s *SegmentUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SegmentUpdateWorker) Start

func (s *SegmentUpdateWorker) Start()

Start starts worker

func (*SegmentUpdateWorker) Stop

func (s *SegmentUpdateWorker) Stop()

Stop stops worker
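
The worker types keep their fields unexported, so their internals are not visible here. As a rough illustration of the Start/Stop/IsRunning contract only, the sketch below shows one common way to build a queue-driven worker in Go. Everything in it (`sketchWorker`, `newSketchWorker`, the `int64` queue items) is hypothetical and is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchWorker is a hypothetical queue-driven worker. Items are plain int64
// values here; the real workers receive update notifications instead.
type sketchWorker struct {
	queue  chan int64
	handle func(int64)

	mu   sync.Mutex
	stop chan struct{} // non-nil while the worker is running
	done chan struct{} // closed when the goroutine has exited
}

func newSketchWorker(queue chan int64, handle func(int64)) *sketchWorker {
	return &sketchWorker{queue: queue, handle: handle}
}

// Start launches the processing goroutine; calling it twice is a no-op.
func (w *sketchWorker) Start() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stop != nil {
		return
	}
	w.stop = make(chan struct{})
	w.done = make(chan struct{})
	go w.loop(w.stop, w.done)
}

// loop handles queued items until the stop channel is closed.
func (w *sketchWorker) loop(stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case <-stop:
			return
		case item := <-w.queue:
			w.handle(item)
		}
	}
}

// Stop signals the goroutine and waits for it to exit; no-op if not running.
func (w *sketchWorker) Stop() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stop == nil {
		return
	}
	close(w.stop)
	<-w.done
	w.stop, w.done = nil, nil
}

// IsRunning reports whether the processing goroutine is active.
func (w *sketchWorker) IsRunning() bool {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.stop != nil
}

func main() {
	queue := make(chan int64, 1)
	handled := make(chan int64, 1)
	w := newSketchWorker(queue, func(item int64) { handled <- item })

	w.Start()
	queue <- 42
	fmt.Println("handled:", <-handled, "running:", w.IsRunning())
	w.Stop()
	fmt.Println("running after stop:", w.IsRunning())
}
```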

type SplitChangeUpdate

type SplitChangeUpdate struct {
	BaseUpdate
}

SplitChangeUpdate represents a SplitChange notification generated in the split servers

func (*SplitChangeUpdate) String

func (u *SplitChangeUpdate) String() string

String returns the String representation of a split change notification

func (*SplitChangeUpdate) UpdateType

func (u *SplitChangeUpdate) UpdateType() string

UpdateType always returns UpdateTypeSplitChange for SplitChangeUpdate messages

type SplitKillUpdate

type SplitKillUpdate struct {
	BaseUpdate
	// contains filtered or unexported fields
}

SplitKillUpdate represents a SplitKill notification generated in the split servers

func (*SplitKillUpdate) DefaultTreatment

func (u *SplitKillUpdate) DefaultTreatment() string

DefaultTreatment returns the last default treatment seen in the split servers for this split

func (*SplitKillUpdate) SplitName

func (u *SplitKillUpdate) SplitName() string

SplitName returns the name of the killed split

func (*SplitKillUpdate) String

func (u *SplitKillUpdate) String() string

String returns the string representation of this update

func (*SplitKillUpdate) ToSplitChangeUpdate

func (u *SplitKillUpdate) ToSplitChangeUpdate() *SplitChangeUpdate

ToSplitChangeUpdate maps this kill notification to a split change one

func (*SplitKillUpdate) UpdateType

func (u *SplitKillUpdate) UpdateType() string

UpdateType always returns UpdateTypeSplitKill for SplitKillUpdate messages

type SplitUpdateWorker

type SplitUpdateWorker struct {
	// contains filtered or unexported fields
}

SplitUpdateWorker struct

func NewSplitUpdateWorker

func NewSplitUpdateWorker(
	splitQueue chan SplitChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SplitUpdateWorker, error)

NewSplitUpdateWorker creates SplitUpdateWorker

func (*SplitUpdateWorker) IsRunning

func (s *SplitUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SplitUpdateWorker) Start

func (s *SplitUpdateWorker) Start()

Start starts worker

func (*SplitUpdateWorker) Stop

func (s *SplitUpdateWorker) Stop()

Stop stops worker

type StatusTracker

type StatusTracker interface {
	HandleOccupancy(*OccupancyMessage) *int64
	HandleControl(*ControlUpdate) *int64
	HandleAblyError(*AblyError) *int64
	HandleDisconnection() *int64
	NotifySSEShutdownExpected()
	Reset()
}

StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.
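
The handler methods return `*int64`. A plausible reading, treated here as an assumption, is that a nil result means "no status change" and a non-nil result carries one of the status constants to propagate. Under that assumption, a caller could forward results to the feedback loop as sketched below; `forwardStatus` is a hypothetical helper.

```go
package main

import "fmt"

// Local copies of two of the documented status constants.
const (
	StatusUp = iota
	StatusDown
)

// forwardStatus is a hypothetical helper. Assumption: a nil status means the
// tracker detected no change, so nothing is sent. It reports whether a value
// was sent on the feedback loop.
func forwardStatus(loop chan<- int64, status *int64) bool {
	if status == nil {
		return false
	}
	loop <- *status
	return true
}

func main() {
	loop := make(chan int64, 1)

	// No change detected: nothing is propagated.
	fmt.Println("sent:", forwardStatus(loop, nil))

	// A change detected: the new status is propagated.
	down := int64(StatusDown)
	fmt.Println("sent:", forwardStatus(loop, &down), "value:", <-loop)
}
```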

type StatusTrackerImpl

type StatusTrackerImpl struct {
	// contains filtered or unexported fields
}

StatusTrackerImpl is a concrete implementation of the StatusTracker interface

func NewStatusTracker

func NewStatusTracker(logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *StatusTrackerImpl

NewStatusTracker returns a new StatusTracker

func (*StatusTrackerImpl) HandleAblyError

func (p *StatusTrackerImpl) HandleAblyError(errorEvent *AblyError) (newStatus *int64)

HandleAblyError should be called whenever an ably error is received

func (*StatusTrackerImpl) HandleControl

func (p *StatusTrackerImpl) HandleControl(controlUpdate *ControlUpdate) *int64

HandleControl should be called whenever a control notification is received

func (*StatusTrackerImpl) HandleDisconnection

func (p *StatusTrackerImpl) HandleDisconnection() *int64

HandleDisconnection should be called whenever the SSE client gets disconnected

func (*StatusTrackerImpl) HandleOccupancy

func (p *StatusTrackerImpl) HandleOccupancy(message *OccupancyMessage) (newStatus *int64)

HandleOccupancy should be called for every occupancy notification received

func (*StatusTrackerImpl) NotifySSEShutdownExpected

func (p *StatusTrackerImpl) NotifySSEShutdownExpected()

NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client

func (*StatusTrackerImpl) Reset

func (p *StatusTrackerImpl) Reset()

Reset should be called on initialization and when a new connection is being established (to start from scratch)

type Update

type Update interface {
	Message
	UpdateType() string
	ChangeNumber() int64
}

Update basic interface
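
One typical use of ChangeNumber() is deciding whether an update is newer than what has already been applied. The sketch below assumes that change numbers increase over time, so an update whose change number is not greater than the last one seen can be skipped. That rule, and the names `changeNumbered`, `fakeUpdate` and `isStale`, are assumptions made for illustration.

```go
package main

import "fmt"

// changeNumbered is a narrow, hypothetical interface. Any type implementing
// the Update interface documented above would also satisfy it.
type changeNumbered interface {
	ChangeNumber() int64
}

// fakeUpdate is a hypothetical stand-in for a real update notification.
type fakeUpdate struct {
	changeNumber int64
}

func (u fakeUpdate) ChangeNumber() int64 { return u.changeNumber }

// isStale reports whether the update carries nothing newer than lastSeen.
// Assumption: change numbers grow monotonically over time.
func isStale(u changeNumbered, lastSeen int64) bool {
	return u.ChangeNumber() <= lastSeen
}

func main() {
	lastSeen := int64(100)
	for _, u := range []fakeUpdate{{changeNumber: 99}, {changeNumber: 100}, {changeNumber: 101}} {
		fmt.Printf("changeNumber=%d stale=%t\n", u.ChangeNumber(), isStale(u, lastSeen))
	}
}
```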
