kafka

package
v0.194.5
Warning

This package is not in the latest version of its module.

Published: Nov 21, 2023 License: MIT Imports: 19 Imported by: 1

Documentation

Constants

const (
	// ToKafkaKind is the Kind for the ToKafka Flux function
	ToKafkaKind = "toKafka"
)

Variables

var DefaultKafkaWriterFactory = func(conf kafka.WriterConfig) KafkaWriter {
	return kafka.NewWriter(conf)
}

DefaultKafkaWriterFactory builds a KafkaWriter from a kafka.WriterConfig. It is a package-level variable so that tests can inject a different writer.

Functions

This section is empty.

Types

type KafkaWriter

type KafkaWriter interface {
	io.Closer
	WriteMessages(context.Context, ...kafka.Message) error
}

KafkaWriter is an interface for what we need from DefaultKafkaWriterFactory

type ToKafkaOpSpec

type ToKafkaOpSpec struct {
	Brokers      []string `json:"brokers"`
	Topic        string   `json:"topic"`
	Balancer     string   `json:"balancer"`
	Name         string   `json:"name"`
	NameColumn   string   `json:"nameColumn"` // either name or name_column must be set; if neither is set, try to use the "_measurement" column.
	TimeColumn   string   `json:"timeColumn"`
	TagColumns   []string `json:"tagColumns"`
	ValueColumns []string `json:"valueColumns"`
	MsgBufSize   int      `json:"msgBufferSize"` // the maximum number of messages to buffer before sending to kafka; the library we use defaults to 100
}

func (ToKafkaOpSpec) Kind

func (*ToKafkaOpSpec) ReadArgs

func (o *ToKafkaOpSpec) ReadArgs(args flux.Arguments) error

ReadArgs loads a flux.Arguments into ToKafkaOpSpec and sets several default values. If time_column isn't set, it defaults to execute.TimeColLabel. If value_column isn't set, it defaults to []string{execute.DefaultValueColLabel}.
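The defaulting rules can be sketched as a plain function. This is not the body of ReadArgs: `opSpec` and `applyDefaults` are hypothetical, the constants assume that the execute labels are `"_time"` and `"_value"`, and the `"_measurement"` fallback is taken from the NameColumn field comment rather than from ReadArgs itself.

```go
package main

import "fmt"

// Assumed values; the real constants live in the execute package.
const (
	timeColLabel   = "_time"        // assumed value of execute.TimeColLabel
	valueColLabel  = "_value"       // assumed value of execute.DefaultValueColLabel
	measurementCol = "_measurement" // fallback named in the NameColumn comment
)

// opSpec holds only the fields that the defaulting rules touch.
type opSpec struct {
	Name         string
	NameColumn   string
	TimeColumn   string
	ValueColumns []string
}

// applyDefaults is a hypothetical helper that fills in unset fields and
// leaves explicitly set ones alone.
func applyDefaults(s *opSpec) {
	if s.Name == "" && s.NameColumn == "" {
		s.NameColumn = measurementCol
	}
	if s.TimeColumn == "" {
		s.TimeColumn = timeColLabel
	}
	if len(s.ValueColumns) == 0 {
		s.ValueColumns = []string{valueColLabel}
	}
}

func main() {
	empty := opSpec{}
	applyDefaults(&empty)
	fmt.Printf("%+v\n", empty)

	custom := opSpec{Name: "cpu", TimeColumn: "ts", ValueColumns: []string{"usage"}}
	applyDefaults(&custom)
	fmt.Printf("%+v\n", custom)
}
```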

type ToKafkaProcedureSpec

type ToKafkaProcedureSpec struct {
	plan.DefaultCost
	Spec *ToKafkaOpSpec
	// contains filtered or unexported fields
}

func (*ToKafkaProcedureSpec) Copy

func (*ToKafkaProcedureSpec) Kind

type ToKafkaTransformation

type ToKafkaTransformation struct {
	execute.ExecutionNode
	// contains filtered or unexported fields
}

func (*ToKafkaTransformation) Finish

func (t *ToKafkaTransformation) Finish(id execute.DatasetID, err error)

func (*ToKafkaTransformation) Process

func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (err error)

func (*ToKafkaTransformation) RetractTable

func (t *ToKafkaTransformation) RetractTable(id execute.DatasetID, key flux.GroupKey) error

func (*ToKafkaTransformation) UpdateProcessingTime

func (t *ToKafkaTransformation) UpdateProcessingTime(id execute.DatasetID, pt execute.Time) error

func (*ToKafkaTransformation) UpdateWatermark

func (t *ToKafkaTransformation) UpdateWatermark(id execute.DatasetID, pt execute.Time) error
