Documentation
¶
Index ¶
- Constants
- Variables
- func GetSink() api.Sink
- func GetSource() api.Source
- type KafkaCollectStats
- type KafkaSink
- func (k *KafkaSink) Close(ctx api.StreamContext) error
- func (k *KafkaSink) Collect(ctx api.StreamContext, item api.RawTuple) error
- func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSink) Info() model.SinkInfo
- func (k *KafkaSink) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSink) Provision(ctx api.StreamContext, configs map[string]any) error
- type KafkaSource
- func (k *KafkaSource) Close(ctx api.StreamContext) error
- func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
- func (k *KafkaSource) GetOffset() (interface{}, error)
- func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
- func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
- func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
- func (k *KafkaSource) Rewind(offset interface{}) error
- func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Constants ¶
View Source
const ( LblUnmarshal = "unmarshal" LblCollect = "collect" LblReq = "req" LblKafka = "kafka" LblMsg = "msg" LblQueueIn = "queue-in" LblIngest = "ingest" LblSend = "send" LblBytes = "bytes" LblOffset = "offset" )
View Source
const ( SASL_NONE = "none" SASL_PLAIN = "plain" SASL_SCRAM = "scram" )
View Source
const (
LblTarget = "target"
)
Variables ¶
View Source
var ( KafkaSinkCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "kuiper", Subsystem: "kafka_sink", Name: "counter", Help: "counter of Kafka Sink IO", }, []string{metrics.LblType, LblTarget, metrics.LblRuleIDType, metrics.LblOpIDType}) KafkaSinkCollectDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "kuiper", Subsystem: "kafka_sink", Name: "collect_duration_hist", Help: "Sink Historgram Duration of IO", Buckets: prometheus.ExponentialBuckets(10, 2, 20), }, []string{metrics.LblType, LblTarget, metrics.LblRuleIDType, metrics.LblOpIDType}) KafkaSourceCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "kuiper", Subsystem: "kafka_source", Name: "counter", Help: "counter of Kafka Source IO", }, []string{metrics.LblType, metrics.LblRuleIDType, metrics.LblOpIDType}) KafkaSourceGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "kuiper", Subsystem: "kafka_source", Name: "gauge", Help: "Gauge of Kafka Source IO", }, []string{metrics.LblType, metrics.LblRuleIDType, metrics.LblOpIDType}) )
Functions ¶
Types ¶
type KafkaCollectStats ¶ added in v2.0.7
type KafkaSink ¶
type KafkaSink struct { LastStats kafkago.WriterStats // contains filtered or unexported fields }
func (*KafkaSink) Connect ¶
func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
type KafkaSource ¶
type KafkaSource struct {
// contains filtered or unexported fields
}
func (*KafkaSource) Close ¶
func (k *KafkaSource) Close(ctx api.StreamContext) error
func (*KafkaSource) Connect ¶
func (k *KafkaSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error
func (*KafkaSource) GetOffset ¶
func (k *KafkaSource) GetOffset() (interface{}, error)
func (*KafkaSource) Ping ¶
func (k *KafkaSource) Ping(ctx api.StreamContext, props map[string]any) error
func (*KafkaSource) Provision ¶
func (k *KafkaSource) Provision(ctx api.StreamContext, configs map[string]any) error
func (*KafkaSource) ResetOffset ¶
func (k *KafkaSource) ResetOffset(input map[string]interface{}) error
func (*KafkaSource) Rewind ¶
func (k *KafkaSource) Rewind(offset interface{}) error
func (*KafkaSource) Subscribe ¶
func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, ingestError api.ErrorIngest) error
Click to show internal directories.
Click to hide internal directories.