Documentation
¶
Overview ¶
Package messaging with the 3 flow messages: requests, response and notifications
Package transports with the interface of a client transport connection ¶
Package transports with the 3 flow messages: requests, response and notifications
Package transports with the 3 flow messages: requests, response and notifications
Package messaging with the 3 flow messages: requests, response and notifications
Index ¶
- Constants
- Variables
- type ActionStatus
- type AffordanceType
- type Agent
- func (ag *Agent) PubActionProgress(req RequestMessage, value any) error
- func (ag *Agent) PubEvent(thingID string, name string, value any) error
- func (ag *Agent) PubProperties(thingID string, propMap map[string]any) error
- func (ag *Agent) PubProperty(thingID string, name string, value any) error
- func (ag *Agent) SendResponse(resp *ResponseMessage) error
- func (ag *Agent) SetRequestHandler(cb RequestHandler)
- func (ag *Agent) UpdateThing(tdoc *td.TD) error
- type ConnectionHandler
- type ConnectionInfo
- type Consumer
- func (co *Consumer) Disconnect()
- func (co *Consumer) GetClientID() string
- func (co *Consumer) GetConnection() IConnection
- func (co *Consumer) InvokeAction(dThingID, name string, input any, output any) error
- func (co *Consumer) IsConnected() bool
- func (co *Consumer) ObserveProperty(thingID string, name string) error
- func (co *Consumer) Ping() error
- func (co *Consumer) QueryAction(thingID, name string) (value ActionStatus, err error)
- func (co *Consumer) QueryAllActions(thingID string) (values map[string]ActionStatus, err error)
- func (co *Consumer) ReadAllProperties(thingID string) (values map[string]ThingValue, err error)
- func (co *Consumer) ReadProperty(thingID, name string) (value ThingValue, err error)
- func (co *Consumer) Rpc(operation, thingID, name string, input any, output any) error
- func (co *Consumer) SendRequest(req *RequestMessage, waitForCompletion bool) (resp *ResponseMessage, err error)
- func (co *Consumer) SetConnectHandler(cb ConnectionHandler)
- func (co *Consumer) SetNotificationHandler(cb NotificationHandler)
- func (co *Consumer) SetResponseHandler(cb ResponseHandler)
- func (co *Consumer) Subscribe(thingID string, name string) error
- func (co *Consumer) UnobserveProperty(thingID string, name string) error
- func (co *Consumer) Unsubscribe(thingID string, name string) error
- func (co *Consumer) WaitForCompletion(rChan chan *ResponseMessage, operation, correlationID string, ...) (resp *ResponseMessage, err error)
- func (co *Consumer) WriteProperty(thingID string, name string, input any, wait bool) error
- type ErrorValue
- type GetFormHandler
- type IAuthenticator
- type IClientConnection
- type IConnection
- type IMessageConverter
- type IServerConnection
- type NotificationHandler
- type NotificationMessage
- type RequestHandler
- type RequestMessage
- func (req *RequestMessage) CreateActionResponse(actionID string, state string, output any, err error) (resp *ResponseMessage, as *ActionStatus)
- func (req *RequestMessage) CreateErrorResponse(err error) (errResp *ResponseMessage)
- func (req *RequestMessage) CreateNotification() (notif *NotificationMessage)
- func (req *RequestMessage) CreateResponse(value any, err error) (resp *ResponseMessage)
- func (req *RequestMessage) ToObject(input any) error
- func (req *RequestMessage) ToString(maxlen int) string
- type ResponseHandler
- type ResponseMessage
- type RnRChan
- func (rnr *RnRChan) Close(correlationID string)
- func (rnr *RnRChan) CloseAll()
- func (rnr *RnRChan) HandleResponse(msg *ResponseMessage) bool
- func (rnr *RnRChan) Len() int
- func (rnr *RnRChan) Open(correlationID string) chan *ResponseMessage
- func (rnr *RnRChan) WaitForResponse(replyChan chan *ResponseMessage, timeout time.Duration) (hasResponse bool, resp *ResponseMessage)
- type ThingValue
Constants ¶
const ( // StatusPending - the request has not yet been delivered StatusPending = "pending" // StatusRunning - the request is being processed StatusRunning = "running" // StatusCompleted - the request processing was completed StatusCompleted = "completed" // StatusFailed - the request processing or delivery failed StatusFailed = "failed" )
Request status provided with the response. This aligns with the action status values from the WoT specification.
const ( // WoT http basic protocol without return channel ProtocolTypeHTTPBasic = "http-basic" // websocket sub-protocol ProtocolTypeWSS = "wss" // WoT MQTT protocol over WSS ProtocolTypeWotMQTTWSS = "mqtt-wss" // HiveOT http SSE subprotocol return channel with direct messaging ProtocolTypeHiveotSSE = "hiveot-sse" // HiveOT message envelope passthrough ProtocolTypePassthrough = "passthrough" )
Supported transport protocol binding types.
const DefaultRpcTimeout = time.Second * 60 // 60 for testing; 3 seconds
const MessageTypeNotification = "notification"
MessageTypeNotification identifies the message as a notification.
const MessageTypeRequest = "request"
MessageTypeRequest is the constant that identifies a payload as a request.
const MessageTypeResponse = "response"
MessageTypeResponse identifies the message as a response.
const ThingDirectoryDThingID = "dtw:digitwin:ThingDirectory"
ThingDirectoryDThingID is the Digitwin ThingID of the runtime Directory Service. It is duplicated from DirectoryConsumerAPI.go to avoid a compile-time dependency on the runtime, which would be circular. It would be better to determine this from discovery, but that is a lot of work just for cleanliness's sake. Used to update the directory with TDs using this agent.
const ThingDirectoryUpdateThingMethod = "updateThing"
Variables ¶
Functions ¶
This section is empty.
Types ¶
type ActionStatus ¶
type ActionStatus struct {
// ActionID that uniquely identifies the action instance.
// This can be an identifier or a URL.
ActionID string `json:"actionID,omitempty"`
// Error with info when action failed
Error *ErrorValue `json:"error,omitempty"`
// Input of action
Input any `json:"input,omitempty"`
// Name of action
Name string `json:"name,omitempty"`
// Output with Action output
Output any `json:"output,omitempty"`
// SenderID of the client requesting the action
SenderID string `json:"senderID,omitempty"`
// State of action with progress: pending, running, completed, failed
State string `json:"state,omitempty"`
// ThingID digital-twin ThingID the action applies to
ThingID string `json:"thingID,omitempty"`
// Requested time the action request was received
TimeRequested string `json:"timeRequested,omitempty"`
// Updated time the action status was last updated
TimeUpdated string `json:"timeUpdated,omitempty"`
}
ActionStatus is used for tracking the status of an action. NOTE: keep this in sync with the digital twin ActionStatus in the TD.
type AffordanceType ¶
type AffordanceType string
const AffordanceTypeAction AffordanceType = "action"
const AffordanceTypeEvent AffordanceType = "event"
const AffordanceTypeProperty AffordanceType = "property"
type Agent ¶
type Agent struct {
*Consumer
// contains filtered or unexported fields
}
Agent provides the messaging functions needed by hub agents. Agents are also consumers as they are able to invoke services.
Hub agents receive requests and return responses. The underlying transport protocol binding handles subscription.
func NewAgent ¶
func NewAgent(cc IConnection, connHandler ConnectionHandler, notifHandler NotificationHandler, reqHandler RequestHandler, respHandler ResponseHandler, timeout time.Duration) *Agent
NewAgent creates a new agent instance for serving requests and sending responses. Since agents are also consumers, they can also send requests and receive responses.
Agents can be connected to when running a server or connect to a hub or gateway as client.
This is a wrapper around the ClientConnection that provides for sending WoT response messages, publishing properties and events to subscribers, and publishing a TD.
func (*Agent) PubActionProgress ¶
func (ag *Agent) PubActionProgress(req RequestMessage, value any) error
PubActionProgress helper for agents to send a 'running' ActionStatus notification
This sends an ActionStatus message with status of running.
func (*Agent) PubEvent ¶
PubEvent helper for agents to send an event to subscribers.
The underlying transport protocol binding handles the subscription mechanism as the agent itself doesn't track subscriptions.
func (*Agent) PubProperties ¶
PubProperties helper for agents to publish a map of property values
The underlying transport protocol binding handles the subscription mechanism.
func (*Agent) PubProperty ¶
PubProperty helper for agents to publish a property value notification to observers.
The underlying transport protocol binding handles the subscription mechanism.
func (*Agent) SendResponse ¶
func (ag *Agent) SendResponse(resp *ResponseMessage) error
SendResponse sends a response for a previous request
func (*Agent) SetRequestHandler ¶
func (ag *Agent) SetRequestHandler(cb RequestHandler)
SetRequestHandler set the application handler for incoming requests
type ConnectionHandler ¶
type ConnectionHandler func(connected bool, err error, c IConnection)
ConnectionHandler handles a change in connection status
connected is true when connected without errors; err details why the connection failed; c is the connection instance being established or disconnected.
type ConnectionInfo ¶
type ConnectionInfo struct {
// Connection CA
CaCert *x509.Certificate
// GetClientID returns the authenticated clientID of this connection
ClientID string
// GetConnectionID returns the client's connection ID belonging to this endpoint
ConnectionID string
// GetConnectURL returns the full server URL used to establish this connection
ConnectURL string
// Connection timeout settings (clients only)
Timeout time.Duration
}
ConnectionInfo provides details of a connection
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides the messaging functionality for consumers. This provides a golang API to consumer operations.
func NewConsumer ¶
func NewConsumer(cc IConnection, rpcTimeout time.Duration) *Consumer
NewConsumer returns a new instance of the WoT consumer for use with the given connection. The connection should not be used by others as this consumer takes possession by registering connection callbacks.
This provides the API for common WoT operations such as invoking actions and supports RPC calls by waiting for a response.
Use SetNotificationHandler to set the callback to receive async notifications. Use SetResponseHandler to set the callback to receive async responses. Use SetConnectHandler to set the callback to be notified of connection changes.
cc is the client connection to use for sending requests and receiving responses. rpcTimeout is the timeout of the rpc connections, or 0 for the default (3 sec).
func (*Consumer) Disconnect ¶
func (co *Consumer) Disconnect()
Disconnect the client connection. Do not use this consumer after disconnect.
func (*Consumer) GetClientID ¶
GetClientID returns the client's account ID
func (*Consumer) GetConnection ¶
func (co *Consumer) GetConnection() IConnection
GetConnection returns the underlying connection of this consumer
func (*Consumer) InvokeAction ¶
InvokeAction invokes an action on a thing and waits for the response. If the response type is known then provide it with output, otherwise use interface{}.
func (*Consumer) IsConnected ¶
IsConnected returns true if the consumer has a connection
func (*Consumer) ObserveProperty ¶
ObserveProperty sends a request to observe one or all properties
thingID is empty for all things; name is empty for all properties of the selected things.
func (*Consumer) Ping ¶
Ping the server and wait for a pong response. This uses the underlying transport native method of ping-pong.
func (*Consumer) QueryAction ¶
func (co *Consumer) QueryAction(thingID, name string) ( value ActionStatus, err error)
QueryAction obtains the status of an action
Q: http-basic protocol returns an array per action in QueryAllActions but only
a single action in QueryAction. This is inconsistent.
The underlying protocol binding constructs the ActionStatus from the protocol specific messages. The hiveot protocol passes this as-is as the output.
func (*Consumer) QueryAllActions ¶
func (co *Consumer) QueryAllActions(thingID string) ( values map[string]ActionStatus, err error)
QueryAllActions returns a map of action status for all actions of a thing.
This returns a map of actionName and the last known action status.
Q: http-basic protocol returns an array for each action. What is the use-case
that can have multiple concurrent actions? An actuator can only move in one direction at the same time. Maybe the array only applies to stateless actions?
This depends on the underlying protocol binding to construct appropriate ActionStatus message. All hiveot protocols include full information. WoT bindings might not include update timestamp and such.
func (*Consumer) ReadAllProperties ¶
func (co *Consumer) ReadAllProperties(thingID string) ( values map[string]ThingValue, err error)
ReadAllProperties sends a request to read all Thing property values.
This depends on the underlying protocol binding to construct appropriate ResponseMessages and include information such as Timestamp. All hiveot protocols include full information. WoT bindings might be more limited.
func (*Consumer) ReadProperty ¶
func (co *Consumer) ReadProperty(thingID, name string) ( value ThingValue, err error)
ReadProperty sends a request to read a Thing property value.
This depends on the underlying protocol binding to construct appropriate ResponseMessages and include information such as Timestamp. All hiveot protocols include full information. WoT bindings might be too limited.
func (*Consumer) Rpc ¶
Rpc sends a request message and waits for a response. This returns an error if the request fails or if the response contains an error
func (*Consumer) SendRequest ¶
func (co *Consumer) SendRequest(req *RequestMessage, waitForCompletion bool) ( resp *ResponseMessage, err error)
SendRequest sends an operation request and optionally waits for completion or timeout. If waitForCompletion is true and no correlationID is provided then a correlationID will be generated to wait for completion.
If waitForCompletion is false then any response will go to the async response handler and this returns nil response. If waitForCompletion is true this will wait until a response is received with a matching correlationID, or until a timeout occurs.
If the request has no correlation ID, one will be generated.
func (*Consumer) SetConnectHandler ¶
func (co *Consumer) SetConnectHandler(cb ConnectionHandler)
SetConnectHandler sets the notification handler of changes to this consumer connection. Intended to notify the client that a reconnect or relogin is needed. Only a single handler is supported. This replaces the previously set callback.
func (*Consumer) SetNotificationHandler ¶
func (co *Consumer) SetNotificationHandler(cb NotificationHandler)
SetNotificationHandler sets the notification handler of events for this consumer. Only a single handler is supported. This replaces the previously set callback.
func (*Consumer) SetResponseHandler ¶
func (co *Consumer) SetResponseHandler(cb ResponseHandler)
SetResponseHandler sets the handler that receives asynchronous responses. Those are responses to requests that are not waited for using the baseRnR handler.
func (*Consumer) Subscribe ¶
Subscribe to one or all events of a thing. name is the event to subscribe to or "" for all events.
func (*Consumer) UnobserveProperty ¶
UnobserveProperty stops observing a previously observed property or all properties.
func (*Consumer) Unsubscribe ¶
Unsubscribe is a helper for sending an unsubscribe request
func (*Consumer) WaitForCompletion ¶
func (co *Consumer) WaitForCompletion( rChan chan *ResponseMessage, operation, correlationID string, ignoreDisconnect bool) ( resp *ResponseMessage, err error)
WaitForCompletion waits for a completed or failed response message on the given correlationID channel, or until N seconds passed, or the connection drops.
If a proper response is received it is returned along with a nil error. If anything goes wrong, an error is returned.
type ErrorValue ¶
type ErrorValue struct {
// Status code: https://w3c.github.io/wot-profile/#error-responses
Status int `json:"status"`
// Type is a URI reference [RFC3986] that identifies the problem type.
Type string `json:"type"`
// Title contains a short, human-readable summary of the problem type
Title string `json:"title"`
// Detail a human-readable explanation
Detail string `json:"detail,omitempty"`
}
Error response payload
func ErrorValueFromError ¶
func ErrorValueFromError(err error) *ErrorValue
Create an ErrorValue object from the given error. This returns nil if err is nil.
func (*ErrorValue) AsError ¶
func (e *ErrorValue) AsError() error
AsError returns an error instance or nil if no error is contained
func (*ErrorValue) String ¶
func (e *ErrorValue) String() string
type GetFormHandler ¶
GetFormHandler is the handler that provides the client with the form needed to invoke an operation. This returns nil if no form is found for the operation.
type IAuthenticator ¶
type IAuthenticator interface {
// AddSecurityScheme adds the wot securityscheme to the given TD
AddSecurityScheme(tdoc *td.TD)
// CreateSessionToken creates a signed session token for a client and adds the session
// sessionID is required. For persistent sessions use the clientID.
CreateSessionToken(clientID, sessionID string, validity time.Duration) (token string)
// DecodeSessionToken and return its claims
DecodeSessionToken(sessionToken string, signedNonce string, nonce string) (
clientID string, sessionID string, err error)
// GetAlg returns the supported security format and authentication algorithm.
// This uses the vocabulary as defined in the TD.
// JWT: "ES256", "ES512", "EdDSA"
// paseto: "local" (symmetric), "public" (asymmetric)
GetAlg() (string, string)
// Login with a password and obtain a new session token with limited duration
// This creates a new session. The token must be refreshed to keep the session alive.
Login(login string, password string) (token string, err error)
// Logout removes the session
Logout(clientID string)
// RefreshToken issues a new session token with an updated expiry time.
// This extends the life of the session.
//
// clientID Client whose token to refresh
// oldToken must be valid
// validitySec validity in seconds of the new token
//
// This returns a new token or an error if the old token isn't valid or doesn't match clientID
RefreshToken(senderID string, oldToken string) (newToken string, err error)
// SetAuthServerURI sets the URI of the authentication service
SetAuthServerURI(authServiceURI string)
// ValidatePassword checks if the given password is valid for the client
ValidatePassword(clientID string, password string) (err error)
// ValidateToken validates the auth token and returns the token clientID.
// If the token is invalid an error is returned
ValidateToken(token string) (clientID string, sessionID string, err error)
}
IAuthenticator is the interface of the authentication capability to obtain and validate session tokens.
type IClientConnection ¶
type IClientConnection interface {
IConnection
// ConnectWithToken connects to the messaging server using an authentication token.
//
// If a connection is already established on this client then it will be closed first.
//
// This connection method must be supported by all transport implementations.
ConnectWithToken(token string) (err error)
}
IClientConnection defines the client interface for establishing connections with a server. Intended for consumers to connect to a Thing Agent/Hub and for Service agents that connect to the Hub.
type IConnection ¶
type IConnection interface {
// Disconnect the client.
Disconnect()
// GetConnectionInfo return details of the connection
GetConnectionInfo() ConnectionInfo
// IsConnected returns the current connection status
IsConnected() bool
// SendNotification [agent] sends a notification to subscribers.
// This returns an error if the notification could not be delivered
SendNotification(notif *NotificationMessage) error
// SendRequest client sends a request to an agent.
// This returns an error if the request could not be delivered
SendRequest(req *RequestMessage) error
// SendResponse [agent] sends a response to a request.
// This returns an error if the response could not be delivered
SendResponse(response *ResponseMessage) error
// SetConnectHandler sets the callback for connection status changes
// This replaces any previously set handler.
SetConnectHandler(handler ConnectionHandler)
// SetNotificationHandler [client] sets the callback for receiving notifications.
// This replaces any previously set handler.
SetNotificationHandler(handler NotificationHandler)
// SetRequestHandler set the handler for receiving requests that return a response.
// This replaces any previously set handler.
SetRequestHandler(handler RequestHandler)
// SetResponseHandler [consumer] sets the callback for receiving unhandled
// asynchronous responses to requests.
// If a request is sent with 'sync' set to true then SendRequest will handle
// the response instead.
//
// This replaces any previously set handler.
SetResponseHandler(handler ResponseHandler)
}
IConnection defines the interface of a server or client connection. Intended for exchanging messages between servients.
type IMessageConverter ¶
type IMessageConverter interface {
// DecodeNotification converts a protocol message to a hiveot notification message
// provide the serialized data to avoid multiple unmarshalls
// This returns nil if this isn't a notification.
DecodeNotification(raw []byte) *NotificationMessage
// DecodeRequest converts a protocol message to a hiveot request message
// provide the serialized data to avoid multiple unmarshalls
// This returns nil if this isn't a request.
DecodeRequest(raw []byte) *RequestMessage
// DecodeResponse converts a protocol message to a hiveot response message.
// This returns nil if this isn't a response
DecodeResponse(raw []byte) *ResponseMessage
// EncodeNotification converts a hiveot NotificationMessage to a native protocol message
// return an error if the message cannot be converted.
EncodeNotification(notif *NotificationMessage) (any, error)
// EncodeRequest converts a hiveot RequestMessage to a native protocol message
// return an error if the message cannot be converted.
EncodeRequest(req *RequestMessage) (any, error)
// EncodeResponse converts a hiveot ResponseMessage to a native protocol message
// This returns an error response if the message cannot be converted
EncodeResponse(resp *ResponseMessage) any
// GetProtocolType provides the protocol type for these messages,
// eg ProtocolTypeWSS
GetProtocolType() string
}
IMessageConverter converts between the standardized hiveot request, response and notification messages, and the underlying protocol specific message format.
Its purpose is to assist in decoupling the consumer from the messaging protocol used.
This is used for the WoT websocket protocol, HttpBasic/SSE-SC protocol, MQTT protocol, the native Hiveot transfer and others.
Intended for use by consumers and agents on the client and server side.
type IServerConnection ¶
type IServerConnection interface {
IConnection
}
IServerConnection is the interface of an incoming client connection on the server. Protocol servers must implement this interface to return information to the consumer.
This provides a return channel for sending messages from the digital twin to agents or consumers.
Subscription to events or properties can be made externally via this API, or handled internally by the protocol handler if the protocol defines the messages for subscription.
type NotificationHandler ¶
type NotificationHandler func(msg *NotificationMessage)
NotificationHandler handles a subscription notification, sent by an agent.
Unlike ResponseHandler, this handler does not return an error that could be used to retry sending at a later time.
type NotificationMessage ¶
type NotificationMessage struct {
// CorrelationID of the request this is a response to, if any.
CorrelationID string `json:"correlationID,omitempty"`
// MessageID unique ID of the message. Intended to detect duplicates.
// Generated by the protocol binding.
MessageID string `json:"messageID,omitempty"`
// MessageType identifies this message payload as a notification
// This is set to the value of MessageTypeNotification
MessageType string `json:"messageType"`
// Name of the action or property affordance this is a response from.
// This field is optional and intended to help debugging and logging.
Name string `json:"name,omitempty"`
// The operation this is a notification of. Eg subscribeevent or observeproperty.
Operation string `json:"operation"`
// Authenticated ID of the agent sending the notification, set by the server.
// The protocol server MUST set this to the authenticated sender.
SenderID string `json:"senderID"`
// ThingID of the thing this is a response from.
// For responses passed to consumers this is the digitwin dThingID
// For responses sent by agents this is the agent ThingID
// This field is optional and intended to help debugging and logging.
ThingID string `json:"thingID,omitempty"`
// Timestamp the notification was created
Timestamp string `json:"timestamp,omitempty"`
// Value containing the notification data as described in the TD event or property dataschema.
// If the operation is one of the Thing level operations, the output is specified
// by the operation's dataschema. WoT doesn't have this yet so hiveot will
// define the missing bits if any. (see documentation)
Value any `json:"value"` // native
}
NotificationMessage for sending asynchronous notifications to a subscriber/observer
The Value field contains the notification data as defined by the operation. Possible operations in notifications:
- invokeaction action status as per TD
- observeproperty property value as per TD, when status==running
- observeallproperties map[name]value (multiple updates)
- subscribeevent event value as per TD, when status==running
func NewNotificationMessage ¶
func NewNotificationMessage(operation string, thingID, name string, value any) *NotificationMessage
NewNotificationMessage creates a new NotificationMessage instance.
operation is the notification operation; thingID is the thing the value applies to (destination of action or source of event); name is the name of the property, event or action affordance as described in the thing TD; value is the data as defined in the corresponding affordance dataschema, or nil if not applicable.
func (*NotificationMessage) ToString ¶
func (notif *NotificationMessage) ToString(maxlen int) string
ToString is a helper to easily read the notification data as a string
type RequestHandler ¶
type RequestHandler func(req *RequestMessage, c IConnection) (response *ResponseMessage)
RequestHandler agent processes a request and returns a response.
req is the envelope that contains the request to process; c is the connection on which the request arrived and on which to send asynchronous response(s).
type RequestMessage ¶
type RequestMessage struct {
// CorrelationID of the message. Uniquely identifies the request and must be included
// in the response.
// Notifications can include this to correlate with the subscription.
// Message streams can include this to correlate with the original request.
// This is optional. If omitted, no response will be received.
CorrelationID string `json:"correlationID,omitempty"`
// Created holds the timestamp the request was created in utc
// This MUST be set by the protocol server if not provided.
Created string `json:"created"`
// Input for the request as described in the TD affordance dataschema.
// If the operation is one of the Thing level operations, the input is specified
// by the operation's dataschema. WoT doesn't have this yet so hiveot will
// define the missing bits if any.
// Note: queryaction,cancelaction carry actionID in the Input
Input any `json:"input,omitempty"` // native
// MessageID unique ID of the message. Intended to detect duplicates.
// Generated by the protocol binding if not provided.
MessageID string `json:"messageID"`
// MessageType identifies this message payload as a request.
// This is required and set to "request".
MessageType string `json:"messageType"`
// Name of the event, action or property affordance the request is for.
// This field is optional and only required for specific operations.
Name string `json:"name,omitempty"`
// The operation for this message as defined in TD-1.1 (WotOp...)
// This identifies the request and is a required field.
Operation string `json:"operation"`
// SenderID is the authenticated ID of the client sending the request.
// The protocol server MUST set this to the authenticated client.
// Intended for services that link requests to the client, such as the state storage service.
// This is specific to the Hub as it proxies requests on behalf of clients.
SenderID string `json:"senderID,omitempty"`
// ThingID of the thing this request is for.
// For messages from consumers this is the digitwin dThingID
// For messages to agents this is the agent ThingID
// This field is required.
ThingID string `json:"thingID"`
}
RequestMessage for sending a request for an operation on a Thing or service. Agents/Things MUST send a response when a request is received and a correlationID is included.
The following operations are considered to be requests:
- invokeaction, cancelaction [WoT]
- queryaction, queryallactions [WoT]
- readevent, readallevents (of a Thing) [HiveOT extension]
- subscribe, unsubscribe [WoT] (handled by protocol bindings)
- observe, unobserve [WoT] (handled by protocol bindings)
- readproperty, readallproperties [WoT]
- readtd, readalltds (of a directory or thing) [HiveOT extension]
func NewRequestMessage ¶
func NewRequestMessage(operation string, thingID, name string, input any, correlationID string) *RequestMessage
NewRequestMessage creates a new RequestMessage instance.
operation is the request operation WoTOp... or HTOp...; thingID is the thing the value applies to (destination of action or source of event); name is the name of the property, event or action affordance as described in the thing TD; input is the request input as defined in the corresponding affordance dataschema; correlationID is the unique ID of the request, or empty when no response is expected.
func (*RequestMessage) CreateActionResponse ¶
func (req *RequestMessage) CreateActionResponse( actionID string, state string, output any, err error) (resp *ResponseMessage, as *ActionStatus)
CreateActionResponse is a helper to easily create an action response. This generates an ActionStatus value with the given actionID, state and output.
Typically used by agents to create a response to an invokeaction request. Output can be nil if the action has no output or state is not yet completed. If there is a transient output it can be used as output with status running.
actionID contains the actionID of the action instance, generated by the device; state contains the action state, one of the Status... constants; output contains the action output as defined in the action affordance.
func (*RequestMessage) CreateErrorResponse ¶
func (req *RequestMessage) CreateErrorResponse(err error) (errResp *ResponseMessage)
CreateErrorResponse is a helper to easily create an error response from a request. If operation is invokeaction then it generates an ActionStatus response with the given error.
err is the error that caused the request to fail.
func (*RequestMessage) CreateNotification ¶
func (req *RequestMessage) CreateNotification() (notif *NotificationMessage)
CreateNotification is a helper to easily create a status update of a running request.
data contains the payload to include in the notification
func (*RequestMessage) CreateResponse ¶
func (req *RequestMessage) CreateResponse(value any, err error) (resp *ResponseMessage)
CreateResponse is a helper to easily create a response from a request
If operation is invokeaction then it generates an ActionStatus response with the given value as its output and state Completed
value contains the response data; err is set when the request has failed, in which case value can contain error details.
func (*RequestMessage) ToObject ¶
func (req *RequestMessage) ToObject(input any) error
ToObject is a helper to easily convert the request input to an object
func (*RequestMessage) ToString ¶
func (req *RequestMessage) ToString(maxlen int) string
ToString is a helper to easily convert the request input to a string. maxlen is the maximum string length, or 0 for unlimited.
type ResponseHandler ¶
type ResponseHandler func(msg *ResponseMessage) error
ResponseHandler handles a response to a request, sent by an agent. The handler delivers the response to the client that sent the original request.
This returns an error if the response cannot be delivered. This can be used to retry sending the response at a later time.
type ResponseMessage ¶
type ResponseMessage struct {
// CorrelationID of the request this is a response to, if any.
CorrelationID string `json:"correlationID,omitempty"`
// Error contains the short error description when the request has failed.
// Matches RFC9457 https://www.rfc-editor.org/rfc/rfc9457
//
// NOTE(review): the tag has no omitempty, so with encoding/json a nil Error
// is emitted as "error":null rather than being omitted - confirm this is intended.
Error *ErrorValue `json:"error"`
// MessageID is the unique ID of the message. Intended to detect duplicates.
// Generated by the protocol binding.
MessageID string `json:"messageID,omitempty"`
// MessageType identifies this message payload as a response.
// This is set to the value of MessageTypeResponse.
MessageType string `json:"messageType"`
// Name of the action or property affordance this is a response from.
Name string `json:"name"`
// Operation this is a response to. This MUST be the operation provided in the request.
Operation string `json:"operation"`
// SenderID is the authenticated ID of the agent sending the response, set by the server.
//
// This is non-WoT and a feature of the hiveot Hub, to allow services to link requests
// to authenticated users.
//
// The Hub protocol server MUST set this to the authenticated sender.
SenderID string `json:"senderID"`
// ThingID of the thing this is a response from.
// For responses passed to consumers this is the digitwin dThingID.
// For responses sent by agents this is the agent ThingID.
// This field is optional and intended to help debugging and logging.
ThingID string `json:"thingID,omitempty"`
// Timestamp the response was created.
Timestamp string `json:"timestamp,omitempty"`
// Value of the response as described in the TD affordance output or value dataschema.
// If the operation is one of the Thing level operations, the value is specified
// by the operation's dataschema.
// In case of actions, the value holds the ActionStatus.
//
// Note that different protocol bindings use a different field depending on the operation.
// The message converter stores the response value(s) into the Value field.
//
// If an error is returned then Value optionally contains a detailed error description.
//
// NOTE(review): the tag has no omitempty, so with encoding/json a nil Value
// is emitted as "value":null - confirm this is intended.
Value any `json:"value"`
}
ResponseMessage serves to notify a client of the result of a request.
The Value field contains the message response data as defined by the operation.
Action related response output:
- invokeaction action output as per TD, when status==completed
- queryaction []ActionStatus object array
- queryallactions map [name][]ActionStatus objects
Property related response output:
- readproperty ThingValue object
- readallproperties map[name]ThingValue objects
Event related response output:
- readevent ThingValue object
- readallevents map[name]ThingValue objects
func NewResponseMessage ¶
func NewResponseMessage(operation string, thingID, name string, value any, err error, correlationID string) *ResponseMessage
NewResponseMessage creates a new ResponseMessage instance.
This sets status to completed if err is nil or Failed if err is provided. If the status is not completed or failed then set it to the appropriate value after creation.
- operation is the request operation, WoTOp... or HTOp...
- thingID is the thing the value applies to (destination of action or source of event)
- name is the name of the property, event or action affordance as described in the thing TD
- value is the response data as defined in the corresponding affordance dataschema, or nil if not applicable
- err is the optional error response, which will set status to bad request
- correlationID is the ID provided by the request
func (*ResponseMessage) Decode ¶
func (resp *ResponseMessage) Decode(output any) error
Decode the value in the response. If the response is for an invokeaction then the output is extracted from the ActionStatus.
func (*ResponseMessage) ToString ¶
func (resp *ResponseMessage) ToString(maxlen int) string
ToString is a helper to easily read the response output as a string
type RnRChan ¶
type RnRChan struct {
// contains filtered or unexported fields
}
RnRChan is a helper for Request 'n Response message handling using channels. Intended to link responses in asynchronous request-response communication.
Usage:
- create a request ID: shortid.MustGenerate()
- register the request ID: c := Open(correlationID)
- Send the request message in the client, passing the correlationID
- Wait for a response: completed, data := WaitForResponse(c, timeout)
- Handle response message (in client callback): HandleResponse(msg)
func NewRnRChan ¶
func NewRnRChan() *RnRChan
func (*RnRChan) CloseAll ¶
func (rnr *RnRChan) CloseAll()
CloseAll force closes all channels, ending all waits for RPC responses.
func (*RnRChan) HandleResponse ¶
func (rnr *RnRChan) HandleResponse(msg *ResponseMessage) bool
HandleResponse writes a reply to the request channel.
This returns true on success or false if the correlationID is unknown (no-one is waiting). It is up to the handler of this response to close the channel when done.
If a timeout passes while the write is blocked, the write is released.
func (*RnRChan) Open ¶
func (rnr *RnRChan) Open(correlationID string) chan *ResponseMessage
Open a new channel for receiving the response to a request. Call Close when done.
This returns a reply channel on which the data is received. Use WaitForResponse(rChan)
func (*RnRChan) WaitForResponse ¶
func (rnr *RnRChan) WaitForResponse( replyChan chan *ResponseMessage, timeout time.Duration) ( hasResponse bool, resp *ResponseMessage)
WaitForResponse waits for an answer received on the reply channel. After the timeout passes without a response, this returns with hasResponse set to false.
Use 'autoclose' to immediately close the channel when no further responses are expected. (they would not be read and thus lost)
If the channel was closed this returns hasResponse with no reply
type ThingValue ¶
type ThingValue struct {
// AffordanceType is the type of affordance this is a value of:
// AffordanceTypeProperty|Event|Action
AffordanceType AffordanceType `json:"affordanceType"`
// Data is the payload, in the format described by the thing's affordance.
Data any `json:"data,omitempty"`
// Name is the name of the affordance holding the value.
Name string `json:"name,omitempty"`
// ThingID is the digital twin Thing ID.
ThingID string `json:"thingID,omitempty"`
// Timestamp is the time the value was last updated.
Timestamp string `json:"timestamp,omitempty"`
}
ThingValue is the internal API response payload to subscribeevent, observeproperty, readevent and readproperty operations. The protocol binding maps between this and the protocol way of encoding values.
func NewThingValue ¶
func NewThingValue(affordanceType AffordanceType, thingID, name string, data any, timestamp string) *ThingValue
func (*ThingValue) ToString ¶
func (tv *ThingValue) ToString(maxlen int) string
ToString is a helper to easily read the response output as a string