Documentation ¶
Index ¶
- Variables
- type Broker
- func (b *Broker) GetTopics() map[string]bool
- func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error)
- func (b *Broker) Retry(_ context.Context, in *pb.RetryRequest) (*pb.ACK, error)
- func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error)
- func (b *Broker) Unsubscribe(sub *Subscriber)
- type RetryHandler
- type RetryMapRepository
- type StreamPool
- type Subscriber
- type SubscriberRepository
- type Topic
- type TopicRepository
Constants ¶
This section is empty.
Variables ¶
View Source
var SubscriberRetryHandler = NewRetryHandler()
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
	*TopicRepository
	SubscriberRepository *SubscriberRepository
	// contains filtered or unexported fields
}
func (*Broker) Unsubscribe ¶
func (b *Broker) Unsubscribe(sub *Subscriber)
type RetryHandler ¶
type RetryHandler struct {
// contains filtered or unexported fields
}
func NewRetryHandler ¶
func NewRetryHandler() *RetryHandler
func (*RetryHandler) CreateRetryQueue ¶
func (*RetryHandler) HandleRetryQueue ¶
func (rh *RetryHandler) HandleRetryQueue(rq chan *proto.Event, eq chan *proto.Event)
func (*RetryHandler) RemoveRetryQueue ¶
func (rh *RetryHandler) RemoveRetryQueue(subId string)
type RetryMapRepository ¶
type RetryMapRepository struct {
	sync.RWMutex
	RetryQueues map[string]chan *proto.Event
	// contains filtered or unexported fields
}
func NewRetryMapRepository ¶
func NewRetryMapRepository() *RetryMapRepository
type StreamPool ¶
type StreamPool struct {
	SubscriberName string
	Streams        []*Subscriber
	Ch             *chan *proto.Event
	sync.Mutex
	// contains filtered or unexported fields
}
func (*StreamPool) AddSubscriber ¶
func (p *StreamPool) AddSubscriber(s *Subscriber)
Make sure that every subscriber is added to the same stream pool.
func (*StreamPool) Delete ¶
func (p *StreamPool) Delete()
type Subscriber ¶
type Subscriber struct {
	Id           string
	Name         string
	EventChannel chan *proto.Event
	RetryQueue   chan *proto.Event
	TopicName    string
	Ctx          context.Context
	// contains filtered or unexported fields
}
func NewSubscriber ¶
func (*Subscriber) HandleBulkEvent ¶
func (s *Subscriber) HandleBulkEvent(stream *proto.Mercurius_SubscribeServer) error
type SubscriberRepository ¶
type SubscriberRepository struct {
	StreamPools *sync.Map
	// contains filtered or unexported fields
}
func NewSubscriberRepository ¶
func NewSubscriberRepository() *SubscriberRepository
func (*SubscriberRepository) Unsubscribe ¶
func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error
type Topic ¶
type Topic struct {
	sync.RWMutex
	Name                 string
	SubscriberRepository *SubscriberRepository
	EventChan            chan *proto.Event
	// contains filtered or unexported fields
}
func (*Topic) AddSubscriber ¶
func (*Topic) PublishEvent ¶
type TopicRepository ¶
func NewTopicRepository ¶
func NewTopicRepository() *TopicRepository
func (*TopicRepository) CreateTopic ¶
func (r *TopicRepository) CreateTopic(name string) (*Topic, error)
func (*TopicRepository) Unsubscribe ¶
func (r *TopicRepository) Unsubscribe(subscriber *Subscriber)
Click to show internal directories.
Click to hide internal directories.