Documentation
Constants
const (
	SourceKafkaConsumerConfigAutoCommitEnable  = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"
	SourceKafkaConsumerConfigAutoOffsetReset   = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
	SourceKafkaConsumerConfigBootstrapServers  = "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"
)
Kafka-related constants
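These constants name the environment-variable keys consumed by a dagger's Kafka source. A minimal sketch (the helper is hypothetical, not part of this package, and the values are illustrative) of using them to build Config.EnvVariables overrides:

package dagger

// buildKafkaEnv is a hypothetical helper showing the exported constants
// used as keys for Config.EnvVariables; all values are placeholders.
func buildKafkaEnv(brokers string) map[string]string {
	return map[string]string{
		SourceKafkaConsumerConfigAutoCommitEnable:  "false",
		SourceKafkaConsumerConfigAutoOffsetReset:   "latest",
		SourceKafkaConsumerConfigBootstrapServers:  brokers,
	}
}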
const (
	SinkTypeInflux   = "INFLUX"
	SinkTypeKafka    = "KAFKA"
	SinkTypeBigquery = "BIGQUERY"
)
Sink types
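A sketch (a hypothetical helper, not part of this package) of dispatching on these constants via Config.SinkType:

package dagger

import "fmt"

// describeSink dispatches on the sink-type constants; it assumes the
// matching embedded struct in cfg.Sink has been populated.
func describeSink(cfg Config) string {
	switch cfg.SinkType {
	case SinkTypeInflux:
		return fmt.Sprintf("writing to InfluxDB %q", cfg.Sink.SinkInfluxDBName)
	case SinkTypeKafka:
		return fmt.Sprintf("producing to Kafka topic %q", cfg.Sink.SinkKafkaTopic)
	case SinkTypeBigquery:
		return fmt.Sprintf("loading BigQuery table %q", cfg.Sink.SinkBigqueryTableName)
	default:
		return "unknown sink type"
	}
}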
const (
	JobStateRunning   = "running"
	JobStateSuspended = "suspended"

	StateDeployed      = "DEPLOYED"
	StateUserStopped   = "USER_STOPPED"
	StateSystemStopped = "SYSTEM_STOPPED"

	KeySchemaRegistryStencilCacheAutoRefresh = "SCHEMA_REGISTRY_STENCIL_CACHE_AUTO_REFRESH"
	KeySchemaRegistryStencilURLs             = "SCHEMA_REGISTRY_STENCIL_URLS"
)
const (
	StopAction  = "stop"
	StartAction = "start"
	ResetAction = "reset"
)
const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"
Variables
var Module = module.Descriptor{
	Kind: "dagger",
	Dependencies: map[string]string{
		// contains filtered or unexported fields
	},
	Actions: []module.ActionDesc{
		{
			Name:        module.CreateAction,
			Description: "Creates a new dagger",
		},
		{
			Name:        module.UpdateAction,
			Description: "Updates an existing dagger",
		},
		{
			Name:        StopAction,
			Description: "Suspends a running dagger",
		},
		{
			Name:        StartAction,
			Description: "Starts a suspended dagger",
		},
		{
			Name:        ResetAction,
			Description: "Resets the offset of a dagger",
		},
	},
	DriverFactory: func(confJSON json.RawMessage) (module.Driver, error) {
		conf := defaultDriverConf
		if err := json.Unmarshal(confJSON, &conf); err != nil {
			return nil, err
		} else if err := validator.TaggedStruct(conf); err != nil {
			return nil, err
		}

		return &daggerDriver{
			conf:    conf,
			timeNow: time.Now,
			kubeDeploy: func(_ context.Context, isCreate bool, kubeConf kube.Config, hc helm.ReleaseConfig) error {
				// canUpdate permits the helm upsert only when the existing
				// release is managed by entropy and belongs to the same
				// deployment as the incoming release config.
				canUpdate := func(rel *release.Release) bool {
					curLabels, ok := rel.Config[labelsConfKey].(map[string]any)
					if !ok {
						return false
					}
					newLabels, ok := hc.Values[labelsConfKey].(map[string]string)
					if !ok {
						return false
					}
					isManagedByEntropy := curLabels[labelOrchestrator] == orchestratorLabelValue
					isSameDeployment := curLabels[labelDeployment] == newLabels[labelDeployment]
					return isManagedByEntropy && isSameDeployment
				}

				helmCl := helm.NewClient(&helm.Config{Kubernetes: kubeConf})
				_, errHelm := helmCl.Upsert(&hc, canUpdate)
				return errHelm
			},
			kubeGetPod: func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return nil, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
				}
				return kubeCl.GetPodDetails(ctx, ns, labels, func(pod v1.Pod) bool {
					// keep only running pods that are not marked for deletion
					return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil
				})
			},
			kubeGetCRD: func(ctx context.Context, conf kube.Config, ns string, name string) (kube.FlinkDeploymentStatus, error) {
				kubeCl, err := kube.NewClient(ctx, conf)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, errors.ErrInternal.WithMsgf("failed to create new kube client on firehose driver kube get pod").WithCausef(err.Error())
				}
				crd, err := kubeCl.GetCRDDetails(ctx, ns, name)
				if err != nil {
					return kube.FlinkDeploymentStatus{}, err
				}
				return parseFlinkCRDStatus(crd.Object)
			},
			consumerReset: consumerReset,
		}, nil
	},
}
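The DriverFactory above validates the supplied JSON against the unexported driver configuration before wiring the Kubernetes and Helm callbacks. A minimal sketch of invoking it directly; the empty config body is illustrative and passes validation only if every tagged field has a default:

package dagger

import (
	"encoding/json"
	"log"
)

// newDriver shows Module.DriverFactory being called directly; in practice
// the entropy module framework invokes it with the operator-supplied config.
func newDriver() {
	drv, err := Module.DriverFactory(json.RawMessage(`{}`))
	if err != nil {
		log.Fatalf("invalid driver config: %v", err)
	}
	_ = drv
}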
Functions
This section is empty.
Types
type ChartValues
type Config
type Config struct {
Resources Resources `json:"resources,omitempty"`
Source []Source `json:"source,omitempty"`
Sink Sink `json:"sink,omitempty"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
Replicas int `json:"replicas"`
SinkType string `json:"sink_type"`
Team string `json:"team"`
FlinkName string `json:"flink_name,omitempty"`
DeploymentID string `json:"deployment_id,omitempty"`
Savepoint any `json:"savepoint,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
Deleted bool `json:"deleted,omitempty"`
Namespace string `json:"namespace,omitempty"`
PrometheusURL string `json:"prometheus_url,omitempty"`
JarURI string `json:"jar_uri,omitempty"`
State string `json:"state"`
JobState string `json:"job_state"`
ResetOffset string `json:"reset_offset"`
StopTime *time.Time `json:"stop_time,omitempty"`
DaggerCheckpointURL string `json:"dagger_checkpoint_url,omitempty"`
DaggerSavepointURL string `json:"dagger_savepoint_url,omitempty"`
DaggerK8sHAURL string `json:"dagger_k8s_ha_url,omitempty"`
CloudProvider string `json:"cloud_provider,omitempty"`
}
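A sketch (all values are placeholders) of assembling a Config for a Kafka-sourced job and rendering it as JSON:

package dagger

import (
	"encoding/json"
	"fmt"
)

// exampleConfig builds an illustrative Config; only a subset of fields is set.
func exampleConfig() {
	cfg := Config{
		Replicas: 1,
		SinkType: SinkTypeKafka,
		Team:     "data-platform",
		Source: []Source{{
			InputSchemaTable: "bookings",
			SourceKafka: SourceKafka{
				SourceKafkaTopicNames:            "bookings-log",
				SourceKafkaConsumerConfigGroupID: "dagger-bookings-0001",
			},
		}},
		State:    StateDeployed,
		JobState: JobStateRunning,
	}

	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out))
}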
type FlinkCRDStatus
type Output
type Output struct {
JMDeployStatus string `json:"jm_deploy_status,omitempty"`
JobStatus string `json:"job_status,omitempty"`
State string `json:"state,omitempty"`
Reconcilation string `json:"reconcilation,omitempty"`
Pods []kube.Pod `json:"pods,omitempty"`
Namespace string `json:"namespace,omitempty"`
JobID string `json:"job_id,omitempty"`
Error string `json:"error,omitempty"`
}
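A sketch of decoding Output from a resource-state payload produced by the module:

package dagger

import (
	"encoding/json"
	"fmt"
)

// readOutput decodes an Output value; the payload shape follows the JSON
// tags above, e.g. {"job_status":"RUNNING","job_id":"..."}.
func readOutput(raw []byte) error {
	var out Output
	if err := json.Unmarshal(raw, &out); err != nil {
		return err
	}
	fmt.Printf("job %s: %s (%d pods)\n", out.JobID, out.JobStatus, len(out.Pods))
	return nil
}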
type SchemaRegistryStencilURLsParams (added in v0.2.25)
type SchemaRegistryStencilURLsParams struct {
SchemaRegistryStencilURLs string `json:"schema_registry_stencil_urls"`
}
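A sketch of encoding these parameters, presumably as the body of an action that updates the stencil schema-registry URLs (the URL is a placeholder):

package dagger

import "encoding/json"

// stencilParamsJSON renders SchemaRegistryStencilURLsParams as JSON.
func stencilParamsJSON() ([]byte, error) {
	return json.Marshal(SchemaRegistryStencilURLsParams{
		SchemaRegistryStencilURLs: "http://stencil.example.com/v1beta1/namespaces/ns/schemas/booking",
	})
}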
type Sink
type Sink struct {
SinkKafka
SinkInflux
SinkBigquery
}
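Sink embeds all three sink variants, so their uppercase JSON tags flatten into a single object; typically only the fields matching Config.SinkType are populated. A sketch (broker and topic are placeholders):

package dagger

import (
	"encoding/json"
	"fmt"
)

// exampleSink marshals a Kafka-populated Sink; the embedded structs flatten,
// so keys appear at the top level (SINK_KAFKA_BROKERS, SINK_KAFKA_TOPIC, ...).
func exampleSink() {
	s := Sink{
		SinkKafka: SinkKafka{
			SinkKafkaBrokers: "broker-1:9092",
			SinkKafkaTopic:   "enriched-bookings",
		},
	}
	b, _ := json.Marshal(s)
	fmt.Println(string(b))
}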
type SinkBigquery
type SinkBigquery struct {
SinkBigqueryTablePartitionExpiryMs string `json:"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS"`
SinkBigqueryRowInsertIDEnable string `json:"SINK_BIGQUERY_ROW_INSERT_ID_ENABLE"`
SinkBigqueryClientReadTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS"`
SinkBigqueryClientConnectTimeoutMs string `json:"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS"`
SinkBigqueryCredentialPath string `json:"SINK_BIGQUERY_CREDENTIAL_PATH"`
SinkBigqueryGoogleCloudProjectID string `json:"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID"`
SinkBigqueryTableName string `json:"SINK_BIGQUERY_TABLE_NAME"`
SinkBigqueryDatasetLabels string `json:"SINK_BIGQUERY_DATASET_LABELS"`
SinkBigqueryTableLabels string `json:"SINK_BIGQUERY_TABLE_LABELS"`
SinkBigqueryDatasetName string `json:"SINK_BIGQUERY_DATASET_NAME"`
SinkBigqueryTablePartitioningEnable string `json:"SINK_BIGQUERY_TABLE_PARTITIONING_ENABLE"`
SinkBigqueryTablePartitionKey string `json:"SINK_BIGQUERY_TABLE_PARTITION_KEY"`
SinkBigqueryDatasetLocation string `json:"SINK_BIGQUERY_DATASET_LOCATION"`
SinkBigqueryBatchSize string `json:"SINK_BIGQUERY_BATCH_SIZE"`
SinkBigqueryTableClusteringEnable string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE"`
SinkBigqueryTableClusteringKeys string `json:"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS"`
SinkErrorTypesForFailure string `json:"SINK_ERROR_TYPES_FOR_FAILURE"`
SinkConnectorSchemaProtoMessageClass string `json:"SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS"`
}
type SinkInflux
type SinkInflux struct {
SinkInfluxBatchSize string `json:"SINK_INFLUX_BATCH_SIZE,omitempty" source:"entropy"`
SinkInfluxDBName string `json:"SINK_INFLUX_DB_NAME,omitempty" source:"dex"`
SinkInfluxFlushDurationMs string `json:"SINK_INFLUX_FLUSH_DURATION_MS,omitempty" source:"entropy"`
SinkInfluxPassword string `json:"SINK_INFLUX_PASSWORD,omitempty" source:"entropy"`
SinkInfluxRetentionPolicy string `json:"SINK_INFLUX_RETENTION_POLICY,omitempty" source:"entropy"`
SinkInfluxURL string `json:"SINK_INFLUX_URL,omitempty" source:"entropy"`
SinkInfluxUsername string `json:"SINK_INFLUX_USERNAME,omitempty" source:"entropy"`
SinkInfluxMeasurementName string `json:"SINK_INFLUX_MEASUREMENT_NAME"`
}
type SinkKafka
type SinkKafka struct {
SinkKafkaBrokers string `json:"SINK_KAFKA_BROKERS"`
SinkKafkaStream string `json:"SINK_KAFKA_STREAM"`
SinkKafkaTopic string `json:"SINK_KAFKA_TOPIC"`
SinkKafkaProtoMsg string `json:"SINK_KAFKA_PROTO_MESSAGE"`
SinkKafkaLingerMs string `json:"SINK_KAFKA_LINGER_MS"`
SinkKafkaProtoKey string `json:"SINK_KAFKA_PROTO_KEY"`
SinkKafkaProduceLargeMessageEnable string `json:"SINK_KAFKA_PRODUCE_LARGE_MESSAGE_ENABLE"`
}
type Source
type Source struct {
InputSchemaProtoClass string `json:"INPUT_SCHEMA_PROTO_CLASS"`
InputSchemaEventTimestampFieldIndex string `json:"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX"`
SourceDetails []SourceDetail `json:"SOURCE_DETAILS"`
InputSchemaTable string `json:"INPUT_SCHEMA_TABLE"`
SourceKafka
SourceParquet
}
type SourceDetail
type SourceKafka
type SourceKafka struct {
SourceKafkaConsumerConfigAutoCommitEnable string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE"`
SourceKafkaConsumerConfigAutoOffsetReset string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET"`
SourceKafkaTopicNames string `json:"SOURCE_KAFKA_TOPIC_NAMES"`
SourceKafkaName string `json:"SOURCE_KAFKA_NAME"`
SourceKafkaConsumerConfigGroupID string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID"`
SourceKafkaConsumerConfigBootstrapServers string `json:"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS"`
}
type SourceParquet
type SourceParquet struct {
SourceParquetFileDateRange interface{} `json:"SOURCE_PARQUET_FILE_DATE_RANGE"`
SourceParquetFilePaths []string `json:"SOURCE_PARQUET_FILE_PATHS"`
}
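Since SourceKafka and SourceParquet are both embedded in Source, a parquet-backed entry sets only the parquet fields. A sketch (paths are placeholders):

package dagger

// parquetSource is an illustrative parquet-backed Source entry.
var parquetSource = Source{
	InputSchemaTable: "historical-bookings",
	SourceParquet: SourceParquet{
		SourceParquetFilePaths: []string{
			"gs://example-bucket/bookings/2024-01-01/",
		},
	},
}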