pubsub

package
v0.0.0-...-85cba55 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2021 | License: MIT | Imports: 12 | Imported by: 0

Documentation

Index

Constants

View Source
// Signals* constants: the "signals" topic name plus the action and status
// strings that share the Signals prefix.
const (
	SignalsServiceTopic       = "signals"
	SignalsRemoveSignalAction = "remove-signal"
	SignalsCreateSignalAction = "create-signal"
	SignalsStatusResolved     = "resolved"
	SignalsStatusRejected     = "rejected"
)

// RestAPI* constants: the "restapi" topic name and its update-signal-status
// string.
const (
	RestAPIServiceTopic              = "restapi"
	RestAPIServiceUpdateSignalStatus = "update-signal-status"
)

// NotificationService* constants: the "notification" topic name and its
// trend-signal-notification string.
const (
	NotificationServiceTopic                   = "notification"
	NotificationServiceTrendSignalNotification = "trend-signal-notification"
)

// TickerService* constants: the "ticker" and "ticker.tick" topic names plus
// the subscribe/unsubscribe strings.
const (
	TickerServiceTopic       = "ticker"
	TickerServiceTickTopic   = "ticker.tick"
	TickerServiceSubscribe   = "subscribe"
	TickerServiceUnsubscribe = "unsubscribe"
)

Variables

View Source
// Publish serializes msg with Message.JSON and sends the result as the body
// of a message on the topic named topicName.
//
// Publish is declared as a variable rather than a function, so it can be
// reassigned (NOTE(review): presumably by PubsubTestSuite.MockPubsub —
// confirm).
//
// The payload is serialized once and reused for both the debug log line and
// the outgoing message body. A non-nil error wraps the underlying failure
// from opening the topic or from sending the message.
var Publish = func(topicName string, msg *Message) error {
	// Serialize once; the same bytes feed the log line and the message body.
	payload := msg.JSON()
	Debug("[pubsub.publish]: topic=%s, %s", topicName, payload)

	ctx := context.Background()
	topic, err := openTopic(ctx, topicName)
	if err != nil {
		return fmt.Errorf("cannot open topic %q: %w", topicName, err)
	}

	// NOTE(review): the topic is not shut down here; confirm that openTopic
	// owns/caches topics, otherwise each call leaves one open.
	if err := topic.Send(ctx, &pubsub.Message{Body: payload}); err != nil {
		return fmt.Errorf("cannot publish topic %q: %w", topicName, err)
	}
	return nil
}
View Source
// Subscribe opens a subscription for topic and starts a goroutine that
// receives messages from it. Each message is acknowledged and then handed,
// together with cb, to processMessage.
//
// Subscribe is declared as a variable rather than a function, so it can be
// reassigned (NOTE(review): presumably by PubsubTestSuite.MockPubsub —
// confirm).
//
// On success it returns a function that stops the receive loop and shuts the
// subscription down. That function has no error result, so a shutdown failure
// is logged with Errorf instead of being discarded. If the subscription cannot
// be opened, Subscribe returns a nil function and a wrapped error.
var Subscribe = func(topic string, cb Callback) (context.CancelFunc, error) {
	Debug("[pubsub.subscribe]: topic=%s", topic)

	sub, err := openSubscription(context.Background(), topic)
	if err != nil {
		return nil, fmt.Errorf("cannot subscribe pubsub.%s: %w", topic, err)
	}

	// The receive loop gets its own cancellable context so that a deliberate
	// stop can be told apart from a genuine receive failure.
	recvCtx, stopReceiving := context.WithCancel(context.Background())

	go func() {
		for {
			msg, err := sub.Receive(recvCtx)
			if err != nil {
				// Only report errors that were not caused by the caller
				// stopping the subscription.
				if recvCtx.Err() == nil {
					Errorf("Receiving message: %v", err)
				}
				return
			}

			msg.Ack()
			processMessage(msg, cb)
		}
	}()

	return func() {
		// Cancel first so the loop sees recvCtx as done before Receive
		// starts failing because of the shutdown.
		stopReceiving()
		// Shutdown uses a fresh context: recvCtx is already cancelled.
		if err := sub.Shutdown(context.Background()); err != nil {
			Errorf("Shutting down subscription pubsub.%s: %v", topic, err)
		}
	}, nil
}

Functions

func Debug

func Debug(format string, a ...interface{})

func Errorf

func Errorf(format string, a ...interface{})

func Info

func Info(format string, a ...interface{})

func Init

func Init(conf *config.SystemConfig) error

func Warn

func Warn(format string, a ...interface{})

Types

type Callback

// Callback is the handler type accepted by Subscribe. It is given a *Message
// and reports failure through its error result.
type Callback func(*Message) error

type Message

// Message is the payload type accepted by Publish and passed to a Callback.
// Publish sends the bytes returned by its JSON method as the body of the
// outgoing message. The fields are split into two groups, labelled below as
// the internal and the external protocol.
type Message struct {
	// internal protocol
	UUID       string
	ID         uint // TODO: migrate on UUID (at the moment it's easier to trace)
	NextAction string
	Price      float64
	Status     string
	Error      string
	Body       []byte

	// external protocol
	Accuracy     float64
	CurrencyPair string
	Email        string
	Exchange     string
	TrendPrice   float64
}

func (*Message) JSON

func (m *Message) JSON() []byte

func (*Message) Render

func (m *Message) Render(content string) string

type PubsubTestSuite

type PubsubTestSuite struct {
	suite.Suite
	// contains filtered or unexported fields
}

func (*PubsubTestSuite) GetPublication

func (s *PubsubTestSuite) GetPublication(topic, action string) *Message

func (*PubsubTestSuite) GetPublications

func (s *PubsubTestSuite) GetPublications(topic string) []Message

func (*PubsubTestSuite) GetSubscription

func (s *PubsubTestSuite) GetSubscription(topic string) Callback

func (*PubsubTestSuite) MockPubsub

func (s *PubsubTestSuite) MockPubsub()

func (*PubsubTestSuite) ResetPubsub

func (s *PubsubTestSuite) ResetPubsub()

func (*PubsubTestSuite) ResetPubsubPublications

func (s *PubsubTestSuite) ResetPubsubPublications()

func (*PubsubTestSuite) ResetPubsubSubscriptions

func (s *PubsubTestSuite) ResetPubsubSubscriptions()

func (*PubsubTestSuite) SendMessage

func (s *PubsubTestSuite) SendMessage(topic string, msg *Message) error

func (*PubsubTestSuite) SetupTest

func (s *PubsubTestSuite) SetupTest()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL