synchronizer

package
v4.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: Apache-2.0 Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
const (
	// Ready represents ready
	Ready = iota
	// StreamingReady represents streaming ready
	StreamingReady
	// Error represents some error in SSE streaming
	Error
)
View Source
const (
	Streaming = iota
	Polling
)

Operation mode constants

Variables

This section is empty.

Functions

This section is empty.

Types

type Local

type Local struct {
	// contains filtered or unexported fields
}

Local implements Local Synchronizer

func (*Local) LocalKill

func (s *Local) LocalKill(splitName string, defaultTreatment string, changeNumber int64)

LocalKill does nothing

func (*Local) RefreshRates added in v4.0.2

func (s *Local) RefreshRates() (time.Duration, time.Duration)

RefreshRates returns anything

func (*Local) StartPeriodicDataRecording

func (s *Local) StartPeriodicDataRecording()

StartPeriodicDataRecording starts periodic recorders tasks

func (*Local) StartPeriodicFetching

func (s *Local) StartPeriodicFetching()

StartPeriodicFetching starts periodic fetchers tasks

func (*Local) StopPeriodicDataRecording

func (s *Local) StopPeriodicDataRecording()

StopPeriodicDataRecording stops periodic recorders tasks

func (*Local) StopPeriodicFetching

func (s *Local) StopPeriodicFetching()

StopPeriodicFetching stops periodic fetchers tasks

func (*Local) SyncAll

func (s *Local) SyncAll() error

SyncAll syncs splits and segments

func (*Local) SynchronizeSegment

func (s *Local) SynchronizeSegment(name string, till *int64) error

SynchronizeSegment syncs segment

func (*Local) SynchronizeSplits

func (s *Local) SynchronizeSplits(till *int64) error

SynchronizeSplits syncs splits

type LocalConfig added in v4.3.0

type LocalConfig struct {
	SplitPeriod      int
	SegmentPeriod    int
	SegmentWorkers   int
	QueueSize        int
	SegmentDirectory string
	RefreshEnabled   bool
}

type Manager

type Manager interface {
	Start()
	Stop()
	IsRunning() bool
}

Manager interface

func NewSynchronizerManagerRedis added in v4.2.0

func NewSynchronizerManagerRedis(synchronizer Synchronizer, logger logging.LoggerInterface) Manager

NewSynchronizerManagerRedis creates new sync manager for redis

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

ManagerImpl struct

func NewSynchronizerManager

func NewSynchronizerManager(
	synchronizer Synchronizer,
	logger logging.LoggerInterface,
	config conf.AdvancedConfig,
	authClient service.AuthClient,
	splitStorage storage.SplitStorage,
	managerStatus chan int,
	runtimeTelemetry storage.TelemetryRuntimeProducer,
	metadata dtos.Metadata,
	clientKey *string,
	hcMonitor application.MonitorProducerInterface,
) (*ManagerImpl, error)

NewSynchronizerManager creates new sync manager

func (*ManagerImpl) IsRunning

func (s *ManagerImpl) IsRunning() bool

IsRunning returns true if the manager is in Streaming or Polling mode

func (*ManagerImpl) Start

func (s *ManagerImpl) Start()

Start starts synchronization through Split

func (*ManagerImpl) Stop

func (s *ManagerImpl) Stop()

Stop stops synchronization through Split

type ManagerRedisImpl added in v4.2.0

type ManagerRedisImpl struct {
	// contains filtered or unexported fields
}

ManagerRedisImpl struct

func (*ManagerRedisImpl) IsRunning added in v4.2.0

func (m *ManagerRedisImpl) IsRunning() bool

func (*ManagerRedisImpl) Start added in v4.2.0

func (m *ManagerRedisImpl) Start()

func (*ManagerRedisImpl) Stop added in v4.2.0

func (m *ManagerRedisImpl) Stop()

type SplitTasks

type SplitTasks struct {
	SplitSyncTask            *asynctask.AsyncTask
	SegmentSyncTask          *asynctask.AsyncTask
	TelemetrySyncTask        tasks.Task
	ImpressionSyncTask       tasks.Task
	EventSyncTask            tasks.Task
	ImpressionsCountSyncTask tasks.Task
	UniqueKeysTask           tasks.Task
	CleanFilterTask          tasks.Task
	ImpsCountConsumerTask    tasks.Task
}

SplitTasks struct for tasks

type Synchronizer

type Synchronizer interface {
	SyncAll() error
	SynchronizeSplits(till *int64) error
	LocalKill(splitName string, defaultTreatment string, changeNumber int64)
	SynchronizeSegment(segmentName string, till *int64) error
	StartPeriodicFetching()
	StopPeriodicFetching()
	StartPeriodicDataRecording()
	StopPeriodicDataRecording()
	RefreshRates() (splits time.Duration, segments time.Duration)
}

Synchronizer interface for syncing data to and from splits servers

func NewLocal

func NewLocal(cfg *LocalConfig, splitAPI *api.SplitAPI, splitStorage storage.SplitStorage, segmentStorage storage.SegmentStorage, logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer, hcMonitor application.MonitorProducerInterface) Synchronizer

NewLocal creates new Local

func NewSynchronizer

func NewSynchronizer(
	confAdvanced conf.AdvancedConfig,
	splitTasks SplitTasks,
	workers Workers,
	logger logging.LoggerInterface,
	inMememoryFullQueue chan string,
	hcMonitor application.MonitorProducerInterface,
) Synchronizer

NewSynchronizer creates new SynchronizerImpl

type SynchronizerImpl

type SynchronizerImpl struct {
	// contains filtered or unexported fields
}

SynchronizerImpl implements Synchronizer

func (*SynchronizerImpl) LocalKill

func (s *SynchronizerImpl) LocalKill(splitName string, defaultTreatment string, changeNumber int64)

LocalKill locally kills a split

func (*SynchronizerImpl) RefreshRates added in v4.0.2

func (s *SynchronizerImpl) RefreshRates() (splits time.Duration, segments time.Duration)

RefreshRates returns the refresh rates of the splits & segment tasks

func (*SynchronizerImpl) StartPeriodicDataRecording

func (s *SynchronizerImpl) StartPeriodicDataRecording()

StartPeriodicDataRecording starts periodic recorders tasks

func (*SynchronizerImpl) StartPeriodicFetching

func (s *SynchronizerImpl) StartPeriodicFetching()

StartPeriodicFetching starts periodic fetchers tasks

func (*SynchronizerImpl) StopPeriodicDataRecording

func (s *SynchronizerImpl) StopPeriodicDataRecording()

StopPeriodicDataRecording stops periodic recorders tasks

func (*SynchronizerImpl) StopPeriodicFetching

func (s *SynchronizerImpl) StopPeriodicFetching()

StopPeriodicFetching stops periodic fetchers tasks

func (*SynchronizerImpl) SyncAll

func (s *SynchronizerImpl) SyncAll() error

SyncAll syncs splits and segments

func (*SynchronizerImpl) SynchronizeSegment

func (s *SynchronizerImpl) SynchronizeSegment(name string, till *int64) error

SynchronizeSegment syncs segment

func (*SynchronizerImpl) SynchronizeSplits

func (s *SynchronizerImpl) SynchronizeSplits(till *int64) error

SynchronizeSplits syncs splits

type Workers

type Workers struct {
	SplitFetcher             split.Updater
	SegmentFetcher           segment.Updater
	TelemetryRecorder        telemetry.TelemetrySynchronizer
	ImpressionRecorder       impression.ImpressionRecorder
	EventRecorder            event.EventRecorder
	ImpressionsCountRecorder impressionscount.ImpressionsCountRecorder
}

Workers struct for workers

Directories

Path Synopsis
worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL