subscribers

package
v0.0.0-...-0f8884b Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Kafka011Subscriber

// Kafka011Subscriber is a subscriber that embeds SubscriberBase, so the
// exported fields and methods of SubscriberBase are promoted onto it.
// It is created with NewKafka011Subscriber and additionally declares
// GetConfigHash, GetForceUpdate and Run with value receivers.
//
// NOTE(review): the name suggests it targets Kafka 0.11 — confirm against the
// implementation; its own fields are unexported and not visible here.
type Kafka011Subscriber struct {
	SubscriberBase
	// contains filtered or unexported fields
}

func NewKafka011Subscriber

func NewKafka011Subscriber(config SubscriberKafka011Config) *Kafka011Subscriber

func (Kafka011Subscriber) GetConfigHash

func (s Kafka011Subscriber) GetConfigHash() []byte

func (Kafka011Subscriber) GetForceUpdate

func (s Kafka011Subscriber) GetForceUpdate() bool

func (Kafka011Subscriber) Run

func (s Kafka011Subscriber) Run()

type Map

type Map map[string]*Subscriber

type MqConfig

// MqConfig groups message-queue settings and carries JSON struct tags for
// each field.
type MqConfig struct {
	// Topics maps to the JSON key "topics".
	Topics     []string `json:"topics"`
	// Servers maps to the JSON key "servers".
	Servers    []string `json:"servers"`
	// ConsumerId maps to the JSON key "consumerGroup".
	// NOTE(review): SubscriberKafka011Config uses "consumer_group" for the
	// field of the same name — confirm the differing key spelling is intended.
	ConsumerId string   `json:"consumerGroup"`
}

type RocketMqHttpSubscriber

// RocketMqHttpSubscriber is a subscriber that embeds SubscriberBase, so the
// exported fields and methods of SubscriberBase are promoted onto it.
// It additionally declares GetConfigHash, GetForceUpdate and Run with value
// receivers.
//
// NOTE(review): presumably delivers data through the RocketMQ HTTP API —
// confirm against the implementation; its own fields are unexported and not
// visible here.
type RocketMqHttpSubscriber struct {
	SubscriberBase
	// contains filtered or unexported fields
}

func (RocketMqHttpSubscriber) GetConfigHash

func (s RocketMqHttpSubscriber) GetConfigHash() []byte

func (RocketMqHttpSubscriber) GetForceUpdate

func (s RocketMqHttpSubscriber) GetForceUpdate() bool

func (RocketMqHttpSubscriber) Run

func (s RocketMqHttpSubscriber) Run()

type RocketMqTcpSubscriber

// RocketMqTcpSubscriber is declared as an empty struct: it has no fields and
// does not embed SubscriberBase.
// NOTE(review): looks like a placeholder for a TCP-based RocketMQ subscriber —
// confirm whether it is still needed.
type RocketMqTcpSubscriber struct {
}

type Subscriber

// Subscriber is the interface a subscription must implement to be registered
// with a SubscriberGroup (see SubscriberGroup.Register).
type Subscriber interface {
	// GetName returns the subscription name.
	GetName() string
	// GetConsumerName returns the name of the subscribed database.
	GetConsumerName() string
	// GetTables returns the list of data tables being listened to.
	GetTables() map[string]int
	// TableIn reports whether the given data table is subscribed.
	TableIn(table string) bool
	// Consume consumes a DTS data packet.
	Consume(data consumers.DtsData)
	// Run starts the subscriber's worker thread.
	Run()
	// Shutdown performs cleanup on exit.
	Shutdown()
	// GetForceUpdate reports whether a configuration update restarts the
	// service.
	GetForceUpdate() bool
	// GetConfigHash returns a byte slice.
	// NOTE(review): presumably a hash of the subscriber's configuration —
	// the original comment is empty; confirm against the implementations.
	GetConfigHash() []byte
}

type SubscriberBase

// SubscriberBase holds the state shared by subscriber implementations; it is
// embedded by Kafka011Subscriber and RocketMqHttpSubscriber.
type SubscriberBase struct {
	// Name is the subscription name.
	Name string
	// Consumer is the data source name.
	Consumer string
	// Queue is the consume queue of DTS data.
	Queue chan consumers.DtsData
	// Quit is the end (quit) marker.
	// NOTE(review): the values it takes are not visible here — confirm
	// against Shutdown/WaitExit.
	Quit int

	// Tables has the same type as the result of GetTables.
	// NOTE(review): presumably keyed by table name; the meaning of the int
	// value is not visible here — confirm.
	Tables map[string]int
	// ForceUpdate is a boolean flag.
	// NOTE(review): presumably whether a config update restarts the
	// service, mirroring Subscriber.GetForceUpdate — confirm.
	ForceUpdate bool
	// ConfigHash is a byte slice.
	// NOTE(review): presumably the value produced by SubscriberConfig.Hash —
	// confirm.
	ConfigHash []byte
	// contains filtered or unexported fields
}

func (*SubscriberBase) Consume

func (sb *SubscriberBase) Consume(data consumers.DtsData)

func (*SubscriberBase) GetConsumerName

func (sb *SubscriberBase) GetConsumerName() string

func (*SubscriberBase) GetName

func (sb *SubscriberBase) GetName() string

func (*SubscriberBase) GetTables

func (sb *SubscriberBase) GetTables() map[string]int

func (*SubscriberBase) SetTables

func (sb *SubscriberBase) SetTables(tables []string)

func (*SubscriberBase) Shutdown

func (sb *SubscriberBase) Shutdown()

func (*SubscriberBase) TableIn

func (sb *SubscriberBase) TableIn(table string) bool

func (*SubscriberBase) WaitExit

func (sb *SubscriberBase) WaitExit()

type SubscriberConfig

// SubscriberConfig is the common subscriber configuration; it is embedded by
// SubscriberKafka011Config and SubscriberRocketMqHttpConfig. Every field
// carries a JSON struct tag. Check and Hash are declared on *SubscriberConfig.
type SubscriberConfig struct {
	// Name maps to the JSON key "name".
	Name          string            `json:"name"`
	// Consumer maps to the JSON key "consumer".
	Consumer      string            `json:"consumer"`
	// Type maps to the JSON key "type".
	// NOTE(review): the valid integer values are not visible here —
	// presumably selects the subscriber implementation; confirm.
	Type          int               `json:"type"`
	// Tables maps to the JSON key "tables".
	Tables        []string          `json:"tables"`
	// ForceUpdate maps to the JSON key "force_update".
	ForceUpdate   bool              `json:"force_update"`
	// TableHashRule maps to the JSON key "table_hash_rule".
	// NOTE(review): key/value semantics are not visible here — confirm.
	TableHashRule map[string]string `json:"table_hash_rule"`
}

func (*SubscriberConfig) Check

func (c *SubscriberConfig) Check() bool

func (*SubscriberConfig) Hash

func (c *SubscriberConfig) Hash() []byte

type SubscriberGroup

// SubscriberGroup exposes no exported fields; it is created with
// NewSubscriberGroup, which takes a receive-only channel of
// consumers.DtsData and a *clientv3.Client. Its methods are Dispatch,
// GetSubscriber, LoadConfig, Register, Shutdown, SyncConfig and Watch.
type SubscriberGroup struct {
	// contains filtered or unexported fields
}

func NewSubscriberGroup

func NewSubscriberGroup(queue <-chan consumers.DtsData, client *clientv3.Client) (sg *SubscriberGroup)

func (*SubscriberGroup) Dispatch

func (s *SubscriberGroup) Dispatch()

func (*SubscriberGroup) GetSubscriber

func (s *SubscriberGroup) GetSubscriber(consumer, name string) *Subscriber

func (*SubscriberGroup) LoadConfig

func (s *SubscriberGroup) LoadConfig()

func (*SubscriberGroup) Register

func (s *SubscriberGroup) Register(subscriber Subscriber)

func (*SubscriberGroup) Shutdown

func (s *SubscriberGroup) Shutdown()

func (*SubscriberGroup) SyncConfig

func (s *SubscriberGroup) SyncConfig(value []byte)

func (*SubscriberGroup) Watch

func (s *SubscriberGroup) Watch()

type SubscriberKafka011Config

// SubscriberKafka011Config is the configuration accepted by
// NewKafka011Subscriber. It embeds SubscriberConfig without a tag, so with
// encoding/json the embedded fields appear at the same JSON object level as
// the fields declared here.
type SubscriberKafka011Config struct {
	SubscriberConfig
	// Servers maps to the JSON key "servers".
	Servers    []string `json:"servers"`
	// Topics maps to the JSON key "topics".
	Topics     []string `json:"topics"`
	// ConsumerId maps to the JSON key "consumer_group".
	ConsumerId string   `json:"consumer_group"`
}

type 3

type SubscriberRocketMqHttpConfig

// SubscriberRocketMqHttpConfig extends SubscriberConfig (embedded without a
// tag, so with encoding/json its fields appear at the same JSON object level)
// with the string settings declared below.
type SubscriberRocketMqHttpConfig struct {
	SubscriberConfig
	// Endpoint maps to the JSON key "endpoint".
	Endpoint   string `json:"endpoint"`
	// AccessKey maps to the JSON key "access_key".
	// NOTE(review): presumably a credential — avoid logging it; confirm.
	AccessKey  string `json:"access_key"`
	// SecretKey maps to the JSON key "secret_key".
	// NOTE(review): presumably a credential — avoid logging it; confirm.
	SecretKey  string `json:"secret_key"`
	// Topic maps to the JSON key "topic".
	Topic      string `json:"topic"`
	// InstanceId maps to the JSON key "instance_id".
	InstanceId string `json:"instance_id"`
	// GroupId maps to the JSON key "group_id".
	GroupId    string `json:"group_id"`
}

type 1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL