app

package
v0.0.0-...-2af46a5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: MIT Imports: 51 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DataLayoutSegmentCompatible  = "segment-compatible"
	DataLayoutSegmentSingleTable = "segment-single-table"
	DataLayoutJitsuLegacy        = "jitsu-legacy"
)
View Source
const MetricsMetaHeader = "metrics_meta"
View Source
const SQLLastUpdatedQuery = `select * from last_updated`

Variables

View Source
var TimestampPattern = regexp.MustCompile(`^\d{13}$`)
View Source
var WriteKeyPattern = regexp.MustCompile(`"writeKey":\s*"([^:"]+)?(:)?([^"]+)?"`)

Functions

func ExcludeConsumerForTopic

func ExcludeConsumerForTopic[T Consumer](consumers []T, topicId string, cron *Cron) []T

func IsValidTopicName

func IsValidTopicName(str string) bool

func MakeTopicId

func MakeTopicId(destinationId, mode, tableName string, checkLength bool) (string, error)

func ParseTopicId

func ParseTopicId(topic string) (destinationId, mode, tableName string, err error)

func ProducerMessageLabels

func ProducerMessageLabels(topicId string, status, errText string) (topic, destinationId, mode, tableName, st string, err string)

func RedisError

func RedisError(err error) string

func RetryBackOffTime

func RetryBackOffTime(config *Config, attempt int) time.Time

Types

type AbstractBatchConsumer

type AbstractBatchConsumer struct {
	sync.Mutex
	*AbstractConsumer
	// contains filtered or unexported fields
}

func NewAbstractBatchConsumer

func NewAbstractBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId, mode string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*AbstractBatchConsumer, error)

func (*AbstractBatchConsumer) BatchPeriodSec

func (bc *AbstractBatchConsumer) BatchPeriodSec() int

func (*AbstractBatchConsumer) ConsumeAll

func (bc *AbstractBatchConsumer) ConsumeAll() (counters BatchCounters, err error)

func (*AbstractBatchConsumer) Retire

func (bc *AbstractBatchConsumer) Retire()

Retire marks the consumer as retired. The consumer will close itself when complete.

func (*AbstractBatchConsumer) RunJob

func (bc *AbstractBatchConsumer) RunJob()

func (*AbstractBatchConsumer) TopicId

func (bc *AbstractBatchConsumer) TopicId() string

func (*AbstractBatchConsumer) UpdateBatchPeriod

func (bc *AbstractBatchConsumer) UpdateBatchPeriod(batchPeriodSec int)

type AbstractConsumer

type AbstractConsumer struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewAbstractConsumer

func NewAbstractConsumer(config *Config, repository *Repository, topicId string, bulkerProducer *Producer) *AbstractConsumer

func (*AbstractConsumer) GetInstanceId

func (ac *AbstractConsumer) GetInstanceId() string

func (*AbstractConsumer) SendMetrics

func (ac *AbstractConsumer) SendMetrics(metricsMeta string, status string, events int)

type ApiKey

type ApiKey struct {
	Id        string `json:"id"`
	Plaintext string `json:"plaintext"`
	Hash      string `json:"hash"`
	Hint      string `json:"hint"`
}

type ApiKeyBinding

type ApiKeyBinding struct {
	Hash     string `json:"hash"`
	KeyType  string `json:"keyType"`
	StreamId string `json:"streamId"`
}

type BatchConsumer

type BatchConsumer interface {
	Consumer
	RunJob()
	ConsumeAll() (consumed BatchCounters, err error)
	BatchPeriodSec() int
	UpdateBatchPeriod(batchPeriodSec int)
}

type BatchConsumerImpl

type BatchConsumerImpl struct {
	*AbstractBatchConsumer
	// contains filtered or unexported fields
}

func NewBatchConsumer

func NewBatchConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*BatchConsumerImpl, error)

type BatchCounters

type BatchCounters struct {
	// contains filtered or unexported fields
}

func (*BatchCounters) String

func (bs *BatchCounters) String() string

String returns the string representation of the batch counters.

type BatchFunction

type BatchFunction func(destination *Destination, batchNum, batchSize, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error)

type BatchState

type BatchState struct {
	bulker.State  `json:",inline"`
	LastMappedRow types.Object `json:"lastMappedRow"`
}

type Config

type Config struct {
	appbase.Config        `mapstructure:",squash"`
	kafkabase.KafkaConfig `mapstructure:",squash"`
	// # EVENTS LOG CONFIG - settings for events log
	eventslog.EventsLogConfig `mapstructure:",squash"`
	// For ingest endpoint only
	GlobalHashSecret string `mapstructure:"GLOBAL_HASH_SECRET" default:"dea42a58-acf4-45af-85bb-e77e94bd5025"`
	// For ingest endpoint only
	GlobalHashSecrets []string

	// ConfigSource source of destinations configs. Can be:
	//  - `file://...`  for destinations config in yaml format
	//  - `http://...` for destinations config in json array format
	//  - `redis` or `redis://redis_url` to load configs from redis `enrichedConnections` key
	//  - `postgresql://postgres_url` to load configs from postgresql
	//  - `env://PREFIX` to load each destination from environment variables named like `PREFIX_ID`, where ID is the destination id
	//
	// Default: `env://BULKER_DESTINATION`
	ConfigSource string `mapstructure:"CONFIG_SOURCE"`
	// ConfigSourceHTTPAuthToken auth token for http:// config source
	ConfigSourceHTTPAuthToken string `mapstructure:"CONFIG_SOURCE_HTTP_AUTH_TOKEN"`
	// ConfigSourceSQLQuery for `postgresql` config source, SQL query to load connections
	ConfigSourceSQLQuery string `mapstructure:"CONFIG_SOURCE_SQL_QUERY" default:"select * from enriched_connections"`
	// CacheDir dir for config source data
	CacheDir string `mapstructure:"CACHE_DIR"`
	// ConfigRefreshPeriodSec how often config source will check for new configs. Supported by `postgresql` config sources
	ConfigRefreshPeriodSec int `mapstructure:"CONFIG_REFRESH_PERIOD_SEC" default:"5"`

	// RedisURL that will be used by default by all services that need Redis
	RedisURL   string `mapstructure:"REDIS_URL"`
	RedisTLSCA string `mapstructure:"REDIS_TLS_CA"`

	// TopicManagerRefreshPeriodSec how often topic manager will check for new topics
	TopicManagerRefreshPeriodSec int `mapstructure:"TOPIC_MANAGER_REFRESH_PERIOD_SEC" default:"5"`

	BatchRunnerPeriodSec          int `mapstructure:"BATCH_RUNNER_DEFAULT_PERIOD_SEC" default:"300"`
	BatchRunnerDefaultBatchSize   int `mapstructure:"BATCH_RUNNER_DEFAULT_BATCH_SIZE" default:"10000"`
	BatchRunnerWaitForMessagesSec int `mapstructure:"BATCH_RUNNER_WAIT_FOR_MESSAGES_SEC" default:"5"`

	BatchRunnerRetryPeriodSec            int     `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_PERIOD_SEC" default:"300"`
	BatchRunnerDefaultRetryBatchFraction float64 `mapstructure:"BATCH_RUNNER_DEFAULT_RETRY_BATCH_FRACTION" default:"0.1"`
	MessagesRetryCount                   int     `mapstructure:"MESSAGES_RETRY_COUNT" default:"5"`
	// MessagesRetryBackoffBase defines base for exponential backoff in minutes.
	// For example, if retry count is 3 and base is 5, then retry delays will be 5, 25, 125 minutes.
	// Default: 5
	MessagesRetryBackoffBase float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_BASE" default:"5"`
	// MessagesRetryBackoffMaxDelay defines maximum possible retry delay in minutes. Default: 1440 minutes = 24 hours
	MessagesRetryBackoffMaxDelay float64 `mapstructure:"MESSAGES_RETRY_BACKOFF_MAX_DELAY" default:"1440"`

	EventsLogRedisURL string `mapstructure:"EVENTS_LOG_REDIS_URL"`
	EventsLogMaxSize  int    `mapstructure:"EVENTS_LOG_MAX_SIZE" default:"1000"`

	MetricsPort             int    `mapstructure:"METRICS_PORT" default:"9091"`
	MetricsRelayDestination string `mapstructure:"METRICS_RELAY_DESTINATION"`
	MetricsRelayPeriodSec   int    `mapstructure:"METRICS_RELAY_PERIOD_SEC" default:"60"`

	InstanceIndex int `mapstructure:"INSTANCE_INDEX" default:"0"`
	ShardsCount   int `mapstructure:"SHARDS" default:"1"`

	// # GRACEFUL SHUTDOWN
	// Timeout that gives running batch tasks time to finish during shutdown.
	ShutdownTimeoutSec int `mapstructure:"SHUTDOWN_TIMEOUT_SEC" default:"10"`
	// Extra delay that may be needed, e.g. for a metrics scraper to scrape final metrics. The HTTP server will stay active for this extra period.
	ShutdownExtraDelay int `mapstructure:"SHUTDOWN_EXTRA_DELAY_SEC" default:"5"`
}

Config is a struct for bulker app configuration. It is loaded from the `bulker.env` config file or from environment variables.

Environment variables require the prefix `BULKER_`.

func (*Config) PostInit

func (ac *Config) PostInit(settings *appbase.AppSettings) error

type ConfigurationSource

type ConfigurationSource interface {
	io.Closer
	GetDestinationConfigs() []*DestinationConfig
	GetDestinationConfig(id string) *DestinationConfig
	ChangesChannel() <-chan bool
}

func InitConfigurationSource

func InitConfigurationSource(config *Config) (ConfigurationSource, error)

type Consumer

type Consumer interface {
	Retire()
	TopicId() string
}

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Cleanup

func (a *Context) Cleanup() error

TODO: graceful shutdown and cleanups. Flush producer

func (*Context) Config

func (a *Context) Config() *Config

func (*Context) InitContext

func (a *Context) InitContext(settings *appbase.AppSettings) error

func (*Context) Server

func (a *Context) Server() *http.Server

func (*Context) ShutdownSignal

func (a *Context) ShutdownSignal() error

type Cron

type Cron struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewCron

func NewCron(config *Config) *Cron

func (*Cron) AddBatchConsumer

func (c *Cron) AddBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error)

func (*Cron) Close

func (c *Cron) Close()

Close scheduler

func (*Cron) RemoveBatchConsumer

func (c *Cron) RemoveBatchConsumer(batchConsumer BatchConsumer)

func (*Cron) ReplaceBatchConsumer

func (c *Cron) ReplaceBatchConsumer(batchConsumer BatchConsumer) (gocron.Job, error)

type DataLayout

type DataLayout string

type Destination

type Destination struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Destination) Id

func (d *Destination) Id() string

Id returns destination id

func (*Destination) InitBulkerInstance

func (d *Destination) InitBulkerInstance()

func (*Destination) Lease

func (d *Destination) Lease()

Lease leases the destination. A destination cannot be closed while at least one service is using it (e.g. a batch consumer).

func (*Destination) Mode

func (d *Destination) Mode() bulker.BulkMode

Mode returns destination mode

func (*Destination) Release

func (d *Destination) Release()

Release destination. See Lease

func (*Destination) TopicId

func (d *Destination) TopicId(tableName string, modeOverride string) (string, error)

TopicId generates topic id for Destination

type DestinationConfig

type DestinationConfig struct {
	UpdatedAt           time.Time `mapstructure:"updatedAt" json:"updatedAt"`
	UsesBulker          bool      `mapstructure:"usesBulker" json:"usesBulker"`
	bulker.Config       `mapstructure:",squash"`
	bulker.StreamConfig `mapstructure:",squash"`
	Special             string `mapstructure:"special" json:"special"`
}

func (*DestinationConfig) Id

func (dc *DestinationConfig) Id() string

type Destinations

type Destinations struct {
	DestinationsList []*DestinationConfig
	Destinations     map[string]*DestinationConfig
	LastModified     time.Time
}

func (*Destinations) GetDestinationConfig

func (d *Destinations) GetDestinationConfig(id string) *DestinationConfig

func (*Destinations) GetDestinationConfigs

func (d *Destinations) GetDestinationConfigs() []*DestinationConfig

type DestinationsRepositoryData

type DestinationsRepositoryData struct {
	// contains filtered or unexported fields
}

func (*DestinationsRepositoryData) GetData

func (drd *DestinationsRepositoryData) GetData() *Destinations

func (*DestinationsRepositoryData) Init

func (drd *DestinationsRepositoryData) Init(reader io.Reader, tag any) error

func (*DestinationsRepositoryData) Store

func (drd *DestinationsRepositoryData) Store(writer io.Writer) error

type EnvConfigurationSource

type EnvConfigurationSource struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewEnvConfigurationSource

func NewEnvConfigurationSource(envPrefix, prefix string) *EnvConfigurationSource

func (*EnvConfigurationSource) ChangesChannel

func (ecs *EnvConfigurationSource) ChangesChannel() <-chan bool

func (*EnvConfigurationSource) Close

func (ecs *EnvConfigurationSource) Close() error

func (*EnvConfigurationSource) GetDestinationConfig

func (ecs *EnvConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*EnvConfigurationSource) GetDestinationConfigs

func (ecs *EnvConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*EnvConfigurationSource) GetValue

func (ecs *EnvConfigurationSource) GetValue(key string) any

type FastStore

type FastStore struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewFastStore

func NewFastStore(config *Config) (*FastStore, error)

func (*FastStore) Close

func (fs *FastStore) Close() error

func (*FastStore) GetStreamById

func (fs *FastStore) GetStreamById(slug string) (*StreamWithDestinations, error)

func (*FastStore) GetStreamsByDomain

func (fs *FastStore) GetStreamsByDomain(domain string) ([]StreamWithDestinations, error)

type HTTPConfigurationSource

type HTTPConfigurationSource struct {
	appbase.Repository[Destinations]
}

func NewHTTPConfigurationSource

func NewHTTPConfigurationSource(appconfig *Config) *HTTPConfigurationSource

func (*HTTPConfigurationSource) GetDestinationConfig

func (h *HTTPConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*HTTPConfigurationSource) GetDestinationConfigs

func (h *HTTPConfigurationSource) GetDestinationConfigs() []*DestinationConfig

type IngestMessage

type IngestMessage struct {
	IngestType     string              `json:"ingestType"`
	ConnectionId   string              `json:"connectionId"`
	MessageCreated time.Time           `json:"messageCreated"`
	WriteKey       string              `json:"writeKey"`
	MessageId      string              `json:"messageId"`
	Type           string              `json:"type"`
	Origin         IngestMessageOrigin `json:"origin"`
	HttpHeaders    map[string]string   `json:"httpHeaders"`
	HttpPayload    map[string]any      `json:"httpPayload"`
	Geo            map[string]any      `json:"geo"`
}

type IngestMessageOrigin

type IngestMessageOrigin struct {
	BaseURL string `json:"baseUrl"`
	Slug    string `json:"slug"`
	Domain  string `json:"domain"`
}

type MetricsServer

type MetricsServer struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewMetricsServer

func NewMetricsServer(appconfig *Config) *MetricsServer

func (*MetricsServer) Stop

func (s *MetricsServer) Stop() error

type MultiConfigurationSource

type MultiConfigurationSource struct {
	// contains filtered or unexported fields
}

func NewMultiConfigurationSource

func NewMultiConfigurationSource(configurationSources []ConfigurationSource) *MultiConfigurationSource

func (*MultiConfigurationSource) ChangesChannel

func (mcs *MultiConfigurationSource) ChangesChannel() <-chan bool

func (*MultiConfigurationSource) Close

func (mcs *MultiConfigurationSource) Close() error

func (*MultiConfigurationSource) GetDestinationConfig

func (mcs *MultiConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*MultiConfigurationSource) GetDestinationConfigs

func (mcs *MultiConfigurationSource) GetDestinationConfigs() []*DestinationConfig

type PostgresConfigurationSource

type PostgresConfigurationSource struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewPostgresConfigurationSource

func NewPostgresConfigurationSource(appconfig *Config) (*PostgresConfigurationSource, error)

func (*PostgresConfigurationSource) ChangesChannel

func (r *PostgresConfigurationSource) ChangesChannel() <-chan bool

func (*PostgresConfigurationSource) Close

func (*PostgresConfigurationSource) GetDestinationConfig

func (r *PostgresConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*PostgresConfigurationSource) GetDestinationConfigs

func (r *PostgresConfigurationSource) GetDestinationConfigs() []*DestinationConfig

type Producer

type Producer struct {
	*kafkabase.Producer
}

func NewProducer

func NewProducer(config *kafkabase.KafkaConfig, kafkaConfig *kafka.ConfigMap, reportQueueLength bool) (*Producer, error)

NewProducer creates new Producer

type RedisConfigurationSource

type RedisConfigurationSource struct {
	appbase.Service
	sync.Mutex
	// contains filtered or unexported fields
}

func NewRedisConfigurationSource

func NewRedisConfigurationSource(appconfig *Config) (*RedisConfigurationSource, error)

func (*RedisConfigurationSource) ChangesChannel

func (rcs *RedisConfigurationSource) ChangesChannel() <-chan bool

func (*RedisConfigurationSource) Close

func (rcs *RedisConfigurationSource) Close() error

func (*RedisConfigurationSource) GetDestinationConfig

func (rcs *RedisConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*RedisConfigurationSource) GetDestinationConfigs

func (rcs *RedisConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*RedisConfigurationSource) GetValue

func (rcs *RedisConfigurationSource) GetValue(key string) any

type Repository

type Repository struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewRepository

func NewRepository(_ *Config, configurationSource ConfigurationSource) (*Repository, error)

func (*Repository) ChangesChannel

func (r *Repository) ChangesChannel() <-chan RepositoryChange

func (*Repository) Close

func (r *Repository) Close() error

Close Repository

func (*Repository) GetDestination

func (r *Repository) GetDestination(id string) *Destination

func (*Repository) GetDestinations

func (r *Repository) GetDestinations() []*Destination

func (*Repository) LeaseDestination

func (r *Repository) LeaseDestination(id string) *Destination

LeaseDestination leases the destination. A destination cannot be closed while at least one service is using it (e.g. a batch consumer).

type RepositoryCache

type RepositoryCache struct {
	Connections map[string]*DestinationConfig `json:"destinations"`
}

type RepositoryChange

type RepositoryChange struct {
	AddedDestinations     []*Destination
	ChangedDestinations   []*Destination
	RemovedDestinationIds []string
}

type RetryConsumer

type RetryConsumer struct {
	*AbstractBatchConsumer
}

func NewRetryConsumer

func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*RetryConsumer, error)

type Router

type Router struct {
	*appbase.Router
	// contains filtered or unexported fields
}

func NewRouter

func NewRouter(appContext *Context) *Router

func (*Router) AmbiguousDomainStreamLocator

func (r *Router) AmbiguousDomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations

func (*Router) BulkHandler

func (r *Router) BulkHandler(c *gin.Context)

func (*Router) DomainStreamLocator

func (r *Router) DomainStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations

func (*Router) EventsHandler

func (r *Router) EventsHandler(c *gin.Context)

func (*Router) EventsLogHandler

func (r *Router) EventsLogHandler(c *gin.Context)

EventsLogHandler - gets events log by EventType, actor id. Filtered by date range and cursorId

func (*Router) FailedHandler

func (r *Router) FailedHandler(c *gin.Context)

func (*Router) Health

func (r *Router) Health(c *gin.Context)

func (*Router) SlugStreamLocator

func (r *Router) SlugStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations

func (*Router) TestConnectionHandler

func (r *Router) TestConnectionHandler(c *gin.Context)

func (*Router) WriteKeyStreamLocator

func (r *Router) WriteKeyStreamLocator(ingestMessage IngestMessage) *StreamWithDestinations

type ShortDestinationConfig

type ShortDestinationConfig struct {
	TagDestinationConfig
	Id              string         `json:"id"`
	ConnectionId    string         `json:"connectionId"`
	DestinationType string         `json:"destinationType"`
	Options         map[string]any `json:"options"`
}

type ShouldConsumeFunction

type ShouldConsumeFunction func(committedOffset, highOffset int64) bool

type StreamConfig

type StreamConfig struct {
	Id                          string   `json:"id"`
	Type                        string   `json:"type"`
	WorkspaceId                 string   `json:"workspaceId"`
	Slug                        string   `json:"slug"`
	Name                        string   `json:"name"`
	Domains                     []string `json:"domains"`
	AuthorizedJavaScriptDomains string   `json:"authorizedJavaScriptDomains"`
	PublicKeys                  []ApiKey `json:"publicKeys"`
	PrivateKeys                 []ApiKey `json:"privateKeys"`
	DataLayout                  string   `json:"dataLayout"`
}

type StreamConsumer

type StreamConsumer interface {
	Consumer
	UpdateDestination(destination *Destination) error
}

type StreamConsumerImpl

type StreamConsumerImpl struct {
	*AbstractConsumer
	// contains filtered or unexported fields
}

func NewStreamConsumer

func NewStreamConsumer(repository *Repository, destination *Destination, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer, eventsLogService eventslog.EventsLogService) (*StreamConsumerImpl, error)

func (*StreamConsumerImpl) Retire

func (sc *StreamConsumerImpl) Retire()

Retire closes the consumer.

func (*StreamConsumerImpl) TopicId

func (sc *StreamConsumerImpl) TopicId() string

func (*StreamConsumerImpl) UpdateDestination

func (sc *StreamConsumerImpl) UpdateDestination(destination *Destination) error

UpdateDestination

type StreamCredentials

type StreamCredentials struct {
	Slug       string `json:"slug"`
	Domain     string `json:"domain"`
	WriteKey   string `json:"writeKey"`
	IngestType string `json:"ingestType"`
}

type StreamLocator

type StreamLocator func(message IngestMessage) *StreamWithDestinations

type StreamWithDestinations

type StreamWithDestinations struct {
	Stream                   StreamConfig             `json:"stream"`
	SynchronousDestinations  []ShortDestinationConfig `json:"synchronousDestinations"`
	AsynchronousDestinations []ShortDestinationConfig `json:"asynchronousDestinations"`
}

type StreamWrapper

type StreamWrapper struct {
	// contains filtered or unexported fields
}

func (*StreamWrapper) Abort

func (sw *StreamWrapper) Abort(ctx context.Context) (bulker.State, error)

func (*StreamWrapper) Complete

func (sw *StreamWrapper) Complete(ctx context.Context) (bulker.State, error)

func (*StreamWrapper) Consume

func (sw *StreamWrapper) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)

type TagDestinationConfig

type TagDestinationConfig struct {
	Mode string `json:"mode"`
	Code string `json:"code"`
}

type TopicManager

type TopicManager struct {
	appbase.Service
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTopicManager

func NewTopicManager(appContext *Context) (*TopicManager, error)

NewTopicManager returns TopicManager

func (*TopicManager) Close

func (tm *TopicManager) Close() error

func (*TopicManager) EnsureDestinationTopic

func (tm *TopicManager) EnsureDestinationTopic(destination *Destination, topicId string) error

EnsureDestinationTopic creates destination topic if it doesn't exist

func (*TopicManager) IsReady

func (tm *TopicManager) IsReady() bool

IsReady returns true if topic manager is ready to serve requests

func (*TopicManager) LoadMetadata

func (tm *TopicManager) LoadMetadata()

func (*TopicManager) Refresh

func (tm *TopicManager) Refresh()

func (*TopicManager) Start

func (tm *TopicManager) Start()

Start starts TopicManager

type YamlConfigurationSource

type YamlConfigurationSource struct {
	appbase.Service
	// contains filtered or unexported fields
}

func NewYamlConfigurationSource

func NewYamlConfigurationSource(data []byte) (*YamlConfigurationSource, error)

func (*YamlConfigurationSource) ChangesChannel

func (ycp *YamlConfigurationSource) ChangesChannel() <-chan bool

func (*YamlConfigurationSource) Close

func (ycp *YamlConfigurationSource) Close() error

func (*YamlConfigurationSource) GetDestinationConfig

func (ycp *YamlConfigurationSource) GetDestinationConfig(id string) *DestinationConfig

func (*YamlConfigurationSource) GetDestinationConfigs

func (ycp *YamlConfigurationSource) GetDestinationConfigs() []*DestinationConfig

func (*YamlConfigurationSource) GetValue

func (ycp *YamlConfigurationSource) GetValue(key string) any

Directories

Path Synopsis
testcontainers

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL