config

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2020 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CheckConsistency

func CheckConsistency(topicConfig TopicConfig, clusterConfig ClusterConfig) error

CheckConsistency verifies that the argument topic config is consistent with the argument cluster, e.g. has the same environment and region, etc.

Types

type ClusterConfig

type ClusterConfig struct {
	Meta ClusterMeta `json:"meta"`
	Spec ClusterSpec `json:"spec"`
}

ClusterConfig stores information about a cluster that's referred to by one or more topic configs. These configs should reflect the reality of what's been set up externally; there's no way to "apply" these at the moment.

func LoadClusterBytes

func LoadClusterBytes(contents []byte) (ClusterConfig, error)

LoadClusterBytes loads a ClusterConfig from YAML bytes.

func LoadClusterFile

func LoadClusterFile(path string) (ClusterConfig, error)

LoadClusterFile loads a ClusterConfig from a path to a YAML file.

func (ClusterConfig) GetDefaultRetentionDropStepDuration

func (c ClusterConfig) GetDefaultRetentionDropStepDuration() (time.Duration, error)

func (ClusterConfig) NewAdminClient

func (c ClusterConfig) NewAdminClient(
	ctx context.Context,
	sess *session.Session,
	readOnly bool,
) (*admin.Client, error)

NewAdminClient returns a new admin client using the parameters in the current cluster config.

func (ClusterConfig) Validate

func (c ClusterConfig) Validate() error

Validate evaluates whether the cluster config is valid.

type ClusterMeta

type ClusterMeta struct {
	Name        string `json:"name"`
	Region      string `json:"region"`
	Environment string `json:"environment"`
	Description string `json:"description"`
}

ClusterMeta contains (mostly immutable) metadata about the cluster. Inspired by the meta fields in Kubernetes objects.

type ClusterSpec

type ClusterSpec struct {
	// BootstrapAddrs is a list of one or more broker bootstrap addresses. These can use IPs
	// or DNS names.
	BootstrapAddrs []string `json:"bootstrapAddrs"`

	// ZKAddrs is a list of one or more zookeeper addresses. These can use IPs
	// or DNS names.
	ZKAddrs []string `json:"zkAddrs"`

	// ZKPrefix is the prefix under which all zk nodes for the cluster are stored. If blank,
	// these are assumed to be under the zk root.
	ZKPrefix string `json:"zkPrefix"`

	// ZKLockPath indicates where locks are stored in zookeeper. If blank, then
	// no locking will be used on apply operations.
	ZKLockPath string `json:"zkLockPath"`

	// ClusterID is the value of the [prefix]/cluster/id node in zookeeper. If set, it's used
	// to validate that the cluster we're communicating with is the right one. If blank,
	// this check isn't done.
	ClusterID string `json:"clusterID"`

	// VersionMajor stores the major version of the cluster. This isn't currently
	// used for any logic in the tool, but it may be used in the future to adjust API calls
	// and/or decide whether to use zk or brokers for certain information.
	VersionMajor KafkaVersionMajor `json:"versionMajor"`

	// DefaultThrottleMB is the default broker throttle used for migrations in this
	// cluster. If unset, then a reasonable default is used instead.
	DefaultThrottleMB int64 `json:"defaultThrottleMB"`

	// DefaultRetentionDropStepDuration is the default amount of time that retention drops will be
	// limited by. If unset, no retention drop limiting will be applied.
	DefaultRetentionDropStepDurationStr string `json:"defaultRetentionDropStepDuration"`
}

ClusterSpec contains the details necessary to communicate with a kafka cluster.

type KafkaVersionMajor

type KafkaVersionMajor string

KafkaVersionMajor is a string type for storing Kafka versions.

const (
	// KafkaVersionMajor010 represents kafka v0.10 and its associated minor versions.
	KafkaVersionMajor010 KafkaVersionMajor = "v0.10"

	// KafkaVersionMajor2 represents kafka v2 and its associated minor versions.
	KafkaVersionMajor2 KafkaVersionMajor = "v2"
)

type PickerMethod

type PickerMethod string

PickerMethod is a string type that stores a picker method for breaking ties when choosing the replica placements for a topic.

const (
	// PickerMethodClusterUse uses broker frequency in the topic, breaking ties by
	// looking at the total number of replicas across the entire cluster that each broker
	// appears in.
	PickerMethodClusterUse PickerMethod = "cluster-use"

	// PickerMethodLowestIndex uses broker frequency in the topic, breaking ties by
	// choosing the broker with the lowest index.
	PickerMethodLowestIndex PickerMethod = "lowest-index"

	// PickerMethodRandomized uses broker frequency in the topic, breaking ties by
	// using a repeatably random choice from the options.
	PickerMethodRandomized PickerMethod = "randomized"
)

type PlacementStrategy

type PlacementStrategy string

PlacementStrategy is a string type that stores a replica placement strategy for a topic.

const (
	// PlacementStrategyAny allows any partition placement.
	PlacementStrategyAny PlacementStrategy = "any"

	// PlacementStrategyBalancedLeaders is a strategy that ensures the leaders of
	// each partition are balanced by rack, but does not care about the placements
	// of the non-leader replicas.
	PlacementStrategyBalancedLeaders PlacementStrategy = "balanced-leaders"

	// PlacementStrategyInRack is a strategy in which the leaders are balanced
	// and the replicas for each partition are in the same rack as the leader.
	PlacementStrategyInRack PlacementStrategy = "in-rack"

	// PlacementStrategyStatic uses a static placement defined in the config. This is for
	// testing only and should generally not be used in production.
	PlacementStrategyStatic PlacementStrategy = "static"

	// PlacementStrategyStaticInRack is a strategy in which the replicas in each partition
	// are chosen from the rack in a static list, but the specific replicas within each partition
	// aren't specified.
	PlacementStrategyStaticInRack PlacementStrategy = "static-in-rack"
)

type TopicConfig

type TopicConfig struct {
	Meta TopicMeta `json:"meta"`
	Spec TopicSpec `json:"spec"`
}

TopicConfig represents the desired configuration of a topic.

func LoadTopicBytes

func LoadTopicBytes(contents []byte) (TopicConfig, error)

LoadTopicBytes loads a TopicConfig from YAML bytes.

func LoadTopicsFile added in v0.0.3

func LoadTopicsFile(path string) ([]TopicConfig, error)

LoadTopicsFile loads one or more TopicConfigs from a path to a YAML file.

func TopicConfigFromTopicInfo

func TopicConfigFromTopicInfo(
	clusterConfig ClusterConfig,
	topicInfo admin.TopicInfo,
) TopicConfig

TopicConfigFromTopicInfo generates a TopicConfig from a ClusterConfig and admin.TopicInfo struct generated from the cluster state.

func (*TopicConfig) SetDefaults

func (t *TopicConfig) SetDefaults()

SetDefaults sets the default migration and placement settings in a topic config if these aren't set.

func (TopicConfig) ToNewTopicConfig

func (t TopicConfig) ToNewTopicConfig() (kafka.TopicConfig, error)

ToNewTopicConfig converts a TopicConfig to a kafka.TopicConfig that can be used by kafka-go to create a new topic.

func (TopicConfig) ToYAML

func (t TopicConfig) ToYAML() (string, error)

ToYAML converts the current TopicConfig to a YAML string.

func (TopicConfig) Validate

func (t TopicConfig) Validate(numRacks int) error

Validate evaluates whether the topic config is valid.

type TopicMeta

type TopicMeta struct {
	Name        string `json:"name"`
	Cluster     string `json:"cluster"`
	Region      string `json:"region"`
	Environment string `json:"environment"`
	Description string `json:"description"`

	// Consumers is a list of consumers who are expected to consume from this
	// topic.
	Consumers []string `json:"consumers,omitempty"`
}

TopicMeta stores the (mostly immutable) metadata associated with a topic. Inspired by the meta structs in Kubernetes objects.

type TopicMigrationConfig

type TopicMigrationConfig struct {
	ThrottleMB         int64 `json:"throttleMB"`
	PartitionBatchSize int   `json:"partitionBatchSize"`
}

TopicMigrationConfig configures the throttles and batch sizes used when running a partition migration. If these are left unset, resonable defaults will be used instead.

type TopicPlacementConfig

type TopicPlacementConfig struct {
	Strategy PlacementStrategy `json:"strategy"`
	Picker   PickerMethod      `json:"picker,omitempty"`

	// StaticAssignments is a list of lists of desired replica assignments. It's used
	// for the "static" strategy only.
	StaticAssignments [][]int `json:"staticAssignments,omitempty"`

	// StaticRackAssignments is a list of list of desired replica assignments. It's used
	// for the "static-in-rack" strategy only.
	StaticRackAssignments []string `json:"staticRackAssignments,omitempty"`
}

TopicPlacementConfig describes how the partition replicas in a topic should be chosen.

type TopicSettings

type TopicSettings map[string]interface{}

TopicSettings is a map of key/value pairs that correspond to Kafka topic config settings.

func FromConfigMap

func FromConfigMap(configMap map[string]string) TopicSettings

FromConfigMap converts a string map from a Kafka topic to a TopicSettings instance.

func (TopicSettings) ConfigMapDiffs

func (t TopicSettings) ConfigMapDiffs(
	configMap map[string]string,
) ([]string, []string, error)

ConfigMapDiffs compares these topic settings to a string map fetched from the cluster. It returns the keys that are set in the settings but different in the cluster and also the keys that are set in the cluster but not set in the settings.

func (TopicSettings) Copy

func (t TopicSettings) Copy() TopicSettings

Copy returns a shallow copy of this settings instance.

func (TopicSettings) GetValueStr

func (t TopicSettings) GetValueStr(key string) (string, error)

GetValueStr returns the string value for a key in this settings instance. It returns an error if the key is not found.

func (TopicSettings) HasKey

func (t TopicSettings) HasKey(key string) bool

HasKey returns whether the current settings instance contains the argument key.

func (TopicSettings) ReduceRetentionDrop

func (t TopicSettings) ReduceRetentionDrop(
	configMap map[string]string,
	retentionDropStepDuration time.Duration,
) (bool, error)

func (TopicSettings) ToConfigEntries

func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, error)

ToConfigEntries converts the argument keys in the current settings into a slice of kafka-go config entries. If keys is nil, then all fields are converted.

func (TopicSettings) Validate

func (t TopicSettings) Validate() error

Validate determines whether the given settings are valid. See https://kafka.apache.org/documentation/#topicconfigs for details.

type TopicSpec

type TopicSpec struct {
	Partitions        int           `json:"partitions"`
	ReplicationFactor int           `json:"replicationFactor"`
	RetentionMinutes  int           `json:"retentionMinutes,omitempty"`
	Settings          TopicSettings `json:"settings,omitempty"`

	PlacementConfig TopicPlacementConfig  `json:"placement"`
	MigrationConfig *TopicMigrationConfig `json:"migration,omitempty"`
}

TopicSpec stores the (mutable) specification for a topic.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL