scheduler

package
v0.0.0-...-ce3407c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2025 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Redis key for scheduled events
	ScheduleKey = "schedules"
)

Variables

View Source
var TimeNow = func() time.Time { return time.Now() }

Functions

func StartArchivalScheduler

func StartArchivalScheduler(db *sqlx.DB, rdb *redis.Client, checkPeriod time.Duration, durations config.RetentionDurations, stopCh <-chan struct{}, logger *zap.Logger)

Types

type ClientNotifier

type ClientNotifier interface {
	DispatchToClient(clientID string, payload []byte) error
}

ClientNotifier abstracts dispatching a payload to a WebSocket client by ID, so the dispatcher can be tested against a mock (hand-written or produced by your preferred mock generator).

type DayOfWeek

type DayOfWeek string

DayOfWeek represents a day of the week as a two-letter uppercase code ("MO" through "SU").

const (
	DayMonday    DayOfWeek = "MO"
	DayTuesday   DayOfWeek = "TU"
	DayWednesday DayOfWeek = "WE"
	DayThursday  DayOfWeek = "TH"
	DayFriday    DayOfWeek = "FR"
	DaySaturday  DayOfWeek = "SA"
	DaySunday    DayOfWeek = "SU"
)

type DefaultHTTPClient

type DefaultHTTPClient struct {
	// contains filtered or unexported fields
}

DefaultHTTPClient implements HTTPClient using http.Client

func (*DefaultHTTPClient) Do

func (d *DefaultHTTPClient) Do(req *http.Request) (*http.Response, error)

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *repository.OccurrenceRepository, hmacService *services.HMACService, logger *zap.Logger, maxRetries int, retryDelay time.Duration, clientNotifier ClientNotifier, scheduler *Scheduler, httpClient HTTPClient) *Dispatcher

func (*Dispatcher) DispatchAction

func (d *Dispatcher) DispatchAction(ctx context.Context, sched *models.Schedule) error

DispatchAction sends a webhook request or dispatches a websocket message based on the event's action type.

func (*Dispatcher) Run

func (d *Dispatcher) Run(ctx context.Context, scheduler *Scheduler, workerCount int) error

Run starts a pool of dispatcher workers that process the dispatch queue

func (*Dispatcher) SetHTTPClient

func (d *Dispatcher) SetHTTPClient(client HTTPClient)

SetHTTPClient allows setting a custom HTTP client (used for testing)

type Expander

type Expander struct {
	// contains filtered or unexported fields
}

func NewExpander

func NewExpander(
	scheduler *Scheduler,
	eventRepo *repository.EventRepository,
	occurrenceRepo *repository.OccurrenceRepository,
	lookAheadDuration time.Duration,
	expansionInterval time.Duration,
	gracePeriod time.Duration,
	logger *zap.Logger,
) *Expander

NewExpander creates a new Expander with a configurable look-ahead duration, expansion interval, and grace period.

func (*Expander) ExpandEvents

func (e *Expander) ExpandEvents(ctx context.Context) error

ExpandEvents expands both recurring and non-recurring events into occurrences

func (*Expander) ExpandNonRecurringEvent

func (e *Expander) ExpandNonRecurringEvent(ctx context.Context, event *models.Event) error

func (*Expander) ExpandRecurringEvent

func (e *Expander) ExpandRecurringEvent(ctx context.Context, event *models.Event) error

func (*Expander) GracePeriod

func (e *Expander) GracePeriod() time.Duration

func (*Expander) LookAheadDuration

func (e *Expander) LookAheadDuration() time.Duration

func (*Expander) Run

func (e *Expander) Run(ctx context.Context) error

Run starts the expander in a loop

type Frequency

type Frequency string

Frequency represents how often a Schedule recurs, from "minutely" through "yearly".

const (
	FrequencyMinutely Frequency = "minutely"
	FrequencyHourly   Frequency = "hourly"
	FrequencyDaily    Frequency = "daily"
	FrequencyWeekly   Frequency = "weekly"
	FrequencyMonthly  Frequency = "monthly"
	FrequencyYearly   Frequency = "yearly"
)

type HTTPClient

type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

HTTPClient interface for mocking HTTP requests

type Schedule

type Schedule struct {
	Frequency   Frequency   `json:"frequency"`
	Interval    int         `json:"interval"`
	DaysOfWeek  []DayOfWeek `json:"days_of_week,omitempty"`
	DaysOfMonth []int       `json:"days_of_month,omitempty"`
	Months      []int       `json:"months,omitempty"`
	Count       *int        `json:"count,omitempty"`
	Until       *time.Time  `json:"until,omitempty"`
}

Schedule represents a JSON-based recurrence schedule

func ParseSchedule

func ParseSchedule(scheduleStr string, logger *zap.Logger) (*Schedule, error)

ParseSchedule parses a JSON schedule string and returns a Schedule instance

func (*Schedule) GetNextOccurrences

func (s *Schedule) GetNextOccurrences(startTime time.Time, endTime time.Time, logger *zap.Logger) ([]time.Time, error)

GetNextOccurrences calculates the next occurrences based on the schedule

func (*Schedule) Validate

func (s *Schedule) Validate() error

Validate checks if the schedule is valid

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(redis *redis.Client, logger *zap.Logger, prefix string) *Scheduler

func (*Scheduler) AddEvent

func (s *Scheduler) AddEvent(ctx context.Context, event *models.Event) error

func (*Scheduler) GetDueSchedules

func (s *Scheduler) GetDueSchedules(ctx context.Context) (int, error)

GetDueSchedules moves due schedules to the dispatch queue (idempotent version)

func (*Scheduler) PopDispatchQueue

func (s *Scheduler) PopDispatchQueue(ctx context.Context) (*models.Schedule, error)

PopDispatchQueue pops a schedule from the dispatch queue (for worker use)

func (*Scheduler) RemoveScheduledEvent

func (s *Scheduler) RemoveScheduledEvent(ctx context.Context, occurrence *models.Occurrence) error

RemoveScheduledEvent removes a scheduled event from Redis (idempotent version)

func (*Scheduler) ScheduleEvent

func (s *Scheduler) ScheduleEvent(ctx context.Context, occurrence *models.Occurrence, event *models.Event) error

ScheduleEvent schedules a single event occurrence using idempotent deterministic keys

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL