gojm

package module
v0.1.0
Published: Sep 29, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README


Introduction

A thread-safe and reliable priority-based job manager.

Job

In gojm, a job is just a wrapper around a function. You can define one as follows:

job := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("The job started")

    time.Sleep(time.Second)

    fmt.Println("The job completed")

    return nil
})

You can execute the job as if you were calling a function. Note that the Exec() method must be called only once; the program panics if you call it again.

ctx := context.Background()
result := job.Exec(ctx)
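
The once-only contract can be pictured with a small stand-in type. The sketch below is not gojm's implementation: `onceJob`, its fields, and `secondCallPanics` are hypothetical names, and it uses only the standard library to show one way a second call could be turned into a panic.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// onceJob is a hypothetical stand-in for a job that may run only once.
type onceJob struct {
	fn      func(ctx context.Context) int
	started atomic.Bool
}

// Exec runs the wrapped function and panics on any later call.
func (j *onceJob) Exec(ctx context.Context) int {
	if !j.started.CompareAndSwap(false, true) {
		panic("job executed more than once")
	}
	return j.fn(ctx)
}

// secondCallPanics reports whether calling Exec twice panics.
func secondCallPanics() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	j := &onceJob{fn: func(context.Context) int { return 1 }}
	j.Exec(context.Background())
	j.Exec(context.Background()) // the second call panics
	return false
}

func main() {
	fmt.Println(secondCallPanics())
}
```

In practice this means a job handed to a job manager should not also be executed by hand.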

You can also retrieve the result later, in either non-blocking or blocking mode. In non-blocking mode, the result is nil if the job has not completed yet.

// Non-blocking mode: returns nil if the job has not completed yet.
result := job.GetResult()

// Blocking mode: waits for the job to complete.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, time.Second) // Set the timeout to 1 second.
defer cancel()

result = job.WaitResult(ctx)
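
The difference between the two modes can be illustrated with a channel that is closed on completion. This is a minimal sketch using only the standard library, not gojm's code: `waitableJob` and its methods are hypothetical, and returning nil when the context expires is an assumption, since the documentation does not say what WaitResult does on timeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitableJob is a hypothetical stand-in, not gojm's Job type.
type waitableJob struct {
	done   chan struct{} // closed once the result is stored
	result *int
}

func newWaitableJob() *waitableJob {
	return &waitableJob{done: make(chan struct{})}
}

// complete stores the result and wakes up every waiter.
func (j *waitableJob) complete(v int) {
	j.result = &v
	close(j.done)
}

// getResult never blocks: it returns nil until the job has completed.
func (j *waitableJob) getResult() *int {
	select {
	case <-j.done:
		return j.result
	default:
		return nil
	}
}

// waitResult blocks until completion or until ctx is done (assumed behavior).
func (j *waitableJob) waitResult(ctx context.Context) *int {
	select {
	case <-j.done:
		return j.result
	case <-ctx.Done():
		return nil
	}
}

// demo reports whether the early non-blocking read was nil, and the final value.
func demo() (pendingIsNil bool, final int) {
	j := newWaitableJob()
	pendingIsNil = j.getResult() == nil

	go func() {
		time.Sleep(10 * time.Millisecond)
		j.complete(100)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return pendingIsNil, *j.waitResult(ctx)
}

func main() {
	fmt.Println(demo())
}
```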

Job result

If you want the job to return a value, you can use JobResult.

job := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("The job started")

    time.Sleep(time.Second)

    fmt.Println("The job completed")

    return gojm.Result(100)
})

result := job.Exec(ctx)
fmt.Println(result.Get(nil))
// Output:
// 100

You can also store multiple values in a JobResult.

job := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("The job started")

    time.Sleep(time.Second)

    fmt.Println("The job completed")

    result := gojm.EmptyResult()
    result.Set("x", 100)
    result.Set("y", "abc")
    return result
})

result := job.Exec(ctx)
fmt.Println(result.Get("x"), result.Get("y"))
// Output:
// 100 abc
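
The examples above read a single value with Get(nil) and named values with Get("x"). One way to picture that behavior is a map in which the nil key acts as the default slot. The sketch below is an assumption about the idea, not gojm's actual storage: `result`, `newResult`, `set`, and `get` are hypothetical names.

```go
package main

import "fmt"

// result is a hypothetical stand-in for a keyed job result.
type result struct {
	values map[any]any
}

// newResult stores v under the nil key, treated here as the default slot.
func newResult(v any) *result {
	return &result{values: map[any]any{nil: v}}
}

// set adds a key-value pair and returns the receiver so calls can be chained.
func (r *result) set(key, value any) *result {
	r.values[key] = value
	return r
}

// get returns the stored value, or nil when the key is absent.
func (r *result) get(key any) any {
	return r.values[key]
}

func main() {
	r := newResult(100).set("x", 1).set("y", "abc")
	fmt.Println(r.get(nil), r.get("x"), r.get("y"), r.get("missing"))
	// Output:
	// 100 1 abc <nil>
}
```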

A job can also return an error.

job := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("The job started")

    time.Sleep(time.Second)

    fmt.Println("The job completed")

    return gojm.Err(errors.New("something's wrong"))
})

result := job.Exec(ctx)
fmt.Println(result.Err)
// Output:
// something's wrong

Job manager

You can add jobs to a job manager with a priority; the manager executes each job as soon as possible according to that priority.

First, create some Priority levels. Every Priority has its own value: the lower the value, the higher the priority.

// Urgent is for jobs which need to be executed as soon as possible.
var Urgent = gojm.NewPriority("Urgent", 0)

// Necessary is for jobs which can be executed later but also need to be
// completed soon. The aging is set to one minute: after one minute, a job
// is moved to the higher priority.
var Necessary = gojm.NewPriority("Necessary", 10).WithAging(time.Minute)

// Background is for jobs which can be completed at any time. We must
// explicitly state that this priority has no aging (not even the default
// aging).
var Background = gojm.NewPriority("Background", 1000).WithNoAging()
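
To make the effect of aging concrete, here is a minimal sketch of one promotion pass. It is a toy model under stated assumptions, not gojm's scheduler: `level`, `entry`, and `refresh` are hypothetical, a zero aging stands for "no aging", and promotion is assumed to move a job exactly one level up.

```go
package main

import (
	"fmt"
	"time"
)

// level is a hypothetical priority level; a lower value means a higher priority.
type level struct {
	name  string
	value int
	aging time.Duration // zero means the level never ages
}

// entry is a queued job together with its current level and waiting time.
type entry struct {
	label   string
	level   int // index into the levels slice, sorted by value
	waiting time.Duration
}

// refresh promotes every entry that has waited at least its level's aging
// timeslice to the next higher priority, then resets its waiting time.
func refresh(levels []level, queue []entry) {
	for i := range queue {
		e := &queue[i]
		aging := levels[e.level].aging
		if aging > 0 && e.waiting >= aging && e.level > 0 {
			e.level--
			e.waiting = 0
		}
	}
}

// demo returns the level of the "report" job before and after one refresh.
func demo() (before, after string) {
	levels := []level{
		{name: "Urgent", value: 0},
		{name: "Necessary", value: 10, aging: time.Minute},
		{name: "Background", value: 1000},
	}
	queue := []entry{
		{label: "cleanup", level: 2, waiting: 5 * time.Minute},
		{label: "report", level: 1, waiting: 90 * time.Second},
	}
	before = levels[queue[1].level].name
	refresh(levels, queue)
	after = levels[queue[1].level].name
	return before, after
}

func main() {
	fmt.Println(demo())
	// Output:
	// Necessary Urgent
}
```

The "cleanup" entry stays at Background however long it waits, which mirrors the intent of WithNoAging().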

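Then create a job manager, register the priorities, and start it. Run takes the number of jobs that can be executed concurrently as its second argument.

jm := gojm.New()
jm.AddPriority(Urgent)
jm.AddPriority(Necessary)
jm.AddPriority(Background)

ctx := context.Background()
if err := jm.Run(ctx, 2); err != nil {
    panic(err)
}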

You can schedule jobs on the job manager from another goroutine.

jm.Schedule(Background, gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("Do background job")
    return nil
}))

Keep a reference to the job if you want to wait for its result.

urgentJob := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
    fmt.Println("Do urgent job")
    return gojm.Result(0)
})

jm.Schedule(Urgent, urgentJob)

result := urgentJob.WaitResult(ctx)
fmt.Println(result.Get(nil))
// Output:
// 0

Hook

Instead of waiting for the result of each job, you can set a hook function to handle the result of all completed jobs.

jm.Hook(func(ctx context.Context, job gojm.JobWrapper) {
    // Err is a field of JobResult, so read it from the job's result.
    result := job.Unwrap().GetResult()
    if result == nil {
        return
    }
    if result.Err != nil {
        log.Printf("level=error priority=%s err=%v", job.OriginalPriority, result.Err)
    } else if value := result.Get(nil); value != nil {
        log.Printf("level=info priority=%s result=%v", job.OriginalPriority, value)
    }
})

Documentation

Constants

This section is empty.

Variables

var ErrExistedJob = errors.New("existed job")

Functions

This section is empty.

Types

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job is a wrapper around a function that allows callers to wait for the result asynchronously.

func NewJob

func NewJob(f func(ctx context.Context) *JobResult) *Job

NewJob wraps a function in a Job.

func (*Job) Exec

func (j *Job) Exec(ctx context.Context) *JobResult

Exec executes the job and returns the result. Do not call this method multiple times.

func (*Job) GetResult

func (j *Job) GetResult() *JobResult

GetResult returns the JobResult if the job has already completed. Otherwise, it returns nil.

func (*Job) IsCompleted

func (j *Job) IsCompleted() bool

IsCompleted returns true if the job has already completed.

func (*Job) WaitResult

func (j *Job) WaitResult(ctx context.Context) *JobResult

WaitResult returns the result if the job has already completed. Otherwise, it blocks the caller until the job completes.

type JobManager

type JobManager struct {
	// contains filtered or unexported fields
}

JobManager schedules jobs based on their priority.

Example
// Setup priority
jm := gojm.New()
jm.AddPriority(Urgent)
jm.AddPriority(Necessary)
jm.AddPriority(Background)

backgroundJob := gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
	fmt.Println("background done")
	return gojm.Result("background")
})

// In other goroutines, schedule your jobs.
go func() {
	time.Sleep(20 * time.Millisecond)

	jm.Schedule(Background, backgroundJob)

	time.Sleep(20 * time.Millisecond)

	jm.Schedule(Urgent, gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
		fmt.Println("urgent done")
		return gojm.EmptyResult()
	}))

	time.Sleep(20 * time.Millisecond)

	jm.Schedule(Necessary, gojm.NewJob(func(ctx context.Context) *gojm.JobResult {
		fmt.Println("necessary done")
		return gojm.EmptyResult()
	}))
}()

// Because this is an example, we need to stop this function after one
// second.
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1000*time.Millisecond)
defer cancel()

// Run the manager with the ability to handle 2 jobs at a time. In
// reality, you could run this function forever.
jm.Run(ctx, 2)

// You can also wait for the result of a job.
result := backgroundJob.WaitResult(ctx)
fmt.Println("Result of background job:", result.Get(nil))
Output:
background done
urgent done
necessary done
Result of background job: background

func New

func New() *JobManager

New initializes a JobManager.

func (*JobManager) AddPriority

func (m *JobManager) AddPriority(p Priority)

AddPriority adds a new Priority to the JobManager.

func (*JobManager) Hook

func (m *JobManager) Hook(trigger func(ctx context.Context, job JobWrapper))

Hook sets a trigger function that is executed each time a job completes. This method should be called before calling Run() or RunOne().

func (*JobManager) RefreshEvery

func (m *JobManager) RefreshEvery(interval time.Duration)

RefreshEvery sets the interval at which the priorities of jobs are refreshed. If you do not call this method, the interval is chosen automatically (equal to the smallest aging timeslice among all priorities).
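
The automatic choice described above amounts to taking the minimum over the aging timeslices. A minimal sketch of that rule follows; `defaultRefresh` and `leastAgingDemo` are hypothetical helpers, and treating a zero duration as "no aging" is an assumption made for the illustration.

```go
package main

import (
	"fmt"
	"time"
)

// defaultRefresh returns the smallest positive aging timeslice, or zero
// when no priority ages at all.
func defaultRefresh(agings []time.Duration) time.Duration {
	var least time.Duration
	for _, a := range agings {
		if a <= 0 {
			continue // treated here as "no aging"
		}
		if least == 0 || a < least {
			least = a
		}
	}
	return least
}

// leastAgingDemo formats the interval chosen for three sample priorities.
func leastAgingDemo() string {
	agings := []time.Duration{0, time.Minute, 10 * time.Second}
	return defaultRefresh(agings).String()
}

func main() {
	fmt.Println(leastAgingDemo())
	// Output:
	// 10s
}
```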

func (*JobManager) Run

func (m *JobManager) Run(ctx context.Context, numThreads int) error

Run starts the JobManager. The parameter numThreads specifies the number of Jobs that can be executed concurrently.
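
A limit on concurrent jobs is commonly implemented in Go with a buffered channel used as a semaphore. The sketch below shows that general technique with the standard library only; it is not taken from gojm, and `runBounded` and `demoPeak` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded executes tasks with at most limit running at once and
// returns the highest number of tasks observed running simultaneously.
func runBounded(limit int, tasks []func()) int {
	var (
		wg      sync.WaitGroup
		running atomic.Int32
		peak    atomic.Int32
	)
	slots := make(chan struct{}, limit) // buffered channel used as a semaphore

	for _, task := range tasks {
		wg.Add(1)
		slots <- struct{}{} // blocks while limit tasks are already running
		go func(task func()) {
			defer wg.Done()
			defer func() { <-slots }() // free the slot after the task ends

			n := running.Add(1)
			for { // record a new peak if this is the highest count so far
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			task()
			running.Add(-1)
		}(task)
	}
	wg.Wait()
	return int(peak.Load())
}

// demoPeak runs six short tasks with a limit of two.
func demoPeak() int {
	tasks := make([]func(), 6)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	return runBounded(2, tasks)
}

func main() {
	fmt.Println(demoPeak() <= 2)
	// Output:
	// true
}
```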

func (*JobManager) RunOne

func (m *JobManager) RunOne(ctx context.Context) error

RunOne starts the JobManager so that only one Job can be executed at a time.

func (*JobManager) Schedule

func (m *JobManager) Schedule(priority Priority, job *Job) error

Schedule adds a Job to the JobManager; the job is scheduled to be executed later.

func (*JobManager) SetDefaultJobAging

func (m *JobManager) SetDefaultJobAging(timeslice time.Duration)

SetDefaultJobAging sets a default timeslice. When a job has existed for longer than this timeslice, it is moved to the higher priority. This timeslice applies only to priorities that do not have their own aging.

type JobResult

type JobResult struct {
	Err    error
	DoneAt time.Time
	// contains filtered or unexported fields
}

JobResult is the returned value of Job.

func EmptyResult

func EmptyResult() *JobResult

EmptyResult initializes an empty JobResult.

func Err

func Err(err error) *JobResult

Err returns a JobResult with an error.

func Result

func Result(value any) *JobResult

Result returns a JobResult with a default value.

func (JobResult) Get

func (r JobResult) Get(key any) any

Get returns the value with a given key.

func (JobResult) GetBool

func (r JobResult) GetBool(key any) bool

GetBool returns the boolean value with a given key.

func (JobResult) GetFloat32

func (r JobResult) GetFloat32(key any) float32

GetFloat32 returns the float32 value with a given key.

func (JobResult) GetFloat64

func (r JobResult) GetFloat64(key any) float64

GetFloat64 returns the float64 value with a given key.

func (JobResult) GetInt

func (r JobResult) GetInt(key any) int

GetInt returns the int value with a given key.

func (JobResult) GetInt32

func (r JobResult) GetInt32(key any) int32

GetInt32 returns the int32 value with a given key.

func (JobResult) GetInt64

func (r JobResult) GetInt64(key any) int64

GetInt64 returns the int64 value with a given key.

func (JobResult) GetString

func (r JobResult) GetString(key any) string

GetString returns the string value with a given key.

func (JobResult) GetUint

func (r JobResult) GetUint(key any) uint

GetUint returns the uint value with a given key.

func (JobResult) GetUint32

func (r JobResult) GetUint32(key any) uint32

GetUint32 returns the uint32 value with a given key.

func (JobResult) GetUint64

func (r JobResult) GetUint64(key any) uint64

GetUint64 returns the uint64 value with a given key.
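
The typed getters above all follow the same shape: look up a key, then convert the value to a concrete type. The documentation does not say what happens when the stored value has a different type, so the sketch below makes that case explicit with a comma-ok type assertion. `getAs` is a hypothetical generic helper, not part of gojm.

```go
package main

import "fmt"

// getAs is a hypothetical generic getter: it returns the value stored under
// key as a T, and false when the key is absent or holds another type.
func getAs[T any](values map[any]any, key any) (T, bool) {
	v, ok := values[key].(T)
	return v, ok
}

// demo reads "x" as an int and then tries to read the string "y" as an int.
func demo() (int, bool, bool) {
	values := map[any]any{"x": 100, "y": "abc"}
	x, okX := getAs[int](values, "x")
	_, okY := getAs[int](values, "y") // "y" holds a string, so this fails
	return x, okX, okY
}

func main() {
	fmt.Println(demo())
	// Output:
	// 100 true false
}
```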

func (JobResult) Has

func (r JobResult) Has(key any) bool

Has returns true if the result contains a key.

func (*JobResult) Set

func (r *JobResult) Set(key, value any) *JobResult

Set adds a key-value pair to JobResult.

type JobWrapper

type JobWrapper struct {
	Priority         Priority
	OriginalPriority Priority
	// contains filtered or unexported fields
}

func (JobWrapper) Unwrap

func (w JobWrapper) Unwrap() *Job

type Priority

type Priority struct {
	// contains filtered or unexported fields
}

func NewPriority

func NewPriority(name string, p int) Priority

func (Priority) Name

func (p Priority) Name() string

func (Priority) String

func (p Priority) String() string

func (Priority) Value

func (p Priority) Value() int

func (Priority) WithAging

func (p Priority) WithAging(timeslice time.Duration) Priority

func (Priority) WithNoAging

func (p Priority) WithNoAging() Priority
