Documentation ¶
Index ¶
- Constants
- Variables
- func ExcludeConsumerForTopic[T Consumer](consumers []T, topicId string, cron *Cron) []T
- func IsValidTopicName(str string) bool
- func MakeTopicId(destinationId, mode, tableName string, checkLength bool) (string, error)
- func ParseTopicId(topic string) (destinationId, mode, tableName string, err error)
- func ProducerMessageLabels(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)
- func RedisError(err error) string
- func RetryBackOffTime(config *Config, attempt int) time.Time
- type AbstractBatchConsumer
- func (bc *AbstractBatchConsumer) BatchPeriodSec() int
- func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)
- func (bc *AbstractBatchConsumer) Retire()
- func (bc *AbstractBatchConsumer) RunJob()
- func (bc *AbstractBatchConsumer) TopicId() string
- func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)
- type AbstractConsumer
- type ApiKey
- type ApiKeyBinding
- type BatchConsumer
- type BatchConsumerImpl
- type BatchCounters
- type BatchFunction
- type BatchState
- type Config
- type ConfigurationSource
- type Consumer
- type Context
- type Cron
- type DataLayout
- type Destination
- type DestinationConfig
- type Destinations
- type DestinationsRepositoryData
- type EnvConfigurationSource
- func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool
- func (ecs *EnvConfigurationSource) Close() error
- func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (ecs *EnvConfigurationSource) GetValue(key string) any
- type FastStore
- type HTTPConfigurationSource
- type IngestMessage
- type IngestMessageOrigin
- type MetricsServer
- type MultiConfigurationSource
- type PostgresConfigurationSource
- type Producer
- type RedisConfigurationSource
- func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool
- func (rcs *RedisConfigurationSource) Close() error
- func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (rcs *RedisConfigurationSource) GetValue(key string) any
- type Repository
- type RepositoryCache
- type RepositoryChange
- type RetryConsumer
- type Router
- func (r *Router) AmbiguousDomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
- func (r *Router) BulkHandler(c *gin.Context)
- func (r *Router) DomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
- func (r *Router) EventsHandler(c *gin.Context)
- func (r *Router) EventsLogHandler(c *gin.Context)
- func (r *Router) FailedHandler(c *gin.Context)
- func (r *Router) Health(c *gin.Context)
- func (r *Router) SlugStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
- func (r *Router) TestConnectionHandler(c *gin.Context)
- func (r *Router) WriteKeyStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
- type ShortDestinationConfig
- type ShouldConsumeFunction
- type StreamConfig
- type StreamConsumer
- type StreamConsumerImpl
- type StreamCredentials
- type StreamLocator
- type StreamWithDestinations
- type StreamWrapper
- type TagDestinationConfig
- type TopicManager
- type YamlConfigurationSource
- func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool
- func (ycp *YamlConfigurationSource) Close() error
- func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
- func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig
- func (ycp *YamlConfigurationSource) GetValue(key string) any
Constants ¶
const ( DataLayoutSegmentCompatible = "segment-compatible" DataLayoutSegmentSingleTable = "segment-single-table" DataLayoutJitsuLegacy = "jitsu-legacy" )
const MetricsMetaHeader = "metrics_meta"
const SQLLastUpdatedQuery = `select * from last_updated`
Variables ¶
var TimestampPattern = regexp.MustCompile(`^\d{13}$`)
var WriteKeyPattern = regexp.MustCompile(`"writeKey":\s*"([^:"]+)?(:)?([^"]+)?"`)
Functions ¶
func ExcludeConsumerForTopic ¶
func IsValidTopicName ¶
func MakeTopicId ¶
func ParseTopicId ¶
func ProducerMessageLabels ¶
func RedisError ¶
Types ¶
type AbstractBatchConsumer ¶
type AbstractBatchConsumer struct { sync.Mutex *AbstractConsumer // contains filtered or unexported fields }
func NewAbstractBatchConsumer ¶
func NewAbstractBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId, mode string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*AbstractBatchConsumer, error)
func (*AbstractBatchConsumer) BatchPeriodSec ¶
func (bc *AbstractBatchConsumer) BatchPeriodSec() int
func (*AbstractBatchConsumer) ConsumeAll ¶
func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)
func (*AbstractBatchConsumer) Retire ¶
func (bc *AbstractBatchConsumer) Retire()
Retire marks the consumer as retired. The consumer will close itself when com… (the rest of this comment is truncated in the source)
func (*AbstractBatchConsumer) RunJob ¶
func (bc *AbstractBatchConsumer) RunJob()
func (*AbstractBatchConsumer) TopicId ¶
func (bc *AbstractBatchConsumer) TopicId() string
func (*AbstractBatchConsumer) UpdateBatchPeriod ¶
func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)
type AbstractConsumer ¶
func NewAbstractConsumer ¶
func NewAbstractConsumer(config *Config, repository *Repository, topicId string, bulkerProducer *Producer) *AbstractConsumer
func (*AbstractConsumer) GetInstanceId ¶
func (ac *AbstractConsumer) GetInstanceId() string
func (*AbstractConsumer) SendMetrics ¶
func (ac *AbstractConsumer) SendMetrics(metricsMeta string, status string, events int)
type ApiKeyBinding ¶
type BatchConsumer ¶
type BatchConsumer interface { Consumer RunJob() ConsumeAll() (consumed BatchCounters, err error) BatchPeriodSec() int UpdateBatchPeriod(batchPeriodSec int) }
type BatchConsumerImpl ¶
type BatchConsumerImpl struct { *AbstractBatchConsumer // contains filtered or unexported fields }
func NewBatchConsumer ¶
func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*BatchConsumerImpl, error)
type BatchCounters ¶
type BatchCounters struct {
// contains filtered or unexported fields
}
type BatchFunction ¶
type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error)
type BatchState ¶
type Config ¶
type Config struct { appbase.Config `mapstructure:",squash"` kafkabase.KafkaConfig `mapstructure:",squash"` // # EVENTS LOG CONFIG - settings for events log eventslog.EventsLogConfig `mapstructure:",squash"` // For ingest endpoint only GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"` // For ingest endpoint only GlobalHashSecrets []string // ConfigSource source of destinations configs. Can be: // - `file://...` for destinations config in yaml format // - `http://...` for destinations config in json array format // - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key // - postgresql://postgres_url to load configs from postgresql // - `env://PREFIX` to load each destination environment variables with like `PREFIX_ID` where ID is destination id // // Default: `env://BULKER_DESTINATION` ConfigSource string `mapstructure:"CONFIG_SOURCE"` // ConfigSourceHTTPAuthToken auth token for http:// config source ConfigSourceHTTPAuthToken string `mapstructure:"CONFIG_SOURCE_HTTP_AUTH_TOKEN"` // ConfigSourceSQLQuery for `postgresql` config source, SQL query to load connections ConfigSourceSQLQuery string `mapstructure:"CONFIG_SOURCE_SQL_QUERY" default:"select * from enriched_connections"` // CacheDir dir for config source data CacheDir string `mapstructure:"CACHE_DIR"` // ConfigRefreshPeriodSec how often config source will check for new configs. 
Supported by `postgresql` config sources ConfigRefreshPeriodSec int `mapstructure:"CONFIG_REFRESH_PERIOD_SEC" default:"5"` // RedisURL that will be used by default by all services that need Redis RedisURL string `mapstructure:"REDIS_URL"` RedisTLSCA string `mapstructure:"REDIS_TLS_CA"` // TopicManagerRefreshPeriodSec how often topic manager will check for new topics TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"` BatchRunnerPeriodSec int `mapstructure:"BATCH_RUNNER_DEFAULT_PERIOD_SEC" default:"300"` BatchRunnerDefaultBatchSize int `mapstructure:"BATCH_RUNNER_DEFAULT_BATCH_SIZE" default:"10000"` BatchRunnerWaitForMessagesSec int `mapstructure:"BATCH_RUNNER_WAIT_FOR_MESSAGES_SEC" default:"5"` BatchRunnerRetryPeriodSec int `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC" default:"300"` BatchRunnerDefaultRetryBatchFraction float64 `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_BATCH_FRACTION" default:"0.1"` MessagesRetryCount int `mapstructure:"MESSAGES_RETRY_COUNT" default:"5"` // MessagesRetryBackoffBase defines base for exponential backoff in minutes. // For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes. // Default: 5 MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"` // MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. 
Default: 1440 minutes = 24 hours MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"` EventsLogRedisURL string `mapstructure:"EVENTS_LOG_REDIS_URL"` EventsLogMaxSize int `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"` MetricsPort int `mapstructure:"METRICS_PORT" default:"9091"` MetricsRelayDestination string `mapstructure:"METRICS_RELAY_DESTINATION"` MetricsRelayPeriodSec int `mapstructure:"METRICS_RELAY_PERIOD_SEC" default:"60"` InstanceIndex int `mapstructure:"INSTANCE_INDEX" default:"0"` ShardsCount int `mapstructure:"SHARDS" default:"1"` // # GRACEFUL SHUTDOWN //Timeout that give running batch tasks time to finish during shutdown. ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"` //Extra delay may be needed. E.g. for metric scrapper to scrape final metrics. So http server will stay active for an extra period. ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC" default:"5"` }
Config is a struct for bulker app configuration. It is loaded from the `bulker.env` config file or from environment variables.
Environment variables require the prefix `BULKER_`.
type ConfigurationSource ¶
type ConfigurationSource interface { io.Closer GetDestinationConfigs() []*DestinationConfig GetDestinationConfig(id string) *DestinationConfig ChangesChannel() <-chan bool }
func InitConfigurationSource ¶
func InitConfigurationSource(config *Config) (ConfigurationSource, error)
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) InitContext ¶
func (a *Context) InitContext(settings *appbase.AppSettings) error
func (*Context) ShutdownSignal ¶
type Cron ¶
func (*Cron) AddBatchConsumer ¶
func (c *Cron) AddBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error)
func (*Cron) RemoveBatchConsumer ¶
func (c *Cron) RemoveBatchConsumer(batchConsumer BatchConsumer)
func (*Cron) ReplaceBatchConsumer ¶
func (c *Cron) ReplaceBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error)
type DataLayout ¶
type DataLayout string
type Destination ¶
func (*Destination) InitBulkerInstance ¶
func (d *Destination) InitBulkerInstance()
func (*Destination) Lease ¶
func (d *Destination) Lease()
Lease leases the destination. A destination cannot be closed while at least one service is using it (e.g. a batch consumer).
func (*Destination) Mode ¶
func (d *Destination) Mode() bulker.BulkMode
Mode returns destination mode
type DestinationConfig ¶
type DestinationConfig struct { UpdatedAt time.Time `mapstructure:"updatedAt" json:"updatedAt"` UsesBulker bool `mapstructure:"usesBulker" json:"usesBulker"` bulker.Config `mapstructure:",squash"` bulker.StreamConfig `mapstructure:",squash"` Special string `mapstructure:"special" json:"special"` }
func (*DestinationConfig) Id ¶
func (dc *DestinationConfig) Id() string
type Destinations ¶
type Destinations struct { DestinationsList []*DestinationConfig Destinations map[string]*DestinationConfig LastModified time.Time }
func (*Destinations) GetDestinationConfig ¶
func (d *Destinations) GetDestinationConfig(id string) *DestinationConfig
func (*Destinations) GetDestinationConfigs ¶
func (d *Destinations) GetDestinationConfigs() []*DestinationConfig
type DestinationsRepositoryData ¶
type DestinationsRepositoryData struct {
// contains filtered or unexported fields
}
func (*DestinationsRepositoryData) GetData ¶
func (drd *DestinationsRepositoryData) GetData() *Destinations
type EnvConfigurationSource ¶
func NewEnvConfigurationSource ¶
func NewEnvConfigurationSource(envPrefix, prefix string) *EnvConfigurationSource
func (*EnvConfigurationSource) ChangesChannel ¶
func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool
func (*EnvConfigurationSource) Close ¶
func (ecs *EnvConfigurationSource) Close() error
func (*EnvConfigurationSource) GetDestinationConfig ¶
func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*EnvConfigurationSource) GetDestinationConfigs ¶
func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*EnvConfigurationSource) GetValue ¶
func (ecs *EnvConfigurationSource) GetValue(key string) any
type FastStore ¶
func NewFastStore ¶
func (*FastStore) GetStreamById ¶
func (fs *FastStore) GetStreamById(slug string) (*StreamWithDestinations, error)
func (*FastStore) GetStreamsByDomain ¶
func (fs *FastStore) GetStreamsByDomain(domain string) ([]StreamWithDestinations, error)
type HTTPConfigurationSource ¶
type HTTPConfigurationSource struct { appbase.Repository[Destinations] }
func NewHTTPConfigurationSource ¶
func NewHTTPConfigurationSource(appconfig *Config) *HTTPConfigurationSource
func (*HTTPConfigurationSource) GetDestinationConfig ¶
func (h *HTTPConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*HTTPConfigurationSource) GetDestinationConfigs ¶
func (h *HTTPConfigurationSource) GetDestinationConfigs() []*DestinationConfig
type IngestMessage ¶
type IngestMessage struct { IngestType string `json:"ingestType"` ConnectionId string `json:"connectionId"` MessageCreated time.Time `json:"messageCreated"` WriteKey string `json:"writeKey"` MessageId string `json:"messageId"` Type string `json:"type"` Origin IngestMessageOrigin `json:"origin"` HttpHeaders map[string]string `json:"httpHeaders"` HttpPayload map[string]any `json:"httpPayload"` Geo map[string]any `json:"geo"` }
type IngestMessageOrigin ¶
type MetricsServer ¶
func NewMetricsServer ¶
func NewMetricsServer(appconfig *Config) *MetricsServer
func (*MetricsServer) Stop ¶
func (s *MetricsServer) Stop() error
type MultiConfigurationSource ¶
type MultiConfigurationSource struct {
// contains filtered or unexported fields
}
func NewMultiConfigurationSource ¶
func NewMultiConfigurationSource(configurationSources []ConfigurationSource) *MultiConfigurationSource
func (*MultiConfigurationSource) ChangesChannel ¶
func (mcs *MultiConfigurationSource) ChangesChannel() <-chan bool
func (*MultiConfigurationSource) Close ¶
func (mcs *MultiConfigurationSource) Close() error
func (*MultiConfigurationSource) GetDestinationConfig ¶
func (mcs *MultiConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*MultiConfigurationSource) GetDestinationConfigs ¶
func (mcs *MultiConfigurationSource) GetDestinationConfigs() []*DestinationConfig
type PostgresConfigurationSource ¶
type PostgresConfigurationSource struct { appbase.Service // contains filtered or unexported fields }
func NewPostgresConfigurationSource ¶
func NewPostgresConfigurationSource(appconfig *Config) (*PostgresConfigurationSource, error)
func (*PostgresConfigurationSource) ChangesChannel ¶
func (r *PostgresConfigurationSource) ChangesChannel() <-chan bool
func (*PostgresConfigurationSource) Close ¶
func (r *PostgresConfigurationSource) Close() error
func (*PostgresConfigurationSource) GetDestinationConfig ¶
func (r *PostgresConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*PostgresConfigurationSource) GetDestinationConfigs ¶
func (r *PostgresConfigurationSource) GetDestinationConfigs() []*DestinationConfig
type Producer ¶
func NewProducer ¶
func NewProducer(config *kafkabase.KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool) (*Producer, error)
NewProducer creates new Producer
type RedisConfigurationSource ¶
type RedisConfigurationSource struct { appbase.Service sync.Mutex // contains filtered or unexported fields }
func NewRedisConfigurationSource ¶
func NewRedisConfigurationSource(appconfig *Config) (*RedisConfigurationSource, error)
func (*RedisConfigurationSource) ChangesChannel ¶
func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool
func (*RedisConfigurationSource) Close ¶
func (rcs *RedisConfigurationSource) Close() error
func (*RedisConfigurationSource) GetDestinationConfig ¶
func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*RedisConfigurationSource) GetDestinationConfigs ¶
func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*RedisConfigurationSource) GetValue ¶
func (rcs *RedisConfigurationSource) GetValue(key string) any
type Repository ¶
func NewRepository ¶
func NewRepository(_ *Config, configurationSource ConfigurationSource) (*Repository, error)
func (*Repository) ChangesChannel ¶
func (r *Repository) ChangesChannel() <-chan RepositoryChange
func (*Repository) GetDestination ¶
func (r *Repository) GetDestination(id string) *Destination
func (*Repository) GetDestinations ¶
func (r *Repository) GetDestinations() []*Destination
func (*Repository) LeaseDestination ¶
func (r *Repository) LeaseDestination(id string) *Destination
LeaseDestination leases the destination with the given id. A destination cannot be closed while at least one service is using it (e.g. a batch consumer).
type RepositoryCache ¶
type RepositoryCache struct {
Connections map[string]*DestinationConfig `json:"destinations"`
}
type RepositoryChange ¶
type RepositoryChange struct { AddedDestinations []*Destination ChangedDestinations []*Destination RemovedDestinationIds []string }
type RetryConsumer ¶
type RetryConsumer struct {
*AbstractBatchConsumer
}
func NewRetryConsumer ¶
func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*RetryConsumer, error)
type Router ¶
func (*Router) AmbiguousDomainStreamLocator ¶
func (r *Router) AmbiguousDomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
func (*Router) BulkHandler ¶
func (*Router) DomainStreamLocator ¶
func (r *Router) DomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
func (*Router) EventsHandler ¶
func (*Router) EventsLogHandler ¶
EventsLogHandler gets the events log by EventType and actor id, filtered by date range and cursorId.
func (*Router) FailedHandler ¶
func (*Router) SlugStreamLocator ¶
func (r *Router) SlugStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
func (*Router) TestConnectionHandler ¶
func (*Router) WriteKeyStreamLocator ¶
func (r *Router) WriteKeyStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations
type ShortDestinationConfig ¶
type ShouldConsumeFunction ¶
type StreamConfig ¶
type StreamConfig struct { Id string `json:"id"` Type string `json:"type"` WorkspaceId string `json:"workspaceId"` Slug string `json:"slug"` Name string `json:"name"` Domains []string `json:"domains"` AuthorizedJavaScriptDomains string `json:"authorizedJavaScriptDomains"` PublicKeys []ApiKey `json:"publicKeys"` PrivateKeys []ApiKey `json:"privateKeys"` DataLayout string `json:"dataLayout"` }
type StreamConsumer ¶
type StreamConsumer interface { Consumer UpdateDestination(destination *Destination) error }
type StreamConsumerImpl ¶
type StreamConsumerImpl struct { *AbstractConsumer // contains filtered or unexported fields }
func NewStreamConsumer ¶
func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*StreamConsumerImpl, error)
func (*StreamConsumerImpl) TopicId ¶
func (sc *StreamConsumerImpl) TopicId() string
func (*StreamConsumerImpl) UpdateDestination ¶
func (sc *StreamConsumerImpl) UpdateDestination(destination *Destination) error
UpdateDestination
type StreamCredentials ¶
type StreamLocator ¶
type StreamLocator func(message IngestMessage) *StreamWithDestinations
type StreamWithDestinations ¶
type StreamWithDestinations struct { Stream StreamConfig `json:"stream"` SynchronousDestinations []ShortDestinationConfig `json:"synchronousDestinations"` AsynchronousDestinations []ShortDestinationConfig `json:"asynchronousDestinations"` }
type StreamWrapper ¶
type StreamWrapper struct {
// contains filtered or unexported fields
}
type TagDestinationConfig ¶
type TopicManager ¶
func NewTopicManager ¶
func NewTopicManager(appContext *Context) (*TopicManager, error)
NewTopicManager returns TopicManager
func (*TopicManager) Close ¶
func (tm *TopicManager) Close() error
func (*TopicManager) EnsureDestinationTopic ¶
func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId string) error
EnsureDestinationTopic creates destination topic if it doesn't exist
func (*TopicManager) IsReady ¶
func (tm *TopicManager) IsReady() bool
IsReady returns true if topic manager is ready to serve requests
func (*TopicManager) LoadMetadata ¶
func (tm *TopicManager) LoadMetadata()
func (*TopicManager) Refresh ¶
func (tm *TopicManager) Refresh()
type YamlConfigurationSource ¶
func NewYamlConfigurationSource ¶
func NewYamlConfigurationSource(data []byte) (*YamlConfigurationSource, error)
func (*YamlConfigurationSource) ChangesChannel ¶
func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool
func (*YamlConfigurationSource) Close ¶
func (ycp *YamlConfigurationSource) Close() error
func (*YamlConfigurationSource) GetDestinationConfig ¶
func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig
func (*YamlConfigurationSource) GetDestinationConfigs ¶
func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig
func (*YamlConfigurationSource) GetValue ¶
func (ycp *YamlConfigurationSource) GetValue(key string) any
Source Files ¶
- abstract_batch_consumer.go
- abstract_consumer.go
- app.go
- app_config.go
- batch_consumer.go
- configuration_source.go
- cron.go
- fast_store.go
- http_configuration_source.go
- metrics_server.go
- multi_configuration_source.go
- postgres_configuration_source.go
- producer.go
- redis_configuration_source.go
- repository.go
- retry_consumer.go
- router.go
- stream_consumer.go
- topic_manager.go