service

package
v1.6.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var GetDatasetClient = func(cfg *config.Config) clients.DatasetClient {
	if !cfg.EnableDatasetAPICallbacks {
		log.Info(context.Background(), "returning nil Dataset client as Dataset API callbacks are disabled")
		return nil
	}
	return dataset.NewAPIClient(cfg.DatasetAPIURL)
}

GetDatasetClient gets the Dataset API client

View Source
var GetHTTPServer = func(bindAddr string, router http.Handler) HTTPServer {
	s := dphttp.NewServer(bindAddr, router)
	s.HandleOSSignals = false
	return s
}

GetHTTPServer creates an HTTP Server with the provided bind address and router

View Source
var GetHealthCheck = func(cfg *config.Config, buildTime, gitCommit, version string) (HealthChecker, error) {
	versionInfo, err := healthcheck.NewVersionInfo(buildTime, gitCommit, version)
	if err != nil {
		return nil, fmt.Errorf("failed to get version info: %w", err)
	}
	hc := healthcheck.New(
		versionInfo,
		cfg.HealthCheckCriticalTimeout,
		cfg.HealthCheckInterval,
	)
	return &hc, nil
}

GetHealthCheck creates a healthcheck with versionInfo

View Source
var GetKafkaConsumer = func(ctx context.Context, cfg *config.Kafka, topic string) (kafka.IConsumerGroup, error) {
	if cfg == nil {
		return nil, errors.New("cannot create a kafka consumer without kafka config")
	}
	kafkaOffset := kafka.OffsetNewest
	if cfg.OffsetOldest {
		kafkaOffset = kafka.OffsetOldest
	}
	cgConfig := &kafka.ConsumerGroupConfig{
		BrokerAddrs:       cfg.Addr,
		Topic:             topic,
		GroupName:         cfg.ContentUpdatedGroup,
		MinBrokersHealthy: &cfg.ConsumerMinBrokersHealthy,
		KafkaVersion:      &cfg.Version,
		Offset:            &kafkaOffset,
	}
	if cfg.SecProtocol == config.KafkaTLSProtocol {
		cgConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.SecCACerts,
			cfg.SecClientCert,
			cfg.SecClientKey,
			cfg.SecSkipVerify,
		)
	}
	return kafka.NewConsumerGroup(ctx, cgConfig)
}

GetKafkaConsumer returns a Kafka Consumer group

View Source
var GetKafkaProducer = func(ctx context.Context, cfg *config.Kafka) (kafka.IProducer, error) {
	if cfg == nil {
		return nil, errors.New("cannot create a kafka producer without kafka config")
	}
	pConfig := &kafka.ProducerConfig{
		BrokerAddrs:       cfg.Addr,
		Topic:             cfg.ProducerTopic,
		MinBrokersHealthy: &cfg.ProducerMinBrokersHealthy,
		KafkaVersion:      &cfg.Version,
		MaxMessageBytes:   &cfg.MaxBytes,
	}
	if cfg.SecProtocol == config.KafkaTLSProtocol {
		pConfig.SecurityConfig = kafka.GetSecurityConfig(
			cfg.SecCACerts,
			cfg.SecClientCert,
			cfg.SecClientKey,
			cfg.SecSkipVerify,
		)
	}
	return kafka.NewProducer(ctx, pConfig)
}

GetKafkaProducer creates a Kafka producer and sets the producder flag to true

View Source
var GetTopicClient = func(cfg *config.Config) topicCli.Clienter {
	return topicCli.New(cfg.TopicAPIURL)
}

GetTopicClient gets the Topic API client

View Source
var GetZebedee = func(cfg *config.Config) clients.ZebedeeClient {
	if !cfg.EnableZebedeeCallbacks {
		log.Info(context.Background(), "returning nil zebedee client as callbacks are disabled")
		return nil
	}
	return zebedee.New(cfg.ZebedeeURL)
}

GetZebedee gets the Zebedee Client

Functions

This section is empty.

Types

type HTTPServer

type HTTPServer interface {
	ListenAndServe() error
	Shutdown(ctx context.Context) error
}

HTTPServer defines the required methods from the HTTP server

type HealthChecker

type HealthChecker interface {
	Handler(w http.ResponseWriter, req *http.Request)
	Start(ctx context.Context)
	Stop()
	AddAndGetCheck(name string, checker healthcheck.Checker) (check *healthcheck.Check, err error)
	Subscribe(s healthcheck.Subscriber, checks ...*healthcheck.Check)
}

HealthChecker defines the required methods from Healthcheck

type Service

type Service struct {
	Cfg                      *config.Config
	Cache                    cache.List
	Server                   HTTPServer
	HealthCheck              HealthChecker
	SearchContentConsumer    kafka.IConsumerGroup
	ContentPublishedConsumer kafka.IConsumerGroup
	Producer                 kafka.IProducer
	ZebedeeCli               clients.ZebedeeClient
	DatasetCli               clients.DatasetClient
	TopicCli                 topicCli.Clienter
}

Service contains all the configs, server and clients to run the event handler service

func New added in v0.24.0

func New() *Service

func (*Service) Close

func (svc *Service) Close(ctx context.Context) error

Close gracefully shuts the service down in the required order, with timeout

func (*Service) Init added in v0.24.0

func (svc *Service) Init(ctx context.Context, cfg *config.Config, buildTime, gitCommit, version string) error

Init initializes the Service by setting up essential components like Kafka producers, consumers, health checks, and caching. It validates the provided configuration, initializes clients and consumers, registers health checks, and sets up an HTTP server for health endpoints.

func (*Service) Start added in v0.24.0

func (svc *Service) Start(ctx context.Context, svcErrors chan error) error

Start the service

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL