messaging

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: Apache-2.0 Imports: 5 Imported by: 3

README

Messaging

messaging package defines Publisher, Subscriber and an aggregate Pubsub interface.

Subscriber interface defines methods used to subscribe to a message broker such as MQTT or NATS or RabbitMQ.

Publisher interface defines methods used to publish messages to a message broker such as MQTT or NATS or RabbitMQ.

Pubsub interface is composed of Publisher and Subscriber interface and can be used to send messages to as well as to receive messages from a message broker.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var File_pkg_messaging_message_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type DeliveryPolicy

type DeliveryPolicy uint8
const (
	// DeliverNewPolicy will only deliver new messages that are sent after the consumer is created.
	// This is the default policy.
	DeliverNewPolicy DeliveryPolicy = iota

	// DeliverAllPolicy starts delivering messages from the very beginning of a stream.
	DeliverAllPolicy
)

type Message

type Message struct {
	Channel   string `protobuf:"bytes,1,opt,name=channel,proto3" json:"channel,omitempty"`
	Subtopic  string `protobuf:"bytes,2,opt,name=subtopic,proto3" json:"subtopic,omitempty"`
	Publisher string `protobuf:"bytes,3,opt,name=publisher,proto3" json:"publisher,omitempty"`
	Protocol  string `protobuf:"bytes,4,opt,name=protocol,proto3" json:"protocol,omitempty"`
	Payload   []byte `protobuf:"bytes,5,opt,name=payload,proto3" json:"payload,omitempty"`
	Created   int64  `protobuf:"varint,6,opt,name=created,proto3" json:"created,omitempty"` // Unix timestamp in nanoseconds
	// contains filtered or unexported fields
}

Message represents a message emitted by the Magistrala adapters layer.

func (*Message) Descriptor deprecated

func (*Message) Descriptor() ([]byte, []int)

Deprecated: Use Message.ProtoReflect.Descriptor instead.

func (*Message) GetChannel

func (x *Message) GetChannel() string

func (*Message) GetCreated

func (x *Message) GetCreated() int64

func (*Message) GetPayload

func (x *Message) GetPayload() []byte

func (*Message) GetProtocol

func (x *Message) GetProtocol() string

func (*Message) GetPublisher

func (x *Message) GetPublisher() string

func (*Message) GetSubtopic

func (x *Message) GetSubtopic() string

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) ProtoReflect

func (x *Message) ProtoReflect() protoreflect.Message

func (*Message) Reset

func (x *Message) Reset()

func (*Message) String

func (x *Message) String() string

type MessageHandler

type MessageHandler interface {
	// Handle handles messages passed by underlying implementation.
	Handle(msg *Message) error

	// Cancel is used for cleanup during unsubscribing and it's optional.
	Cancel() error
}

MessageHandler represents Message handler for Subscriber.

type Option

type Option func(vals interface{}) error

Option represents optional configuration for message broker.

This is used to provide optional configuration parameters to the underlying publisher and pubsub implementation so that it can be configured to meet the specific needs.

For example, it can be used to set the message prefix so that brokers can be used for event sourcing as well as internal message broker. Using value of type interface is not recommended but is the most suitable for this use case as options should be compiled with respect to the underlying broker which can either be RabbitMQ or NATS.

The example below shows how to set the prefix and jetstream stream for NATS.

Example:

broker.NewPublisher(ctx, url, broker.Prefix(eventsPrefix), broker.JSStream(js))

type PubSub

type PubSub interface {
	Publisher
	Subscriber
}

PubSub represents aggregation interface for publisher and subscriber.

type Publisher

type Publisher interface {
	// Publishes message to the stream.
	Publish(ctx context.Context, topic string, msg *Message) error

	// Close gracefully closes message publisher's connection.
	Close() error
}

Publisher specifies message publishing API.

type Subscriber

type Subscriber interface {
	// Subscribe subscribes to the message stream and consumes messages.
	Subscribe(ctx context.Context, cfg SubscriberConfig) error

	// Unsubscribe unsubscribes from the message stream and
	// stops consuming messages.
	Unsubscribe(ctx context.Context, id, topic string) error

	// Close gracefully closes message subscriber's connection.
	Close() error
}

Subscriber specifies message subscription API.

type SubscriberConfig

type SubscriberConfig struct {
	ID             string
	Topic          string
	Handler        MessageHandler
	DeliveryPolicy DeliveryPolicy
}

Directories

Path Synopsis
Package mqtt hold the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the Magistrala IoT platform.
Package mqtt hold the implementation of the Publisher and PubSub interfaces for the MQTT messaging system, the internal messaging broker of the Magistrala IoT platform.
Package nats hold the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the Magistrala IoT platform.
Package nats hold the implementation of the Publisher and PubSub interfaces for the NATS messaging system, the internal messaging broker of the Magistrala IoT platform.
tracing
Package tracing provides tracing instrumentation for Magistrala things policies service.
Package tracing provides tracing instrumentation for Magistrala things policies service.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the Magistrala IoT platform.
Package rabbitmq holds the implementation of the Publisher and PubSub interfaces for the RabbitMQ messaging system, the internal messaging broker of the Magistrala IoT platform.
tracing
Package tracing provides tracing instrumentation for Magistrala things policies service.
Package tracing provides tracing instrumentation for Magistrala things policies service.
Package tracing provides tracing instrumentation for Magistrala things policies service.
Package tracing provides tracing instrumentation for Magistrala things policies service.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL