gocelery

package module
v0.0.0-...-0d53e84 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2015 License: MIT Imports: 20 Imported by: 0

README

GoCelery

a golang port of Celery distributed task engine. It supports executing and submitting tasks, and can interop with celery engine or celery python client.

Build Status

Features

  • Task Queues
  • Task Event Reporting
  • Supported Brokers
    • RabbitMQ
    • Redis
    • Nats

Installation

go get http://github.com/taoh/gocelery

Example

demo/main.go

package main

import (
	"math/rand"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/taoh/gocelery"
)

// Adder worker
type Adder struct{}

// Execute an addition
func (a *Adder) Execute(task *gocelery.Task) (result interface{}, err error) {
	sum := float64(0)
	for _, arg := range task.Args {
		switch arg.(type) {
		case int64:
			sum += (float64)(arg.(int64))
		case float64:
			sum += arg.(float64)
		}
	}
	result = sum

	// simulate the wait
	time.Sleep(time.Duration(rand.Int31n(3000)) * time.Millisecond)
	log.Debug("task.Args: ", task.Args, " Result: ", result)
	return
}

func main() {
	worker := gocelery.New(&gocelery.Config{
		LogLevel: "debug",
	})
	defer worker.Close()

	gocelery.RegisterWorker("tasks.add", &Adder{})
	// print all registered workers
	workers := gocelery.RegisteredWorkers()
	for _, worker := range workers {
		log.Debugf("Registered Worker: %s", worker)
	}

	// start executing
	worker.StartWorkers()
}

start the worker process

go run demo/main.go

demo/client.go

package main

import (
	log "github.com/Sirupsen/logrus"
	"github.com/taoh/gocelery"
)

func main() {
	log.SetLevel(log.InfoLevel)
	log.Info("We can run the task and ignore result")
	i := 13
	j := 12
	args := []interface{}{i, j}

	worker := gocelery.New(&gocelery.Config{
		LogLevel: "debug",
	})
	defer worker.Close()

	worker.Enqueue(
		"tasks.add", // task name
		args,        // arguments
		true,        // ignoreResults
	)

	log.Info("Task Executed.")

	taskResult := worker.Enqueue(
		"tasks.add", // task name
		args,        // arguments
		false,       // ignoreResults
	)

	tr := <-taskResult
	log.Infof("We can also run the task and return result: %d + %d = %d", i, j, int64(tr.Result.(float64)))
	log.Info("Task Executed.")
}

start the client process

go run demo/client.go

Documentation

Godoc

The MIT License

Documentation

Overview

Package gocelery is a Golang implemenation of celery task queue. It allows you to enqueue a task and execute it using go runtime which brings efficiency and concurrency features

gocelery is compatible with celery so that you can execute tasks enqueued by celery client or submit tasks to be executed by celery workers

gocelery requires a broker and results backend. currently only rabbitmq is supported (although any AMQP broker should work but not tested)

gocelery is shipped as a library so you can implement your own workers.

Index

Constants

View Source
const (
	// DefaultQueue is the default task queue name
	DefaultQueue = "celery"
)
View Source
const (
	JSON string = "application/json"
)

Constants

Variables

This section is empty.

Functions

func IsWorkerRegistered

func IsWorkerRegistered(name string) bool

IsWorkerRegistered checks if worker exists for the task name

func NewTaskFailedEvent

func NewTaskFailedEvent(task *Task, taskResult *TaskResult, err error) map[string]interface{}

NewTaskFailedEvent creates new event for task failed

func NewTaskReceivedEvent

func NewTaskReceivedEvent(task *Task) map[string]interface{}

NewTaskReceivedEvent creates new event for task received

func NewTaskStartedEvent

func NewTaskStartedEvent(task *Task) map[string]interface{}

NewTaskStartedEvent creates new event for task started

func NewTaskSucceedEvent

func NewTaskSucceedEvent(task *Task, taskResult *TaskResult, runtime time.Duration) map[string]interface{}

NewTaskSucceedEvent creates new event for task succeeded

func RegisterWorker

func RegisterWorker(name string, worker Worker)

RegisterWorker registers the worker with given task name

func RegisteredWorkers

func RegisteredWorkers() []string

RegisteredWorkers List all registered workers

Types

type Config

type Config struct {
	// BrokerURL in the format amqp:user@password//<host>/<virtualhost>
	BrokerURL string
	// LogLevel: debug, info, warn, error, fatal
	LogLevel string
}

Config stores the configuration information for gocelery

type EventType

type EventType string

EventType is enum of valid event types in celery

const (
	None            EventType = "None"
	WorkerOffline   EventType = "worker-offline"
	WorkerHeartbeat EventType = "worker-heartbeat"
	WorkerOnline    EventType = "worker-online"
	TaskRetried     EventType = "task-retried"
	TaskSucceeded   EventType = "task-succeeded"
	TaskStarted     EventType = "task-started"
	TaskReceived    EventType = "task-received"
	TaskFailed      EventType = "task-failed"
	TaskRevoked     EventType = "task-revoked"
)

Valid EventTypes

func (EventType) RoutingKey

func (eventType EventType) RoutingKey() string

RoutingKey returns celery routing keys for events

type GoCelery

type GoCelery struct {
	// contains filtered or unexported fields
}

GoCelery creates an instance of entry

func New

func New(config *Config) *GoCelery

New creates a GoCelery instance with given config

func (*GoCelery) Close

func (gocelery *GoCelery) Close()

Close disconnects with broker and cleans up all resources used. Use a defer statement to make sure resources are closed

func (*GoCelery) Enqueue

func (gocelery *GoCelery) Enqueue(taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)

Enqueue adds a task to queue to be executed immediately. If ignoreResult is true the function returns immediately with a nil channel returned. Otherwise, a result channel is returned so client can wait for the result.

func (*GoCelery) EnqueueInQueue

func (gocelery *GoCelery) EnqueueInQueue(queueName string, taskName string, args []interface{}, ignoreResult bool) (chan *TaskResult, error)

EnqueueInQueue adds a task to queue to be executed immediately. If ignoreResult is true the function returns immediately with a nil channel returned. Otherwise, a result channel is returned so client can wait for the result.

func (*GoCelery) EnqueueInQueueWithSchedule

func (gocelery *GoCelery) EnqueueInQueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error

EnqueueInQueueWithSchedule adds a task that is scheduled repeatedly. Schedule is specified in a string with cron format

func (*GoCelery) EnqueueWithSchedule

func (gocelery *GoCelery) EnqueueWithSchedule(spec string, queueName string, taskName string, args []interface{}) error

EnqueueWithSchedule adds a task that is scheduled repeatedly. Schedule is specified in a string with cron format

func (*GoCelery) StartWorkers

func (gocelery *GoCelery) StartWorkers()

StartWorkers start running the workers with default queue

func (*GoCelery) StartWorkersWithQueues

func (gocelery *GoCelery) StartWorkersWithQueues(queues []string)

StartWorkersWithQueues start running the workers

type ResultStatus

type ResultStatus string

ResultStatus is the valid statuses for task executions

const (
	Pending ResultStatus = "PENDING"
	Started ResultStatus = "STARTED"
	Success ResultStatus = "SUCCESS"
	Retry   ResultStatus = "RETRY"
	Failure ResultStatus = "FAILURE"
	Revoked ResultStatus = "REVOKED"
)

ResultStatus values

type Task

type Task struct {
	Task        string                 `json:"task"`
	ID          string                 `json:"id"`
	Args        []interface{}          `json:"args,omitempty"`
	Kwargs      map[string]interface{} `json:"kwargs,omitempty"`
	Retries     int                    `json:"retries,omitempty"`
	Eta         celeryTime             `json:"eta,omitempty"`
	Expires     celeryTime             `json:"expires,omitempty"`
	ContentType string                 `json:"-"`
}

Task represents the a single piece of work

func (Task) String

func (t Task) String() string

type TaskResult

type TaskResult struct {
	ID        string       `json:"task_id"`
	Result    interface{}  `json:"result"`
	Status    ResultStatus `json:"status"`
	TraceBack string       `json:"traceback"`
}

TaskResult is the result wrapper for task

type Worker

type Worker interface {
	Execute(*Task) (interface{}, error)
}

Worker is the definition of task execution

type WorkerEvent

type WorkerEvent struct {
	Type      EventType `json:"type"`
	Ident     string    `json:"sw_ident"`
	Ver       string    `json:"sw_ver"`
	Sys       string    `json:"sw_sys"`
	HostName  string    `json:"hostname"`
	Timestamp int64     `json:"timestamp"`
}

WorkerEvent implements the structure for worker related events

func NewWorkerEvent

func NewWorkerEvent(eventType EventType) *WorkerEvent

NewWorkerEvent creates new worker events

Directories

Path Synopsis
Godeps
_workspace/src/github.com/apcera/nats
A Go client for the NATS messaging system (https://nats.io).
A Go client for the NATS messaging system (https://nats.io).
_workspace/src/github.com/garyburd/redigo/internal/redistest
Package redistest contains utilities for writing Redigo tests.
Package redistest contains utilities for writing Redigo tests.
_workspace/src/github.com/garyburd/redigo/redis
Package redis is a client for the Redis database.
Package redis is a client for the Redis database.
_workspace/src/github.com/go-errors/errors
Package errors provides errors that have stack-traces.
Package errors provides errors that have stack-traces.
_workspace/src/github.com/hydrogen18/stalecucumber
This package reads and writes pickled data.
This package reads and writes pickled data.
_workspace/src/github.com/robfig/cron
This library implements a cron spec parser and runner.
This library implements a cron spec parser and runner.
_workspace/src/github.com/twinj/uuid
This package provides RFC4122 UUID capabilities.
This package provides RFC4122 UUID capabilities.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL