broker

package
v0.0.0-...-e56df46 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2024 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var SubscriberRetryHandler = NewRetryHandler()

Functions

This section is empty.

Types

type Broker

type Broker struct {
	*TopicRepository
	SubscriberRepository *SubscriberRepository
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker() *Broker

func (*Broker) GetTopics

func (b *Broker) GetTopics() map[string]bool

func (*Broker) Publish

func (b *Broker) Publish(event *pb.Event) (*pb.ACK, error)

func (*Broker) Retry

func (b *Broker) Retry(_ context.Context, in *pb.RetryRequest) (*pb.ACK, error)

func (*Broker) Subscribe

func (b *Broker) Subscribe(ctx context.Context, topicName string, sId string, sName string) (*Subscriber, error)

func (*Broker) Unsubscribe

func (b *Broker) Unsubscribe(sub *Subscriber)

type RetryHandler

type RetryHandler struct {
	// contains filtered or unexported fields
}

func NewRetryHandler

func NewRetryHandler() *RetryHandler

func (*RetryHandler) CreateRetryQueue

func (rh *RetryHandler) CreateRetryQueue(sName string, eq chan *proto.Event) chan *proto.Event

func (*RetryHandler) HandleRetryQueue

func (rh *RetryHandler) HandleRetryQueue(rq chan *proto.Event, eq chan *proto.Event)

func (*RetryHandler) RemoveRetryQueue

func (rh *RetryHandler) RemoveRetryQueue(subId string)

type RetryMapRepository

type RetryMapRepository struct {
	sync.RWMutex

	RetryQueues map[string]chan *proto.Event
	// contains filtered or unexported fields
}

func NewRetryMapRepository

func NewRetryMapRepository() *RetryMapRepository

type StreamPool

type StreamPool struct {
	SubscriberName string
	Streams        []*Subscriber
	Ch             *chan *proto.Event

	sync.Mutex
	// contains filtered or unexported fields
}

func (*StreamPool) AddSubscriber

func (p *StreamPool) AddSubscriber(s *Subscriber)

Make sure that every subscriber is added to the same stream pool.

func (*StreamPool) Delete

func (p *StreamPool) Delete()

type Subscriber

type Subscriber struct {
	Id           string
	Name         string
	EventChannel chan *proto.Event
	RetryQueue   chan *proto.Event
	TopicName    string
	Ctx          context.Context
	// contains filtered or unexported fields
}

func NewSubscriber

func NewSubscriber(ctx context.Context, sId string, sName string, topicName string) *Subscriber

func (*Subscriber) HandleBulkEvent

func (s *Subscriber) HandleBulkEvent(stream *proto.Mercurius_SubscribeServer) error

type SubscriberRepository

type SubscriberRepository struct {
	StreamPools *sync.Map
	// contains filtered or unexported fields
}

func NewSubscriberRepository

func NewSubscriberRepository() *SubscriberRepository

func (*SubscriberRepository) Unsubscribe

func (r *SubscriberRepository) Unsubscribe(subscriber *Subscriber) error

type Topic

type Topic struct {
	sync.RWMutex

	Name                 string
	SubscriberRepository *SubscriberRepository
	EventChan            chan *proto.Event
	// contains filtered or unexported fields
}

func (*Topic) AddSubscriber

func (t *Topic) AddSubscriber(ctx context.Context, id string, name string) (*Subscriber, error)

func (*Topic) PublishEvent

func (t *Topic) PublishEvent(event *proto.Event)

type TopicRepository

type TopicRepository struct {
	Topics *sync.Map
	// contains filtered or unexported fields
}

func NewTopicRepository

func NewTopicRepository() *TopicRepository

func (*TopicRepository) CreateTopic

func (r *TopicRepository) CreateTopic(name string) (*Topic, error)

func (*TopicRepository) GetTopic

func (r *TopicRepository) GetTopic(name string) (*Topic, error)

func (*TopicRepository) Unsubscribe

func (r *TopicRepository) Unsubscribe(subscriber *Subscriber)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL