smartpoll

v0.2.1
Published: Aug 16, 2024 License: MIT Imports: 6 Imported by: 0

README

go-smartpoll

Package smartpoll offers dynamic, reactive scheduling for synchronized polling of multiple data points. It provides a highly configurable "control loop" (roughly for { ... select ...}), a scheduler API, and concurrency control for task execution.

See the API docs.

Features

  • Dynamic Scheduling: The package provides a dynamic scheduling mechanism. This means that you can adjust the schedule of polling operations on the fly, based on the needs of your application. See Internal.

  • Sane Control Loop: Smartpoll makes it easy to synchronise control-flow decisions, including handling of results. See TaskHook.

  • Reactive: Smartpoll is reactive. It allows implementations to respond to changes in the state of your application, and adjust scheduling accordingly. See Hook.

  • Easy to Implement: The package is designed to make it easy to implement a control loop. This reduces the complexity of your code and makes it easier to maintain.

  • Error Handling: Smartpoll provides robust error handling mechanisms for handling fatal errors from tasks or any logic running within the control loop.

  • Restart Capability: Smartpoll allows for the control loop to be torn down and later re-run, in response to arbitrary events, or context cancel.

Examples

1. Aggregating Data from Multiple APIs

Imagine you are building a weather application that aggregates data from multiple weather APIs to provide a more accurate forecast. The backend that retrieves the data and performs the aggregation is a simple, single-instance worker. Each API has different rate limits, and you want to stay within them. You also want to implement backoff/retry per API. Each time any of the input data changes, the aggregate is regenerated and the result published.

// TODO: Actually implement this, as a runnable example.
// Assume state in the local scope, without explicit synchronisation, except where noted.
weatherAPI1Task := func(ctx context.Context) (smartpoll.TaskHook, error) {
	// running in a separate goroutine...
	// assume backoff / retries baked into retrieving the result
	result, err := // ...
	if err != nil {
		// fatal error, will terminate the control loop
		return nil, err
	}

	return func(ctx context.Context, internal *smartpoll.Internal) error {
		// synchronised with the control loop...

		// reschedule as desired
		internal.ScheduleSooner("weatherAPI1", time.Second*10)

		if !result.Equal(lastResult) {
			// schedule a publish task, if not already scheduled
			internal.Schedule("publish", 0)
		}

		lastResult = result
		return nil
	}, nil
}

publishTask := func(ctx context.Context) (smartpoll.TaskHook, error) {
	// we are running in a separate goroutine - this might use a mutex, atomic, or some other mechanism to synchronise
	allDataSnapshot := getAllDataSynchronised()

	// this could also be made available to the other tasks, e.g. updated to a variable in the parent scope, in a TaskHook
	aggregateResult := transformAllData(allDataSnapshot)

	// ... perform IO etc, to publish the aggregate result

	return nil, nil // TaskHook is omitted - nothing to synchronise with the control loop
}

scheduler, err := smartpoll.New(
	smartpoll.WithRunHook(func(ctx context.Context, internal *smartpoll.Internal) error {
		// schedule the first invocation of each polling task, after which they manage their own lifecycle
		internal.Schedule("weatherAPI1", 0)
		internal.Schedule("weatherAPI2", 0)
		internal.Schedule("weatherAPI3", 0)
		return nil
	}),
	smartpoll.WithTask("weatherAPI1", weatherAPI1Task),
	smartpoll.WithTask("weatherAPI2", weatherAPI2Task),
	smartpoll.WithTask("weatherAPI3", weatherAPI3Task),
	// scheduled by each weather api task, as necessary (singleflight)
	smartpoll.WithTask("publish", publishTask),
)
if err != nil {
	panic(err)
}

// blocks until the context is cancelled, or a fatal error is returned from a task or hook
if err := scheduler.Run(context.Background()); err != nil {
	panic(err)
}
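
The example above assumes that backoff and retries are "baked into" each task's fetch. As a rough illustration of what that could look like, here is a minimal, standard-library-only sketch of a retry helper with exponential backoff. The names retry and demo are hypothetical and are not part of smartpoll; a real task would call something like this from its first stage, before returning its TaskHook.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls fn up to attempts times, doubling the delay after each failure.
// It is a hypothetical helper, not part of smartpoll.
func retry(ctx context.Context, attempts int, base time.Duration, fn func(context.Context) error) error {
	var err error
	delay := base
	for i := 0; i < attempts; i++ {
		if err = fn(ctx); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // out of attempts: report the last error
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		delay *= 2
	}
	return err
}

// demo fails twice, then succeeds, and reports how many calls were made.
func demo() (calls int, err error) {
	err = retry(context.Background(), 5, time.Millisecond, func(context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("temporary failure")
		}
		return nil
	})
	return calls, err
}

func main() {
	fmt.Println(demo())
}
```

Because the helper honours ctx, a retry loop running inside a task stops promptly when the context passed to the task is cancelled.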

Litmus test

Answering yes to one or more of the following might indicate that smartpoll is a good fit for your use case.

  1. Do you need to control (schedule or run) distinct tasks, which operate in the background?
  2. Do you need a mechanism to synchronise handling the results of tasks?
  3. Do you need tasks to be able to coordinate with or schedule other tasks, in an arbitrary manner?
  4. Do you need tasks which run on a dynamic interval?
  5. Do you need to be able to reschedule tasks, or inspect when they are scheduled to run?
  6. Do you also need to be able to perform arbitrary blocking logic (including accessing task results and scheduling tasks), in response to arbitrary events, scheduled in a "fair" manner, alongside the built-in behavior?
  7. Do you need to be able to handle fatal errors, from tasks, or any logic running within the control loop?
  8. Do you need to be able to tear down then later restart the control loop, e.g. in response to arbitrary events?

Smartpoll does NOT provide the following, but can be combined with an implementation that does. The effort involved varies.

  • Cancellation of running tasks (trivial)
  • Waiting for all tasks to exit on return of Scheduler.Run (trivial, and there IS support for waiting on re-run)
  • Built-in prioritisation or other higher-level strategies to order independent tasks (non-trivial)
  • The ability to schedule a given task more than once, in addition to any running invocation of that task (somewhat non-trivial)
  • Cron-based scheduling or similar (trivial, given a suitable cron implementation)

Smartpoll does not support:

  • Dynamically adding or removing supported tasks (potentially able to be worked around by using a new Scheduler)

Documentation

Overview

Package smartpoll offers dynamic, reactive scheduling for synchronized polling of multiple data points. See the README for the rationale, use cases, and examples.

The purpose of this implementation is to make it as easy as possible to implement a "control loop", which directly manages the scheduling of polling operations, and propagates and/or transforms the results. It achieves this by running all such operations, termed "hooks", within a single goroutine. The implementation consists of a Scheduler, which is configured with a set of Task values, each with a corresponding identifying key, and optionally a set of Hook values, each of which receives from a corresponding channel. Each Task consists of a two-stage, higher-order function. The first stage is performed in its own goroutine, while the second stage is synchronized with the main loop. Channels for each custom Hook are selected alongside the logic for scheduling and handling the results of each Task.
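
The two-stage idea can be sketched with the standard library alone. The following is an illustration of the pattern, not the package's implementation: the task, taskHook, runOnce, poll, and demo names are hypothetical stand-ins, and the real Scheduler additionally handles timers, rescheduling, and custom hooks.

```go
package main

import (
	"context"
	"fmt"
)

// taskHook and task are local stand-ins that mirror the shape of the
// package's TaskHook and Task types; they are NOT the real smartpoll types.
type taskHook func(ctx context.Context, state map[string]int) error
type task func(ctx context.Context) (taskHook, error)

// runOnce starts every task in its own goroutine (stage one), then applies
// each returned hook from the calling goroutine (stage two), so that state
// is only ever touched by one goroutine.
func runOnce(ctx context.Context, tasks map[string]task) (map[string]int, error) {
	type result struct {
		hook taskHook
		err  error
	}
	results := make(chan result, len(tasks))
	for _, t := range tasks {
		go func(t task) {
			hook, err := t(ctx)
			results <- result{hook, err}
		}(t)
	}

	state := make(map[string]int) // no mutex: only used by this goroutine
	for range tasks {
		select {
		case <-ctx.Done():
			return state, ctx.Err()
		case r := <-results:
			if r.err != nil {
				return state, r.err // a fatal error ends the loop
			}
			if r.hook == nil {
				continue // nothing to synchronise
			}
			if err := r.hook(ctx, state); err != nil {
				return state, err
			}
		}
	}
	return state, nil
}

// poll builds a task that "fetches" value and stores it under key.
func poll(key string, value int) task {
	return func(ctx context.Context) (taskHook, error) {
		// stage one: IO would happen here, off the main loop
		return func(ctx context.Context, state map[string]int) error {
			state[key] = value // stage two: synchronised with the loop
			return nil
		}, nil
	}
}

// demo runs two tasks and sums the values their hooks stored.
func demo() (int, error) {
	state, err := runOnce(context.Background(), map[string]task{
		"a": poll("a", 1),
		"b": poll("b", 2),
	})
	return state["a"] + state["b"], err
}

func main() {
	fmt.Println(demo())
}
```

The design choice this illustrates is that shared state needs no locking, because only the loop goroutine ever runs the second stage.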

Constants

This section is empty.

Variables

var (
	// ErrPanicInTask will be returned by Scheduler.Run if a task calls
	// runtime.Goexit. Note that it will technically also be returned if a task
	// panics, but that will bubble, and cause the process to exit.
	ErrPanicInTask = errors.New(`smartpoll: panic in task`)
)

Functions

This section is empty.

Types

type Hook

type Hook[T any] func(ctx context.Context, internal *Internal, value T, ok bool) error

Hook will be called with the values `value, ok := <-ch`, where ch represents the channel this hook was configured with. The context will be a descendant of the Scheduler.Run context, and will be cancelled after the hook returns.
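
As a reminder of what the `value, ok := <-ch` form implies for a hook, here is a small standard-library sketch. The hook and dispatch names are hypothetical stand-ins (the context and *Internal parameters of the real Hook are omitted); the point is only that a closed channel yields the zero value with ok set to false, which a hook should be prepared to handle.

```go
package main

import "fmt"

// hook is a local stand-in mirroring the value/ok parameters of Hook; the
// context and *Internal parameters are omitted to keep the sketch short.
type hook[T any] func(value T, ok bool) error

// dispatch performs a single receive and forwards both results to the hook.
func dispatch[T any](ch <-chan T, h hook[T]) error {
	value, ok := <-ch
	return h(value, ok)
}

// demo records what the hook observes for one buffered value, and then for
// the closed (and drained) channel.
func demo() []string {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)

	var seen []string
	h := func(value int, ok bool) error {
		seen = append(seen, fmt.Sprintf("%d %t", value, ok))
		return nil
	}
	_ = dispatch[int](ch, h) // receives the buffered value
	_ = dispatch[int](ch, h) // channel is closed: zero value, ok == false
	return seen
}

func main() {
	fmt.Println(demo())
}
```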

type Internal

type Internal struct {
	// contains filtered or unexported fields
}

Internal models the internal API, accessible from within hooks. It is unsafe to retain a reference to Internal, or to use it concurrently.

func (*Internal) Next

func (x *Internal) Next(key any) time.Time

Next returns an advisory timestamp for when the given task key will next be ready, or the zero time if the task is not scheduled.

func (*Internal) Running

func (x *Internal) Running(key any) bool

Running returns true if the given task key is currently running.

func (*Internal) Schedule

func (x *Internal) Schedule(key any, d time.Duration)

Schedule schedules the given task key to be ready after the given duration, immediately if the duration is zero, or clears any scheduling if the duration is negative. The scheduled time will be returned by Next, until it is consumed, or the task is rescheduled.
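
The duration rules described for Schedule and Next can be modelled in a few lines. This is a sketch of the documented semantics only, using a hypothetical schedule type and an explicit now parameter for determinism; it is not how the package stores its state.

```go
package main

import (
	"fmt"
	"time"
)

// schedule is a hypothetical stand-in for a per-key timetable.
type schedule map[any]time.Time

// set mirrors the documented rules: a negative duration clears the entry,
// and a zero duration means "ready now".
func (s schedule) set(now time.Time, key any, d time.Duration) {
	if d < 0 {
		delete(s, key)
		return
	}
	s[key] = now.Add(d)
}

// next returns the zero time when key is not scheduled.
func (s schedule) next(key any) time.Time { return s[key] }

// demo schedules a key, reads it back, then clears it and reads it again.
func demo() (afterSet, afterClear time.Time) {
	now := time.Date(2024, 8, 16, 12, 0, 0, 0, time.UTC)
	s := schedule{}
	s.set(now, "publish", 10*time.Second)
	afterSet = s.next("publish")
	s.set(now, "publish", -1)
	afterClear = s.next("publish")
	return
}

func main() {
	afterSet, afterClear := demo()
	fmt.Println(afterSet.Format(time.RFC3339), afterClear.IsZero())
}
```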

func (*Internal) ScheduleAt

func (x *Internal) ScheduleAt(key any, t time.Time)

ScheduleAt schedules the given task key to be ready after the given time, or clears any scheduling if t is the zero value. The return value of Next will be the provided time, until it is consumed, or the task is rescheduled.

func (*Internal) ScheduleAtSooner

func (x *Internal) ScheduleAtSooner(key any, t time.Time)

ScheduleAtSooner is like ScheduleAt, except it will use the sooner of the existing schedule and the given time. It will panic if the given time is the zero value.

func (*Internal) ScheduleSooner

func (x *Internal) ScheduleSooner(key any, d time.Duration)

ScheduleSooner is like Schedule, except it will use the sooner of the existing schedule and the given duration. It will panic if the given duration is not positive (use Schedule if you wish to immediately schedule a task).
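
The "sooner of the existing schedule and the given time" rule shared by ScheduleAtSooner and ScheduleSooner can be sketched as a pure function. The sooner helper below is hypothetical, and it assumes that a zero existing time means "not scheduled", in which case the candidate is used; the docs above do not spell out that case.

```go
package main

import (
	"fmt"
	"time"
)

// sooner returns the earlier of existing and candidate. A zero existing
// time is treated as "not scheduled" (an assumption of this sketch), and a
// zero candidate panics, mirroring the documented ScheduleAtSooner rule.
func sooner(existing, candidate time.Time) time.Time {
	if candidate.IsZero() {
		panic("sooner: candidate must not be the zero time")
	}
	if existing.IsZero() || candidate.Before(existing) {
		return candidate
	}
	return existing
}

// demo checks that a later candidate is ignored and an earlier one wins.
func demo() (keptExisting, tookCandidate bool) {
	now := time.Date(2024, 8, 16, 12, 0, 0, 0, time.UTC)
	existing := now.Add(10 * time.Second)
	earlier := now.Add(5 * time.Second)
	keptExisting = sooner(existing, now.Add(30*time.Second)).Equal(existing)
	tookCandidate = sooner(existing, earlier).Equal(earlier)
	return
}

func main() {
	fmt.Println(demo())
}
```

This is why the README example can call ScheduleSooner unconditionally from a TaskHook: an earlier schedule set elsewhere is never pushed back.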

func (*Internal) StopTimer

func (x *Internal) StopTimer(key any) (ready bool)

StopTimer stops the timer for the given task key, but does not clear any scheduling of that task. It returns true if the task is still scheduled. Note that a task may be running, regardless of whether it is scheduled.

type Option

type Option interface {
	// contains filtered or unexported methods
}

func WithHook

func WithHook[T any](ch <-chan T, hook Hook[T]) Option

WithHook adds a Hook, wired up to the given channel.

func WithRunHook

func WithRunHook(hook RunHook) Option

WithRunHook adds a RunHook to be called on each Scheduler.Run, just prior to starting the main loop. If more than one RunHook is configured, they will be called in the order they were configured.

func WithTask

func WithTask(key any, task Task) Option

WithTask adds a task, identified by the given key. The provided task may be scheduled.

type RunHook

type RunHook func(ctx context.Context, internal *Internal) error

RunHook is a hook which is called on each Scheduler.Run, just prior to starting the main loop. The context will be a descendant of the Scheduler.Run context, and will be cancelled after the hook returns.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules "tasks", implements a "main loop", and provides various means to manage the scheduler (and other, arbitrary) state, via "hooks", which run within, or are synchronised with, the main loop.

Scheduler must be constructed with New. The Run method is used to run the scheduler.

See also the package docs for smartpoll.

func New

func New(options ...Option) (*Scheduler, error)

New initialises a Scheduler, with the given options. See also `With*` prefixed functions.

func (*Scheduler) Run

func (x *Scheduler) Run(ctx context.Context) error

Run runs the scheduler, blocking until the context is cancelled, or an error is returned from a Hook or a Task. A panic will occur if called concurrently (called again before the previous call returns), or if called on a scheduler which was not initialized with New.

The context passed to tasks and hooks will be derived from the context passed to Run, and will be cancelled when Run returns. There is no built-in mechanism to cancel individual running tasks.
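
Since cancelling an individual running task is left to the caller, one possible approach is to derive a cancellable context inside each task and keep the cancel function in a registry. The sketch below uses only the standard library; the cancels type and its methods are hypothetical, and the task signature is simplified to return only an error (the real Task also returns a TaskHook).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// cancels tracks one cancel function per running task key. One entry per
// key suffices if at most one invocation runs per key at a time.
type cancels struct {
	mu sync.Mutex
	m  map[any]context.CancelFunc
}

// wrap derives a cancellable context for fn and registers its cancel
// function under key for as long as fn is running.
func (c *cancels) wrap(key any, fn func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		c.mu.Lock()
		c.m[key] = cancel
		c.mu.Unlock()
		defer func() {
			c.mu.Lock()
			delete(c.m, key)
			c.mu.Unlock()
		}()

		return fn(ctx)
	}
}

// cancel stops the running task for key, reporting whether one was found.
func (c *cancels) cancel(key any) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	cancel, ok := c.m[key]
	if ok {
		cancel()
	}
	return ok
}

// demo starts a blocking task, cancels it by key, and returns its error.
func demo() (found bool, err error) {
	c := &cancels{m: make(map[any]context.CancelFunc)}
	started := make(chan struct{})
	done := make(chan error, 1)

	run := c.wrap("weatherAPI1", func(ctx context.Context) error {
		close(started) // registration has already happened at this point
		<-ctx.Done()   // stand-in for a long-running request
		return ctx.Err()
	})
	go func() { done <- run(context.Background()) }()

	<-started
	found = c.cancel("weatherAPI1")
	return found, <-done
}

func main() {
	found, err := demo()
	fmt.Println(found, errors.Is(err, context.Canceled))
}
```

The registry is guarded by a mutex because tasks run in their own goroutines, unlike hooks, which are synchronised with the main loop.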

It is important to note that Run does not wait for all running tasks to complete, prior to exit. Calling Run again will block on any tasks which are still running, dropping any errors (likely context cancellation) which occur.

type Task

type Task func(ctx context.Context) (TaskHook, error)

Task performs an arbitrary operation in the background, returning either a fatal error, or a TaskHook, or neither (`return nil, nil`). The context provided to Task will be cancelled after this function (not the TaskHook) returns, or when the Scheduler.Run context is cancelled.

Each configured task runs independently, and only one will run per key, at any given time.

type TaskHook

type TaskHook func(ctx context.Context, internal *Internal) error

TaskHook performs an arbitrary operation, synchronised with the main loop, within Scheduler.Run. The typical usage of this is to handle results (e.g. store them in an in-memory cache), and/or to reschedule the task. The context provided to TaskHook will be cancelled after this function returns, or when the Scheduler.Run context is cancelled.

Like all "hooks" in this package, TaskHook runs synchronously with the main loop. It will always run before the next Task of the same key, though it may not be called at all if a fatal error occurs.
