ellie

package module
v0.0.0-...-1325eac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2017 License: MIT Imports: 10 Imported by: 0

README

Ellie

A distributed task queue written in Go.

Installation

Grab the project for your own project using go get:

$ go get github.com/dansackett/ellie

Examples

package main

import (
	"time"

	"github.com/dansackett/ellie"
)

func Sum(x, y int) int {
    return x + y
}

func main() {
	// Configure the application to run 10 workers with 5 seconds of sleep between each run.
	ellie.Configure(10, 5)

	// Enqueue a task to run now
	ellie.Enqueue(Sum, 3, 4)

	// Enqueue a task to run in 30 seconds
	ellie.EnqueueIn(30*time.Second, Sum, 3, 4)

	// Enqueue a task to run in 2 minutes
	ellie.EnqueueAt(time.Now().Add(2*time.Minute), Sum, 3, 4)

	// Enqueue a task to run every minute and a half
	ellie.EnqueueEvery((1*time.Minute)+(30*time.Second), Sum, 3, 4)

	// Enqueue a task to run that we intend to cancel
	cancelHash := ellie.EnqueueIn(5*time.Minute, Sum, 3, 4)

	// Dequeue a task from running
	ellie.Dequeue(cancelHash)

	// Start the workers and watch for new tasks
	ellie.RunServer()
}

Documentation

Index

Constants

View Source
const (
	DEFAULT_WORKER_COUNT  = 5
	DEFAULT_WORK_INTERVAL = 5 // in seconds
)

Variables

View Source
var (
	// Worker Channels
	WorkerStarted  = make(chan *Worker)
	WorkerSleeping = make(chan *Worker)

	// Task Channels
	TaskScheduled = make(chan *Task)
	TaskDequeued  = make(chan *Task)
	TaskStarted   = make(chan map[*Worker]*Task)
	TaskFinished  = make(chan map[*Worker]*Task)
)

Functions

func Dequeue

func Dequeue(hash uuid.UUID)

Enqueue schedules a task to run as soon as the next worker is available.

func Enqueue

func Enqueue(fn interface{}, args ...interface{}) uuid.UUID

Enqueue schedules a task to run as soon as the next worker is available.

func EnqueueAt

func EnqueueAt(period time.Time, fn interface{}, args ...interface{}) uuid.UUID

EnqueueAt schedules a task to run at a certain time in the future.

func EnqueueEvery

func EnqueueEvery(period time.Duration, fn interface{}, args ...interface{}) uuid.UUID

EnqueueEvery schedules a task to run and reschedule itself on a regular interval. It works like EnqueueIn but repeats

func EnqueueIn

func EnqueueIn(period time.Duration, fn interface{}, args ...interface{}) uuid.UUID

EnqueueIn schedules a task to run a certain amount of time from the current time. This allows us to schedule tasks to run in intervals.

func LogTaskDequeued

func LogTaskDequeued(t *Task)

LogTaskDequeued sends a signal to the TaskDequeued channel triggering the output text.

func LogTaskFinished

func LogTaskFinished(w *Worker, t *Task)

LogTaskFinished sends a signal to the TaskFinished channel triggering the output text.

func LogTaskScheduled

func LogTaskScheduled(t *Task)

LogTaskScheduled sends a signal to the TaskScheduled channel triggering the output text.

func LogTaskStarted

func LogTaskStarted(w *Worker, t *Task)

LogTaskStarted sends a signal to the TaskStarted channel triggering the output text.

func LogWorkerSleeping

func LogWorkerSleeping(w *Worker)

LogWorkerSleeping sends a signal to the WorkerSleeping channel triggering the output text.

func LogWorkerStarted

func LogWorkerStarted(w *Worker)

LogWorkerStarted sends a signal to the WorkerStarted channel triggering the output text.

func RunServer

func RunServer()

RunServer starts a blocking loop allowing the goroutines to communicate without the program closing. We spawn the workers here and also fire off the StateMonitor to listen for state changes while processing.

func SpawnWorkers

func SpawnWorkers()

SpawnWorkers creates the number of workers in the config and starts them as goroutines listening for jobs to pick up.

func StateMonitor

func StateMonitor()

StateMonitor provides a sane way to listen for state changes in the application. New state is passed via channels outputting logs from anywhere in the application.

Types

type Config

type Config struct {
	// NumWorkers specifies the maximum number of active workers to run at any
	// given time.
	NumWorkers int
	// WorkInterval is the time it takes for a worker to sleep before it
	// checks the task queue for more work to do.
	WorkInterval int
	// ScheduledTasks is the default queue used to decide what is available
	// for the workers to consume.
	ScheduledTasks TaskQueue
	// CancelledTasks is a queue which is checked before a task is executed to
	// see if the task has been cancelled.
	CancelledTasks TaskDequeue
	// NewTasks is a signal channel to express that a new task has been pushed
	// to the ScheduledTasks queue.
	NewTasks chan bool
	// WorkerPool in a channel to wait for a worker when a job comes in and
	// we send workers back into it when they are done.
	WorkerPool chan *Worker
	// FinishedTasks is a channel which cleans up after a task has finished.
	FinishedTasks chan *Task
}

Config contains the base configuration for the work queue.

var AppConfig *Config

appConfig is the configuration object to use within the actual module.

func Configure

func Configure(numWorkers, workInterval int) *Config

Configure sets up the base application confiuration options.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig uses the defaults to configure the application.

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents a task to run. It can be scheduled to run later or right away.

type TaskDequeue

type TaskDequeue struct {
	Tasks map[uuid.UUID]bool
	Lock  *sync.Mutex
}

TaskDequeue is a threadsafe container for tasks to be dequeued

func NewDequeue

func NewDequeue() TaskDequeue

NewDequeue returns a new instance of a TaskQueue

func (*TaskDequeue) Get

func (q *TaskDequeue) Get(hash uuid.UUID) bool

Get checks if a key exists in our dequeued task list

func (*TaskDequeue) Push

func (q *TaskDequeue) Push(hash uuid.UUID) (uuid.UUID, error)

Push adds a new task into the front of the TaskQueue

func (*TaskDequeue) Remove

func (q *TaskDequeue) Remove(hash uuid.UUID)

Remove deletes the dequeued entry once we are done with it

type TaskQueue

type TaskQueue struct {
	Tasks *list.List
	Lock  *sync.Mutex
}

TaskQueue is a threadsafe container for tasks to be processed

func NewQueue

func NewQueue() TaskQueue

NewQueue returns a new instance of a TaskQueue

func (*TaskQueue) Len

func (q *TaskQueue) Len() int

Push adds a new task into the front of the TaskQueue

func (*TaskQueue) Pop

func (q *TaskQueue) Pop() *Task

Pop grabs the last task from the TaskQueue

func (*TaskQueue) Push

func (q *TaskQueue) Push(t *Task)

Push adds a new task into the front of the TaskQueue

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a background worker which picks up tasks and communicates its progress on its set channels

func (*Worker) Process

func (w *Worker) Process(t *Task)

Process takes a task and does the work on it.

func (*Worker) Sleep

func (w *Worker) Sleep()

Sleep pauses the worker before its next run

func (*Worker) Start

func (w *Worker) Start()

Start begins a selected worker's scanning loop waiting for tasks to come in. When a task comes in, we first check if it is scheduled to be dequeued. If so, we don't run it and remove it. If it is ready to be run, it processes it. If it isn't ready to be run, it reschedules the task to check again. If the worker doesn't find anything within 100 milliseconds, it sends the worker into sleep mode for the set interval.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL