firehose

package
v0.3.0-beta1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// KEDA-related string keys. Where they are consumed is not visible in this
// listing; the grouping below follows the identifier names and values.
const (
	// Keys under the "autoscaling.keda.sh/" prefix; named as annotation keys.
	KedaPausedAnnotationKey        = "autoscaling.keda.sh/paused"
	KedaPausedReplicaAnnotationKey = "autoscaling.keda.sh/paused-replicas"

	// Metadata field names for a Kafka trigger — presumably the keys of
	// Trigger.Metadata when Trigger.Type is KAFKA; confirm against usage.
	KedaKafkaMetadataBootstrapServersKey = "bootstrapServers"
	KedaKafkaMetadataTopicKey            = "topic"
	KedaKafkaMetadataConsumerGroupKey    = "consumerGroup"
)
View Source
// Names of the firehose-specific actions. All except UpgradeAction's siblings
// are plain strings; ResetAction, ResetV2Action, StopAction, StartAction,
// ScaleAction and UpgradeAction are each registered as an action name in the
// Module descriptor.
const (
	ScaleAction   = "scale"
	StartAction   = "start"
	StopAction    = "stop"
	ResetAction   = "reset"
	ResetV2Action = "reset-v2"
	UpgradeAction = "upgrade"
)
View Source
// SourceKafkaConsumerAutoOffsetReset is the configuration key
// "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET".
// NOTE(review): presumably looked up in Config.EnvVariables — confirm against usage.
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"

Variables

View Source
// Module is the descriptor for the "firehose" module kind. It lists the
// actions the module supports and supplies DriverFactory, which builds a
// firehose driver from JSON driver configuration.
var Module = module.Descriptor{
	Kind: "firehose",
	Dependencies: map[string]string{
		// contains filtered or unexported fields
	},
	Actions: []module.ActionDesc{
		{
			Name:        module.CreateAction,
			Description: "Creates a new firehose",
		},
		{
			Name:        module.UpdateAction,
			Description: "Update all configurations of firehose",
		},
		{
			Name:        ResetAction,
			Description: "Stop firehose, reset consumer group, restart",
		},
		{
			Name:        ResetV2Action,
			Description: "Stop firehose, reset consumer group, restart with datetime option",
		},
		{
			Name:        StopAction,
			Description: "Stop all replicas of this firehose.",
		},
		{
			Name:        StartAction,
			Description: "Start the firehose if it is currently stopped.",
		},
		{
			Name:        ScaleAction,
			Description: "Scale the number of replicas to given number.",
		},
		{
			Name:        UpgradeAction,
			Description: "Upgrade firehose version",
		},
	},
	// DriverFactory decodes confJSON on top of defaultDriverConf, validates the
	// result, and returns a driver wired with the real Helm/Kubernetes
	// implementations. It returns the decode or validation error unchanged.
	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
		// mu is held for the whole call, so driver construction is serialised.
		mu.Lock()
		defer mu.Unlock()

		// Start from the package defaults; json.Unmarshal only overwrites the
		// fields present in confJSON.
		conf := defaultDriverConf
		if err := json.Unmarshal(confJSON, &conf); err != nil {
			return nil, err
		}
		if err := validator.TaggedStruct(conf); err != nil {
			return nil, err
		}

		return &firehoseDriver{
			conf:    conf,
			timeNow: time.Now,
			// kubeDeploy upserts the Helm release described by hc. The context
			// and isCreate arguments are not used by this implementation.
			kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error {
				// canUpdate allows an existing release to be updated only when
				// its labels carry the orchestrator label value and its
				// deployment label matches the one in the new values. Missing
				// or wrongly-typed label maps on either side refuse the update.
				canUpdate := func(rel *release.Release) bool {
					curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
					if !ok {
						return false
					}
					newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
					if !ok {
						return false
					}

					isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
					isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]

					return isManagedByEntropy && isSameDeployment
				}

				helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
				_, errHelm := helmCl.Upsert(&hc, canUpdate)
				return errHelm
			},
			// kubeGetPod lists pods in ns matching labels, keeping only pods
			// that are in the Running phase and have no deletion timestamp.
			kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					// The cause is passed as an argument, not as the format
					// string, so a '%' in the error text is kept verbatim.
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef("%s", err.Error())
				}
				return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
					return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
				})
			},
			// kubeGetDeployment fetches the details of deployment name in ns.
			kubeGetDeployment: func(ctx context.Context, conf kube.Config, ns, name string) (kube.Deployment, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					// See kubeGetPod: constant format string keeps the cause verbatim.
					return kube.Deployment{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get deployment").WithCausef("%s", err.Error())
				}
				return kubeCl.GetDeploymentDetails(ctx, ns, name)
			},
			consumerReset: consumerReset,
		}, nil
	},
}

Functions

This section is empty.

Types

type AuthenticationRef added in v0.2.35

// AuthenticationRef identifies an authentication resource by name and kind.
// Name carries a validate:"required" tag; Kind has no validation tag. Both are
// omitted from JSON when empty. Trigger embeds it as its AuthenticationRef field.
type AuthenticationRef struct {
	Name string `json:"name,omitempty" validate:"required"`
	Kind string `json:"kind,omitempty"`
}

type Autoscaler added in v0.2.35

// Autoscaler is the autoscaling section of Config: an Enabled switch, the
// autoscaler Type, and a Spec held behind the AutoscalerSpec interface.
// The type declares its own UnmarshalJSON method, so JSON decoding goes
// through that method rather than the default field-by-field decoding.
type Autoscaler struct {
	Enabled bool           `json:"enabled"`
	Type    AutoscalerType `json:"type,omitempty"`
	Spec    AutoscalerSpec `json:"spec,omitempty"`
}

func (*Autoscaler) GetHelmValues added in v0.2.35

func (autoscaler *Autoscaler) GetHelmValues(cfg Config) (map[string]any, error)

func (*Autoscaler) UnmarshalJSON added in v0.2.35

func (autoscaler *Autoscaler) UnmarshalJSON(data []byte) error

type AutoscalerSpec added in v0.2.35

// AutoscalerSpec is the method set an autoscaler specification must provide.
// It is the type of Autoscaler.Spec; *Keda declares methods with these names
// and signatures. ReadConfig takes the unexported driverConf type.
type AutoscalerSpec interface {
	// ReadConfig receives the firehose Config and the driver configuration.
	// NOTE(review): what it populates is implementation-defined — not visible here.
	ReadConfig(cfg Config, driverConf driverConf) error
	// Pause accepts zero or more replica counts (variadic).
	Pause(replica ...int)
	// Resume takes no arguments; presumably the inverse of Pause — confirm.
	Resume()
	// GetHelmValues returns a map of values derived from cfg, or an error.
	GetHelmValues(cfg Config) (map[string]any, error)
}

type AutoscalerType added in v0.2.35

// AutoscalerType is the string identifier of an autoscaler kind; it is the
// type of Autoscaler.Type.
type AutoscalerType string
// KEDA ("keda") is the only AutoscalerType value declared in this block.
const (
	KEDA AutoscalerType = "keda"
)

type ChartValues

// ChartValues holds the image repository, image tag, chart version and image
// pull policy for a deployment. Every field carries a validate:"required" tag.
// Config references it through the optional ChartValues pointer field.
type ChartValues struct {
	ImageRepository string `json:"image_repository" validate:"required"`
	ImageTag        string `json:"image_tag" validate:"required"`
	ChartVersion    string `json:"chart_version" validate:"required"`
	ImagePullPolicy string `json:"image_pull_policy" validate:"required"`
}

type Config

// Config is the JSON-serialisable configuration of a firehose. It is the
// argument type of the AutoscalerSpec methods ReadConfig and GetHelmValues.
type Config struct {
	// Stopped flag when set forces the firehose to be stopped on next sync.
	Stopped bool `json:"stopped"`

	// StopTime can be set to schedule the firehose to be stopped at given time.
	StopTime *time.Time `json:"stop_time,omitempty"`

	// Replicas is the number of firehose instances to run.
	Replicas int `json:"replicas"`

	// Namespace is the target namespace where firehose should be deployed.
	// Inherits from driver config.
	Namespace string `json:"namespace,omitempty"`

	// DeploymentID will be used as the release-name for the deployment.
	// Must be shorter than 53 chars if set. If not set, one will be generated
	// automatically.
	DeploymentID string `json:"deployment_id,omitempty"`

	// EnvVariables contains all the firehose environment config values.
	EnvVariables map[string]string `json:"env_variables,omitempty"`

	// ResetOffset is the value to which the kafka consumer offset was set.
	ResetOffset string `json:"reset_offset,omitempty"`

	// Resource settings and optional sub-configurations. Telegraf, ChartValues
	// and Autoscaler are pointers and may be nil; Limits, Requests and
	// InitContainer are held by value.
	Limits        UsageSpec     `json:"limits,omitempty"`
	Requests      UsageSpec     `json:"requests,omitempty"`
	Telegraf      *Telegraf     `json:"telegraf,omitempty"`
	ChartValues   *ChartValues  `json:"chart_values,omitempty"`
	InitContainer InitContainer `json:"init_container,omitempty"`
	Autoscaler    *Autoscaler   `json:"autoscaler,omitempty"`
}

type Fallback added in v0.2.35

// Fallback groups a behavior name, a replica count and a failure threshold.
// It is referenced by the optional Keda.Fallback field; all fields are omitted
// from JSON when zero.
// NOTE(review): presumably mirrors KEDA's fallback settings — confirm.
type Fallback struct {
	Behavior         string `json:"behavior,omitempty"`
	Replicas         int    `json:"replicas,omitempty"`
	FailureThreshold int    `json:"failure_threshold,omitempty"`
}

type FirehoseAutoscaler added in v0.2.35

// FirehoseAutoscaler wraps a Keda value under the JSON key "keda".
// Its consumers are not visible in this listing.
type FirehoseAutoscaler struct {
	Keda Keda `json:"keda,omitempty"`
}

type HorizontalPodAutoscaler added in v0.2.35

// HorizontalPodAutoscaler holds separate ScaleBehaviour settings for scaling
// down and scaling up. It is referenced by the optional Keda.HPA field.
type HorizontalPodAutoscaler struct {
	ScaleDown ScaleBehaviour `json:"scale_down,omitempty"`
	ScaleUp   ScaleBehaviour `json:"scale_up,omitempty"`
}

type InitContainer

// InitContainer describes an init container: an Enabled switch, its args and
// command, and its image repository, tag and pull policy. None of the fields
// use omitempty, so every key is always present in the encoded JSON.
type InitContainer struct {
	Enabled bool `json:"enabled"`

	Args    []string `json:"args"`
	Command []string `json:"command"`

	Repository string `json:"repository"`
	ImageTag   string `json:"image_tag"`
	PullPolicy string `json:"pull_policy"`
}

type Keda added in v0.2.35

// Keda holds KEDA autoscaler settings: pause state, replica bounds, polling
// and cooldown values, named triggers, and optional Fallback and HPA sections.
// *Keda declares ReadConfig, Pause, Resume and GetHelmValues, the same method
// names and signatures as AutoscalerSpec. Every field is omitted from JSON
// when zero or nil.
// NOTE(review): units of PollingInterval and CooldownPeriod are not stated
// here — presumably seconds; confirm.
type Keda struct {
	Paused                   bool                     `json:"paused,omitempty"`
	PausedWithReplica        bool                     `json:"paused_with_replica,omitempty"`
	PausedReplica            int                      `json:"paused_replica,omitempty"`
	MinReplicas              int                      `json:"min_replicas,omitempty"`
	MaxReplicas              int                      `json:"max_replicas,omitempty"`
	PollingInterval          int                      `json:"polling_interval,omitempty"`
	CooldownPeriod           int                      `json:"cooldown_period,omitempty"`
	Triggers                 map[string]Trigger       `json:"triggers,omitempty"`
	RestoreToOriginalReplica bool                     `json:"restore_to_original_replica_count,omitempty"`
	Fallback                 *Fallback                `json:"fallback,omitempty"`
	HPA                      *HorizontalPodAutoscaler `json:"hpa,omitempty"`
}

func (*Keda) GetHelmValues added in v0.2.35

func (keda *Keda) GetHelmValues(cfg Config) (map[string]any, error)

func (*Keda) Pause added in v0.2.35

func (keda *Keda) Pause(replica ...int)

func (*Keda) ReadConfig added in v0.2.35

func (keda *Keda) ReadConfig(cfg Config, driverCfg driverConf) error

func (*Keda) Resume added in v0.2.35

func (keda *Keda) Resume()

type Output

// Output is a JSON-serialisable report containing pods, namespace, release
// name, optional deployment details, the desired status and whether the
// autoscaler is enabled. Every field is omitted from JSON when zero or nil.
// NOTE(review): presumably the driver's resource output — confirm where it is produced.
type Output struct {
	Pods              []kube.Pod       `json:"pods,omitempty"`
	Namespace         string           `json:"namespace,omitempty"`
	ReleaseName       string           `json:"release_name,omitempty"`
	Deployment        *kube.Deployment `json:"deployment,omitempty"`
	DesiredStatus     string           `json:"desired_status,omitempty"`
	AutoscalerEnabled bool             `json:"autoscaler_enabled,omitempty"`
}

type Policy added in v0.2.35

// Policy is one scaling policy entry: a type name, a float32 value and a
// period in seconds. ScaleBehaviour.Policies is a slice of Policy.
type Policy struct {
	Type          string  `json:"type,omitempty"`
	Value         float32 `json:"value,omitempty"`
	PeriodSeconds int     `json:"period_seconds,omitempty"`
}

type RequestsAndLimits

// RequestsAndLimits pairs a Limits and a Requests UsageSpec, using the same
// JSON keys ("limits", "requests") as the corresponding Config fields.
type RequestsAndLimits struct {
	Limits   UsageSpec `json:"limits,omitempty"`
	Requests UsageSpec `json:"requests,omitempty"`
}

type ScaleBehaviour added in v0.2.35

// ScaleBehaviour holds a list of policies plus an optional stabilization
// window (seconds) and tolerance. The last two are pointers, so a nil (absent)
// value is distinguishable from an explicit zero. It is the type of
// HorizontalPodAutoscaler.ScaleDown and ScaleUp.
type ScaleBehaviour struct {
	Policies                   []Policy `json:"policies,omitempty"`
	StabilizationWindowSeconds *int     `json:"stabilization_window_seconds,omitempty"`
	Tolerance                  *float32 `json:"tolerance,omitempty"`
}

type ScaleParams

// ScaleParams carries a replica count under the JSON key "replicas".
// NOTE(review): presumably the parameters of ScaleAction — confirm against the driver.
type ScaleParams struct {
	Replicas int `json:"replicas"`
}

type Scaler added in v0.2.35

// Scaler is the string identifier of a trigger kind; it is the type of
// Trigger.Type.
type Scaler string
// Scaler values declared in this block: KAFKA ("kafka") and PROMETHEUS ("prometheus").
const (
	KAFKA      Scaler = "kafka"
	PROMETHEUS Scaler = "prometheus"
)

type StartParams

// StartParams carries an optional stop time. The field has no omitempty, so a
// nil StopTime is encoded as JSON null under "stop_time".
// NOTE(review): presumably the parameters of StartAction — confirm against the driver.
type StartParams struct {
	StopTime *time.Time `json:"stop_time"`
}

type Telegraf

// Telegraf is the optional telegraf section of Config: an Enabled switch, a
// free-form Image map and a TelegrafConf.
type Telegraf struct {
	Enabled bool           `json:"enabled,omitempty"`
	Image   map[string]any `json:"image,omitempty"`
	Config  TelegrafConf   `json:"config,omitempty"`
}

type TelegrafConf

// TelegrafConf holds a free-form Output map and a string-to-string map of
// additional global tags. Neither field uses omitempty, so nil maps are
// encoded as JSON null. It is the type of Telegraf.Config.
type TelegrafConf struct {
	Output               map[string]any    `json:"output"`
	AdditionalGlobalTags map[string]string `json:"additional_global_tags"`
}

type Trigger added in v0.2.35

// Trigger describes one autoscaling trigger: its Scaler type, a string
// metadata map and an AuthenticationRef. Keda.Triggers maps names to Trigger
// values.
type Trigger struct {
	Type              Scaler            `json:"type,omitempty"`
	Metadata          map[string]string `json:"metadata,omitempty"`
	AuthenticationRef AuthenticationRef `json:"authentication_ref,omitempty"`
}

type UsageSpec

// UsageSpec holds CPU and memory amounts as strings. Both fields carry a
// validate:"required" tag. It is used for the Limits and Requests fields of
// Config and RequestsAndLimits.
// NOTE(review): the accepted string format is not visible here — presumably
// Kubernetes resource quantities; confirm.
type UsageSpec struct {
	CPU    string `json:"cpu,omitempty" validate:"required"`
	Memory string `json:"memory,omitempty" validate:"required"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL