worker

package
v0.0.0-...-f6774a9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 27, 2024 License: Apache-2.0 Imports: 49 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultBatchFlushSize = 10000

DefaultBatchFlushSize set per https://clickhouse.com/docs/en/cloud/bestpractices/bulk-inserts

View Source
const DefaultBatchedFlushTimeout = 5 * time.Second
View Source
const EVENTS_READ_TIMEOUT = 300000

cancel events_objects reads after 5 minutes

View Source
const ErrorGroupsMaxRowsPostgres = 500
View Source
const ErrorObjectsMaxRowsPostgres = 500
View Source
const INACTIVE_THRESHOLD = 0.02

For active and inactive segment calculation

View Source
const MAX_RETRIES = 5

Stop trying to reprocess a session if its retry count exceeds this

View Source
const MIN_INACTIVE_DURATION = 10

Worker is a job runner that parses sessions

View Source
const MinRetryDelay = 250 * time.Millisecond
View Source
const REFRESH_MATERIALIZED_VIEW_TIMEOUT = 30 * 60 * 1000

cancel refreshing materialized views after 30 minutes

View Source
const SessionsMaxRowsPostgres = 500

Variables

This section is empty.

Functions

func CalculateSessionLength

func CalculateSessionLength(first time.Time, last time.Time) (d time.Duration)

CalculateSessionLength gets the session length given two sets of ReplayEvents.

Types

type AutoResolver

type AutoResolver struct {
	// contains filtered or unexported fields
}

func NewAutoResolver

func NewAutoResolver(store *store.Store, db *gorm.DB) *AutoResolver

func (*AutoResolver) AutoResolveStaleErrors

func (autoResolver *AutoResolver) AutoResolveStaleErrors(ctx context.Context)

type EventProcessingAccumulator

type EventProcessingAccumulator struct {
	SessionSecureID string
	// ClickEventQueue is a queue containing the last 2 seconds worth of clustered click events
	ClickEventQueue *list.List
	// CurrentlyInRageClickSet denotes whether the currently parsed event is within a rage click set
	CurrentlyInRageClickSet bool
	// RageClickSets contains all rage click sets that will be inserted into the db
	RageClickSets []*model.RageClickEvent
	// FirstFullSnapshotTimestamp represents the timestamp for the first full snapshot
	FirstFullSnapshotTimestamp time.Time
	// LastEventTimestamp represents the timestamp for the first event
	LastEventTimestamp time.Time
	// ActiveDuration represents the duration that the user was active
	ActiveDuration time.Duration
	// TimestampCounts represents a count of all user interaction events per second
	TimestampCounts map[time.Time]int
	// UserInteractionEvents represents the user interaction events in the session from rrweb
	UserInteractionEvents []*parse.ReplayEvent
	// EventsForTimelineIndicator represents the custom events that will be shown on the timeline indicator
	EventsForTimelineIndicator []*parse.ReplayEvent
	// LatestSID represents the last sequential ID seen
	LatestSID int
	// AreEventsOutOfOrder is true if the list of event SID's is not monotonically increasing from 1
	AreEventsOutOfOrder bool
	// Error
	Error error
	// Parameters for triggering rage click detection
	RageClickSettings RageClickSettings
	// Event chunk metadata for syncing player time with event chunks
	EventChunks []*model.EventChunk
}

func MakeEventProcessingAccumulator

func MakeEventProcessingAccumulator(sessionSecureID string, rageClickSettings RageClickSettings) EventProcessingAccumulator

type KafkaBatchWorker

type KafkaBatchWorker struct {
	KafkaQueue          *kafkaqueue.Queue
	Worker              *Worker
	WorkerThread        int
	BatchFlushSize      int
	BatchedFlushTimeout time.Duration
	Name                string
	TracingDisabled     bool
	// contains filtered or unexported fields
}

func (*KafkaBatchWorker) ProcessMessages

func (k *KafkaBatchWorker) ProcessMessages(ctx context.Context)

type KafkaWorker

type KafkaWorker struct {
	KafkaQueue   *kafkaqueue.Queue
	Worker       *Worker
	WorkerThread int
}

func (*KafkaWorker) ProcessMessages

func (k *KafkaWorker) ProcessMessages(ctx context.Context)

type RageClickSettings

type RageClickSettings struct {
	Window time.Duration
	Radius int
	Count  int
}

type Worker

type Worker struct {
	Resolver       *mgraph.Resolver
	PublicResolver *pubgraph.Resolver
	StorageClient  storage.Client
}

func (*Worker) AutoResolveStaleErrors

func (w *Worker) AutoResolveStaleErrors(ctx context.Context)

Autoresolves error groups that have not had any recent instances

func (*Worker) BackfillStackFrames

func (w *Worker) BackfillStackFrames(ctx context.Context)

func (*Worker) GetHandler

func (w *Worker) GetHandler(ctx context.Context, handlerFlag string) func(ctx context.Context)

func (*Worker) GetPublicWorker

func (w *Worker) GetPublicWorker(topic kafkaqueue.TopicType) func(context.Context)

func (*Worker) GetSessionsToProcess

func (w *Worker) GetSessionsToProcess(ctx context.Context, payloadLookbackPeriod int, lockPeriod int, limit int) ([]*model.Session, error)

func (*Worker) MigrateDB

func (w *Worker) MigrateDB(ctx context.Context)

func (*Worker) PublicWorker

func (w *Worker) PublicWorker(ctx context.Context, topic kafkaqueue.TopicType)

func (*Worker) RefreshMaterializedViews

func (w *Worker) RefreshMaterializedViews(ctx context.Context)

func (*Worker) ReportStripeUsage

func (w *Worker) ReportStripeUsage(ctx context.Context)

func (*Worker) Start

func (w *Worker) Start(ctx context.Context)

Start begins the worker's tasks.

func (*Worker) StartLogAlertWatcher

func (w *Worker) StartLogAlertWatcher(ctx context.Context)

func (*Worker) StartMetricMonitorWatcher

func (w *Worker) StartMetricMonitorWatcher(ctx context.Context)

type WorkerConfig

type WorkerConfig struct {
	Workers          int
	FlushSize        int
	QueueSize        int
	MessageSizeBytes *int64
	FlushTimeout     time.Duration
	Topic            kafkaqueue.TopicType
	TracingDisabled  bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL