protocol

package module
v0.4.1
Warning

This package is not in the latest version of its module.

Published: Jun 24, 2025 License: MIT Imports: 25 Imported by: 2

README

protocol


Overview

This module implements a protocol over MQTT that lets applications exchange structured data using two patterns:

  • RPC Command - Send requests, process them, and respond.
  • Telemetry - Send and receive telemetry messages.

Simple RPC Request and Response

package main

import (
	"context"
	"fmt"

	"github.com/Azure/iot-operations-sdks/go/mqtt"
	"github.com/Azure/iot-operations-sdks/go/protocol"
)

type (
	Ping struct{ Message string }
	Pong struct{ Message string }
)

const (
	reqTopic = "mqtt/ping"
	resTopic = "mqtt/pong"
)

var (
	pingEncoding = protocol.JSON[Ping]{}
	pongEncoding = protocol.JSON[Pong]{}
)

func main() {
	// Note: Error handling omitted for simplicity.

	ctx := context.Background()
	app, _ := protocol.NewApplication()

	// Create a new session client for client and server (typically these would
	// be separate services; they're shown here together for simplicity).
	//
	// See the documentation of the mqtt module for more details.
	server, _ := mqtt.NewSessionClient(
		"server",
		mqtt.TCPConnection("localhost", 1883),
	)
	client, _ := mqtt.NewSessionClient(
		"client",
		mqtt.TCPConnection("localhost", 1883),
	)

	// Create a new executor to handle the requests.
	executor, _ := protocol.NewCommandExecutor(
		app,
		server,
		pingEncoding,
		pongEncoding,
		reqTopic,
		func(
			ctx context.Context,
			req *protocol.CommandRequest[Ping],
		) (*protocol.CommandResponse[Pong], error) {
			fmt.Printf("Ping received: %s\n", req.Payload.Message)
			return protocol.Respond(Pong{req.Payload.Message})
		},
	)
	defer executor.Close()

	// Create a new invoker to send the requests.
	invoker, _ := protocol.NewCommandInvoker(
		app,
		client,
		pingEncoding,
		pongEncoding,
		reqTopic,
		protocol.WithResponseTopicPattern(resTopic),
	)
	defer invoker.Close()

	// Executors and invokers should be created before calling Start on the MQTT
	// client to ensure they can handle requests from any existing session.
	server.Start()
	defer server.Stop()
	client.Start()
	defer client.Stop()

	// Start listening to requests on the MQTT connection.
	executor.Start(ctx)
	invoker.Start(ctx)

	// Invoke the request.
	res, _ := invoker.Invoke(ctx, Ping{"Hello!"})
	fmt.Printf("Pong received: %s\n", res.Payload.Message)
}

Documentation

Overview

Copyright (c) Microsoft Corporation. Licensed under the MIT License.



Constants

const (
	DefaultCloudEventSpecVersion = "1.0"
	DefaultCloudEventType        = "ms.aio.telemetry"
)
const DefaultTimeout = 10 * time.Second

DefaultTimeout is the timeout applied to Invoke or Send if none is specified.

Variables

var ErrUnsupportedContentType = stderr.New("unsupported content type")

ErrUnsupportedContentType should be returned if the content type is not supported by this encoding.

Functions

func Must added in v0.4.0

func Must[T any](t T, e error) T

Must returns the created object, or panics if the error is non-nil. It is intended for creating global instances, e.g. the shared Application state.
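
The pattern can be illustrated with a small self-contained sketch. The helper below is a local copy written to match the documented signature; it is not the package's own source, and the names must, parsePort, and defaultPort are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// must mirrors the documented signature of Must. This local copy exists only
// so the sketch compiles without the module; it is an assumption about the
// behavior, not the package's implementation.
func must[T any](t T, e error) T {
	if e != nil {
		panic(e)
	}
	return t
}

// parsePort is a hypothetical helper that panics on malformed input.
func parsePort(s string) int {
	return must(strconv.Atoi(s))
}

// defaultPort is a package-level value initialized once at program start,
// which is the kind of global instance the helper is meant for.
var defaultPort = parsePort("1883")

func main() {
	fmt.Println(defaultPort)
}
```

With the real package, the equivalent call would presumably wrap a constructor such as NewApplication in the same way.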

func WithLogger

func WithLogger(logger *slog.Logger) interface {
	Option
	ApplicationOption
	CommandExecutorOption
	CommandInvokerOption
	TelemetryReceiverOption
	TelemetrySenderOption
}

WithLogger enables logging with the provided slog logger.

Types

type Application added in v0.4.0

type Application struct {
	// contains filtered or unexported fields
}

Application represents shared application state.

func NewApplication added in v0.4.0

func NewApplication(opt ...ApplicationOption) (*Application, error)

NewApplication creates a new shared application state. Only one of these should be created per application.

func (*Application) GetHLC added in v0.4.0

func (a *Application) GetHLC() (hlc.HybridLogicalClock, error)

GetHLC syncs the application HLC instance to the current time and returns it.

func (*Application) SetHLC added in v0.4.0

func (a *Application) SetHLC(val hlc.HybridLogicalClock) error

SetHLC syncs the application HLC instance to the given HLC.

type ApplicationOption added in v0.4.0

type ApplicationOption interface {
	// contains filtered or unexported methods
}

ApplicationOption represents a single application option.

type ApplicationOptions added in v0.4.0

type ApplicationOptions struct {
	MaxClockDrift time.Duration
	Logger        *slog.Logger
}

ApplicationOptions are the resolved application options.

func (*ApplicationOptions) Apply added in v0.4.0

func (o *ApplicationOptions) Apply(
	opts []ApplicationOption,
	rest ...ApplicationOption,
)

Apply resolves the provided list of options.

type CloudEvent added in v0.2.0

type CloudEvent struct {
	ID          string
	Source      *url.URL
	SpecVersion string
	Type        string

	DataContentType string
	DataSchema      *url.URL
	Subject         string
	Time            time.Time
}

CloudEvent provides an implementation of the CloudEvents 1.0 spec; see: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md

func CloudEventFromTelemetry added in v0.4.0

func CloudEventFromTelemetry[T any](
	msg *TelemetryMessage[T],
) (*CloudEvent, error)

CloudEventFromTelemetry extracts cloud event data from the given telemetry message. It will return an error if any required properties are missing or any properties do not match the expected schema.

func (*CloudEvent) Attrs added in v0.2.0

func (ce *CloudEvent) Attrs() []slog.Attr

Attrs returns additional attributes for slog.

type CommandExecutor

type CommandExecutor[Req any, Res any] struct {
	// contains filtered or unexported fields
}

CommandExecutor provides the ability to execute a single command.

func NewCommandExecutor

func NewCommandExecutor[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	handler CommandHandler[Req, Res],
	opt ...CommandExecutorOption,
) (ce *CommandExecutor[Req, Res], err error)

NewCommandExecutor creates a new command executor.

func (*CommandExecutor[Req, Res]) Close added in v0.2.0

func (ce *CommandExecutor[Req, Res]) Close()

Close the command executor to free its resources.

func (*CommandExecutor[Req, Res]) Start added in v0.2.0

func (ce *CommandExecutor[Req, Res]) Start(ctx context.Context) error

Start listening to the MQTT request topic.

type CommandExecutorOption

type CommandExecutorOption interface {
	// contains filtered or unexported methods
}

CommandExecutorOption represents a single command executor option.

type CommandExecutorOptions

type CommandExecutorOptions struct {
	Idempotent bool

	Concurrency uint
	Timeout     time.Duration
	ShareName   string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

CommandExecutorOptions are the resolved command executor options.

func (*CommandExecutorOptions) Apply

func (o *CommandExecutorOptions) Apply(
	opts []CommandExecutorOption,
	rest ...CommandExecutorOption,
)

Apply resolves the provided list of options.

func (*CommandExecutorOptions) ApplyOptions

func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandHandler

type CommandHandler[Req any, Res any] = func(
	context.Context,
	*CommandRequest[Req],
) (*CommandResponse[Res], error)

CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type CommandInvoker

type CommandInvoker[Req any, Res any] struct {
	// contains filtered or unexported fields
}

CommandInvoker provides the ability to invoke a single command.

func NewCommandInvoker

func NewCommandInvoker[Req, Res any](
	app *Application,
	client MqttClient,
	requestEncoding Encoding[Req],
	responseEncoding Encoding[Res],
	requestTopicPattern string,
	opt ...CommandInvokerOption,
) (ci *CommandInvoker[Req, Res], err error)

NewCommandInvoker creates a new command invoker.

func (*CommandInvoker[Req, Res]) Close added in v0.2.0

func (ci *CommandInvoker[Req, Res]) Close()

Close the command invoker to free its resources.

func (*CommandInvoker[Req, Res]) Invoke

func (ci *CommandInvoker[Req, Res]) Invoke(
	ctx context.Context,
	req Req,
	opt ...InvokeOption,
) (res *CommandResponse[Res], err error)

Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.
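
As a rough sketch of handling parallelism on the caller side, the example below fans out several blocking calls with goroutines and a sync.WaitGroup. The invokeFunc type and the echo function are hypothetical stand-ins for a real CommandInvoker, used so the example runs without a broker.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// invokeFunc is a hypothetical stand-in for a blocking call such as
// CommandInvoker.Invoke; it is not part of this package.
type invokeFunc func(ctx context.Context, req string) (string, error)

// invokeAll issues one blocking call per request, each in its own goroutine,
// and returns the responses in request order together with the first error
// found in that order (if any).
func invokeAll(ctx context.Context, invoke invokeFunc, reqs []string) ([]string, error) {
	results := make([]string, len(reqs))
	errs := make([]error, len(reqs))

	var wg sync.WaitGroup
	for i, req := range reqs {
		wg.Add(1)
		go func(i int, req string) {
			defer wg.Done()
			// Each goroutine writes only to its own index, so no lock is needed.
			results[i], errs[i] = invoke(ctx, req)
		}(i, req)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return results, err
		}
	}
	return results, nil
}

// echo is a fake invocation that answers immediately.
func echo(_ context.Context, req string) (string, error) {
	return "pong: " + req, nil
}

// invokeDemo runs three fake invocations in parallel.
func invokeDemo() ([]string, error) {
	return invokeAll(context.Background(), echo, []string{"a", "b", "c"})
}

func main() {
	res, err := invokeDemo()
	fmt.Println(res, err)
}
```

Writing each result to a pre-sized slice keeps the responses in request order regardless of which call finishes first.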

func (*CommandInvoker[Req, Res]) Start added in v0.2.0

func (ci *CommandInvoker[Req, Res]) Start(ctx context.Context) error

Start listening to the response topic(s). Must be called before any calls to Invoke.

type CommandInvokerOption

type CommandInvokerOption interface {
	// contains filtered or unexported methods
}

CommandInvokerOption represents a single command invoker option.

type CommandInvokerOptions

type CommandInvokerOptions struct {
	ResponseTopicPattern string
	ResponseTopicPrefix  string
	ResponseTopicSuffix  string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

CommandInvokerOptions are the resolved command invoker options.

func (*CommandInvokerOptions) Apply

func (o *CommandInvokerOptions) Apply(
	opts []CommandInvokerOption,
	rest ...CommandInvokerOption,
)

Apply resolves the provided list of options.

func (*CommandInvokerOptions) ApplyOptions

func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type CommandRequest

type CommandRequest[Req any] struct {
	Message[Req]
}

CommandRequest contains per-message data and methods that are exposed to the command handlers.

type CommandResponse

type CommandResponse[Res any] struct {
	Message[Res]
}

CommandResponse contains per-message data and methods that are returned by the command handlers.

func Respond

func Respond[Res any](
	payload Res,
	opt ...RespondOption,
) (*CommandResponse[Res], error)

Respond is a shorthand to create a command response with required values and options set appropriately. Note that the response may be incomplete and will be filled out by the library after being returned.

type Custom added in v0.4.0

type Custom struct{}

Custom represents data that is externally serialized into a byte stream via custom code.

func (Custom) Deserialize added in v0.4.0

func (Custom) Deserialize(data *Data) (Data, error)

Deserialize returns the data unchanged.

func (Custom) Serialize added in v0.4.0

func (Custom) Serialize(t Data) (*Data, error)

Serialize returns the data unchanged.

type Data added in v0.4.0

type Data struct {
	Payload       []byte
	ContentType   string
	PayloadFormat byte
}

Data represents encoded values along with their transmitted content type.

type Empty

type Empty struct{}

Empty represents an encoding that contains no value.

func (Empty) Deserialize

func (Empty) Deserialize(data *Data) (any, error)

Deserialize validates that the payload is empty.

func (Empty) Serialize

func (Empty) Serialize(t any) (*Data, error)

Serialize validates that the payload is empty.

type Encoding

type Encoding[T any] interface {
	Serialize(T) (*Data, error)
	Deserialize(*Data) (T, error)
}

Encoding is a translation between a concrete Go type T and encoded data. All methods *must* be thread-safe.
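
A custom encoding might look like the following sketch. To keep it self-contained, Data, Encoding, and the error value are local mirrors of the documented shapes; the Text type, its content type string, and the roundTrip helper are hypothetical, and the meaning assigned to PayloadFormat is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Data and Encoding mirror the shapes documented for this package so that the
// sketch compiles on its own; real code would use the package's types.
type Data struct {
	Payload       []byte
	ContentType   string
	PayloadFormat byte
}

type Encoding[T any] interface {
	Serialize(T) (*Data, error)
	Deserialize(*Data) (T, error)
}

// errUnsupportedContentType stands in for ErrUnsupportedContentType.
var errUnsupportedContentType = errors.New("unsupported content type")

// Text is a hypothetical encoding for plain UTF-8 strings. It holds no state,
// so its methods are safe to call from multiple goroutines.
type Text struct{}

const textContentType = "text/plain"

// Serialize wraps the string bytes with the text content type.
func (Text) Serialize(s string) (*Data, error) {
	// Assumption: PayloadFormat carries the MQTT 5 payload format indicator,
	// where 1 means UTF-8 character data.
	return &Data{Payload: []byte(s), ContentType: textContentType, PayloadFormat: 1}, nil
}

// Deserialize rejects any content type other than the one Serialize emits.
func (Text) Deserialize(d *Data) (string, error) {
	if d.ContentType != textContentType {
		return "", errUnsupportedContentType
	}
	return string(d.Payload), nil
}

// Compile-time check that Text satisfies Encoding[string].
var _ Encoding[string] = Text{}

// roundTrip serializes and then deserializes a value with the given encoding.
func roundTrip[T any](enc Encoding[T], v T) (T, error) {
	d, err := enc.Serialize(v)
	if err != nil {
		var zero T
		return zero, err
	}
	return enc.Deserialize(d)
}

func main() {
	out, err := roundTrip[string](Text{}, "hello")
	fmt.Println(out, err)
}
```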

type InvokeOption

type InvokeOption interface {
	// contains filtered or unexported methods
}

InvokeOption represents a single per-invoke option.

type InvokeOptions

type InvokeOptions struct {
	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}

InvokeOptions are the resolved per-invoke options.

func (*InvokeOptions) Apply

func (o *InvokeOptions) Apply(
	opts []InvokeOption,
	rest ...InvokeOption,
)

Apply resolves the provided list of options.

type JSON

type JSON[T any] struct{}

JSON is a simple implementation of a JSON encoding.

func (JSON[T]) Deserialize

func (JSON[T]) Deserialize(data *Data) (T, error)

Deserialize translates JSON bytes into the Go type T.

func (JSON[T]) Serialize

func (JSON[T]) Serialize(t T) (*Data, error)

Serialize translates the Go type T into JSON bytes.

type Listener

type Listener interface {
	Start(context.Context) error
	Close()
}

Listener represents an object which will listen to an MQTT topic.

type Listeners added in v0.2.0

type Listeners []Listener

Listeners represents a collection of MQTT listeners.

func (Listeners) Close added in v0.2.0

func (ls Listeners) Close()

Close all underlying MQTT topics and free resources.

func (Listeners) Start added in v0.2.0

func (ls Listeners) Start(ctx context.Context) error

Start listening to all underlying MQTT topics.

type Message

type Message[T any] struct {
	// The message payload.
	Payload T

	// The ID of the calling MQTT client.
	ClientID string

	// The data that identifies a single unique request.
	CorrelationData string

	// The timestamp of when the message was sent.
	Timestamp hlc.HybridLogicalClock

	// All topic tokens resolved from the incoming topic.
	TopicTokens map[string]string

	// Any user-provided metadata values.
	Metadata map[string]string

	// The raw payload data.
	*Data
}

Message contains common message data that is exposed to message handlers.

type MqttClient added in v0.2.0

type MqttClient interface {
	ID() string
	Publish(
		context.Context,
		string,
		[]byte,
		...mqtt.PublishOption,
	) (*mqtt.Ack, error)
	RegisterMessageHandler(mqtt.MessageHandler) func()
	Subscribe(
		context.Context,
		string,
		...mqtt.SubscribeOption,
	) (*mqtt.Ack, error)
	Unsubscribe(
		context.Context,
		string,
		...mqtt.UnsubscribeOption,
	) (*mqtt.Ack, error)
}

MqttClient is the client used for the underlying MQTT connection.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.
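
One way such filtering could work is sketched below. This is an illustration built on assumptions, not the package's implementation: every type here is a local, lower-case stand-in, and the unexported invoke method is a guess at how typed-value options such as WithTimeout might resolve into an options struct.

```go
package main

import (
	"fmt"
	"time"
)

// option is a hypothetical stand-in for the Option interface: any value that
// may or may not apply to a given options struct.
type option interface{}

// invokeOptions mirrors a subset of the documented InvokeOptions fields.
type invokeOptions struct {
	Timeout  time.Duration
	Metadata map[string]string
}

// invokeOption is implemented by options that apply to invokeOptions.
type invokeOption interface {
	invoke(*invokeOptions)
}

// Typed-value options, modeled on WithTimeout, WithMetadata, and WithIdempotent.
type (
	withTimeout    time.Duration
	withMetadata   map[string]string
	withIdempotent bool // executor-only in this sketch, so it has no invoke method
)

func (o withTimeout) invoke(opt *invokeOptions) {
	opt.Timeout = time.Duration(o)
}

func (o withMetadata) invoke(opt *invokeOptions) {
	if opt.Metadata == nil {
		opt.Metadata = make(map[string]string, len(o))
	}
	for k, v := range o {
		opt.Metadata[k] = v
	}
}

// applyOptions keeps only the options that implement invokeOption and applies
// them in order, so later options override earlier ones.
func (o *invokeOptions) applyOptions(opts []option, rest ...option) {
	all := append(append([]option{}, opts...), rest...)
	for _, opt := range all {
		if inv, ok := opt.(invokeOption); ok {
			inv.invoke(o)
		}
	}
}

// resolveDemo resolves a mixed list of options into an invokeOptions value.
func resolveDemo() invokeOptions {
	var o invokeOptions
	o.applyOptions(
		[]option{withTimeout(time.Second), withIdempotent(true)},
		withMetadata{"trace": "abc"},
		withTimeout(5*time.Second),
	)
	return o
}

func main() {
	o := resolveDemo()
	fmt.Println(o.Timeout, o.Metadata)
}
```

In this sketch the executor-only option is silently skipped, which is what lets one mixed option list be passed to several constructors.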

type Raw added in v0.1.1

type Raw struct{}

Raw represents a raw byte stream.

func (Raw) Deserialize added in v0.1.1

func (Raw) Deserialize(data *Data) ([]byte, error)

Deserialize returns the bytes unchanged.

func (Raw) Serialize added in v0.1.1

func (Raw) Serialize(t []byte) (*Data, error)

Serialize returns the bytes unchanged.

type RespondOption

type RespondOption interface {
	// contains filtered or unexported methods
}

RespondOption represents a single per-response option.

type RespondOptions

type RespondOptions struct {
	Metadata map[string]string
}

RespondOptions are the resolved per-response options.

func (*RespondOptions) Apply

func (o *RespondOptions) Apply(
	opts []RespondOption,
	rest ...RespondOption,
)

Apply resolves the provided list of options.

type SendOption

type SendOption interface {
	// contains filtered or unexported methods
}

SendOption represents a single per-send option.

func WithCloudEvent added in v0.2.0

func WithCloudEvent(ce *CloudEvent) SendOption

WithCloudEvent adds a cloud event payload to the telemetry message.

type SendOptions

type SendOptions struct {
	CloudEvent *CloudEvent
	Retain     bool

	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}

SendOptions are the resolved per-send options.

func (*SendOptions) Apply

func (o *SendOptions) Apply(
	opts []SendOption,
	rest ...SendOption,
)

Apply resolves the provided list of options.

type TelemetryHandler

type TelemetryHandler[T any] = func(context.Context, *TelemetryMessage[T]) error

TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.

type TelemetryMessage

type TelemetryMessage[T any] struct {
	Message[T]

	// Ack provides a function to manually ack if enabled and if possible;
	// it will be nil otherwise. Note that, since QoS 0 messages cannot be
	// acked, this will be nil in this case even if manual ack is enabled.
	Ack func()
}

TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.
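
Because Ack may be nil, a handler that acks manually should guard the call. The sketch below shows that check using a local telemetryMessage type that mirrors only the two fields it needs; the handle and countAcks functions are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// telemetryMessage mirrors the two documented fields this sketch needs; real
// code would receive the package's TelemetryMessage instead.
type telemetryMessage[T any] struct {
	Payload T
	Ack     func()
}

// handle processes a message and then acks it, but only when an Ack function
// was provided (it is nil for QoS 0 or when manual ack is disabled).
func handle(_ context.Context, msg *telemetryMessage[string]) error {
	fmt.Println("received:", msg.Payload)
	if msg.Ack != nil {
		msg.Ack()
	}
	return nil
}

// countAcks delivers one ackable and one non-ackable message and returns how
// many times Ack was called.
func countAcks() int {
	acks := 0
	ctx := context.Background()
	_ = handle(ctx, &telemetryMessage[string]{Payload: "qos1", Ack: func() { acks++ }})
	_ = handle(ctx, &telemetryMessage[string]{Payload: "qos0"}) // Ack is nil here
	return acks
}

func main() {
	fmt.Println(countAcks())
}
```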

type TelemetryReceiver

type TelemetryReceiver[T any] struct {
	// contains filtered or unexported fields
}

TelemetryReceiver provides the ability to handle the receipt of a single telemetry.

func NewTelemetryReceiver

func NewTelemetryReceiver[T any](
	app *Application,
	client MqttClient,
	encoding Encoding[T],
	topicPattern string,
	handler TelemetryHandler[T],
	opt ...TelemetryReceiverOption,
) (tr *TelemetryReceiver[T], err error)

NewTelemetryReceiver creates a new telemetry receiver.

func (*TelemetryReceiver[T]) Close added in v0.2.0

func (tr *TelemetryReceiver[T]) Close()

Close the telemetry receiver to free its resources.

func (*TelemetryReceiver[T]) Start added in v0.2.0

func (tr *TelemetryReceiver[T]) Start(ctx context.Context) error

Start listening to the MQTT telemetry topic.

type TelemetryReceiverOption

type TelemetryReceiverOption interface {
	// contains filtered or unexported methods
}

TelemetryReceiverOption represents a single telemetry receiver option.

type TelemetryReceiverOptions

type TelemetryReceiverOptions struct {
	ManualAck bool

	Concurrency uint
	Timeout     time.Duration
	ShareName   string

	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

TelemetryReceiverOptions are the resolved telemetry receiver options.

func (*TelemetryReceiverOptions) Apply

func (o *TelemetryReceiverOptions) Apply(
	opts []TelemetryReceiverOption,
	rest ...TelemetryReceiverOption,
)

Apply resolves the provided list of options.

func (*TelemetryReceiverOptions) ApplyOptions

func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type TelemetrySender

type TelemetrySender[T any] struct {
	// contains filtered or unexported fields
}

TelemetrySender provides the ability to send a single telemetry.

func NewTelemetrySender

func NewTelemetrySender[T any](
	app *Application,
	client MqttClient,
	encoding Encoding[T],
	topicPattern string,
	opt ...TelemetrySenderOption,
) (ts *TelemetrySender[T], err error)

NewTelemetrySender creates a new telemetry sender.

func (*TelemetrySender[T]) Send

func (ts *TelemetrySender[T]) Send(
	ctx context.Context,
	val T,
	opt ...SendOption,
) (err error)

Send emits the telemetry. This will block until the message is ack'd.

type TelemetrySenderOption

type TelemetrySenderOption interface {
	// contains filtered or unexported methods
}

TelemetrySenderOption represents a single telemetry sender option.

type TelemetrySenderOptions

type TelemetrySenderOptions struct {
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}

TelemetrySenderOptions are the resolved telemetry sender options.

func (*TelemetrySenderOptions) Apply

func (o *TelemetrySenderOptions) Apply(
	opts []TelemetrySenderOption,
	rest ...TelemetrySenderOption,
)

Apply resolves the provided list of options.

func (*TelemetrySenderOptions) ApplyOptions

func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)

ApplyOptions filters and resolves the provided list of options.

type WithConcurrency

type WithConcurrency uint

WithConcurrency indicates how many handlers can execute in parallel.

type WithFencingToken

type WithFencingToken hlc.HybridLogicalClock

WithFencingToken provides a fencing token to be used by the executor.

type WithIdempotent

type WithIdempotent bool

WithIdempotent marks the command as idempotent.

type WithManualAck

type WithManualAck bool

WithManualAck indicates that the handler is responsible for manually acking the telemetry message.

type WithMaxClockDrift added in v0.4.0

type WithMaxClockDrift time.Duration

WithMaxClockDrift specifies how long HLCs are allowed to drift from the wall clock before they are considered no longer valid.

type WithMetadata

type WithMetadata map[string]string

WithMetadata specifies user-provided metadata values.

type WithResponseTopicPattern added in v0.4.0

type WithResponseTopicPattern string

WithResponseTopicPattern specifies a custom response topic pattern. Note that this overrides any provided response topic prefix or suffix.

type WithResponseTopicPrefix

type WithResponseTopicPrefix string

WithResponseTopicPrefix specifies a custom prefix for the response topic. If no response topic options are specified, this will default to a value of "clients/<MQTT client ID>".

type WithResponseTopicSuffix

type WithResponseTopicSuffix string

WithResponseTopicSuffix specifies a custom suffix for the response topic.

type WithRetain

type WithRetain bool

WithRetain indicates that the telemetry event should be retained by the broker.

type WithShareName

type WithShareName string

WithShareName connects this listener to a shared MQTT subscription.

type WithTimeout added in v0.2.0

type WithTimeout time.Duration

WithTimeout applies a context timeout to the message invocation or handler execution, as appropriate.
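
The general idea of a context timeout with a default fallback can be sketched with the standard library alone. The functions below are hypothetical; in particular, treating a zero duration as "not specified" is an assumption, and the constant simply repeats the documented DefaultTimeout value.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// defaultTimeout repeats the documented DefaultTimeout value.
const defaultTimeout = 10 * time.Second

// effectiveTimeout returns the requested timeout, or the default when none
// was requested. Treating zero (or negative) as unspecified is an assumption.
func effectiveTimeout(requested time.Duration) time.Duration {
	if requested <= 0 {
		return defaultTimeout
	}
	return requested
}

// runWithTimeout runs fn under a context that is cancelled once the effective
// timeout elapses, and returns the context error if fn has not finished.
func runWithTimeout(
	ctx context.Context,
	timeout time.Duration,
	fn func(context.Context) error,
) error {
	ctx, cancel := context.WithTimeout(ctx, effectiveTimeout(timeout))
	defer cancel()

	// Buffered so the goroutine can exit even if nobody reads the result.
	done := make(chan error, 1)
	go func() { done <- fn(ctx) }()

	select {
	case err := <-done:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOut reports whether a deliberately slow function exceeds a short timeout.
func timedOut() bool {
	err := runWithTimeout(context.Background(), 10*time.Millisecond,
		func(ctx context.Context) error {
			select {
			case <-time.After(time.Second):
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(effectiveTimeout(0), timedOut())
}
```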

type WithTopicNamespace

type WithTopicNamespace string

WithTopicNamespace specifies a namespace that will be prepended to the topic.

type WithTopicTokenNamespace

type WithTopicTokenNamespace string

WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.

type WithTopicTokens

type WithTopicTokens map[string]string

WithTopicTokens specifies topic token values.
