hubexecutor

package
v0.0.0-...-99e730b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 17 Imported by: 0

README

Integration Hub Executor

Backend execution engine for integration hub configurations.

Overview

The Hub Executor system can execute tasks per integration hub configuration by instance name. One executor instance runs all configured inbound and outbound protocols.

Architecture

+------------------------+
|  HubExecutorManager    |  Manages multiple executors
+------------------------+
         |
         v
+------------------------+
|     HubExecutor        |  Per-instance execution
+------------------------+
         |
    +----+----+----+----+
    |    |    |    |    |
    v    v    v    v    v
+------+------+------+------+------+
| Web  | MQTT | Kafka| OPC  | TCP  |  Protocol Handlers
|Server| Hdlr | Hdlr | UA   | Hdlr |
+------+------+------+------+------+
         |
         v
+------------------------+
| DestinationExecutor    |  Execute configured actions
+------------------------+
         |
    +----+----+----+----+
    v    v    v    v    v
  Job  Transcode  Outbound  Script

Files

File Description
types.go Core types, interfaces, and status enums
destination.go Destination executor for Job, Transcode, Outbound, Script
handlers.go Protocol handlers (WebServer, MQTT, Kafka, ActiveMQ, OPC UA, TCP, SignalR)
hub_executor.go Main executor that manages handlers per hub
manager.go Manager for multiple hub executors

Protocol Handlers

Handler Protocol Status
WebServerHandler REST, SOAP, GraphQL ✅ Implemented
MQTTHandler MQTT ✅ Implemented
KafkaHandler Kafka 🔧 Skeleton
ActiveMQHandler ActiveMQ 🔧 Skeleton
OPCUAHandler OPC UA ✅ Implemented (needs OPC UA client integration)
TCPHandler TCP 🔧 Skeleton
SignalRHandler SignalR 🔧 Skeleton

Destination Types

Type Description
Job Execute IAC jobs with parameters
Execute Transcode Transform data using mapping definitions
Route to Outbound Forward to external HTTP endpoints
Custom Script Execute JavaScript using otto VM

API Endpoints

Method Endpoint Description
POST /hubexecutor/start/:instanceName Start executor by instance name
POST /hubexecutor/start-by-id/:hubId Start executor by hub ID
POST /hubexecutor/stop/:instanceName Stop executor
POST /hubexecutor/restart/:instanceName Restart executor
GET /hubexecutor/status/:instanceName Get executor status
GET /hubexecutor/info/:instanceName Get executor detailed info
GET /hubexecutor/list List all running executors
POST /hubexecutor/start-all Start all enabled hubs
POST /hubexecutor/stop-all Stop all executors

Usage

// Initialize manager
manager := hubexecutor.InitGlobalManager(hubexecutor.ManagerConfig{
    HubLoader: hubexecutor.NewMongoHubLoader(hubexecutor.MongoHubLoaderConfig{
        GetHubByInstanceName: service.GetDefaultHub,
        GetHubByID:           service.GetHub,
        GetAllHubs:           service.GetDefaultEnabledHubs,
    }),
})

// Set job executor
manager.SetJobExecutor(func(ctx context.Context, jobName string, params map[string]interface{}) error {
    return jobService.ExecuteJob(ctx, jobName, params)
})

// Start executor for instance
err := manager.StartExecutor("my-hub-instance")

// Get status
info, _ := manager.GetExecutorInfo("my-hub-instance")

// Stop all
manager.StopAll()

Route Registration

controller := hubexecutor.NewHubExecutorController(manager)
controller.RegisterRoutes(router.Group("/api"))

OPC UA Handler Features

  • Parse OPC tags from protocol group configuration
  • Action trigger support: interval, startup, shutdown, datachange
  • Condition script evaluation (JavaScript)
  • Monitored tags configuration

Configuration

Hub configuration uses the IntegrationHub model:

  • Inbound.ProtocolGroups[] - Inbound protocol configurations
  • Outbound.ProtocolGroups[] - Outbound protocol configurations
  • ProtocolGroup.BrokerConfig - Message broker settings
  • ProtocolGroup.Endpoints[] - Individual endpoints
  • Endpoint.Handler.RouteIDs[] - Destination job references
  • Endpoint.OverrideConfig.destinations[] - Custom destinations
  • Endpoint.OverrideConfig.opc_tags[] - OPC UA tags
  • Endpoint.OverrideConfig.action_trigger - Action trigger config

Instance Roles

The hub executor integrates with the IAC instance roles system. Configure the integration_hub role in configuration.json:

{
  "roles": {
    "roles": [
      {
        "type": "integration_hub",
        "enabled": true,
        "integration_hub": {
          "instance_names": ["my-hub-1", "my-hub-2"],
          "auto_start": true,
          "enable_api": true
        }
      }
    ]
  }
}
Role Configuration Options
Field Type Description
instance_names string[] Hub instance names to run (empty = all enabled defaults)
auto_start bool Auto-start hub executors on instance startup
enable_api bool Enable the hub executor management API endpoints
Available Roles
Role Description
main Main API endpoints and portal
integration_hub Integration hub execution
signalr SignalR server
job_executor Background job processing

One instance can run multiple roles. At least one role must be enabled.

Role-Based Execution

The hub executor is now fully integrated with the IAC instance roles system. When the integration_hub role is enabled:

  1. The HubExecutorManager is automatically initialized with the proper hub loader
  2. Hub executor API endpoints are registered if enable_api is true
  3. Hub executors auto-start if auto_start is true
How It Works

The role initialization happens in main.go via the RoleInitializer:

// Integration Hub role callback
roleInitializer.SetIntegrationHubInitializer(
    func(ctx context.Context, config *configuration.IntegrationHubRoleConfig, router *gin.Engine) error {
        // Initialize the hub executor manager
        manager := hubexecutor.InitGlobalManager(hubexecutor.ManagerConfig{
            HubLoader: hubexecutor.NewMongoHubLoader(hubexecutor.MongoHubLoaderConfig{
                GetHubByInstanceName: services.GetDefaultHub,
                GetHubByID:           services.GetHub,
                GetAllHubs:           services.GetDefaultEnabledHubs,
            }),
        })

        // Set job executor callback for executing IAC jobs
        manager.SetJobExecutor(func(ctx context.Context, jobName string, params map[string]interface{}) error {
            return jobqueue.ExecuteJobByName(ctx, jobName, params)
        })

        // ... register routes and auto-start
    },
    func(ctx context.Context) error {
        // Shutdown logic
    },
)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func EvaluateCondition

func EvaluateCondition(script string, newValue, oldValue interface{}, tagName string) (bool, error)

EvaluateCondition evaluates a JavaScript condition script

func SetGlobalManager

func SetGlobalManager(manager *HubExecutorManager)

SetGlobalManager sets the global hub executor manager

func StartGlobalExecutor

func StartGlobalExecutor(hubName string) error

StartGlobalExecutor starts an executor using the global manager

func StopGlobalExecutor

func StopGlobalExecutor(hubName string) error

StopGlobalExecutor stops an executor using the global manager

Types

type ActionTrigger

type ActionTrigger struct {
	Method          TriggerMethod `json:"method"`
	IntervalMs      int           `json:"intervalMs,omitempty"`
	MonitoredTags   []string      `json:"monitoredTags,omitempty"`
	ConditionScript string        `json:"conditionScript,omitempty"`
}

ActionTrigger represents an action trigger configuration

type ActiveMQHandler

type ActiveMQHandler struct {
	// contains filtered or unexported fields
}

ActiveMQHandler handles ActiveMQ protocol

func NewActiveMQHandler

func NewActiveMQHandler(config ActiveMQHandlerConfig) *ActiveMQHandler

NewActiveMQHandler creates a new ActiveMQ handler

func (*ActiveMQHandler) Name

func (h *ActiveMQHandler) Name() string

Name returns the handler name

func (*ActiveMQHandler) Start

func (h *ActiveMQHandler) Start(ctx context.Context) error

Start starts the ActiveMQ handler

func (*ActiveMQHandler) Status

func (h *ActiveMQHandler) Status() ExecutorStatus

Status returns the current status

func (*ActiveMQHandler) Stop

func (h *ActiveMQHandler) Stop(ctx context.Context) error

Stop stops the ActiveMQ handler

func (*ActiveMQHandler) Type

func (h *ActiveMQHandler) Type() ProtocolType

Type returns the protocol type

type ActiveMQHandlerConfig

type ActiveMQHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	BrokerConfig  *models.MessageBrokerConfig
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

ActiveMQHandlerConfig holds configuration for ActiveMQ handler

type DefaultDestinationExecutor

type DefaultDestinationExecutor struct {
	// contains filtered or unexported fields
}

DefaultDestinationExecutor implements the DestinationExecutor interface

func NewDestinationExecutor

func NewDestinationExecutor() *DefaultDestinationExecutor

NewDestinationExecutor creates a new destination executor

func (*DefaultDestinationExecutor) Execute

Execute executes a destination with the given payload

func (*DefaultDestinationExecutor) SetAgentGatewayExecutor

func (d *DefaultDestinationExecutor) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))

SetAgentGatewayExecutor sets the agent gateway execution callback. Call this from contollershandlers.go after AgentGatewayService is initialized.

func (*DefaultDestinationExecutor) SetHistoryRecorder

func (d *DefaultDestinationExecutor) SetHistoryRecorder(recorder HistoryRecorder)

SetHistoryRecorder sets the history recorder for tracking outbound operations

func (*DefaultDestinationExecutor) SetHubInfo

func (d *DefaultDestinationExecutor) SetHubInfo(hubID, hubName string)

SetHubInfo sets the hub information for history recording

func (*DefaultDestinationExecutor) SetJobExecutor

func (d *DefaultDestinationExecutor) SetJobExecutor(executor func(ctx context.Context, jobName string, params map[string]interface{}) error)

SetJobExecutor sets the job executor function

type Destination

type Destination struct {
	ID        string                 `json:"id"`
	Type      string                 `json:"type"` // Job, Execute Transcode, Route to Outbound, Custom Script
	Target    string                 `json:"target,omitempty"`
	MappingID string                 `json:"mappingId,omitempty"`
	Config    map[string]interface{} `json:"config,omitempty"`
}

Destination represents a destination configuration

type DestinationExecutor

type DestinationExecutor interface {
	// Execute executes a destination with the given payload
	Execute(ctx context.Context, dest Destination, payload *MessagePayload) (*ExecutionResult, error)
}

DestinationExecutor is the interface for executing destinations

type ExecutionResult

type ExecutionResult struct {
	Success   bool
	Message   string
	Data      interface{}
	Error     error
	Duration  time.Duration
	Timestamp time.Time
}

ExecutionResult represents the result of destination execution

type ExecutorInfo

type ExecutorInfo struct {
	HubName        string         `json:"hub_name"`
	HubID          string         `json:"hub_id"`
	Version        int            `json:"version"`
	Status         ExecutorStatus `json:"status"`
	StartTime      *time.Time     `json:"start_time,omitempty"`
	LastActivity   *time.Time     `json:"last_activity,omitempty"`
	MessageCount   int64          `json:"message_count"`
	ErrorCount     int64          `json:"error_count"`
	ActiveHandlers int            `json:"active_handlers"`
	Handlers       []HandlerInfo  `json:"handlers"`
}

ExecutorInfo represents information about a running executor

type ExecutorStatus

type ExecutorStatus string

ExecutorStatus represents the status of a hub executor

const (
	StatusStopped  ExecutorStatus = "stopped"
	StatusStarting ExecutorStatus = "starting"
	StatusRunning  ExecutorStatus = "running"
	StatusStopping ExecutorStatus = "stopping"
	StatusError    ExecutorStatus = "error"
)

type HandlerInfo

type HandlerInfo struct {
	Name     string         `json:"name"`
	Type     ProtocolType   `json:"type"`
	Status   ExecutorStatus `json:"status"`
	Protocol string         `json:"protocol"`
	Address  string         `json:"address,omitempty"`
}

HandlerInfo represents information about a protocol handler

type HistoryRecorder

type HistoryRecorder interface {
	RecordTransaction(ctx context.Context, history *models.IntHubHistory) (string, error)
	CompleteTransaction(ctx context.Context, id string, response string, mappedData string, responseStatus int) error
	FailTransaction(ctx context.Context, id string, errorMessage string) error
	RecordAction(ctx context.Context, action *models.IntHubActionHistory) error
}

HistoryRecorder is the interface for recording integration hub history

type HubExecutor

type HubExecutor struct {
	// contains filtered or unexported fields
}

HubExecutor manages the execution of an integration hub configuration

func NewHubExecutor

func NewHubExecutor(config HubExecutorConfig) *HubExecutor

NewHubExecutor creates a new hub executor for a specific hub configuration

func (*HubExecutor) CompleteTransaction

func (e *HubExecutor) CompleteTransaction(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)

CompleteTransaction marks a transaction as completed

func (*HubExecutor) FailTransaction

func (e *HubExecutor) FailTransaction(ctx context.Context, payload *MessagePayload, errorMessage string)

FailTransaction marks a transaction as failed

func (*HubExecutor) GetHub

func (e *HubExecutor) GetHub() *models.IntegrationHub

GetHub returns the hub configuration

func (*HubExecutor) GetHubName

func (e *HubExecutor) GetHubName() string

GetHubName returns the hub name

func (*HubExecutor) GetInfo

func (e *HubExecutor) GetInfo() ExecutorInfo

GetInfo returns information about the executor

func (*HubExecutor) Start

func (e *HubExecutor) Start() error

Start starts the hub executor and all configured handlers

func (*HubExecutor) Status

func (e *HubExecutor) Status() ExecutorStatus

Status returns the current executor status

func (*HubExecutor) Stop

func (e *HubExecutor) Stop() error

Stop stops the hub executor and all handlers

type HubExecutorConfig

type HubExecutorConfig struct {
	Hub              *models.IntegrationHub
	HubName          string
	DestinationExec  DestinationExecutor
	HistoryRecorder  HistoryRecorder
	OnMessageReceive func(payload *MessagePayload)
	OnError          func(err error, source string)
}

HubExecutorConfig holds configuration for a hub executor

type HubExecutorController

type HubExecutorController struct {
	// contains filtered or unexported fields
}

HubExecutorController handles HTTP requests for hub executor management

func NewHubExecutorController

func NewHubExecutorController(manager *HubExecutorManager) *HubExecutorController

NewHubExecutorController creates a new hub executor controller

func (*HubExecutorController) GetExecutorInfo

func (c *HubExecutorController) GetExecutorInfo(ctx *gin.Context)

GetExecutorInfo handles GET /hubexecutor/info/:instanceName @Summary Get executor detailed info @Description Gets detailed information about a hub executor @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 404 {object} map[string]interface{} @Router /hubexecutor/info/{name} [get]

func (*HubExecutorController) GetExecutorStatus

func (c *HubExecutorController) GetExecutorStatus(ctx *gin.Context)

GetExecutorStatus handles GET /hubexecutor/status/:instanceName @Summary Get executor status @Description Gets the status of a hub executor @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/status/{name} [get]

func (*HubExecutorController) ListExecutors

func (c *HubExecutorController) ListExecutors(ctx *gin.Context)

ListExecutors handles GET /hubexecutor/list @Summary List all running executors @Description Returns information about all running hub executors @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Router /hubexecutor/list [get]

func (*HubExecutorController) RegisterRoutes

func (c *HubExecutorController) RegisterRoutes(router *gin.RouterGroup)

RegisterRoutes registers the hub executor API routes

func (*HubExecutorController) RestartExecutor

func (c *HubExecutorController) RestartExecutor(ctx *gin.Context)

RestartExecutor handles POST /hubexecutor/restart/:instanceName @Summary Restart executor @Description Restarts a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/restart/{name} [post]

func (*HubExecutorController) StartAllExecutors

func (c *HubExecutorController) StartAllExecutors(ctx *gin.Context)

StartAllExecutors handles POST /hubexecutor/start-all @Summary Start all enabled hub executors @Description Starts executors for all enabled hub configurations @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start-all [post]

func (*HubExecutorController) StartExecutor

func (c *HubExecutorController) StartExecutor(ctx *gin.Context)

StartExecutor handles POST /hubexecutor/start/:instanceName @Summary Start executor by instance name @Description Starts a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start/{name} [post]

func (*HubExecutorController) StartExecutorByHubID

func (c *HubExecutorController) StartExecutorByHubID(ctx *gin.Context)

StartExecutorByHubID handles POST /hubexecutor/start-by-id/:hubId @Summary Start executor by hub ID @Description Starts a hub executor using a specific hub configuration ID @Tags HubExecutor @Produce json @Param hubId path string true "Hub ID" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/start-by-id/{hubId} [post]

func (*HubExecutorController) StopAllExecutors

func (c *HubExecutorController) StopAllExecutors(ctx *gin.Context)

StopAllExecutors handles POST /hubexecutor/stop-all @Summary Stop all running executors @Description Stops all currently running hub executors @Tags HubExecutor @Produce json @Success 200 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/stop-all [post]

func (*HubExecutorController) StopExecutor

func (c *HubExecutorController) StopExecutor(ctx *gin.Context)

StopExecutor handles POST /hubexecutor/stop/:instanceName @Summary Stop executor @Description Stops a hub executor for the specified instance name @Tags HubExecutor @Produce json @Param name path string true "Hub Name" @Success 200 {object} map[string]interface{} @Failure 400 {object} map[string]interface{} @Failure 500 {object} map[string]interface{} @Router /hubexecutor/stop/{name} [post]

type HubExecutorManager

type HubExecutorManager struct {
	// contains filtered or unexported fields
}

HubExecutorManager manages multiple hub executors

func GetGlobalManager

func GetGlobalManager() *HubExecutorManager

GetGlobalManager returns the global hub executor manager

func InitGlobalManager

func InitGlobalManager(config ManagerConfig) *HubExecutorManager

InitGlobalManager initializes and sets the global manager

func NewHubExecutorManager

func NewHubExecutorManager(config ManagerConfig) *HubExecutorManager

NewHubExecutorManager creates a new hub executor manager

func (*HubExecutorManager) GetExecutorInfo

func (m *HubExecutorManager) GetExecutorInfo(hubName string) (*ExecutorInfo, error)

GetExecutorInfo gets detailed info for a specific executor

func (*HubExecutorManager) GetExecutorStatus

func (m *HubExecutorManager) GetExecutorStatus(hubName string) (ExecutorStatus, error)

GetExecutorStatus gets the status of a specific executor

func (*HubExecutorManager) GetRunningCount

func (m *HubExecutorManager) GetRunningCount() int

GetRunningCount returns the number of running executors

func (*HubExecutorManager) IsRunning

func (m *HubExecutorManager) IsRunning(hubName string) bool

IsRunning checks if an executor is running for the given name

func (*HubExecutorManager) ListExecutors

func (m *HubExecutorManager) ListExecutors() []ExecutorInfo

ListExecutors returns information about all running executors

func (*HubExecutorManager) RestartExecutor

func (m *HubExecutorManager) RestartExecutor(hubName string) error

RestartExecutor restarts a hub executor

func (*HubExecutorManager) SendToEndpoint

func (m *HubExecutorManager) SendToEndpoint(
	ctx context.Context,
	hubID string,
	protocolGroupID string,
	endpointID string,
	payload []byte,
	contentType string,
	method string,
) (*OutboundSendResult, error)

SendToEndpoint sends payload to a specific hub endpoint using the protocol configured for that group. It loads the hub configuration by ID, resolves the protocol group and endpoint, then dispatches to the appropriate protocol implementation (HTTP/REST, MQTT, Kafka, MCP, etc.).

func (*HubExecutorManager) SetAgentGatewayExecutor

func (m *HubExecutorManager) SetAgentGatewayExecutor(fn func(ctx context.Context, gatewayID, input string) (string, error))

SetAgentGatewayExecutor sets the agent gateway execution callback on the destination executor.

func (*HubExecutorManager) SetDestinationExecutor

func (m *HubExecutorManager) SetDestinationExecutor(executor DestinationExecutor)

SetDestinationExecutor sets the destination executor

func (*HubExecutorManager) SetHubLoader

func (m *HubExecutorManager) SetHubLoader(loader HubLoader)

SetHubLoader sets the hub loader

func (*HubExecutorManager) SetJobExecutor

func (m *HubExecutorManager) SetJobExecutor(executor JobExecutorFunc)

SetJobExecutor sets the job executor function on the destination executor

func (*HubExecutorManager) StartAll

func (m *HubExecutorManager) StartAll() error

StartAll starts executors for all enabled hubs

func (*HubExecutorManager) StartAllExecutors

func (m *HubExecutorManager) StartAllExecutors() map[string]error

StartAllExecutors starts executors for all enabled hubs and returns results map

func (*HubExecutorManager) StartExecutor

func (m *HubExecutorManager) StartExecutor(hubName string) error

StartExecutor starts a hub executor for the given hub name

func (*HubExecutorManager) StartExecutorByHubID

func (m *HubExecutorManager) StartExecutorByHubID(hubID string) error

StartExecutorByHubID starts a hub executor using a specific hub configuration ID

func (*HubExecutorManager) StopAll

func (m *HubExecutorManager) StopAll() error

StopAll stops all running executors

func (*HubExecutorManager) StopExecutor

func (m *HubExecutorManager) StopExecutor(hubName string) error

StopExecutor stops a hub executor by hub name

type HubExecutorState

type HubExecutorState struct {
	Status         ExecutorStatus
	StartTime      *time.Time
	LastActivity   *time.Time
	MessageCount   int64
	ErrorCount     int64
	ActiveHandlers int
	Handlers       map[string]ProtocolHandler
	OPCTagValues   map[string]*OPCTagValue // tag ID -> current value
	// contains filtered or unexported fields
}

HubExecutorState holds the runtime state of a hub executor

func NewHubExecutorState

func NewHubExecutorState() *HubExecutorState

NewHubExecutorState creates a new executor state

func (*HubExecutorState) GetOPCTagValue

func (s *HubExecutorState) GetOPCTagValue(tagID string) *OPCTagValue

GetOPCTagValue gets an OPC tag value

func (*HubExecutorState) GetStatus

func (s *HubExecutorState) GetStatus() ExecutorStatus

GetStatus gets the executor status

func (*HubExecutorState) IncrementErrorCount

func (s *HubExecutorState) IncrementErrorCount()

IncrementErrorCount increments the error counter

func (*HubExecutorState) IncrementMessageCount

func (s *HubExecutorState) IncrementMessageCount()

IncrementMessageCount increments the message counter

func (*HubExecutorState) SetStatus

func (s *HubExecutorState) SetStatus(status ExecutorStatus)

SetStatus sets the executor status

func (*HubExecutorState) UpdateOPCTagValue

func (s *HubExecutorState) UpdateOPCTagValue(tagID string, value *OPCTagValue)

UpdateOPCTagValue updates an OPC tag value

type HubLoader

type HubLoader interface {
	// LoadByHubName loads the default hub configuration for a hub name
	LoadByHubName(hubName string) (*models.IntegrationHub, error)

	// LoadByID loads a hub configuration by its ID
	LoadByID(hubID string) (*models.IntegrationHub, error)

	// LoadAll loads all enabled hub configurations
	LoadAll() ([]*models.IntegrationHub, error)
}

HubLoader is an interface for loading hub configurations

type JobExecutorFunc

type JobExecutorFunc func(ctx context.Context, jobName string, params map[string]interface{}) error

JobExecutorFunc is a function type for executing jobs

type KafkaHandler

type KafkaHandler struct {
	// contains filtered or unexported fields
}

KafkaHandler handles Kafka protocol

func NewKafkaHandler

func NewKafkaHandler(config KafkaHandlerConfig) *KafkaHandler

NewKafkaHandler creates a new Kafka handler

func (*KafkaHandler) Name

func (h *KafkaHandler) Name() string

Name returns the handler name

func (*KafkaHandler) Start

func (h *KafkaHandler) Start(ctx context.Context) error

Start starts the Kafka handler

func (*KafkaHandler) Status

func (h *KafkaHandler) Status() ExecutorStatus

Status returns the current status

func (*KafkaHandler) Stop

func (h *KafkaHandler) Stop(ctx context.Context) error

Stop stops the Kafka handler

func (*KafkaHandler) Type

func (h *KafkaHandler) Type() ProtocolType

Type returns the protocol type

type KafkaHandlerConfig

type KafkaHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	BrokerConfig  *models.MessageBrokerConfig
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

KafkaHandlerConfig holds configuration for Kafka handler

type MCPClientHandler

type MCPClientHandler struct {
	// contains filtered or unexported fields
}

MCPClientHandler connects to an MCP server (JSON-RPC 2.0 over HTTP) and can invoke named tools as part of Integration Hub outbound message routing.

func NewMCPClientHandler

func NewMCPClientHandler(config MCPClientHandlerConfig) *MCPClientHandler

NewMCPClientHandler creates a new MCP client handler.

func (*MCPClientHandler) CallTool

func (h *MCPClientHandler) CallTool(ctx context.Context, toolName, argsJSON string) (string, error)

CallTool invokes a named tool on the MCP server with the given JSON arguments string. It uses JSON-RPC 2.0 over HTTP and returns the concatenated text content from the response.

func (*MCPClientHandler) Name

func (h *MCPClientHandler) Name() string

Name returns the handler name.

func (*MCPClientHandler) Start

func (h *MCPClientHandler) Start(ctx context.Context) error

Start initializes the MCP session by sending the JSON-RPC 2.0 "initialize" request.

func (*MCPClientHandler) Status

func (h *MCPClientHandler) Status() ExecutorStatus

Status returns the current handler status.

func (*MCPClientHandler) Stop

func (h *MCPClientHandler) Stop(ctx context.Context) error

Stop sends an MCP shutdown notification and marks the handler as stopped.

func (*MCPClientHandler) Type

func (h *MCPClientHandler) Type() ProtocolType

Type returns the protocol type.

type MCPClientHandlerConfig

type MCPClientHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	MCPConfig     *models.MCPProtocolConfig
	Direction     string
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

MCPClientHandlerConfig holds configuration for the MCP client handler.

type MQTTHandler

type MQTTHandler struct {
	// contains filtered or unexported fields
}

MQTTHandler handles MQTT protocol

func NewMQTTHandler

func NewMQTTHandler(config MQTTHandlerConfig) *MQTTHandler

NewMQTTHandler creates a new MQTT handler

func (*MQTTHandler) Name

func (h *MQTTHandler) Name() string

Name returns the handler name

func (*MQTTHandler) Start

func (h *MQTTHandler) Start(ctx context.Context) error

Start starts the MQTT handler

func (*MQTTHandler) Status

func (h *MQTTHandler) Status() ExecutorStatus

Status returns the current status

func (*MQTTHandler) Stop

func (h *MQTTHandler) Stop(ctx context.Context) error

Stop stops the MQTT handler

func (*MQTTHandler) Type

func (h *MQTTHandler) Type() ProtocolType

Type returns the protocol type

type MQTTHandlerConfig

type MQTTHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	BrokerConfig  *models.MessageBrokerConfig
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

MQTTHandlerConfig holds configuration for MQTT handler

type ManagerConfig

type ManagerConfig struct {
	HubLoader       HubLoader
	DestExecutor    DestinationExecutor
	HistoryRecorder HistoryRecorder
	OnMessage       func(payload *MessagePayload)
	OnError         func(err error, source string)
}

ManagerConfig holds configuration for the hub executor manager

type MessagePayload

type MessagePayload struct {
	TransactionID string
	Protocol      ProtocolType
	Direction     string // "inbound" or "outbound"
	EndpointID    string
	Path          string
	Method        string
	Headers       map[string]string
	Body          []byte
	ContentType   string
	Metadata      map[string]interface{}
	Timestamp     time.Time
}

MessagePayload represents an incoming message payload

type MongoHubLoader

type MongoHubLoader struct {
	// contains filtered or unexported fields
}

MongoHubLoader loads hub configurations from MongoDB

func NewMongoHubLoader

func NewMongoHubLoader(config MongoHubLoaderConfig) *MongoHubLoader

NewMongoHubLoader creates a new MongoDB-based hub loader

func (*MongoHubLoader) LoadAll

func (l *MongoHubLoader) LoadAll() ([]*models.IntegrationHub, error)

LoadAll loads all enabled hub configurations

func (*MongoHubLoader) LoadByHubName

func (l *MongoHubLoader) LoadByHubName(hubName string) (*models.IntegrationHub, error)

LoadByHubName loads the default hub configuration for a hub name

func (*MongoHubLoader) LoadByID

func (l *MongoHubLoader) LoadByID(hubID string) (*models.IntegrationHub, error)

LoadByID loads a hub configuration by its ID

type MongoHubLoaderConfig

type MongoHubLoaderConfig struct {
	GetHubByHubName func(name string) (*models.IntegrationHub, error)
	GetHubByID      func(hubID string) (*models.IntegrationHub, error)
	GetAllHubs      func() ([]*models.IntegrationHub, error)
}

MongoHubLoaderConfig holds configuration for MongoHubLoader

type OPCAction

type OPCAction struct {
	ID              string
	Name            string
	TriggerMethod   TriggerMethod
	MonitoredTags   []string
	ConditionScript string
	Destinations    []Destination
	IntervalMs      int
}

OPCAction represents an action triggered by OPC UA data changes

type OPCTag

type OPCTag struct {
	ID             string  `json:"id"`
	TagName        string  `json:"tagName"`
	Address        string  `json:"address"`
	DataType       string  `json:"dataType"`
	IsArray        bool    `json:"isArray"`
	ArraySize      int     `json:"arraySize,omitempty"`
	Permission     string  `json:"permission"`
	SampleInterval int     `json:"sampleInterval"` // milliseconds
	Deadband       float64 `json:"deadband,omitempty"`
	Description    string  `json:"description,omitempty"`
	Unit           string  `json:"unit,omitempty"`
	ScaleFactor    float64 `json:"scaleFactor,omitempty"`
	Offset         float64 `json:"offset,omitempty"`
	Enabled        bool    `json:"enabled"`
}

OPCTag represents an OPC UA tag configuration

type OPCTagValue

type OPCTagValue struct {
	TagID     string
	TagName   string
	Value     interface{}
	Quality   int
	Timestamp time.Time
}

OPCTagValue represents a current OPC tag value

type OPCUAHandler

type OPCUAHandler struct {
	// contains filtered or unexported fields
}

OPCUAHandler handles OPC UA protocol

func NewOPCUAHandler

func NewOPCUAHandler(config OPCUAHandlerConfig) *OPCUAHandler

NewOPCUAHandler creates a new OPC UA handler

func (*OPCUAHandler) Name

func (h *OPCUAHandler) Name() string

Name returns the handler name

func (*OPCUAHandler) Start

func (h *OPCUAHandler) Start(ctx context.Context) error

Start starts the OPC UA handler

func (*OPCUAHandler) Status

func (h *OPCUAHandler) Status() ExecutorStatus

Status returns the current status

func (*OPCUAHandler) Stop

func (h *OPCUAHandler) Stop(ctx context.Context) error

Stop stops the OPC UA handler

func (*OPCUAHandler) Type

func (h *OPCUAHandler) Type() ProtocolType

Type returns the protocol type

type OPCUAHandlerConfig

type OPCUAHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

OPCUAHandlerConfig holds configuration for OPC UA handler

type OutboundSendResult

type OutboundSendResult struct {
	ResponseBody string
	StatusCode   int
	Success      bool
}

OutboundSendResult is the result of a protocol-aware outbound send.

type ProtocolHandler

type ProtocolHandler interface {
	// Start starts the protocol handler
	Start(ctx context.Context) error

	// Stop stops the protocol handler
	Stop(ctx context.Context) error

	// Status returns the current status
	Status() ExecutorStatus

	// Name returns the handler name/identifier
	Name() string

	// Type returns the protocol type
	Type() ProtocolType
}

ProtocolHandler is the interface for all protocol handlers

type ProtocolType

type ProtocolType string

ProtocolType represents supported protocol types

const (
	ProtocolREST     ProtocolType = "REST API"
	ProtocolSOAP     ProtocolType = "SOAP API"
	ProtocolGraphQL  ProtocolType = "GraphQL"
	ProtocolMQTT     ProtocolType = "MQTT Client"
	ProtocolKafka    ProtocolType = "Kafka"
	ProtocolActiveMQ ProtocolType = "ActiveMQ"
	ProtocolOPCUA    ProtocolType = "OPC UA"
	ProtocolTCP      ProtocolType = "TCP"
	ProtocolSignalR  ProtocolType = "SignalR"
	// ProtocolMCP is the Model Context Protocol (JSON-RPC 2.0 over HTTP / stdio)
	ProtocolMCP ProtocolType = "MCP"
)

type SignalRHandler

type SignalRHandler struct {
	// contains filtered or unexported fields
}

SignalRHandler handles SignalR protocol

func NewSignalRHandler

func NewSignalRHandler(config SignalRHandlerConfig) *SignalRHandler

NewSignalRHandler creates a new SignalR handler

func (*SignalRHandler) Name

func (h *SignalRHandler) Name() string

Name returns the handler name

func (*SignalRHandler) Start

func (h *SignalRHandler) Start(ctx context.Context) error

Start starts the SignalR handler

func (*SignalRHandler) Status

func (h *SignalRHandler) Status() ExecutorStatus

Status returns the current status

func (*SignalRHandler) Stop

func (h *SignalRHandler) Stop(ctx context.Context) error

Stop stops the SignalR handler

func (*SignalRHandler) Type

func (h *SignalRHandler) Type() ProtocolType

Type returns the protocol type

type SignalRHandlerConfig

type SignalRHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

SignalRHandlerConfig holds configuration for SignalR handler

type TCPHandler

type TCPHandler struct {
	// contains filtered or unexported fields
}

TCPHandler handles TCP protocol

func NewTCPHandler

func NewTCPHandler(config TCPHandlerConfig) *TCPHandler

NewTCPHandler creates a new TCP handler

func (*TCPHandler) Name

func (h *TCPHandler) Name() string

Name returns the handler name

func (*TCPHandler) Start

func (h *TCPHandler) Start(ctx context.Context) error

Start starts the TCP handler

func (*TCPHandler) Status

func (h *TCPHandler) Status() ExecutorStatus

Status returns the current status

func (*TCPHandler) Stop

func (h *TCPHandler) Stop(ctx context.Context) error

Stop stops the TCP handler

func (*TCPHandler) Type

func (h *TCPHandler) Type() ProtocolType

Type returns the protocol type

type TCPHandlerConfig

type TCPHandlerConfig struct {
	Name          string
	ProtocolGroup *models.HubSimpleProtocolGroup
	Direction     string
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

TCPHandlerConfig holds configuration for TCP handler

type TriggerMethod

type TriggerMethod string

TriggerMethod represents action trigger methods

const (
	TriggerInterval   TriggerMethod = "interval"
	TriggerStartup    TriggerMethod = "startup"
	TriggerShutdown   TriggerMethod = "shutdown"
	TriggerDataChange TriggerMethod = "datachange"
)

type WebServerFromProtocolGroupConfig

type WebServerFromProtocolGroupConfig struct {
	ProtocolGroup *models.HubSimpleProtocolGroup
	Protocol      ProtocolType
	Port          int
	DestExecutor  DestinationExecutor
	OnMessage     func(payload *MessagePayload)
	OnComplete    func(ctx context.Context, payload *MessagePayload, response string, mappedData string, responseStatus int)
	OnFail        func(ctx context.Context, payload *MessagePayload, errorMessage string)
	OnError       func(err error, source string)
	State         *HubExecutorState
}

WebServerFromProtocolGroupConfig holds config for creating WebServerHandler from protocol group

type WebServerHandler

type WebServerHandler struct {
	// contains filtered or unexported fields
}

WebServerHandler handles REST, SOAP, and GraphQL protocols by creating HTTP servers

func NewWebServerHandlerFromProtocolGroup

func NewWebServerHandlerFromProtocolGroup(config WebServerFromProtocolGroupConfig) *WebServerHandler

NewWebServerHandlerFromProtocolGroup creates a web server handler from protocol group config

func (*WebServerHandler) Name

func (h *WebServerHandler) Name() string

Name returns the handler name

func (*WebServerHandler) Start

func (h *WebServerHandler) Start(ctx context.Context) error

Start starts the web server

func (*WebServerHandler) Status

func (h *WebServerHandler) Status() ExecutorStatus

Status returns the current status

func (*WebServerHandler) Stop

func (h *WebServerHandler) Stop(ctx context.Context) error

Stop stops the web server

func (*WebServerHandler) Type

func (h *WebServerHandler) Type() ProtocolType

Type returns the protocol type

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL