trigger

package
v0.0.0-...-c154a1c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 16, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	NumWorkersLimit                              = 100000
	DefaultWorkerAvailabilityTimeoutMilliseconds = 10000 // 10 seconds
)

Variables

View Source
var RegistrySingleton = Registry{
	Registry: *registry.NewRegistry("trigger"),
}

RegistrySingleton is a trigger global singleton

Functions

This section is empty.

Types

type AbstractTrigger

type AbstractTrigger struct {
	Trigger Trigger

	// accessed atomically, keep as first field for alignment
	Statistics Statistics

	ID              string
	Logger          logger.Logger
	WorkerAllocator worker.Allocator
	Class           string
	Kind            string
	Name            string
	Namespace       string
	FunctionName    string
	ProjectName     string
	// contains filtered or unexported fields
}

AbstractTrigger implements common trigger operations

func NewAbstractTrigger

func NewAbstractTrigger(logger logger.Logger,
	allocator worker.Allocator,
	configuration *Configuration,
	class string,
	kind string,
	name string,
	restartTriggerChan chan Trigger) (AbstractTrigger, error)

func (*AbstractTrigger) AllocateWorkerAndSubmitEvent

func (at *AbstractTrigger) AllocateWorkerAndSubmitEvent(event nuclio.Event,
	functionLogger logger.Logger,
	timeout time.Duration) (response interface{}, submitError error, processError error)

AllocateWorkerAndSubmitEvent submits event to allocated worker

func (*AbstractTrigger) AllocateWorkerAndSubmitEvents

func (at *AbstractTrigger) AllocateWorkerAndSubmitEvents(events []nuclio.Event,
	functionLogger logger.Logger,
	timeout time.Duration) (responses []interface{}, submitError error, processErrors []error)

AllocateWorkerAndSubmitEvents submits multiple events to an allocated worker

func (*AbstractTrigger) GetClass

func (at *AbstractTrigger) GetClass() string

GetClass returns the class

func (*AbstractTrigger) GetFunctionName

func (at *AbstractTrigger) GetFunctionName() string

GetFunctionName returns function name

func (*AbstractTrigger) GetID

func (at *AbstractTrigger) GetID() string

GetID returns user given ID for this trigger

func (*AbstractTrigger) GetKind

func (at *AbstractTrigger) GetKind() string

GetKind return the kind

func (*AbstractTrigger) GetName

func (at *AbstractTrigger) GetName() string

GetName returns the name

func (*AbstractTrigger) GetNamespace

func (at *AbstractTrigger) GetNamespace() string

GetNamespace returns namespace of function

func (*AbstractTrigger) GetProjectName

func (at *AbstractTrigger) GetProjectName() string

GetProjectName returns project name

func (*AbstractTrigger) GetStatistics

func (at *AbstractTrigger) GetStatistics() *Statistics

GetStatistics returns trigger statistics

func (*AbstractTrigger) GetWorkers

func (at *AbstractTrigger) GetWorkers() []*worker.Worker

GetWorkers returns the list of workers

func (*AbstractTrigger) HandleSubmitPanic

func (at *AbstractTrigger) HandleSubmitPanic(workerInstance *worker.Worker,
	submitError *error)

HandleSubmitPanic handles a panic when submitting to worker

func (*AbstractTrigger) Initialize

func (at *AbstractTrigger) Initialize() error

Initialize performs post creation initializations

func (*AbstractTrigger) Restart

func (at *AbstractTrigger) Restart() error

Restart signals the processor to start the trigger restart procedure

func (*AbstractTrigger) SignalWorkersToContinue

func (at *AbstractTrigger) SignalWorkersToContinue() error

SignalWorkersToContinue sends a signal to all workers, telling them to continue event processing

func (*AbstractTrigger) SignalWorkersToDrain

func (at *AbstractTrigger) SignalWorkersToDrain() error

SignalWorkersToDrain sends a signal to all workers, telling them to drop or ack events that are currently being processed

func (*AbstractTrigger) SignalWorkersToTerminate

func (at *AbstractTrigger) SignalWorkersToTerminate() error

func (*AbstractTrigger) SubmitEventToWorker

func (at *AbstractTrigger) SubmitEventToWorker(functionLogger logger.Logger,
	workerInstance *worker.Worker,
	event nuclio.Event) (response interface{}, processError error)

SubmitEventToWorker submits events to worker and returns response

func (*AbstractTrigger) SubscribeToControlMessageKind

func (at *AbstractTrigger) SubscribeToControlMessageKind(kind controlcommunication.ControlMessageKind,
	controlMessageChan chan *controlcommunication.ControlMessage) error

SubscribeToControlMessageKind subscribes all workers to control message kind

func (*AbstractTrigger) TimeoutWorker

func (at *AbstractTrigger) TimeoutWorker(worker *worker.Worker) error

TimeoutWorker times out a worker

func (*AbstractTrigger) UnsubscribeFromControlMessageKind

func (at *AbstractTrigger) UnsubscribeFromControlMessageKind(kind controlcommunication.ControlMessageKind,
	controlMessageChan chan *controlcommunication.ControlMessage) error

UnsubscribeFromControlMessageKind unsubscribes all workers from control message kind

func (*AbstractTrigger) UpdateStatistics

func (at *AbstractTrigger) UpdateStatistics(success bool)

UpdateStatistics updates the trigger statistics

type AnnotationConfigField

type AnnotationConfigField struct {
	Key             string
	ValueString     *string
	ValueListString []string
	ValueInt        *int
	ValueUInt64     *uint64
	ValueBool       *bool
}

type Configuration

type Configuration struct {
	*functionconfig.Trigger

	// the runtime configuration, for reference
	RuntimeConfiguration *runtime.Configuration

	// a unique trigger ID
	ID string
}

func NewConfiguration

func NewConfiguration(id string,
	triggerConfiguration *functionconfig.Trigger,
	runtimeConfiguration *runtime.Configuration) (*Configuration, error)

func (*Configuration) ParseDurationOrDefault

func (c *Configuration) ParseDurationOrDefault(durationConfigField *DurationConfigField) error

ParseDurationOrDefault parses a duration string into a time.duration field. if empty, sets the field to the default

func (*Configuration) PopulateConfigurationFromAnnotations

func (c *Configuration) PopulateConfigurationFromAnnotations(annotationConfigFields []AnnotationConfigField) error

PopulateConfigurationFromAnnotations allows setting configuration via annotations, for experimental settings

func (*Configuration) PopulateExplicitAckMode

func (c *Configuration) PopulateExplicitAckMode(explicitAckModeValue string,
	triggerConfigurationExplicitAckMode functionconfig.ExplicitAckMode) error

func (*Configuration) ResolveWorkerAllocationMode

func (c *Configuration) ResolveWorkerAllocationMode(modeFromAttributes, modeFromAnnotation partitionworker.AllocationMode) partitionworker.AllocationMode

type Creator

type Creator interface {

	// Create creates a trigger instance
	Create(logger.Logger, string, *functionconfig.Trigger, *runtime.Configuration, *worker.AllocatorSyncMap, chan Trigger) (Trigger, error)
}

Creator creates a trigger instance

type DurationConfigField

type DurationConfigField struct {
	Name    string
	Value   string
	Field   *time.Duration
	Default time.Duration
}

type Factory

type Factory struct{}

func (*Factory) GetWorkerAllocator

func (f *Factory) GetWorkerAllocator(workerAllocatorName string,
	namedWorkerAllocators *worker.AllocatorSyncMap,
	workerAllocatorCreator func() (worker.Allocator, error)) (worker.Allocator, error)

type Registry

type Registry struct {
	registry.Registry
}

func (*Registry) NewTrigger

func (r *Registry) NewTrigger(logger logger.Logger,
	kind string,
	name string,
	triggerConfiguration *functionconfig.Trigger,
	runtimeConfiguration *runtime.Configuration,
	namedWorkerAllocators *worker.AllocatorSyncMap,
	restartTriggerChan chan Trigger) (Trigger, error)

type Secret

type Secret struct {
	Contents string
}

type Statistics

type Statistics struct {
	EventsHandledSuccessTotal uint64
	EventsHandledFailureTotal uint64
	WorkerAllocatorStatistics worker.AllocatorStatistics
}

func (*Statistics) DiffFrom

func (s *Statistics) DiffFrom(prev *Statistics) Statistics

type Trigger

type Trigger interface {

	// Initialize performs post creation initializations
	Initialize() error

	// Start creating events from a given checkpoint (nil - no checkpoint)
	Start(checkpoint functionconfig.Checkpoint) error

	// Stop creating events. returns the current checkpoint
	Stop(force bool) (functionconfig.Checkpoint, error)

	// GetID returns the user given ID for this trigger
	GetID() string

	// GetClass returns the class of source (sync, async, etc)
	GetClass() string

	// GetKind returns the specific kind of source (http, rabbit mq, etc)
	GetKind() string

	// GetName returns the trigger name
	GetName() string

	// GetConfig returns trigger configuration
	GetConfig() map[string]interface{}

	// GetStatistics returns the trigger statistics
	GetStatistics() *Statistics

	// GetWorkers gets direct access to workers for things like housekeeping / management
	// TODO: locks and such when relevant
	GetWorkers() []*worker.Worker

	// GetNamespace returns namespace
	GetNamespace() string

	// GetFunctionName returns function name
	GetFunctionName() string

	// GetProjectName returns project name
	GetProjectName() string

	// TimeoutWorker times out a worker
	TimeoutWorker(worker *worker.Worker) error

	// SignalWorkersToDrain drains all workers
	SignalWorkersToDrain() error

	// SignalWorkersToContinue signal all workers to continue processing
	SignalWorkersToContinue() error

	// SignalWorkersToTerminate signal to all workers that the processor is about to stop working
	SignalWorkersToTerminate() error
}

Trigger is common trigger interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL