mka

package
v0.0.0-...-c29e2dc
Published: Jun 11, 2023 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RWMode

type RWMode byte

RWMode selects between multi-write mode and primary/backup mode.

const (
	// RWModeMultiRW is the multi-write mode.
	RWModeMultiRW RWMode = iota
	// RWModeBackup is the primary/backup mode: writes normally go to the primary, and a backup is chosen at random when the primary has problems.
	RWModeBackup
)

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a reader that supports multiple Kafka clusters. It reads messages from several Kafka clusters concurrently.

func NewReader

func NewReader(configs []kafka.ReaderConfig) *Reader

NewReader returns a reader that supports multiple Kafka clusters.
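
A minimal construction sketch, assuming the kafka package is github.com/segmentio/kafka-go and using a placeholder import path for this package (substitute the real module path):

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"

	"example.com/mka" // placeholder import path, not the real one
)

func main() {
	// One ReaderConfig per Kafka cluster; the Reader consumes from all of them.
	r := mka.NewReader([]kafka.ReaderConfig{
		{Brokers: []string{"kafka-a:9092"}, Topic: "events", GroupID: "svc"},
		{Brokers: []string{"kafka-b:9092"}, Topic: "events", GroupID: "svc"},
	})
	defer r.Close()

	msgs, err := r.ReadMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("read %d message(s)", len(msgs))
}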

func (*Reader) Close

func (r *Reader) Close() error

Close closes all underlying readers and prevents the program from reading any more Kafka messages.

func (*Reader) Lag

func (r *Reader) Lag(i int) int64

Lag returns the lag of the last message returned by ReadMessage for the underlying reader at index i, or -1 if that reader is backed by a consumer group.

func (*Reader) Offset

func (r *Reader) Offset(i int) int64

Offset returns the current absolute offset of the underlying reader at index i, or -1 if that reader is backed by a consumer group.

func (*Reader) ReadLag

func (r *Reader) ReadLag(ctx context.Context, i int) (lag int64, err error)

ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.

This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up to date estimation of how far behind the reader is. For example when the consumer is not ready to process the next message.

The function returns a lag of zero when the reader's current offset is negative.
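
A small monitoring sketch using Lag, Offset, and ReadLag together; it assumes the index i selects the underlying reader created from configs[i] and reuses the imports and placeholder path from the NewReader sketch above:

func reportProgress(ctx context.Context, r *mka.Reader, clusters int) {
	for i := 0; i < clusters; i++ {
		// ReadLag fetches the latest offset from the broker, so it can fail.
		lag, err := r.ReadLag(ctx, i)
		if err != nil {
			log.Printf("cluster %d: ReadLag failed: %v", i, err)
			continue
		}
		log.Printf("cluster %d: offset=%d lag=%d readLag=%d",
			i, r.Offset(i), r.Lag(i), lag)
	}
}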

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) ([]kafka.Message, error)

ReadMessage reads from all Kafka clusters and returns the next messages from r. The method call blocks until at least one message becomes available, or all readers return errors. The program may also specify a context to asynchronously cancel the blocking operation.

The method returns io.EOF to indicate that the reader has been closed.

If consumer groups are used, ReadMessage will automatically commit the offset when called. Note that this could result in an offset being committed before the message is fully processed.

If more fine-grained control of when offsets are committed is required, it is recommended to use FetchMessage with CommitMessages instead.
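
A consume-loop sketch following the behavior described above (errors other than io.EOF simply end the loop; imports as in the NewReader sketch, plus errors and io):

func consume(ctx context.Context, r *mka.Reader) error {
	for {
		// Blocks until at least one message is available, the context is
		// canceled, or all underlying readers return errors.
		msgs, err := r.ReadMessage(ctx)
		if errors.Is(err, io.EOF) {
			return nil // the reader was closed
		}
		if err != nil {
			return err
		}
		for _, m := range msgs {
			log.Printf("topic=%s partition=%d offset=%d key=%s",
				m.Topic, m.Partition, m.Offset, m.Key)
		}
	}
}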

func (*Reader) SetOffset

func (r *Reader) SetOffset(i int, offset int64) error

SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.

From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.
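
A sketch of rewinding every underlying reader to the oldest retained message, assuming i indexes the reader created from configs[i] and that kafka.FirstOffset and kafka.LastOffset are the sentinel constants from github.com/segmentio/kafka-go:

func resetToOldest(r *mka.Reader, clusters int) error {
	for i := 0; i < clusters; i++ {
		// kafka.FirstOffset seeks to the oldest available offset;
		// kafka.LastOffset would seek to the newest instead.
		if err := r.SetOffset(i, kafka.FirstOffset); err != nil {
			return err
		}
	}
	return nil
}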

func (*Reader) SetOffsetAt

func (r *Reader) SetOffsetAt(ctx context.Context, i int, t time.Time) error

SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t.

The method fails if it is unable to connect to the partition leader, if it is unable to read the offset for the given timestamp, or if the reader has been closed.
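
A replay sketch that positions each underlying reader at the first offset written in the last hour (imports as above, plus time):

func replayLastHour(ctx context.Context, r *mka.Reader, clusters int) error {
	since := time.Now().Add(-1 * time.Hour)
	for i := 0; i < clusters; i++ {
		// Seek the i-th reader to the first offset at or after `since`.
		if err := r.SetOffsetAt(ctx, i, since); err != nil {
			return err
		}
	}
	return nil
}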

func (*Reader) Stats

func (r *Reader) Stats(i int) kafka.ReaderStats

Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
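
A sketch of the goroutine described above, reporting one cluster's reader stats on a fixed interval; the logged fields are standard kafka.ReaderStats counters:

func reportReaderStats(ctx context.Context, r *mka.Reader, i int) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Counters cover activity since the previous Stats call.
			s := r.Stats(i)
			log.Printf("cluster %d: messages=%d bytes=%d errors=%d lag=%d",
				i, s.Messages, s.Bytes, s.Errors, s.Lag)
		}
	}
}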

type WriteErrors

type WriteErrors []error

func (WriteErrors) Count

func (err WriteErrors) Count() int

Count counts the number of non-nil errors in err.

func (WriteErrors) Error

func (err WriteErrors) Error() string
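
A sketch of inspecting a write error, assuming (not guaranteed by the documentation above) that the Writer surfaces per-message failures as a WriteErrors value:

func countFailures(err error) int {
	// errors.As works here because WriteErrors implements the error interface.
	var werrs mka.WriteErrors
	if errors.As(err, &werrs) {
		return werrs.Count()
	}
	if err != nil {
		return 1
	}
	return 0
}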

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer writes to multiple Kafka clusters. It can run in either multi-write mode or primary/backup mode:
- Multi-write mode: a Kafka cluster is chosen in round-robin fashion for each write.
- Primary/backup mode: writes go to the primary first, and fall back to a backup when the primary fails.

func NewWriter

func NewWriter(rwmode RWMode, configs []kafka.WriterConfig) *Writer

NewWriter returns a writer that supports multiple Kafka clusters.
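
A construction sketch for primary/backup mode, assuming the first WriterConfig is treated as the primary (kafka.WriterConfig and kafka.Hash come from github.com/segmentio/kafka-go; the mka import path is a placeholder as above):

func newBackupWriter() *mka.Writer {
	// Primary/backup: writes go to the first cluster, falling back to the
	// second when the primary fails. Use mka.RWModeMultiRW for round-robin
	// writes across both clusters instead.
	return mka.NewWriter(mka.RWModeBackup, []kafka.WriterConfig{
		{Brokers: []string{"kafka-a:9092"}, Topic: "events", Balancer: &kafka.Hash{}},
		{Brokers: []string{"kafka-b:9092"}, Topic: "events", Balancer: &kafka.Hash{}},
	})
}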

func (*Writer) Close

func (w *Writer) Close() error

Close flushes pending writes, and waits for all writes to complete before returning. Calling Close also prevents new writes from being submitted to the writer; further calls to WriteMessages and the like will fail with io.ErrClosedPipe.

func (*Writer) Stats

func (w *Writer) Stats(i int) kafka.WriterStats

Stats returns a snapshot of the selected writer stats since the last time the method was called, or since the writer was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.
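
A one-shot sketch that snapshots the stats of every underlying writer; clusters is assumed to match the length of the configs slice passed to NewWriter:

func logWriterStats(w *mka.Writer, clusters int) {
	for i := 0; i < clusters; i++ {
		// Each call reports activity since the previous snapshot for writer i.
		s := w.Stats(i)
		log.Printf("cluster %d: writes=%d messages=%d errors=%d",
			i, s.Writes, s.Messages, s.Errors)
	}
}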

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...kafka.Message) error

WriteMessages writes a batch of messages to the Kafka topic configured on this writer. If a write fails, it retries the write against another Kafka cluster.

Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.

When sending synchronously and the writer's batch size is configured to be greater than 1, this method blocks until either a full batch can be assembled or the batch timeout is reached. The batch size and timeouts are evaluated per partition, so the choice of Balancer can also influence the flushing behavior. For example, the Hash balancer will require on average N * batch size messages to trigger a flush where N is the number of partitions. The best way to achieve good batching behavior is to share one Writer amongst multiple goroutines.

When the method returns an error, it may be of type WriteErrors to allow the caller to determine the status of each message.

The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).
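
A publishing sketch that bounds the call with a timeout, per the cancellation caveat above (imports as in the earlier sketches, plus time):

func publish(w *mka.Writer, records map[string]string) error {
	msgs := make([]kafka.Message, 0, len(records))
	for k, v := range records {
		msgs = append(msgs, kafka.Message{Key: []byte(k), Value: []byte(v)})
	}

	// Bound the write so an unreachable cluster cannot block forever; on
	// cancellation, treat the whole batch as failed and retry later.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	return w.WriteMessages(ctx, msgs...)
}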
