butterfly

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2022 License: MIT Imports: 4 Imported by: 0

README

butterfly-go

Documentation

Index

Constants

View Source
// Special offset values. The StartOffset field of ReaderConfig documents
// these two constants as its only permitted non-zero settings.
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

Variables

This section is empty.

Functions

This section is empty.

Types

type KafkaReader

// KafkaReader is the type returned (as a pointer) by NewKafkaReader. Its
// methods Close, FetchMessage and ReadMessage have the same signatures as the
// methods of the Reader interface.
type KafkaReader struct {
	// contains filtered or unexported fields
}

func NewKafkaReader

func NewKafkaReader(config *ReaderConfig) *KafkaReader

func (*KafkaReader) Close added in v0.1.0

func (r *KafkaReader) Close(ctx context.Context) error

func (*KafkaReader) FetchMessage

func (r *KafkaReader) FetchMessage(ctx context.Context) (ReadMessage, error)

FetchMessage does not commit the message. Invoke CommitMessage to commit the messages.

func (*KafkaReader) ReadMessage added in v0.0.5

func (r *KafkaReader) ReadMessage(ctx context.Context) (ReadMessage, error)

ReadMessage also commits before returning.

type KafkaWriter

// KafkaWriter is the type returned (as a pointer) by NewKafkaWriter. It has a
// Write method with the same signature as the Writer interface's, plus a
// Dispose method that returns an error.
type KafkaWriter struct {
	// contains filtered or unexported fields
}

func NewKafkaWriter

func NewKafkaWriter(writerConfig *WriterConfig) *KafkaWriter

func (*KafkaWriter) Dispose

func (w *KafkaWriter) Dispose() error

func (*KafkaWriter) Write

func (w *KafkaWriter) Write(ctx context.Context, logs ...WriteMessage) error

type Logger

// Logger is a one-method logging interface; any type with a matching Printf
// method satisfies it.
type Logger interface {
	// Printf takes a string followed by variadic arguments.
	// NOTE(review): presumably a fmt-style format string and its operands — confirm.
	Printf(string, ...interface{})
}

Logger is the interface API for log.Logger.

type LoggerFunc

// LoggerFunc is a function type with the same parameter list as
// Logger.Printf. Its own Printf method gives it the method set required by
// the Logger interface.
type LoggerFunc func(string, ...interface{})

LoggerFunc is a bridge between Logger and any third-party logger. Usage:

// NOTE(review): this snippet refers to kafka.NewReader and to Logger /
// ErrorLogger config fields. The ReaderConfig documented in this package
// lists neither field, and NewReader here takes a *ReaderConfig — confirm
// that the example still applies to this package.
l := NewLogger() // some logger
r := kafka.NewReader(kafka.ReaderConfig{
  Logger:      kafka.LoggerFunc(l.Infof),
  ErrorLogger: kafka.LoggerFunc(l.Errorf),
})

func (LoggerFunc) Printf

func (f LoggerFunc) Printf(msg string, args ...interface{})

type ReadMessage added in v0.0.5

// ReadMessage is the value type returned by the ReadMessage and FetchMessage
// methods of Reader and KafkaReader.
//
// NOTE(review): the fields look like per-message Kafka metadata (topic,
// partition, offset, key, value, timestamp), but their exact semantics are
// not documented in this listing — confirm against the implementation.
type ReadMessage struct {
	Topic     string
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Time      time.Time
}

type Reader

// Reader is the interface type returned by NewReader. KafkaReader declares
// pointer-receiver methods with the same three signatures.
type Reader interface {
	// ReadMessage returns a ReadMessage and an error. The KafkaReader
	// documentation states that its ReadMessage also commits before returning.
	ReadMessage(ctx context.Context) (ReadMessage, error)
	// FetchMessage returns a ReadMessage and an error. The KafkaReader
	// documentation states that its FetchMessage does not commit the message.
	FetchMessage(ctx context.Context) (ReadMessage, error)
	// Close takes a context and returns an error.
	Close(ctx context.Context) error
}

func NewReader

func NewReader(readerConfig *ReaderConfig) Reader

type ReaderConfig

// ReaderConfig is the configuration passed (by pointer) to NewReader and
// NewKafkaReader. QueueCapacity and CommitInterval are not documented in this
// listing.
type ReaderConfig struct {
	// NOTE(review): presumably Endpoint is the list of broker addresses,
	// GroupId the consumer group ID and Topics the topics to consume —
	// confirm against the implementation.
	Endpoint []string
	GroupId  string
	Topics   []string

	// StartOffset determines where the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	// The default is LastOffset.
	StartOffset    int64
	QueueCapacity  int
	CommitInterval time.Duration
}

type StatsdClient

// StatsdClient is the type returned (as a pointer, together with an error) by
// NewStatsdClient. It carries the PublishKafka* methods listed below.
type StatsdClient struct {
	// contains filtered or unexported fields
}

func NewStatsdClient

func NewStatsdClient(config StatsdConfig) (*StatsdClient, error)

func (*StatsdClient) PublishKafkaReadError

func (m *StatsdClient) PublishKafkaReadError()

func (*StatsdClient) PublishKafkaReadOps

func (m *StatsdClient) PublishKafkaReadOps(topic string)

func (*StatsdClient) PublishKafkaWriteError

func (m *StatsdClient) PublishKafkaWriteError(topic string)

func (*StatsdClient) PublishKafkaWriteLatency

func (m *StatsdClient) PublishKafkaWriteLatency(topic string, startTime time.Time)

func (*StatsdClient) PublishKafkaWriteOps

func (m *StatsdClient) PublishKafkaWriteOps(topic string)

type StatsdConfig

// StatsdConfig is the configuration passed by value to NewStatsdClient.
//
// NOTE(review): presumably Endpoint is the statsd server address and PREFIX
// a metric-name prefix — confirm. PREFIX departs from Go's MixedCaps naming
// convention, but renaming it would break existing callers.
type StatsdConfig struct {
	Endpoint string
	PREFIX   string
}

type WriteMessage added in v0.0.5

// WriteMessage is the element type of the variadic argument accepted by
// Writer.Write and KafkaWriter.Write.
//
// NOTE(review): both WriteMessage and WriterConfig carry a Topic field; which
// one takes effect when both are set is not shown in this listing — confirm.
type WriteMessage struct {
	Key   []byte
	Value []byte
	Topic string
}

type Writer

// Writer is the interface type returned by NewWriter. KafkaWriter declares a
// pointer-receiver Write method with the same signature.
type Writer interface {
	// Write takes a context and zero or more WriteMessage values and returns
	// an error.
	Write(ctx context.Context, logs ...WriteMessage) error
}

func NewWriter

func NewWriter(writerConfig *WriterConfig) Writer

type WriterConfig

// WriterConfig is the configuration passed (by pointer) to NewWriter and
// NewKafkaWriter.
//
// NOTE(review): field semantics (broker list, default topic, batching and
// timeout behaviour) are not documented in this listing — confirm against the
// implementation before relying on zero-value defaults.
type WriterConfig struct {
	Endpoint     []string
	Topic        string
	BatchTimeout time.Duration
	WriteTimeout time.Duration
	BatchSize    int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL