Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var GetDatasetClient = func(cfg *config.Config) clients.DatasetClient { if !cfg.EnableDatasetAPICallbacks { log.Info(context.Background(), "returning nil Dataset client as Dataset API callbacks are disabled") return nil } return dataset.NewAPIClient(cfg.DatasetAPIURL) }
GetDatasetClient gets the Dataset API client
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer { s := dphttp.NewServer(bindAddr, router) s.HandleOSSignals = false return s }
GetHTTPServer creates an HTTP Server with the provided bind address and router
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) { versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version) if err != nil { return nil, fmt.Errorf("failed to get version info: %w", err) } hc := healthcheck.New( versionInfo, cfg.HealthCheckCriticalTimeout, cfg.HealthCheckInterval, ) return &hc, nil }
GetHealthCheck creates a healthcheck with versionInfo
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka, topic string) (kafka.IConsumerGroup, error) { if cfg == nil { return nil, errors.New("cannot create a kafka consumer without kafka config") } kafkaOffset := kafka.OffsetNewest if cfg.OffsetOldest { kafkaOffset = kafka.OffsetOldest } cgConfig := &kafka.ConsumerGroupConfig{ BrokerAddrs: cfg.Addr, Topic: topic, GroupName: cfg.ContentUpdatedGroup, MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy, KafkaVersion: &cfg.Version, Offset: &kafkaOffset, } if cfg.SecProtocol == config.KafkaTLSProtocol { cgConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewConsumerGroup(ctx, cgConfig) }
GetKafkaConsumer returns a Kafka Consumer group
var GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) { if cfg == nil { return nil, errors.New("cannot create a kafka producer without kafka config") } pConfig := &kafka.ProducerConfig{ BrokerAddrs: cfg.Addr, Topic: cfg.ProducerTopic, MinBrokersHealthy: &cfg.ProducerMinBrokersHealthy, KafkaVersion: &cfg.Version, MaxMessageBytes: &cfg.MaxBytes, } if cfg.SecProtocol == config.KafkaTLSProtocol { pConfig.SecurityConfig = kafka.GetSecurityConfig( cfg.SecCACerts, cfg.SecClientCert, cfg.SecClientKey, cfg.SecSkipVerify, ) } return kafka.NewProducer(ctx, pConfig) }
GetKafkaProducer creates a Kafka producer for the configured producer topic, applying the TLS security settings when the Kafka security protocol is TLS
// GetTopicClient builds the Topic API client, pointing it at the URL held in
// cfg.TopicAPIURL. Unlike the Dataset and Zebedee factories there is no
// feature flag here: a client is always returned.
var GetTopicClient = func(cfg *config.Config) topicCli.Clienter {
	topicAPIURL := cfg.TopicAPIURL
	return topicCli.New(topicAPIURL)
}
GetTopicClient gets the Topic API client
var GetZebedee = func(cfg *config.Config) clients.ZebedeeClient { if !cfg.EnableZebedeeCallbacks { log.Info(context.Background(), "returning nil zebedee client as callbacks are disabled") return nil } return zebedee.New(cfg.ZebedeeURL) }
GetZebedee gets the Zebedee Client
Functions ¶
This section is empty.
Types ¶
type HTTPServer ¶
HTTPServer defines the required methods from the HTTP server
type HealthChecker ¶
type HealthChecker interface { Handler(w http.ResponseWriter, req *http.Request) Start(ctx context.Context) Stop() AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error) Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check) }
HealthChecker defines the required methods from Healthcheck
type Service ¶
type Service struct { Cfg *config.Config Cache cache.List Server HTTPServer HealthCheck HealthChecker SearchContentConsumer kafka.IConsumerGroup ContentPublishedConsumer kafka.IConsumerGroup Producer kafka.IProducer ZebedeeCli clients.ZebedeeClient DatasetCli clients.DatasetClient TopicCli topicCli.Clienter }
Service contains all the configs, server and clients to run the event handler service
func (*Service) Init ¶ added in v0.24.0
func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error
Init initializes the Service by setting up essential components like Kafka producers, consumers, health checks, and caching. It validates the provided configuration, initializes clients and consumers, registers health checks, and sets up an HTTP server for health endpoints.