controllers

package
v1.0.0-beta.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2021 License: Apache-2.0 Imports: 44 Imported by: 0

Documentation

Index

Constants

View Source
// String identifiers for the batcher: a short tag and an instance label
// value.
// NOTE(review): use sites are not visible here; presumably
// BatcherLabelInstance is used as a value for the InstanceLabel key — confirm.
const (
	BatcherTag           = "batcher"
	BatcherLabelInstance = "redshiftbatcher"
)
View Source
// String identifiers for the loader: a short tag and an instance label
// value.
// NOTE(review): use sites are not visible here; presumably
// LoaderLabelInstance is used as a value for the InstanceLabel key — confirm.
const (
	LoaderTag           = "loader"
	LoaderLabelInstance = "redshiftloader"
)
View Source
// Event type strings. The values are the same strings Kubernetes uses for
// its "Normal" and "Warning" event types.
// NOTE(review): presumably passed to record.EventRecorder by the Record
// methods of the event types in this package — confirm.
const (
	K8sEventTypeNormal  = "Normal"
	K8sEventTypeWarning = "Warning"
)
View Source
// Sink group names, default lag limits and the reload table suffix.
const (
	// Names of the sink groups.
	AllSinkGroup        = "all"
	MainSinkGroup       = "main"
	ReloadSinkGroup     = "reload"
	ReloadDupeSinkGroup = "reload-dupe"

	// Default maximum lag values, typed int64. The unit is not stated in
	// this declaration — TODO confirm against the code that reads them.
	// NOTE(review): "DefautMaxLoaderLag" misspells "Default". The identifier
	// is exported, so renaming it would break importers; it is left as is.
	DefaultMaxBatcherLag = int64(100)
	DefautMaxLoaderLag   = int64(10)

	// Suffix string for reload tables; presumably appended to a table name —
	// confirm at the use site.
	ReloadTableSuffix = "_ts_adx_reload"
)
View Source
// Label keys. InstanceLabel is the "app.kubernetes.io/instance" key; the
// remaining keys use the "practo.dev/" prefix.
// NOTE(review): which objects carry these labels is not visible here —
// confirm at the use sites.
const (
	InstanceLabel  = "app.kubernetes.io/instance"
	InstanceName   = "practo.dev/name"
	SinkGroupLabel = "practo.dev/sinkgroup"
	RskResource    = "practo.dev/resource"
)
View Source
// MaxTopicRelease is an untyped constant with value 5.
// NOTE(review): presumably an upper bound on the number of topics released
// at a time — the use site is not visible here, confirm.
const (
	MaxTopicRelease = 5
)

Variables

This section is empty.

Functions

func MaskDiff

func MaskDiff(
	topics []string,
	maskFile string,
	desiredVersion string,
	currentVersion string,
	gitToken string,
	kafkaTopicsCache *sync.Map,
	includeTablesCache *sync.Map,
) (
	[]string,
	[]string,
	[]string,
	error,
)

MaskDiff reads two database mask configurations and returns the list of topics whose mask values have changed. It also returns the updated list of Kafka topics and the list of include tables based on the desired mask config.

func NewRedshiftConn

func NewRedshiftConn(
	client client.Client,
	secretName,
	secretNamespace string,
	database *string,
) (
	*redshift.Redshift,
	error,
)

func NewRedshiftConnection

func NewRedshiftConnection(
	secret map[string]string,
	schema string,
) (
	*redshift.Redshift,
	error,
)

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

func (Batcher) Config

func (b Batcher) Config() *corev1.ConfigMap

func (Batcher) Deployment

func (b Batcher) Deployment() *appsv1.Deployment

func (Batcher) Name

func (b Batcher) Name() string

func (Batcher) Namespace

func (b Batcher) Namespace() string

func (Batcher) Topics

func (b Batcher) Topics() []string

func (Batcher) UpdateConfig

func (b Batcher) UpdateConfig(current *corev1.ConfigMap) bool

func (Batcher) UpdateDeployment

func (b Batcher) UpdateDeployment(current *appsv1.Deployment) bool

type ConfigMapCreatedEvent

// ConfigMapCreatedEvent holds the runtime object and name for a config map
// creation event. The type declares a Record(record.EventRecorder) method,
// which matches the method required by ReconcilerEvent.
type ConfigMapCreatedEvent struct {
	// Object is the runtime object associated with the event.
	Object runtime.Object
	// Name is a string name; presumably the config map's name — confirm.
	Name   string
}

func (ConfigMapCreatedEvent) Record

func (d ConfigMapCreatedEvent) Record(recorder record.EventRecorder)

type ConfigMapDeletedEvent

// ConfigMapDeletedEvent holds the runtime object and name for a config map
// deletion event. The type declares a Record(record.EventRecorder) method,
// which matches the method required by ReconcilerEvent.
type ConfigMapDeletedEvent struct {
	// Object is the runtime object associated with the event.
	Object runtime.Object
	// Name is a string name; presumably the config map's name — confirm.
	Name   string
}

func (ConfigMapDeletedEvent) Record

func (d ConfigMapDeletedEvent) Record(recorder record.EventRecorder)

type Deployment

// Deployment is the interface returned by NewBatcher and NewLoader. Batcher
// and Loader each declare every method listed here.
type Deployment interface {
	// Name and Namespace each return a string.
	Name() string
	Namespace() string
	// Config returns a *corev1.ConfigMap and Deployment returns an
	// *appsv1.Deployment.
	// NOTE(review): presumably the desired objects for this component — confirm.
	Config() *corev1.ConfigMap
	Deployment() *appsv1.Deployment
	// UpdateConfig and UpdateDeployment take the current object and return a
	// bool.
	// NOTE(review): presumably true means the current object needs updating —
	// the implementations are not visible here, confirm.
	UpdateConfig(current *corev1.ConfigMap) bool
	UpdateDeployment(current *appsv1.Deployment) bool
	// Topics returns a slice of strings.
	Topics() []string
}

func NewBatcher

func NewBatcher(
	name string,
	rsk *tipocav1.RedshiftSink,
	maskFileVersion string,
	secret map[string]string,
	sinkGroup string,
	sinkGroupSpec *tipocav1.SinkGroupSpec,
	consumerGroups map[string]consumerGroup,
	defaultImage string,
	defaultKafkaVersion string,
	tlsConfig *kafka.TLSConfig,
) (
	Deployment,
	error,
)

func NewLoader

func NewLoader(
	name string,
	rsk *tipocav1.RedshiftSink,
	tableSuffix string,
	secret map[string]string,
	sinkGroup string,
	sinkGroupSpec *tipocav1.SinkGroupSpec,
	consumerGroups map[string]consumerGroup,
	defaultImage string,
	defaultKafkaVersion string,
	tlsConfig *kafka.TLSConfig,
	defaultMaxOpenConns int,
	defaultMaxIdleConns int,
	prometheusURL string,
	redshiftMetrics bool,
) (
	Deployment,
	error,
)

type DeploymentCreatedEvent

// DeploymentCreatedEvent holds the runtime object and name for a deployment
// creation event. The type declares a Record(record.EventRecorder) method,
// which matches the method required by ReconcilerEvent.
type DeploymentCreatedEvent struct {
	// Object is the runtime object associated with the event.
	Object runtime.Object
	// Name is a string name; presumably the deployment's name — confirm.
	Name   string
}

func (DeploymentCreatedEvent) Record

func (d DeploymentCreatedEvent) Record(recorder record.EventRecorder)

type DeploymentDeletedEvent

// DeploymentDeletedEvent holds the runtime object and name for a deployment
// deletion event. The type declares a Record(record.EventRecorder) method,
// which matches the method required by ReconcilerEvent.
type DeploymentDeletedEvent struct {
	// Object is the runtime object associated with the event.
	Object runtime.Object
	// Name is a string name; presumably the deployment's name — confirm.
	Name   string
}

func (DeploymentDeletedEvent) Record

func (d DeploymentDeletedEvent) Record(recorder record.EventRecorder)

type DeploymentUpdatedEvent

// DeploymentUpdatedEvent holds the runtime object and name for a deployment
// update event. The type declares a Record(record.EventRecorder) method,
// which matches the method required by ReconcilerEvent.
type DeploymentUpdatedEvent struct {
	// Object is the runtime object associated with the event.
	Object runtime.Object
	// Name is a string name; presumably the deployment's name — confirm.
	Name   string
}

func (DeploymentUpdatedEvent) Record

func (d DeploymentUpdatedEvent) Record(recorder record.EventRecorder)

type Loader

type Loader struct {
	// contains filtered or unexported fields
}

func (Loader) Config

func (l Loader) Config() *corev1.ConfigMap

func (Loader) Deployment

func (l Loader) Deployment() *appsv1.Deployment

func (Loader) Name

func (l Loader) Name() string

func (Loader) Namespace

func (l Loader) Namespace() string

func (Loader) Topics

func (l Loader) Topics() []string

func (Loader) UpdateConfig

func (l Loader) UpdateConfig(current *corev1.ConfigMap) bool

func (Loader) UpdateDeployment

func (l Loader) UpdateDeployment(current *appsv1.Deployment) bool

type ReconcilerEvent

// ReconcilerEvent represents an action the operator has actually performed;
// any meaningful change should result in one of these. The ConfigMap*Event
// and Deployment*Event types in this package each declare a matching Record
// method.
type ReconcilerEvent interface {

	// Record this into an event recorder as a Kubernetes API event
	Record(recorder record.EventRecorder)
}

ReconcilerEvent represents the action of the operator having actually done anything. Any meaningful change should result in one of these.

type RedshiftSinkReconciler

// RedshiftSinkReconciler reconciles a RedshiftSink object. It embeds
// client.Client, so the client's methods are promoted onto the reconciler.
type RedshiftSinkReconciler struct {
	client.Client

	// Logger, runtime scheme and event recorder held by the reconciler.
	Log      logr.Logger
	Scheme   *runtime.Scheme
	Recorder record.EventRecorder

	// Caches held as *sync.Map. Their key and value types are not visible in
	// this declaration — TODO confirm at the use sites.
	KafkaTopicRegexes  *sync.Map
	KafkaClients       *sync.Map
	KafkaTopicsCache   *sync.Map
	KafkaRealtimeCache *sync.Map
	ReleaseCache       *sync.Map
	GitCache           *sync.Map
	IncludeTablesCache *sync.Map

	// Default values.
	// NOTE(review): presumably used when the RedshiftSink resource does not
	// set the corresponding option — confirm.
	DefaultBatcherImage         string
	DefaultLoaderImage          string
	DefaultSecretRefName        string
	DefaultSecretRefNamespace   string
	DefaultKafkaVersion         string
	DefaultRedshiftMaxIdleConns int
	DefaultRedshiftMaxOpenConns int

	// NOTE(review): presumably the resource names this reconciler is allowed
	// to handle (compare the RskResource label key) — confirm.
	AllowedResources []string

	// Prometheus client and a boolean flag for Redshift metrics.
	// NOTE(review): presumably related to the prometheusURL and
	// redshiftMetrics arguments of NewLoader — confirm.
	PrometheusClient prometheus.Client
	RedshiftMetrics  bool
}

RedshiftSinkReconciler reconciles a RedshiftSink object

func (*RedshiftSinkReconciler) Reconcile

func (r *RedshiftSinkReconciler) Reconcile(
	ctx context.Context,
	req ctrl.Request,
) (
	_ ctrl.Result,
	reterr error,
)

func (*RedshiftSinkReconciler) SetupWithManager

func (r *RedshiftSinkReconciler) SetupWithManager(mgr ctrl.Manager) error

SetupWithManager sets up the controller and applies all controller configs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL