Documentation ¶
Index ¶
- Constants
- Variables
- func AddDefaultJitter(d time.Duration) time.Duration
- func AddJitter(d time.Duration, jitter float64) time.Duration
- type Breaker
- type Checkpoint
- func (*Checkpoint) Descriptor() ([]byte, []int)
- func (*Checkpoint) ProtoMessage()
- func (m *Checkpoint) Reset()
- func (m *Checkpoint) String() string
- func (m *Checkpoint) XXX_DiscardUnknown()
- func (m *Checkpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Checkpoint) XXX_Merge(src proto.Message)
- func (m *Checkpoint) XXX_Size() int
- func (m *Checkpoint) XXX_Unmarshal(b []byte) error
- type CheckpointStore
- type Consumer
- type ExponentialRetrier
- type Factory
- type Fields
- type Gauge
- type Lifecycle
- type Logger
- type Message
- func (*Message) Descriptor() ([]byte, []int)
- func (*Message) ProtoMessage()
- func (m *Message) Reset()
- func (r *Message) Size() uint64
- func (m *Message) String() string
- func (m *Message) XXX_DiscardUnknown()
- func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Message) XXX_Merge(src proto.Message)
- func (m *Message) XXX_Size() int
- func (m *Message) XXX_Unmarshal(b []byte) error
- type Message_Header
- func (*Message_Header) Descriptor() ([]byte, []int)
- func (*Message_Header) ProtoMessage()
- func (m *Message_Header) Reset()
- func (m *Message_Header) String() string
- func (m *Message_Header) XXX_DiscardUnknown()
- func (m *Message_Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Message_Header) XXX_Merge(src proto.Message)
- func (m *Message_Header) XXX_Size() int
- func (m *Message_Header) XXX_Unmarshal(b []byte) error
- type Meter
- type Metrics
- type Producer
- type PubSub
- type Retrier
- type Segment
- func (*Segment) Descriptor() ([]byte, []int)
- func (s *Segment) HasOffset(offset uint64) bool
- func (*Segment) ProtoMessage()
- func (m *Segment) Reset()
- func (m *Segment) String() string
- func (m *Segment) XXX_DiscardUnknown()
- func (m *Segment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Segment) XXX_Merge(src proto.Message)
- func (m *Segment) XXX_Size() int
- func (m *Segment) XXX_Unmarshal(b []byte) error
- type SegmentEvent
- func (*SegmentEvent) Descriptor() ([]byte, []int)
- func (*SegmentEvent) ProtoMessage()
- func (m *SegmentEvent) Reset()
- func (m *SegmentEvent) String() string
- func (m *SegmentEvent) XXX_DiscardUnknown()
- func (m *SegmentEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *SegmentEvent) XXX_Merge(src proto.Message)
- func (m *SegmentEvent) XXX_Size() int
- func (m *SegmentEvent) XXX_Unmarshal(b []byte) error
- type SegmentEventRequest
- type SegmentEventSource
- type SegmentEvent_Type
- type SegmentFormat
- type SegmentInfo
- type SegmentMetadata
- func (*SegmentMetadata) Descriptor() ([]byte, []int)
- func (*SegmentMetadata) ProtoMessage()
- func (m *SegmentMetadata) Reset()
- func (m *SegmentMetadata) String() string
- func (m *SegmentMetadata) XXX_DiscardUnknown()
- func (m *SegmentMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *SegmentMetadata) XXX_Merge(src proto.Message)
- func (m *SegmentMetadata) XXX_Size() int
- func (m *SegmentMetadata) XXX_Unmarshal(b []byte) error
- type SegmentReader
- type SegmentStore
- type SegmentWriter
- type ThresholdBreaker
- type Timer
- type TopicEOF
Constants ¶
const ( // LevelStreaming is the compaction level used for live/streaming segments. LevelStreaming uint32 = 0 )
Variables ¶
var DefaultLogger = NewNullLogger()
DefaultLogger is the logger used by the replicator
var DefaultMetrics = NewMetrics(metrics.DefaultRegistry)
DefaultMetrics is the metrics instance used by the replicator
var SegmentEvent_Type_name = map[int32]string{
0: "CREATED",
1: "REMOVED",
}
var SegmentEvent_Type_value = map[string]int32{
"CREATED": 0,
"REMOVED": 1,
}
Functions ¶
func AddDefaultJitter ¶
AddDefaultJitter adds default jitter to provided duration
Types ¶
type Breaker ¶
type Breaker interface {
Mark()
}
Breaker implements a simple circuit breaker interface.
var ( // DefaultKafkaBreaker is the default breaker used to track Kafka errors. DefaultKafkaBreaker Breaker = NewThresholdBreaker(100, time.Minute, getDefaultAction("Kafka")) // DefaultS3Breaker is the default breaker used to track AWS S3 errors. DefaultS3Breaker Breaker = NewThresholdBreaker(10, time.Minute, getDefaultAction("S3")) // DefaultSQSBreaker is the default breaker used to track AWS S3 errors. DefaultSQSBreaker Breaker = NewThresholdBreaker(20, time.Minute, getDefaultAction("SQS")) )
type Checkpoint ¶
type Checkpoint struct { Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` Partition uint32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"` Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` Timestamp time.Time `protobuf:"bytes,5,opt,name=timestamp,proto3,stdtime" json:"timestamp"` }
func (*Checkpoint) Descriptor ¶
func (*Checkpoint) Descriptor() ([]byte, []int)
func (*Checkpoint) ProtoMessage ¶
func (*Checkpoint) ProtoMessage()
func (*Checkpoint) Reset ¶
func (m *Checkpoint) Reset()
func (*Checkpoint) String ¶
func (m *Checkpoint) String() string
func (*Checkpoint) XXX_DiscardUnknown ¶
func (m *Checkpoint) XXX_DiscardUnknown()
func (*Checkpoint) XXX_Marshal ¶
func (m *Checkpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*Checkpoint) XXX_Merge ¶
func (m *Checkpoint) XXX_Merge(src proto.Message)
func (*Checkpoint) XXX_Size ¶
func (m *Checkpoint) XXX_Size() int
func (*Checkpoint) XXX_Unmarshal ¶
func (m *Checkpoint) XXX_Unmarshal(b []byte) error
type CheckpointStore ¶
type CheckpointStore interface { Save(checkpoint Checkpoint) error Load(region, topic string, partition uint32) *Checkpoint }
CheckpointStore represents the checkpoint storage used by ingress controller.
type Consumer ¶
type Consumer interface { Subscribe(topics ...string) error Poll() kafka.Event Assign(partitions []kafka.TopicPartition) error Unassign() error Pause(partitions []kafka.TopicPartition) error Seek(topic string, partition uint32, offset kafka.Offset) error Commit(topic string, partition uint32, offset kafka.Offset) error }
Consumer represents the consumer operations.
type ExponentialRetrier ¶
type ExponentialRetrier struct {
// contains filtered or unexported fields
}
ExponentialRetrier implements the retry with exponential backoff resiliency pattern
func NewExponentialRetrier ¶
func NewExponentialRetrier(min, max time.Duration, jitter float64) *ExponentialRetrier
NewExponentialRetrier returns a new ExponentialRetrier instance
type Factory ¶
type Factory interface {
Get() (interface{}, error)
}
Factory is used to supply the required component dependency.
type Lifecycle ¶
type Lifecycle interface { Start() error Stop() }
Lifecycle is the interface defining methods for instance lifecycle control.
type Logger ¶
type Logger interface { Debug(args ...interface{}) Debugf(format string, args ...interface{}) Info(args ...interface{}) Infof(format string, args ...interface{}) Warn(args ...interface{}) Warnf(format string, args ...interface{}) Error(args ...interface{}) Errorf(format string, args ...interface{}) Panic(args ...interface{}) Panicf(format string, args ...interface{}) WithFields(fields Fields) Logger }
Logger provides basic logging functionality
func NewLogger ¶
NewLogger returns a new logger instance that uses 'github.com/sirupsen/logrus' under the covers
func NewNullLogger ¶
func NewNullLogger() Logger
NewNullLogger returns the logger instance used to disable all logging
type Message ¶
type Message struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` Offset uint64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` Timestamp time.Time `protobuf:"bytes,4,opt,name=timestamp,proto3,stdtime" json:"timestamp"` Headers []*Message_Header `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty"` }
func (*Message) Descriptor ¶
func (*Message) ProtoMessage ¶
func (*Message) ProtoMessage()
func (*Message) XXX_DiscardUnknown ¶
func (m *Message) XXX_DiscardUnknown()
func (*Message) XXX_Marshal ¶
func (*Message) XXX_Unmarshal ¶
type Message_Header ¶
type Message_Header struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` }
func (*Message_Header) Descriptor ¶
func (*Message_Header) Descriptor() ([]byte, []int)
func (*Message_Header) ProtoMessage ¶
func (*Message_Header) ProtoMessage()
func (*Message_Header) Reset ¶
func (m *Message_Header) Reset()
func (*Message_Header) String ¶
func (m *Message_Header) String() string
func (*Message_Header) XXX_DiscardUnknown ¶
func (m *Message_Header) XXX_DiscardUnknown()
func (*Message_Header) XXX_Marshal ¶
func (m *Message_Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*Message_Header) XXX_Merge ¶
func (m *Message_Header) XXX_Merge(src proto.Message)
func (*Message_Header) XXX_Size ¶
func (m *Message_Header) XXX_Size() int
func (*Message_Header) XXX_Unmarshal ¶
func (m *Message_Header) XXX_Unmarshal(b []byte) error
type Metrics ¶
type Metrics interface { GetMeter(name string) Meter GetTimer(name string) Timer GetGauge(name string) Gauge Remove(name string) }
Metrics is the provider for various metric types
func NewMetrics ¶
func NewMetrics(registry metrics.Registry) Metrics
NewMetrics returns a new metrics instance that uses 'github.com/rcrowley/go-metrics' under the covers
func NewNullMetrics ¶
func NewNullMetrics() Metrics
NewNullMetrics returns the metrics instance used to disable all metrics collection
type Producer ¶
type Producer interface { Produce(topic string, key, value []byte) error ProduceMessages(topic string, partition uint32, messages ...Message) error }
Producer represents the producer operations.
type PubSub ¶
type PubSub interface { Subscribe(topic string) error Events() <-chan kafka.Event Publish(topic string, key, value []byte) error }
PubSub represents the publish-subscribe operations.
type Retrier ¶
Retrier will execute the provided work function until success Returns true if the work was executed successfully
var ( // DefaultJitter is the default jitter percentage DefaultJitter = 0.25 // DefaultKafkaRetrier is the default retrier used for Kafka operations DefaultKafkaRetrier Retrier = NewExponentialRetrier(100*time.Millisecond, 1*time.Second, DefaultJitter) // DefaultS3Retrier is the default retrier used for AWS S3 operations DefaultS3Retrier Retrier = NewExponentialRetrier(200*time.Millisecond, 5*time.Second, DefaultJitter) )
type Segment ¶
type Segment struct { Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` Partition uint32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"` Level uint32 `protobuf:"varint,4,opt,name=level,proto3" json:"level,omitempty"` StartOffset uint64 `protobuf:"varint,5,opt,name=startOffset,proto3" json:"startOffset,omitempty"` EndOffset uint64 `protobuf:"varint,6,opt,name=endOffset,proto3" json:"endOffset,omitempty"` }
func (*Segment) Descriptor ¶
func (*Segment) HasOffset ¶
HasOffset returns true if the provided offset is contained by the segment.
func (*Segment) ProtoMessage ¶
func (*Segment) ProtoMessage()
func (*Segment) XXX_DiscardUnknown ¶
func (m *Segment) XXX_DiscardUnknown()
func (*Segment) XXX_Marshal ¶
func (*Segment) XXX_Unmarshal ¶
type SegmentEvent ¶
type SegmentEvent struct { Type SegmentEvent_Type `protobuf:"varint,1,opt,name=type,proto3,enum=core.SegmentEvent_Type" json:"type,omitempty"` Timestamp time.Time `protobuf:"bytes,2,opt,name=timestamp,proto3,stdtime" json:"timestamp"` Segment Segment `protobuf:"bytes,3,opt,name=segment,proto3" json:"segment"` SegmentSize uint64 `protobuf:"varint,4,opt,name=segmentSize,proto3" json:"segmentSize,omitempty"` }
func (*SegmentEvent) Descriptor ¶
func (*SegmentEvent) Descriptor() ([]byte, []int)
func (*SegmentEvent) ProtoMessage ¶
func (*SegmentEvent) ProtoMessage()
func (*SegmentEvent) Reset ¶
func (m *SegmentEvent) Reset()
func (*SegmentEvent) String ¶
func (m *SegmentEvent) String() string
func (*SegmentEvent) XXX_DiscardUnknown ¶
func (m *SegmentEvent) XXX_DiscardUnknown()
func (*SegmentEvent) XXX_Marshal ¶
func (m *SegmentEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*SegmentEvent) XXX_Merge ¶
func (m *SegmentEvent) XXX_Merge(src proto.Message)
func (*SegmentEvent) XXX_Size ¶
func (m *SegmentEvent) XXX_Size() int
func (*SegmentEvent) XXX_Unmarshal ¶
func (m *SegmentEvent) XXX_Unmarshal(b []byte) error
type SegmentEventRequest ¶
type SegmentEventRequest struct { SegmentEvent Result chan<- error }
SegmentEventRequest is used to implement the chain-of-responsibility design pattern for handling segment events.
type SegmentEventSource ¶
type SegmentEventSource interface {
Events() <-chan SegmentEventRequest
}
SegmentEventSource represents source for segment events.
type SegmentEvent_Type ¶
type SegmentEvent_Type int32
const ( SegmentEvent_CREATED SegmentEvent_Type = 0 SegmentEvent_REMOVED SegmentEvent_Type = 1 )
func (SegmentEvent_Type) EnumDescriptor ¶
func (SegmentEvent_Type) EnumDescriptor() ([]byte, []int)
func (SegmentEvent_Type) String ¶
func (x SegmentEvent_Type) String() string
type SegmentFormat ¶
type SegmentFormat interface { NewWriter(ctx context.Context, path string) (SegmentWriter, error) NewReader(ctx context.Context, path string) (SegmentReader, error) }
SegmentFormat represents the segment data serialization format.
type SegmentInfo ¶
SegmentInfo provides contextual information for the corresponding segment.
type SegmentMetadata ¶
type SegmentMetadata struct { Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` Partition uint32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"` Level uint32 `protobuf:"varint,4,opt,name=level,proto3" json:"level,omitempty"` StartOffset uint64 `protobuf:"varint,5,opt,name=startOffset,proto3" json:"startOffset,omitempty"` EndOffset uint64 `protobuf:"varint,6,opt,name=endOffset,proto3" json:"endOffset,omitempty"` MessageCount uint64 `protobuf:"varint,7,opt,name=messageCount,proto3" json:"messageCount,omitempty"` CreatedTimestamp time.Time `protobuf:"bytes,8,opt,name=createdTimestamp,proto3,stdtime" json:"createdTimestamp"` }
func (*SegmentMetadata) Descriptor ¶
func (*SegmentMetadata) Descriptor() ([]byte, []int)
func (*SegmentMetadata) ProtoMessage ¶
func (*SegmentMetadata) ProtoMessage()
func (*SegmentMetadata) Reset ¶
func (m *SegmentMetadata) Reset()
func (*SegmentMetadata) String ¶
func (m *SegmentMetadata) String() string
func (*SegmentMetadata) XXX_DiscardUnknown ¶
func (m *SegmentMetadata) XXX_DiscardUnknown()
func (*SegmentMetadata) XXX_Marshal ¶
func (m *SegmentMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*SegmentMetadata) XXX_Merge ¶
func (m *SegmentMetadata) XXX_Merge(src proto.Message)
func (*SegmentMetadata) XXX_Size ¶
func (m *SegmentMetadata) XXX_Size() int
func (*SegmentMetadata) XXX_Unmarshal ¶
func (m *SegmentMetadata) XXX_Unmarshal(b []byte) error
type SegmentReader ¶
type SegmentReader interface { Read(ctx context.Context, count int) ([]Message, error) Metadata() SegmentMetadata Close(ctx context.Context) }
SegmentReader represents the segment reader operations.
type SegmentStore ¶
type SegmentStore interface { SegmentEventSource Create(ctx context.Context) (SegmentWriter, error) Open(ctx context.Context, segment Segment) (SegmentReader, error) ListSegments(ctx context.Context, region, topic string, partition uint32) (map[Segment]SegmentInfo, error) Delete(ctx context.Context, segment Segment) error }
SegmentStore represents the segment storage.
type SegmentWriter ¶
type SegmentWriter interface { Write(ctx context.Context, message Message) error Close(ctx context.Context, metadata SegmentMetadata) error Abort(ctx context.Context) }
SegmentWriter represents the segment writer operations.
type ThresholdBreaker ¶
type ThresholdBreaker struct {
// contains filtered or unexported fields
}
ThresholdBreaker implements a simple circuit breaker that triggers when the error rate reaches the threshold.
func NewThresholdBreaker ¶
func NewThresholdBreaker(threshold int, interval time.Duration, action func()) *ThresholdBreaker
NewThresholdBreaker returns a new ThresholdBreaker instance that allows errors up to threshold inside the interval. When the threshold is reached, the provided action will be invoked.
func (*ThresholdBreaker) Mark ¶
func (b *ThresholdBreaker) Mark()
Mark increments the internal error counter.