godot

package module
v0.0.0-...-8a99df0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2023 License: Apache-2.0 Imports: 21 Imported by: 0

README

Godot

Simple, efficient background processing for Go.

Godot uses goroutines to handle many jobs concurrently within a
single process.

Web UI

Requirements

redis
go 1.19+

Installation

go get github.com/easedot/godot

Getting Started

Step1

Write your doters, see doters/doters.go

Step2

Write job servers, examples/examples_srv.go

Step3

Write the client that runs job tasks, see examples/examples_cli.go

Step4

go build example/example_srv.go

go build example/example_cli.go

Step5

Open a new terminal and run:
docker-compose up -d

Open a new terminal and run:
./example_srv -m 50

Open a new terminal and run:
./example_cli -m 5000

Config

//set queue weight
var queues = []godot.Queue{
	{Name: "work1", Weight: 3},
	{Name: "work2", Weight: 2},
	{Name: "work3", Weight: 1},
	{Name: "default", Weight: 1},
}

//set max redis connections to 50 and the web UI port to 6698
godotSRV := godot.NewGoDot(ctx, client, queues, 50, 6698)

//set job option and register
options := Doter{
	Queue:      "default",
	Retry:      false,
	RetryCount: 2,
}
doter := defaultDoter{options}
Register(doter, options)

client.Run(ctx, "defaultDoter", "test_at")
//or run after 1000ms
client.RunAt(ctx, 1000, "defaultDoter", "test_at")

Todo

dashboard localhost:6698

Web UI

Performance

Macbook M1 Pro  
10 processes put 500000 jobs in 1m27s:
./example_cli -m 50000

One server with 2000 Redis connections finishes in 30s:
./example_srv -m 2000

License

Please see LICENSE.txt for licensing details.

Author

Haihui Jiang @easedot

Documentation

Index

Constants

View Source
const (
	DefaultMaxRetry = 25
	LAYOUT          = "2006-01-02 15:04:05 UTC"
	FetchTimeout    = 2 * time.Second
	PollInterval    = 2
)
View Source
const CALCKEY = "godot:calc:%s"
View Source
const Dead = "dead"
View Source
const DefaultDoter = "default_dot"
View Source
const Jobs = "jobs"
View Source
const MSTASQKEY = "godot:mstats:%s"
View Source
const MaxMinuteSpan = 60 * 24 * 7 //minutes per 7 days
View Source
const MaxSecondSpan = 60 * 60 * 24 //seconds per 1 day
View Source
const RetryQueue = "retry"
View Source
const STASQKEY = "godot:stats:%s"
View Source
const ScheduleQueue = "schedule"
View Source
const Success = "success"

Variables

This section is empty.

Functions

func NowTimeStamp

func NowTimeStamp() string

func RandQueue

func RandQueue(queueNames []string) []string

func Register

func Register(task Task, option Doter)

func RegisterByName

func RegisterByName(taskName string, task Task, option Doter)

func Unique

func Unique(in []string) []string

Types

type ChartData

type ChartData struct {
	Name   string
	Title  string            `json:"title,omitempty"`
	Legend []string          `json:"legend,omitempty"`
	Labels []string          `json:"labels,omitempty"`
	Series []ChartSeriesData `json:"series,omitempty"`
	Height string
}

type ChartSeriesData

type ChartSeriesData struct {
	Name      string `json:"name,omitempty"`
	Type      string `json:"type,omitempty"`
	Data      []int  `json:"data,omitempty"`
	Smooth    bool   `json:"smooth,omitempty"`
	Animation bool   `json:"animation,omitempty"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewGoDotCli

func NewGoDotCli(client *redis.Client) *Client

func (*Client) Run

func (d *Client) Run(ctx context.Context, className string, args ...interface{})

func (*Client) RunAt

func (d *Client) RunAt(ctx context.Context, at int64, className string, args ...interface{})

type Dot

type Dot struct {
	// contains filtered or unexported fields
}

func NewDot

func NewDot(maxDots int, stats *Stats) *Dot

func (*Dot) Shutdown

func (d *Dot) Shutdown()

func (*Dot) WaitJob

func (d *Dot) WaitJob(ctx context.Context, jobChan chan string, retry *ScheduleJob)

type DotCache

type DotCache interface {
	RPop(ctx context.Context, key string) (string, error)
	LPop(ctx context.Context, key string) (string, error)
	LTrim(ctx context.Context, key string, start, stop int64) (string, error)
	LIndex(ctx context.Context, key string, index int64) (string, error)
	LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
	LPush(ctx context.Context, key string, values interface{})
	RPush(ctx context.Context, key string, values interface{})
	BulkLPush(ctx context.Context, key string, values []interface{})
	BlockRPop(ctx context.Context, queue ...string) (string, error)

	TimeAdd(ctx context.Context, time int64, key string, values interface{})
	TimeQuery(ctx context.Context, queue string) ([]string, error)
	TimeRem(ctx context.Context, queue, job string) (int64, error)
	HGet(ctx context.Context, hash, key string) (string, error)
	HGetAll(ctx context.Context, hash string) (map[string]string, error)
	HSet(ctx context.Context, hash string, value ...interface{}) error
	HIncrBy(ctx context.Context, hash, key string, incValue int64) error
	LLen(ctx context.Context, key string) (int64, error)
}

func NewRedisCache

func NewRedisCache(client *redis.Client) DotCache

type DotData

type DotData struct {
	Queue      string        `json:"queue,omitempty"`
	Class      string        `json:"class"`
	Args       []interface{} `json:"args"`
	Jid        string        `json:"jid"`
	EnqueuedAt string        `json:"enqueued_at"`
	DotDataOption
}

func NewDotData

func NewDotData(className string) (*DotData, error)

func (*DotData) SetRetryInfo

func (d *DotData) SetRetryInfo()

type DotDataOption

type DotDataOption struct {
	RetryCount int   `json:"retry_count,omitempty"`
	Retry      bool  `json:"retryJob,omitempty"`
	At         int64 `json:"at,omitempty"`
}

type Doter

type Doter struct {
	Queue      string
	Retry      bool
	RetryCount int
}

type GoDot

type GoDot struct {
	// contains filtered or unexported fields
}

func NewGoDot

func NewGoDot(ctx context.Context, client *redis.Client, queues []Queue, maxRedisCnn int, port int) *GoDot

func (*GoDot) Shutdown

func (d *GoDot) Shutdown()

func (*GoDot) WaitIdl

func (d *GoDot) WaitIdl()

func (*GoDot) WaitJob

func (d *GoDot) WaitJob()

type Queue

type Queue struct {
	Name   string
	Weight int
}

type QueueJob

type QueueJob struct {
	// contains filtered or unexported fields
}

func NewQueueJob

func NewQueueJob(ctx context.Context, queues []Queue, cache DotCache, jobChan chan string, maxFetch int) *QueueJob

func (*QueueJob) FetchJob

func (q *QueueJob) FetchJob(ctx context.Context)

func (*QueueJob) IsIDL

func (q *QueueJob) IsIDL() bool

func (*QueueJob) IsStop

func (q *QueueJob) IsStop() bool

func (*QueueJob) QueueNames

func (q *QueueJob) QueueNames() []string

func (*QueueJob) Stop

func (q *QueueJob) Stop()

type RedisCache

type RedisCache struct {
	// contains filtered or unexported fields
}

func (*RedisCache) BlockRPop

func (r *RedisCache) BlockRPop(ctx context.Context, queue ...string) (string, error)

func (*RedisCache) BulkLPush

func (r *RedisCache) BulkLPush(ctx context.Context, key string, values []interface{})

func (*RedisCache) HGet

func (r *RedisCache) HGet(ctx context.Context, hash, key string) (string, error)

func (*RedisCache) HGetAll

func (r *RedisCache) HGetAll(ctx context.Context, hash string) (map[string]string, error)

func (*RedisCache) HIncrBy

func (r *RedisCache) HIncrBy(ctx context.Context, hash, key string, incValue int64) error

func (*RedisCache) HSet

func (r *RedisCache) HSet(ctx context.Context, hash string, values ...interface{}) error

func (*RedisCache) LIndex

func (r *RedisCache) LIndex(ctx context.Context, key string, index int64) (string, error)

func (*RedisCache) LLen

func (r *RedisCache) LLen(ctx context.Context, key string) (int64, error)

func (*RedisCache) LPop

func (r *RedisCache) LPop(ctx context.Context, key string) (string, error)

func (*RedisCache) LPush

func (r *RedisCache) LPush(ctx context.Context, key string, values interface{})

func (*RedisCache) LRange

func (r *RedisCache) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

func (*RedisCache) LTrim

func (r *RedisCache) LTrim(ctx context.Context, key string, start, stop int64) (string, error)

func (*RedisCache) RPop

func (r *RedisCache) RPop(ctx context.Context, key string) (string, error)

func (*RedisCache) RPush

func (r *RedisCache) RPush(ctx context.Context, key string, values interface{})

func (*RedisCache) TimeAdd

func (r *RedisCache) TimeAdd(ctx context.Context, time int64, key string, values interface{})

func (*RedisCache) TimeQuery

func (r *RedisCache) TimeQuery(ctx context.Context, queue string) ([]string, error)

func (*RedisCache) TimeRem

func (r *RedisCache) TimeRem(ctx context.Context, queue, job string) (int64, error)

type ScheduleJob

type ScheduleJob struct {
	// contains filtered or unexported fields
}

func NewScheduleJob

func NewScheduleJob(ctx context.Context, queueName string, cache DotCache, jobChan chan string) *ScheduleJob

func (*ScheduleJob) FetchJob

func (s *ScheduleJob) FetchJob(ctx context.Context)

func (*ScheduleJob) Stop

func (s *ScheduleJob) Stop()

type Stats

type Stats struct {
	// contains filtered or unexported fields
}

func NewStats

func NewStats(cache DotCache, queues []Queue, port int) *Stats

func (*Stats) GetHash

func (d *Stats) GetHash(ctx context.Context, hash, key string) string

func (*Stats) PushHash

func (d *Stats) PushHash(ctx context.Context, hash, key string)

type Task

type Task interface {
	Run(args ...interface{}) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL