Versions in this module Expand all Collapse all v0 v0.3.1 Feb 3, 2021 v0.3.0 Feb 1, 2021 v0.2.0 May 6, 2020 Changes in this version + func String(str string) *string v0.1.0 Apr 25, 2020 Changes in this version + func Duration(d time.Duration) *time.Duration + func Int(i int) *int + func StringPtr(str string) *string + type Config struct + BaseKafkaConfig KafkaConfigMap + DataSource string + DatabaseBindingProvider DatabaseBindingProvider + KafkaConsumerProvider KafkaConsumerProvider + KafkaProducerProvider KafkaProducerProvider + LeaderGroupID string + LeaderTopic string + Limits Limits + Name string + NeliProvider NeliProvider + OutboxTable string + ProducerKafkaConfig KafkaConfigMap + Scribe scribe.Scribe + func Unmarshal(in []byte) (Config, error) + func (c *Config) SetDefaults() + func (c Config) String() string + func (c Config) Validate() error + type DatabaseBinding interface + Dispose func() + Mark func(leaderID uuid.UUID, limit int) ([]OutboxRecord, error) + Purge func(id int64) (bool, error) + Reset func(id int64) (bool, error) + func NewPostgresBinding(dataSource string, outboxTable string) (DatabaseBinding, error) + type DatabaseBindingProvider func(dataSource string, outboxTable string) (DatabaseBinding, error) + func StandardPostgresBindingProvider() DatabaseBindingProvider + type Event interface + type EventHandler func(e Event) + type Harvest interface + Await func() error + InFlightRecordKeys func() []string + InFlightRecords func() int + IsLeader func() bool + LeaderID func() *uuid.UUID + SetEventHandler func(eventHandler EventHandler) + Start func() error + State func() State + Stop func() + func New(config Config) (Harvest, error) + type KafkaConfigMap map[string]interface{} + type KafkaConsumer interface + Close func() error + ReadMessage func(timeout time.Duration) (*kafka.Message, error) + Subscribe func(topic string, rebalanceCb kafka.RebalanceCb) error + type KafkaConsumerProvider func(conf *KafkaConfigMap) (KafkaConsumer, error) + func 
StandardKafkaConsumerProvider() KafkaConsumerProvider + type KafkaHeader struct + Key string + Value string + func (h KafkaHeader) String() string + type KafkaHeaders []KafkaHeader + type KafkaProducer interface + Close func() + Events func() chan kafka.Event + Produce func(msg *kafka.Message, deliveryChan chan kafka.Event) error + type KafkaProducerProvider func(conf *KafkaConfigMap) (KafkaProducer, error) + func StandardKafkaProducerProvider() KafkaProducerProvider + type LeaderAcquired struct + func (e LeaderAcquired) LeaderID() uuid.UUID + func (e LeaderAcquired) String() string + type LeaderFenced struct + func (e LeaderFenced) String() string + type LeaderRefreshed struct + func (e LeaderRefreshed) LeaderID() uuid.UUID + func (e LeaderRefreshed) String() string + type LeaderRevoked struct + func (e LeaderRevoked) String() string + type Limits struct + DrainInterval *time.Duration + HeartbeatTimeout *time.Duration + IOErrorBackoff *time.Duration + MarkBackoff *time.Duration + MarkQueryRecords *int + MaxInFlightRecords *int + MaxPollInterval *time.Duration + MinMetricsInterval *time.Duration + MinPollInterval *time.Duration + PollDuration *time.Duration + QueueTimeout *time.Duration + SendBuffer *int + SendConcurrency *int + func (l *Limits) SetDefaults() + func (l Limits) String() string + func (l Limits) Validate() error + type MeterRead struct + func (e MeterRead) Stats() metric.MeterStats + func (e MeterRead) String() string + type NeliProvider func(config goneli.Config, barrier goneli.Barrier) (goneli.Neli, error) + func StandardNeliProvider() NeliProvider + type OutboxRecord struct + CreateTime time.Time + ID int64 + KafkaHeaders KafkaHeaders + KafkaKey string + KafkaTopic string + KafkaValue *string + LeaderID *uuid.UUID + func (rec OutboxRecord) String() string + type State int + const Created + const Running + const Stopped + const Stopping