Version: v0.0.0-...-7055b2f Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2021 License: Apache-2.0 Imports: 10 Imported by: 0




This section is empty.


View Source
var DefaultSourceConfig = SourceConfig{
	LostSegmentTimeout: 24 * time.Hour,
	LateSegmentRetries: 16,
	FirstSegmentDelay:  30 * time.Minute,
	BatchSize:          10000,
	WorkerInitDelay:    10 * time.Second,
	WorkerChanSize:     100,
	WorkerStopTimeout:  10 * time.Second,

DefaultSourceConfig is the default ingress source configuration.


This section is empty.


type Controller

type Controller struct {
	// contains filtered or unexported fields

Controller represents the ingress controller

func New

func New(config ControllerConfig) (*Controller, error)

New returns a new ingress controller instance

func (*Controller) Start

func (c *Controller) Start() error

Start will start the controller

func (*Controller) Stop

func (c *Controller) Stop()

Stop will stop the controller

type ControllerConfig

type ControllerConfig struct {
	// Consumer is used to provide group membership functionality used to distribute
	// work across multiple instances. It ensures that only one instance is allowed to
	// process a certain source topic partition at any given moment.
	Consumer core.Factory `required:"true"`

	// Producer is used for writing messages to destination topic.
	Producer core.Factory `required:"true"`

	// SegmentStore provides access to segment events and contents.
	SegmentStore core.Factory `required:"true"`

	// CheckpointStore is used for offset tracking.
	CheckpointStore core.Factory `required:"true"`

	// Unique name that identifies the local region/data center/cloud.
	// Field value is required.
	LocalRegion string `required:"true"`

	// List of sources to ingress from.
	// Will use DefaultSourceConfig if source config was not set.
	// Field value is required.
	Sources map[Source]*SourceConfig `required:"true"`

ControllerConfig represents the ingress controller configuration

type Source

type Source struct {
	// Region name.
	// Field value is required.
	Region string `required:"true"`

	// Kafka topic name.
	// Field value is required.
	Topic string `required:"true"`

Source represents the ingress source.

type SourceConfig

type SourceConfig struct {
	// Topic name where segment messages will be produced.
	// Default value appends '.ingress' suffix to source topic name.
	DestinationTopic string

	// Maximum time to wait for a late segment before it is declared lost and skipped.
	// WARNING: has the potential to break the at-least-once delivery promise.
	// The ingress worker detects missing segments and will reload bucket state up to
	// LateSegmentRetries times using exponential backoff until LostSegmentTimeout is reached.
	// Possible root causes for this scenario:
	//   - problems with cross-region AWS S3 bucket sync operation.
	//	 - problems related to AWS S3 eventual consistency model.
	//   - dropped AWS S3 notification event en-route from S3 -> SQS -> SQSSegmentEventSource -> ConsistentSegmentStore.
	// The default value is currently set to 24h which should give enough time to
	// operations/SRE team to react and fix the problem.
	LostSegmentTimeout time.Duration `min:"1ms"`

	// Number of retry attempts for a late segment before it is declared lost and skipped.
	// Uses exponential backoff.
	// The first computed backoff interval should be at least 1s.
	LateSegmentRetries int `min:"1" max:"50"`

	// Avoids issues related to S3 eventual consistency model.
	// A higher value results in smaller chance of missing first segment.
	FirstSegmentDelay time.Duration `min:"1ms"`

	// Number of segment messages to read/produce in each request.
	// A higher value usually results in better throughput.
	// A checkpoint is performed after each successful batch.
	BatchSize int `min:"1"`

	// Allows last checkpoint to propagate and avoids thundering herd effects during Kafka group rebalance.
	WorkerInitDelay time.Duration `min:"1ms"`

	// Size of ingress worker buffered channel.
	WorkerChanSize int `min:"1"`

	// Ingress worker shutdown grace period.
	WorkerStopTimeout time.Duration `min:"1ms"`

	// Retrier instance used for Producer operations
	ProducerRetrier core.Retrier

	// Retrier instance used for SegmentStore operations
	SegmentStoreRetrier core.Retrier

	// Retrier instance used for CheckpointStore operations
	CheckpointStoreRetrier core.Retrier
	// contains filtered or unexported fields

SourceConfig represents the ingress source configuration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL