Documentation ¶
Overview ¶
Copyright (c) Microsoft Corporation. Licensed under the MIT License.
Index ¶
- Constants
- Variables
- func Must[T any](t T, e error) T
- func WithLogger(logger *slog.Logger) interface{ ... }
- type Application
- type ApplicationOption
- type ApplicationOptions
- type CloudEvent
- type CommandExecutor
- type CommandExecutorOption
- type CommandExecutorOptions
- type CommandHandler
- type CommandInvoker
- type CommandInvokerOption
- type CommandInvokerOptions
- type CommandRequest
- type CommandResponse
- type Custom
- type Data
- type Empty
- type Encoding
- type InvokeOption
- type InvokeOptions
- type JSON
- type Listener
- type Listeners
- type Message
- type MqttClient
- type Option
- type Raw
- type RespondOption
- type RespondOptions
- type SendOption
- type SendOptions
- type TelemetryHandler
- type TelemetryMessage
- type TelemetryReceiver
- type TelemetryReceiverOption
- type TelemetryReceiverOptions
- type TelemetrySender
- type TelemetrySenderOption
- type TelemetrySenderOptions
- type WithConcurrency
- type WithFencingToken
- type WithIdempotent
- type WithManualAck
- type WithMaxClockDrift
- type WithMetadata
- type WithResponseTopicPattern
- type WithResponseTopicPrefix
- type WithResponseTopicSuffix
- type WithRetain
- type WithShareName
- type WithTimeout
- type WithTopicNamespace
- type WithTopicTokenNamespace
- type WithTopicTokens
Constants ¶
const (
	DefaultCloudEventSpecVersion = "1.0"
	DefaultCloudEventType        = "ms.aio.telemetry"
)
const DefaultTimeout = 10 * time.Second
DefaultTimeout is the timeout applied to Invoke or Send if none is specified.
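As a rough illustration of the fallback described above, the sketch below picks a timeout for a call and applies it with a standard context. The helper `effectiveTimeout` is hypothetical, and the rule that a zero or negative duration means "none specified" is an assumption, not something this page confirms.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// DefaultTimeout mirrors the documented constant.
const DefaultTimeout = 10 * time.Second

// effectiveTimeout is a hypothetical helper. It assumes that a zero or
// negative duration means "not specified" and substitutes the default.
func effectiveTimeout(requested time.Duration) time.Duration {
	if requested <= 0 {
		return DefaultTimeout
	}
	return requested
}

func main() {
	// No timeout requested, so the default is applied to the context.
	ctx, cancel := context.WithTimeout(context.Background(), effectiveTimeout(0))
	defer cancel()

	deadline, ok := ctx.Deadline()
	fmt.Println(ok, time.Until(deadline) <= DefaultTimeout)
}
```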
Variables ¶
var ErrUnsupportedContentType = stderr.New("unsupported content type")
ErrUnsupportedContentType should be returned if the content type is not supported by this encoding.
Functions ¶
func Must ¶ added in v0.4.0
Must ensures an object is created, or panics on error. Used to create global instances, e.g. of an Application state.
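A minimal sketch of how such a helper is typically written and used follows. The signature matches the one listed in the index; the body is an assumed implementation, and `parsePort` is a hypothetical constructor standing in for something like NewApplication.

```go
package main

import (
	"fmt"
	"strconv"
)

// Must returns t unchanged, or panics if e is non-nil.
// Assumed implementation; only the signature comes from the documentation.
func Must[T any](t T, e error) T {
	if e != nil {
		panic(e)
	}
	return t
}

// parsePort is a hypothetical constructor that can fail.
func parsePort(s string) (int, error) {
	return strconv.Atoi(s)
}

// port is a package-level value created once at start-up.
// A failure here aborts the program instead of being silently ignored.
var port = Must(parsePort("1883"))

func main() {
	fmt.Println(port)
}
```

Because a multi-value call can be passed directly as the argument list, `Must(parsePort("1883"))` needs no intermediate variables, which is what makes the pattern convenient for global initialization.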
func WithLogger ¶
func WithLogger(logger *slog.Logger) interface {
	Option
	ApplicationOption
	CommandExecutorOption
	CommandInvokerOption
	TelemetryReceiverOption
	TelemetrySenderOption
}
WithLogger enables logging with the provided slog logger.
Types ¶
type Application ¶ added in v0.4.0
type Application struct {
// contains filtered or unexported fields
}
Application represents shared application state.
func NewApplication ¶ added in v0.4.0
func NewApplication(opt ...ApplicationOption) (*Application, error)
NewApplication creates a new shared application state. Only one of these should be created per application.
func (*Application) GetHLC ¶ added in v0.4.0
func (a *Application) GetHLC() (hlc.HybridLogicalClock, error)
GetHLC syncs the application HLC instance to the current time and returns it.
func (*Application) SetHLC ¶ added in v0.4.0
func (a *Application) SetHLC(val hlc.HybridLogicalClock) error
SetHLC syncs the application HLC instance to the given HLC.
type ApplicationOption ¶ added in v0.4.0
type ApplicationOption interface {
// contains filtered or unexported methods
}
ApplicationOption represents a single application option.
type ApplicationOptions ¶ added in v0.4.0
ApplicationOptions are the resolved application options.
func (*ApplicationOptions) Apply ¶ added in v0.4.0
func (o *ApplicationOptions) Apply( opts []ApplicationOption, rest ...ApplicationOption, )
Apply resolves the provided list of options.
type CloudEvent ¶ added in v0.2.0
type CloudEvent struct {
	ID              string
	Source          *url.URL
	SpecVersion     string
	Type            string
	DataContentType string
	DataSchema      *url.URL
	Subject         string
	Time            time.Time
}
CloudEvent provides an implementation of the CloudEvents 1.0 spec; see: https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md
func CloudEventFromTelemetry ¶ added in v0.4.0
func CloudEventFromTelemetry[T any]( msg *TelemetryMessage[T], ) (*CloudEvent, error)
CloudEventFromTelemetry extracts cloud event data from the given telemetry message. It will return an error if any required properties are missing or any properties do not match the expected schema.
func (*CloudEvent) Attrs ¶ added in v0.2.0
func (ce *CloudEvent) Attrs() []slog.Attr
Attrs returns additional attributes for slog.
type CommandExecutor ¶
CommandExecutor provides the ability to execute a single command.
func NewCommandExecutor ¶
func NewCommandExecutor[Req, Res any]( app *Application, client MqttClient, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopicPattern string, handler CommandHandler[Req, Res], opt ...CommandExecutorOption, ) (ce *CommandExecutor[Req, Res], err error)
NewCommandExecutor creates a new command executor.
func (*CommandExecutor[Req, Res]) Close ¶ added in v0.2.0
func (ce *CommandExecutor[Req, Res]) Close()
Close the command executor to free its resources.
type CommandExecutorOption ¶
type CommandExecutorOption interface {
// contains filtered or unexported methods
}
CommandExecutorOption represents a single command executor option.
type CommandExecutorOptions ¶
type CommandExecutorOptions struct {
	Idempotent     bool
	Concurrency    uint
	Timeout        time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
CommandExecutorOptions are the resolved command executor options.
func (*CommandExecutorOptions) Apply ¶
func (o *CommandExecutorOptions) Apply( opts []CommandExecutorOption, rest ...CommandExecutorOption, )
Apply resolves the provided list of options.
func (*CommandExecutorOptions) ApplyOptions ¶
func (o *CommandExecutorOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandHandler ¶
type CommandHandler[Req any, Res any] = func( context.Context, *CommandRequest[Req], ) (*CommandResponse[Res], error)
CommandHandler is the user-provided implementation of a single command execution. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
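Since the library may run the handler from several goroutines at once, any state it touches needs synchronization. The sketch below shows one way to do that with an atomic counter. The `CommandRequest` and `CommandResponse` types are trimmed local stand-ins (the `Payload` field is an assumption), `CommandHandler` is declared here as a plain named type rather than the documented alias, and `runConcurrently` is a hypothetical driver that imitates the library's parallel dispatch.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// Trimmed local stand-ins; the real types carry more data.
type CommandRequest[T any] struct{ Payload T }
type CommandResponse[T any] struct{ Payload T }

// CommandHandler has the same shape as the documented handler type.
type CommandHandler[Req, Res any] func(
	context.Context,
	*CommandRequest[Req],
) (*CommandResponse[Res], error)

// counter holds handler state that is shared between goroutines.
type counter struct{ calls atomic.Int64 }

// Handle blocks until its work is done and only touches shared state
// through atomic operations, so concurrent calls are safe.
func (c *counter) Handle(
	_ context.Context,
	req *CommandRequest[int],
) (*CommandResponse[int], error) {
	c.calls.Add(1)
	return &CommandResponse[int]{Payload: req.Payload + 1}, nil
}

// runConcurrently imitates parallel dispatch by calling the handler
// from n goroutines and returns the number of recorded calls.
func runConcurrently(n int) int64 {
	c := &counter{}
	var handler CommandHandler[int, int] = c.Handle

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_, _ = handler(context.Background(), &CommandRequest[int]{Payload: i})
		}(i)
	}
	wg.Wait()
	return c.calls.Load()
}

func main() {
	fmt.Println(runConcurrently(50))
}
```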
type CommandInvoker ¶
CommandInvoker provides the ability to invoke a single command.
func NewCommandInvoker ¶
func NewCommandInvoker[Req, Res any]( app *Application, client MqttClient, requestEncoding Encoding[Req], responseEncoding Encoding[Res], requestTopicPattern string, opt ...CommandInvokerOption, ) (ci *CommandInvoker[Req, Res], err error)
NewCommandInvoker creates a new command invoker.
func (*CommandInvoker[Req, Res]) Close ¶ added in v0.2.0
func (ci *CommandInvoker[Req, Res]) Close()
Close the command invoker to free its resources.
func (*CommandInvoker[Req, Res]) Invoke ¶
func (ci *CommandInvoker[Req, Res]) Invoke( ctx context.Context, req Req, opt ...InvokeOption, ) (res *CommandResponse[Res], err error)
Invoke calls the command. This call will block until the command returns; any desired parallelism between invocations should be handled by the caller using normal Go constructs.
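To make the "normal Go constructs" remark concrete, here is a sketch that fans several blocking calls out over goroutines and waits for all of them. The function `invoke` is a hypothetical stand-in for a blocking call such as Invoke; it does not use the library, and the doubling logic is only there to produce a checkable result.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// invoke is a hypothetical stand-in for a blocking request/response call.
// It returns double the request, or the context error if ctx ends first.
func invoke(ctx context.Context, req int) (int, error) {
	select {
	case <-time.After(10 * time.Millisecond):
		return req * 2, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// invokeAll runs one blocking call per request in its own goroutine.
// Each goroutine writes only to its own slice index, so no lock is needed.
func invokeAll(ctx context.Context, reqs []int) ([]int, []error) {
	results := make([]int, len(reqs))
	errs := make([]error, len(reqs))

	var wg sync.WaitGroup
	for i, req := range reqs {
		wg.Add(1)
		go func(i, req int) {
			defer wg.Done()
			results[i], errs[i] = invoke(ctx, req)
		}(i, req)
	}
	wg.Wait()
	return results, errs
}

// demo invokes three requests in parallel under a shared deadline.
func demo() ([]int, []error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return invokeAll(ctx, []int{1, 2, 3})
}

func main() {
	results, errs := demo()
	fmt.Println(results, errs)
}
```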
type CommandInvokerOption ¶
type CommandInvokerOption interface {
// contains filtered or unexported methods
}
CommandInvokerOption represents a single command invoker option.
type CommandInvokerOptions ¶
type CommandInvokerOptions struct {
	ResponseTopicPattern string
	ResponseTopicPrefix  string
	ResponseTopicSuffix  string
	TopicNamespace       string
	TopicTokens          map[string]string
	Logger               *slog.Logger
}
CommandInvokerOptions are the resolved command invoker options.
func (*CommandInvokerOptions) Apply ¶
func (o *CommandInvokerOptions) Apply( opts []CommandInvokerOption, rest ...CommandInvokerOption, )
Apply resolves the provided list of options.
func (*CommandInvokerOptions) ApplyOptions ¶
func (o *CommandInvokerOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type CommandRequest ¶
CommandRequest contains per-message data and methods that are exposed to the command handlers.
type CommandResponse ¶
CommandResponse contains per-message data and methods that are returned by the command handlers.
type Custom ¶ added in v0.4.0
type Custom struct{}
Custom represents data that is externally serialized into a byte stream via custom code.
func (Custom) Deserialize ¶ added in v0.4.0
func (Custom) Deserialize(data *Data) (Data, error)
Deserialize returns the data unchanged.
type Data ¶ added in v0.4.0
Data represents encoded values along with their transmitted content type.
type Empty ¶
type Empty struct{}
Empty represents an encoding that contains no value.
func (Empty) Deserialize ¶
Deserialize validates that the payload is empty.
type Encoding ¶
Encoding is a translation between a concrete Go type T and encoded data. All methods *must* be thread-safe.
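The sketch below shows what a small custom encoding could look like. Only the `Deserialize(data *Data) (T, error)` shape appears on this page; the `Serialize` method, the fields of `Data`, and the `Text` type are all assumptions made for illustration. The encoding holds no state, which is the simplest way to meet the thread-safety requirement.

```go
package main

import (
	"errors"
	"fmt"
)

// Data is a local stand-in; its field names are assumed.
type Data struct {
	Payload     []byte
	ContentType string
}

// ErrUnsupportedContentType mirrors the documented error value.
var ErrUnsupportedContentType = errors.New("unsupported content type")

// Encoding is a local stand-in. Serialize is an assumed counterpart
// to the documented Deserialize method.
type Encoding[T any] interface {
	Serialize(T) (*Data, error)
	Deserialize(*Data) (T, error)
}

// Text is a hypothetical plain-text encoding for strings.
type Text struct{}

// Serialize wraps the string bytes with a text/plain content type.
func (Text) Serialize(s string) (*Data, error) {
	return &Data{Payload: []byte(s), ContentType: "text/plain"}, nil
}

// Deserialize rejects any explicit content type other than text/plain.
func (Text) Deserialize(d *Data) (string, error) {
	if d.ContentType != "" && d.ContentType != "text/plain" {
		return "", ErrUnsupportedContentType
	}
	return string(d.Payload), nil
}

// Compile-time check that Text satisfies the interface.
var _ Encoding[string] = Text{}

func main() {
	var enc Encoding[string] = Text{}
	data, _ := enc.Serialize("hello")
	s, err := enc.Deserialize(data)
	fmt.Println(s, err)
}
```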
type InvokeOption ¶
type InvokeOption interface {
// contains filtered or unexported methods
}
InvokeOption represents a single per-invoke option.
type InvokeOptions ¶
type InvokeOptions struct {
	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}
InvokeOptions are the resolved per-invoke options.
type JSON ¶
type JSON[T any] struct{}
JSON is a simple implementation of a JSON encoding.
func (JSON[T]) Deserialize ¶
func (JSON[T]) Deserialize(data *Data) (T, error)
Deserialize translates JSON bytes into the Go type T.
type Listeners ¶ added in v0.2.0
type Listeners []Listener
Listeners represents a collection of MQTT listeners.
type Message ¶
type Message[T any] struct {
	// The message payload.
	Payload T

	// The ID of the calling MQTT client.
	ClientID string

	// The data that identifies a single unique request.
	CorrelationData string

	// The timestamp of when the message was sent.
	Timestamp hlc.HybridLogicalClock

	// All topic tokens resolved from the incoming topic.
	TopicTokens map[string]string

	// Any user-provided metadata values.
	Metadata map[string]string

	// The raw payload data.
	*Data
}
Message contains common message data that is exposed to message handlers.
type MqttClient ¶ added in v0.2.0
type MqttClient interface {
	ID() string
	Publish(context.Context, string, []byte, ...mqtt.PublishOption) (*mqtt.Ack, error)
	RegisterMessageHandler(mqtt.MessageHandler) func()
	Subscribe(context.Context, string, ...mqtt.SubscribeOption) (*mqtt.Ack, error)
	Unsubscribe(context.Context, string, ...mqtt.UnsubscribeOption) (*mqtt.Ack, error)
}
MqttClient is the client used for the underlying MQTT connection.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option represents any of the option types, and can be filtered and applied by the ApplyOptions methods on the option structs.
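The filtering idea can be sketched with a few local types: a shared list of generic options is handed to two different option structs, and each one applies only the entries it understands. Everything below is a simplified stand-in written for illustration (the unexported method names and the `ExecutorOptions` and `SenderOptions` structs are assumptions), not the package's actual implementation.

```go
package main

import "fmt"

// Option is the umbrella marker implemented by every option type.
type Option interface{ option() }

// Per-component option interfaces (assumed method names).
type ExecutorOption interface{ executor(*ExecutorOptions) }
type SenderOption interface{ sender(*SenderOptions) }

// Resolved option structs, trimmed to two fields for the sketch.
type ExecutorOptions struct {
	Concurrency    uint
	TopicNamespace string
}
type SenderOptions struct{ TopicNamespace string }

// WithConcurrency only applies to executors.
type WithConcurrency uint

func (WithConcurrency) option()                        {}
func (o WithConcurrency) executor(e *ExecutorOptions) { e.Concurrency = uint(o) }

// WithTopicNamespace applies to both executors and senders.
type WithTopicNamespace string

func (WithTopicNamespace) option()                        {}
func (o WithTopicNamespace) executor(e *ExecutorOptions) { e.TopicNamespace = string(o) }
func (o WithTopicNamespace) sender(s *SenderOptions)     { s.TopicNamespace = string(o) }

// ApplyOptions keeps only the options that implement ExecutorOption.
func (e *ExecutorOptions) ApplyOptions(opts []Option, rest ...Option) {
	for _, list := range [][]Option{opts, rest} {
		for _, opt := range list {
			if eo, ok := opt.(ExecutorOption); ok {
				eo.executor(e)
			}
		}
	}
}

// ApplyOptions keeps only the options that implement SenderOption.
func (s *SenderOptions) ApplyOptions(opts []Option, rest ...Option) {
	for _, list := range [][]Option{opts, rest} {
		for _, opt := range list {
			if so, ok := opt.(SenderOption); ok {
				so.sender(s)
			}
		}
	}
}

func main() {
	shared := []Option{WithConcurrency(4), WithTopicNamespace("factory")}

	var e ExecutorOptions
	e.ApplyOptions(shared)

	var s SenderOptions
	s.ApplyOptions(shared) // WithConcurrency is silently skipped here.

	fmt.Printf("%+v %+v\n", e, s)
}
```

The benefit of this shape is that one option list can configure several components without the caller sorting options by type.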
type Raw ¶ added in v0.1.1
type Raw struct{}
Raw represents a raw byte stream.
func (Raw) Deserialize ¶ added in v0.1.1
Deserialize returns the bytes unchanged.
type RespondOption ¶
type RespondOption interface {
// contains filtered or unexported methods
}
RespondOption represents a single per-response option.
type RespondOptions ¶
RespondOptions are the resolved per-response options.
type SendOption ¶
type SendOption interface {
// contains filtered or unexported methods
}
SendOption represents a single per-send option.
func WithCloudEvent ¶ added in v0.2.0
func WithCloudEvent(ce *CloudEvent) SendOption
WithCloudEvent adds a cloud event payload to the telemetry message.
type SendOptions ¶
type SendOptions struct {
	CloudEvent  *CloudEvent
	Retain      bool
	Timeout     time.Duration
	TopicTokens map[string]string
	Metadata    map[string]string
}
SendOptions are the resolved per-send options.
type TelemetryHandler ¶
TelemetryHandler is the user-provided implementation of a single telemetry event handler. It is treated as blocking; all parallelism is handled by the library. This *must* be thread-safe.
type TelemetryMessage ¶
type TelemetryMessage[T any] struct {
	Message[T]

	// Ack provides a function to manually ack if enabled and if possible;
	// it will be nil otherwise. Note that, since QoS 0 messages cannot be
	// acked, this will be nil in this case even if manual ack is enabled.
	Ack func()
}
TelemetryMessage contains per-message data and methods that are exposed to the telemetry handlers.
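Because Ack can be nil, a handler that acknowledges manually should check it before calling. The sketch below uses a trimmed local stand-in for the message type (the `Payload` field is flattened here for brevity) and a hypothetical `handle` function.

```go
package main

import "fmt"

// TelemetryMessage is a trimmed local stand-in for the documented type.
type TelemetryMessage[T any] struct {
	Payload T
	Ack     func()
}

// handle processes a message and acks it only when an Ack function is
// present. It reports whether an ack was sent.
func handle(msg *TelemetryMessage[string]) bool {
	fmt.Println("received:", msg.Payload)
	if msg.Ack == nil {
		// Manual ack disabled, or a QoS 0 message: nothing to acknowledge.
		return false
	}
	msg.Ack()
	return true
}

func main() {
	acks := 0
	handle(&TelemetryMessage[string]{Payload: "qos1", Ack: func() { acks++ }})
	handle(&TelemetryMessage[string]{Payload: "qos0"})
	fmt.Println("acks:", acks)
}
```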
type TelemetryReceiver ¶
type TelemetryReceiver[T any] struct {
// contains filtered or unexported fields
}
TelemetryReceiver provides the ability to handle the receipt of a single telemetry.
func NewTelemetryReceiver ¶
func NewTelemetryReceiver[T any]( app *Application, client MqttClient, encoding Encoding[T], topicPattern string, handler TelemetryHandler[T], opt ...TelemetryReceiverOption, ) (tr *TelemetryReceiver[T], err error)
NewTelemetryReceiver creates a new telemetry receiver.
func (*TelemetryReceiver[T]) Close ¶ added in v0.2.0
func (tr *TelemetryReceiver[T]) Close()
Close the telemetry receiver to free its resources.
type TelemetryReceiverOption ¶
type TelemetryReceiverOption interface {
// contains filtered or unexported methods
}
TelemetryReceiverOption represents a single telemetry receiver option.
type TelemetryReceiverOptions ¶
type TelemetryReceiverOptions struct {
	ManualAck      bool
	Concurrency    uint
	Timeout        time.Duration
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
TelemetryReceiverOptions are the resolved telemetry receiver options.
func (*TelemetryReceiverOptions) Apply ¶
func (o *TelemetryReceiverOptions) Apply( opts []TelemetryReceiverOption, rest ...TelemetryReceiverOption, )
Apply resolves the provided list of options.
func (*TelemetryReceiverOptions) ApplyOptions ¶
func (o *TelemetryReceiverOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type TelemetrySender ¶
type TelemetrySender[T any] struct {
// contains filtered or unexported fields
}
TelemetrySender provides the ability to send a single telemetry.
func NewTelemetrySender ¶
func NewTelemetrySender[T any]( app *Application, client MqttClient, encoding Encoding[T], topicPattern string, opt ...TelemetrySenderOption, ) (ts *TelemetrySender[T], err error)
NewTelemetrySender creates a new telemetry sender.
type TelemetrySenderOption ¶
type TelemetrySenderOption interface {
// contains filtered or unexported methods
}
TelemetrySenderOption represents a single telemetry sender option.
type TelemetrySenderOptions ¶
type TelemetrySenderOptions struct {
	TopicNamespace string
	TopicTokens    map[string]string
	Logger         *slog.Logger
}
TelemetrySenderOptions are the resolved telemetry sender options.
func (*TelemetrySenderOptions) Apply ¶
func (o *TelemetrySenderOptions) Apply( opts []TelemetrySenderOption, rest ...TelemetrySenderOption, )
Apply resolves the provided list of options.
func (*TelemetrySenderOptions) ApplyOptions ¶
func (o *TelemetrySenderOptions) ApplyOptions(opts []Option, rest ...Option)
ApplyOptions filters and resolves the provided list of options.
type WithConcurrency ¶
type WithConcurrency uint
WithConcurrency indicates how many handlers can execute in parallel.
type WithFencingToken ¶
type WithFencingToken hlc.HybridLogicalClock
WithFencingToken provides a fencing token to be used by the executor.
type WithManualAck ¶
type WithManualAck bool
WithManualAck indicates that the handler is responsible for manually acking the telemetry message.
type WithMaxClockDrift ¶ added in v0.4.0
WithMaxClockDrift specifies how long HLCs are allowed to drift from the wall clock before they are considered no longer valid.
type WithMetadata ¶
WithMetadata specifies user-provided metadata values.
type WithResponseTopicPattern ¶ added in v0.4.0
type WithResponseTopicPattern string
WithResponseTopicPattern specifies a custom response topic pattern. Note that this overrides any provided response topic prefix or suffix.
type WithResponseTopicPrefix ¶
type WithResponseTopicPrefix string
WithResponseTopicPrefix specifies a custom prefix for the response topic. If no response topic options are specified, this will default to a value of "clients/<MQTT client ID>".
type WithResponseTopicSuffix ¶
type WithResponseTopicSuffix string
WithResponseTopicSuffix specifies a custom suffix for the response topic.
type WithRetain ¶
type WithRetain bool
WithRetain indicates that the telemetry event should be retained by the broker.
type WithShareName ¶
type WithShareName string
WithShareName connects this listener to a shared MQTT subscription.
type WithTimeout ¶ added in v0.2.0
WithTimeout applies a context timeout to the message invocation or handler execution, as appropriate.
type WithTopicNamespace ¶
type WithTopicNamespace string
WithTopicNamespace specifies a namespace that will be prepended to the topic.
type WithTopicTokenNamespace ¶
type WithTopicTokenNamespace string
WithTopicTokenNamespace specifies a namespace that will be prepended to all previously-specified topic tokens. Topic tokens specified after this option will not be namespaced, allowing this to differentiate user tokens from system tokens.
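The order dependence described above can be illustrated with a small model. The sketch assumes the namespace is prepended to token names (not values) and that options are applied strictly in the order given; `withTokens`, `withNamespace`, and `resolveTokens` are hypothetical local names, not the package's API.

```go
package main

import "fmt"

// tokenOption transforms the token map built so far.
type tokenOption interface {
	apply(map[string]string) map[string]string
}

// withTokens adds token values (stand-in for WithTopicTokens).
type withTokens map[string]string

func (w withTokens) apply(t map[string]string) map[string]string {
	for k, v := range w {
		t[k] = v
	}
	return t
}

// withNamespace prefixes the names of all tokens seen so far
// (stand-in for WithTopicTokenNamespace, under the stated assumption).
type withNamespace string

func (ns withNamespace) apply(t map[string]string) map[string]string {
	out := make(map[string]string, len(t))
	for k, v := range t {
		out[string(ns)+k] = v
	}
	return out
}

// resolveTokens applies the options in order.
func resolveTokens(opts ...tokenOption) map[string]string {
	tokens := map[string]string{}
	for _, opt := range opts {
		tokens = opt.apply(tokens)
	}
	return tokens
}

func main() {
	tokens := resolveTokens(
		withTokens{"modelId": "m1"},    // specified before: gets the prefix
		withNamespace("ex:"),
		withTokens{"executorId": "e1"}, // specified after: left unprefixed
	)
	fmt.Println(tokens)
}
```

In this model the user token ends up as `ex:modelId` while the later token keeps its bare name, which is the separation between user and system tokens that the description refers to.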
type WithTopicTokens ¶
WithTopicTokens specifies topic token values.