Documentation
¶
Index ¶
- Constants
- Variables
- type AuthenticationRef
- type Autoscaler
- type AutoscalerSpec
- type AutoscalerType
- type ChartValues
- type Config
- type Fallback
- type FirehoseAutoscaler
- type HorizontalPodAutoscaler
- type InitContainer
- type Keda
- type Output
- type Policy
- type RequestsAndLimits
- type ScaleBehaviour
- type ScaleParams
- type Scaler
- type StartParams
- type Telegraf
- type TelegrafConf
- type Trigger
- type Triggers
- type UsageSpec
Constants ¶
View Source
// String keys used with KEDA: two "autoscaling.keda.sh/..." annotation keys and
// three Kafka trigger metadata keys, plus KafkaTopicDelimiter ("|").
// NOTE(review): the delimiter presumably separates multiple Kafka topic names
// in a single value — confirm against its callers.
const ( KedaPausedAnnotationKey = "autoscaling.keda.sh/paused" KedaPausedReplicaAnnotationKey = "autoscaling.keda.sh/paused-replicas" KedaKafkaMetadataBootstrapServersKey = "bootstrapServers" KedaKafkaMetadataTopicKey = "topic" KedaKafkaMetadataConsumerGroupKey = "consumerGroup" KafkaTopicDelimiter = "|" )
View Source
// Names of the firehose-specific actions. Each one appears as the Name of an
// entry in Module.Actions, alongside module.CreateAction and module.UpdateAction.
const ( ScaleAction = "scale" StartAction = "start" StopAction = "stop" ResetAction = "reset" ResetV2Action = "reset-v2" UpgradeAction = "upgrade" )
View Source
// SourceKafkaConsumerAutoOffsetReset is the string
// "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET".
// NOTE(review): presumably a key looked up in Config.EnvVariables — confirm.
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
Variables ¶
View Source
var Module = module.Descriptor{ Kind: "firehose", Dependencies: map[string]string{ // contains filtered or unexported fields }, Actions: []module.ActionDesc{ { Name: module.CreateAction, Description: "Creates a new firehose", }, { Name: module.UpdateAction, Description: "Update all configurations of firehose", }, { Name: ResetAction, Description: "Stop firehose, reset consumer group, restart", }, { Name: ResetV2Action, Description: "Stop firehose, reset consumer group, restart with datetime option", }, { Name: StopAction, Description: "Stop all replicas of this firehose.", }, { Name: StartAction, Description: "Start the firehose if it is currently stopped.", }, { Name: ScaleAction, Description: "Scale the number of replicas to given number.", }, { Name: UpgradeAction, Description: "Upgrade firehose version", }, }, DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) { mu.Lock() defer mu.Unlock() conf := defaultDriverConf if err := json.Unmarshal(confJSON, &conf); err != nil { return nil, err } else if err := validator.TaggedStruct(conf); err != nil { return nil, err } return &firehoseDriver{ conf: conf, timeNow: time.Now, kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error { canUpdate := func(rel *release.Release) bool { curLabels, ok := rel.Config[labelsConfKey].(map[string]any) if !ok { return false } newLabels, ok := hc.Values[labelsConfKey].(map[string]string) if !ok { return false } isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment] return isManagedByEntropy && isSameDeployment } helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf}) _, errHelm := helmCl.Upsert(&hc, canUpdate) return errHelm }, kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) { kubeCl, err := kube.NewClient(ctx, conf) if err != nil { return nil, 
errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error()) } return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool { return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil }) }, kubeGetDeployment: func(ctx context.Context, conf kube.Config, ns, name string) (kube.Deployment, error) { kubeCl, err := kube.NewClient(ctx, conf) if err != nil { return kube.Deployment{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get deployment").WithCausef(err.Error()) } return kubeCl.GetDeploymentDetails(ctx, ns, name) }, consumerReset: consumerReset, }, nil }, }
Functions ¶
This section is empty.
Types ¶
type AuthenticationRef ¶ added in v0.2.35
type Autoscaler ¶ added in v0.2.35
// Autoscaler holds the autoscaling settings referenced by Config.Autoscaler.
type Autoscaler struct {
// Enabled is always serialized (no omitempty).
Enabled bool `json:"enabled"`
// Type names the autoscaler implementation; KEDA ("keda") is the only
// AutoscalerType constant visible in this listing.
Type AutoscalerType `json:"type,omitempty"`
// Spec is an AutoscalerSpec, whose definition is not shown here.
// NOTE(review): presumably decoded according to Type by UnmarshalJSON — confirm.
Spec AutoscalerSpec `json:"spec,omitempty"`
}
func (*Autoscaler) GetHelmValues ¶ added in v0.2.35
func (autoscaler *Autoscaler) GetHelmValues(cfg Config) (map[string]any, error)
func (*Autoscaler) UnmarshalJSON ¶ added in v0.2.35
func (autoscaler *Autoscaler) UnmarshalJSON(data []byte) error
func (*Autoscaler) Validate ¶ added in v0.3.6
func (autoscaler *Autoscaler) Validate() error
type AutoscalerSpec ¶ added in v0.2.35
type AutoscalerType ¶ added in v0.2.35
// AutoscalerType is the string type of the Autoscaler.Type field.
type AutoscalerType string
// KEDA is the only AutoscalerType value declared in this listing.
const (
KEDA AutoscalerType = "keda"
)
type ChartValues ¶
type Config ¶
// Config holds the settings for a firehose: run state, replica count, target
// namespace, deployment ID, environment variables and optional add-ons.
type Config struct {
// Stopped, when set, forces the firehose to be stopped on the next sync.
Stopped bool `json:"stopped"`
// StopTime can be set to schedule the firehose to be stopped at the given time.
StopTime *time.Time `json:"stop_time,omitempty"`
// Replicas is the number of firehose instances to run.
Replicas int `json:"replicas"`
// Namespace is the target namespace where the firehose should be deployed.
// Inherits from the driver config.
Namespace string `json:"namespace,omitempty"`
// DeploymentID will be used as the release name for the deployment.
// Must be shorter than 53 characters if set. If not set, one will be
// generated automatically.
DeploymentID string `json:"deployment_id,omitempty"`
// EnvVariables contains all the firehose environment config values.
EnvVariables map[string]string `json:"env_variables,omitempty"`
// ResetOffset represents the value to which the Kafka consumer offset was set.
ResetOffset string `json:"reset_offset,omitempty"`
// Limits and Requests are UsageSpec values (type not shown here).
// NOTE(review): presumably the resource limits and requests — confirm.
Limits UsageSpec `json:"limits,omitempty"`
Requests UsageSpec `json:"requests,omitempty"`
// Telegraf and ChartValues are optional pointers; omitted from JSON when nil.
Telegraf *Telegraf `json:"telegraf,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
// NOTE(review): if InitContainer (like UsageSpec above) is a struct type,
// encoding/json ignores omitempty on it and always emits the field — confirm.
InitContainer InitContainer `json:"init_container,omitempty"`
// Autoscaler is optional; omitted from JSON when nil.
Autoscaler *Autoscaler `json:"autoscaler,omitempty"`
}
type FirehoseAutoscaler ¶ added in v0.2.35
type HorizontalPodAutoscaler ¶ added in v0.2.35
// HorizontalPodAutoscaler pairs a scale-down and a scale-up ScaleBehaviour.
// It is referenced by Keda.HPA. ScaleBehaviour itself is not shown in this listing.
type HorizontalPodAutoscaler struct {
ScaleDown ScaleBehaviour `json:"scale_down,omitempty"`
ScaleUp ScaleBehaviour `json:"scale_up,omitempty"`
}
type InitContainer ¶
type Keda ¶ added in v0.2.35
// Keda holds the KEDA autoscaler settings. Only MinReplicas and MaxReplicas are
// always serialized; every other field is omitted from JSON when it is empty.
type Keda struct {
// NOTE(review): the three Paused* fields presumably drive the
// KedaPausedAnnotationKey / KedaPausedReplicaAnnotationKey annotations — confirm.
Paused bool `json:"paused,omitempty"`
PausedWithReplica bool `json:"paused_with_replica,omitempty"`
PausedReplica int `json:"paused_replica,omitempty"`
MinReplicas int `json:"min_replicas"`
MaxReplicas int `json:"max_replicas"`
// NOTE(review): units of PollingInterval and CooldownPeriod are not stated
// here — presumably seconds; confirm.
PollingInterval int `json:"polling_interval,omitempty"`
CooldownPeriod int `json:"cooldown_period,omitempty"`
// Triggers maps a string key to a Trigger (Trigger is not shown in this listing).
Triggers map[string]Trigger `json:"triggers,omitempty"`
// The JSON name is "restore_to_original_replica_count", which differs from
// the Go field name.
RestoreToOriginalReplica bool `json:"restore_to_original_replica_count,omitempty"`
// Fallback and HPA are optional pointers; omitted from JSON when nil.
Fallback *Fallback `json:"fallback,omitempty"`
HPA *HorizontalPodAutoscaler `json:"hpa,omitempty"`
}
func (*Keda) GetHelmValues ¶ added in v0.2.35
func (*Keda) ReadConfig ¶ added in v0.2.35
type Output ¶
// Output carries the pods, namespace, release name, deployment details, desired
// status and autoscaler flag. Every field is tagged omitempty, so empty values
// are left out of the JSON.
type Output struct {
Pods []kube.Pod `json:"pods,omitempty"`
Namespace string `json:"namespace,omitempty"`
ReleaseName string `json:"release_name,omitempty"`
// Deployment is a pointer; omitted from JSON when nil.
Deployment *kube.Deployment `json:"deployment,omitempty"`
DesiredStatus string `json:"desired_status,omitempty"`
// AutoscalerEnabled is omitted from JSON when false.
AutoscalerEnabled bool `json:"autoscaler_enabled,omitempty"`
}
type RequestsAndLimits ¶
type ScaleBehaviour ¶ added in v0.2.35
type ScaleParams ¶
// ScaleParams carries a replica count.
// NOTE(review): presumably the parameters of ScaleAction ("Scale the number of
// replicas to given number.") — confirm against the driver.
type ScaleParams struct {
// Replicas is always serialized (no omitempty).
Replicas int `json:"replicas"`
}
type StartParams ¶
type Telegraf ¶
// Telegraf holds the settings referenced by Config.Telegraf: an enable flag,
// free-form image values and a TelegrafConf (not shown in this listing).
type Telegraf struct {
Enabled bool `json:"enabled,omitempty"`
// Image is an untyped map; its expected keys are not visible here.
Image map[string]any `json:"image,omitempty"`
Config TelegrafConf `json:"config,omitempty"`
}
type TelegrafConf ¶
Click to show internal directories.
Click to hide internal directories.