package app

v0.0.0-...-c6cbf5f

Published: Jun 7, 2023 License: MIT Imports: 50 Imported by: 0

Documentation

Index

Constants

const (
	DataLayoutSegmentCompatible  = "segment-compatible"
	DataLayoutSegmentSingleTable = "segment-single-table"
	DataLayoutJitsuLegacy        = "jitsu-legacy"
)
const MessageIdHeader = "message_id"

Variables

var TimestampPattern = regexp.MustCompile(`^\d{13}$`)
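TimestampPattern matches exactly 13 digits, i.e. a Unix timestamp in milliseconds. A minimal standalone sketch of validating and parsing such a value (the sample value is illustrative):

package main

import (
	"fmt"
	"regexp"
	"strconv"
	"time"
)

// Same pattern as the package variable: exactly 13 digits.
var TimestampPattern = regexp.MustCompile(`^\d{13}$`)

func main() {
	raw := "1672531200000" // 2023-01-01 00:00:00 UTC in milliseconds
	if !TimestampPattern.MatchString(raw) {
		fmt.Println("not a millisecond timestamp")
		return
	}
	ms, _ := strconv.ParseInt(raw, 10, 64)
	fmt.Println(time.UnixMilli(ms).UTC()) // 2023-01-01 00:00:00 +0000 UTC
}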

Functions

func Exit

func Exit(signal os.Signal)

func GetKafkaHeader

func GetKafkaHeader(message *kafka.Message, key string) string

func GetKafkaIntHeader

func GetKafkaIntHeader(message *kafka.Message, name string) (int, error)

func GetKafkaTimeHeader

func GetKafkaTimeHeader(message *kafka.Message, name string) (time.Time, error)
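All three helpers read a value from a Kafka message's headers by key, converting it to a string, int, or time. A minimal sketch of the kind of lookup they plausibly perform; the confluent-kafka-go import path and the helper body are assumptions, not the package's exact code:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// getHeader mirrors what GetKafkaHeader plausibly does: return the value
// of the first header whose key matches, or "" if absent.
func getHeader(message *kafka.Message, key string) string {
	for _, h := range message.Headers {
		if h.Key == key {
			return string(h.Value)
		}
	}
	return ""
}

func main() {
	msg := &kafka.Message{Headers: []kafka.Header{
		{Key: "message_id", Value: []byte("abc-123")},
	}}
	fmt.Println(getHeader(msg, "message_id")) // abc-123
}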

func MakeTopicId

func MakeTopicId(destinationId, mode, tableName string, checkLength bool) (string, error)

func ParseTopicId

func ParseTopicId(topic string) (destinationId, mode, tableName string, err error)
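MakeTopicId and ParseTopicId are inverses: one encodes a (destinationId, mode, tableName) triple into a Kafka topic id, the other decodes it. A hedged round-trip sketch; the import path and the "batch" mode value are assumptions:

package main

import (
	"fmt"
	"log"

	"github.com/jitsucom/bulker/bulkerapp/app" // assumed import path
)

func main() {
	// checkLength=true presumably rejects names exceeding Kafka's topic-name length limit.
	topicId, err := app.MakeTopicId("my-destination", "batch", "events", true)
	if err != nil {
		log.Fatal(err)
	}
	destinationId, mode, tableName, err := app.ParseTopicId(topicId)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(destinationId, mode, tableName) // my-destination batch events
}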

func ProducerErrorLabels

func ProducerErrorLabels(topicId string, errText string) (topic, destinationId, mode, tableName, err string)

func ProducerMessageLabels

func ProducerMessageLabels(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)

func RedisError

func RedisError(err error) string

func RetryBackOffTime

func RetryBackOffTime(config *AppConfig, attempt int) time.Time
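Judging by the MessagesRetryBackoffBase and MessagesRetryBackoffMaxDelay fields documented under AppConfig below, the retry schedule is exponential: roughly base^attempt minutes, capped at the max delay. A standalone sketch of that formula (an inference from the config docs, not the package's exact code):

package main

import (
	"fmt"
	"math"
	"time"
)

// retryBackOffTime sketches the documented schedule: with base 5, attempts
// 1..3 are delayed 5, 25 and 125 minutes, capped at maxDelayMinutes.
func retryBackOffTime(base, maxDelayMinutes float64, attempt int) time.Time {
	delay := math.Min(math.Pow(base, float64(attempt)), maxDelayMinutes)
	return time.Now().Add(time.Duration(delay * float64(time.Minute)))
}

func main() {
	for attempt := 1; attempt <= 5; attempt++ {
		fmt.Printf("attempt %d: retry at %s\n", attempt,
			retryBackOffTime(5, 1440, attempt).Format(time.RFC3339))
	}
}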

func Run

func Run()

TODO: graceful shutdown and cleanups. Flush producer

Types

type AbstractBatchConsumer

type AbstractBatchConsumer struct {
	sync.Mutex
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewAbstractBatchConsumer

func NewAbstractBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId, mode string, config *AppConfig, kafkaConfig *kafka.ConfigMap) (*AbstractBatchConsumer, error)

func (*AbstractBatchConsumer) BatchPeriodSec

func (bc *AbstractBatchConsumer) BatchPeriodSec() int

func (*AbstractBatchConsumer) ConsumeAll

func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)

func (*AbstractBatchConsumer) Retire

func (bc *AbstractBatchConsumer) Retire()

Retire marks the consumer as retired. The consumer will close itself when it finishes its current work.

func (*AbstractBatchConsumer) RunJob

func (bc *AbstractBatchConsumer) RunJob()

func (*AbstractBatchConsumer) TopicId

func (bc *AbstractBatchConsumer) TopicId() string

func (*AbstractBatchConsumer) UpdateBatchPeriod

func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)

type ApiKey

type ApiKey struct {
	Id        string `json:"id"`
	Plaintext string `json:"plaintext"`
	Hash      string `json:"hash"`
	Hint      string `json:"hint"`
}

type ApiKeyBinding

type ApiKeyBinding struct {
	Hash     string `json:"hash"`
	KeyType  string `json:"keyType"`
	StreamId string `json:"streamId"`
}

type AppConfig

type AppConfig struct {
	// InstanceId ID of bulker instance. It is used for identifying Kafka consumers.
	// If it is not set, an instance id will be generated, persisted to disk (~/.bulkerapp/instance_id) and reused on the next restart.
	// Default: random uuid
	InstanceId string `mapstructure:"INSTANCE_ID"`

	// HTTPPort port for bulker http server. Default: 3042
	HTTPPort int `mapstructure:"HTTP_PORT"`

	// AuthTokens A comma-separated list of auth tokens that authorize a user in the HTTP interface. Each token can be either:
	// - `${token}` an un-encrypted token value
	// - `${salt}.${hash}` a hashed token. `${salt}` should be a random string. Hash is `base64(sha512($token + $salt + TokenSecrets))`.
	// - Tokens must match `[0-9a-zA-Z_\-]` (only letters, digits, underscores and dashes)
	AuthTokens string `mapstructure:"AUTH_TOKENS"`
	// See AuthTokens
	TokenSecrets string `mapstructure:"TOKEN_SECRET"`
	// For ingest endpoint only
	GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"`
	// For ingest endpoint only
	GlobalHashSecrets []string

	// LogFormat log format. Can be `text` or `json`. Default: `text`
	LogFormat string `mapstructure:"LOG_FORMAT"`

	// ConfigSource source of destinations configs. Can be:
	//  - `file://...`  for destinations config in yaml format
	//  - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key
	//  - `env://PREFIX` to load each destination from environment variables named like `PREFIX_ID`, where ID is the destination id
	//
	// Default: `env://BULKER_DESTINATION`
	ConfigSource string `mapstructure:"CONFIG_SOURCE"`
	// RedisURL that will be used by default by all services that need Redis
	RedisURL   string `mapstructure:"REDIS_URL"`
	RedisTLSCA string `mapstructure:"REDIS_TLS_CA"`

	// KubernetesNamespace namespace of bulker app. Default: `default`
	KubernetesNamespace    string `mapstructure:"KUBERNETES_NAMESPACE" default:"default"`
	KubernetesClientConfig string `mapstructure:"KUBERNETES_CLIENT_CONFIG"`

	// KafkaBootstrapServers List of Kafka brokers separated by comma. Each broker should be in format host:port.
	KafkaBootstrapServers string `mapstructure:"KAFKA_BOOTSTRAP_SERVERS"`
	KafkaSSL              bool   `mapstructure:"KAFKA_SSL" default:"false"`
	KafkaSSLSkipVerify    bool   `mapstructure:"KAFKA_SSL_SKIP_VERIFY" default:"false"`
	//Kafka authorization as JSON object {"mechanism": "SCRAM-SHA-256|PLAIN", "username": "user", "password": "password"}
	KafkaSASL string `mapstructure:"KAFKA_SASL"`

	KafkaTopicCompression                    string `mapstructure:"KAFKA_TOPIC_COMPRESSION" default:"snappy"`
	KafkaTopicRetentionHours                 int    `mapstructure:"KAFKA_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaRetryTopicRetentionHours            int    `mapstructure:"KAFKA_RETRY_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaRetryTopicSegmentBytes              int    `mapstructure:"KAFKA_RETRY_TOPIC_SEGMENT_BYTES" default:"104857600"`
	KafkaDeadTopicRetentionHours             int    `mapstructure:"KAFKA_DEAD_TOPIC_RETENTION_HOURS" default:"168"`
	KafkaTopicReplicationFactor              int    `mapstructure:"KAFKA_TOPIC_REPLICATION_FACTOR"`
	KafkaAdminMetadataTimeoutMs              int    `mapstructure:"KAFKA_ADMIN_METADATA_TIMEOUT_MS" default:"1000"`
	KafkaConsumerPartitionsAssigmentStrategy string `mapstructure:"KAFKA_CONSUMER_PARTITIONS_ASSIGMENT_STRATEGY" default:"cooperative-sticky"`

	// KafkaDestinationsTopicName destination topic for /ingest endpoint
	KafkaDestinationsTopicName       string `mapstructure:"KAFKA_DESTINATIONS_TOPIC_NAME" default:"destination-messages"`
	KafkaDestinationsTopicPartitions int    `mapstructure:"KAFKA_DESTINATIONS_TOPIC_PARTITIONS" default:"4"`

	KafkaDestinationsDeadLetterTopicName string `mapstructure:"KAFKA_DESTINATIONS_DEAD_LETTER_TOPIC_NAME" default:"destination-messages-dead-letter"`

	// TopicManagerRefreshPeriodSec how often topic manager will check for new topics
	TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"`

	// ProducerWaitForDeliveryMs Timeout for the producer to wait for a delivery report. Used by ProduceSync only.
	ProducerWaitForDeliveryMs int `mapstructure:"PRODUCER_WAIT_FOR_DELIVERY_MS" default:"1000"`

	BatchRunnerPeriodSec        int `mapstructure:"BATCH_RUNNER_DEFAULT_PERIOD_SEC" default:"300"`
	BatchRunnerDefaultBatchSize int `mapstructure:"BATCH_RUNNER_DEFAULT_BATCH_SIZE" default:"10000"`
	// BatchRunnerWaitForMessagesSec when there are no more messages in the topic, BatchRunner waits this many seconds before sending the batch
	BatchRunnerWaitForMessagesSec int `mapstructure:"BATCH_RUNNER_WAIT_FOR_MESSAGES_SEC" default:"1"`

	BatchRunnerRetryPeriodSec            int     `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC" default:"300"`
	BatchRunnerDefaultRetryBatchFraction float64 `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_BATCH_FRACTION" default:"0.1"`
	MessagesRetryCount                   int     `mapstructure:"MESSAGES_RETRY_COUNT" default:"5"`
	// MessagesRetryBackoffBase defines base for exponential backoff in minutes.
	// For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes.
	// Default: 5
	MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"`
	// MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours
	MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"`

	EventsLogRedisURL string `mapstructure:"EVENTS_LOG_REDIS_URL"`
	EventsLogMaxSize  int    `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"`

	MetricsPort             int    `mapstructure:"METRICS_PORT" default:"9091"`
	MetricsRelayDestination string `mapstructure:"METRICS_RELAY_DESTINATION"`
	MetricsRelayPeriodSec   int    `mapstructure:"METRICS_RELAY_PERIOD_SEC" default:"60"`

	//Timeout that gives running batch tasks time to finish during shutdown.
	ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"`
	//Extra delay may be needed, e.g. for a metrics scraper to collect final metrics, so the http server stays active for an extra period.
	ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC"`
}

AppConfig is the configuration struct for the bulker app. It is loaded from the `bulker.env` config file or from environment variables.

Environment variables require the prefix `BULKER_`.
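A minimal sketch of the hashed-token scheme described for AuthTokens, assuming standard base64 encoding (the package may use another variant):

package main

import (
	"crypto/sha512"
	"encoding/base64"
	"fmt"
)

// hashToken renders a token in the `${salt}.${hash}` form:
// hash = base64(sha512(token + salt + secret)).
func hashToken(token, salt, secret string) string {
	sum := sha512.Sum512([]byte(token + salt + secret))
	return salt + "." + base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Hypothetical values; the secret corresponds to BULKER_TOKEN_SECRET.
	fmt.Println(hashToken("my-token", "random-salt", "my-secret"))
}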

func InitAppConfig

func InitAppConfig() (*AppConfig, error)

func (*AppConfig) GetKafkaConfig

func (ac *AppConfig) GetKafkaConfig() *kafka.ConfigMap

GetKafkaConfig returns the Kafka config.

type AppContext

type AppContext struct {
	// contains filtered or unexported fields
}

func InitAppContext

func InitAppContext() *AppContext

func (*AppContext) Shutdown

func (a *AppContext) Shutdown()

type BatchConsumer

type BatchConsumer interface {
	RunJob()
	ConsumeAll() (consumed BatchCounters, err error)
	Retire()
	BatchPeriodSec() int
	UpdateBatchPeriod(batchPeriodSec int)
	TopicId() string
}

type BatchConsumerImpl

type BatchConsumerImpl struct {
	*AbstractBatchConsumer
	// contains filtered or unexported fields
}

func NewBatchConsumer

func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap, eventsLogService EventsLogService) (*BatchConsumerImpl, error)

type BatchCounters

type BatchCounters struct {
	// contains filtered or unexported fields
}

func (BatchCounters) String

func (bs BatchCounters) String() string

String returns a string representation of the counters.

type BatchFunction

type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatchSize int) (counters BatchCounters, nextBatch bool, err error)

type BatchState

type BatchState struct {
	bulker.State  `json:",inline"`
	LastMappedRow []types.Object `json:"lastMappedRow"`
}

type ConfigurationSource

type ConfigurationSource interface {
	io.Closer
	GetDestinationConfigs() []*DestinationConfig
	GetDestinationConfig(id string) *DestinationConfig
	ChangesChannel() <-chan bool
}

func InitConfigurationSource

func InitConfigurationSource(config *AppConfig) (ConfigurationSource, error)
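A hedged wiring sketch: load AppConfig (note the BULKER_ env prefix) and let InitConfigurationSource pick the implementation from the CONFIG_SOURCE scheme. The import path and file name are assumptions:

package main

import (
	"log"
	"os"

	"github.com/jitsucom/bulker/bulkerapp/app" // assumed import path
)

func main() {
	// file://, redis:// and env:// schemes are supported per the ConfigSource docs.
	os.Setenv("BULKER_CONFIG_SOURCE", "file://destinations.yaml")

	cfg, err := app.InitAppConfig()
	if err != nil {
		log.Fatal(err)
	}
	src, err := app.InitConfigurationSource(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	for _, dc := range src.GetDestinationConfigs() {
		log.Printf("destination: %s", dc.Id())
	}
}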

type Cron

type Cron struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewCron

func NewCron(config *AppConfig) *Cron

func (*Cron) AddBatchConsumer

func (c *Cron) AddBatchConsumer(batchConsumer BatchConsumer) (*gocron.Job, error)

func (*Cron) Close

func (c *Cron) Close()

Close closes the scheduler.

func (*Cron) RemoveBatchConsumer

func (c *Cron) RemoveBatchConsumer(batchConsumer BatchConsumer) error

func (*Cron) ReplaceBatchConsumer

func (c *Cron) ReplaceBatchConsumer(batchConsumer BatchConsumer) (*gocron.Job, error)

type DataLayout

type DataLayout string

type Destination

type Destination struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Destination) Id

func (d *Destination) Id() string

Id returns destination id

func (*Destination) Lease

func (d *Destination) Lease()

Lease leases the destination. A destination cannot be closed while at least one service (e.g. a batch consumer) is using it.

func (*Destination) Mode

func (d *Destination) Mode() bulker.BulkMode

Mode returns destination mode

func (*Destination) Release

func (d *Destination) Release()

Release destination. See Lease

func (*Destination) TopicId

func (d *Destination) TopicId(tableName string) (string, error)

TopicId generates topic id for Destination

type DestinationConfig

type DestinationConfig struct {
	UpdatedAt           time.Time `mapstructure:"updatedAt" json:"updatedAt"`
	UsesBulker          bool      `mapstructure:"usesBulker" json:"usesBulker"`
	bulker.Config       `mapstructure:",squash"`
	bulker.StreamConfig `mapstructure:",squash"`
}

func (*DestinationConfig) Id

func (dc *DestinationConfig) Id() string

type DummyEventsLogService

type DummyEventsLogService struct{}

func (*DummyEventsLogService) Close

func (d *DummyEventsLogService) Close() error

func (*DummyEventsLogService) GetEvents

func (d *DummyEventsLogService) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)

func (*DummyEventsLogService) PostEvent

func (d *DummyEventsLogService) PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)

type EnvConfigurationSource

type EnvConfigurationSource struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewEnvConfigurationSource

func NewEnvConfigurationSource(prefix string) *EnvConfigurationSource

func (*EnvConfigurationSource) ChangesChannel

func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool

func (*EnvConfigurationSource) Close

func (ecs *EnvConfigurationSource) Close() error

func (*EnvConfigurationSource) GetDestinationConfig

func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*EnvConfigurationSource) GetDestinationConfigs

func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*EnvConfigurationSource) GetValue

func (ecs *EnvConfigurationSource) GetValue(key string) any

type EventStatus

type EventStatus string

type EventType

type EventType string
const (
	EventTypeIncomingAll   EventType = "incoming.all"
	EventTypeIncomingError EventType = "incoming.error"

	EventTypeProcessedAll   EventType = "bulker_stream.all"
	EventTypeProcessedError EventType = "bulker_stream.error"

	EventTypeBatchAll   EventType = "bulker_batch.all"
	EventTypeBatchError EventType = "bulker_batch.error"
)

type EventsLogFilter

type EventsLogFilter struct {
	Start    time.Time
	End      time.Time
	BeforeId EventsLogRecordId
	Filter   func(event any) bool
}

func (*EventsLogFilter) GetStartAndEndIds

func (f *EventsLogFilter) GetStartAndEndIds() (start, end string, err error)

GetStartAndEndIds returns the start and end ids for the stream.

type EventsLogRecord

type EventsLogRecord struct {
	Id      EventsLogRecordId `json:"id"`
	Date    time.Time         `json:"date"`
	Content any               `json:"content"`
}

type EventsLogRecordId

type EventsLogRecordId string

type EventsLogService

type EventsLogService interface {
	io.Closer
	// PostEvent posts event to the events log
	// actorId – id of entity of event origin. E.g. for 'incoming' event - id of site, for 'processed' event - id of destination
	PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)

	GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)
}
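A usage sketch of the interface against the no-op DummyEventsLogService; the import path is an assumption:

package main

import (
	"log"

	"github.com/jitsucom/bulker/bulkerapp/app" // assumed import path
)

func main() {
	// DummyEventsLogService is (by its name) a no-op implementation.
	var svc app.EventsLogService = &app.DummyEventsLogService{}
	defer svc.Close()

	// actorId is the id of the event's origin entity, e.g. a destination id.
	id, err := svc.PostEvent(app.EventTypeProcessedAll, "my-destination",
		map[string]any{"status": "ok"})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("posted event %s", id)
}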

type FastStore

type FastStore struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewFastStore

func NewFastStore(config *AppConfig) (*FastStore, error)

func (*FastStore) Close

func (fs *FastStore) Close() error

func (*FastStore) GetStreamById

func (fs *FastStore) GetStreamById(slug string) (*StreamWithDestinations, error)

func (*FastStore) GetStreamsByDomain

func (fs *FastStore) GetStreamsByDomain(domain string) ([]StreamWithDestinations, error)

type IngestMessage

type IngestMessage struct {
	IngestType     string              `json:"ingestType"`
	ConnectionId   string              `json:"connectionId"`
	MessageCreated time.Time           `json:"messageCreated"`
	WriteKey       string              `json:"writeKey"`
	MessageId      string              `json:"messageId"`
	Type           string              `json:"type"`
	Origin         IngestMessageOrigin `json:"origin"`
	HttpHeaders    map[string]string   `json:"httpHeaders"`
	HttpPayload    map[string]any      `json:"httpPayload"`
}

type IngestMessageOrigin

type IngestMessageOrigin struct {
	BaseURL string `json:"baseUrl"`
	Slug    string `json:"slug"`
	Domain  string `json:"domain"`
}

type JobRunner

type JobRunner struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewJobRunner

func NewJobRunner(appContext *AppContext) (*JobRunner, error)

func (*JobRunner) Close

func (j *JobRunner) Close() error

func (*JobRunner) SpecHandler

func (j *JobRunner) SpecHandler(c *gin.Context)

type MetricsRelay

type MetricsRelay struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewMetricsRelay

func NewMetricsRelay(appConfig *AppConfig) (*MetricsRelay, error)

func (*MetricsRelay) Push

func (m *MetricsRelay) Push() (err error)

Push metrics to destination

func (*MetricsRelay) Stop

func (m *MetricsRelay) Stop() error

Stop metrics relay

type MetricsServer

type MetricsServer struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewMetricsServer

func NewMetricsServer(appconfig *AppConfig) *MetricsServer

func (*MetricsServer) Stop

func (s *MetricsServer) Stop() error

type MultiConfigurationSource

type MultiConfigurationSource struct {
	// contains filtered or unexported fields
}

func NewMultiConfigurationSource

func NewMultiConfigurationSource(configurationSources []ConfigurationSource) *MultiConfigurationSource

func (*MultiConfigurationSource) ChangesChannel

func (mcs *MultiConfigurationSource) ChangesChannel() <-chan bool

func (*MultiConfigurationSource) Close

func (mcs *MultiConfigurationSource) Close() error

func (*MultiConfigurationSource) GetDestinationConfig

func (mcs *MultiConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*MultiConfigurationSource) GetDestinationConfigs

func (mcs *MultiConfigurationSource) GetDestinationConfigs() []*DestinationConfig

type Producer

type Producer struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(config *AppConfig, kafkaConfig *kafka.ConfigMap) (*Producer, error)

NewProducer creates new Producer

func (*Producer) Close

func (p *Producer) Close() error

Close closes producer

func (*Producer) ProduceAsync

func (p *Producer) ProduceAsync(topic string, messageKey string, event []byte) error

ProduceAsync produces a message to Kafka asynchronously. TODO: transactional delivery?

func (*Producer) ProduceSync

func (p *Producer) ProduceSync(topic string, events ...kafka.Message) error

ProduceSync produces messages to Kafka synchronously. TODO: transactional delivery?
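A combined usage sketch for both produce paths. The import paths and topic name are assumptions ("destination-messages" is the documented default of KafkaDestinationsTopicName), and the idea that ProduceSync fills the topic into each message is inferred from its signature:

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka" // assumed import path
	"github.com/jitsucom/bulker/bulkerapp/app"         // assumed import path
)

func main() {
	cfg, err := app.InitAppConfig()
	if err != nil {
		log.Fatal(err)
	}
	producer, err := app.NewProducer(cfg, cfg.GetKafkaConfig())
	if err != nil {
		log.Fatal(err)
	}
	producer.Start()
	defer producer.Close()

	// Fire-and-forget delivery.
	if err := producer.ProduceAsync("destination-messages", "key-1", []byte(`{"event":"page"}`)); err != nil {
		log.Fatal(err)
	}

	// Synchronous delivery: waits up to ProducerWaitForDeliveryMs for the report.
	msg := kafka.Message{
		Key:     []byte("key-2"),
		Value:   []byte(`{"event":"track"}`),
		Headers: []kafka.Header{{Key: app.MessageIdHeader, Value: []byte("msg-1")}},
	}
	if err := producer.ProduceSync("destination-messages", msg); err != nil {
		log.Fatal(err)
	}
}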

func (*Producer) Start

func (p *Producer) Start()

type RedisConfigurationSource

type RedisConfigurationSource struct {
	objects.ServiceBase
	sync.Mutex
	// contains filtered or unexported fields
}

func NewRedisConfigurationSource

func NewRedisConfigurationSource(appconfig *AppConfig) (*RedisConfigurationSource, error)

func (*RedisConfigurationSource) ChangesChannel

func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool

func (*RedisConfigurationSource) Close

func (rcs *RedisConfigurationSource) Close() error

func (*RedisConfigurationSource) GetDestinationConfig

func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*RedisConfigurationSource) GetDestinationConfigs

func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*RedisConfigurationSource) GetValue

func (rcs *RedisConfigurationSource) GetValue(key string) any

type RedisEventsLog

type RedisEventsLog struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewRedisEventsLog

func NewRedisEventsLog(config *AppConfig, redisUrl string) (*RedisEventsLog, error)

func (*RedisEventsLog) Close

func (r *RedisEventsLog) Close() error

func (*RedisEventsLog) GetEvents

func (r *RedisEventsLog) GetEvents(eventType EventType, actorId string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)

func (*RedisEventsLog) PostEvent

func (r *RedisEventsLog) PostEvent(eventType EventType, actorId string, event any) (id EventsLogRecordId, err error)

type Repository

type Repository struct {
	objects.ServiceBase
	sync.Mutex
	// contains filtered or unexported fields
}

func NewRepository

func NewRepository(config *AppConfig, configurationSource ConfigurationSource) (*Repository, error)

func (*Repository) ChangesChannel

func (r *Repository) ChangesChannel() <-chan RepositoryChange

func (*Repository) Close

func (r *Repository) Close() error

Close Repository

func (*Repository) GetDestination

func (r *Repository) GetDestination(id string) *Destination

func (*Repository) GetDestinations

func (r *Repository) GetDestinations() []*Destination

func (*Repository) LeaseDestination

func (r *Repository) LeaseDestination(id string) *Destination

LeaseDestination leases the destination with the given id. A destination cannot be closed while at least one service (e.g. a batch consumer) is using it.

type RepositoryChange

type RepositoryChange struct {
	AddedDestinations     []*Destination
	ChangedDestinations   []*Destination
	RemovedDestinationIds []string
}

type RetryConsumer

type RetryConsumer struct {
	*AbstractBatchConsumer
}

func NewRetryConsumer

func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap) (*RetryConsumer, error)

type Router

type Router struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(appContext *AppContext, jobRunner *JobRunner) *Router

func (*Router) AuthMiddleware

func (r *Router) AuthMiddleware(c *gin.Context)

func (*Router) BulkHandler

func (r *Router) BulkHandler(c *gin.Context)

func (*Router) EventsHandler

func (r *Router) EventsHandler(c *gin.Context)

func (*Router) EventsLogHandler

func (r *Router) EventsLogHandler(c *gin.Context)

EventsLogHandler gets the events log by EventType and actor id, filtered by date range and cursor id.

func (*Router) FailedHandler

func (r *Router) FailedHandler(c *gin.Context)

func (*Router) GetEngine

func (r *Router) GetEngine() *gin.Engine

GetEngine returns gin router

func (*Router) IngestHandler

func (r *Router) IngestHandler(c *gin.Context)

func (*Router) ResponseError

func (r *Router) ResponseError(c *gin.Context, code int, errorType string, maskError bool, err error, logFormat string, logArgs ...any) RouterError

func (*Router) TestConnectionHandler

func (r *Router) TestConnectionHandler(c *gin.Context)

type RouterError

type RouterError struct {
	Error       error
	PublicError error
	ErrorType   string
}

type ShortDestinationConfig

type ShortDestinationConfig struct {
	TagDestinationConfig
	Id              string `json:"id"`
	ConnectionId    string `json:"connectionId"`
	DestinationType string `json:"destinationType"`
}

type StreamConfig

type StreamConfig struct {
	Id                          string   `json:"id"`
	Type                        string   `json:"type"`
	WorkspaceId                 string   `json:"workspaceId"`
	Slug                        string   `json:"slug"`
	Name                        string   `json:"name"`
	Domains                     []string `json:"domains"`
	AuthorizedJavaScriptDomains string   `json:"authorizedJavaScriptDomains"`
	PublicKeys                  []ApiKey `json:"publicKeys"`
	PrivateKeys                 []ApiKey `json:"privateKeys"`
	DataLayout                  string   `json:"dataLayout"`
}

type StreamConsumer

type StreamConsumer struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewStreamConsumer

func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *AppConfig, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService EventsLogService) (*StreamConsumer, error)

func (*StreamConsumer) Close

func (sc *StreamConsumer) Close() error

Close consumer

func (*StreamConsumer) UpdateDestination

func (sc *StreamConsumer) UpdateDestination(destination *Destination) error

UpdateDestination updates the destination instance used by the consumer.

type StreamWithDestinations

type StreamWithDestinations struct {
	Stream                   StreamConfig             `json:"stream"`
	SynchronousDestinations  []ShortDestinationConfig `json:"synchronousDestinations"`
	AsynchronousDestinations []ShortDestinationConfig `json:"asynchronousDestinations"`
}

type TagDestinationConfig

type TagDestinationConfig struct {
	Mode string `json:"mode"`
	Code string `json:"code"`
}

type TopicManager

type TopicManager struct {
	objects.ServiceBase
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTopicManager

func NewTopicManager(appContext *AppContext) (*TopicManager, error)

NewTopicManager returns TopicManager

func (*TopicManager) Close

func (tm *TopicManager) Close() error

func (*TopicManager) EnsureDestinationTopic

func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId string) error

EnsureDestinationTopic creates destination topic if it doesn't exist

func (*TopicManager) IsReady

func (tm *TopicManager) IsReady() bool

IsReady returns true if topic manager is ready to serve requests

func (*TopicManager) LoadMetadata

func (tm *TopicManager) LoadMetadata()

func (*TopicManager) Refresh

func (tm *TopicManager) Refresh()

func (*TopicManager) Start

func (tm *TopicManager) Start()

Start starts TopicManager

type YamlConfigurationSource

type YamlConfigurationSource struct {
	objects.ServiceBase
	// contains filtered or unexported fields
}

func NewYamlConfigurationSource

func NewYamlConfigurationSource(data []byte) (*YamlConfigurationSource, error)
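A usage sketch: read a YAML destinations file (the same format the file:// CONFIG_SOURCE scheme points at) and query it. The import path, file name, and destination id are assumptions:

package main

import (
	"log"
	"os"

	"github.com/jitsucom/bulker/bulkerapp/app" // assumed import path
)

func main() {
	data, err := os.ReadFile("destinations.yaml")
	if err != nil {
		log.Fatal(err)
	}
	src, err := app.NewYamlConfigurationSource(data)
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	if dc := src.GetDestinationConfig("my-destination"); dc != nil {
		log.Printf("found destination %s", dc.Id())
	}
}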

func (*YamlConfigurationSource) ChangesChannel

func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool

func (*YamlConfigurationSource) Close

func (ycp *YamlConfigurationSource) Close() error

func (*YamlConfigurationSource) GetDestinationConfig

func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*YamlConfigurationSource) GetDestinationConfigs

func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*YamlConfigurationSource) GetValue

func (ycp *YamlConfigurationSource) GetValue(key string) any
