core

package
v0.0.0-...-7055b2f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// LevelStreaming is the compaction level used for live/streaming segments.
	LevelStreaming uint32 = 0
)

Variables

View Source
var DefaultLogger = NewNullLogger()

DefaultLogger is the logger used by the replicator

View Source
var DefaultMetrics = NewMetrics(metrics.DefaultRegistry)

DefaultMetrics is the metrics instance used by the replicator

View Source
var SegmentEvent_Type_name = map[int32]string{
	0: "CREATED",
	1: "REMOVED",
}
View Source
var SegmentEvent_Type_value = map[string]int32{
	"CREATED": 0,
	"REMOVED": 1,
}

Functions

func AddDefaultJitter

func AddDefaultJitter(d time.Duration) time.Duration

AddDefaultJitter adds the default jitter to the provided duration.

func AddJitter

func AddJitter(d time.Duration, jitter float64) time.Duration

AddJitter adds random jitter in the range (-jitter, +jitter) to the provided duration.

Types

type Breaker

type Breaker interface {
	Mark()
}

Breaker is a simple circuit breaker interface.

var (
	// DefaultKafkaBreaker is the default breaker used to track Kafka errors.
	DefaultKafkaBreaker Breaker = NewThresholdBreaker(100, time.Minute, getDefaultAction("Kafka"))

	// DefaultS3Breaker is the default breaker used to track AWS S3 errors.
	DefaultS3Breaker Breaker = NewThresholdBreaker(10, time.Minute, getDefaultAction("S3"))

	// DefaultSQSBreaker is the default breaker used to track AWS SQS errors.
	DefaultSQSBreaker Breaker = NewThresholdBreaker(20, time.Minute, getDefaultAction("SQS"))
)

type Checkpoint

type Checkpoint struct {
	Region    string    `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"`
	Topic     string    `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
	Partition uint32    `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
	Offset    uint64    `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"`
	Timestamp time.Time `protobuf:"bytes,5,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
}

func (*Checkpoint) Descriptor

func (*Checkpoint) Descriptor() ([]byte, []int)

func (*Checkpoint) ProtoMessage

func (*Checkpoint) ProtoMessage()

func (*Checkpoint) Reset

func (m *Checkpoint) Reset()

func (*Checkpoint) String

func (m *Checkpoint) String() string

func (*Checkpoint) XXX_DiscardUnknown

func (m *Checkpoint) XXX_DiscardUnknown()

func (*Checkpoint) XXX_Marshal

func (m *Checkpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Checkpoint) XXX_Merge

func (m *Checkpoint) XXX_Merge(src proto.Message)

func (*Checkpoint) XXX_Size

func (m *Checkpoint) XXX_Size() int

func (*Checkpoint) XXX_Unmarshal

func (m *Checkpoint) XXX_Unmarshal(b []byte) error

type CheckpointStore

type CheckpointStore interface {
	Save(checkpoint Checkpoint) error
	Load(region, topic string, partition uint32) *Checkpoint
}

CheckpointStore represents the checkpoint storage used by the ingress controller.

type Consumer

type Consumer interface {
	Subscribe(topics ...string) error
	Poll() kafka.Event
	Assign(partitions []kafka.TopicPartition) error
	Unassign() error
	Pause(partitions []kafka.TopicPartition) error
	Seek(topic string, partition uint32, offset kafka.Offset) error
	Commit(topic string, partition uint32, offset kafka.Offset) error
}

Consumer represents the consumer operations.

type ExponentialRetrier

type ExponentialRetrier struct {
	// contains filtered or unexported fields
}

ExponentialRetrier implements the retry-with-exponential-backoff resiliency pattern.

func NewExponentialRetrier

func NewExponentialRetrier(min, max time.Duration, jitter float64) *ExponentialRetrier

NewExponentialRetrier returns a new ExponentialRetrier instance

func (ExponentialRetrier) Forever

func (r ExponentialRetrier) Forever(ctx context.Context, work func() error) bool

Forever executes the provided work function until it succeeds. It returns true if the work was executed successfully.

type Factory

type Factory interface {
	Get() (interface{}, error)
}

Factory is used to supply the required component dependency.

func Self

func Self(instance interface{}) Factory

Self represents the Factory that returns the instance when invoked.

type Fields

type Fields map[string]interface{}

Fields type is used with WithFields method

type Gauge

type Gauge interface {
	Update(int64)
}

Gauge is the interface for a gauge metric

type Lifecycle

type Lifecycle interface {
	Start() error
	Stop()
}

Lifecycle is the interface defining methods for instance lifecycle control.

type Logger

type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Panic(args ...interface{})
	Panicf(format string, args ...interface{})
	WithFields(fields Fields) Logger
}

Logger provides basic logging functionality

func NewLogger

func NewLogger(logger *logrus.Logger) Logger

NewLogger returns a new logger instance that uses 'github.com/sirupsen/logrus' under the covers

func NewNullLogger

func NewNullLogger() Logger

NewNullLogger returns the logger instance used to disable all logging

type Message

type Message struct {
	Key       []byte            `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
	Value     []byte            `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	Offset    uint64            `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"`
	Timestamp time.Time         `protobuf:"bytes,4,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
	Headers   []*Message_Header `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty"`
}

func (*Message) Descriptor

func (*Message) Descriptor() ([]byte, []int)

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) Reset

func (m *Message) Reset()

func (*Message) Size

func (r *Message) Size() uint64

Size returns the message raw bytes size.

func (*Message) String

func (m *Message) String() string

func (*Message) XXX_DiscardUnknown

func (m *Message) XXX_DiscardUnknown()

func (*Message) XXX_Marshal

func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Message) XXX_Merge

func (m *Message) XXX_Merge(src proto.Message)

func (*Message) XXX_Size

func (m *Message) XXX_Size() int

func (*Message) XXX_Unmarshal

func (m *Message) XXX_Unmarshal(b []byte) error

type Message_Header

type Message_Header struct {
	Key   string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
	Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}

func (*Message_Header) Descriptor

func (*Message_Header) Descriptor() ([]byte, []int)

func (*Message_Header) ProtoMessage

func (*Message_Header) ProtoMessage()

func (*Message_Header) Reset

func (m *Message_Header) Reset()

func (*Message_Header) String

func (m *Message_Header) String() string

func (*Message_Header) XXX_DiscardUnknown

func (m *Message_Header) XXX_DiscardUnknown()

func (*Message_Header) XXX_Marshal

func (m *Message_Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Message_Header) XXX_Merge

func (m *Message_Header) XXX_Merge(src proto.Message)

func (*Message_Header) XXX_Size

func (m *Message_Header) XXX_Size() int

func (*Message_Header) XXX_Unmarshal

func (m *Message_Header) XXX_Unmarshal(b []byte) error

type Meter

type Meter interface {
	Mark(int64)
}

Meter is the interface for a meter metric

type Metrics

type Metrics interface {
	GetMeter(name string) Meter
	GetTimer(name string) Timer
	GetGauge(name string) Gauge
	Remove(name string)
}

Metrics is the provider for various metric types

func NewMetrics

func NewMetrics(registry metrics.Registry) Metrics

NewMetrics returns a new metrics instance that uses 'github.com/rcrowley/go-metrics' under the covers

func NewNullMetrics

func NewNullMetrics() Metrics

NewNullMetrics returns the metrics instance used to disable all metrics collection

type Producer

type Producer interface {
	Produce(topic string, key, value []byte) error
	ProduceMessages(topic string, partition uint32, messages ...Message) error
}

Producer represents the producer operations.

type PubSub

type PubSub interface {
	Subscribe(topic string) error
	Events() <-chan kafka.Event
	Publish(topic string, key, value []byte) error
}

PubSub represents the publish-subscribe operations.

type Retrier

type Retrier interface {
	Forever(ctx context.Context, work func() error) bool
}

Retrier executes the provided work function until it succeeds. Forever returns true if the work was executed successfully.

var (
	// DefaultJitter is the default jitter percentage
	DefaultJitter = 0.25

	// DefaultKafkaRetrier is the default retrier used for Kafka operations
	DefaultKafkaRetrier Retrier = NewExponentialRetrier(100*time.Millisecond, 1*time.Second, DefaultJitter)

	// DefaultS3Retrier is the default retrier used for AWS S3 operations
	DefaultS3Retrier Retrier = NewExponentialRetrier(200*time.Millisecond, 5*time.Second, DefaultJitter)
)

type Segment

type Segment struct {
	Region      string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"`
	Topic       string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
	Partition   uint32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
	Level       uint32 `protobuf:"varint,4,opt,name=level,proto3" json:"level,omitempty"`
	StartOffset uint64 `protobuf:"varint,5,opt,name=startOffset,proto3" json:"startOffset,omitempty"`
	EndOffset   uint64 `protobuf:"varint,6,opt,name=endOffset,proto3" json:"endOffset,omitempty"`
}

func (*Segment) Descriptor

func (*Segment) Descriptor() ([]byte, []int)

func (*Segment) HasOffset

func (s *Segment) HasOffset(offset uint64) bool

HasOffset returns true if the provided offset is contained by the segment.

func (*Segment) ProtoMessage

func (*Segment) ProtoMessage()

func (*Segment) Reset

func (m *Segment) Reset()

func (*Segment) String

func (m *Segment) String() string

func (*Segment) XXX_DiscardUnknown

func (m *Segment) XXX_DiscardUnknown()

func (*Segment) XXX_Marshal

func (m *Segment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Segment) XXX_Merge

func (m *Segment) XXX_Merge(src proto.Message)

func (*Segment) XXX_Size

func (m *Segment) XXX_Size() int

func (*Segment) XXX_Unmarshal

func (m *Segment) XXX_Unmarshal(b []byte) error

type SegmentEvent

type SegmentEvent struct {
	Type        SegmentEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=core.SegmentEvent_Type" json:"type,omitempty"`
	Timestamp   time.Time         `protobuf:"bytes,2,opt,name=timestamp,proto3,stdtime" json:"timestamp"`
	Segment     Segment           `protobuf:"bytes,3,opt,name=segment,proto3" json:"segment"`
	SegmentSize uint64            `protobuf:"varint,4,opt,name=segmentSize,proto3" json:"segmentSize,omitempty"`
}

func (*SegmentEvent) Descriptor

func (*SegmentEvent) Descriptor() ([]byte, []int)

func (*SegmentEvent) ProtoMessage

func (*SegmentEvent) ProtoMessage()

func (*SegmentEvent) Reset

func (m *SegmentEvent) Reset()

func (*SegmentEvent) String

func (m *SegmentEvent) String() string

func (*SegmentEvent) XXX_DiscardUnknown

func (m *SegmentEvent) XXX_DiscardUnknown()

func (*SegmentEvent) XXX_Marshal

func (m *SegmentEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SegmentEvent) XXX_Merge

func (m *SegmentEvent) XXX_Merge(src proto.Message)

func (*SegmentEvent) XXX_Size

func (m *SegmentEvent) XXX_Size() int

func (*SegmentEvent) XXX_Unmarshal

func (m *SegmentEvent) XXX_Unmarshal(b []byte) error

type SegmentEventRequest

type SegmentEventRequest struct {
	SegmentEvent
	Result chan<- error
}

SegmentEventRequest is used to implement the chain-of-responsibility design pattern for handling segment events.

type SegmentEventSource

type SegmentEventSource interface {
	Events() <-chan SegmentEventRequest
}

SegmentEventSource represents source for segment events.

type SegmentEvent_Type

type SegmentEvent_Type int32
const (
	SegmentEvent_CREATED SegmentEvent_Type = 0
	SegmentEvent_REMOVED SegmentEvent_Type = 1
)

func (SegmentEvent_Type) EnumDescriptor

func (SegmentEvent_Type) EnumDescriptor() ([]byte, []int)

func (SegmentEvent_Type) String

func (x SegmentEvent_Type) String() string

type SegmentFormat

type SegmentFormat interface {
	NewWriter(ctx context.Context, path string) (SegmentWriter, error)
	NewReader(ctx context.Context, path string) (SegmentReader, error)
}

SegmentFormat represents the segment data serialization format.

type SegmentInfo

type SegmentInfo struct {
	Segment
	Timestamp time.Time
	Size      uint64
}

SegmentInfo provides contextual information for the corresponding segment.

type SegmentMetadata

type SegmentMetadata struct {
	Region           string    `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"`
	Topic            string    `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
	Partition        uint32    `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
	Level            uint32    `protobuf:"varint,4,opt,name=level,proto3" json:"level,omitempty"`
	StartOffset      uint64    `protobuf:"varint,5,opt,name=startOffset,proto3" json:"startOffset,omitempty"`
	EndOffset        uint64    `protobuf:"varint,6,opt,name=endOffset,proto3" json:"endOffset,omitempty"`
	MessageCount     uint64    `protobuf:"varint,7,opt,name=messageCount,proto3" json:"messageCount,omitempty"`
	CreatedTimestamp time.Time `protobuf:"bytes,8,opt,name=createdTimestamp,proto3,stdtime" json:"createdTimestamp"`
}

func (*SegmentMetadata) Descriptor

func (*SegmentMetadata) Descriptor() ([]byte, []int)

func (*SegmentMetadata) ProtoMessage

func (*SegmentMetadata) ProtoMessage()

func (*SegmentMetadata) Reset

func (m *SegmentMetadata) Reset()

func (*SegmentMetadata) String

func (m *SegmentMetadata) String() string

func (*SegmentMetadata) XXX_DiscardUnknown

func (m *SegmentMetadata) XXX_DiscardUnknown()

func (*SegmentMetadata) XXX_Marshal

func (m *SegmentMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*SegmentMetadata) XXX_Merge

func (m *SegmentMetadata) XXX_Merge(src proto.Message)

func (*SegmentMetadata) XXX_Size

func (m *SegmentMetadata) XXX_Size() int

func (*SegmentMetadata) XXX_Unmarshal

func (m *SegmentMetadata) XXX_Unmarshal(b []byte) error

type SegmentReader

type SegmentReader interface {
	Read(ctx context.Context, count int) ([]Message, error)
	Metadata() SegmentMetadata
	Close(ctx context.Context)
}

SegmentReader represents the segment reader operations.

type SegmentStore

type SegmentStore interface {
	SegmentEventSource
	Create(ctx context.Context) (SegmentWriter, error)
	Open(ctx context.Context, segment Segment) (SegmentReader, error)
	ListSegments(ctx context.Context, region, topic string, partition uint32) (map[Segment]SegmentInfo, error)
	Delete(ctx context.Context, segment Segment) error
}

SegmentStore represents the segment storage.

type SegmentWriter

type SegmentWriter interface {
	Write(ctx context.Context, message Message) error
	Close(ctx context.Context, metadata SegmentMetadata) error
	Abort(ctx context.Context)
}

SegmentWriter represents the segment writer operations.

type ThresholdBreaker

type ThresholdBreaker struct {
	// contains filtered or unexported fields
}

ThresholdBreaker implements a simple circuit breaker that triggers when the error rate reaches the threshold.

func NewThresholdBreaker

func NewThresholdBreaker(threshold int, interval time.Duration, action func()) *ThresholdBreaker

NewThresholdBreaker returns a new ThresholdBreaker instance that allows errors up to threshold inside the interval. When the threshold is reached, the provided action will be invoked.

func (*ThresholdBreaker) Mark

func (b *ThresholdBreaker) Mark()

Mark increments the internal error counter.

type Timer

type Timer interface {
	UpdateSince(time.Time)
}

Timer is the interface for a timer metric

type TopicEOF

type TopicEOF string

TopicEOF signals that the PubSub consumer reached the end of the topic.

func (TopicEOF) String

func (t TopicEOF) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL