rules

package
v1.6.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 22 Imported by: 1

Documentation

Overview

Package rules is a rules engine for etcd. Specified functions are triggered whenever a specified rule is satisfied. Rules can contain gin-style attribute specifiers so that classes of keys are matched as opposed to having to specify each node key separately.

Index

Constants

View Source
const (
	// WebhookURLEnv is the environment variable used to specify a callback
	// webhook that will get called every time a callback has finished executing.
	WebhookURLEnv = "RULES_ENGINE_CALLBACK_WEBHOOK"
)

Variables

This section is empty.

Functions

func FormatRuleString

func FormatRuleString(in string) string

FormatRuleString creates an indented, more readable version of a rule string

func FormatWithAttributes

func FormatWithAttributes(pattern string, m Attributes) string

FormatWithAttributes applied the specified attributes to the provided path.

func RuleSatisfied

func RuleSatisfied(rule DynamicRule, triggerKey string, triggerValue *string, kvs map[string]string) (bool, error)

RuleSatisfied returns true if the rule was satisfied and false if it was not. An error is returned if the trigger key did not contain the required path variables to evaluate the rule.

func SetMethod

func SetMethod(ctx context.Context, method string) context.Context

SetMethod sets the method in the context of which an etcd call is being made, allowing metrics to differentiate between different types of calls to etcd.

Types

type AdvancedMetricsCollector added in v1.2.0

type AdvancedMetricsCollector interface {
	MetricsCollector
	ObserveWatchEvents(prefix string, events, totalBytes int)
}

AdvancedMetricsCollector used for collecting metrics additional metrics beyond those required by the base MetricsCollector, implement this interface using your metrics collector of choice (ie Prometheus) Deprecated: instead make use of the WrapWatcher to inject metric collection on watch events

type Attributes

type Attributes interface {
	GetAttribute(string) *string
	Format(string) string
}

Attributes provide access to the key/value pairs associated with dynamic keys. For instance, a dynamic key "/static/:dynamic" that is matched against "/static/value1" would contain an yield an attribute with the key "dynamic" and the value "value1".

func NewAttributes

func NewAttributes(values map[string]string) Attributes

NewAttributes provides a map-based Attributes instance, for instance for testing callbacks.

type BaseEngine

type BaseEngine interface {
	Run()
	Stop()
	IsStopped() bool

	// Shutdown gracefully stops the rules engine and waits for termination to
	// complete. If the provided context expires before the shutdown is complete,
	// then the context's error is returned.
	Shutdown(ctx context.Context) error
}

BaseEngine provides common method for etcd v2 and v3 rules engine instances.

type ContextProvider

type ContextProvider func() (context.Context, context.CancelFunc)

ContextProvider is used to specify a custom provider of a context for a given rule.

type DynamicRule

type DynamicRule interface {
	Expand(map[string][]string) ([]DynamicRule, bool)
	// contains filtered or unexported methods
}

DynamicRule defines rules that have dynamic key paths so that classes of keys can be referenced in rules.

func NewAndRule

func NewAndRule(rules ...DynamicRule) DynamicRule

NewAndRule allows two or more dynamic rules to be combined into a single rule such that every nested rule must be satisfied in order for the overall rule to be satisfied.

func NewCompareLiteralRule added in v1.5.4

func NewCompareLiteralRule(pattern string, comparator func(*string) bool, renderTemplate string) (DynamicRule, error)

NewCompareLiteralRule creates a rule that allows arbitrary comparisons to be performed against values in etcd. When comparator returns true for a given string pointer value, the rule is satisfied. DO NOT retrieve values from etcd in the function body, since that will bypass the caching functionality and put excess load on etcd. The string template value is used to render the output of the String() method, with a single string placeholder that is the etcd key or key pattern. An example: %s = "value" This can help with debugging rules.

func NewEqualsLiteralRule

func NewEqualsLiteralRule(pattern string, value *string) (DynamicRule, error)

NewEqualsLiteralRule creates a rule that compares the provided string value with the value of a node whose key matches the provided key pattern. A nil value indicates that there is no node with the given key.

func NewEqualsRule

func NewEqualsRule(pattern []string) (DynamicRule, error)

NewEqualsRule enables the comparison of two or more node values with the specified key patterns.

func NewNotRule

func NewNotRule(nestedRule DynamicRule) DynamicRule

NewNotRule allows a rule to be negated such that if the nested rule's key matches but the rule is otherwise not satisfied, the not rule is satisfied. This is to enable capabilities such as checking whether a given key is set, i.e. its value is not nil.

func NewOrRule

func NewOrRule(rules ...DynamicRule) DynamicRule

NewOrRule allows two or more dynamic rules to be combined into a single rule such that at least one nested rule must be satisfied in order for the overall rule to be satisfied.

type EngineOption

type EngineOption interface {
	// contains filtered or unexported methods
}

EngineOption instances control the overall behavior of an Engine instance. Behavior for individual rules can be controlled via RuleOption instances.

func EngineConcurrency

func EngineConcurrency(workers int) EngineOption

EngineConcurrency controls the number of concurrent workers processing rule tasks.

func EngineContextProvider

func EngineContextProvider(cp ContextProvider) EngineOption

EngineContextProvider sets a custom provider for generating context instances for use by callbacks.

func EngineCrawlMutex

func EngineCrawlMutex(mutex string, mutexTTL int) EngineOption

EngineCrawlMutex sets an application identifier mutex and a TTL value for the mutex to limit the number of instances of an application performing a crawl at any given time to one. mutexTTL refers to how long the mutex is in effect; if set too short, multiple instances of an application may end up crawling simultaneously. Note that this functionality is only implemented in etcd v3 and that a mutex in etcd v3 is held only while the app instance that created it is still active. This means that setting a high value, such as 3600 seconds, does not expose one to the risk of no crawls occuring for a maximum of one hour if an application instance terminates at the beginning of a crawler run.

func EngineDontShareLockSession added in v1.6.7

func EngineDontShareLockSession() EngineOption

EngineDontShareLockSession forces ETCD to create a new concurrency session for each locking attempt. This can increase the load on ETCD.

func EngineEnhancedRuleFilter

func EngineEnhancedRuleFilter(enhancedRuleFilter bool) EngineOption

EngineEnhancedRuleFilter uses a rule filtering mechanism that more accurately selects rules to be evaluated based on given key/value pair.

func EngineLockAcquisitionTimeout added in v1.1.7

func EngineLockAcquisitionTimeout(lockAcquisitionTimeout int) EngineOption

EngineLockAcquisitionTimeout controls the length of time we wait to acquire a lock.

func EngineLockCoolOff added in v1.5.10

func EngineLockCoolOff(timeout time.Duration) EngineOption

EngineLockCoolOff is an experimental option to preemptively fail locking attempts if an attempt to obtain the same lock was made within the specified duration so that multiple workers reacting to multiple elements of the same rule and attributes do not cause needless locking.

func EngineLockTimeout

func EngineLockTimeout(lockTimeout int) EngineOption

EngineLockTimeout controls the TTL of a lock in seconds.

func EngineMetricsCollector added in v1.1.0

func EngineMetricsCollector(m MetricsCollectorOpt) EngineOption

EngineMetricsCollector sets a custom metrics collector. The MetricsCollector returned by the MetricsCollectorOpt will be upgraded to an AdvancedMetricsCollector is possible.

func EngineRuleWorkBuffer

func EngineRuleWorkBuffer(buffer int) EngineOption

EngineRuleWorkBuffer sets the limit on the number of ruleWork in the channel without a receiving worker.

func EngineSyncDelay

func EngineSyncDelay(delay int) EngineOption

EngineSyncDelay enables the throttling of the crawlers by introducing a delay (in ms) between queries to keep the crawlers from overwhelming etcd.

func EngineSyncInterval

func EngineSyncInterval(interval int) EngineOption

EngineSyncInterval enables the interval between sync or crawler runs to be configured. The interval is in seconds.

func EngineUseSharedLockSession added in v1.5.10

func EngineUseSharedLockSession() EngineOption

EngineUseSharedLockSession is an experimental option to use a single concurrency session for managing locks to reduce the ETCD load by eliminating the need to create new concurrency session for each locking attempt. Deprecated: This option is now used by default.

func EngineUseTryLock added in v1.5.10

func EngineUseTryLock() EngineOption

EngineUseTryLock is an experimental option to fail locking immediately when a lock is already held as opposed to trying to obtain the lock until the timeout expires

func EngineWatchProcessDelay added in v1.5.12

func EngineWatchProcessDelay(base time.Duration, jitterPercent float64) EngineOption

func EngineWatchTimeout

func EngineWatchTimeout(watchTimeout int) EngineOption

EngineWatchTimeout controls the timeout of a watch operation in seconds.

func GetEngineOptions

func GetEngineOptions(options EngineOptions) []EngineOption

GetEngineOptions is used to convert an EngineOptions instance into an array of EngineOption instances which can then be used when initializing an Engine instance

func KeyConstraint

func KeyConstraint(attribute string, prefix string, chars [][]rune) EngineOption

KeyConstraint enables multiple query prefixes to be generated for a specific attribute as a way to limit the scope of a query for a prefix query.

func KeyExpansion

func KeyExpansion(keyExpansion map[string][]string) EngineOption

KeyExpansion enables attributes in rules to be fixed at run time while allowing the rule declarations to continue to use the attribute placeholders. For instance, an application may use a root directory "/:geo" to hold data for a given geography. Passing map[string][]string{"geo":{"na"}} into the KeyExpansion option will cause all rules with the "/:geo/" prefix to be rendered as "/na/..." but all paths rendered with attributes from realized rules will still correctly resolve ":geo" to "na". This allows the placeholder values to be set as application configuration settings while minimizing the scope of the watchers.

func KeyProcessorBuffer added in v1.5.1

func KeyProcessorBuffer(size int) EngineOption

KeyProcessorBuffer controls the number of key processing events can be buffered at one time.

func KeyProcessorConcurrency added in v1.5.1

func KeyProcessorConcurrency(threads int) EngineOption

KeyProcessorConcurrency controls the number of threads processing keys from the watcher and the crawler.

type EngineOptions

type EngineOptions struct {
	Concurrency        *int  `toml:"concurrency"`
	EnhancedRuleFilter *bool `toml:"enhanced_rule_filter"`
}

EngineOptions is used to configure the engine from configuration files

type EtcdMetricsMetadata

type EtcdMetricsMetadata struct {
	Method   string
	Duration time.Duration
	Error    error
}

EtcdMetricsMetadata provides information about calls to etcd

func GetMetricsMetadata

func GetMetricsMetadata(ctx context.Context) *EtcdMetricsMetadata

GetMetricsMetadata gets metadata about an etcd call from the context

type HTTPCallbackHandler added in v1.5.8

type HTTPCallbackHandler struct {
	// contains filtered or unexported fields
}

HTTPCallbackHandler instances can be used to get immediate confirmation that a callback was executed when perfoming integration testing. Not for production use.

func NewHTTPCallbackHander added in v1.5.8

func NewHTTPCallbackHander() HTTPCallbackHandler

func (HTTPCallbackHandler) ClearCallbacks added in v1.5.8

func (htcbh HTTPCallbackHandler) ClearCallbacks()

ClearCallbacks flushes all previous callbacks from the buffered channel.

func (HTTPCallbackHandler) HandleRequest added in v1.5.8

func (htcbh HTTPCallbackHandler) HandleRequest(w http.ResponseWriter, req *http.Request)

func (HTTPCallbackHandler) WaitForCallback added in v1.5.8

func (htcbh HTTPCallbackHandler) WaitForCallback(ctx context.Context, ruleID string, attributes map[string]string) error

WaitForCallback returns a nil error if the callback was executed with the given ruleID and attributes.

type MetricsCollector added in v1.1.0

type MetricsCollector interface {
	IncLockMetric(methodName string, pattern string, lockSucceeded bool)
	IncSatisfiedThenNot(methodName string, pattern string, phaseName string)
	TimesEvaluated(methodName string, ruleID string, count int)
	WorkerQueueWaitTime(methodName string, startTime time.Time)
}

MetricsCollector used for collecting metrics, implement this interface using your metrics collector of choice (ie Prometheus)

type MetricsCollectorOpt added in v1.1.0

type MetricsCollectorOpt func() MetricsCollector

MetricsCollectorOpt ...

type MockMetricsCollector added in v1.1.0

type MockMetricsCollector struct {

	// store what the IncLockMetric function was called with
	IncLockMetricPattern     []string
	IncLockMetricLockSuccess []bool
	IncLockMetricMethod      []string
	// store what the IncSatisfiedThenNot function was called with
	IncSatisfiedThenNotPattern   []string
	IncIncSatisfiedThenNotPhase  []string
	IncIncSatisfiedThenNotMethod []string
	// store what the TimesEvaluated function was called with
	TimesEvaluatedRuleID []string
	TimesEvaluatedCount  []int
	TimesEvaluatedMethod []string
	// store what the WorkerQueueWaitTime was called with
	WorkerQueueWaitTimeTimes  []time.Time
	WorkerQueueWaitTimeMethod []string
	// store what the ObserveWatchEvents was called with
	ObserveWatchEventsPrefixes   []string
	ObserveWatchEventsEvents     []int
	ObserveWatchEventsTotalBytes []int
	// contains filtered or unexported fields
}

MockMetricsCollector a mock metrics collector used in unit tests

func NewMockMetricsCollector added in v1.1.0

func NewMockMetricsCollector() MockMetricsCollector

func (*MockMetricsCollector) IncLockMetric added in v1.1.0

func (m *MockMetricsCollector) IncLockMetric(methodName string, pattern string, lockSucceeded bool)

func (*MockMetricsCollector) IncSatisfiedThenNot added in v1.1.0

func (m *MockMetricsCollector) IncSatisfiedThenNot(methodName string, pattern string, phaseName string)

func (*MockMetricsCollector) ObserveWatchEvents added in v1.2.0

func (m *MockMetricsCollector) ObserveWatchEvents(prefix string, events, totalBytes int)

func (*MockMetricsCollector) SetLogger added in v1.1.0

func (m *MockMetricsCollector) SetLogger(lgr *zap.Logger)

func (*MockMetricsCollector) TimesEvaluated added in v1.1.0

func (m *MockMetricsCollector) TimesEvaluated(methodName string, ruleID string, count int)

func (*MockMetricsCollector) WorkerQueueWaitTime added in v1.1.0

func (m *MockMetricsCollector) WorkerQueueWaitTime(methodName string, startTime time.Time)

type MockWatchWrapper added in v1.3.0

type MockWatchWrapper struct {
	Mww *MockWatcherWrapper
}

func (*MockWatchWrapper) WrapWatcher added in v1.3.0

func (mw *MockWatchWrapper) WrapWatcher(kvw v3.Watcher) v3.Watcher

type MockWatcherWrapper added in v1.3.0

type MockWatcherWrapper struct {
	Logger    *zap.Logger
	Responses []v3.WatchResponse
	KvWatcher v3.Watcher
}

func (*MockWatcherWrapper) Close added in v1.3.0

func (ww *MockWatcherWrapper) Close() error

func (*MockWatcherWrapper) RequestProgress added in v1.3.0

func (ww *MockWatcherWrapper) RequestProgress(ctx context.Context) error

func (*MockWatcherWrapper) Watch added in v1.3.0

func (ww *MockWatcherWrapper) Watch(ctx context.Context, key string, opts ...v3.OpOption) v3.WatchChan

type RuleOption

type RuleOption interface {
	// contains filtered or unexported methods
}

RuleOption instances control the behavior of individual rules.

func RuleContextProvider

func RuleContextProvider(cp ContextProvider) RuleOption

RuleContextProvider sets a custom provider for generating context instances for use by a specific callback.

func RuleID added in v1.1.0

func RuleID(ruleID string) RuleOption

RuleID is the ID associated with the rule

func RuleLockTimeout

func RuleLockTimeout(lockTimeout int) RuleOption

RuleLockTimeout controls the TTL of the locks associated with the rule, in seconds.

type V3Engine

type V3Engine interface {
	BaseEngine
	SetKVWrapper(WrapKV)
	AddRule(rule DynamicRule,
		lockPattern string,
		callback V3RuleTaskCallback,
		options ...RuleOption)
	AddPolling(namespacePattern string,
		preconditions DynamicRule,
		ttl int,
		callback V3RuleTaskCallback) error
	SetWatcherWrapper(WrapWatcher)
}

V3Engine defines the interactions with a rule engine instance communicating with etcd v3.

func NewV3Engine

func NewV3Engine(configV3 v3.Config, logger *zap.Logger, options ...EngineOption) V3Engine

NewV3Engine creates a new V3Engine instance.

func NewV3EngineWithClient

func NewV3EngineWithClient(cl *v3.Client, logger *zap.Logger, options ...EngineOption) V3Engine

NewV3EngineWithClient creates a new V3Engine instance with the provided etcd v3 client instance.

type V3RuleTask

type V3RuleTask struct {
	Attr    Attributes
	Logger  *zap.Logger
	Context context.Context

	Metadata map[string]string
	// contains filtered or unexported fields
}

V3RuleTask instances contain contextual object instances and metadata for use by rule callbacks.

type V3RuleTaskCallback

type V3RuleTaskCallback func(task *V3RuleTask)

V3RuleTaskCallback is the function type for functions that are called as a reulst of a specified rule being satisfied using the etcd v3 API.

type WrapKV

type WrapKV func(v3.KV) v3.KV

WrapKV is used to provide a wrapper for the default etcd v3 KV implementation used by the rules engine.

type WrapWatcher added in v1.3.0

type WrapWatcher func(v3.Watcher) v3.Watcher

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL