connector

package
v0.0.0-...-dac86b4 Latest

Warning: This package is not in the latest version of its module.

Published: Feb 16, 2026 | License: MIT | Imports: 14 | Imported by: 0

Documentation


Types

type Event

type Event struct {
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	Subject         string          `json:"subject,omitempty"`
	Time            time.Time       `json:"time"`
	Data            json.RawMessage `json:"data"`
	DataSchema      string          `json:"dataschema,omitempty"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	// Internal metadata (not serialized to CloudEvents)
	TenantID       string `json:"-"`
	PipelineID     string `json:"-"`
	IdempotencyKey string `json:"-"`
}

Event is the universal event envelope (CloudEvents compatible).
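
The effect of the `json:"-"` and `omitempty` tags can be checked with a small sketch. The struct below is copied from the definition above so the example compiles on its own; the field values (`evt-1`, `/example/orders`, `tenant-a`, and so on) and the helpers `encodeSample` and `hasKey` are hypothetical and exist only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Event is a local copy of the documented struct, repeated here only so the
// sketch is self-contained.
type Event struct {
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	Subject         string          `json:"subject,omitempty"`
	Time            time.Time       `json:"time"`
	Data            json.RawMessage `json:"data"`
	DataSchema      string          `json:"dataschema,omitempty"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	TenantID        string          `json:"-"`
	PipelineID      string          `json:"-"`
	IdempotencyKey  string          `json:"-"`
}

// encodeSample marshals a hypothetical event and returns the JSON text.
func encodeSample() (string, error) {
	ev := Event{
		ID:              "evt-1",
		Source:          "/example/orders",
		Type:            "com.example.order.created",
		Time:            time.Date(2026, 2, 16, 0, 0, 0, 0, time.UTC),
		Data:            json.RawMessage(`{"order_id":42}`),
		DataContentType: "application/json",
		TenantID:        "tenant-a", // internal metadata, tagged json:"-"
	}
	b, err := json.Marshal(ev)
	return string(b), err
}

// hasKey reports whether the JSON object in text has the given top-level key.
func hasKey(text, key string) bool {
	var m map[string]json.RawMessage
	if err := json.Unmarshal([]byte(text), &m); err != nil {
		return false
	}
	_, ok := m[key]
	return ok
}

func main() {
	out, err := encodeSample()
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
	fmt.Println("has subject:", hasKey(out, "subject"))   // empty, so omitted
	fmt.Println("has TenantID:", hasKey(out, "TenantID")) // never serialized
}
```

Because `Data` is a `json.RawMessage`, the payload is embedded as-is rather than re-encoded as a string.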

type EventSink

type EventSink interface {
	Name() string
	Type() string
	Deliver(ctx context.Context, event Event) error
	DeliverBatch(ctx context.Context, events []Event) []error
	Stop(ctx context.Context) error
	Healthy() bool
}

EventSink defines the interface for event egress connectors.
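
As a rough illustration of what satisfying this interface involves, here is a minimal in-memory sink. It is a sketch, not part of the package: `memorySink` and `runSinkDemo` are hypothetical names, `Event` is trimmed to a single field, and the behavior after `Stop` (rejecting further deliveries) is an assumption, since the interface does not specify it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ ID string }

// EventSink is copied from the interface documented above.
type EventSink interface {
	Name() string
	Type() string
	Deliver(ctx context.Context, event Event) error
	DeliverBatch(ctx context.Context, events []Event) []error
	Stop(ctx context.Context) error
	Healthy() bool
}

// memorySink is a hypothetical sink that stores delivered events in a slice.
type memorySink struct {
	mu       sync.Mutex
	name     string
	stopped  bool
	received []Event
}

// Compile-time check that memorySink satisfies EventSink.
var _ EventSink = (*memorySink)(nil)

func (s *memorySink) Name() string { return s.name }
func (s *memorySink) Type() string { return "memory" }

func (s *memorySink) Deliver(ctx context.Context, event Event) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped {
		return errors.New("memory sink: stopped") // assumed behavior
	}
	s.received = append(s.received, event)
	return nil
}

// DeliverBatch returns one error slot per input event, nil on success.
func (s *memorySink) DeliverBatch(ctx context.Context, events []Event) []error {
	errs := make([]error, len(events))
	for i, ev := range events {
		errs[i] = s.Deliver(ctx, ev)
	}
	return errs
}

func (s *memorySink) Stop(context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stopped = true
	return nil
}

func (s *memorySink) Healthy() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return !s.stopped
}

// runSinkDemo delivers two events, stops the sink, then attempts a late delivery.
func runSinkDemo() (stored int, healthyAfterStop bool, lateErr error) {
	ctx := context.Background()
	sink := &memorySink{name: "demo"}
	sink.DeliverBatch(ctx, []Event{{ID: "a"}, {ID: "b"}})
	_ = sink.Stop(ctx)
	lateErr = sink.Deliver(ctx, Event{ID: "c"})
	return len(sink.received), sink.Healthy(), lateErr
}

func main() {
	stored, healthy, lateErr := runSinkDemo()
	fmt.Println(stored, healthy, lateErr)
}
```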

func WebhookSinkFactory

func WebhookSinkFactory(name string, config map[string]any) (EventSink, error)

WebhookSinkFactory is a SinkFactory for creating WebhookSink instances.

type EventSource

type EventSource interface {
	Name() string
	Type() string
	Start(ctx context.Context, output chan<- Event) error
	Stop(ctx context.Context) error
	Healthy() bool
	Checkpoint(ctx context.Context) error
}

EventSource defines the interface for event ingress connectors.
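
The source side can be sketched the same way. The example below assumes that `Start` launches a goroutine and returns immediately; the documentation above does not say whether `Start` blocks, so treat that as an assumption. `sliceSource` and `runSourceDemo` are hypothetical names, and `Event` is trimmed to one field.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ ID string }

// EventSource is copied from the interface documented above.
type EventSource interface {
	Name() string
	Type() string
	Start(ctx context.Context, output chan<- Event) error
	Stop(ctx context.Context) error
	Healthy() bool
	Checkpoint(ctx context.Context) error
}

// sliceSource is a hypothetical source that emits a fixed list of events.
// Start and Stop are not safe to call concurrently in this sketch.
type sliceSource struct {
	name    string
	events  []Event
	running atomic.Bool
	cancel  context.CancelFunc
	done    chan struct{}
}

var _ EventSource = (*sliceSource)(nil)

func (s *sliceSource) Name() string                     { return s.name }
func (s *sliceSource) Type() string                     { return "slice" }
func (s *sliceSource) Healthy() bool                    { return s.running.Load() }
func (s *sliceSource) Checkpoint(context.Context) error { return nil } // stateless

// Start emits events from a goroutine until the list is exhausted or the
// context is cancelled (assumption: Start itself does not block).
func (s *sliceSource) Start(ctx context.Context, output chan<- Event) error {
	ctx, s.cancel = context.WithCancel(ctx)
	s.done = make(chan struct{})
	s.running.Store(true)
	go func() {
		defer close(s.done)
		defer s.running.Store(false) // runs before close(s.done)
		for _, ev := range s.events {
			select {
			case output <- ev:
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}

// Stop cancels the emitter and waits for it to exit or for ctx to expire.
func (s *sliceSource) Stop(ctx context.Context) error {
	if s.cancel == nil {
		return nil
	}
	s.cancel()
	select {
	case <-s.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runSourceDemo reads two events from the source and then stops it.
func runSourceDemo() (ids []string, healthyAfterStop bool, err error) {
	src := &sliceSource{name: "demo", events: []Event{{ID: "a"}, {ID: "b"}}}
	out := make(chan Event)
	if err := src.Start(context.Background(), out); err != nil {
		return nil, false, err
	}
	for i := 0; i < 2; i++ {
		ids = append(ids, (<-out).ID)
	}
	stopCtx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err = src.Stop(stopCtx)
	return ids, src.Healthy(), err
}

func main() {
	fmt.Println(runSourceDemo())
}
```

Selecting on `ctx.Done()` alongside the channel send keeps the emitter from blocking forever when the consumer stops reading.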

func WebhookSourceFactory

func WebhookSourceFactory(name string, config map[string]any) (EventSource, error)

WebhookSourceFactory is a SourceFactory for creating WebhookSource instances.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry manages connector factories and instances.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates an empty connector registry.

func (*Registry) CreateSink

func (r *Registry) CreateSink(connectorType, name string, config map[string]any) (EventSink, error)

CreateSink creates and tracks a new EventSink instance.

func (*Registry) CreateSource

func (r *Registry) CreateSource(connectorType, name string, config map[string]any) (EventSource, error)

CreateSource creates and tracks a new EventSource instance.

func (*Registry) GetInstance

func (r *Registry) GetInstance(name string) (any, bool)

GetInstance returns a running connector instance by name.

func (*Registry) ListSinks

func (r *Registry) ListSinks() []string

ListSinks returns the registered sink connector type names.

func (*Registry) ListSources

func (r *Registry) ListSources() []string

ListSources returns the registered source connector type names.

func (*Registry) RegisterSink

func (r *Registry) RegisterSink(connectorType string, factory SinkFactory) error

RegisterSink registers a SinkFactory for the given connector type.

func (*Registry) RegisterSource

func (r *Registry) RegisterSource(connectorType string, factory SourceFactory) error

RegisterSource registers a SourceFactory for the given connector type.

func (*Registry) StopAll

func (r *Registry) StopAll(ctx context.Context) error

StopAll stops all running connector instances and clears the instance map.

type SinkFactory

type SinkFactory func(name string, config map[string]any) (EventSink, error)

SinkFactory creates EventSink instances from config.

type SourceFactory

type SourceFactory func(name string, config map[string]any) (EventSource, error)

SourceFactory creates EventSource instances from config.
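
Because factories receive a loosely typed `map[string]any`, a custom factory typically has to validate its own config. The sketch below shows one possible shape for a function with the SinkFactory signature; a SourceFactory would follow the same pattern. The `logSink` type, the `logSinkFactory` function, and the `prefix` config key are all hypothetical, and the types are redeclared locally so the example stands alone.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a trimmed stand-in for the package's Event type.
type Event struct{ ID string }

// EventSink and SinkFactory are copied from the declarations above.
type EventSink interface {
	Name() string
	Type() string
	Deliver(ctx context.Context, event Event) error
	DeliverBatch(ctx context.Context, events []Event) []error
	Stop(ctx context.Context) error
	Healthy() bool
}

type SinkFactory func(name string, config map[string]any) (EventSink, error)

// logSink is a hypothetical sink that prints event IDs with a prefix.
type logSink struct{ name, prefix string }

func (s *logSink) Name() string { return s.name }
func (s *logSink) Type() string { return "log" }
func (s *logSink) Deliver(_ context.Context, ev Event) error {
	fmt.Println(s.prefix + ev.ID)
	return nil
}
func (s *logSink) DeliverBatch(ctx context.Context, evs []Event) []error {
	errs := make([]error, len(evs))
	for i, ev := range evs {
		errs[i] = s.Deliver(ctx, ev)
	}
	return errs
}
func (s *logSink) Stop(context.Context) error { return nil }
func (s *logSink) Healthy() bool              { return true }

// logSinkFactory validates the config map before constructing the sink.
func logSinkFactory(name string, config map[string]any) (EventSink, error) {
	if name == "" {
		return nil, errors.New("log sink: name is required")
	}
	prefix, ok := config["prefix"].(string)
	if !ok {
		return nil, errors.New(`log sink: config key "prefix" must be a string`)
	}
	return &logSink{name: name, prefix: prefix}, nil
}

// Compile-time check that logSinkFactory has the SinkFactory signature.
var _ SinkFactory = logSinkFactory

func main() {
	sink, err := logSinkFactory("audit", map[string]any{"prefix": "event: "})
	if err != nil {
		panic(err)
	}
	_ = sink.Deliver(context.Background(), Event{ID: "evt-1"})

	_, err = logSinkFactory("broken", map[string]any{"prefix": 42})
	fmt.Println("error:", err)
}
```

A function like this could then be passed to `RegisterSink` under a connector type name of the caller's choosing.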

type WebhookSink

type WebhookSink struct {
	// contains filtered or unexported fields
}

WebhookSink is an EventSink that delivers events to HTTP endpoints. It wraps the webhook delivery pattern used by the existing webhook.sender module, adding CloudEvents envelope support.

func NewWebhookSink

func NewWebhookSink(name string, config map[string]any) (*WebhookSink, error)

NewWebhookSink creates a WebhookSink from a config map. Supported config keys: url, method, headers, retry (max_attempts, backoff).

func (*WebhookSink) Deliver

func (ws *WebhookSink) Deliver(ctx context.Context, event Event) error

Deliver sends a single event to the configured HTTP endpoint with retry.

func (*WebhookSink) DeliverBatch

func (ws *WebhookSink) DeliverBatch(ctx context.Context, events []Event) []error

DeliverBatch delivers multiple events individually, returning per-event errors. A nil entry in the returned slice means the corresponding event was delivered successfully.

func (*WebhookSink) Healthy

func (ws *WebhookSink) Healthy() bool

Healthy returns true when the sink is operational.

func (*WebhookSink) Name

func (ws *WebhookSink) Name() string

Name returns the connector instance name.

func (*WebhookSink) SetClient

func (ws *WebhookSink) SetClient(client *http.Client)

SetClient sets a custom HTTP client (useful for testing).

func (*WebhookSink) Stop

func (ws *WebhookSink) Stop(_ context.Context) error

Stop marks the sink as unhealthy. The underlying HTTP client does not require explicit shutdown.

func (*WebhookSink) Type

func (ws *WebhookSink) Type() string

Type returns the connector type identifier.

type WebhookSinkRetryConfig

type WebhookSinkRetryConfig struct {
	MaxAttempts int           `json:"max_attempts" yaml:"max_attempts"`
	Backoff     time.Duration `json:"backoff" yaml:"backoff"`
}

WebhookSinkRetryConfig controls retry behavior for webhook delivery.
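
One detail worth knowing when this struct is decoded with `encoding/json`: `time.Duration` is an integer type with no custom JSON decoding, so `backoff` must be given as a number of nanoseconds, and a string such as `"2s"` is rejected. The sketch below demonstrates this with a local copy of the struct; `decodeRetry` is a hypothetical helper. How `NewWebhookSink` itself parses the `retry` entry of its config map is not documented here and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// WebhookSinkRetryConfig is a local copy of the documented struct.
type WebhookSinkRetryConfig struct {
	MaxAttempts int           `json:"max_attempts" yaml:"max_attempts"`
	Backoff     time.Duration `json:"backoff" yaml:"backoff"`
}

// decodeRetry decodes a JSON document into a WebhookSinkRetryConfig.
func decodeRetry(raw string) (WebhookSinkRetryConfig, error) {
	var cfg WebhookSinkRetryConfig
	err := json.Unmarshal([]byte(raw), &cfg)
	return cfg, err
}

func main() {
	// 2000000000 nanoseconds is two seconds.
	cfg, err := decodeRetry(`{"max_attempts": 3, "backoff": 2000000000}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.MaxAttempts, cfg.Backoff)

	// A duration string is not accepted by encoding/json for this field.
	_, err = decodeRetry(`{"max_attempts": 3, "backoff": "2s"}`)
	fmt.Println("string backoff error:", err != nil)
}
```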

type WebhookSource

type WebhookSource struct {
	// contains filtered or unexported fields
}

WebhookSource is an EventSource that receives HTTP webhooks and emits them as CloudEvents. It wraps the common HTTP trigger pattern used elsewhere in the workflow engine.

func NewWebhookSource

func NewWebhookSource(name string, config map[string]any) (*WebhookSource, error)

NewWebhookSource creates a WebhookSource from a config map. Supported config keys: address, path, secret.

func (*WebhookSource) Addr

func (ws *WebhookSource) Addr() string

Addr returns the resolved listen address. Useful when the source was started on port 0 to let the OS pick a port.
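
The port 0 technique mentioned here is plain standard-library behavior and can be seen without this package. The sketch below uses `net.Listen` directly; `listenEphemeral` is a hypothetical helper, and nothing in it reflects how WebhookSource is implemented internally.

```go
package main

import (
	"fmt"
	"net"
)

// listenEphemeral opens a TCP listener on an OS-assigned loopback port and
// returns the resolved address, the chosen port, and a function to close it.
func listenEphemeral() (addr string, port int, closeFn func() error, err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", 0, nil, err
	}
	tcpAddr, ok := ln.Addr().(*net.TCPAddr)
	if !ok {
		ln.Close()
		return "", 0, nil, fmt.Errorf("unexpected address type %T", ln.Addr())
	}
	return ln.Addr().String(), tcpAddr.Port, ln.Close, nil
}

func main() {
	addr, port, closeFn, err := listenEphemeral()
	if err != nil {
		panic(err)
	}
	defer closeFn()
	// The requested port was 0; the resolved address carries the real one.
	fmt.Println("listening on", addr, "port", port)
}
```

This is handy in tests, where a fixed port could collide with another process.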

func (*WebhookSource) Checkpoint

func (ws *WebhookSource) Checkpoint(_ context.Context) error

Checkpoint is a no-op for webhooks (stateless).

func (*WebhookSource) Healthy

func (ws *WebhookSource) Healthy() bool

Healthy returns true when the server is running.

func (*WebhookSource) Name

func (ws *WebhookSource) Name() string

Name returns the connector instance name.

func (*WebhookSource) Start

func (ws *WebhookSource) Start(ctx context.Context, output chan<- Event) error

Start begins listening for HTTP webhooks, writing received events to output.

func (*WebhookSource) Stop

func (ws *WebhookSource) Stop(ctx context.Context) error

Stop gracefully shuts down the HTTP server.

func (*WebhookSource) Type

func (ws *WebhookSource) Type() string

Type returns the connector type identifier.
