events

package
v1.5.0
Published: Mar 5, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateAppEventRecord

func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord

func CreateNodeEventRecord

func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord

func CreateQueueEventRecord

func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord

func CreateRequestEventRecord

func CreateRequestEventRecord(objectID, referenceID, message string, resource *resources.Resource) *si.EventRecord

func CreateUserGroupEventRecord added in v1.5.0

func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord

func Init added in v1.4.0

func Init()

Init initializes the event system. Only exported for testing.

Types

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

func CreateShimPublisher

func CreateShimPublisher(store *EventStore) *EventPublisher

func (*EventPublisher) StartService

func (sp *EventPublisher) StartService()

func (*EventPublisher) Stop

func (sp *EventPublisher) Stop()

type EventStore

type EventStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

The EventStore operates under the following assumptions:

  • there is a cap for the number of events stored
  • the CollectEvents() function clears the currently stored events in the EventStore

Assuming the rate of events generated by the scheduler component in a given time period is high, calling CollectEvents() periodically should be fine.
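
The two assumptions above can be sketched with a small stand-alone model. This is not the package's EventStore: the `record` and `cappedStore` types are hypothetical, and dropping events once the cap is reached is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for *si.EventRecord.
type record struct {
	ObjectID string
	Message  string
}

// cappedStore is an illustrative model only, not the package's EventStore.
type cappedStore struct {
	mu     sync.Mutex
	limit  int
	events []record
}

func newCappedStore(limit int) *cappedStore {
	return &cappedStore{limit: limit, events: make([]record, 0, limit)}
}

// Store keeps the event if there is room and reports whether it was kept.
// Assumption: events arriving once the cap is reached are dropped.
func (s *cappedStore) Store(e record) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.events) >= s.limit {
		return false
	}
	s.events = append(s.events, e)
	return true
}

// Collect returns the stored events and leaves the store empty.
func (s *cappedStore) Collect() []record {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := s.events
	s.events = make([]record, 0, s.limit)
	return out
}

// Count returns the number of events currently held.
func (s *cappedStore) Count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.events)
}

func main() {
	store := newCappedStore(2)
	for _, id := range []string{"app-1", "app-2", "app-3"} {
		kept := store.Store(record{ObjectID: id, Message: "submitted"})
		fmt.Println(id, "kept:", kept)
	}
	batch := store.Collect()
	fmt.Println("collected:", len(batch), "remaining:", store.Count())
}
```

In this model a caller that collects often enough never hits the cap, which is the situation the periodic CollectEvents() call is meant to maintain.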

func (*EventStore) CollectEvents

func (es *EventStore) CollectEvents() []*si.EventRecord

func (*EventStore) CountStoredEvents

func (es *EventStore) CountStoredEvents() int

func (*EventStore) Store

func (es *EventStore) Store(event *si.EventRecord)

type EventStream added in v1.5.0

type EventStream struct {
	Events <-chan *si.EventRecord
}

EventStream is the handle type returned to a client that wants to capture the stream of events.

type EventStreamData added in v1.5.0

type EventStreamData struct {
	Name      string
	CreatedAt time.Time
}

EventStreamData contains data about an event stream.

type EventStreaming added in v1.5.0

type EventStreaming struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

EventStreaming implements the event streaming logic. New events are immediately forwarded to all active consumers.

func NewEventStreaming added in v1.5.0

func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming

NewEventStreaming creates a new event streaming infrastructure.

func (*EventStreaming) Close added in v1.5.0

func (e *EventStreaming) Close()

Close stops event streaming completely.

func (*EventStreaming) CreateEventStream added in v1.5.0

func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream

CreateEventStream sets up event streaming for a consumer. The returned EventStream object contains a channel that can be used for reading.

When a consumer is finished, it must call RemoveEventStream to free up resources.

Consumers have an arbitrary name for logging purposes. The "count" parameter defines the maximum number of historical events sent from the ring buffer. "0" is a valid value and means no past events.
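
The effect of "count" on what a consumer reads can be sketched with a stand-alone model. The `stream` type and the `newStream` helper are hypothetical; the sketch assumes that the newest "count" historical events are delivered first, in order, followed by live events, and it uses closing the channel as a stand-in for the cleanup done by RemoveEventStream.

```go
package main

import "fmt"

// stream mirrors the shape of EventStream: a receive-only channel for the consumer.
type stream struct {
	Events <-chan string
}

// newStream is a hypothetical helper. It preloads at most count of the newest
// history entries and returns the consumer handle plus the send side.
func newStream(history []string, count uint64, liveCap int) (*stream, chan<- string) {
	start := 0
	if count < uint64(len(history)) {
		start = len(history) - int(count)
	}
	past := history[start:]
	ch := make(chan string, len(past)+liveCap)
	for _, e := range past {
		ch <- e
	}
	return &stream{Events: ch}, ch
}

func main() {
	history := []string{"e1", "e2", "e3"}
	s, send := newStream(history, 2, 4)

	send <- "e4" // a new event arrives after the stream was created
	close(send)  // stands in for the cleanup performed on removal

	for e := range s.Events {
		fmt.Println(e) // prints e2, e3, e4 on separate lines
	}
}
```

With a count of zero the model preloads nothing, so the consumer only sees events published after the stream was created.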

func (*EventStreaming) GetEventStreams added in v1.5.0

func (e *EventStreaming) GetEventStreams() []EventStreamData

GetEventStreams returns the current active event streams.

func (*EventStreaming) PublishEvent added in v1.5.0

func (e *EventStreaming) PublishEvent(event *si.EventRecord)

PublishEvent publishes an event to all event stream consumers.

The streaming logic uses bridging to ensure proper ordering of existing and new events. Events are sent to the "local" channel, from where they are forwarded to the "consumer" channel.

If "local" is full, it means that the consumer side has not processed the events at an appropriate pace. Such a consumer is removed and the related channels are closed.
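
The slow-consumer rule can be sketched as a non-blocking send. This is a simplified model, not the package's implementation: it uses a single buffered channel per consumer instead of the "local" and "consumer" pair, and the `hub` and `event` types are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for *si.EventRecord.
type event struct{ ID uint64 }

// hub fans events out to consumers, each with its own buffered channel.
type hub struct {
	mu        sync.Mutex
	bufSize   int
	consumers map[chan event]string // channel -> consumer name (names may repeat)
}

func newHub(bufSize int) *hub {
	return &hub{bufSize: bufSize, consumers: make(map[chan event]string)}
}

func (h *hub) subscribe(name string) <-chan event {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan event, h.bufSize)
	h.consumers[ch] = name
	return ch
}

// publish sends e to every consumer without blocking. A consumer whose buffer
// is full is removed and its channel is closed. The removed names are returned.
func (h *hub) publish(e event) []string {
	h.mu.Lock()
	defer h.mu.Unlock()
	var removed []string
	for ch, name := range h.consumers {
		select {
		case ch <- e:
		default:
			close(ch)
			delete(h.consumers, ch)
			removed = append(removed, name)
		}
	}
	return removed
}

func main() {
	h := newHub(1)
	fast := h.subscribe("fast")
	slow := h.subscribe("slow")

	h.publish(event{ID: 1})
	<-fast // the fast consumer drains its buffer; the slow one does not

	removed := h.publish(event{ID: 2})
	fmt.Println("removed:", removed) // removed: [slow]

	_, open := <-slow // event 1 is still buffered
	_, open = <-slow  // nothing is left and the channel is closed
	fmt.Println("slow still open:", open) // slow still open: false
}
```

Dropping the slow consumer keeps the publisher from blocking, at the cost of the consumer having to reconnect if it wants to keep receiving events.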

func (*EventStreaming) RemoveEventStream added in v1.5.0

func (e *EventStreaming) RemoveEventStream(consumer *EventStream)

RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.

type EventSystem added in v1.4.0

type EventSystem interface {
	// AddEvent adds an event record to the event system for processing:
	// 1. It is added to a slice from where it is periodically read by the shim publisher.
	// 2. It is added to an internal ring buffer so that clients can retrieve the event history.
	// 3. Streaming clients are updated.
	AddEvent(event *si.EventRecord)

	// StartService starts the event system.
	// This method does not block. Events are processed on a separate goroutine.
	StartService()

	// Stop stops the event system.
	Stop()

	// IsEventTrackingEnabled reports whether history tracking is currently enabled.
	IsEventTrackingEnabled() bool

	// GetEventsFromID retrieves "count" number of elements from the history buffer from "id". Every
	// event has a unique ID inside the ring buffer.
	// If "id" is not in the buffer, then no record is returned, but the currently available range
	// [low..high] is set.
	GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

	// CreateEventStream creates an event stream (channel) for a consumer.
	// The "name" argument is an arbitrary string for a consumer, which is used for logging. It does not need to be unique.
	// The "count" argument defines how many historical elements should be returned on the stream. Zero is a valid value for "count".
	// The returned type contains a read-only channel which is updated as soon as there is a new event record.
	// It is also used as a handle to stop the streaming.
	// Consumers must read the channel and process the event objects as soon as they can to avoid
	// events piling up inside the channel buffers.
	CreateEventStream(name string, count uint64) *EventStream

	// RemoveStream stops streaming for a given consumer.
	// Consumers that no longer wish to be updated (e.g., a remote client
	// disconnected) *must* call this method to gracefully stop the streaming.
	RemoveStream(*EventStream)

	// GetEventStreams returns the current active event streams.
	GetEventStreams() []EventStreamData
}
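
The GetEventsFromID contract described in the interface comments can be sketched with a small ring buffer model. The `history` type below is hypothetical and stores strings instead of event records. It assumes IDs are assigned sequentially from zero and that the [low..high] range is returned in both the hit and the miss case.

```go
package main

import "fmt"

// history is an illustrative ring buffer, not the package's internal type.
type history struct {
	capacity uint64
	next     uint64 // ID that the next added event will receive
	slots    []string
}

func newHistory(capacity uint64) *history {
	return &history{capacity: capacity, slots: make([]string, capacity)}
}

// add stores msg, overwriting the oldest entry once the buffer is full.
func (h *history) add(msg string) uint64 {
	id := h.next
	h.slots[id%h.capacity] = msg
	h.next++
	return id
}

// getFromID returns up to count entries starting at id, plus the lowest and
// highest IDs currently held. If id is outside that range, no entry is returned.
// An empty buffer yields (nil, 0, 0).
func (h *history) getFromID(id, count uint64) ([]string, uint64, uint64) {
	if h.next == 0 {
		return nil, 0, 0
	}
	high := h.next - 1
	var low uint64
	if h.next > h.capacity {
		low = h.next - h.capacity
	}
	if id < low || id > high {
		return nil, low, high
	}
	end := high + 1 // exclusive upper bound, clamped to what is available
	if count < end-id {
		end = id + count
	}
	out := make([]string, 0, end-id)
	for i := id; i < end; i++ {
		out = append(out, h.slots[i%h.capacity])
	}
	return out, low, high
}

func main() {
	h := newHistory(3)
	for _, m := range []string{"e0", "e1", "e2", "e3", "e4"} {
		h.add(m)
	}
	fmt.Println(h.getFromID(0, 2))  // [] 2 4 (ID 0 was overwritten)
	fmt.Println(h.getFromID(3, 10)) // [e3 e4] 2 4
}
```

A client that receives no records can use the returned range to retry from the lowest ID still available.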

func GetEventSystem added in v1.4.0

func GetEventSystem() EventSystem

GetEventSystem returns the event system instance. Initialization happens during the first call.
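
Initialization on first call is commonly written with sync.Once. The sketch below shows that general pattern only; whether the package uses exactly this mechanism is an assumption, and the `system` type, `getSystem` function, and `initCalls` counter are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// system is a hypothetical stand-in for the event system instance.
type system struct{ id int }

var (
	once      sync.Once
	instance  *system
	initCalls int // counts how often initialization actually ran
)

// getSystem returns the shared instance and creates it on the first call only.
func getSystem() *system {
	once.Do(func() {
		initCalls++
		instance = &system{id: initCalls}
	})
	return instance
}

func main() {
	a, b := getSystem(), getSystem()
	fmt.Println(a == b, initCalls) // true 1
}
```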

type EventSystemImpl added in v1.4.0

type EventSystemImpl struct {
	Store *EventStore // storing eventChannel

	sync.RWMutex
	// contains filtered or unexported fields
}

EventSystemImpl is the main implementation of the event system, which is used for history tracking.

func (*EventSystemImpl) AddEvent added in v1.4.0

func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)

AddEvent adds an event record to the event system. See the interface for details.

func (*EventSystemImpl) CloseAllStreams added in v1.5.0

func (ec *EventSystemImpl) CloseAllStreams()

CloseAllStreams is visible for testing only.

func (*EventSystemImpl) CreateEventStream added in v1.5.0

func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream

CreateEventStream creates an event stream. See the interface for details.

func (*EventSystemImpl) GetEventStreams added in v1.5.0

func (ec *EventSystemImpl) GetEventStreams() []EventStreamData

GetEventStreams returns the current active event streams.

func (*EventSystemImpl) GetEventsFromID added in v1.4.0

func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

GetEventsFromID retrieves historical elements. See the interface for details.

func (*EventSystemImpl) GetRequestCapacity added in v1.4.0

func (ec *EventSystemImpl) GetRequestCapacity() int

GetRequestCapacity returns the capacity of the intermediate storage used by the shim publisher.

func (*EventSystemImpl) GetRingBufferCapacity added in v1.4.0

func (ec *EventSystemImpl) GetRingBufferCapacity() uint64

GetRingBufferCapacity returns the capacity of the buffer which stores historical elements.

func (*EventSystemImpl) IsEventTrackingEnabled added in v1.4.0

func (ec *EventSystemImpl) IsEventTrackingEnabled() bool

IsEventTrackingEnabled reports whether history tracking is currently enabled.

func (*EventSystemImpl) RemoveStream added in v1.5.0

func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)

RemoveStream gracefully terminates event streaming for a consumer. See the interface for details.

func (*EventSystemImpl) Restart added in v1.4.0

func (ec *EventSystemImpl) Restart()

Restart restarts the event system. It is used during config updates.

func (*EventSystemImpl) StartService added in v1.4.0

func (ec *EventSystemImpl) StartService()

StartService starts the event processing in the background. See the interface for details.

func (*EventSystemImpl) StartServiceWithPublisher added in v1.4.0

func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)

StartServiceWithPublisher starts the event processing background routines. Only exported for testing.

func (*EventSystemImpl) Stop added in v1.4.0

func (ec *EventSystemImpl) Stop()

Stop stops the event system.
