Documentation
¶
Index ¶
- func EvaluateCondition(script string, newValue, oldValue interface{}, tagName string) (bool, error)
- func SetGlobalManager(manager *HubExecutorManager)
- func StartGlobalExecutor(hubName string) error
- func StopGlobalExecutor(hubName string) error
- type ActionTrigger
- type ActiveMQHandler
- type ActiveMQHandlerConfig
- type DefaultDestinationExecutor
- func (d *DefaultDestinationExecutor) Execute(ctx context.Context, dest Destination, payload *MessagePayload) (*ExecutionResult, error)
- func (d *DefaultDestinationExecutor) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))
- func (d *DefaultDestinationExecutor) SetHistoryRecorder(recorder HistoryRecorder)
- func (d *DefaultDestinationExecutor) SetHubInfo(hubID, hubName string)
- func (d *DefaultDestinationExecutor) SetJobExecutor(...)
- type Destination
- type DestinationExecutor
- type ExecutionResult
- type ExecutorInfo
- type ExecutorStatus
- type HandlerInfo
- type HistoryRecorder
- type HubExecutor
- func (e *HubExecutor) CompleteTransaction(ctx context.Context, payload *MessagePayload, response string, ...)
- func (e *HubExecutor) FailTransaction(ctx context.Context, payload *MessagePayload, errorMessage string)
- func (e *HubExecutor) GetHub() *models.IntegrationHub
- func (e *HubExecutor) GetHubName() string
- func (e *HubExecutor) GetInfo() ExecutorInfo
- func (e *HubExecutor) Start() error
- func (e *HubExecutor) Status() ExecutorStatus
- func (e *HubExecutor) Stop() error
- type HubExecutorConfig
- type HubExecutorController
- func (c *HubExecutorController) GetExecutorInfo(ctx *gin.Context)
- func (c *HubExecutorController) GetExecutorStatus(ctx *gin.Context)
- func (c *HubExecutorController) ListExecutors(ctx *gin.Context)
- func (c *HubExecutorController) RegisterRoutes(router *gin.RouterGroup)
- func (c *HubExecutorController) RestartExecutor(ctx *gin.Context)
- func (c *HubExecutorController) StartAllExecutors(ctx *gin.Context)
- func (c *HubExecutorController) StartExecutor(ctx *gin.Context)
- func (c *HubExecutorController) StartExecutorByHubID(ctx *gin.Context)
- func (c *HubExecutorController) StopAllExecutors(ctx *gin.Context)
- func (c *HubExecutorController) StopExecutor(ctx *gin.Context)
- type HubExecutorManager
- func (m *HubExecutorManager) GetExecutorInfo(hubName string) (*ExecutorInfo, error)
- func (m *HubExecutorManager) GetExecutorStatus(hubName string) (ExecutorStatus, error)
- func (m *HubExecutorManager) GetRunningCount() int
- func (m *HubExecutorManager) IsRunning(hubName string) bool
- func (m *HubExecutorManager) ListExecutors() []ExecutorInfo
- func (m *HubExecutorManager) RestartExecutor(hubName string) error
- func (m *HubExecutorManager) SendToEndpoint(ctx context.Context, hubID string, protocolGroupID string, endpointID string, ...) (*OutboundSendResult, error)
- func (m *HubExecutorManager) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))
- func (m *HubExecutorManager) SetDestinationExecutor(executor DestinationExecutor)
- func (m *HubExecutorManager) SetHubLoader(loader HubLoader)
- func (m *HubExecutorManager) SetJobExecutor(executor JobExecutorFunc)
- func (m *HubExecutorManager) StartAll() error
- func (m *HubExecutorManager) StartAllExecutors() map[string]error
- func (m *HubExecutorManager) StartExecutor(hubName string) error
- func (m *HubExecutorManager) StartExecutorByHubID(hubID string) error
- func (m *HubExecutorManager) StopAll() error
- func (m *HubExecutorManager) StopExecutor(hubName string) error
- type HubExecutorState
- func (s *HubExecutorState) GetOPCTagValue(tagID string) *OPCTagValue
- func (s *HubExecutorState) GetStatus() ExecutorStatus
- func (s *HubExecutorState) IncrementErrorCount()
- func (s *HubExecutorState) IncrementMessageCount()
- func (s *HubExecutorState) SetStatus(status ExecutorStatus)
- func (s *HubExecutorState) UpdateOPCTagValue(tagID string, value *OPCTagValue)
- type HubLoader
- type JobExecutorFunc
- type KafkaHandler
- type KafkaHandlerConfig
- type MCPClientHandler
- func (h *MCPClientHandler) CallTool(ctx context.Context, toolName, argsJSON string) (string, error)
- func (h *MCPClientHandler) Name() string
- func (h *MCPClientHandler) Start(ctx context.Context) error
- func (h *MCPClientHandler) Status() ExecutorStatus
- func (h *MCPClientHandler) Stop(ctx context.Context) error
- func (h *MCPClientHandler) Type() ProtocolType
- type MCPClientHandlerConfig
- type MQTTHandler
- type MQTTHandlerConfig
- type ManagerConfig
- type MessagePayload
- type MongoHubLoader
- type MongoHubLoaderConfig
- type OPCAction
- type OPCTag
- type OPCTagValue
- type OPCUAHandler
- type OPCUAHandlerConfig
- type OutboundSendResult
- type ProtocolHandler
- type ProtocolType
- type SignalRHandler
- type SignalRHandlerConfig
- type TCPHandler
- type TCPHandlerConfig
- type TriggerMethod
- type WebServerFromProtocolGroupConfig
- type WebServerHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func EvaluateCondition ¶
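func EvaluateCondition(script string, newValue, oldValue interface{}, tagName string) (bool, error)
EvaluateCondition evaluates a JavaScript condition script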
func SetGlobalManager ¶
func SetGlobalManager(manager *HubExecutorManager)
SetGlobalManager sets the global hub executor manager
func StartGlobalExecutor ¶
func StartGlobalExecutor(hubName string) error
StartGlobalExecutor starts an executor using the global manager
func StopGlobalExecutor ¶
func StopGlobalExecutor(hubName string) error
StopGlobalExecutor stops an executor using the global manager
Types ¶
type ActionTrigger ¶
type ActionTrigger struct {
Method TriggerMethod `json:"method"`
IntervalMs int `json:"intervalMs,omitempty"`
MonitoredTags []string `json:"monitoredTags,omitempty"`
ConditionScript string `json:"conditionScript,omitempty"`
}
ActionTrigger represents an action trigger configuration
type ActiveMQHandler ¶
type ActiveMQHandler struct {
// contains filtered or unexported fields
}
ActiveMQHandler handles ActiveMQ protocol
func NewActiveMQHandler ¶
func NewActiveMQHandler(config ActiveMQHandlerConfig) *ActiveMQHandler
NewActiveMQHandler creates a new ActiveMQ handler
func (*ActiveMQHandler) Name ¶
func (h *ActiveMQHandler) Name() string
Name returns the handler name
func (*ActiveMQHandler) Start ¶
func (h *ActiveMQHandler) Start(ctx context.Context) error
Start starts the ActiveMQ handler
func (*ActiveMQHandler) Status ¶
func (h *ActiveMQHandler) Status() ExecutorStatus
Status returns the current status
func (*ActiveMQHandler) Stop ¶
func (h *ActiveMQHandler) Stop(ctx context.Context) error
Stop stops the ActiveMQ handler
func (*ActiveMQHandler) Type ¶
func (h *ActiveMQHandler) Type() ProtocolType
Type returns the protocol type
type ActiveMQHandlerConfig ¶
type ActiveMQHandlerConfig struct {
Name string
ProtocolGroup *models.HubSimpleProtocolGroup
BrokerConfig *models.MessageBrokerConfig
DestExecutor DestinationExecutor
OnMessage func(payload *MessagePayload)
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
OnError func(err error, source string)
State *HubExecutorState
}
ActiveMQHandlerConfig holds configuration for ActiveMQ handler
type DefaultDestinationExecutor ¶
type DefaultDestinationExecutor struct {
// contains filtered or unexported fields
}
DefaultDestinationExecutor implements the DestinationExecutor interface
func NewDestinationExecutor ¶
func NewDestinationExecutor() *DefaultDestinationExecutor
NewDestinationExecutor creates a new destination executor
func (*DefaultDestinationExecutor) Execute ¶
func (d *DefaultDestinationExecutor) Execute(ctx context.Context, dest Destination, payload *MessagePayload) (*ExecutionResult, error)
Execute executes a destination with the given payload
func (*DefaultDestinationExecutor) SetAgentGatewayExecutor ¶
func (d *DefaultDestinationExecutor) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))
SetAgentGatewayExecutor sets the agent gateway execution callback. Call this from controllershandlers.go after AgentGatewayService is initialized.
func (*DefaultDestinationExecutor) SetHistoryRecorder ¶
func (d *DefaultDestinationExecutor) SetHistoryRecorder(recorder HistoryRecorder)
SetHistoryRecorder sets the history recorder for tracking outbound operations
func (*DefaultDestinationExecutor) SetHubInfo ¶
func (d *DefaultDestinationExecutor) SetHubInfo(hubID, hubName string)
SetHubInfo sets the hub information for history recording
func (*DefaultDestinationExecutor) SetJobExecutor ¶
func (d *DefaultDestinationExecutor) SetJobExecutor(executor func(ctx context.Context, jobName string, params map[string]interface{}) error)
SetJobExecutor sets the job executor function
type Destination ¶
type Destination struct {
ID string `json:"id"`
Type string `json:"type"` // Job, Execute Transcode, Route to Outbound, Custom Script
Target string `json:"target,omitempty"`
MappingID string `json:"mappingId,omitempty"`
Config map[string]interface{} `json:"config,omitempty"`
}
Destination represents a destination configuration
type DestinationExecutor ¶
type DestinationExecutor interface {
// Execute executes a destination with the given payload
Execute(ctx context.Context, dest Destination, payload *MessagePayload) (*ExecutionResult, error)
}
DestinationExecutor is the interface for executing destinations
type ExecutionResult ¶
type ExecutionResult struct {
Success bool
Message string
Data interface{}
Error error
Duration time.Duration
Timestamp time.Time
}
ExecutionResult represents the result of destination execution
type ExecutorInfo ¶
type ExecutorInfo struct {
HubName string `json:"hub_name"`
HubID string `json:"hub_id"`
Version int `json:"version"`
Status ExecutorStatus `json:"status"`
StartTime *time.Time `json:"start_time,omitempty"`
LastActivity *time.Time `json:"last_activity,omitempty"`
MessageCount int64 `json:"message_count"`
ErrorCount int64 `json:"error_count"`
ActiveHandlers int `json:"active_handlers"`
Handlers []HandlerInfo `json:"handlers"`
}
ExecutorInfo represents information about a running executor
type ExecutorStatus ¶
type ExecutorStatus string
ExecutorStatus represents the status of a hub executor
const (
StatusStopped ExecutorStatus = "stopped"
StatusStarting ExecutorStatus = "starting"
StatusRunning ExecutorStatus = "running"
StatusStopping ExecutorStatus = "stopping"
StatusError ExecutorStatus = "error"
)
type HandlerInfo ¶
type HandlerInfo struct {
Name string `json:"name"`
Type ProtocolType `json:"type"`
Status ExecutorStatus `json:"status"`
Protocol string `json:"protocol"`
Address string `json:"address,omitempty"`
}
HandlerInfo represents information about a protocol handler
type HistoryRecorder ¶
type HistoryRecorder interface {
RecordTransaction(ctx context.Context, history *models.IntHubHistory) (string, error)
CompleteTransaction(ctx context.Context, id string, response string, mappedData string, responseStatus int) error
FailTransaction(ctx context.Context, id string, errorMessage string) error
RecordAction(ctx context.Context, action *models.IntHubActionHistory) error
}
HistoryRecorder is the interface for recording integration hub history
type HubExecutor ¶
type HubExecutor struct {
// contains filtered or unexported fields
}
HubExecutor manages the execution of an integration hub configuration
func NewHubExecutor ¶
func NewHubExecutor(config HubExecutorConfig) *HubExecutor
NewHubExecutor creates a new hub executor for a specific hub configuration
func (*HubExecutor) CompleteTransaction ¶
func (e *HubExecutor) CompleteTransaction(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
CompleteTransaction marks a transaction as completed
func (*HubExecutor) FailTransaction ¶
func (e *HubExecutor) FailTransaction(ctx context.Context, payload *MessagePayload, errorMessage string)
FailTransaction marks a transaction as failed
func (*HubExecutor) GetHub ¶
func (e *HubExecutor) GetHub() *models.IntegrationHub
GetHub returns the hub configuration
func (*HubExecutor) GetHubName ¶
func (e *HubExecutor) GetHubName() string
GetHubName returns the hub name
func (*HubExecutor) GetInfo ¶
func (e *HubExecutor) GetInfo() ExecutorInfo
GetInfo returns information about the executor
func (*HubExecutor) Start ¶
func (e *HubExecutor) Start() error
Start starts the hub executor and all configured handlers
func (*HubExecutor) Status ¶
func (e *HubExecutor) Status() ExecutorStatus
Status returns the current executor status
func (*HubExecutor) Stop ¶
func (e *HubExecutor) Stop() error
Stop stops the hub executor and all handlers
type HubExecutorConfig ¶
type HubExecutorConfig struct {
Hub *models.IntegrationHub
HubName string
DestinationExec DestinationExecutor
HistoryRecorder HistoryRecorder
OnMessageReceive func(payload *MessagePayload)
OnError func(err error, source string)
}
HubExecutorConfig holds configuration for a hub executor
type HubExecutorController ¶
type HubExecutorController struct {
// contains filtered or unexported fields
}
HubExecutorController handles HTTP requests for hub executor management
func NewHubExecutorController ¶
func NewHubExecutorController(manager *HubExecutorManager) *HubExecutorController
NewHubExecutorController creates a new hub executor controller
func (*HubExecutorController) GetExecutorInfo ¶
func (c *HubExecutorController) GetExecutorInfo(ctx *gin.Context)
GetExecutorInfo handles GET /hubexecutor/info/:instanceName @Summary Get executor detailed info @Description Gets detailed information about a hub executor @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 404 {object} map[string]interface{} @Router /hubexecutor/info/{name} [get]
func (*HubExecutorController) GetExecutorStatus ¶
func (c *HubExecutorController) GetExecutorStatus(ctx *gin.Context)
GetExecutorStatus handles GET /hubexecutor/status/:instanceName @Summary Get executor status @Description Gets the status of a hub executor @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/status/{name} [get]
func (*HubExecutorController) ListExecutors ¶
func (c *HubExecutorController) ListExecutors(ctx *gin.Context)
ListExecutors handles GET /hubexecutor/list @Summary List all running executors @Description Returns information about all running hub executors @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Router /hubexecutor/list [get]
func (*HubExecutorController) RegisterRoutes ¶
func (c *HubExecutorController) RegisterRoutes(router *gin.RouterGroup)
RegisterRoutes registers the hub executor API routes
func (*HubExecutorController) RestartExecutor ¶
func (c *HubExecutorController) RestartExecutor(ctx *gin.Context)
RestartExecutor handles POST /hubexecutor/restart/:instanceName @Summary Restart executor @Description Restarts a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/restart/{name} [post]
func (*HubExecutorController) StartAllExecutors ¶
func (c *HubExecutorController) StartAllExecutors(ctx *gin.Context)
StartAllExecutors handles POST /hubexecutor/start-all @Summary Start all enabled hub executors @Description Starts executors for all enabled hub configurations @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start-all [post]
func (*HubExecutorController) StartExecutor ¶
func (c *HubExecutorController) StartExecutor(ctx *gin.Context)
StartExecutor handles POST /hubexecutor/start/:instanceName @Summary Start executor by instance name @Description Starts a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start/{name} [post]
func (*HubExecutorController) StartExecutorByHubID ¶
func (c *HubExecutorController) StartExecutorByHubID(ctx *gin.Context)
StartExecutorByHubID handles POST /hubexecutor/start-by-id/:hubId @Summary Start executor by hub ID @Description Starts a hub executor using a specific hub configuration ID @Tags HubExecutor @Produce json @Param hubId path string true "Hub ID" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start-by-id/{hubId} [post]
func (*HubExecutorController) StopAllExecutors ¶
func (c *HubExecutorController) StopAllExecutors(ctx *gin.Context)
StopAllExecutors handles POST /hubexecutor/stop-all @Summary Stop all running executors @Description Stops all currently running hub executors @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/stop-all [post]
func (*HubExecutorController) StopExecutor ¶
func (c *HubExecutorController) StopExecutor(ctx *gin.Context)
StopExecutor handles POST /hubexecutor/stop/:instanceName @Summary Stop executor @Description Stops a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/stop/{name} [post]
type HubExecutorManager ¶
type HubExecutorManager struct {
// contains filtered or unexported fields
}
HubExecutorManager manages multiple hub executors
func GetGlobalManager ¶
func GetGlobalManager() *HubExecutorManager
GetGlobalManager returns the global hub executor manager
func InitGlobalManager ¶
func InitGlobalManager(config ManagerConfig) *HubExecutorManager
InitGlobalManager initializes and sets the global manager
func NewHubExecutorManager ¶
func NewHubExecutorManager(config ManagerConfig) *HubExecutorManager
NewHubExecutorManager creates a new hub executor manager
func (*HubExecutorManager) GetExecutorInfo ¶
func (m *HubExecutorManager) GetExecutorInfo(hubName string) (*ExecutorInfo, error)
GetExecutorInfo gets detailed info for a specific executor
func (*HubExecutorManager) GetExecutorStatus ¶
func (m *HubExecutorManager) GetExecutorStatus(hubName string) (ExecutorStatus, error)
GetExecutorStatus gets the status of a specific executor
func (*HubExecutorManager) GetRunningCount ¶
func (m *HubExecutorManager) GetRunningCount() int
GetRunningCount returns the number of running executors
func (*HubExecutorManager) IsRunning ¶
func (m *HubExecutorManager) IsRunning(hubName string) bool
IsRunning checks if an executor is running for the given name
func (*HubExecutorManager) ListExecutors ¶
func (m *HubExecutorManager) ListExecutors() []ExecutorInfo
ListExecutors returns information about all running executors
func (*HubExecutorManager) RestartExecutor ¶
func (m *HubExecutorManager) RestartExecutor(hubName string) error
RestartExecutor restarts a hub executor
func (*HubExecutorManager) SendToEndpoint ¶
func (m *HubExecutorManager) SendToEndpoint(ctx context.Context, hubID string, protocolGroupID string, endpointID string, payload []byte, contentType string, method string) (*OutboundSendResult, error)
SendToEndpoint sends payload to a specific hub endpoint using the protocol configured for that group. It loads the hub configuration by ID, resolves the protocol group and endpoint, then dispatches to the appropriate protocol implementation (HTTP/REST, MQTT, Kafka, MCP, etc.).
func (*HubExecutorManager) SetAgentGatewayExecutor ¶
func (m *HubExecutorManager) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))
SetAgentGatewayExecutor sets the agent gateway execution callback on the destination executor.
func (*HubExecutorManager) SetDestinationExecutor ¶
func (m *HubExecutorManager) SetDestinationExecutor(executor DestinationExecutor)
SetDestinationExecutor sets the destination executor
func (*HubExecutorManager) SetHubLoader ¶
func (m *HubExecutorManager) SetHubLoader(loader HubLoader)
SetHubLoader sets the hub loader
func (*HubExecutorManager) SetJobExecutor ¶
func (m *HubExecutorManager) SetJobExecutor(executor JobExecutorFunc)
SetJobExecutor sets the job executor function on the destination executor
func (*HubExecutorManager) StartAll ¶
func (m *HubExecutorManager) StartAll() error
StartAll starts executors for all enabled hubs
func (*HubExecutorManager) StartAllExecutors ¶
func (m *HubExecutorManager) StartAllExecutors() map[string]error
StartAllExecutors starts executors for all enabled hubs and returns results map
func (*HubExecutorManager) StartExecutor ¶
func (m *HubExecutorManager) StartExecutor(hubName string) error
StartExecutor starts a hub executor for the given hub name
func (*HubExecutorManager) StartExecutorByHubID ¶
func (m *HubExecutorManager) StartExecutorByHubID(hubID string) error
StartExecutorByHubID starts a hub executor using a specific hub configuration ID
func (*HubExecutorManager) StopAll ¶
func (m *HubExecutorManager) StopAll() error
StopAll stops all running executors
func (*HubExecutorManager) StopExecutor ¶
func (m *HubExecutorManager) StopExecutor(hubName string) error
StopExecutor stops a hub executor by hub name
type HubExecutorState ¶
type HubExecutorState struct {
Status ExecutorStatus
StartTime *time.Time
LastActivity *time.Time
MessageCount int64
ErrorCount int64
ActiveHandlers int
Handlers map[string]ProtocolHandler
OPCTagValues map[string]*OPCTagValue // tag ID -> current value
// contains filtered or unexported fields
}
HubExecutorState holds the runtime state of a hub executor
func NewHubExecutorState ¶
func NewHubExecutorState() *HubExecutorState
NewHubExecutorState creates a new executor state
func (*HubExecutorState) GetOPCTagValue ¶
func (s *HubExecutorState) GetOPCTagValue(tagID string) *OPCTagValue
GetOPCTagValue gets an OPC tag value
func (*HubExecutorState) GetStatus ¶
func (s *HubExecutorState) GetStatus() ExecutorStatus
GetStatus gets the executor status
func (*HubExecutorState) IncrementErrorCount ¶
func (s *HubExecutorState) IncrementErrorCount()
IncrementErrorCount increments the error counter
func (*HubExecutorState) IncrementMessageCount ¶
func (s *HubExecutorState) IncrementMessageCount()
IncrementMessageCount increments the message counter
func (*HubExecutorState) SetStatus ¶
func (s *HubExecutorState) SetStatus(status ExecutorStatus)
SetStatus sets the executor status
func (*HubExecutorState) UpdateOPCTagValue ¶
func (s *HubExecutorState) UpdateOPCTagValue(tagID string, value *OPCTagValue)
UpdateOPCTagValue updates an OPC tag value
type HubLoader ¶
type HubLoader interface {
// LoadByHubName loads the default hub configuration for a hub name
LoadByHubName(hubName string) (*models.IntegrationHub, error)
// LoadByID loads a hub configuration by its ID
LoadByID(hubID string) (*models.IntegrationHub, error)
// LoadAll loads all enabled hub configurations
LoadAll() ([]*models.IntegrationHub, error)
}
HubLoader is an interface for loading hub configurations
type JobExecutorFunc ¶
JobExecutorFunc is a function type for executing jobs
type KafkaHandler ¶
type KafkaHandler struct {
// contains filtered or unexported fields
}
KafkaHandler handles Kafka protocol
func NewKafkaHandler ¶
func NewKafkaHandler(config KafkaHandlerConfig) *KafkaHandler
NewKafkaHandler creates a new Kafka handler
func (*KafkaHandler) Start ¶
func (h *KafkaHandler) Start(ctx context.Context) error
Start starts the Kafka handler
func (*KafkaHandler) Status ¶
func (h *KafkaHandler) Status() ExecutorStatus
Status returns the current status
func (*KafkaHandler) Stop ¶
func (h *KafkaHandler) Stop(ctx context.Context) error
Stop stops the Kafka handler
func (*KafkaHandler) Type ¶
func (h *KafkaHandler) Type() ProtocolType
Type returns the protocol type
type KafkaHandlerConfig ¶
type KafkaHandlerConfig struct {
Name string
ProtocolGroup *models.HubSimpleProtocolGroup
BrokerConfig *models.MessageBrokerConfig
DestExecutor DestinationExecutor
OnMessage func(payload *MessagePayload)
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
OnError func(err error, source string)
State *HubExecutorState
}
KafkaHandlerConfig holds configuration for Kafka handler
type MCPClientHandler ¶
type MCPClientHandler struct {
// contains filtered or unexported fields
}
MCPClientHandler connects to an MCP server (JSON-RPC 2.0 over HTTP) and can invoke named tools as part of Integration Hub outbound message routing.
func NewMCPClientHandler ¶
func NewMCPClientHandler(config MCPClientHandlerConfig) *MCPClientHandler
NewMCPClientHandler creates a new MCP client handler.
func (*MCPClientHandler) CallTool ¶
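func (h *MCPClientHandler) CallTool(ctx context.Context, toolName, argsJSON string) (string, error)
CallTool invokes a named tool on the MCP server with the given JSON arguments string. It uses JSON-RPC 2.0 over HTTP and returns the concatenated text content from the response.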
func (*MCPClientHandler) Name ¶
func (h *MCPClientHandler) Name() string
Name returns the handler name.
func (*MCPClientHandler) Start ¶
func (h *MCPClientHandler) Start(ctx context.Context) error
Start initializes the MCP session by sending the JSON-RPC 2.0 "initialize" request.
func (*MCPClientHandler) Status ¶
func (h *MCPClientHandler) Status() ExecutorStatus
Status returns the current handler status.
func (*MCPClientHandler) Stop ¶
func (h *MCPClientHandler) Stop(ctx context.Context) error
Stop sends an MCP shutdown notification and marks the handler as stopped.
func (*MCPClientHandler) Type ¶
func (h *MCPClientHandler) Type() ProtocolType
Type returns the protocol type.
type MCPClientHandlerConfig ¶
type MCPClientHandlerConfig struct {
Name string
ProtocolGroup *models.HubSimpleProtocolGroup
MCPConfig *models.MCPProtocolConfig
Direction string
DestExecutor DestinationExecutor
OnMessage func(payload *MessagePayload)
OnError func(err error, source string)
State *HubExecutorState
}
MCPClientHandlerConfig holds configuration for the MCP client handler.
type MQTTHandler ¶
type MQTTHandler struct {
// contains filtered or unexported fields
}
MQTTHandler handles MQTT protocol
func NewMQTTHandler ¶
func NewMQTTHandler(config MQTTHandlerConfig) *MQTTHandler
NewMQTTHandler creates a new MQTT handler
func (*MQTTHandler) Start ¶
func (h *MQTTHandler) Start(ctx context.Context) error
Start starts the MQTT handler
func (*MQTTHandler) Status ¶
func (h *MQTTHandler) Status() ExecutorStatus
Status returns the current status
type MQTTHandlerConfig ¶
type MQTTHandlerConfig struct {
Name string
ProtocolGroup *models.HubSimpleProtocolGroup
BrokerConfig *models.MessageBrokerConfig
DestExecutor DestinationExecutor
OnMessage func(payload *MessagePayload)
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
OnError func(err error, source string)
State *HubExecutorState
}
MQTTHandlerConfig holds configuration for MQTT handler
type ManagerConfig ¶
type ManagerConfig struct {
HubLoader HubLoader
DestExecutor DestinationExecutor
HistoryRecorder HistoryRecorder
OnMessage func(payload *MessagePayload)
OnError func(err error, source string)
}
ManagerConfig holds configuration for the hub executor manager
type MessagePayload ¶
type MessagePayload struct {
TransactionID string
Protocol ProtocolType
Direction string // "inbound" or "outbound"
EndpointID string
Path string
Method string
Headers map[string]string
Body []byte
ContentType string
Metadata map[string]interface{}
Timestamp time.Time
}
MessagePayload represents a message payload flowing through the hub; its Direction field marks it as "inbound" or "outbound"
type MongoHubLoader ¶
type MongoHubLoader struct {
// contains filtered or unexported fields
}
MongoHubLoader loads hub configurations from MongoDB
func NewMongoHubLoader ¶
func NewMongoHubLoader(config MongoHubLoaderConfig) *MongoHubLoader
NewMongoHubLoader creates a new MongoDB-based hub loader
func (*MongoHubLoader) LoadAll ¶
func (l *MongoHubLoader) LoadAll() ([]*models.IntegrationHub, error)
LoadAll loads all enabled hub configurations
func (*MongoHubLoader) LoadByHubName ¶
func (l *MongoHubLoader) LoadByHubName(hubName string) (*models.IntegrationHub, error)
LoadByHubName loads the default hub configuration for a hub name
func (*MongoHubLoader) LoadByID ¶
func (l *MongoHubLoader) LoadByID(hubID string) (*models.IntegrationHub, error)
LoadByID loads a hub configuration by its ID
type MongoHubLoaderConfig ¶
type MongoHubLoaderConfig struct {
GetHubByHubName func(name string) (*models.IntegrationHub, error)
GetHubByID func(hubID string) (*models.IntegrationHub, error)
GetAllHubs func() ([]*models.IntegrationHub, error)
}
MongoHubLoaderConfig holds configuration for MongoHubLoader
type OPCAction ¶
type OPCAction struct {
ID string
Name string
TriggerMethod TriggerMethod
MonitoredTags []string
ConditionScript string
Destinations []Destination
IntervalMs int
}
OPCAction represents an action triggered by OPC UA data changes
type OPCTag ¶
type OPCTag struct {
ID string `json:"id"`
TagName string `json:"tagName"`
Address string `json:"address"`
DataType string `json:"dataType"`
IsArray bool `json:"isArray"`
ArraySize int `json:"arraySize,omitempty"`
Permission string `json:"permission"`
SampleInterval int `json:"sampleInterval"` // milliseconds
Deadband float64 `json:"deadband,omitempty"`
Description string `json:"description,omitempty"`
Unit string `json:"unit,omitempty"`
ScaleFactor float64 `json:"scaleFactor,omitempty"`
Offset float64 `json:"offset,omitempty"`
Enabled bool `json:"enabled"`
}
OPCTag represents an OPC UA tag configuration
type OPCTagValue ¶
type OPCTagValue struct {
TagID string
TagName string
Value interface{}
Quality int
Timestamp time.Time
}
OPCTagValue represents a current OPC tag value
type OPCUAHandler ¶
type OPCUAHandler struct {
// contains filtered or unexported fields
}
OPCUAHandler handles OPC UA protocol
func NewOPCUAHandler ¶
func NewOPCUAHandler(config OPCUAHandlerConfig) *OPCUAHandler
NewOPCUAHandler creates a new OPC UA handler
func (*OPCUAHandler) Start ¶
func (h *OPCUAHandler) Start(ctx context.Context) error
Start starts the OPC UA handler
func (*OPCUAHandler) Status ¶
func (h *OPCUAHandler) Status() ExecutorStatus
Status returns the current status
func (*OPCUAHandler) Stop ¶
func (h *OPCUAHandler) Stop(ctx context.Context) error
Stop stops the OPC UA handler
func (*OPCUAHandler) Type ¶
func (h *OPCUAHandler) Type() ProtocolType
Type returns the protocol type
type OPCUAHandlerConfig ¶
type OPCUAHandlerConfig struct {
Name string
ProtocolGroup *models.HubSimpleProtocolGroup
DestExecutor DestinationExecutor
OnMessage func(payload *MessagePayload)
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
OnError func(err error, source string)
State *HubExecutorState
}
OPCUAHandlerConfig holds configuration for OPC UA handler
type OutboundSendResult ¶
OutboundSendResult is the result of a protocol-aware outbound send.
type ProtocolHandler ¶
type ProtocolHandler interface {
// Start starts the protocol handler
Start(ctx context.Context) error
// Stop stops the protocol handler
Stop(ctx context.Context) error
// Status returns the current status
Status() ExecutorStatus
// Name returns the handler name/identifier
Name() string
// Type returns the protocol type
Type() ProtocolType
}
ProtocolHandler is the interface for all protocol handlers
type ProtocolType ¶
// ProtocolType is a string-backed protocol identifier; the supported values
// are the Protocol* constants declared for this type.
type ProtocolType string
ProtocolType represents supported protocol types
const ( ProtocolREST ProtocolType = "REST API" ProtocolSOAP ProtocolType = "SOAP API" ProtocolGraphQL ProtocolType = "GraphQL" ProtocolMQTT ProtocolType = "MQTT Client" ProtocolKafka ProtocolType = "Kafka" ProtocolActiveMQ ProtocolType = "ActiveMQ" ProtocolOPCUA ProtocolType = "OPC UA" ProtocolTCP ProtocolType = "TCP" ProtocolSignalR ProtocolType = "SignalR" // ProtocolMCP is the Model Context Protocol (JSON-RPC 2.0 over HTTP / stdio) ProtocolMCP ProtocolType = "MCP" )
type SignalRHandler ¶
// SignalRHandler is an opaque handler type created by NewSignalRHandler. This
// listing documents its Start, Stop, Status and Type methods.
// NOTE(review): no Name method is listed for it here, although ProtocolHandler
// requires one — confirm whether SignalRHandler satisfies ProtocolHandler.
type SignalRHandler struct {
// contains filtered or unexported fields
}
SignalRHandler handles SignalR protocol
func NewSignalRHandler ¶
func NewSignalRHandler(config SignalRHandlerConfig) *SignalRHandler
NewSignalRHandler creates a new SignalR handler
func (*SignalRHandler) Start ¶
func (h *SignalRHandler) Start(ctx context.Context) error
Start starts the SignalR handler
func (*SignalRHandler) Status ¶
func (h *SignalRHandler) Status() ExecutorStatus
Status returns the current status
func (*SignalRHandler) Stop ¶
func (h *SignalRHandler) Stop(ctx context.Context) error
Stop stops the SignalR handler
func (*SignalRHandler) Type ¶
func (h *SignalRHandler) Type() ProtocolType
Type returns the protocol type
type SignalRHandlerConfig ¶
// SignalRHandlerConfig is the single argument passed to NewSignalRHandler: a
// name, a protocol group, a destination executor, a set of callbacks, and a
// pointer to executor state.
type SignalRHandlerConfig struct {
// Name is the handler's name. NOTE(review): presumably the identifier the handler reports — confirm.
Name string
// ProtocolGroup points to the protocol-group model. NOTE(review): presumably the source of the SignalR connection settings — confirm.
ProtocolGroup *models.HubSimpleProtocolGroup
// DestExecutor is the DestinationExecutor handed to the handler. NOTE(review): presumably used to deliver payloads to destinations — confirm.
DestExecutor DestinationExecutor
// OnMessage is a callback that receives a message payload. NOTE(review): presumably called once per received message — confirm.
OnMessage func(payload *MessagePayload)
// OnComplete is a callback that receives the payload, the response, the mapped data, and a response status. NOTE(review): presumably called after successful processing — confirm.
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
// OnFail is a callback that receives the payload and an error message; its parameter list matches HubExecutor.FailTransaction.
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
// OnError is a callback that receives an error and a string naming its source.
OnError func(err error, source string)
// State points to a HubExecutorState. NOTE(review): looks shared with the owning executor — confirm ownership and locking.
State *HubExecutorState
}
SignalRHandlerConfig holds configuration for SignalR handler
type TCPHandler ¶
// TCPHandler is an opaque handler type created by NewTCPHandler. This listing
// documents only its Start and Status methods.
// NOTE(review): Stop, Name and Type are not listed for it here, although
// ProtocolHandler requires them — confirm whether TCPHandler satisfies
// ProtocolHandler and how it is shut down.
type TCPHandler struct {
// contains filtered or unexported fields
}
TCPHandler handles TCP protocol
func NewTCPHandler ¶
func NewTCPHandler(config TCPHandlerConfig) *TCPHandler
NewTCPHandler creates a new TCP handler
func (*TCPHandler) Start ¶
func (h *TCPHandler) Start(ctx context.Context) error
Start starts the TCP handler
func (*TCPHandler) Status ¶
func (h *TCPHandler) Status() ExecutorStatus
Status returns the current status
type TCPHandlerConfig ¶
// TCPHandlerConfig is the single argument passed to NewTCPHandler. It has the
// same fields as the OPC UA and SignalR handler configs plus a Direction.
type TCPHandlerConfig struct {
// Name is the handler's name. NOTE(review): presumably the identifier the handler reports — confirm.
Name string
// ProtocolGroup points to the protocol-group model. NOTE(review): presumably the source of the TCP connection settings — confirm.
ProtocolGroup *models.HubSimpleProtocolGroup
// Direction is a free-form string. NOTE(review): accepted values are not shown here (presumably inbound vs. outbound) — confirm.
Direction string
// DestExecutor is the DestinationExecutor handed to the handler. NOTE(review): presumably used to deliver payloads to destinations — confirm.
DestExecutor DestinationExecutor
// OnMessage is a callback that receives a message payload. NOTE(review): presumably called once per received message — confirm.
OnMessage func(payload *MessagePayload)
// OnComplete is a callback that receives the payload, the response, the mapped data, and a response status. NOTE(review): presumably called after successful processing — confirm.
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
// OnFail is a callback that receives the payload and an error message; its parameter list matches HubExecutor.FailTransaction.
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
// OnError is a callback that receives an error and a string naming its source.
OnError func(err error, source string)
// State points to a HubExecutorState. NOTE(review): looks shared with the owning executor — confirm ownership and locking.
State *HubExecutorState
}
TCPHandlerConfig holds configuration for TCP handler
type TriggerMethod ¶
// TriggerMethod is a string-backed identifier for how an action is triggered;
// the supported values are the Trigger* constants declared for this type.
type TriggerMethod string
TriggerMethod represents action trigger methods
const ( TriggerInterval TriggerMethod = "interval" TriggerStartup TriggerMethod = "startup" TriggerShutdown TriggerMethod = "shutdown" TriggerDataChange TriggerMethod = "datachange" )
type WebServerFromProtocolGroupConfig ¶
// WebServerFromProtocolGroupConfig is the single argument passed to
// NewWebServerHandlerFromProtocolGroup. Unlike the other handler configs in
// this package it has no Name field; it carries a Protocol and a Port instead.
type WebServerFromProtocolGroupConfig struct {
// ProtocolGroup points to the protocol-group model the handler is created from.
ProtocolGroup *models.HubSimpleProtocolGroup
// Protocol is the protocol type to serve. NOTE(review): presumably one of ProtocolREST, ProtocolSOAP or ProtocolGraphQL — confirm how other values are handled.
Protocol ProtocolType
// Port is an integer port. NOTE(review): presumably the port the HTTP server listens on; behavior for 0 or out-of-range values is not shown — confirm.
Port int
// DestExecutor is the DestinationExecutor handed to the handler. NOTE(review): presumably used to deliver payloads to destinations — confirm.
DestExecutor DestinationExecutor
// OnMessage is a callback that receives a message payload. NOTE(review): presumably called once per received request — confirm.
OnMessage func(payload *MessagePayload)
// OnComplete is a callback that receives the payload, the response, the mapped data, and a response status. NOTE(review): presumably called after successful processing — confirm.
OnComplete func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
// OnFail is a callback that receives the payload and an error message; its parameter list matches HubExecutor.FailTransaction.
OnFail func(ctx context.Context, payload *MessagePayload, errorMessage string)
// OnError is a callback that receives an error and a string naming its source.
OnError func(err error, source string)
// State points to a HubExecutorState. NOTE(review): looks shared with the owning executor — confirm ownership and locking.
State *HubExecutorState
}
WebServerFromProtocolGroupConfig holds the configuration for creating a WebServerHandler from a protocol group
type WebServerHandler ¶
// WebServerHandler is an opaque handler type created by
// NewWebServerHandlerFromProtocolGroup. This listing documents Name, Start,
// Status, Stop and Type methods for it, which is the full method set that
// ProtocolHandler requires.
type WebServerHandler struct {
// contains filtered or unexported fields
}
WebServerHandler handles REST, SOAP, and GraphQL protocols by creating HTTP servers
func NewWebServerHandlerFromProtocolGroup ¶
func NewWebServerHandlerFromProtocolGroup(config WebServerFromProtocolGroupConfig) *WebServerHandler
NewWebServerHandlerFromProtocolGroup creates a web server handler from a protocol group configuration
func (*WebServerHandler) Name ¶
func (h *WebServerHandler) Name() string
Name returns the handler name
func (*WebServerHandler) Start ¶
func (h *WebServerHandler) Start(ctx context.Context) error
Start starts the web server
func (*WebServerHandler) Status ¶
func (h *WebServerHandler) Status() ExecutorStatus
Status returns the current status
func (*WebServerHandler) Stop ¶
func (h *WebServerHandler) Stop(ctx context.Context) error
Stop stops the web server
func (*WebServerHandler) Type ¶
func (h *WebServerHandler) Type() ProtocolType
Type returns the protocol type