Documentation ¶
Index ¶
- type Distributor
- func (d *Distributor) FindSubscribers(eventType string) []int64
- func (d *Distributor) FindSubscription(subid int64) *metadata.Subscription
- func (d *Distributor) ListSubscriptionIDs() []int64
- func (d *Distributor) LoadSubscriptions() error
- func (d *Distributor) Start(eventHandler *EventHandler) error
- type EventHandler
- type EventPusher
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Distributor ¶
type Distributor struct {
// contains filtered or unexported fields
}
Distributor is event subscription distributer.
func NewDistributer ¶
func NewDistributer(ctx context.Context, engine *backbone.Engine, db dal.RDB, cache *redis.Client, subWatcher reflector.Interface) *Distributor
NewDistributer creates a new Distributor instance.
func (*Distributor) FindSubscribers ¶
func (d *Distributor) FindSubscribers(eventType string) []int64
FindSubscribers returns all subscribers on event type.
func (*Distributor) FindSubscription ¶
func (d *Distributor) FindSubscription(subid int64) *metadata.Subscription
FindSubscription return target subscription base on subid.
func (*Distributor) ListSubscriptionIDs ¶
func (d *Distributor) ListSubscriptionIDs() []int64
ListSubscriptionIDs return all subscription ids.
func (*Distributor) LoadSubscriptions ¶
func (d *Distributor) LoadSubscriptions() error
LoadSubscriptions loads all subscriptions in cc.
func (*Distributor) Start ¶
func (d *Distributor) Start(eventHandler *EventHandler) error
Start starts the Distributor, it would load all subscriptions in listwatch mode, and handle runtime subscription update messages, push event to subscribers when tatget event happend.
type EventHandler ¶
type EventHandler struct {
// contains filtered or unexported fields
}
EventHandler manages all event pushers, and update pushers in dynamic mode, when there are events need to be sent, the pusher would check master state and send message to subscriber in callback or not.
func NewEventHandler ¶
func NewEventHandler(ctx context.Context, engine *backbone.Engine, cache *redis.Client) *EventHandler
NewEventHandler creates new EventHandler object.
func (*EventHandler) Handle ¶
func (h *EventHandler) Handle(nodes []*watch.ChainNode, eventDetailStrs []string, opts *watch.WatchEventOptions) error
Handle handles events distributed by distributer, add events to real handle queue to handle host identifier infos. Handler would find all relate subscribers and send event message to target subscribers by callback.
func (*EventHandler) SetDistributer ¶
func (h *EventHandler) SetDistributer(distributer *Distributor)
SetDistributer setups distributer to event handler.
func (*EventHandler) Start ¶
func (h *EventHandler) Start() error
Start starts event handler and keep processing event from distributer.
type EventPusher ¶
type EventPusher struct {
// contains filtered or unexported fields
}
EventPusher sends target events to subscribers in callback mode.
func NewEventPusher ¶
func NewEventPusher(ctx context.Context, engine *backbone.Engine, subid int64, cache *redis.Client, distributer *Distributor, pusherHandleTotal *prometheus.CounterVec, pusherHandleDuration *prometheus.HistogramVec) *EventPusher
NewEventPusher creates a new EventPusher object.
func (*EventPusher) Handle ¶
func (s *EventPusher) Handle(dist *metadata.DistInst) error
Handle add event dist inst to subscriber chan, and pusher would send message to subscriber base on callback.
func (*EventPusher) Run ¶
func (s *EventPusher) Run()
Run setups pusher and keep handling event dist.