Documentation
¶
Overview ¶
Package datahub provides hierarchical integration hub schema definitions
Index ¶
- Constants
- type ArrayIterationContext
- type ArrayMapping
- type DataHub
- func (dh *DataHub) AddMapping(mapping *MappingDefinition)
- func (dh *DataHub) AddRoutingRule(rule *RoutingRule)
- func (dh *DataHub) Close() error
- func (dh *DataHub) Disable()
- func (dh *DataHub) Enable()
- func (dh *DataHub) GetAdapter(name string) (ProtocolAdapter, error)
- func (dh *DataHub) GetMapping(id string) (*MappingDefinition, error)
- func (dh *DataHub) GetMessageHistory(limit int) []MessageHistoryEntry
- func (dh *DataHub) IsEnabled() bool
- func (dh *DataHub) LoadMappingsFromFile(filePath string) error
- func (dh *DataHub) RegisterAdapter(name string, adapter ProtocolAdapter) error
- func (dh *DataHub) RouteMessage(envelope *MessageEnvelope) error
- type DirectionNode
- type FieldMapping
- type GraphQLAdapter
- func (a *GraphQLAdapter) Close() error
- func (a *GraphQLAdapter) GetProtocolName() string
- func (a *GraphQLAdapter) Health() error
- func (a *GraphQLAdapter) Initialize(config map[string]interface{}) error
- func (a *GraphQLAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *GraphQLAdapter) Send(envelope *MessageEnvelope) error
- type HierarchicalTreeNode
- type HubEndpoint
- type HubEndpointJob
- type HubInstance
- type HubMigrationLog
- type HubProtocolGroup
- type HubRoute
- type IntegrationJobCreator
- func (ijc *IntegrationJobCreator) CreateManualJob(ctx context.Context, jobDefinitionID string, customData map[string]interface{}) (string, error)
- func (ijc *IntegrationJobCreator) Disable()
- func (ijc *IntegrationJobCreator) Enable()
- func (ijc *IntegrationJobCreator) GetStats() map[string]interface{}
- func (ijc *IntegrationJobCreator) IsEnabled() bool
- func (ijc *IntegrationJobCreator) OnMessageReceived(ctx context.Context, protocol string, topic string, payload interface{}, ...) ([]string, error)
- func (ijc *IntegrationJobCreator) OnMessageSent(ctx context.Context, protocol string, destination string, payload interface{}, ...) (string, error)
- type JobConfig
- type JobConfigManager
- func (jcm *JobConfigManager) CreateJobPayload(jobDef *JobDefinition, messageData map[string]interface{}) map[string]interface{}
- func (jcm *JobConfigManager) Disable()
- func (jcm *JobConfigManager) Enable()
- func (jcm *JobConfigManager) EvaluateTriggerCondition(condition *MappingCondition, data map[string]interface{}) bool
- func (jcm *JobConfigManager) FindMatchingJobs(protocol string, topic string, messageData map[string]interface{}) []JobDefinition
- func (jcm *JobConfigManager) GetJobDefinition(jobID string) (*JobDefinition, error)
- func (jcm *JobConfigManager) GetJobsForProtocol(protocol string) []JobDefinition
- func (jcm *JobConfigManager) GetJobsForTrigger(triggerType string) []JobDefinition
- func (jcm *JobConfigManager) GetStats() map[string]interface{}
- func (jcm *JobConfigManager) IsEnabled() bool
- func (jcm *JobConfigManager) LoadFromFile(filePath string) error
- func (jcm *JobConfigManager) Reload() error
- type JobDefinition
- type JobHandler
- func (jh *JobHandler) Disable()
- func (jh *JobHandler) Enable()
- func (jh *JobHandler) ExecuteReceiveJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
- func (jh *JobHandler) ExecuteSendJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
- func (jh *JobHandler) ExecuteTransformJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
- func (jh *JobHandler) GetStats() map[string]interface{}
- func (jh *JobHandler) IsEnabled() bool
- type JobTrigger
- type KafkaAdapter
- func (a *KafkaAdapter) Close() error
- func (a *KafkaAdapter) GetProtocolName() string
- func (a *KafkaAdapter) Health() error
- func (a *KafkaAdapter) Initialize(config map[string]interface{}) error
- func (a *KafkaAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *KafkaAdapter) Send(envelope *MessageEnvelope) error
- type MQTTAdapter
- func (a *MQTTAdapter) Close() error
- func (a *MQTTAdapter) GetProtocolName() string
- func (a *MQTTAdapter) Health() error
- func (a *MQTTAdapter) Initialize(config map[string]interface{}) error
- func (a *MQTTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *MQTTAdapter) Send(envelope *MessageEnvelope) error
- type MappingCondition
- type MappingDefinition
- type MessageBusAdapter
- func (a *MessageBusAdapter) Close() error
- func (a *MessageBusAdapter) GetProtocolName() string
- func (a *MessageBusAdapter) Health() error
- func (a *MessageBusAdapter) Initialize(config map[string]interface{}) error
- func (a *MessageBusAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *MessageBusAdapter) Send(envelope *MessageEnvelope) error
- type MessageEnvelope
- type MessageHistory
- type MessageHistoryEntry
- type ProtocolAdapter
- type RESTAdapter
- func (a *RESTAdapter) Close() error
- func (a *RESTAdapter) GetProtocolName() string
- func (a *RESTAdapter) Health() error
- func (a *RESTAdapter) Initialize(config map[string]interface{}) error
- func (a *RESTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *RESTAdapter) Send(envelope *MessageEnvelope) error
- type ResolvedEndpointConfig
- type RouteCondition
- type RoutingRule
- type SOAPAdapter
- func (a *SOAPAdapter) Close() error
- func (a *SOAPAdapter) GetProtocolName() string
- func (a *SOAPAdapter) Health() error
- func (a *SOAPAdapter) Initialize(config map[string]interface{}) error
- func (a *SOAPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *SOAPAdapter) Send(envelope *MessageEnvelope) error
- type TCPAdapter
- func (a *TCPAdapter) Close() error
- func (a *TCPAdapter) GetProtocolName() string
- func (a *TCPAdapter) Health() error
- func (a *TCPAdapter) Initialize(config map[string]interface{}) error
- func (a *TCPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
- func (a *TCPAdapter) Send(envelope *MessageEnvelope) error
- type TransformEngine
- type TransformFunction
- type TransformRule
Constants ¶
const ( ProtocolREST = "REST" ProtocolSOAP = "SOAP" ProtocolMQTT = "MQTT" ProtocolKafka = "Kafka" ProtocolRabbitMQ = "RabbitMQ" ProtocolActiveMQ = "ActiveMQ" ProtocolAMQP = "AMQP" ProtocolTCP = "TCP" ProtocolGraphQL = "GraphQL" ProtocolWebSocket = "WebSocket" ProtocolgRPC = "gRPC" ProtocolFTP = "FTP" ProtocolSFTP = "SFTP" ProtocolFile = "File" ProtocolDatabase = "Database" ProtocolEmail = "Email" ProtocolSMS = "SMS" )
const ( DirectionInbound = "inbound" DirectionOutbound = "outbound" )
const ( JobTypeManual = "Manual" JobTypeScheduled = "Scheduled" JobTypeTriggered = "Triggered" )
const ( OnErrorRetry = "Retry" OnErrorSkip = "Skip" OnErrorSendToDeadLetter = "SendToDeadLetter" )
const ( TransformationNone = "None" TransformationXSLT = "XSLT" TransformationJavaScript = "JavaScript" TransformationJSONata = "JSONata" TransformationCustom = "Custom" )
const ( MigrationStatusPending = "pending" MigrationStatusCompleted = "completed" MigrationStatusFailed = "failed" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ArrayIterationContext ¶
type ArrayIterationContext struct {
CurrentIndex int
ParentPath string
ItemData interface{}
Metadata map[string]interface{}
}
ArrayIterationContext tracks the current position during array iteration
type ArrayMapping ¶
type ArrayMapping struct {
Mode string `json:"mode"` // iterate, flatten, expand, filter, merge
ItemMappings []FieldMapping `json:"item_mappings"` // Mappings to apply to each array item
FilterCondition *MappingCondition `json:"filter_condition,omitempty"` // Condition to filter array items
SortBy string `json:"sort_by,omitempty"` // Field to sort by
SortOrder string `json:"sort_order,omitempty"` // asc, desc
Limit int `json:"limit,omitempty"` // Limit number of items
GroupBy string `json:"group_by,omitempty"` // Field to group by
AggregateFunc string `json:"aggregate_func,omitempty"` // sum, avg, count, min, max
}
ArrayMapping defines how to handle array transformations
type DataHub ¶
type DataHub struct {
// contains filtered or unexported fields
}
DataHub is the central message transformation and routing hub
func GetGlobalDataHub ¶
func GetGlobalDataHub() *DataHub
GetGlobalDataHub returns the global DataHub instance (singleton)
func NewDataHub ¶
NewDataHub creates a new DataHub instance
func (*DataHub) AddMapping ¶
func (dh *DataHub) AddMapping(mapping *MappingDefinition)
AddMapping adds a mapping definition
func (*DataHub) AddRoutingRule ¶
func (dh *DataHub) AddRoutingRule(rule *RoutingRule)
AddRoutingRule adds a routing rule
func (*DataHub) GetAdapter ¶
func (dh *DataHub) GetAdapter(name string) (ProtocolAdapter, error)
GetAdapter retrieves a protocol adapter by name
func (*DataHub) GetMapping ¶
func (dh *DataHub) GetMapping(id string) (*MappingDefinition, error)
GetMapping retrieves a mapping definition by ID
func (*DataHub) GetMessageHistory ¶
func (dh *DataHub) GetMessageHistory(limit int) []MessageHistoryEntry
GetMessageHistory returns message history entries
func (*DataHub) LoadMappingsFromFile ¶
func (dh *DataHub) LoadMappingsFromFile(filePath string) error
LoadMappingsFromFile loads mapping definitions from a JSON file
func (*DataHub) RegisterAdapter ¶
func (dh *DataHub) RegisterAdapter(name string, adapter ProtocolAdapter) error
RegisterAdapter registers a protocol adapter
func (*DataHub) RouteMessage ¶
func (dh *DataHub) RouteMessage(envelope *MessageEnvelope) error
RouteMessage routes a message based on routing rules
type DirectionNode ¶
type DirectionNode struct {
Direction string `json:"direction"` // "inbound" or "outbound"
InstanceID string `json:"instance_id"`
}
DirectionNode represents a direction node in the tree (inbound/outbound)
type FieldMapping ¶
type FieldMapping struct {
SourcePath string `json:"source_path"` // JSONPath or XPath expression
TargetPath string `json:"target_path"` // JSONPath or XPath expression
DataType string `json:"data_type"` // string, int, float, bool, date, array, object
DefaultValue interface{} `json:"default_value,omitempty"`
Required bool `json:"required"`
TransformFunc string `json:"transform_func,omitempty"` // Built-in function name
CustomScript string `json:"custom_script,omitempty"` // JavaScript/Lua script for complex transforms
// Array mapping configuration
ArrayMapping *ArrayMapping `json:"array_mapping,omitempty"` // Configuration for array iteration
NestedMappings []FieldMapping `json:"nested_mappings,omitempty"` // Mappings for nested objects/arrays
Optional bool `json:"optional,omitempty"` // Skip if source doesn't exist
}
FieldMapping defines a single field transformation
type GraphQLAdapter ¶
type GraphQLAdapter struct {
// contains filtered or unexported fields
}
GraphQLAdapter adapts GraphQL client/server to ProtocolAdapter interface
func NewGraphQLAdapter ¶
func NewGraphQLAdapter(client interface{}) *GraphQLAdapter
NewGraphQLAdapter creates a new GraphQL adapter
func (*GraphQLAdapter) Close ¶
func (a *GraphQLAdapter) Close() error
func (*GraphQLAdapter) GetProtocolName ¶
func (a *GraphQLAdapter) GetProtocolName() string
func (*GraphQLAdapter) Health ¶
func (a *GraphQLAdapter) Health() error
func (*GraphQLAdapter) Initialize ¶
func (a *GraphQLAdapter) Initialize(config map[string]interface{}) error
func (*GraphQLAdapter) Receive ¶
func (a *GraphQLAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*GraphQLAdapter) Send ¶
func (a *GraphQLAdapter) Send(envelope *MessageEnvelope) error
type HierarchicalTreeNode ¶
type HierarchicalTreeNode struct {
ID string `json:"id"`
Type string `json:"type"` // instance, direction, protocol_group, endpoint
Label string `json:"label"`
Icon string `json:"icon"`
Data interface{} `json:"data"` // Actual data (HubInstance, HubProtocolGroup, HubEndpoint, or DirectionNode)
Children []*HierarchicalTreeNode `json:"children,omitempty"`
}
HierarchicalTreeNode represents a node in the hierarchical tree. It is used for UI tree representation.
type HubEndpoint ¶
type HubEndpoint struct {
ID string `json:"id" db:"id"`
ProtocolGroupID string `json:"protocol_group_id" db:"protocol_group_id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
// Endpoint-specific configuration
EndpointURL string `json:"endpoint_url" db:"endpoint_url"`
Port int `json:"port" db:"port"`
Path string `json:"path" db:"path"`
Method string `json:"method" db:"method"` // GET, POST, PUT, DELETE, etc.
// Override settings (NULL/zero means inherit from protocol_group)
OverrideConfig map[string]interface{} `json:"override_config" db:"override_config"`
MessageType string `json:"message_type" db:"message_type"`
Timeout int `json:"timeout" db:"timeout"`
RetryAttempts int `json:"retry_attempts" db:"retry_attempts"`
RetryInterval int `json:"retry_interval" db:"retry_interval"`
// Authentication overrides
AuthType string `json:"auth_type" db:"auth_type"`
AuthConfig map[string]interface{} `json:"auth_config" db:"auth_config"`
// Protocol-specific configs
QueueConfig map[string]interface{} `json:"queue_config" db:"queue_config"` // For MQTT, AMQP, Kafka, RabbitMQ
FileConfig map[string]interface{} `json:"file_config" db:"file_config"` // For FTP, SFTP, File
// Schema validation
ValidateSchema bool `json:"validate_schema" db:"validate_schema"`
SchemaDefinition string `json:"schema_definition" db:"schema_definition"`
Enabled bool `json:"enabled" db:"enabled"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
Active bool `json:"active" db:"active"`
ReferenceID string `json:"referenceid" db:"referenceid"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
ModifiedBy string `json:"modifiedby" db:"modifiedby"`
ModifiedOn time.Time `json:"modifiedon" db:"modifiedon"`
RowVersionStamp int `json:"rowversionstamp" db:"rowversionstamp"`
}
HubEndpoint represents an individual endpoint configuration. It can override protocol group settings.
type HubEndpointJob ¶
type HubEndpointJob struct {
ID string `json:"id" db:"id"`
EndpointID string `json:"endpoint_id" db:"endpoint_id"`
JobID string `json:"job_id" db:"job_id"`
JobType string `json:"job_type" db:"job_type"` // Manual, Scheduled, Triggered
// Schedule configuration
CronExpression string `json:"cron_expression" db:"cron_expression"`
IntervalSeconds int `json:"interval_seconds" db:"interval_seconds"`
// Trigger configuration
TriggerEventType string `json:"trigger_event_type" db:"trigger_event_type"`
TriggerEventSource string `json:"trigger_event_source" db:"trigger_event_source"`
TriggerEventFilter string `json:"trigger_event_filter" db:"trigger_event_filter"`
// Execution
Timeout int `json:"timeout" db:"timeout"`
MaxConcurrent int `json:"max_concurrent" db:"max_concurrent"`
RetryOnFailure bool `json:"retry_on_failure" db:"retry_on_failure"`
MaxRetries int `json:"max_retries" db:"max_retries"`
Parameters map[string]interface{} `json:"parameters" db:"parameters"`
Enabled bool `json:"enabled" db:"enabled"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
Active bool `json:"active" db:"active"`
ReferenceID string `json:"referenceid" db:"referenceid"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
ModifiedBy string `json:"modifiedby" db:"modifiedby"`
ModifiedOn time.Time `json:"modifiedon" db:"modifiedon"`
RowVersionStamp int `json:"rowversionstamp" db:"rowversionstamp"`
}
HubEndpointJob links jobs to specific endpoints
type HubInstance ¶
type HubInstance struct {
ID string `json:"id" db:"id"`
InstanceID string `json:"instance_id" db:"instance_id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
Enabled bool `json:"enabled" db:"enabled"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
Active bool `json:"active" db:"active"`
ReferenceID string `json:"referenceid" db:"referenceid"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
ModifiedBy string `json:"modifiedby" db:"modifiedby"`
ModifiedOn time.Time `json:"modifiedon" db:"modifiedon"`
RowVersionStamp int `json:"rowversionstamp" db:"rowversionstamp"`
}
HubInstance represents a hub instance assignment. It links to the system InstanceID (com.InstanceID) for managing integration configurations.
type HubMigrationLog ¶
type HubMigrationLog struct {
ID string `json:"id" db:"id"`
OldHubID string `json:"old_hub_id" db:"old_hub_id"`
NewInstanceID string `json:"new_instance_id" db:"new_instance_id"`
MigrationStatus string `json:"migration_status" db:"migration_status"` // pending, completed, failed
MigrationDetails map[string]interface{} `json:"migration_details" db:"migration_details"`
ErrorMessage string `json:"error_message" db:"error_message"`
MigratedAt time.Time `json:"migrated_at" db:"migrated_at"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
}
HubMigrationLog tracks migration from the old flat structure to the hierarchical structure.
type HubProtocolGroup ¶
type HubProtocolGroup struct {
ID string `json:"id" db:"id"`
InstanceID string `json:"instance_id" db:"instance_id"`
Direction string `json:"direction" db:"direction"` // "inbound" or "outbound"
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
Protocol string `json:"protocol" db:"protocol"` // REST, SOAP, MQTT, Kafka, etc.
// Base configuration inherited by endpoints
BaseConfig map[string]interface{} `json:"base_config" db:"base_config"`
// Protocol-specific defaults
MessageType string `json:"message_type" db:"message_type"`
Timeout int `json:"timeout" db:"timeout"`
RetryAttempts int `json:"retry_attempts" db:"retry_attempts"`
RetryInterval int `json:"retry_interval" db:"retry_interval"`
// Authentication defaults
AuthType string `json:"auth_type" db:"auth_type"`
AuthConfig map[string]interface{} `json:"auth_config" db:"auth_config"`
Enabled bool `json:"enabled" db:"enabled"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
Active bool `json:"active" db:"active"`
ReferenceID string `json:"referenceid" db:"referenceid"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
ModifiedBy string `json:"modifiedby" db:"modifiedby"`
ModifiedOn time.Time `json:"modifiedon" db:"modifiedon"`
RowVersionStamp int `json:"rowversionstamp" db:"rowversionstamp"`
}
HubProtocolGroup represents a protocol-level configuration group. It provides base configuration that child endpoints inherit.
type HubRoute ¶
type HubRoute struct {
ID string `json:"id" db:"id"`
SourceEndpointID string `json:"source_endpoint_id" db:"source_endpoint_id"`
DestinationEndpointID string `json:"destination_endpoint_id" db:"destination_endpoint_id"`
Name string `json:"name" db:"name"`
Description string `json:"description" db:"description"`
// Routing logic
SourceFilter string `json:"source_filter" db:"source_filter"` // JSONPath or XPath
Conditions []RouteCondition `json:"conditions" db:"conditions"`
// Transformation
TransformationType string `json:"transformation_type" db:"transformation_type"` // None, XSLT, JavaScript, JSONata, Custom
Transformation string `json:"transformation" db:"transformation"`
FieldMappings []FieldMapping `json:"field_mappings" db:"field_mappings"`
// Execution
Priority int `json:"priority" db:"priority"`
AsyncMode bool `json:"async_mode" db:"async_mode"`
// Error handling
OnError string `json:"on_error" db:"on_error"` // Retry, Skip, SendToDeadLetter
DeadLetterQueue string `json:"dead_letter_queue" db:"dead_letter_queue"`
MaxRetries int `json:"max_retries" db:"max_retries"`
Enabled bool `json:"enabled" db:"enabled"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
Active bool `json:"active" db:"active"`
ReferenceID string `json:"referenceid" db:"referenceid"`
CreatedBy string `json:"createdby" db:"createdby"`
CreatedOn time.Time `json:"createdon" db:"createdon"`
ModifiedBy string `json:"modifiedby" db:"modifiedby"`
ModifiedOn time.Time `json:"modifiedon" db:"modifiedon"`
RowVersionStamp int `json:"rowversionstamp" db:"rowversionstamp"`
}
HubRoute represents message routing configuration between endpoints
type IntegrationJobCreator ¶
type IntegrationJobCreator struct {
// contains filtered or unexported fields
}
IntegrationJobCreator creates jobs automatically when messages are received. This bridges the gap between protocol adapters and the job system.
func NewIntegrationJobCreator ¶
func NewIntegrationJobCreator(jobConfigManager *JobConfigManager, db *sql.DB, logger *logrus.Logger) *IntegrationJobCreator
NewIntegrationJobCreator creates a new integration job creator
func (*IntegrationJobCreator) CreateManualJob ¶
func (ijc *IntegrationJobCreator) CreateManualJob( ctx context.Context, jobDefinitionID string, customData map[string]interface{}, ) (string, error)
CreateManualJob creates a job manually (not triggered by message)
func (*IntegrationJobCreator) Disable ¶
func (ijc *IntegrationJobCreator) Disable()
Disable disables the integration job creator
func (*IntegrationJobCreator) Enable ¶
func (ijc *IntegrationJobCreator) Enable()
Enable enables the integration job creator
func (*IntegrationJobCreator) GetStats ¶
func (ijc *IntegrationJobCreator) GetStats() map[string]interface{}
GetStats returns statistics
func (*IntegrationJobCreator) IsEnabled ¶
func (ijc *IntegrationJobCreator) IsEnabled() bool
IsEnabled returns whether the integration job creator is enabled
func (*IntegrationJobCreator) OnMessageReceived ¶
func (ijc *IntegrationJobCreator) OnMessageReceived( ctx context.Context, protocol string, topic string, payload interface{}, metadata map[string]interface{}, ) ([]string, error)
OnMessageReceived is called when a message is received from any protocol. It automatically creates jobs based on configuration.
func (*IntegrationJobCreator) OnMessageSent ¶
func (ijc *IntegrationJobCreator) OnMessageSent( ctx context.Context, protocol string, destination string, payload interface{}, metadata map[string]interface{}, ) (string, error)
OnMessageSent is called before a message is sent. It can create jobs for outbound processing.
type JobConfig ¶
type JobConfig struct {
Jobs []JobDefinition `json:"jobs"`
}
JobConfig defines configuration for automated job creation
type JobConfigManager ¶
type JobConfigManager struct {
// contains filtered or unexported fields
}
JobConfigManager manages job configurations
func NewJobConfigManager ¶
func NewJobConfigManager(hub *DataHub, logger *logrus.Logger) *JobConfigManager
NewJobConfigManager creates a new job config manager
func (*JobConfigManager) CreateJobPayload ¶
func (jcm *JobConfigManager) CreateJobPayload(jobDef *JobDefinition, messageData map[string]interface{}) map[string]interface{}
CreateJobPayload creates a job payload from a job definition and message data
func (*JobConfigManager) Disable ¶
func (jcm *JobConfigManager) Disable()
Disable disables the job config manager
func (*JobConfigManager) Enable ¶
func (jcm *JobConfigManager) Enable()
Enable enables the job config manager
func (*JobConfigManager) EvaluateTriggerCondition ¶
func (jcm *JobConfigManager) EvaluateTriggerCondition(condition *MappingCondition, data map[string]interface{}) bool
EvaluateTriggerCondition evaluates whether a trigger condition is met
func (*JobConfigManager) FindMatchingJobs ¶
func (jcm *JobConfigManager) FindMatchingJobs(protocol string, topic string, messageData map[string]interface{}) []JobDefinition
FindMatchingJobs finds all jobs that match the given message
func (*JobConfigManager) GetJobDefinition ¶
func (jcm *JobConfigManager) GetJobDefinition(jobID string) (*JobDefinition, error)
GetJobDefinition gets a job definition by ID
func (*JobConfigManager) GetJobsForProtocol ¶
func (jcm *JobConfigManager) GetJobsForProtocol(protocol string) []JobDefinition
GetJobsForProtocol gets all jobs for a specific protocol
func (*JobConfigManager) GetJobsForTrigger ¶
func (jcm *JobConfigManager) GetJobsForTrigger(triggerType string) []JobDefinition
GetJobsForTrigger gets all jobs that match a trigger type
func (*JobConfigManager) GetStats ¶
func (jcm *JobConfigManager) GetStats() map[string]interface{}
GetStats returns statistics about configured jobs
func (*JobConfigManager) IsEnabled ¶
func (jcm *JobConfigManager) IsEnabled() bool
IsEnabled returns whether the job config manager is enabled
func (*JobConfigManager) LoadFromFile ¶
func (jcm *JobConfigManager) LoadFromFile(filePath string) error
LoadFromFile loads job configurations from a JSON file
func (*JobConfigManager) Reload ¶
func (jcm *JobConfigManager) Reload() error
Reload reloads the configuration from file
type JobDefinition ¶
type JobDefinition struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
Enabled bool `json:"enabled"`
Type string `json:"type"` // transform, receive, send, route
// Trigger configuration
Trigger JobTrigger `json:"trigger"`
// Job parameters
Protocol string `json:"protocol"`
Source string `json:"source,omitempty"`
Destination string `json:"destination,omitempty"`
MappingID string `json:"mapping_id,omitempty"`
RoutingRule string `json:"routing_rule,omitempty"`
// Job execution settings
Priority int `json:"priority"`
MaxRetries int `json:"max_retries"`
Timeout int `json:"timeout"` // seconds
AutoRoute bool `json:"auto_route"`
// Metadata and options
Metadata map[string]interface{} `json:"metadata,omitempty"`
Options map[string]interface{} `json:"options,omitempty"`
}
JobDefinition defines a single job configuration
type JobHandler ¶
type JobHandler struct {
// contains filtered or unexported fields
}
JobHandler handles DataHub transformation and routing jobs
func NewJobHandler ¶
func NewJobHandler(hub *DataHub, db *sql.DB, docDB *documents.DocDB, logger *logrus.Logger) *JobHandler
NewJobHandler creates a new DataHub job handler
func (*JobHandler) ExecuteReceiveJob ¶
func (jh *JobHandler) ExecuteReceiveJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
ExecuteReceiveJob executes a message receive job. This polls a protocol adapter for new messages.
func (*JobHandler) ExecuteSendJob ¶
func (jh *JobHandler) ExecuteSendJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
ExecuteSendJob executes a message send job. This sends a message through a protocol adapter.
func (*JobHandler) ExecuteTransformJob ¶
func (jh *JobHandler) ExecuteTransformJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)
ExecuteTransformJob executes a transformation job. This is called by the job worker to process messages.
func (*JobHandler) GetStats ¶
func (jh *JobHandler) GetStats() map[string]interface{}
GetStats returns job handler statistics
func (*JobHandler) IsEnabled ¶
func (jh *JobHandler) IsEnabled() bool
IsEnabled returns whether the job handler is enabled
type JobTrigger ¶
type JobTrigger struct {
Type string `json:"type"` // on_receive, on_schedule, on_event, manual
Protocol string `json:"protocol,omitempty"` // For on_receive triggers
Topic string `json:"topic,omitempty"`
Schedule string `json:"schedule,omitempty"` // Cron expression
Event string `json:"event,omitempty"`
Condition *MappingCondition `json:"condition,omitempty"`
Config map[string]interface{} `json:"config,omitempty"`
}
JobTrigger defines when a job should be created
type KafkaAdapter ¶
type KafkaAdapter struct {
// contains filtered or unexported fields
}
KafkaAdapter adapts Kafka consumer/producer to ProtocolAdapter interface
func NewKafkaAdapter ¶
func NewKafkaAdapter(producer, consumer interface{}) *KafkaAdapter
NewKafkaAdapter creates a new Kafka adapter
func (*KafkaAdapter) Close ¶
func (a *KafkaAdapter) Close() error
func (*KafkaAdapter) GetProtocolName ¶
func (a *KafkaAdapter) GetProtocolName() string
func (*KafkaAdapter) Health ¶
func (a *KafkaAdapter) Health() error
func (*KafkaAdapter) Initialize ¶
func (a *KafkaAdapter) Initialize(config map[string]interface{}) error
func (*KafkaAdapter) Receive ¶
func (a *KafkaAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*KafkaAdapter) Send ¶
func (a *KafkaAdapter) Send(envelope *MessageEnvelope) error
type MQTTAdapter ¶
type MQTTAdapter struct {
// contains filtered or unexported fields
}
MQTTAdapter adapts MQTT client to ProtocolAdapter interface
func NewMQTTAdapter ¶
func NewMQTTAdapter(client interface{}) *MQTTAdapter
NewMQTTAdapter creates a new MQTT adapter
func (*MQTTAdapter) Close ¶
func (a *MQTTAdapter) Close() error
func (*MQTTAdapter) GetProtocolName ¶
func (a *MQTTAdapter) GetProtocolName() string
func (*MQTTAdapter) Health ¶
func (a *MQTTAdapter) Health() error
func (*MQTTAdapter) Initialize ¶
func (a *MQTTAdapter) Initialize(config map[string]interface{}) error
func (*MQTTAdapter) Receive ¶
func (a *MQTTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*MQTTAdapter) Send ¶
func (a *MQTTAdapter) Send(envelope *MessageEnvelope) error
type MappingCondition ¶
type MappingCondition struct {
Field string `json:"field"` // Field to check
Operator string `json:"operator"` // eq, ne, gt, lt, contains, regex, exists
Value interface{} `json:"value"`
LogicOp string `json:"logic_op,omitempty"` // and, or (for chaining conditions)
}
MappingCondition defines when a mapping should be applied
type MappingDefinition ¶
type MappingDefinition struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
SourceProtocol string `json:"source_protocol"`
SourceSchema string `json:"source_schema"`
TargetProtocol string `json:"target_protocol"`
TargetSchema string `json:"target_schema"`
Mappings []FieldMapping `json:"mappings"`
Transformations []TransformRule `json:"transformations,omitempty"`
Conditions []MappingCondition `json:"conditions,omitempty"`
Priority int `json:"priority,omitempty"`
Active bool `json:"active"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
MappingDefinition defines how to transform messages between schemas
type MessageBusAdapter ¶
type MessageBusAdapter struct {
// contains filtered or unexported fields
}
MessageBusAdapter adapts internal message bus to ProtocolAdapter interface
func NewMessageBusAdapter ¶
func NewMessageBusAdapter(bus interface{}) *MessageBusAdapter
NewMessageBusAdapter creates a new message bus adapter
func (*MessageBusAdapter) Close ¶
func (a *MessageBusAdapter) Close() error
func (*MessageBusAdapter) GetProtocolName ¶
func (a *MessageBusAdapter) GetProtocolName() string
func (*MessageBusAdapter) Health ¶
func (a *MessageBusAdapter) Health() error
func (*MessageBusAdapter) Initialize ¶
func (a *MessageBusAdapter) Initialize(config map[string]interface{}) error
func (*MessageBusAdapter) Receive ¶
func (a *MessageBusAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*MessageBusAdapter) Send ¶
func (a *MessageBusAdapter) Send(envelope *MessageEnvelope) error
type MessageEnvelope ¶
type MessageEnvelope struct {
ID string `json:"id"`
Protocol string `json:"protocol"` // REST, SOAP, GraphQL, TCP, MQTT, Kafka, etc.
Source string `json:"source"`
Destination string `json:"destination"`
Timestamp time.Time `json:"timestamp"`
ContentType string `json:"content_type"` // application/json, application/xml, text/plain, etc.
Headers map[string]interface{} `json:"headers,omitempty"`
Body interface{} `json:"body"`
OriginalBody []byte `json:"original_body,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
TransformPath []string `json:"transform_path,omitempty"` // Track transformation history
}
MessageEnvelope represents a universal message container for all protocols
func CreateEnvelope ¶
func CreateEnvelope(protocol, source, destination string, contentType string, body interface{}) *MessageEnvelope
CreateEnvelope creates a new message envelope
func (*MessageEnvelope) MarshalJSON ¶
func (m *MessageEnvelope) MarshalJSON() ([]byte, error)
MarshalJSON is a custom JSON marshaller for MessageEnvelope that handles binary data.
type MessageHistory ¶
type MessageHistory struct {
// contains filtered or unexported fields
}
MessageHistory tracks message transformations for audit
func NewMessageHistory ¶
func NewMessageHistory(maxEntries int) *MessageHistory
NewMessageHistory creates a new message history
func (*MessageHistory) Add ¶
func (mh *MessageHistory) Add(entry MessageHistoryEntry)
Add adds a history entry
func (*MessageHistory) GetRecent ¶
func (mh *MessageHistory) GetRecent(limit int) []MessageHistoryEntry
GetRecent returns recent history entries
type MessageHistoryEntry ¶
type MessageHistoryEntry struct {
MessageID string `json:"message_id"`
Timestamp time.Time `json:"timestamp"`
SourceProto string `json:"source_protocol"`
TargetProto string `json:"target_protocol"`
MappingID string `json:"mapping_id,omitempty"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Duration time.Duration `json:"duration"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
MessageHistoryEntry represents a single transformation event
type ProtocolAdapter ¶
type ProtocolAdapter interface {
// Send sends a message through this protocol
Send(envelope *MessageEnvelope) error
// Receive receives a message from this protocol (blocking or with timeout)
Receive(timeout time.Duration) (*MessageEnvelope, error)
// GetProtocolName returns the protocol name
GetProtocolName() string
// Initialize initializes the adapter with configuration
Initialize(config map[string]interface{}) error
// Close closes the adapter and releases resources
Close() error
// Health checks the health of the adapter
Health() error
}
ProtocolAdapter is the interface for all protocol implementations.
type RESTAdapter ¶
type RESTAdapter struct {
// contains filtered or unexported fields
}
RESTAdapter adapts a REST client/server to the ProtocolAdapter interface.
func NewRESTAdapter ¶
func NewRESTAdapter(client interface{}) *RESTAdapter
NewRESTAdapter creates a new REST adapter
func (*RESTAdapter) Close ¶
func (a *RESTAdapter) Close() error
func (*RESTAdapter) GetProtocolName ¶
func (a *RESTAdapter) GetProtocolName() string
func (*RESTAdapter) Health ¶
func (a *RESTAdapter) Health() error
func (*RESTAdapter) Initialize ¶
func (a *RESTAdapter) Initialize(config map[string]interface{}) error
func (*RESTAdapter) Receive ¶
func (a *RESTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*RESTAdapter) Send ¶
func (a *RESTAdapter) Send(envelope *MessageEnvelope) error
type ResolvedEndpointConfig ¶
type ResolvedEndpointConfig struct {
Endpoint *HubEndpoint `json:"endpoint"`
ProtocolGroup *HubProtocolGroup `json:"protocol_group"`
Instance *HubInstance `json:"instance"`
// Computed fields after inheritance
FinalTimeout int `json:"final_timeout"`
FinalRetryAttempts int `json:"final_retry_attempts"`
FinalRetryInterval int `json:"final_retry_interval"`
FinalMessageType string `json:"final_message_type"`
FinalAuthType string `json:"final_auth_type"`
FinalAuthConfig map[string]interface{} `json:"final_auth_config"`
FinalConfig map[string]interface{} `json:"final_config"`
}
ResolvedEndpointConfig represents endpoint configuration after inheritance resolution. It is used to get the final effective configuration, combining protocol group defaults and endpoint overrides.
type RouteCondition ¶
type RouteCondition struct {
Field string `json:"field"`
Operator string `json:"operator"` // eq, ne, gt, lt, contains, regex, etc.
Value interface{} `json:"value"`
}
RouteCondition defines a routing condition
type RoutingRule ¶
type RoutingRule struct {
ID string `json:"id"`
Name string `json:"name"`
Source string `json:"source"` // Source protocol/endpoint
Destination string `json:"destination"` // Destination protocol/endpoint
Conditions []MappingCondition `json:"conditions,omitempty"`
MappingID string `json:"mapping_id,omitempty"` // Optional transformation
Priority int `json:"priority"`
Active bool `json:"active"`
}
RoutingRule defines how messages should be routed
type SOAPAdapter ¶
type SOAPAdapter struct {
// contains filtered or unexported fields
}
SOAPAdapter adapts a SOAP client/server to the ProtocolAdapter interface.
func NewSOAPAdapter ¶
func NewSOAPAdapter(client interface{}) *SOAPAdapter
NewSOAPAdapter creates a new SOAP adapter
func (*SOAPAdapter) Close ¶
func (a *SOAPAdapter) Close() error
func (*SOAPAdapter) GetProtocolName ¶
func (a *SOAPAdapter) GetProtocolName() string
func (*SOAPAdapter) Health ¶
func (a *SOAPAdapter) Health() error
func (*SOAPAdapter) Initialize ¶
func (a *SOAPAdapter) Initialize(config map[string]interface{}) error
func (*SOAPAdapter) Receive ¶
func (a *SOAPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*SOAPAdapter) Send ¶
func (a *SOAPAdapter) Send(envelope *MessageEnvelope) error
type TCPAdapter ¶
type TCPAdapter struct {
// contains filtered or unexported fields
}
TCPAdapter adapts a TCP client/server to the ProtocolAdapter interface.
func NewTCPAdapter ¶
func NewTCPAdapter(client interface{}) *TCPAdapter
NewTCPAdapter creates a new TCP adapter
func (*TCPAdapter) Close ¶
func (a *TCPAdapter) Close() error
func (*TCPAdapter) GetProtocolName ¶
func (a *TCPAdapter) GetProtocolName() string
func (*TCPAdapter) Health ¶
func (a *TCPAdapter) Health() error
func (*TCPAdapter) Initialize ¶
func (a *TCPAdapter) Initialize(config map[string]interface{}) error
func (*TCPAdapter) Receive ¶
func (a *TCPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)
func (*TCPAdapter) Send ¶
func (a *TCPAdapter) Send(envelope *MessageEnvelope) error
type TransformEngine ¶
type TransformEngine struct {
// contains filtered or unexported fields
}
TransformEngine handles message transformation logic
func NewTransformEngine ¶
func NewTransformEngine() *TransformEngine
NewTransformEngine creates a new transform engine with built-in functions
func (*TransformEngine) Transform ¶
func (te *TransformEngine) Transform(source *MessageEnvelope, mapping *MappingDefinition) (*MessageEnvelope, error)
Transform transforms a message envelope using a mapping definition
type TransformFunction ¶
TransformFunction is a built-in transformation function