Documentation ¶
Index ¶
- Constants
- Variables
- type AbstractTrigger
- func (at *AbstractTrigger) AllocateWorkerAndSubmitEvent(event nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (response interface{}, submitError error, processError error)
- func (at *AbstractTrigger) AllocateWorkerAndSubmitEvents(events []nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (responses []interface{}, submitError error, processErrors []error)
- func (at *AbstractTrigger) GetClass() string
- func (at *AbstractTrigger) GetFunctionName() string
- func (at *AbstractTrigger) GetID() string
- func (at *AbstractTrigger) GetKind() string
- func (at *AbstractTrigger) GetName() string
- func (at *AbstractTrigger) GetNamespace() string
- func (at *AbstractTrigger) GetProjectName() string
- func (at *AbstractTrigger) GetStatistics() *Statistics
- func (at *AbstractTrigger) GetWorkers() []*worker.Worker
- func (at *AbstractTrigger) HandleSubmitPanic(workerInstance *worker.Worker, submitError *error)
- func (at *AbstractTrigger) Initialize() error
- func (at *AbstractTrigger) Restart() error
- func (at *AbstractTrigger) SignalWorkersToContinue() error
- func (at *AbstractTrigger) SignalWorkersToDrain() error
- func (at *AbstractTrigger) SignalWorkersToTerminate() error
- func (at *AbstractTrigger) SubmitEventToWorker(functionLogger logger.Logger, workerInstance *worker.Worker, ...) (response interface{}, processError error)
- func (at *AbstractTrigger) SubscribeToControlMessageKind(kind controlcommunication.ControlMessageKind, ...) error
- func (at *AbstractTrigger) TimeoutWorker(worker *worker.Worker) error
- func (at *AbstractTrigger) UnsubscribeFromControlMessageKind(kind controlcommunication.ControlMessageKind, ...) error
- func (at *AbstractTrigger) UpdateStatistics(success bool)
- type AnnotationConfigField
- type Configuration
- func (c *Configuration) ParseDurationOrDefault(durationConfigField *DurationConfigField) error
- func (c *Configuration) PopulateConfigurationFromAnnotations(annotationConfigFields []AnnotationConfigField) error
- func (c *Configuration) PopulateExplicitAckMode(explicitAckModeValue string, ...) error
- func (c *Configuration) ResolveWorkerAllocationMode(modeFromAttributes, modeFromAnnotation partitionworker.AllocationMode) partitionworker.AllocationMode
- type Creator
- type DurationConfigField
- type Factory
- type Registry
- type Secret
- type Statistics
- type Trigger
Constants ¶
const (
	NumWorkersLimit                              = 100000
	DefaultWorkerAvailabilityTimeoutMilliseconds = 10000 // 10 seconds
)
Variables ¶
var RegistrySingleton = Registry{ Registry: *registry.NewRegistry("trigger"), }
RegistrySingleton is a trigger global singleton
Functions ¶
This section is empty.
Types ¶
type AbstractTrigger ¶
type AbstractTrigger struct {
	Trigger Trigger

	// accessed atomically, keep as first field for alignment
	Statistics Statistics

	ID              string
	Logger          logger.Logger
	WorkerAllocator worker.Allocator
	Class           string
	Kind            string
	Name            string
	Namespace       string
	FunctionName    string
	ProjectName     string
	// contains filtered or unexported fields
}
AbstractTrigger implements common trigger operations
func NewAbstractTrigger ¶
func (*AbstractTrigger) AllocateWorkerAndSubmitEvent ¶
func (at *AbstractTrigger) AllocateWorkerAndSubmitEvent(event nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (response interface{}, submitError error, processError error)
AllocateWorkerAndSubmitEvent submits event to allocated worker
func (*AbstractTrigger) AllocateWorkerAndSubmitEvents ¶
func (at *AbstractTrigger) AllocateWorkerAndSubmitEvents(events []nuclio.Event, functionLogger logger.Logger, timeout time.Duration) (responses []interface{}, submitError error, processErrors []error)
AllocateWorkerAndSubmitEvents submits multiple events to an allocated worker
func (*AbstractTrigger) GetClass ¶
func (at *AbstractTrigger) GetClass() string
GetClass returns the class
func (*AbstractTrigger) GetFunctionName ¶
func (at *AbstractTrigger) GetFunctionName() string
GetFunctionName returns function name
func (*AbstractTrigger) GetID ¶
func (at *AbstractTrigger) GetID() string
GetID returns user given ID for this trigger
func (*AbstractTrigger) GetKind ¶
func (at *AbstractTrigger) GetKind() string
GetKind returns the kind
func (*AbstractTrigger) GetName ¶
func (at *AbstractTrigger) GetName() string
GetName returns the name
func (*AbstractTrigger) GetNamespace ¶
func (at *AbstractTrigger) GetNamespace() string
GetNamespace returns namespace of function
func (*AbstractTrigger) GetProjectName ¶
func (at *AbstractTrigger) GetProjectName() string
GetProjectName returns project name
func (*AbstractTrigger) GetStatistics ¶
func (at *AbstractTrigger) GetStatistics() *Statistics
GetStatistics returns trigger statistics
func (*AbstractTrigger) GetWorkers ¶
func (at *AbstractTrigger) GetWorkers() []*worker.Worker
GetWorkers returns the list of workers
func (*AbstractTrigger) HandleSubmitPanic ¶
func (at *AbstractTrigger) HandleSubmitPanic(workerInstance *worker.Worker, submitError *error)
HandleSubmitPanic handles a panic when submitting to worker
func (*AbstractTrigger) Initialize ¶
func (at *AbstractTrigger) Initialize() error
Initialize performs post creation initializations
func (*AbstractTrigger) Restart ¶
func (at *AbstractTrigger) Restart() error
Restart signals the processor to start the trigger restart procedure
func (*AbstractTrigger) SignalWorkersToContinue ¶
func (at *AbstractTrigger) SignalWorkersToContinue() error
SignalWorkersToContinue sends a signal to all workers, telling them to continue event processing
func (*AbstractTrigger) SignalWorkersToDrain ¶
func (at *AbstractTrigger) SignalWorkersToDrain() error
SignalWorkersToDrain sends a signal to all workers, telling them to drop or ack events that are currently being processed
func (*AbstractTrigger) SignalWorkersToTerminate ¶
func (at *AbstractTrigger) SignalWorkersToTerminate() error
func (*AbstractTrigger) SubmitEventToWorker ¶
func (at *AbstractTrigger) SubmitEventToWorker(functionLogger logger.Logger, workerInstance *worker.Worker, event nuclio.Event) (response interface{}, processError error)
SubmitEventToWorker submits events to worker and returns response
func (*AbstractTrigger) SubscribeToControlMessageKind ¶
func (at *AbstractTrigger) SubscribeToControlMessageKind(kind controlcommunication.ControlMessageKind, controlMessageChan chan *controlcommunication.ControlMessage) error
SubscribeToControlMessageKind subscribes all workers to control message kind
func (*AbstractTrigger) TimeoutWorker ¶
func (at *AbstractTrigger) TimeoutWorker(worker *worker.Worker) error
TimeoutWorker times out a worker
func (*AbstractTrigger) UnsubscribeFromControlMessageKind ¶
func (at *AbstractTrigger) UnsubscribeFromControlMessageKind(kind controlcommunication.ControlMessageKind, controlMessageChan chan *controlcommunication.ControlMessage) error
UnsubscribeFromControlMessageKind unsubscribes all workers from control message kind
func (*AbstractTrigger) UpdateStatistics ¶
func (at *AbstractTrigger) UpdateStatistics(success bool)
UpdateStatistics updates the trigger statistics
type AnnotationConfigField ¶
type Configuration ¶
type Configuration struct {
	*functionconfig.Trigger

	// the runtime configuration, for reference
	RuntimeConfiguration *runtime.Configuration

	// a unique trigger ID
	ID string
}
func NewConfiguration ¶
func NewConfiguration(id string, triggerConfiguration *functionconfig.Trigger, runtimeConfiguration *runtime.Configuration) (*Configuration, error)
func (*Configuration) ParseDurationOrDefault ¶
func (c *Configuration) ParseDurationOrDefault(durationConfigField *DurationConfigField) error
ParseDurationOrDefault parses a duration string into a time.Duration field. If empty, sets the field to the default.
func (*Configuration) PopulateConfigurationFromAnnotations ¶
func (c *Configuration) PopulateConfigurationFromAnnotations(annotationConfigFields []AnnotationConfigField) error
PopulateConfigurationFromAnnotations allows setting configuration via annotations, for experimental settings
func (*Configuration) PopulateExplicitAckMode ¶
func (c *Configuration) PopulateExplicitAckMode(explicitAckModeValue string, triggerConfigurationExplicitAckMode functionconfig.ExplicitAckMode) error
func (*Configuration) ResolveWorkerAllocationMode ¶
func (c *Configuration) ResolveWorkerAllocationMode(modeFromAttributes, modeFromAnnotation partitionworker.AllocationMode) partitionworker.AllocationMode
type Creator ¶
type Creator interface {
	// Create creates a trigger instance
	Create(logger.Logger, string, *functionconfig.Trigger, *runtime.Configuration, *worker.AllocatorSyncMap, chan Trigger) (Trigger, error)
}
Creator creates a trigger instance
type DurationConfigField ¶
type Registry ¶
func (*Registry) NewTrigger ¶
func (r *Registry) NewTrigger(logger logger.Logger, kind string, name string, triggerConfiguration *functionconfig.Trigger, runtimeConfiguration *runtime.Configuration, namedWorkerAllocators *worker.AllocatorSyncMap, restartTriggerChan chan Trigger) (Trigger, error)
type Statistics ¶
type Statistics struct {
	EventsHandledSuccessTotal uint64
	EventsHandledFailureTotal uint64
	WorkerAllocatorStatistics worker.AllocatorStatistics
}
func (*Statistics) DiffFrom ¶
func (s *Statistics) DiffFrom(prev *Statistics) Statistics
type Trigger ¶
type Trigger interface {
	// Initialize performs post creation initializations
	Initialize() error

	// Start creating events from a given checkpoint (nil - no checkpoint)
	Start(checkpoint functionconfig.Checkpoint) error

	// Stop creating events. returns the current checkpoint
	Stop(force bool) (functionconfig.Checkpoint, error)

	// GetID returns the user given ID for this trigger
	GetID() string

	// GetClass returns the class of source (sync, async, etc)
	GetClass() string

	// GetKind returns the specific kind of source (http, rabbit mq, etc)
	GetKind() string

	// GetName returns the trigger name
	GetName() string

	// GetConfig returns trigger configuration
	GetConfig() map[string]interface{}

	// GetStatistics returns the trigger statistics
	GetStatistics() *Statistics

	// GetWorkers gets direct access to workers for things like housekeeping / management
	// TODO: locks and such when relevant
	GetWorkers() []*worker.Worker

	// GetNamespace returns namespace
	GetNamespace() string

	// GetFunctionName returns function name
	GetFunctionName() string

	// GetProjectName returns project name
	GetProjectName() string

	// TimeoutWorker times out a worker
	TimeoutWorker(worker *worker.Worker) error

	// SignalWorkersToDrain drains all workers
	SignalWorkersToDrain() error

	// SignalWorkersToContinue signal all workers to continue processing
	SignalWorkersToContinue() error

	// SignalWorkersToTerminate signal to all workers that the processor is about to stop working
	SignalWorkersToTerminate() error
}
Trigger is common trigger interface