datahub

package
v0.0.0-...-99e730b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

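Package datahub provides hierarchical integration hub schema definitions, together with the DataHub message transformation and routing hub, protocol adapters, and job configuration types documented below.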

Index

Constants

View Source
// Protocol names understood by the hub.
//
// These are untyped string constants; the values are case-sensitive
// ("gRPC", "WebSocket", ...). The trailing comments on
// HubProtocolGroup.Protocol and MessageEnvelope.Protocol list the same names.
const (
	ProtocolREST      = "REST"
	ProtocolSOAP      = "SOAP"
	ProtocolMQTT      = "MQTT"
	ProtocolKafka     = "Kafka"
	ProtocolRabbitMQ  = "RabbitMQ"
	ProtocolActiveMQ  = "ActiveMQ"
	ProtocolAMQP      = "AMQP"
	ProtocolTCP       = "TCP"
	ProtocolGraphQL   = "GraphQL"
	ProtocolWebSocket = "WebSocket"

	// ProtocolGRPC is the gRPC protocol name, spelled according to Go's
	// initialism convention for exported identifiers.
	ProtocolGRPC = "gRPC"
	// ProtocolgRPC is the original spelling of ProtocolGRPC. It is kept with
	// the identical value so existing callers keep compiling; new code
	// should prefer ProtocolGRPC.
	ProtocolgRPC = ProtocolGRPC

	ProtocolFTP      = "FTP"
	ProtocolSFTP     = "SFTP"
	ProtocolFile     = "File"
	ProtocolDatabase = "Database"
	ProtocolEmail    = "Email"
	ProtocolSMS      = "SMS"
)
View Source
// Message flow directions.
//
// The lowercase values match the ones listed in the trailing comments of
// DirectionNode.Direction and HubProtocolGroup.Direction.
const (
	DirectionInbound  = "inbound"
	DirectionOutbound = "outbound"
)
View Source
// Job types, matching the values listed on HubEndpointJob.JobType.
//
// NOTE(review): JobTrigger.Type documents a different vocabulary
// (on_receive, on_schedule, on_event, manual); how the two relate is not
// visible here.
const (
	JobTypeManual    = "Manual"
	JobTypeScheduled = "Scheduled"
	JobTypeTriggered = "Triggered"
)
View Source
// Error-handling strategies, matching the values listed on HubRoute.OnError.
const (
	OnErrorRetry            = "Retry"
	OnErrorSkip             = "Skip"
	OnErrorSendToDeadLetter = "SendToDeadLetter"
)
View Source
// Transformation kinds, matching the values listed on
// HubRoute.TransformationType.
const (
	TransformationNone       = "None"
	TransformationXSLT       = "XSLT"
	TransformationJavaScript = "JavaScript"
	TransformationJSONata    = "JSONata"
	TransformationCustom     = "Custom"
)
View Source
// Migration states, matching the values listed on
// HubMigrationLog.MigrationStatus. Unlike most other constant groups in this
// package, these values are lowercase.
const (
	MigrationStatusPending   = "pending"
	MigrationStatusCompleted = "completed"
	MigrationStatusFailed    = "failed"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ArrayIterationContext

// ArrayIterationContext tracks the current position during array iteration.
//
// The fields carry no json tags, so encoding/json would use the Go field
// names as keys if a value of this type were marshalled.
type ArrayIterationContext struct {
	// Index of the item being processed.
	// NOTE(review): presumably zero-based — confirm against the iterator.
	CurrentIndex int
	// Path of the enclosing array; the path syntax is not visible here.
	ParentPath   string
	// The array item currently being processed (untyped).
	ItemData     interface{}
	// Free-form extra data; the keys in use are not visible here.
	Metadata     map[string]interface{}
}

ArrayIterationContext tracks the current position during array iteration

type ArrayMapping

// ArrayMapping defines how to handle array transformations.
//
// Mode, SortOrder and AggregateFunc are plain strings: the accepted values
// are only listed in the trailing comments and are not enforced by the type.
type ArrayMapping struct {
	Mode            string            `json:"mode"`                       // iterate, flatten, expand, filter, merge
	// No omitempty: a nil slice is encoded as JSON null by encoding/json.
	ItemMappings    []FieldMapping    `json:"item_mappings"`              // Mappings to apply to each array item
	FilterCondition *MappingCondition `json:"filter_condition,omitempty"` // Condition to filter array items
	SortBy          string            `json:"sort_by,omitempty"`          // Field to sort by
	SortOrder       string            `json:"sort_order,omitempty"`       // asc, desc
	// A zero Limit is omitted from JSON.
	// NOTE(review): presumably zero means "no limit" — confirm.
	Limit           int               `json:"limit,omitempty"`            // Limit number of items
	GroupBy         string            `json:"group_by,omitempty"`         // Field to group by
	AggregateFunc   string            `json:"aggregate_func,omitempty"`   // sum, avg, count, min, max
}

ArrayMapping defines how to handle array transformations

type DataHub

type DataHub struct {
	// contains filtered or unexported fields
}

DataHub is the central message transformation and routing hub

func GetGlobalDataHub

func GetGlobalDataHub() *DataHub

GetGlobalDataHub returns the global DataHub instance (singleton)

func NewDataHub

func NewDataHub(logger *logrus.Logger) *DataHub

NewDataHub creates a new DataHub instance

func (*DataHub) AddMapping

func (dh *DataHub) AddMapping(mapping *MappingDefinition)

AddMapping adds a mapping definition

func (*DataHub) AddRoutingRule

func (dh *DataHub) AddRoutingRule(rule *RoutingRule)

AddRoutingRule adds a routing rule

func (*DataHub) Close

func (dh *DataHub) Close() error

Close closes all adapters and cleans up resources

func (*DataHub) Disable

func (dh *DataHub) Disable()

Disable disables the DataHub

func (*DataHub) Enable

func (dh *DataHub) Enable()

Enable enables the DataHub

func (*DataHub) GetAdapter

func (dh *DataHub) GetAdapter(name string) (ProtocolAdapter, error)

GetAdapter retrieves a protocol adapter by name

func (*DataHub) GetMapping

func (dh *DataHub) GetMapping(id string) (*MappingDefinition, error)

GetMapping retrieves a mapping definition by ID

func (*DataHub) GetMessageHistory

func (dh *DataHub) GetMessageHistory(limit int) []MessageHistoryEntry

GetMessageHistory returns message history entries

func (*DataHub) IsEnabled

func (dh *DataHub) IsEnabled() bool

IsEnabled returns whether the DataHub is enabled

func (*DataHub) LoadMappingsFromFile

func (dh *DataHub) LoadMappingsFromFile(filePath string) error

LoadMappingsFromFile loads mapping definitions from a JSON file

func (*DataHub) RegisterAdapter

func (dh *DataHub) RegisterAdapter(name string, adapter ProtocolAdapter) error

RegisterAdapter registers a protocol adapter

func (*DataHub) RouteMessage

func (dh *DataHub) RouteMessage(envelope *MessageEnvelope) error

RouteMessage routes a message based on routing rules

type DirectionNode

// DirectionNode represents a direction node in the tree (inbound/outbound).
// It is one of the payload types listed for HierarchicalTreeNode.Data.
type DirectionNode struct {
	// See DirectionInbound and DirectionOutbound for the matching constants.
	Direction  string `json:"direction"` // "inbound" or "outbound"
	// NOTE(review): presumably the instance this direction belongs to — confirm.
	InstanceID string `json:"instance_id"`
}

DirectionNode represents a direction node in the tree (inbound/outbound)

type FieldMapping

// FieldMapping defines a single field transformation from SourcePath to
// TargetPath. The type is recursive: NestedMappings and
// ArrayMapping.ItemMappings both hold further FieldMapping values.
type FieldMapping struct {
	SourcePath    string      `json:"source_path"` // JSONPath or XPath expression
	TargetPath    string      `json:"target_path"` // JSONPath or XPath expression
	DataType      string      `json:"data_type"`   // string, int, float, bool, date, array, object
	// NOTE(review): presumably used when the source value is absent — confirm.
	DefaultValue  interface{} `json:"default_value,omitempty"`
	// NOTE(review): Required and Optional (below) are independent booleans;
	// how a mapping with both set is treated is not visible here.
	Required      bool        `json:"required"`
	TransformFunc string      `json:"transform_func,omitempty"` // Built-in function name
	CustomScript  string      `json:"custom_script,omitempty"`  // JavaScript/Lua script for complex transforms

	// Array mapping configuration
	ArrayMapping   *ArrayMapping  `json:"array_mapping,omitempty"`   // Configuration for array iteration
	NestedMappings []FieldMapping `json:"nested_mappings,omitempty"` // Mappings for nested objects/arrays
	Optional       bool           `json:"optional,omitempty"`        // Skip if source doesn't exist
}

FieldMapping defines a single field transformation

type GraphQLAdapter

type GraphQLAdapter struct {
	// contains filtered or unexported fields
}

GraphQLAdapter adapts GraphQL client/server to ProtocolAdapter interface

func NewGraphQLAdapter

func NewGraphQLAdapter(client interface{}) *GraphQLAdapter

NewGraphQLAdapter creates a new GraphQL adapter

func (*GraphQLAdapter) Close

func (a *GraphQLAdapter) Close() error

func (*GraphQLAdapter) GetProtocolName

func (a *GraphQLAdapter) GetProtocolName() string

func (*GraphQLAdapter) Health

func (a *GraphQLAdapter) Health() error

func (*GraphQLAdapter) Initialize

func (a *GraphQLAdapter) Initialize(config map[string]interface{}) error

func (*GraphQLAdapter) Receive

func (a *GraphQLAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*GraphQLAdapter) Send

func (a *GraphQLAdapter) Send(envelope *MessageEnvelope) error

type HierarchicalTreeNode

// HierarchicalTreeNode represents a node in the hierarchical tree. It is used
// for UI tree representation.
type HierarchicalTreeNode struct {
	ID       string                  `json:"id"`
	Type     string                  `json:"type"` // instance, direction, protocol_group, endpoint
	Label    string                  `json:"label"`
	Icon     string                  `json:"icon"`
	// Untyped payload; consumers must type-switch on the dynamic type.
	// NOTE(review): whether values are stored as pointers or by value is not
	// visible here.
	Data     interface{}             `json:"data"` // Actual data (HubInstance, HubProtocolGroup, HubEndpoint, or DirectionNode)
	// Child nodes; omitted from JSON when empty.
	Children []*HierarchicalTreeNode `json:"children,omitempty"`
}

HierarchicalTreeNode represents a node in the hierarchical tree. It is used for UI tree representation.

type HubEndpoint

// HubEndpoint represents an individual endpoint configuration. It can
// override protocol group settings.
//
// Every field carries a json tag and a db tag with the same name. Which
// database mapping library consumes the db tags is not visible here.
// NOTE(review): the map-typed fields cannot be scanned by database/sql
// directly; how those columns are (de)serialized is not visible here.
type HubEndpoint struct {
	ID              string `json:"id" db:"id"`
	// NOTE(review): presumably the ID of the owning HubProtocolGroup — confirm.
	ProtocolGroupID string `json:"protocol_group_id" db:"protocol_group_id"`
	Name            string `json:"name" db:"name"`
	Description     string `json:"description" db:"description"`

	// Endpoint-specific configuration
	EndpointURL string `json:"endpoint_url" db:"endpoint_url"`
	Port        int    `json:"port" db:"port"`
	Path        string `json:"path" db:"path"`
	Method      string `json:"method" db:"method"` // GET, POST, PUT, DELETE, etc.

	// Override settings (NULL/zero means inherit from protocol_group)
	// NOTE(review): because these are plain int/string fields, an explicit
	// zero value cannot be told apart from "inherit".
	OverrideConfig map[string]interface{} `json:"override_config" db:"override_config"`
	MessageType    string                 `json:"message_type" db:"message_type"`
	// NOTE(review): units of Timeout and RetryInterval are not stated here.
	Timeout        int                    `json:"timeout" db:"timeout"`
	RetryAttempts  int                    `json:"retry_attempts" db:"retry_attempts"`
	RetryInterval  int                    `json:"retry_interval" db:"retry_interval"`

	// Authentication overrides
	AuthType   string                 `json:"auth_type" db:"auth_type"`
	AuthConfig map[string]interface{} `json:"auth_config" db:"auth_config"`

	// Protocol-specific configs
	QueueConfig map[string]interface{} `json:"queue_config" db:"queue_config"` // For MQTT, AMQP, Kafka, RabbitMQ
	FileConfig  map[string]interface{} `json:"file_config" db:"file_config"`   // For FTP, SFTP, File

	// Schema validation
	ValidateSchema   bool   `json:"validate_schema" db:"validate_schema"`
	// NOTE(review): the schema language (JSON Schema, XSD, ...) is not visible here.
	SchemaDefinition string `json:"schema_definition" db:"schema_definition"`

	// Record bookkeeping fields; the same set appears on the other Hub* types.
	// NOTE(review): the difference between Enabled and Active is not visible
	// here; RowVersionStamp is presumably a row version counter — confirm.
	Enabled         bool                   `json:"enabled" db:"enabled"`
	Metadata        map[string]interface{} `json:"metadata" db:"metadata"`
	Active          bool                   `json:"active" db:"active"`
	ReferenceID     string                 `json:"referenceid" db:"referenceid"`
	CreatedBy       string                 `json:"createdby" db:"createdby"`
	CreatedOn       time.Time              `json:"createdon" db:"createdon"`
	ModifiedBy      string                 `json:"modifiedby" db:"modifiedby"`
	ModifiedOn      time.Time              `json:"modifiedon" db:"modifiedon"`
	RowVersionStamp int                    `json:"rowversionstamp" db:"rowversionstamp"`
}

HubEndpoint represents an individual endpoint configuration. It can override protocol group settings.

type HubEndpointJob

// HubEndpointJob links jobs to specific endpoints.
//
// Every field carries a json tag and a db tag with the same name. Which
// database mapping library consumes the db tags is not visible here.
type HubEndpointJob struct {
	ID         string `json:"id" db:"id"`
	// NOTE(review): presumably the ID of a HubEndpoint — confirm.
	EndpointID string `json:"endpoint_id" db:"endpoint_id"`
	JobID      string `json:"job_id" db:"job_id"`
	// See the JobType* constants for the matching values.
	JobType    string `json:"job_type" db:"job_type"` // Manual, Scheduled, Triggered

	// Schedule configuration
	// NOTE(review): which of CronExpression and IntervalSeconds wins when
	// both are set is not visible here.
	CronExpression  string `json:"cron_expression" db:"cron_expression"`
	IntervalSeconds int    `json:"interval_seconds" db:"interval_seconds"`

	// Trigger configuration
	TriggerEventType   string `json:"trigger_event_type" db:"trigger_event_type"`
	TriggerEventSource string `json:"trigger_event_source" db:"trigger_event_source"`
	TriggerEventFilter string `json:"trigger_event_filter" db:"trigger_event_filter"`

	// Execution
	// NOTE(review): unit of Timeout is not stated here.
	Timeout        int  `json:"timeout" db:"timeout"`
	MaxConcurrent  int  `json:"max_concurrent" db:"max_concurrent"`
	RetryOnFailure bool `json:"retry_on_failure" db:"retry_on_failure"`
	MaxRetries     int  `json:"max_retries" db:"max_retries"`

	// Free-form job parameters; the keys in use are not visible here.
	Parameters map[string]interface{} `json:"parameters" db:"parameters"`

	// Record bookkeeping fields; the same set appears on the other Hub* types.
	// NOTE(review): the difference between Enabled and Active is not visible here.
	Enabled         bool                   `json:"enabled" db:"enabled"`
	Metadata        map[string]interface{} `json:"metadata" db:"metadata"`
	Active          bool                   `json:"active" db:"active"`
	ReferenceID     string                 `json:"referenceid" db:"referenceid"`
	CreatedBy       string                 `json:"createdby" db:"createdby"`
	CreatedOn       time.Time              `json:"createdon" db:"createdon"`
	ModifiedBy      string                 `json:"modifiedby" db:"modifiedby"`
	ModifiedOn      time.Time              `json:"modifiedon" db:"modifiedon"`
	RowVersionStamp int                    `json:"rowversionstamp" db:"rowversionstamp"`
}

HubEndpointJob links jobs to specific endpoints

type HubInstance

// HubInstance represents a hub instance assignment. It links to the system
// InstanceID (com.InstanceID) for managing integration configurations.
//
// Every field carries a json tag and a db tag with the same name. Which
// database mapping library consumes the db tags is not visible here.
type HubInstance struct {
	// ID identifies this record; InstanceID is the separate link to the
	// system instance described above.
	ID              string                 `json:"id" db:"id"`
	InstanceID      string                 `json:"instance_id" db:"instance_id"`
	Name            string                 `json:"name" db:"name"`
	Description     string                 `json:"description" db:"description"`
	// Record bookkeeping fields; the same set appears on the other Hub* types.
	// NOTE(review): the difference between Enabled and Active is not visible here.
	Enabled         bool                   `json:"enabled" db:"enabled"`
	Metadata        map[string]interface{} `json:"metadata" db:"metadata"`
	Active          bool                   `json:"active" db:"active"`
	ReferenceID     string                 `json:"referenceid" db:"referenceid"`
	CreatedBy       string                 `json:"createdby" db:"createdby"`
	CreatedOn       time.Time              `json:"createdon" db:"createdon"`
	ModifiedBy      string                 `json:"modifiedby" db:"modifiedby"`
	ModifiedOn      time.Time              `json:"modifiedon" db:"modifiedon"`
	RowVersionStamp int                    `json:"rowversionstamp" db:"rowversionstamp"`
}

HubInstance represents a hub instance assignment. It links to the system InstanceID (com.InstanceID) for managing integration configurations.

type HubMigrationLog

// HubMigrationLog tracks migration from the old flat structure to the
// hierarchical one.
//
// Unlike the other Hub* types it has no Enabled/Active/Modified*/
// RowVersionStamp fields, only CreatedBy and CreatedOn.
type HubMigrationLog struct {
	ID               string                 `json:"id" db:"id"`
	// Identifier of the hub in the old flat structure.
	OldHubID         string                 `json:"old_hub_id" db:"old_hub_id"`
	// NOTE(review): presumably the instance_id of the resulting HubInstance — confirm.
	NewInstanceID    string                 `json:"new_instance_id" db:"new_instance_id"`
	// See the MigrationStatus* constants for the matching values.
	MigrationStatus  string                 `json:"migration_status" db:"migration_status"` // pending, completed, failed
	MigrationDetails map[string]interface{} `json:"migration_details" db:"migration_details"`
	// NOTE(review): presumably only populated for failed migrations — confirm.
	ErrorMessage     string                 `json:"error_message" db:"error_message"`
	MigratedAt       time.Time              `json:"migrated_at" db:"migrated_at"`
	CreatedBy        string                 `json:"createdby" db:"createdby"`
	CreatedOn        time.Time              `json:"createdon" db:"createdon"`
}

HubMigrationLog tracks migration from old flat structure to hierarchical

type HubProtocolGroup

// HubProtocolGroup represents a protocol-level configuration group. It
// provides base configuration that child endpoints inherit.
//
// Every field carries a json tag and a db tag with the same name. Which
// database mapping library consumes the db tags is not visible here.
type HubProtocolGroup struct {
	ID          string `json:"id" db:"id"`
	InstanceID  string `json:"instance_id" db:"instance_id"`
	// See DirectionInbound and DirectionOutbound for the matching constants.
	Direction   string `json:"direction" db:"direction"` // "inbound" or "outbound"
	Name        string `json:"name" db:"name"`
	Description string `json:"description" db:"description"`
	// See the Protocol* constants for the matching values.
	Protocol    string `json:"protocol" db:"protocol"` // REST, SOAP, MQTT, Kafka, etc.

	// Base configuration inherited by endpoints
	BaseConfig map[string]interface{} `json:"base_config" db:"base_config"`

	// Protocol-specific defaults
	// HubEndpoint declares fields of the same names as overrides for these.
	// NOTE(review): units of Timeout and RetryInterval are not stated here.
	MessageType   string `json:"message_type" db:"message_type"`
	Timeout       int    `json:"timeout" db:"timeout"`
	RetryAttempts int    `json:"retry_attempts" db:"retry_attempts"`
	RetryInterval int    `json:"retry_interval" db:"retry_interval"`

	// Authentication defaults
	AuthType   string                 `json:"auth_type" db:"auth_type"`
	AuthConfig map[string]interface{} `json:"auth_config" db:"auth_config"`

	// Record bookkeeping fields; the same set appears on the other Hub* types.
	// NOTE(review): the difference between Enabled and Active is not visible here.
	Enabled         bool                   `json:"enabled" db:"enabled"`
	Metadata        map[string]interface{} `json:"metadata" db:"metadata"`
	Active          bool                   `json:"active" db:"active"`
	ReferenceID     string                 `json:"referenceid" db:"referenceid"`
	CreatedBy       string                 `json:"createdby" db:"createdby"`
	CreatedOn       time.Time              `json:"createdon" db:"createdon"`
	ModifiedBy      string                 `json:"modifiedby" db:"modifiedby"`
	ModifiedOn      time.Time              `json:"modifiedon" db:"modifiedon"`
	RowVersionStamp int                    `json:"rowversionstamp" db:"rowversionstamp"`
}

HubProtocolGroup represents a protocol-level configuration group. It provides base configuration that child endpoints inherit.

type HubRoute

// HubRoute represents message routing configuration between endpoints.
//
// Every field carries a json tag and a db tag with the same name. Which
// database mapping library consumes the db tags is not visible here.
// NOTE(review): the slice-typed fields (Conditions, FieldMappings) and
// Metadata cannot be scanned by database/sql directly; how those columns are
// (de)serialized is not visible here.
type HubRoute struct {
	ID                    string `json:"id" db:"id"`
	SourceEndpointID      string `json:"source_endpoint_id" db:"source_endpoint_id"`
	DestinationEndpointID string `json:"destination_endpoint_id" db:"destination_endpoint_id"`
	Name                  string `json:"name" db:"name"`
	Description           string `json:"description" db:"description"`

	// Routing logic
	SourceFilter string           `json:"source_filter" db:"source_filter"` // JSONPath or XPath
	// NOTE(review): how multiple conditions combine (AND/OR) is not visible
	// here; RouteCondition has no logic operator field.
	Conditions   []RouteCondition `json:"conditions" db:"conditions"`

	// Transformation
	// See the Transformation* constants for the matching values.
	TransformationType string         `json:"transformation_type" db:"transformation_type"` // None, XSLT, JavaScript, JSONata, Custom
	// NOTE(review): presumably the transformation source text — confirm.
	Transformation     string         `json:"transformation" db:"transformation"`
	FieldMappings      []FieldMapping `json:"field_mappings" db:"field_mappings"`

	// Execution
	// NOTE(review): whether a higher or lower Priority runs first is not
	// visible here.
	Priority  int  `json:"priority" db:"priority"`
	AsyncMode bool `json:"async_mode" db:"async_mode"`

	// Error handling
	// See the OnError* constants for the matching values.
	OnError         string `json:"on_error" db:"on_error"` // Retry, Skip, SendToDeadLetter
	DeadLetterQueue string `json:"dead_letter_queue" db:"dead_letter_queue"`
	MaxRetries      int    `json:"max_retries" db:"max_retries"`

	// Record bookkeeping fields; the same set appears on the other Hub* types.
	// NOTE(review): the difference between Enabled and Active is not visible here.
	Enabled         bool                   `json:"enabled" db:"enabled"`
	Metadata        map[string]interface{} `json:"metadata" db:"metadata"`
	Active          bool                   `json:"active" db:"active"`
	ReferenceID     string                 `json:"referenceid" db:"referenceid"`
	CreatedBy       string                 `json:"createdby" db:"createdby"`
	CreatedOn       time.Time              `json:"createdon" db:"createdon"`
	ModifiedBy      string                 `json:"modifiedby" db:"modifiedby"`
	ModifiedOn      time.Time              `json:"modifiedon" db:"modifiedon"`
	RowVersionStamp int                    `json:"rowversionstamp" db:"rowversionstamp"`
}

HubRoute represents message routing configuration between endpoints

type IntegrationJobCreator

type IntegrationJobCreator struct {
	// contains filtered or unexported fields
}

IntegrationJobCreator creates jobs automatically when messages are received. This bridges the gap between protocol adapters and the job system.

func NewIntegrationJobCreator

func NewIntegrationJobCreator(jobConfigManager *JobConfigManager, db *sql.DB, logger *logrus.Logger) *IntegrationJobCreator

NewIntegrationJobCreator creates a new integration job creator

func (*IntegrationJobCreator) CreateManualJob

func (ijc *IntegrationJobCreator) CreateManualJob(
	ctx context.Context,
	jobDefinitionID string,
	customData map[string]interface{},
) (string, error)

CreateManualJob creates a job manually (not triggered by message)

func (*IntegrationJobCreator) Disable

func (ijc *IntegrationJobCreator) Disable()

Disable disables the integration job creator

func (*IntegrationJobCreator) Enable

func (ijc *IntegrationJobCreator) Enable()

Enable enables the integration job creator

func (*IntegrationJobCreator) GetStats

func (ijc *IntegrationJobCreator) GetStats() map[string]interface{}

GetStats returns statistics

func (*IntegrationJobCreator) IsEnabled

func (ijc *IntegrationJobCreator) IsEnabled() bool

IsEnabled returns whether the integration job creator is enabled

func (*IntegrationJobCreator) OnMessageReceived

func (ijc *IntegrationJobCreator) OnMessageReceived(
	ctx context.Context,
	protocol string,
	topic string,
	payload interface{},
	metadata map[string]interface{},
) ([]string, error)

OnMessageReceived is called when a message is received from any protocol. It automatically creates jobs based on configuration.

func (*IntegrationJobCreator) OnMessageSent

func (ijc *IntegrationJobCreator) OnMessageSent(
	ctx context.Context,
	protocol string,
	destination string,
	payload interface{},
	metadata map[string]interface{},
) (string, error)

OnMessageSent is called before a message is sent. It can create jobs for outbound processing.

type JobConfig

// JobConfig defines configuration for automated job creation. It is a thin
// wrapper whose JSON form is an object with a single "jobs" array.
type JobConfig struct {
	// No omitempty: a nil slice is encoded as JSON null by encoding/json.
	Jobs []JobDefinition `json:"jobs"`
}

JobConfig defines configuration for automated job creation

type JobConfigManager

type JobConfigManager struct {
	// contains filtered or unexported fields
}

JobConfigManager manages job configurations

func NewJobConfigManager

func NewJobConfigManager(hub *DataHub, logger *logrus.Logger) *JobConfigManager

NewJobConfigManager creates a new job config manager

func (*JobConfigManager) CreateJobPayload

func (jcm *JobConfigManager) CreateJobPayload(jobDef *JobDefinition, messageData map[string]interface{}) map[string]interface{}

CreateJobPayload creates a job payload from a job definition and message data

func (*JobConfigManager) Disable

func (jcm *JobConfigManager) Disable()

Disable disables the job config manager

func (*JobConfigManager) Enable

func (jcm *JobConfigManager) Enable()

Enable enables the job config manager

func (*JobConfigManager) EvaluateTriggerCondition

func (jcm *JobConfigManager) EvaluateTriggerCondition(condition *MappingCondition, data map[string]interface{}) bool

EvaluateTriggerCondition evaluates whether a trigger condition is met

func (*JobConfigManager) FindMatchingJobs

func (jcm *JobConfigManager) FindMatchingJobs(protocol string, topic string, messageData map[string]interface{}) []JobDefinition

FindMatchingJobs finds all jobs that match the given message

func (*JobConfigManager) GetJobDefinition

func (jcm *JobConfigManager) GetJobDefinition(jobID string) (*JobDefinition, error)

GetJobDefinition gets a job definition by ID

func (*JobConfigManager) GetJobsForProtocol

func (jcm *JobConfigManager) GetJobsForProtocol(protocol string) []JobDefinition

GetJobsForProtocol gets all jobs for a specific protocol

func (*JobConfigManager) GetJobsForTrigger

func (jcm *JobConfigManager) GetJobsForTrigger(triggerType string) []JobDefinition

GetJobsForTrigger gets all jobs that match a trigger type

func (*JobConfigManager) GetStats

func (jcm *JobConfigManager) GetStats() map[string]interface{}

GetStats returns statistics about configured jobs

func (*JobConfigManager) IsEnabled

func (jcm *JobConfigManager) IsEnabled() bool

IsEnabled returns whether the job config manager is enabled

func (*JobConfigManager) LoadFromFile

func (jcm *JobConfigManager) LoadFromFile(filePath string) error

LoadFromFile loads job configurations from a JSON file

func (*JobConfigManager) Reload

func (jcm *JobConfigManager) Reload() error

Reload reloads the configuration from file

type JobDefinition

// JobDefinition defines a single job configuration: what kind of job it is,
// when it is triggered, and which protocol, source, destination and mapping
// it refers to.
type JobDefinition struct {
	ID          string `json:"id"`
	Name        string `json:"name"`
	Description string `json:"description"`
	Enabled     bool   `json:"enabled"`
	Type        string `json:"type"` // transform, receive, send, route

	// Trigger configuration
	Trigger JobTrigger `json:"trigger"`

	// Job parameters
	// NOTE(review): Trigger also has a Protocol field; how the two relate is
	// not visible here.
	Protocol    string `json:"protocol"`
	Source      string `json:"source,omitempty"`
	Destination string `json:"destination,omitempty"`
	// NOTE(review): presumably the ID of a MappingDefinition — confirm.
	MappingID   string `json:"mapping_id,omitempty"`
	// NOTE(review): presumably the ID of a RoutingRule — confirm.
	RoutingRule string `json:"routing_rule,omitempty"`

	// Job execution settings
	// NOTE(review): whether a higher or lower Priority runs first is not
	// visible here.
	Priority   int  `json:"priority"`
	MaxRetries int  `json:"max_retries"`
	Timeout    int  `json:"timeout"` // seconds
	AutoRoute  bool `json:"auto_route"`

	// Metadata and options
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	Options  map[string]interface{} `json:"options,omitempty"`
}

JobDefinition defines a single job configuration

type JobHandler

type JobHandler struct {
	// contains filtered or unexported fields
}

JobHandler handles DataHub transformation and routing jobs

func NewJobHandler

func NewJobHandler(hub *DataHub, db *sql.DB, docDB *documents.DocDB, logger *logrus.Logger) *JobHandler

NewJobHandler creates a new DataHub job handler

func (*JobHandler) Disable

func (jh *JobHandler) Disable()

Disable disables the job handler

func (*JobHandler) Enable

func (jh *JobHandler) Enable()

Enable enables the job handler

func (*JobHandler) ExecuteReceiveJob

func (jh *JobHandler) ExecuteReceiveJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)

ExecuteReceiveJob executes a message receive job. This polls a protocol adapter for new messages.

func (*JobHandler) ExecuteSendJob

func (jh *JobHandler) ExecuteSendJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)

ExecuteSendJob executes a message send job. This sends a message through a protocol adapter.

func (*JobHandler) ExecuteTransformJob

func (jh *JobHandler) ExecuteTransformJob(ctx context.Context, payload map[string]interface{}) (map[string]interface{}, error)

ExecuteTransformJob executes a transformation job. This is called by the job worker to process messages.

func (*JobHandler) GetStats

func (jh *JobHandler) GetStats() map[string]interface{}

GetStats returns job handler statistics

func (*JobHandler) IsEnabled

func (jh *JobHandler) IsEnabled() bool

IsEnabled returns whether the job handler is enabled

type JobTrigger

// JobTrigger defines when a job should be created. Only Type is always
// serialized; every other field is omitted from JSON when empty.
type JobTrigger struct {
	// NOTE(review): this vocabulary differs from the JobType* constants
	// (Manual, Scheduled, Triggered); how the two relate is not visible here.
	Type      string                 `json:"type"`               // on_receive, on_schedule, on_event, manual
	Protocol  string                 `json:"protocol,omitempty"` // For on_receive triggers
	Topic     string                 `json:"topic,omitempty"`
	Schedule  string                 `json:"schedule,omitempty"` // Cron expression
	Event     string                 `json:"event,omitempty"`
	// Optional extra condition; nil means none is configured.
	Condition *MappingCondition      `json:"condition,omitempty"`
	// Free-form trigger settings; the keys in use are not visible here.
	Config    map[string]interface{} `json:"config,omitempty"`
}

JobTrigger defines when a job should be created

type KafkaAdapter

type KafkaAdapter struct {
	// contains filtered or unexported fields
}

KafkaAdapter adapts Kafka consumer/producer to ProtocolAdapter interface

func NewKafkaAdapter

func NewKafkaAdapter(producer, consumer interface{}) *KafkaAdapter

NewKafkaAdapter creates a new Kafka adapter

func (*KafkaAdapter) Close

func (a *KafkaAdapter) Close() error

func (*KafkaAdapter) GetProtocolName

func (a *KafkaAdapter) GetProtocolName() string

func (*KafkaAdapter) Health

func (a *KafkaAdapter) Health() error

func (*KafkaAdapter) Initialize

func (a *KafkaAdapter) Initialize(config map[string]interface{}) error

func (*KafkaAdapter) Receive

func (a *KafkaAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*KafkaAdapter) Send

func (a *KafkaAdapter) Send(envelope *MessageEnvelope) error

type MQTTAdapter

type MQTTAdapter struct {
	// contains filtered or unexported fields
}

MQTTAdapter adapts MQTT client to ProtocolAdapter interface

func NewMQTTAdapter

func NewMQTTAdapter(client interface{}) *MQTTAdapter

NewMQTTAdapter creates a new MQTT adapter

func (*MQTTAdapter) Close

func (a *MQTTAdapter) Close() error

func (*MQTTAdapter) GetProtocolName

func (a *MQTTAdapter) GetProtocolName() string

func (*MQTTAdapter) Health

func (a *MQTTAdapter) Health() error

func (*MQTTAdapter) Initialize

func (a *MQTTAdapter) Initialize(config map[string]interface{}) error

func (*MQTTAdapter) Receive

func (a *MQTTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*MQTTAdapter) Send

func (a *MQTTAdapter) Send(envelope *MessageEnvelope) error

type MappingCondition

// MappingCondition defines when a mapping should be applied. The same type
// is also used for ArrayMapping.FilterCondition and JobTrigger.Condition.
type MappingCondition struct {
	Field    string      `json:"field"`    // Field to check
	Operator string      `json:"operator"` // eq, ne, gt, lt, contains, regex, exists
	// Comparison operand.
	// NOTE(review): presumably ignored for the "exists" operator — confirm.
	Value    interface{} `json:"value"`
	LogicOp  string      `json:"logic_op,omitempty"` // and, or (for chaining conditions)
}

MappingCondition defines when a mapping should be applied

type MappingDefinition

// MappingDefinition defines how to transform messages between schemas: a
// source protocol/schema, a target protocol/schema, and the field mappings,
// transformation rules and conditions that apply.
//
// Timestamps here are named CreatedAt/UpdatedAt ("created_at"/"updated_at"),
// unlike the CreatedOn/ModifiedOn fields used by the Hub* types.
type MappingDefinition struct {
	ID              string             `json:"id"`
	Name            string             `json:"name"`
	Description     string             `json:"description"`
	SourceProtocol  string             `json:"source_protocol"`
	SourceSchema    string             `json:"source_schema"`
	TargetProtocol  string             `json:"target_protocol"`
	TargetSchema    string             `json:"target_schema"`
	Mappings        []FieldMapping     `json:"mappings"`
	Transformations []TransformRule    `json:"transformations,omitempty"`
	// NOTE(review): chaining presumably follows MappingCondition.LogicOp — confirm.
	Conditions      []MappingCondition `json:"conditions,omitempty"`
	// NOTE(review): whether a higher or lower Priority wins is not visible here.
	Priority        int                `json:"priority,omitempty"`
	Active          bool               `json:"active"`
	CreatedAt       time.Time          `json:"created_at"`
	UpdatedAt       time.Time          `json:"updated_at"`
}

MappingDefinition defines how to transform messages between schemas

type MessageBusAdapter

type MessageBusAdapter struct {
	// contains filtered or unexported fields
}

MessageBusAdapter adapts internal message bus to ProtocolAdapter interface

func NewMessageBusAdapter

func NewMessageBusAdapter(bus interface{}) *MessageBusAdapter

NewMessageBusAdapter creates a new message bus adapter

func (*MessageBusAdapter) Close

func (a *MessageBusAdapter) Close() error

func (*MessageBusAdapter) GetProtocolName

func (a *MessageBusAdapter) GetProtocolName() string

func (*MessageBusAdapter) Health

func (a *MessageBusAdapter) Health() error

func (*MessageBusAdapter) Initialize

func (a *MessageBusAdapter) Initialize(config map[string]interface{}) error

func (*MessageBusAdapter) Receive

func (a *MessageBusAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*MessageBusAdapter) Send

func (a *MessageBusAdapter) Send(envelope *MessageEnvelope) error

type MessageEnvelope

// MessageEnvelope represents a universal message container for all protocols.
//
// *MessageEnvelope defines a custom MarshalJSON (documented as handling
// binary data), so the JSON produced for a pointer may differ from what the
// struct tags alone would give.
type MessageEnvelope struct {
	ID            string                 `json:"id"`
	Protocol      string                 `json:"protocol"` // REST, SOAP, GraphQL, TCP, MQTT, Kafka, etc.
	Source        string                 `json:"source"`
	Destination   string                 `json:"destination"`
	Timestamp     time.Time              `json:"timestamp"`
	ContentType   string                 `json:"content_type"` // application/json, application/xml, text/plain, etc.
	Headers       map[string]interface{} `json:"headers,omitempty"`
	// Message payload; its dynamic type is not constrained by this struct.
	Body          interface{}            `json:"body"`
	// NOTE(review): presumably the raw payload as received, kept alongside
	// Body — confirm.
	OriginalBody  []byte                 `json:"original_body,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	TransformPath []string               `json:"transform_path,omitempty"` // Track transformation history
}

MessageEnvelope represents a universal message container for all protocols

func CreateEnvelope

func CreateEnvelope(protocol, source, destination string, contentType string, body interface{}) *MessageEnvelope

CreateEnvelope creates a new message envelope

func (*MessageEnvelope) MarshalJSON

func (m *MessageEnvelope) MarshalJSON() ([]byte, error)

MarshalJSON custom marshaller for MessageEnvelope to handle binary data

type MessageHistory

type MessageHistory struct {
	// contains filtered or unexported fields
}

MessageHistory tracks message transformations for audit

func NewMessageHistory

func NewMessageHistory(maxEntries int) *MessageHistory

NewMessageHistory creates a new message history

func (*MessageHistory) Add

func (mh *MessageHistory) Add(entry MessageHistoryEntry)

Add adds a history entry

func (*MessageHistory) GetRecent

func (mh *MessageHistory) GetRecent(limit int) []MessageHistoryEntry

GetRecent returns recent history entries

type MessageHistoryEntry

// MessageHistoryEntry represents a single transformation event, as stored by
// MessageHistory.Add and returned by MessageHistory.GetRecent.
type MessageHistoryEntry struct {
	MessageID   string                 `json:"message_id"`
	Timestamp   time.Time              `json:"timestamp"`
	// The JSON keys use the full word ("source_protocol", "target_protocol")
	// although the Go field names are abbreviated.
	SourceProto string                 `json:"source_protocol"`
	TargetProto string                 `json:"target_protocol"`
	MappingID   string                 `json:"mapping_id,omitempty"`
	Success     bool                   `json:"success"`
	// NOTE(review): presumably set only when Success is false — confirm.
	Error       string                 `json:"error,omitempty"`
	// With encoding/json's default rules a time.Duration is written as an
	// integer number of nanoseconds.
	Duration    time.Duration          `json:"duration"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

MessageHistoryEntry represents a single transformation event

type ProtocolAdapter

// ProtocolAdapter is the interface for all protocol implementations.
//
// DataHub.RegisterAdapter accepts values of this type and DataHub.GetAdapter
// returns them. In this package *GraphQLAdapter, *KafkaAdapter, *MQTTAdapter,
// *MessageBusAdapter and *RESTAdapter each declare all six methods.
type ProtocolAdapter interface {
	// Send sends a message through this protocol
	Send(envelope *MessageEnvelope) error

	// Receive receives a message from this protocol (blocking or with timeout)
	// NOTE(review): the meaning of a zero or negative timeout, and what is
	// returned when the timeout expires, is left to each implementation.
	Receive(timeout time.Duration) (*MessageEnvelope, error)

	// GetProtocolName returns the protocol name
	GetProtocolName() string

	// Initialize initializes the adapter with configuration
	// The recognised config keys are implementation-specific.
	Initialize(config map[string]interface{}) error

	// Close closes the adapter and releases resources
	Close() error

	// Health checks the health of the adapter
	// NOTE(review): presumably a nil error means healthy — confirm.
	Health() error
}

ProtocolAdapter interface for all protocol implementations

type RESTAdapter

type RESTAdapter struct {
	// contains filtered or unexported fields
}

RESTAdapter adapts REST client/server to ProtocolAdapter interface

func NewRESTAdapter

func NewRESTAdapter(client interface{}) *RESTAdapter

NewRESTAdapter creates a new REST adapter

func (*RESTAdapter) Close

func (a *RESTAdapter) Close() error

func (*RESTAdapter) GetProtocolName

func (a *RESTAdapter) GetProtocolName() string

func (*RESTAdapter) Health

func (a *RESTAdapter) Health() error

func (*RESTAdapter) Initialize

func (a *RESTAdapter) Initialize(config map[string]interface{}) error

func (*RESTAdapter) Receive

func (a *RESTAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*RESTAdapter) Send

func (a *RESTAdapter) Send(envelope *MessageEnvelope) error

type ResolvedEndpointConfig

// ResolvedEndpointConfig represents endpoint configuration after inheritance
// resolution. It is used to get the final effective configuration, combining
// protocol group defaults and endpoint overrides.
type ResolvedEndpointConfig struct {
	// Endpoint and ProtocolGroup are the two levels named by the type's
	// description (endpoint overrides, protocol group defaults).
	// NOTE(review): how Instance participates in resolution is not stated
	// here — confirm against the resolver.
	Endpoint      *HubEndpoint      `json:"endpoint"`
	ProtocolGroup *HubProtocolGroup `json:"protocol_group"`
	Instance      *HubInstance      `json:"instance"`

	// Computed fields after inheritance.
	// None of these JSON tags use omitempty, so zero values and nil maps are
	// always written to the encoded output.
	// NOTE(review): the units of FinalTimeout and FinalRetryInterval are not
	// stated (plain int) — confirm before converting to time.Duration.
	FinalTimeout       int                    `json:"final_timeout"`
	FinalRetryAttempts int                    `json:"final_retry_attempts"`
	FinalRetryInterval int                    `json:"final_retry_interval"`
	FinalMessageType   string                 `json:"final_message_type"`
	FinalAuthType      string                 `json:"final_auth_type"`
	FinalAuthConfig    map[string]interface{} `json:"final_auth_config"`
	FinalConfig        map[string]interface{} `json:"final_config"`
}

ResolvedEndpointConfig represents endpoint configuration after inheritance resolution. It is used to get the final effective configuration, combining protocol group defaults and endpoint overrides.

type RouteCondition

// RouteCondition defines a routing condition as a Field / Operator / Value
// triple.
// NOTE(review): the evaluation logic is not visible here; presumably the
// message value at Field is compared with Value using Operator — confirm.
type RouteCondition struct {
	Field    string      `json:"field"`
	// The operator list below ends in "etc.", so it is not exhaustive.
	Operator string      `json:"operator"` // eq, ne, gt, lt, contains, regex, etc.
	// Value is untyped; after JSON decoding it holds whatever encoding/json
	// produces for interface{} (e.g. float64 for any JSON number).
	Value    interface{} `json:"value"`
}

RouteCondition defines a routing condition

type RoutingRule

// RoutingRule defines how messages should be routed.
type RoutingRule struct {
	ID          string             `json:"id"`
	Name        string             `json:"name"`
	Source      string             `json:"source"`      // Source protocol/endpoint
	Destination string             `json:"destination"` // Destination protocol/endpoint
	// NOTE(review): Conditions is typed []MappingCondition, not
	// []RouteCondition, although this package also declares RouteCondition as
	// "a routing condition" — confirm which type is intended.
	Conditions  []MappingCondition `json:"conditions,omitempty"`
	MappingID   string             `json:"mapping_id,omitempty"` // Optional transformation
	// NOTE(review): whether a higher or lower Priority wins is not stated
	// here — confirm against the router.
	Priority    int                `json:"priority"`
	// Active has no omitempty, so false is always written to the JSON output.
	Active      bool               `json:"active"`
}

RoutingRule defines how messages should be routed

type SOAPAdapter

type SOAPAdapter struct {
	// contains filtered or unexported fields
}

SOAPAdapter adapts a SOAP client/server to the ProtocolAdapter interface

func NewSOAPAdapter

func NewSOAPAdapter(client interface{}) *SOAPAdapter

NewSOAPAdapter creates a new SOAP adapter

func (*SOAPAdapter) Close

func (a *SOAPAdapter) Close() error

func (*SOAPAdapter) GetProtocolName

func (a *SOAPAdapter) GetProtocolName() string

func (*SOAPAdapter) Health

func (a *SOAPAdapter) Health() error

func (*SOAPAdapter) Initialize

func (a *SOAPAdapter) Initialize(config map[string]interface{}) error

func (*SOAPAdapter) Receive

func (a *SOAPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*SOAPAdapter) Send

func (a *SOAPAdapter) Send(envelope *MessageEnvelope) error

type TCPAdapter

type TCPAdapter struct {
	// contains filtered or unexported fields
}

TCPAdapter adapts a TCP client/server to the ProtocolAdapter interface

func NewTCPAdapter

func NewTCPAdapter(client interface{}) *TCPAdapter

NewTCPAdapter creates a new TCP adapter

func (*TCPAdapter) Close

func (a *TCPAdapter) Close() error

func (*TCPAdapter) GetProtocolName

func (a *TCPAdapter) GetProtocolName() string

func (*TCPAdapter) Health

func (a *TCPAdapter) Health() error

func (*TCPAdapter) Initialize

func (a *TCPAdapter) Initialize(config map[string]interface{}) error

func (*TCPAdapter) Receive

func (a *TCPAdapter) Receive(timeout time.Duration) (*MessageEnvelope, error)

func (*TCPAdapter) Send

func (a *TCPAdapter) Send(envelope *MessageEnvelope) error

type TransformEngine

type TransformEngine struct {
	// contains filtered or unexported fields
}

TransformEngine handles message transformation logic

func NewTransformEngine

func NewTransformEngine() *TransformEngine

NewTransformEngine creates a new transform engine with built-in functions

func (*TransformEngine) Transform

func (te *TransformEngine) Transform(source *MessageEnvelope, mapping *MappingDefinition) (*MessageEnvelope, error)

Transform transforms a message envelope using a mapping definition

type TransformFunction

// TransformFunction is a built-in transformation function. It takes an
// untyped input value plus a map of named parameters and returns the
// transformed value or an error.
// NOTE(review): the parameter keys each function expects are not described
// here — confirm against the functions registered by NewTransformEngine.
type TransformFunction func(input interface{}, params map[string]interface{}) (interface{}, error)

TransformFunction is a built-in transformation function

type TransformRule

// TransformRule defines a transformation to apply to the message.
type TransformRule struct {
	Type        string                 `json:"type"` // filter, enrich, aggregate, split, merge
	Description string                 `json:"description"`
	// Config has no omitempty, so a nil map is encoded as JSON null.
	// NOTE(review): the expected keys presumably depend on Type — confirm.
	Config      map[string]interface{} `json:"config"`
	// NOTE(review): presumably the position of this rule when several rules
	// are applied; the sort direction is not stated here — confirm.
	Order       int                    `json:"order"`
}

TransformRule defines a transformation to apply to the message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL