repeater

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 15, 2026 License: MIT Imports: 16 Imported by: 0

README

Compogo Repeater 🔁

Repeater — это планировщик периодических задач для Compogo, построенный поверх Runner. Поддерживает два режима работы (эксклюзивный и параллельный), имеет собственные middleware и полностью интегрируется с жизненным циклом Compogo.

🚀 Установка

go get github.com/Compogo/repeater
📦 Быстрый старт
package main

import (
    "context"
    "time"

    "github.com/Compogo/compogo"
    "github.com/Compogo/runner"
    "github.com/Compogo/repeater"
)

func main() {
    app := compogo.NewApp("myapp",
        compogo.WithOsSignalCloser(),
        runner.WithRunner(),
        repeater.WithRepeater(),
        compogo.WithComponents(
            myWorkerComponent,
        ),
    )

    if err := app.Serve(); err != nil {
        panic(err)
    }
}

var myWorkerComponent = &component.Component{
    Dependencies: component.Components{runner.Component, repeater.Component},
    Run: component.StepFunc(func(c container.Container) error {
        return c.Invoke(func(r repeater.Repeater) {
            // Задача, которая выполняется не чаще раза в 5 секунд
            task := repeater.NewTaskWithLock(
                "cleanup",
                runner.ProcessFunc(func(ctx context.Context) error {
                    return doCleanup()
                }),
                5 * time.Second,
            )
            
            r.AddTask(task)
        })
    }),
}
🎯 Две стратегии выполнения
Lock — эксклюзивный режим
// Только один экземпляр задачи может выполняться одновременно
task := repeater.NewTaskWithLock(
    "db-cleanup",
    cleanupProcess,
    1 * time.Hour,
)

Если задача ещё работает, следующий запуск пропускается.

Unlock — параллельный режим
// Можно запускать сколько угодно экземпляров
task := repeater.NewTaskWithUnlock(
    "queue-worker",
    workerProcess,
    10 * time.Second,
)

Каждый запуск получает уникальное имя: queue-worker_1, queue-worker_2 и т.д.

⚙️ Конфигурация

Repeater добавляет флаг --repeater.delay — как часто проверять, какие задачи пора запускать.

./myapp --repeater.delay=100ms

По умолчанию — 1/60 секунды (~16.6мс).

🧩 Опции задач
SkipFirstRun — пропустить первый запуск
task := repeater.NewTaskWithLock(
    "daily-report",
    reportProcess,
    24 * time.Hour,
    repeater.SkipFirstRun,  // первый запуск через 24 часа, а не сразу
)
🔌 Middleware

Repeater поддерживает свои middleware, которые работают только с периодическими задачами:

type LoggingMiddleware struct{}

func (m *LoggingMiddleware) Middleware(task *repeater.Task, next runner.Process) runner.Process {
    return runner.ProcessFunc(func(ctx context.Context) error {
        log.Printf("starting periodic task: %s", task.Name())
        err := next.Process(ctx)
        log.Printf("finished periodic task: %s, err=%v", task.Name(), err)
        return err
    })
}

// Использование
r.Use(&LoggingMiddleware{})
📊 Мониторинг

У каждой задачи есть счётчик выполненных запусков:

count := task.RunNumbers()  // сколько раз уже запускалась
🧹 Graceful shutdown

При остановке приложения Repeater:

  • Получает сигнал через Close()
  • Завершает основной цикл
  • Автоматически останавливает все запущенные экземпляры задач через Runner

Documentation

Index

Constants

View Source
const (
	DelayFieldName = "repeater.delay"
)

Variables

View Source
var Component = &component.Component{
	Dependencies: component.Components{
		runner.Component,
	},
	Init: component.StepFunc(func(container container.Container) error {
		return container.Provides(
			NewConfig,
			NewRepeater,
		)
	}),
	BindFlags: component.BindFlags(func(flagSet flag.FlagSet, container container.Container) error {
		return container.Invoke(func(config *Config) {
			flagSet.DurationVar(&config.Delay, DelayFieldName, DelayDefault, "")
		})
	}),
	Configuration: component.StepFunc(func(container container.Container) error {
		return container.Invoke(Configuration)
	}),
	Execute: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(r runner.Runner, repeater Repeater) error {
			return r.RunTask(runner.NewTask("repeater", repeater))
		})
	}),
	Stop: component.StepFunc(func(container container.Container) error {
		return container.Invoke(func(repeater Repeater) error {
			return repeater.Close()
		})
	}),
}

Component is a ready-to-use Compogo component that provides the Repeater. It automatically:

  • Registers Config and Repeater in the DI container
  • Adds command-line flags for ticker interval
  • Starts the Repeater as a runner task during Run phase
  • Stops the Repeater during Stop phase

Usage:

compogo.WithComponents(
    runner.Component,
    repeater.Component,
)
View Source
var (
	DelayDefault = time.Second / 60
)

Functions

func StrategyRegister

func StrategyRegister(st StrategyType, strategy Strategy)

StrategyRegister registers a new strategy implementation. This allows extending the repeater with custom strategies.

Types

type Config

type Config struct {
	Delay time.Duration
}

func Configuration

func Configuration(config *Config, configurator configurator.Configurator) *Config

func NewConfig

func NewConfig() *Config

type Middleware

type Middleware interface {
	// Middleware wraps a periodic task's process function.
	// It receives the periodic task and the next Process in the chain,
	// and returns a new Process that will be executed instead.
	Middleware(task *Task, next runner.Process) runner.Process
}

Middleware defines the interface for task middleware. Middlewares can wrap a task's Process function to add cross-cutting concerns such as logging, metrics, or error handling specific to periodic tasks.

type MiddlewareFunc

type MiddlewareFunc func(task *Task, next runner.Process) runner.Process

MiddlewareFunc is a function adapter that allows ordinary functions to be used as Middleware implementations for periodic tasks.

func (MiddlewareFunc) Middleware

func (m MiddlewareFunc) Middleware(task *Task, next runner.Process) runner.Process

Middleware implements the Middleware interface by calling the underlying function.

type Option

type Option func(task *Task) *Task

Option is a function that configures a Task during creation. Options are applied in the order they are provided.

type Repeater

type Repeater interface {
	// Closer stops all tasks and cleans up resources.
	io.Closer

	// Process implements runner.Process, allowing Repeater to be run as a task.
	// It periodically checks for tasks to execute and launches them via Runner.
	runner.Process

	// AddTasks adds multiple periodic tasks to the scheduler.
	AddTasks(tasks ...*Task) error

	// AddTask adds a single periodic task to the scheduler.
	AddTask(task *Task) error

	// RemoveTask removes a periodic task and stops all its running instances.
	RemoveTask(task *Task) error

	// RemoveTaskByName removes a periodic task by name and stops all its instances.
	RemoveTaskByName(name string) error

	// HasTaskByName checks if a periodic task with the given name exists.
	HasTaskByName(name string) bool

	// HasTask checks if the specific task instance exists in the scheduler.
	HasTask(task *Task) bool

	// Use registers middlewares that wrap all periodic task executions.
	Use(middlewares ...Middleware)
}

Repeater defines the interface for a periodic task scheduler. It manages a collection of tasks, executes them on schedule, and ensures proper cleanup on shutdown.

func NewRepeater

func NewRepeater(config *Config, logger logger.Logger, runner runner.Runner) Repeater

NewRepeater creates a new Repeater instance. It requires:

  • config: ticker interval configuration
  • logger: for logging repeater events
  • runner: for executing task instances

type Strategy

type Strategy interface {
	// IsTaskRun determines whether a new instance of the task should be started.
	IsTaskRun(task *Task, runner runner.Runner) bool

	// GenerateName creates a unique name for a task instance.
	// For Lock strategy, this returns the base name; for Unlock, it appends a counter.
	GenerateName(task *Task) string
}

Strategy defines the interface for task execution strategies. Each strategy decides when a task should run and how to name its instances.

type StrategyType

type StrategyType uint8

StrategyType defines how a periodic task handles concurrent executions.

const (
	// Lock strategy ensures only one instance of the task runs at any time.
	// If a task is still running when its next execution is due, it is skipped.
	Lock StrategyType = iota

	// Unlock strategy allows multiple instances of the task to run concurrently.
	// Each execution creates a new task instance with a unique name.
	Unlock
)

func (StrategyType) String

func (s StrategyType) String() string

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a periodic task managed by the Repeater. It contains the task logic, execution schedule, and strategy for concurrent execution.

func NewTask

func NewTask(name string, process runner.Process, delay time.Duration, strategy StrategyType, options ...Option) *Task

NewTask creates a new periodic task with the specified strategy. This is the base constructor that all other constructors use.

func NewTaskWithLock

func NewTaskWithLock(name string, process runner.Process, delay time.Duration, options ...Option) *Task

NewTaskWithLock creates a new periodic task with Lock strategy. Lock strategy ensures only one instance of this task runs at any given time. The task will execute every 'delay' duration.

func NewTaskWithUnlock

func NewTaskWithUnlock(name string, process runner.Process, delay time.Duration, options ...Option) *Task

NewTaskWithUnlock creates a new periodic task with Unlock strategy. Unlock strategy allows multiple instances of this task to run concurrently. The task will execute every 'delay' duration.

func SkipFirstRun

func SkipFirstRun(task *Task) *Task

SkipFirstRun configures the task to skip its first scheduled execution. Useful when you want tasks to start after a delay rather than immediately.

func (*Task) Delay

func (t *Task) Delay() time.Duration

Delay returns the interval between task executions.

func (*Task) Name

func (t *Task) Name() string

Name returns the task's identifier.

func (*Task) RunNumbers

func (t *Task) RunNumbers() uint64

RunNumbers returns the number of times this task has been executed.

func (*Task) Strategy

func (t *Task) Strategy() StrategyType

Strategy returns the task's execution strategy (Lock/Unlock).

func (*Task) String

func (t *Task) String() string

String returns the task's name, implementing fmt.Stringer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL