Version: v0.2.2 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2020 License: Apache-2.0 Imports: 12 Imported by: 0




View Source
const (
	// sink Possible Statuses
	SinkStatusOnError = "ON_ERROR"
	SinkStatusRunning = "RUNNING"
	SinkStatusWaiting = "WAITING"

	DefaultChannelSize = 100

Possible Statuses

View Source
const KafkaType = "Kafka"

KafkaType type of sink

View Source
const MaxRetry = 20

MaxRetry is the maximum number of retries

View Source
const PulsarType = "Pulsar"

PulsarType type of sink

View Source
const StdoutType = "Stdout"

StdoutType type of sink


View Source
var Factory = map[string]sinkCreator{
	StdoutType: NewStdout,
	KafkaType:  NewKafka,
	PulsarType: NewPulsar,

Factory sink Factory


func MsgByteSize added in v0.2.2

func MsgByteSize(msg *sarama.ProducerMessage) int

func SendMsg added in v0.2.2

func SendMsg(msgs []*sarama.ProducerMessage, producer sarama.SyncProducer) int64


type Kafka

type Kafka struct {
	KafkaConf *KafkaSinkConfig

Kafka representation of kafka sink

func (*Kafka) ProcessGenericEvent added in v0.2.2

func (k *Kafka) ProcessGenericEvent(genericMsg *events.GenericEvent) (*KafkaMessage, error)

ProcessGenericEvent process Generic Event

func (*Kafka) ProcessSQLEvent added in v0.2.2

func (k *Kafka) ProcessSQLEvent(sqlEvent *events.SQLEvent) (*KafkaMessage, error)

ProcessSQLEvent process Sql Event

func (*Kafka) ProducerLoop added in v0.2.2

func (k *Kafka) ProducerLoop(producer sarama.SyncProducer, in chan *KafkaMessage)

func (*Kafka) Start

func (k *Kafka) Start(_ ...interface{}) error

Start kafka sink

func (*Kafka) StartConsumer added in v0.2.2

func (k *Kafka) StartConsumer(kafkaChan chan *KafkaMessage)

StartConsumer consume input chan

func (*Kafka) StartProducer added in v0.2.2

func (k *Kafka) StartProducer(in chan *KafkaMessage, stop chan error)

StartProducer send message to kafka

type KafkaGSSAPI added in v0.2.0

type KafkaGSSAPI struct {
	ServiceName        string `json:"service_name" mapstructure:"service_name"`
	Realm              string `json:"realm"`
	KerberosConfigPath string `json:"kerberos_config_path" mapstructure:"kerberos_config_path"`
	KeyTabPath         string `json:"key_tab_path" mapstructure:"key_tab_path"`

type KafkaMessage added in v0.2.0

type KafkaMessage struct {
	// The Kafka topic for this message.
	Topic string
	// The partitioning key for this message.
	Key string
	// The source offset of message
	Offset *events.Offset
	// The actual serialized message to store In Kafka.
	Value []byte

KafkaMessage representation of a Kafka message

type KafkaSinkConfig added in v0.2.2

type KafkaSinkConfig struct {
	TLS             bool         `json:"tls"`
	Kerberos        bool         `json:"kerberos"`
	ShuffleEvent    bool         `json:"shuffle_event" mapstructure:"shuffle_event"`
	Topic           string       `json:"topic"`
	TopicPrefix     string       `json:"topic_prefix" mapstructure:"topic_prefix"`
	ClientID        string       `json:"client_id" mapstructure:"client_id"`
	Brokers         []string     `json:"brokers"`
	Producer        *KafkaUser   `json:"Producer"`
	Consumer        *KafkaUser   `json:"consumer"`
	GSSAPI          *KafkaGSSAPI `json:"gssapi"`
	MaxMessageBytes int          `json:"max_message_bytes" mapstructure:"max_message_bytes"`
	NbProducer      int          `json:"nb_producer" mapstructure:"nb_producer"`

KafkaSinkConfig representation of kafka sink config

type KafkaUser added in v0.2.2

type KafkaUser struct {
	User     string `json:"user"`
	Password string `json:"password"`

KafkaUser representation of kafka User

type Pulsar added in v0.2.0

type Pulsar struct {
	PulsarConf *PulsarSinkConfig
	Producer   pulsar.Producer

Pulsar representation of Pulsar sink

func (*Pulsar) GetInputChan added in v0.2.0

func (p *Pulsar) GetInputChan() chan events.LookatchEvent

GetInputChan return the input channel attached to this sink

func (*Pulsar) ProcessEvent added in v0.2.2

func (p *Pulsar) ProcessEvent(msg events.LookatchEvent) error

ProcessEvent convert LookatchEvent to Pulsar ProducerMessage

func (*Pulsar) Start added in v0.2.0

func (p *Pulsar) Start(_ ...interface{}) error

Start connect to pulsar and start Producer

func (*Pulsar) StartProducer added in v0.2.2

func (p *Pulsar) StartProducer()

StartProducer consume input chan

type PulsarSinkConfig added in v0.2.2

type PulsarSinkConfig struct {
	Topic string `json:"topic"`
	URL   string `json:"url"`
	Token string `json:"token"`

PulsarSinkConfig representation of pulsar sink config

type Sink

type Sink struct {
	In            chan events.LookatchEvent
	Stop          chan error
	Commit        chan interface{}
	Name          string
	EncryptionKey string
	Conf          *viper.Viper

Sink representation of sink

func (*Sink) GetCommitChan added in v0.2.0

func (s *Sink) GetCommitChan() chan interface{}

GetCommitChan return the Commit channel attached to this sink

func (*Sink) GetInputChan added in v0.2.0

func (s *Sink) GetInputChan() chan events.LookatchEvent

GetInputChan return the input channel attached to this sink

func (*Sink) SendCommit added in v0.2.0

func (s *Sink) SendCommit(payload interface{})

SendCommit send a Commit message into the Commit channel of this sink

type SinkI

type SinkI interface {
	Start(...interface{}) error
	GetInputChan() chan events.LookatchEvent
	GetCommitChan() chan interface{}

SinkI sink interface

func New

func New(name string, sinkType string, conf *viper.Viper, stop chan error) (SinkI, error)

New create new sink

func NewKafka added in v0.2.2

func NewKafka(s *Sink) (SinkI, error)

NewKafka create new kafka sink

func NewPulsar added in v0.2.2

func NewPulsar(s *Sink) (SinkI, error)

NewPulsar create new pulsar sink

func NewStdout added in v0.2.2

func NewStdout(s *Sink) (SinkI, error)

NewStdout create new stdout sink

type Stdout

type Stdout struct {

Stdout representation of stdout sink

func (*Stdout) Start

func (s *Stdout) Start(i ...interface{}) (err error)

Start stdout sink

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL