messaging

package
v0.0.0-...-43a846b Latest
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2025 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

Package messaging with the 3 flow messages: requests, response and notifications

Package transports with the interface of a client transport connection

Package transports with the 3 flow messages: requests, response and notifications

Index

Constants

View Source
const (
	// StatusPending - the request has not yet been delivered
	StatusPending = "pending"
	// StatusRunning - the request is being processed
	StatusRunning = "running"
	// StatusCompleted - the request processing was completed
	StatusCompleted = "completed"
	// StatusFailed - the request processing or delivery failed
	StatusFailed = "failed"
)

Request status provided with the response. This aligns with the action status values from the WoT spec.
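
A caller that tracks a request usually needs to know when no further status updates will follow. The sketch below assumes that only `completed` and `failed` are final states; `isTerminal` is a hypothetical helper, not part of this package, and the constants are copied so the snippet compiles on its own.

```go
package main

import "fmt"

// Status values copied from the constants above so this sketch stands alone.
const (
	StatusPending   = "pending"
	StatusRunning   = "running"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
)

// isTerminal is a hypothetical helper: it reports whether a status is final,
// assuming only completed and failed end a request.
func isTerminal(status string) bool {
	return status == StatusCompleted || status == StatusFailed
}

func main() {
	for _, s := range []string{StatusPending, StatusRunning, StatusCompleted, StatusFailed} {
		fmt.Printf("%s terminal=%v\n", s, isTerminal(s))
	}
}
```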

View Source
const (
	// WoT http basic protocol without return channel
	ProtocolTypeHTTPBasic = "http-basic"

	// websocket sub-protocol
	ProtocolTypeWSS = "wss"

	// WoT MQTT protocol over WSS
	ProtocolTypeWotMQTTWSS = "mqtt-wss"

	// HiveOT http SSE subprotocol return channel with direct messaging
	ProtocolTypeHiveotSSE = "hiveot-sse"

	// HiveOT message envelope passthrough
	ProtocolTypePassthrough = "passthrough"
)

Supported transport protocol binding types

View Source
const DefaultRpcTimeout = time.Second * 60 // 60 for testing; 3 seconds
View Source
const MessageTypeNotification = "notification"

MessageTypeNotification identifies the message as a notification.

View Source
const MessageTypeRequest = "request"

MessageTypeRequest identifies the message as a request.

View Source
const MessageTypeResponse = "response"

MessageTypeResponse identifies the message as a response.

View Source
const ThingDirectoryDThingID = "dtw:digitwin:ThingDirectory"

ThingDirectoryDThingID is the Digitwin ThingID of the runtime Directory Service. It is duplicated from DirectoryConsumerAPI.go to avoid a compile-time dependency on the runtime, which would be circular. Determining it through discovery would be cleaner, but that is a lot of work just for cleanliness's sake. It is used to update the directory with TDs using this agent.

View Source
const ThingDirectoryUpdateThingMethod = "updateThing"

Variables

View Source
var UnauthorizedError error = unauthorizedError{}

Functions

This section is empty.

Types

type ActionStatus

type ActionStatus struct {

	// ActionID that uniquely identifies the action instance.
	// This can be an identifier or a URL.
	ActionID string `json:"actionID,omitempty"`

	// Error with info when action failed
	Error *ErrorValue `json:"error,omitempty"`

	// Input of action
	Input any `json:"input,omitempty"`

	// Name of action
	Name string `json:"name,omitempty"`

	// Output with Action output
	Output any `json:"output,omitempty"`

	// SenderID of the client requesting the action
	SenderID string `json:"senderID,omitempty"`

	// State of action with progress: pending, running, completed, failed
	State string `json:"state,omitempty"`

	// ThingID digital-twin ThingID the action applies to
	ThingID string `json:"thingID,omitempty"`

	// Requested time the action request was received
	TimeRequested string `json:"timeRequested,omitempty"`

	// Updated time the action status was last updated
	TimeUpdated string `json:"timeUpdated,omitempty"`
}

ActionStatus is used for tracking the status of an action. NOTE: keep this in sync with the digital twin ActionStatus in the TD.

type AffordanceType

type AffordanceType string
const AffordanceTypeAction AffordanceType = "action"
const AffordanceTypeEvent AffordanceType = "event"
const AffordanceTypeProperty AffordanceType = "property"

type Agent

type Agent struct {
	*Consumer
	// contains filtered or unexported fields
}

Agent provides the messaging functions needed by hub agents. Agents are also consumers as they are able to invoke services.

Hub agents receive requests and return responses. The underlying transport protocol binding handles subscription.

func NewAgent

func NewAgent(cc IConnection,
	connHandler ConnectionHandler,
	notifHandler NotificationHandler,
	reqHandler RequestHandler,
	respHandler ResponseHandler,
	timeout time.Duration) *Agent

NewAgent creates a new agent instance for serving requests and sending responses. Since agents are also consumers, they can also send requests and receive responses.

Agents can be connected to when running a server or connect to a hub or gateway as client.

This is a wrapper around the ClientConnection that provides WoT response messages, publishes properties and events to subscribers, and publishes a TD.

func (*Agent) PubActionProgress

func (ag *Agent) PubActionProgress(req RequestMessage, value any) error

PubActionProgress helper for agents to send a 'running' ActionStatus notification

This sends an ActionStatus message with status of running.

func (*Agent) PubEvent

func (ag *Agent) PubEvent(thingID string, name string, value any) error

PubEvent helper for agents to send an event to subscribers.

The underlying transport protocol binding handles the subscription mechanism as the agent itself doesn't track subscriptions.

func (*Agent) PubProperties

func (ag *Agent) PubProperties(thingID string, propMap map[string]any) error

PubProperties helper for agents to publish a map of property values

The underlying transport protocol binding handles the subscription mechanism.

func (*Agent) PubProperty

func (ag *Agent) PubProperty(thingID string, name string, value any) error

PubProperty helper for agents to publish a property value notification to observers.

The underlying transport protocol binding handles the subscription mechanism.

func (*Agent) SendResponse

func (ag *Agent) SendResponse(resp *ResponseMessage) error

SendResponse sends a response for a previous request

func (*Agent) SetRequestHandler

func (ag *Agent) SetRequestHandler(cb RequestHandler)

SetRequestHandler set the application handler for incoming requests

func (*Agent) UpdateThing

func (ag *Agent) UpdateThing(tdoc *td.TD) error

UpdateThing helper for agents to publish an update of a TD in the directory. Note that this depends on the runtime directory service.

FIXME: change to use directory forms

type ConnectionHandler

type ConnectionHandler func(connected bool, err error, c IConnection)

ConnectionHandler handles a change in connection status

connected is true when connected without errors
err details why connection failed
c is the connection instance being established or disconnected

type ConnectionInfo

type ConnectionInfo struct {

	// Connection CA
	CaCert *x509.Certificate

	// GetClientID returns the authenticated clientID of this connection
	ClientID string

	// GetConnectionID returns the client's connection ID belonging to this endpoint
	ConnectionID string

	// GetConnectURL returns the full server URL used to establish this connection
	ConnectURL string

	// Connection timeout settings (clients only)
	Timeout time.Duration
}

ConnectionInfo provides details of a connection

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides the messaging functionality for consumers. This provides a Go API for consumer operations.

func NewConsumer

func NewConsumer(cc IConnection, rpcTimeout time.Duration) *Consumer

NewConsumer returns a new instance of the WoT consumer for use with the given connection. The connection should not be used by others as this consumer takes possession by registering connection callbacks.

This provides the API for common WoT operations such as invoking actions and supports RPC calls by waiting for a response.

Use SetNotificationHandler to set the callback to receive async notifications. Use SetResponseHandler to set the callback to receive async responses. Use SetConnectHandler to set the callback to be notified of connection changes.

cc the client connection to use for sending requests and receiving responses.
rpcTimeout is the timeout of RPC requests, or 0 for the default (3 sec)

func (*Consumer) Disconnect

func (co *Consumer) Disconnect()

Disconnect the client connection. Do not use this consumer after disconnect.

func (*Consumer) GetClientID

func (co *Consumer) GetClientID() string

GetClientID returns the client's account ID

func (*Consumer) GetConnection

func (co *Consumer) GetConnection() IConnection

GetConnection returns the underlying connection of this consumer

func (*Consumer) InvokeAction

func (co *Consumer) InvokeAction(
	dThingID, name string, input any, output any) error

InvokeAction invokes an action on a thing and waits for the response. If the response type is known then provide it with output, otherwise use interface{}.
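
The `output` argument presumably behaves like the target of `json.Unmarshal`: a pointer to the expected type. How the package converts the response value internally is not documented here. The sketch below assumes a JSON round-trip; `decodeInto` and the `Reading` type are hypothetical names made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeInto is a hypothetical helper that copies a loosely typed response
// value into a typed output by round-tripping through JSON.
// output must be a pointer, as with json.Unmarshal.
func decodeInto(value any, output any) error {
	raw, err := json.Marshal(value)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, output)
}

// Reading is a made-up action output type used only in this example.
type Reading struct {
	Temperature float64 `json:"temperature"`
	Unit        string  `json:"unit"`
}

func main() {
	// A response value as it might look after generic JSON decoding.
	value := map[string]any{"temperature": 21.5, "unit": "C"}

	var out Reading
	if err := decodeInto(value, &out); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(out.Temperature, out.Unit)
}
```

When the output type is not known in advance, passing a pointer to an `any` variable to the same helper yields the generic map form.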

func (*Consumer) IsConnected

func (co *Consumer) IsConnected() bool

IsConnected returns true if the consumer has a connection

func (*Consumer) ObserveProperty

func (co *Consumer) ObserveProperty(thingID string, name string) error

ObserveProperty sends a request to observe one or all properties

thingID is empty for all things
name is empty for all properties of the selected things
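
The empty-string wildcard rules above can be sketched as a small matcher. This is only an illustration of the rules: the actual matching is done by the transport protocol binding, and the `observation` type and its `matches` method are hypothetical.

```go
package main

import "fmt"

// observation is a hypothetical record of one ObserveProperty request.
type observation struct {
	ThingID string // empty matches every thing
	Name    string // empty matches every property of the selected things
}

// matches reports whether a property update falls under this observation.
func (o observation) matches(thingID, name string) bool {
	thingOK := o.ThingID == "" || o.ThingID == thingID
	nameOK := o.Name == "" || o.Name == name
	return thingOK && nameOK
}

func main() {
	all := observation{}
	one := observation{ThingID: "thing1", Name: "temperature"}

	fmt.Println(all.matches("thing2", "humidity"))
	fmt.Println(one.matches("thing1", "temperature"))
	fmt.Println(one.matches("thing1", "humidity"))
}
```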

func (*Consumer) Ping

func (co *Consumer) Ping() error

Ping the server and wait for a pong response. This uses the underlying transport's native ping-pong method.

func (*Consumer) QueryAction

func (co *Consumer) QueryAction(thingID, name string) (
	value ActionStatus, err error)

QueryAction obtains the status of an action

Q: the http-basic protocol returns an array per action in QueryAllActions but only a single action in QueryAction. This is inconsistent.

The underlying protocol binding constructs the ActionStatus from the protocol specific messages. The hiveot protocol passes this as-is as the output.

func (*Consumer) QueryAllActions

func (co *Consumer) QueryAllActions(thingID string) (
	values map[string]ActionStatus, err error)

QueryAllActions returns a map of action status for all actions of a thing.

This returns a map of actionName and the last known action status.

Q: the http-basic protocol returns an array for each action. What is the use-case that can have multiple concurrent actions? An actuator can only move in one direction at the same time. Maybe the array only applies to stateless actions?

This depends on the underlying protocol binding to construct appropriate ActionStatus message. All hiveot protocols include full information. WoT bindings might not include update timestamp and such.

func (*Consumer) ReadAllProperties

func (co *Consumer) ReadAllProperties(thingID string) (
	values map[string]ThingValue, err error)

ReadAllProperties sends a request to read all Thing property values.

This depends on the underlying protocol binding to construct appropriate ResponseMessages and include information such as Timestamp. All hiveot protocols include full information. WoT bindings might be more limited.

func (*Consumer) ReadProperty

func (co *Consumer) ReadProperty(thingID, name string) (
	value ThingValue, err error)

ReadProperty sends a request to read a Thing property value.

This depends on the underlying protocol binding to construct appropriate ResponseMessages and include information such as Timestamp. All hiveot protocols include full information. WoT bindings might be too limited.

func (*Consumer) Rpc

func (co *Consumer) Rpc(operation, thingID, name string, input any, output any) error

Rpc sends a request message and waits for a response. This returns an error if the request fails or if the response contains an error.

func (*Consumer) SendRequest

func (co *Consumer) SendRequest(req *RequestMessage, waitForCompletion bool) (
	resp *ResponseMessage, err error)

SendRequest sends an operation request and optionally waits for completion or timeout. If waitForCompletion is true and no correlationID is provided then a correlationID will be generated to wait for completion.

If waitForCompletion is false then any response will go to the async response handler and this returns nil response. If waitForCompletion is true this will wait until a response is received with a matching correlationID, or until a timeout occurs.

If the request has no correlation ID, one will be generated.
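
Waiting on a correlationID is commonly built from a map of channels plus a timeout. The sketch below shows that general pattern only; it is not this package's implementation, and the `pending` type, its methods and the reduced `response` struct are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// response is a stand-in for ResponseMessage with only the fields used here.
type response struct {
	CorrelationID string
	Value         any
}

// pending is a hypothetical registry of requests awaiting a response.
type pending struct {
	mu      sync.Mutex
	waiters map[string]chan *response
}

func newPending() *pending {
	return &pending{waiters: make(map[string]chan *response)}
}

// open registers a waiter for the given correlation ID.
func (p *pending) open(correlationID string) chan *response {
	ch := make(chan *response, 1) // buffered so deliver never blocks
	p.mu.Lock()
	p.waiters[correlationID] = ch
	p.mu.Unlock()
	return ch
}

// deliver hands a response to its waiter. It returns false when nobody is
// waiting; a caller would then pass the response to the async handler.
func (p *pending) deliver(resp *response) bool {
	p.mu.Lock()
	ch, found := p.waiters[resp.CorrelationID]
	delete(p.waiters, resp.CorrelationID)
	p.mu.Unlock()
	if found {
		ch <- resp
	}
	return found
}

// wait blocks until the response arrives or the timeout expires.
func (p *pending) wait(correlationID string, ch chan *response, timeout time.Duration) (*response, error) {
	select {
	case resp := <-ch:
		return resp, nil
	case <-time.After(timeout):
		p.mu.Lock()
		delete(p.waiters, correlationID)
		p.mu.Unlock()
		return nil, errors.New("timeout waiting for response " + correlationID)
	}
}

func main() {
	p := newPending()
	ch := p.open("req-1")
	// Simulate the transport delivering the response from another goroutine.
	go p.deliver(&response{CorrelationID: "req-1", Value: 42})

	resp, err := p.wait("req-1", ch, time.Second)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("value:", resp.Value)
}
```

The buffered channel lets a late response be dropped without blocking the delivering goroutine once the waiter has timed out.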

func (*Consumer) SetConnectHandler

func (co *Consumer) SetConnectHandler(cb ConnectionHandler)

SetConnectHandler sets the handler that is notified of changes to this consumer's connection. It is intended to notify the client that a reconnect or relogin is needed. Only a single handler is supported. This replaces the previously set callback.

func (*Consumer) SetNotificationHandler

func (co *Consumer) SetNotificationHandler(cb NotificationHandler)

SetNotificationHandler sets the notification handler of events for this consumer. Only a single handler is supported. This replaces the previously set callback.

func (*Consumer) SetResponseHandler

func (co *Consumer) SetResponseHandler(cb ResponseHandler)

SetResponseHandler sets the handler that receives asynchronous responses. Those are responses to requests that are not waited for using the baseRnR handler.

func (*Consumer) Subscribe

func (co *Consumer) Subscribe(thingID string, name string) error

Subscribe to one or all events of a thing. name is the event to subscribe to or "" for all events.

func (*Consumer) UnobserveProperty

func (co *Consumer) UnobserveProperty(thingID string, name string) error

UnobserveProperty stops observing a previously observed property, or all properties.

func (*Consumer) Unsubscribe

func (co *Consumer) Unsubscribe(thingID string, name string) error

Unsubscribe is a helper for sending an unsubscribe request

func (*Consumer) WaitForCompletion

func (co *Consumer) WaitForCompletion(
	rChan chan *ResponseMessage, operation, correlationID string, ignoreDisconnect bool) (
	resp *ResponseMessage, err error)

WaitForCompletion waits for a completed or failed response message on the given correlationID channel, or until N seconds passed, or the connection drops.

If a proper response is received it is written to the given output and nil (no error) is returned. If anything goes wrong, an error is returned

func (*Consumer) WriteProperty

func (co *Consumer) WriteProperty(thingID string, name string, input any, wait bool) error

WriteProperty is a helper to send a write property request

type ErrorValue

type ErrorValue struct {
	// Status code: https://w3c.github.io/wot-profile/#error-responses
	Status int `json:"status"`
	// Type is a URI reference [RFC3986] that identifies the problem type.
	Type string `json:"type"`
	// Title contains a short, human-readable summary of the problem type
	Title string `json:"title"`
	// Detail a human-readable explanation
	Detail string `json:"detail,omitempty"`
}

Error response payload

func ErrorValueFromError

func ErrorValueFromError(err error) *ErrorValue

Create an ErrorValue object from the given error. This returns nil if err is nil.
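
The conversion between a Go error and an ErrorValue could look roughly like the sketch below. The struct is copied from the definition above, while `toErrorValue` and `toError` are hypothetical stand-ins for ErrorValueFromError and AsError. The status code 500 and the `about:blank` type are assumptions of this sketch, not values confirmed by the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// ErrorValue mirrors the struct documented above.
type ErrorValue struct {
	Status int    `json:"status"`
	Type   string `json:"type"`
	Title  string `json:"title"`
	Detail string `json:"detail,omitempty"`
}

// errNotFound is an example error used by main.
var errNotFound = errors.New("thing not found")

// toErrorValue is a hypothetical stand-in for ErrorValueFromError.
// It returns nil when err is nil, as documented.
// Status 500 and type "about:blank" are assumptions of this sketch.
func toErrorValue(err error) *ErrorValue {
	if err == nil {
		return nil
	}
	return &ErrorValue{Status: 500, Type: "about:blank", Title: err.Error()}
}

// toError is a hypothetical stand-in for AsError.
// It returns nil when no error is contained.
func toError(e *ErrorValue) error {
	if e == nil || e.Title == "" {
		return nil
	}
	return errors.New(e.Title)
}

func main() {
	ev := toErrorValue(errNotFound)
	raw, _ := json.Marshal(ev)
	fmt.Println(string(raw))
	fmt.Println(toError(ev))
	fmt.Println(toErrorValue(nil) == nil)
}
```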

func (*ErrorValue) AsError

func (e *ErrorValue) AsError() error

AsError returns an error instance or nil if no error is contained

func (*ErrorValue) String

func (e *ErrorValue) String() string

type GetFormHandler

type GetFormHandler func(op string, thingID string, name string) *td.Form

GetFormHandler is the handler that provides the client with the form needed to invoke an operation. This returns nil if no form is found for the operation.

type IAuthenticator

type IAuthenticator interface {
	// AddSecurityScheme adds the wot securityscheme to the given TD
	AddSecurityScheme(tdoc *td.TD)

	// CreateSessionToken creates a signed session token for a client and adds the session
	// sessionID is required. For persistent sessions use the clientID.
	CreateSessionToken(clientID, sessionID string, validity time.Duration) (token string)

	// DecodeSessionToken and return its claims
	DecodeSessionToken(sessionToken string, signedNonce string, nonce string) (
		clientID string, sessionID string, err error)

	// GetAlg returns the supported security format and authentication algorithm.
	// This uses the vocabulary as defined in the TD.
	// JWT: "ES256", "ES512", "EdDSA"
	// paseto: "local" (symmetric), "public" (asymmetric)
	GetAlg() (string, string)

	// Login with a password and obtain a new session token with limited duration
	// This creates a new session. The token must be refreshed to keep the session alive.
	Login(login string, password string) (token string, err error)

	// Logout removes the session
	Logout(clientID string)

	// RefreshToken issues a new session token with an updated expiry time.
	// This extends the life of the session.
	//
	//	clientID Client whose token to refresh
	//	oldToken must be valid
	//	validitySec validity in seconds of the new token
	//
	// This returns a new token or an error if the old token isn't valid or doesn't match clientID
	RefreshToken(senderID string, oldToken string) (newToken string, err error)

	// Set the method to
	SetAuthServerURI(authServiceURI string)

	// ValidatePassword checks if the given password is valid for the client
	ValidatePassword(clientID string, password string) (err error)

	// ValidateToken validates the auth token and returns the token clientID.
	// If the token is invalid an error is returned
	ValidateToken(token string) (clientID string, sessionID string, err error)
}

IAuthenticator is the interface of the authentication capability to obtain and validate session tokens.

type IClientConnection

type IClientConnection interface {
	IConnection

	// ConnectWithToken connects to the messaging server using an authentication token.
	//
	// If a connection is already established on this client then it will be closed first.
	//
	// This connection method must be supported by all transport implementations.
	ConnectWithToken(token string) (err error)
}

IClientConnection defines the client interface for establishing connections with a server. It is intended for consumers to connect to a Thing Agent/Hub and for Service agents that connect to the Hub.

type IConnection

type IConnection interface {

	// Disconnect the client.
	Disconnect()

	// GetConnectionInfo return details of the connection
	GetConnectionInfo() ConnectionInfo

	// IsConnected returns the current connection status
	IsConnected() bool

	// SendNotification [agent] sends a notification to subscribers.
	// This returns an error if the notification could not be delivered
	SendNotification(notif *NotificationMessage) error

	// SendRequest client sends a request to an agent.
	// This returns an error if the request could not be delivered
	SendRequest(req *RequestMessage) error

	// SendResponse [agent] sends a response to a request.
	// This returns an error if the response could not be delivered
	SendResponse(response *ResponseMessage) error

	// SetConnectHandler sets the callback for connection status changes
	// This replaces any previously set handler.
	SetConnectHandler(handler ConnectionHandler)

	// SetNotificationHandler [client] sets the callback for receiving notifications.
	// This replaces any previously set handler.
	SetNotificationHandler(handler NotificationHandler)

	// SetRequestHandler set the handler for receiving requests that return a response.
	// This replaces any previously set handler.
	SetRequestHandler(handler RequestHandler)

	// SetResponseHandler [consumer] sets the callback for receiving unhandled
	// asynchronous responses to requests.
	// If a request is sent with 'sync' set to true then SendRequest will handle
	// the response instead.
	//
	// This replaces any previously set handler.
	SetResponseHandler(handler ResponseHandler)
}

IConnection defines the interface of a server or client connection. Intended for exchanging messages between servients.

type IMessageConverter

type IMessageConverter interface {
	// DecodeNotification converts a protocol message to a hiveot notification message
	// provide the serialized data to avoid multiple unmarshalls
	// This returns nil if this isn't a notification.
	DecodeNotification(raw []byte) *NotificationMessage

	// DecodeRequest converts a protocol message to a hiveot request message
	// provide the serialized data to avoid multiple unmarshalls
	// This returns nil if this isn't a request.
	DecodeRequest(raw []byte) *RequestMessage

	// DecodeResponse converts a protocol message to a hiveot response message.
	// This returns nil if this isn't a response
	DecodeResponse(raw []byte) *ResponseMessage

	// EncodeNotification converts a hiveot NotificationMessage to a native protocol message
	// return an error if the message cannot be converted.
	EncodeNotification(notif *NotificationMessage) (any, error)

	// EncodeRequest converts a hiveot RequestMessage to a native protocol message
	// return an error if the message cannot be converted.
	EncodeRequest(req *RequestMessage) (any, error)

	// EncodeResponse converts a hiveot ResponseMessage to a native protocol message
	// This returns an error response if the message cannot be converted
	EncodeResponse(resp *ResponseMessage) any

	// GetProtocolType provides the protocol type for these messages,
	// eg ProtocolTypeWSS
	GetProtocolType() string
}

IMessageConverter converts between the standardized hiveot request, response and notification messages, and the underlying protocol specific message format.

Its purpose is to assist in decoupling the consumer from the messaging protocol used.

This is used for the WoT websocket protocol, HttpBasic/SSE-SC protocol, MQTT protocol, the native Hiveot transfer and others.

Intended for use by consumers and agents on the client and server side.
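
The decode methods return nil when the raw message is not of the requested kind, so a receiver can try each decoder in turn. The sketch below illustrates that contract for notifications only. It assumes the wire format is the JSON envelope itself (as a passthrough-style converter might use); the reduced `notification` struct and `decodeNotification` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageTypeNotification is copied from the package constants.
const MessageTypeNotification = "notification"

// notification is a reduced stand-in for NotificationMessage.
type notification struct {
	MessageType string `json:"messageType"`
	Operation   string `json:"operation"`
	ThingID     string `json:"thingID,omitempty"`
	Name        string `json:"name,omitempty"`
	Value       any    `json:"value"`
}

// decodeNotification follows the documented contract of DecodeNotification:
// it returns nil if raw is not a notification. Treating malformed JSON the
// same way is an assumption of this sketch.
func decodeNotification(raw []byte) *notification {
	var notif notification
	if err := json.Unmarshal(raw, &notif); err != nil {
		return nil
	}
	if notif.MessageType != MessageTypeNotification {
		return nil
	}
	return &notif
}

func main() {
	raw := []byte(`{"messageType":"notification","operation":"subscribeevent","thingID":"thing1","name":"overheated","value":true}`)
	if notif := decodeNotification(raw); notif != nil {
		fmt.Println(notif.ThingID, notif.Name, notif.Value)
	}

	request := []byte(`{"messageType":"request","operation":"readproperty"}`)
	fmt.Println(decodeNotification(request) == nil)
}
```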

type IServerConnection

type IServerConnection interface {
	IConnection
}

IServerConnection is the interface of an incoming client connection on the server. Protocol servers must implement this interface to return information to the consumer.

This provides a return channel for sending messages from the digital twin to agents or consumers.

Subscription to events or properties can be made externally via this API, or handled internally by the protocol handler if the protocol defines the messages for subscription.

type NotificationHandler

type NotificationHandler func(msg *NotificationMessage)

NotificationHandler handles a subscription notification, sent by an agent.

type NotificationMessage

type NotificationMessage struct {

	// CorrelationID of the request this is a response to, if any.
	CorrelationID string `json:"correlationID,omitempty"`

	// MessageID unique ID of the message. Intended to detect duplicates.
	// Generated by the protocol binding.
	MessageID string `json:"messageID,omitempty"`

	// MessageType identifies this message payload as a notification.
	// This is set to the value of MessageTypeNotification
	MessageType string `json:"messageType"`

	// Name of the action or property affordance this is a response from.
	// This field is optional and intended to help debugging and logging.
	Name string `json:"name,omitempty"`

	// The operation this is a notification of. Eg subscribeevent or observeproperty.
	Operation string `json:"operation"`

	// Authenticated ID of the agent sending the notification, set by the server.
	// The protocol server MUST set this to the authenticated sender.
	SenderID string `json:"senderID"`

	// ThingID of the thing this is a response from.
	// For responses passed to consumers this is the digitwin dThingID
	// For responses sent by agents this is the agent ThingID
	// This field is optional and intended to help debugging and logging.
	ThingID string `json:"thingID,omitempty"`

	// Timestamp the notification was created
	Timestamp string `json:"timestamp,omitempty"`

	// Value containing the notification data as described in the TD event or property dataschema.
	// If the operation is one of the Thing level operations, the output is specified
	// by the operation's dataschema. WoT doesn't have this yet so hiveot will
	// define the missing bits if any. (see documentation)
	Value any `json:"value"` // native

}

NotificationMessage for sending asynchronous notifications to a subscriber/observer

The Value field contains the notification data as defined by the operation. Possible operations in notifications:

  • invokeaction: action status as per TD
  • observeproperty: property value as per TD, when status==running
  • observeallproperties: map[name]value (multiple updates)
  • subscribeevent: event value as per TD, when status==running

func NewNotificationMessage

func NewNotificationMessage(operation string, thingID, name string, value any) *NotificationMessage

NewNotificationMessage creates a new NotificationMessage instance.

operation is the notification operation
thingID is the thing the value applies to (destination of action or source of event)
name is the name of the property, event or action affordance as described in the thing TD
value is the data as defined in the corresponding affordance dataschema or nil if not applicable

func (*NotificationMessage) ToString

func (notif *NotificationMessage) ToString(maxlen int) string

ToString is a helper to easily read the notification data as a string

type RequestHandler

type RequestHandler func(req *RequestMessage, c IConnection) (response *ResponseMessage)

RequestHandler agent processes a request and returns a response.

req is the envelope that contains the request to process
c is the connection on which the request arrived and on which to send asynchronous response(s).
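
A request handler typically dispatches on the operation and echoes the addressing fields back in the response. The sketch below shows that shape with reduced stand-in types. The `request` and `response` structs, `reply` and `handleRequest` are hypothetical, the connection argument of the real RequestHandler is omitted, and the property map stands in for an agent's device state.

```go
package main

import "fmt"

// request and response are reduced stand-ins for RequestMessage and
// ResponseMessage; only the fields used by this sketch are included.
type request struct {
	Operation     string
	ThingID       string
	Name          string
	CorrelationID string
	Input         any
}

type response struct {
	Operation     string
	ThingID       string
	Name          string
	CorrelationID string
	Value         any
	Error         string // the real message carries an *ErrorValue
}

// reply copies the addressing fields from the request, since a response
// must carry the same operation and correlationID.
func reply(req *request, value any, err error) *response {
	resp := &response{
		Operation:     req.Operation,
		ThingID:       req.ThingID,
		Name:          req.Name,
		CorrelationID: req.CorrelationID,
		Value:         value,
	}
	if err != nil {
		resp.Error = err.Error()
	}
	return resp
}

// handleRequest is a hypothetical handler serving a single in-memory thing.
func handleRequest(req *request, props map[string]any) *response {
	switch req.Operation {
	case "readproperty":
		value, found := props[req.Name]
		if !found {
			return reply(req, nil, fmt.Errorf("unknown property %q", req.Name))
		}
		return reply(req, value, nil)
	case "readallproperties":
		return reply(req, props, nil)
	default:
		return reply(req, nil, fmt.Errorf("unsupported operation %q", req.Operation))
	}
}

func main() {
	props := map[string]any{"temperature": 21.5}
	req := &request{Operation: "readproperty", ThingID: "thing1", Name: "temperature", CorrelationID: "c1"}
	resp := handleRequest(req, props)
	fmt.Println(resp.CorrelationID, resp.Value, resp.Error)
}
```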

type RequestMessage

type RequestMessage struct {

	// CorrelationID of the message. Uniquely identifies the request and must be included
	// in the response.
	// Notifications can include this to correlate with the subscription.
	// Message streams can include this to correlate with the original request.
	// This is optional. If omitted, no response will be received.
	CorrelationID string `json:"correlationID,omitempty"`

	// Created holds the timestamp the request was created in utc
	// This MUST be set by the protocol server if not provided.
	Created string `json:"created"`

	// Input for the request as described in the TD affordance dataschema.
	// If the operation is one of the Thing level operations, the input is specified
	// by the operation's dataschema. WoT doesn't have this yet so hiveot will
	// define the missing bits if any.
	// Note: queryaction,cancelaction carry actionID in the Input
	Input any `json:"input,omitempty"` // native

	// MessageID unique ID of the message. Intended to detect duplicates.
	// Generated by the protocol binding if not provided.
	MessageID string `json:"messageID"`

	// MessageType identifies this message payload as a request.
	// This is required and set to "request".
	MessageType string `json:"messageType"`

	// Name of the event, action or property affordance the request is for.
	// This field is optional and only required for specific operations.
	Name string `json:"name,omitempty"`

	// The operation for this message as defined in TD-1.1 (WotOp...)
	// This identifies the request and is a required field.
	Operation string `json:"operation"`

	// SenderID is the authenticated ID of the client sending the request.
	// The protocol server MUST set this to the authenticated client.
	// Intended for services that link requests to the client, such as the state storage service.
	// This is specific to the Hub as it proxies requests on behalf of clients.
	SenderID string `json:"senderID,omitempty"`

	// ThingID of the thing this request is for.
	// For messages from consumers this is the digitwin dThingID
	// For messages to agents this is the agent ThingID
	// This field is required.
	ThingID string `json:"thingID"`
}

RequestMessage for sending a request for an operation on a Thing or service. Agents/Things MUST send a response when a request is received and a correlationID is included.

The following operations are considered to be requests:

  • invokeaction, cancelaction [WoT]
  • queryaction, queryallactions [WoT]
  • readevent, readallevents (of a Thing) [HiveOT extension]
  • subscribe, unsubscribe [WoT] (handled by protocol bindings)
  • observe, unobserve [WoT] (handled by protocol bindings)
  • readproperty, readallproperties [WoT]
  • readtd, readalltds (of a directory or thing) [HiveOT extension]

func NewRequestMessage

func NewRequestMessage(operation string, thingID, name string, input any, correlationID string) *RequestMessage

NewRequestMessage creates a new RequestMessage instance.

operation is the request operation WoTOp... or HTOp...
thingID is the thing the value applies to (destination of action or source of event)
name is the name of the property, event or action affordance as described in the thing TD
input is the request input as defined in the corresponding affordance dataschema.
correlationID unique ID of the request or empty when no response is expected

func (*RequestMessage) CreateActionResponse

func (req *RequestMessage) CreateActionResponse(
	actionID string, state string, output any, err error) (resp *ResponseMessage, as *ActionStatus)

CreateActionResponse is a helper to easily create an action response. This generates an ActionStatus value with the given actionID, state and output.

Typically used by agents to create a response to an invokeaction request. Output can be nil if the action has no output or state is not yet completed. If there is a transient output it can be used as output with status running.

actionID contains the actionID of the action instance, generated by the device
state contains the action state, one of Status... constants
output contains the action output as defined in the action affordance
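
The progression from a running status with transient output to a completed or failed one can be sketched as follows. This uses a reduced copy of ActionStatus and a hypothetical `newActionStatus` helper. Forcing the state to `failed` when an error is passed is an assumption of this sketch, not documented behaviour of CreateActionResponse.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Status values copied from the package constants.
const (
	StatusRunning   = "running"
	StatusCompleted = "completed"
	StatusFailed    = "failed"
)

// errJammed is an example failure used by main.
var errJammed = errors.New("valve jammed")

// actionStatus is a reduced copy of ActionStatus.
type actionStatus struct {
	ActionID string `json:"actionID,omitempty"`
	Name     string `json:"name,omitempty"`
	Output   any    `json:"output,omitempty"`
	State    string `json:"state,omitempty"`
	ThingID  string `json:"thingID,omitempty"`
	Error    string `json:"error,omitempty"` // *ErrorValue in the real type
}

// newActionStatus is a hypothetical helper. Setting the state to failed
// when err is non-nil is an assumption of this sketch.
func newActionStatus(thingID, name, actionID, state string, output any, err error) actionStatus {
	as := actionStatus{ActionID: actionID, Name: name, Output: output, State: state, ThingID: thingID}
	if err != nil {
		as.State = StatusFailed
		as.Error = err.Error()
	}
	return as
}

func main() {
	updates := []actionStatus{
		newActionStatus("thing1", "open", "a-1", StatusRunning, 40, nil),    // transient output
		newActionStatus("thing1", "open", "a-1", StatusCompleted, 100, nil), // final output
		newActionStatus("thing1", "open", "a-2", StatusRunning, nil, errJammed),
	}
	for _, as := range updates {
		raw, _ := json.Marshal(as)
		fmt.Println(string(raw))
	}
}
```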

func (*RequestMessage) CreateErrorResponse

func (req *RequestMessage) CreateErrorResponse(err error) (errResp *ResponseMessage)

CreateErrorResponse is a helper to easily create an error response from a request. If operation is invokeaction then it generates an ActionStatus response with the given error.

err is set when the request has failed. In that case value can contain error details

func (*RequestMessage) CreateNotification

func (req *RequestMessage) CreateNotification() (notif *NotificationMessage)

CreateNotification is a helper to easily create a status update of a running request.

data contains the payload to include in the notification

func (*RequestMessage) CreateResponse

func (req *RequestMessage) CreateResponse(value any, err error) (resp *ResponseMessage)

CreateResponse is a helper to easily create a response from a request

If operation is invokeaction then it generates an ActionStatus response with the given value as its output and state Completed

value contains the response data
err is set when the request has failed. In that case value can contain error details

func (*RequestMessage) ToObject

func (req *RequestMessage) ToObject(input any) error

ToObject is a helper to easily convert the request input to an object

func (*RequestMessage) ToString

func (req *RequestMessage) ToString(maxlen int) string

ToString is a helper to easily convert the request input to a string. maxlen is the maximum string length or 0 for unlimited.

type ResponseHandler

type ResponseHandler func(msg *ResponseMessage) error

ResponseHandler handles a response to a request, sent by an agent. The handler delivers the response to the client that sent the original request.

This returns an error if the response cannot be delivered. This can be used to retry sending the response at a later time.

type ResponseMessage

type ResponseMessage struct {

	// CorrelationID of the request this is a response to, if any.
	CorrelationID string `json:"correlationID,omitempty"`

	// Error contains the short error description when status is failed.
	// Matches RFC9457 https://www.rfc-editor.org/rfc/rfc9457
	Error *ErrorValue `json:"error"`

	// MessageID unique ID of the message. Intended to detect duplicates.
	// Generated by the protocol binding.
	MessageID string `json:"messageID,omitempty"`

	// MessageType identifies this message payload as a response
	// This is set to the value of MessageTypeResponse
	MessageType string `json:"messageType"`

	// Name of the action or property affordance this is a response from.
	Name string `json:"name"`

	// The operation this is a response to. This MUST be the operation provided in the request.
	Operation string `json:"operation"`

	// Authenticated ID of the agent sending the response, set by the server.
	//
	// This is non-wot and a feature of the hiveot Hub, to allow services to link requests
	// to authenticated users.
	//
	// The Hub protocol server MUST set this to the authenticated sender.
	SenderID string `json:"senderID"`

	// ThingID of the thing this is a response from.
	// For responses passed to consumers this is the digitwin dThingID
	// For responses sent by agents this is the agent ThingID
	// This field is optional and intended to help debugging and logging.
	ThingID string `json:"thingID,omitempty"`

	// Timestamp the response was created
	Timestamp string `json:"timestamp,omitempty"`

	// Value of the response as described in the TD affordance output or value dataschema.
	// If the operation is one of the Thing level operations, the value is specified
	// by the operation's dataschema.
	// In case of actions, the value holds the ActionStatus.
	//
	// Note that different protocol bindings use a different field depending on the operation.
	// The message converter stores the response value(s) into the Value field.
	//
	// If an error is returned then value optionally contains a detailed error description.
	Value any `json:"value"`
}

ResponseMessage serves to notify a client of the result of a request.

The Value field contains the message response data as defined by the operation.

Action related response output:

  • invokeaction action output as per TD, when status==completed
  • queryaction []ActionStatus object array
  • queryallactions map [name][]ActionStatus objects

Property related response output:

  • readproperty ThingValue object
  • readallproperties map[name]ThingValue objects

Event related response output:

  • readevent ThingValue object
  • readallevents map[name]ThingValue objects
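The JSON tags on the struct determine which fields always appear on the wire and which are dropped when empty. The sketch below mirrors the documented field names and tags in a local type so it runs without the package. ErrorValue here is a hypothetical stand-in (its fields are assumptions loosely based on RFC 9457); the real type is defined elsewhere in the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ErrorValue is a hypothetical stand-in for the package's ErrorValue type.
type ErrorValue struct {
	Title  string `json:"title"`
	Detail string `json:"detail,omitempty"`
}

// ResponseMessage mirrors the field names and JSON tags documented above.
type ResponseMessage struct {
	CorrelationID string      `json:"correlationID,omitempty"`
	Error         *ErrorValue `json:"error"`
	MessageID     string      `json:"messageID,omitempty"`
	MessageType   string      `json:"messageType"`
	Name          string      `json:"name"`
	Operation     string      `json:"operation"`
	SenderID      string      `json:"senderID"`
	ThingID       string      `json:"thingID,omitempty"`
	Timestamp     string      `json:"timestamp,omitempty"`
	Value         any         `json:"value"`
}

// encode marshals a response and returns the JSON text.
func encode(resp *ResponseMessage) string {
	raw, err := json.Marshal(resp)
	if err != nil {
		return "marshal error: " + err.Error()
	}
	return string(raw)
}

func main() {
	resp := &ResponseMessage{
		CorrelationID: "req-1",
		MessageType:   "response",
		Name:          "temperature",
		Operation:     "readproperty",
		Value:         21.5,
	}
	// Fields without omitempty (error, senderID) are emitted even when unset;
	// messageID, thingID and timestamp are omitted because they are empty.
	fmt.Println(encode(resp))
}
```

With these tags a nil Error is encoded as "error":null and an empty SenderID as "senderID":"", so consumers should test for a null error rather than a missing key.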

func NewResponseMessage

func NewResponseMessage(operation string, thingID, name string, value any, err error, correlationID string) *ResponseMessage

NewResponseMessage creates a new ResponseMessage instance.

This sets status to completed if err is nil or failed if err is provided. If the status is not completed or failed then set it to the appropriate value after creation.

operation is the request operation WoTOp... or HTOp...
thingID is the thing the value applies to (destination of action or source of event)
name is the name of the property, event or action affordance as described in the thing TD
value is the response data as defined in the corresponding affordance dataschema or nil if not applicable
err is the optional error response; when provided, the status is set to failed
correlationID is the ID provided by the request

func (*ResponseMessage) Decode

func (resp *ResponseMessage) Decode(output any) error

Decode the value in the response. If the response is for an invokeaction then the output is extracted from the ActionStatus.
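One common way to turn a loosely typed value into a concrete Go type is a JSON round trip. The sketch below shows that technique for a plain value only; decodeValue and Reading are hypothetical names, and the package's Decode method may work differently (for example when unwrapping an ActionStatus).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reading is a hypothetical application type for the response value.
type Reading struct {
	Celsius float64 `json:"celsius"`
	Sensor  string  `json:"sensor"`
}

// decodeValue converts a loosely typed value into output via a JSON round
// trip. This is an illustrative helper, not the package's Decode method.
func decodeValue(value any, output any) error {
	raw, err := json.Marshal(value)
	if err != nil {
		return err
	}
	return json.Unmarshal(raw, output)
}

func main() {
	// After JSON transport, an object value typically arrives as map[string]any.
	value := map[string]any{"celsius": 21.5, "sensor": "hall"}

	var r Reading
	if err := decodeValue(value, &r); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(r.Celsius, r.Sensor) // prints "21.5 hall"
}
```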

func (*ResponseMessage) ToString

func (resp *ResponseMessage) ToString(maxlen int) string

ToString is a helper to easily read the response output as a string

type RnRChan

type RnRChan struct {
	// contains filtered or unexported fields
}

RnRChan is a helper for Request 'n Response message handling using channels. Intended to link responses in asynchronous request-response communication.

Usage:

  1. create a request ID: shortid.MustGenerate()
  2. register the request ID: c := Open(correlationID)
  3. Send the request message in the client, passing the correlationID
  4. Wait for a response: hasResponse, resp := WaitForResponse(c, timeout)
  5. Handle response message (in client callback): HandleResponse(resp)
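The usage steps above can be sketched with a small self-contained program. This illustrates the correlation pattern only and is not the package's RnRChan implementation: the pending type, its methods and the Response struct are hypothetical stand-ins, and a fixed correlation ID replaces the generated one from step 1.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoTimeout is the wait time used in this example.
const demoTimeout = time.Second

// Response is a hypothetical, trimmed-down stand-in for ResponseMessage.
type Response struct {
	CorrelationID string
	Value         any
}

// pending links correlation IDs to reply channels.
type pending struct {
	mux   sync.Mutex
	chans map[string]chan *Response
}

func newPending() *pending {
	return &pending{chans: make(map[string]chan *Response)}
}

// open registers a correlation ID and returns its reply channel (step 2).
func (p *pending) open(correlationID string) chan *Response {
	// Buffer of 1 so the delivering goroutine does not block on a slow reader.
	c := make(chan *Response, 1)
	p.mux.Lock()
	p.chans[correlationID] = c
	p.mux.Unlock()
	return c
}

// remove drops the reply channel of a correlation ID.
func (p *pending) remove(correlationID string) {
	p.mux.Lock()
	delete(p.chans, correlationID)
	p.mux.Unlock()
}

// handle passes a response to the waiting caller (step 5).
// It returns false when nobody is waiting for this correlation ID.
func (p *pending) handle(resp *Response) bool {
	p.mux.Lock()
	c, found := p.chans[resp.CorrelationID]
	p.mux.Unlock()
	if !found {
		return false
	}
	select {
	case c <- resp:
		return true
	default:
		return false // a response is already queued and not yet read
	}
}

// wait blocks until a response arrives or the timeout expires (step 4).
func wait(c chan *Response, timeout time.Duration) (bool, *Response) {
	select {
	case resp := <-c:
		return true, resp
	case <-time.After(timeout):
		return false, nil
	}
}

// roundTrip runs steps 2 to 5 for one request. The goroutine plays the role
// of the client callback that receives the response (sending is omitted).
func roundTrip(p *pending, correlationID string, timeout time.Duration) (bool, *Response) {
	c := p.open(correlationID)
	defer p.remove(correlationID)
	go p.handle(&Response{CorrelationID: correlationID, Value: "done"})
	return wait(c, timeout)
}

func main() {
	p := newPending()
	ok, resp := roundTrip(p, "req-1", demoTimeout)
	fmt.Println(ok, resp.Value)                                // prints "true done"
	fmt.Println(p.handle(&Response{CorrelationID: "unknown"})) // prints "false"
}
```

Registering the channel before sending the request matters: if the response arrives before the correlation ID is known, it would be reported as having no waiter.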

func NewRnRChan

func NewRnRChan() *RnRChan

func (*RnRChan) Close

func (rnr *RnRChan) Close(correlationID string)

Close removes the request channel

func (*RnRChan) CloseAll

func (rnr *RnRChan) CloseAll()

CloseAll force closes all channels, ending all waits for RPC responses.

func (*RnRChan) HandleResponse

func (rnr *RnRChan) HandleResponse(msg *ResponseMessage) bool

HandleResponse writes a reply to the request channel.

This returns true on success or false if the correlationID is unknown (no one is waiting). It is up to the handler of this response to close the channel when done.

If a timeout passes while the write is blocked, the write is released.

func (*RnRChan) Len

func (rnr *RnRChan) Len() int

func (*RnRChan) Open

func (rnr *RnRChan) Open(correlationID string) chan *ResponseMessage

Open a new channel for receiving the response to a request. Call Close when done.

This returns a reply channel on which the data is received. Use WaitForResponse(rChan) to wait for the response.

func (*RnRChan) WaitForResponse

func (rnr *RnRChan) WaitForResponse(
	replyChan chan *ResponseMessage, timeout time.Duration) (
	hasResponse bool, resp *ResponseMessage)

WaitForResponse waits for an answer received on the reply channel. If the timeout expires without a response, this returns with hasResponse set to false.

Use 'autoclose' to immediately close the channel when no further responses are expected. (they would not be read and thus lost)

If the channel was closed this returns hasResponse with no reply

type ThingValue

type ThingValue struct {
	// Type of affordance this is a value of: AffordanceTypeProperty|Event|Action
	AffordanceType AffordanceType `json:"affordanceType"`

	// Output with Payload
	//
	// Data in format as described by the thing's affordance
	Data any `json:"data,omitempty"`

	// Name with affordance name
	//
	// Name of the affordance holding the value
	Name string `json:"name,omitempty"`

	// ThingID with Thing ID
	//
	// Digital twin Thing ID
	ThingID string `json:"thingID,omitempty"`

	// Timestamp with Timestamp time
	//
	// Time the value was last updated
	Timestamp string `json:"timestamp,omitempty"`
}

ThingValue is the internal API response payload to subscribeevent, observeproperty, readevent and readproperty operations. The protocol binding maps between this and the protocol way of encoding values.

func NewThingValue

func NewThingValue(affordanceType AffordanceType, thingID, name string, data any, timestamp string) *ThingValue

func (*ThingValue) ToString

func (tv *ThingValue) ToString(maxlen int) string

ToString is a helper to easily read the response output as a string
