Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
// GetDatasetClient builds a Dataset API client pointed at cfg.DatasetAPIURL.
//
// It is declared as a package-level function variable rather than a plain
// function, so it can be reassigned (presumably to inject a stub in tests —
// confirm against the test suite).
var GetDatasetClient = func(cfg *config.Config) clients.DatasetClient {
	datasetAPIClient := dataset.NewAPIClient(cfg.DatasetAPIURL)
	return datasetAPIClient
}
GetDatasetClient gets the Dataset API client
View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP Server with the provided bind address and router
View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka) (kafka.IConsumerGroup, error) { if cfg == nil { return nil, errors.New("cannot create a kafka consumer without kafka config") } kafkaOffset := kafka.OffsetNewest if cfg.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.ContentUpdatedTopic, GroupName: cfg.ContentUpdatedGroup, MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.Version, Offset: &kafkaOffset, } if cfg.SecProtocol == config.KafkaTLSProtocol { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer returns a Kafka Consumer group
View Source
var GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) { if cfg == nil { return nil, errors.New("cannot create a kafka producer without kafka config") } pConfig := &kafka.ProducerConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.ProducerTopic, MinBrokersHealthy: &cfg.ProducerMinBrokersHealthy, KafkaVersion: &cfg.Version, MaxMessageBytes: &cfg.MaxBytes, } if cfg.SecProtocol == config.KafkaTLSProtocol { pConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewProducer(ctx, pConfig) }
GetKafkaProducer creates a Kafka producer for the configured producer topic; it returns an error if no Kafka config is provided
View Source
// GetTopicClient builds a Topic API client pointed at cfg.TopicAPIURL.
//
// It is declared as a package-level function variable rather than a plain
// function, so it can be reassigned (presumably to inject a stub in tests —
// confirm against the test suite).
var GetTopicClient = func(cfg *config.Config) topicCli.Clienter {
	topicAPIClient := topicCli.New(cfg.TopicAPIURL)
	return topicAPIClient
}
GetTopicClient gets the Topic API client
View Source
// GetZebedee builds a Zebedee client pointed at cfg.ZebedeeURL.
//
// It is declared as a package-level function variable rather than a plain
// function, so it can be reassigned (presumably to inject a stub in tests —
// confirm against the test suite).
var GetZebedee = func(cfg *config.Config) clients.ZebedeeClient {
	zebedeeClient := zebedee.New(cfg.ZebedeeURL)
	return zebedeeClient
}
GetZebedee gets the Zebedee Client
Functions ¶
This section is empty.
Types ¶
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
type HealthChecker interface { Handler(w http.ResponseWriter, req *http.Request) Start(ctx context.Context) Stop() AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error) Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check) }
HealthChecker defines the required methods from Healthcheck
type Service ¶
type Service struct { Cfg *config.Config Cache cache.List Server HTTPServer HealthCheck HealthChecker Consumer kafka.IConsumerGroup Producer kafka.IProducer ZebedeeCli clients.ZebedeeClient DatasetCli clients.DatasetClient TopicCli topicCli.Clienter }
Service contains all the configs, server and clients to run the event handler service
Click to show internal directories.
Click to hide internal directories.