tube

package
v1.1.0 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 21, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Kfk2Db

func Kfk2Db(ctx context.Context, sc KfkStreamConsumer, db *sql.DB, num int, h HandleFunc, query string) error

Kfk2Db is a simple helper that pulls data from Kafka into ClickHouse.

Types

type AsyncProducer

type AsyncProducer interface {
	AsyncProduce(context.Context, string) error
	Close() error
}

AsyncProducer is a Kafka client that produces messages asynchronously.
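
Because AsyncProducer is an interface, code that publishes messages can be exercised without a broker. The sketch below is not part of the package: it redeclares the interface locally so it compiles on its own, and `memProducer`, `publishAll`, and `demo` are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// AsyncProducer mirrors the interface documented above.
type AsyncProducer interface {
	AsyncProduce(context.Context, string) error
	Close() error
}

// memProducer is a hypothetical in-memory stand-in that records messages
// instead of sending them to Kafka.
type memProducer struct {
	mu     sync.Mutex
	msgs   []string
	closed bool
}

func (p *memProducer) AsyncProduce(ctx context.Context, msg string) error {
	if err := ctx.Err(); err != nil {
		return err // respect cancellation before recording anything
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return errors.New("producer closed")
	}
	p.msgs = append(p.msgs, msg)
	return nil
}

func (p *memProducer) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	return nil
}

// publishAll sends each message through any AsyncProducer and stops at
// the first error.
func publishAll(ctx context.Context, p AsyncProducer, msgs []string) error {
	for _, m := range msgs {
		if err := p.AsyncProduce(ctx, m); err != nil {
			return fmt.Errorf("produce %q: %w", m, err)
		}
	}
	return nil
}

// demo publishes two messages to the stand-in and returns what it recorded.
func demo() ([]string, error) {
	p := &memProducer{}
	defer p.Close()
	err := publishAll(context.Background(), p, []string{"a", "b"})
	return p.msgs, err
}

func main() {
	msgs, err := demo()
	fmt.Println(msgs, err)
}
```

In real use, the value returned by MustNewAsyncProducer could presumably be passed to a function like `publishAll` in place of the stand-in.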

func MustNewAsyncProducer

func MustNewAsyncProducer(topic string, brokerAddr []string) AsyncProducer

MustNewAsyncProducer is the constructor for AsyncProducer.

type HandleFunc

type HandleFunc func([]byte) (interface{}, error)

HandleFunc receives a Kafka message and returns a result.
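
As an illustration, a handler might decode each message as JSON. This is a minimal sketch, not taken from the package: the `event` payload shape and the `decodeEvent` function are assumptions, and HandleFunc is redeclared locally so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// HandleFunc mirrors the type documented above.
type HandleFunc func([]byte) (interface{}, error)

// event is a hypothetical payload shape, not part of the package.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// decodeEvent is a hypothetical handler: it parses one raw message as
// JSON and returns the decoded value as the result.
func decodeEvent(raw []byte) (interface{}, error) {
	var e event
	if err := json.Unmarshal(raw, &e); err != nil {
		return nil, fmt.Errorf("decode event: %w", err)
	}
	return e, nil
}

func main() {
	var h HandleFunc = decodeEvent
	res, err := h([]byte(`{"id":7,"name":"signup"}`))
	fmt.Println(res, err)
}
```

Returning a typed value rather than the raw bytes lets whatever consumes the results use a type assertion instead of parsing the message again.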

type KfkStreamConsumer

type KfkStreamConsumer interface {
	// Subscribe starts fetching messages asynchronously.
	// When the number of received messages reaches the configured num,
	// the caller can get the slice of subscribed messages.
	Subscribe(ctx context.Context, handle HandleFunc) chan interface{}

	// Commit commits the offset of the continuously output messages.
	Commit() error

	// Close stops fetching messages.
	Close() error
}

KfkStreamConsumer aims to increase the processing speed of each consumer.
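
One way a caller might drive this interface is to read from the channel returned by Subscribe and call Commit after each result is handled. The sketch below rests on several assumptions that the documentation does not confirm: `fakeConsumer` is a hypothetical stand-in that emits one result per message and closes the channel when it runs out, whereas the real consumer may deliver batches and may never close the channel. `drain` and `demo` are likewise illustrative names.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// HandleFunc and KfkStreamConsumer mirror the declarations documented above.
type HandleFunc func([]byte) (interface{}, error)

type KfkStreamConsumer interface {
	Subscribe(ctx context.Context, handle HandleFunc) chan interface{}
	Commit() error
	Close() error
}

// fakeConsumer is a hypothetical stand-in that replays canned messages.
type fakeConsumer struct {
	raw     [][]byte
	commits int
}

func (f *fakeConsumer) Subscribe(ctx context.Context, handle HandleFunc) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out) // assumption: the real consumer may not close its channel
		for _, m := range f.raw {
			res, err := handle(m)
			if err != nil {
				continue // this fake simply skips messages the handler rejects
			}
			select {
			case out <- res:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func (f *fakeConsumer) Commit() error { f.commits++; return nil }
func (f *fakeConsumer) Close() error  { return nil }

// drain collects results until the channel closes or ctx is cancelled,
// committing after each result it receives.
func drain(ctx context.Context, c KfkStreamConsumer, h HandleFunc) ([]interface{}, error) {
	defer c.Close()
	var got []interface{}
	ch := c.Subscribe(ctx, h)
	for {
		select {
		case <-ctx.Done():
			return got, ctx.Err()
		case res, ok := <-ch:
			if !ok {
				return got, nil
			}
			got = append(got, res)
			if err := c.Commit(); err != nil {
				return got, fmt.Errorf("commit: %w", err)
			}
		}
	}
}

// demo runs drain against the stand-in with an upper-casing handler.
func demo() ([]interface{}, int, error) {
	c := &fakeConsumer{raw: [][]byte{[]byte("a"), []byte("b")}}
	upper := func(b []byte) (interface{}, error) {
		return strings.ToUpper(string(b)), nil
	}
	got, err := drain(context.Background(), c, upper)
	return got, c.commits, err
}

func main() {
	got, commits, err := demo()
	fmt.Println(got, commits, err)
}
```

Committing only after a result has been handled means an interrupted run would re-read unprocessed messages rather than skip them, which is usually the safer default for this kind of pipeline.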

func MustNewKfkStreamConsumer

func MustNewKfkStreamConsumer(topic, group string, workerNum int, brokers []string) KfkStreamConsumer

MustNewKfkStreamConsumer is the constructor for KfkStreamConsumer.

type Producer

type Producer interface {
	Produce(string, string) error
	Close() error
}

Producer is a Kafka client that produces messages.

Directories

Path Synopsis
examples
