asyncjob

package module
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2024 License: MIT Imports: 8 Imported by: 0

README

AsyncJob

AsyncJob aiming to help you organize code in dependencyGraph(DAG), instead of a sequential chain.

Concepts

JobDefinition is a graph describe code blocks and their connections.

  • you can use AddStep, StepAfter, StepAfterBoth to organize steps in a JobDefinition.
  • jobDefinition can be and should be build and seal in package init time.
  • jobDefinition have a generic typed input
  • calling Start with the input, will instantiate an jobInstance, and steps will began to execute.
  • jobDefinition can be visualized using graphviz, easier for human to understand.

JobInstance is an instance of JobDefinition, after calling .Start() method from JobDefinition

  • all Steps on the definition will be copied to JobInstance.
  • each step will be executed once it's precedent step is done.
  • jobInstance can be visualized as well, instance visualize contains detailed info(startTime, duration) on each step.

StepDefinition is a individual code block which can be executed and have inputs, output.

  • StepDefinition describe it's preceding steps.
  • StepDefinition contains generic Params
  • ideally all stepMethod should come from JobInput (generic type on JobDefinition), or static method. To avoid shared state between jobs.
  • output of a step can be feed into next step as input, type is checked by go generics.

StepInstance is instance of StepDefinition

  • step is wrapped in AsyncTask
  • a step would be started once all it's dependency is finished.
  • executionPolicy can be applied {Retry, ContextEnrichment}

Usage

Build and run a asyncjob

// SqlSummaryAsyncJobDefinition is the job definition for the SqlSummaryJobLib
//   JobDefinition fit perfectly in init() function
var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]

func init() {
	var err error
	SqlSummaryAsyncJobDefinition, err = BuildJobWithResult(map[string]asyncjob.RetryPolicy{})
	if err != nil {
		panic(err)
	}

	SqlSummaryAsyncJobDefinition.Seal()
}

func BuildJob(retryPolicies map[string]asyncjob.RetryPolicy) (*asyncjob.JobDefinition[SqlSummaryJobLib], error) {
	job := asyncjob.NewJobDefinition[SqlSummaryJobLib]("sqlSummaryJob")

	connTsk, err := asyncjob.AddStep(job, "GetConnection", connectionStepFunc, asyncjob.WithRetry(retryPolicies["GetConnection"]), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetConnection: %w", err)
	}

	checkAuthTask, err := asyncjob.AddStep(job, "CheckAuth", checkAuthStepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step CheckAuth: %w", err)
	}

	table1ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient1", connTsk, tableClient1StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetTableClient1: %w", err)
	}

	qery1ResultTsk, err := asyncjob.StepAfter(job, "QueryTable1", table1ClientTsk, queryTable1StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step QueryTable1: %w", err)
	}

	table2ClientTsk, err := asyncjob.StepAfter(job, "GetTableClient2", connTsk, tableClient2StepFunc, asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step GetTableClient2: %w", err)
	}

	qery2ResultTsk, err := asyncjob.StepAfter(job, "QueryTable2", table2ClientTsk, queryTable2StepFunc, asyncjob.WithRetry(retryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step QueryTable2: %w", err)
	}

	summaryTsk, err := asyncjob.StepAfterBoth(job, "Summarize", qery1ResultTsk, qery2ResultTsk, summarizeQueryResultStepFunc, asyncjob.WithRetry(retryPolicies["Summarize"]), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step Summarize: %w", err)
	}

	_, err = asyncjob.AddStep(job, "EmailNotification", emailNotificationStepFunc, asyncjob.ExecuteAfter(summaryTsk), asyncjob.WithContextEnrichment(EnrichContext))
	if err != nil {
		return nil, fmt.Errorf("error adding step EmailNotification: %w", err)
	}
	return job, nil
}
	// execute job
	jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
	jobInstance2 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})

    // ...

	jobInstance1.Wait(context.WithTimeout(context.Background(), 10*time.Second))
	jobInstance2.Wait(context.WithTimeout(context.Background(), 10*time.Second))
visualize of a job
	# visualize the job
	dotGraph := job.Visualize()
	fmt.Println(dotGraph)

visualize job graph

digraph {
	newrank = "true"
		"QueryTable2" [label="QueryTable2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254054-08:00\nDuration: 13.207µs" fillcolor=green] 
		"QueryTable1" [label="QueryTable1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254098-08:00\nDuration: 11.394µs" fillcolor=green] 
		"EmailNotification" [label="EmailNotification" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254143-08:00\nDuration: 11.757µs" fillcolor=green] 
		"sqlSummaryJob" [label="sqlSummaryJob" shape=triangle style=filled tooltip="State: completed\nStartAt: 0001-01-01T00:00:00Z\nDuration: 0s" fillcolor=green] 
		"GetConnection" [label="GetConnection" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253844-08:00\nDuration: 154.825µs" fillcolor=green] 
		"GetTableClient2" [label="GetTableClient2" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254017-08:00\nDuration: 25.793µs" fillcolor=green] 
		"GetTableClient1" [label="GetTableClient1" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254076-08:00\nDuration: 12.459µs" fillcolor=green] 
		"Summarize" [label="Summarize" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.254121-08:00\nDuration: 7.88µs" fillcolor=green] 
		"CheckAuth" [label="CheckAuth" shape=hexagon style=filled tooltip="State: completed\nStartAt: 2022-12-12T12:00:32.253818-08:00\nDuration: 18.52µs" fillcolor=green] 

		"CheckAuth" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"CheckAuth" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"GetTableClient2" -> "QueryTable2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254054-08:00" color=green] 
		"GetTableClient1" -> "QueryTable1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254098-08:00" color=green] 
		"QueryTable1" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"QueryTable2" -> "Summarize" [style=bold tooltip="Time: 2022-12-12T12:00:32.254121-08:00" color=green] 
		"Summarize" -> "EmailNotification" [style=bold tooltip="Time: 2022-12-12T12:00:32.254143-08:00" color=green] 
		"sqlSummaryJob" -> "CheckAuth" [style=bold tooltip="Time: 2022-12-12T12:00:32.253818-08:00" color=green] 
		"sqlSummaryJob" -> "GetConnection" [style=bold tooltip="Time: 2022-12-12T12:00:32.253844-08:00" color=green] 
		"GetConnection" -> "GetTableClient2" [style=bold tooltip="Time: 2022-12-12T12:00:32.254017-08:00" color=green] 
		"GetConnection" -> "GetTableClient1" [style=bold tooltip="Time: 2022-12-12T12:00:32.254076-08:00" color=green] 
}
collect result from job

you can enrich job to aware result from given step, then you can collect result (strongly typed) from that step

var SqlSummaryAsyncJobDefinition *asyncjob.JobDefinitionWithResult[SqlSummaryJobLib, SummarizedResult]
SqlSummaryAsyncJobDefinition = asyncjob.JobWithResult(job /*from previous section*/, summaryTsk)

jobInstance1 := SqlSummaryAsyncJobDefinition.Start(ctx, &SqlSummaryJobLib{...})
result, err := jobInstance1.Result(ctx)
Overhead?
  • go routine will be created for each step in your jobDefinition, when you call .Start()
  • each step also hold tiny memory as well for state tracking.
  • userFunction is instrumented with state tracking, panic handling.

Here is some simple visualize on how it actual looks like:

gantt
    title       asyncjob.Start()
    dateFormat  HH:mm

    section GetConnection
    WaitPrecedingTasks            :des11, 00:00,0ms
    userFunction                  :des12, after des11, 20ms

    section GetTableClient1
    WaitPrecedingTasks            :des21, 00:00,20ms
    userFunction                  :des22, after des21, 15ms

    section GetTableClient2
    WaitPrecedingTasks            :des31, 00:00,20ms
    userFunction                  :des32, after des31, 21ms

    section QueryTable1
    WaitPrecedingTasks            :des41, 00:00,35ms
    userFunction                  :des42, after des41, 24ms

    section QueryTable2
    WaitPrecedingTasks            :des51, 00:00,41ms
    userFunction                  :des52, after des51, 30ms

    section QueryResultSummarize
    WaitPrecedingTasks            :des61, 00:00, 71ms
    userFunction                  :des62, after des61, 10ms

Documentation

Index

Constants

View Source
const (
	ErrPrecedentStepFailed JobErrorCode = "PrecedentStepFailed"
	ErrStepFailed          JobErrorCode = "StepFailed"

	ErrRefStepNotInJob JobErrorCode = "RefStepNotInJob"
	MsgRefStepNotInJob string       = "trying to reference to step %q, but it is not registered in job"

	ErrAddStepInSealedJob JobErrorCode = "AddStepInSealedJob"
	MsgAddStepInSealedJob string       = "trying to add step %q to a sealed job definition"

	ErrAddExistingStep JobErrorCode = "AddExistingStep"
	MsgAddExistingStep string       = "trying to add step %q to job definition, but it already exists"

	ErrDuplicateInputParentStep JobErrorCode = "DuplicateInputParentStep"
	MsgDuplicateInputParentStep string       = "at least 2 input parentSteps are same"

	ErrRuntimeStepNotFound JobErrorCode = "RuntimeStepNotFound"
	MsgRuntimeStepNotFound string       = "runtime step %q not found, must be a bug in asyncjob"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ExecutionOptionPreparer

type ExecutionOptionPreparer func(*StepExecutionOptions) *StepExecutionOptions

func ExecuteAfter added in v0.1.1

Add precedence to a step.

without taking input from it(use StepAfter/StepAfterBoth otherwise)

func WithContextEnrichment added in v0.5.0

func WithContextEnrichment(contextPolicy StepContextPolicy) ExecutionOptionPreparer

func WithRetry added in v0.2.2

func WithRetry(retryPolicy RetryPolicy) ExecutionOptionPreparer

Allow retry of a step on error.

type JobDefinition added in v0.5.0

type JobDefinition[T any] struct {
	// contains filtered or unexported fields
}

JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).

func NewJobDefinition added in v0.5.0

func NewJobDefinition[T any](name string) *JobDefinition[T]

Create new JobDefinition

it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.

func (*JobDefinition[T]) GetName added in v0.5.0

func (jd *JobDefinition[T]) GetName() string

func (*JobDefinition[T]) GetStep added in v0.5.0

func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool)

GetStep returns the stepDefinition by name

func (*JobDefinition[T]) Seal added in v0.5.0

func (jd *JobDefinition[T]) Seal()

func (*JobDefinition[T]) Sealed added in v0.5.0

func (jd *JobDefinition[T]) Sealed() bool

func (*JobDefinition[T]) Start added in v0.5.0

func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T]

Start execution of the job definition.

this will create and return new instance of the job
caller will then be able to wait for the job instance

func (*JobDefinition[T]) Visualize added in v0.5.0

func (jd *JobDefinition[T]) Visualize() (string, error)

Visualize the job definition in graphviz dot format

type JobDefinitionMeta added in v0.5.0

type JobDefinitionMeta interface {
	GetName() string
	GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
	Seal()
	Sealed() bool
	Visualize() (string, error)
	// contains filtered or unexported methods
}

Interface for a job definition

type JobDefinitionWithResult added in v0.5.0

type JobDefinitionWithResult[Tin, Tout any] struct {
	*JobDefinition[Tin]
	// contains filtered or unexported fields
}

func JobWithResult added in v0.5.0

func JobWithResult[Tin, Tout any](jd *JobDefinition[Tin], resultStep *StepDefinition[Tout]) (*JobDefinitionWithResult[Tin, Tout], error)

func (*JobDefinitionWithResult[Tin, Tout]) Start added in v0.5.0

func (jd *JobDefinitionWithResult[Tin, Tout]) Start(ctx context.Context, input Tin, jobOptions ...JobOptionPreparer) *JobInstanceWithResult[Tin, Tout]

type JobError added in v0.2.2

type JobError struct {
	Code         JobErrorCode
	StepError    error
	StepInstance StepInstanceMeta
	Message      string
}

func (*JobError) Error added in v0.2.2

func (je *JobError) Error() string

func (*JobError) RootCause added in v0.5.0

func (je *JobError) RootCause() error

RootCause track precendent chain and return the first step raised this error.

func (*JobError) Unwrap added in v0.2.2

func (je *JobError) Unwrap() error

type JobErrorCode added in v0.2.2

type JobErrorCode string

func (JobErrorCode) Error added in v0.2.2

func (code JobErrorCode) Error() string

func (JobErrorCode) WithMessage added in v0.6.0

func (code JobErrorCode) WithMessage(msg string) *MessageError

type JobExecutionOptions added in v0.5.0

type JobExecutionOptions struct {
	Id              string
	RunSequentially bool
}

type JobInstance added in v0.5.0

type JobInstance[T any] struct {
	Definition *JobDefinition[T]
	// contains filtered or unexported fields
}

JobInstance is the instance of a jobDefinition

func (*JobInstance[T]) GetJobDefinition added in v0.5.0

func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta

func (*JobInstance[T]) GetJobInstanceId added in v0.5.0

func (ji *JobInstance[T]) GetJobInstanceId() string

func (*JobInstance[T]) GetStepInstance added in v0.5.0

func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool)

GetStepInstance returns the stepInstance by name

func (*JobInstance[T]) Visualize added in v0.5.0

func (jd *JobInstance[T]) Visualize() (string, error)

Visualize the job instance in graphviz dot format

func (*JobInstance[T]) Wait added in v0.5.0

func (ji *JobInstance[T]) Wait(ctx context.Context) error

Wait for all steps in the job to finish.

type JobInstanceMeta added in v0.5.0

type JobInstanceMeta interface {
	GetJobInstanceId() string
	GetJobDefinition() JobDefinitionMeta
	GetStepInstance(stepName string) (StepInstanceMeta, bool)
	Wait(context.Context) error
	Visualize() (string, error)
	// contains filtered or unexported methods
}

type JobInstanceWithResult added in v0.5.0

type JobInstanceWithResult[Tin, Tout any] struct {
	*JobInstance[Tin]
	// contains filtered or unexported fields
}

func (*JobInstanceWithResult[Tin, Tout]) Result added in v0.5.0

func (ji *JobInstanceWithResult[Tin, Tout]) Result(ctx context.Context) (Tout, error)

Result returns the result of the job from result step.

it doesn't wait for all steps to finish, you can use Result() after Wait() if desired.

type JobOptionPreparer added in v0.5.0

type JobOptionPreparer func(*JobExecutionOptions) *JobExecutionOptions

func WithJobId added in v0.5.0

func WithJobId(jobId string) JobOptionPreparer

func WithSequentialExecution added in v0.5.0

func WithSequentialExecution() JobOptionPreparer

type MessageError added in v0.6.0

type MessageError struct {
	Code    JobErrorCode
	Message string
}

func (*MessageError) Error added in v0.6.0

func (me *MessageError) Error() string

func (*MessageError) Unwrap added in v0.6.0

func (me *MessageError) Unwrap() error

type RetryPolicy added in v0.2.2

type RetryPolicy interface {
	ShouldRetry(error) (bool, time.Duration)
}

type RetryReport added in v0.2.2

type RetryReport struct {
	Count int
}

RetryReport would record the retry count (could extend to include each retry duration, ...)

type StepContextPolicy added in v0.2.2

type StepContextPolicy func(context.Context, StepInstanceMeta) context.Context

StepContextPolicy allows context enrichment before passing to step.

With StepInstanceMeta you can access StepInstance, StepDefinition, JobInstance, JobDefinition.

type StepDefinition added in v0.5.0

type StepDefinition[T any] struct {
	// contains filtered or unexported fields
}

StepDefinition defines a step and it's dependencies in a job definition.

func AddStep

func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

AddStep adds a step to the job definition.

func AddStepWithStaticFunc added in v0.5.0

func AddStepWithStaticFunc[JT, ST any](j *JobDefinition[JT], stepName string, stepFunc asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

AddStepWithStaticFunc is same as AddStep, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func StepAfter

func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfter add a step after a preceding step, also take input from that preceding step

func StepAfterBoth

func StepAfterBoth[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepAfterBothFuncCreator func(input JT) asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterBoth add a step after both preceding steps, also take input from both preceding steps

func StepAfterBothWithStaticFunc added in v0.5.0

func StepAfterBothWithStaticFunc[JT, PT1, PT2, ST any](j *JobDefinition[JT], stepName string, parentStep1 *StepDefinition[PT1], parentStep2 *StepDefinition[PT2], stepFunc asynctask.AfterBothFunc[PT1, PT2, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterBothWithStaticFunc is same as StepAfterBoth, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func StepAfterWithStaticFunc added in v0.5.0

func StepAfterWithStaticFunc[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepFunc asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error)

StepAfterWithStaticFunc is same as StepAfter, but the stepFunc passed in shouldn't have receiver. (or you get shared state between job instances)

func (*StepDefinition[T]) DependsOn added in v0.5.0

func (sd *StepDefinition[T]) DependsOn() []string

func (*StepDefinition[T]) DotSpec added in v0.5.0

func (sd *StepDefinition[T]) DotSpec() *graph.DotNodeSpec

func (*StepDefinition[T]) GetName added in v0.5.0

func (sd *StepDefinition[T]) GetName() string

type StepDefinitionMeta added in v0.5.0

type StepDefinitionMeta interface {

	// GetName return name of the step
	GetName() string

	// DependsOn return the list of step names that this step depends on
	DependsOn() []string

	// DotSpec used for generating graphviz graph
	DotSpec() *graph.DotNodeSpec
	// contains filtered or unexported methods
}

StepDefinitionMeta is the interface for a step definition

type StepErrorPolicy

type StepErrorPolicy struct{}

type StepExecutionData added in v0.2.1

type StepExecutionData struct {
	StartTime time.Time
	Duration  time.Duration
	Retried   *RetryReport
}

StepExecutionData would measure the step execution time and retry report.

type StepExecutionOptions

type StepExecutionOptions struct {
	ErrorPolicy   StepErrorPolicy
	RetryPolicy   RetryPolicy
	ContextPolicy StepContextPolicy

	// dependencies that are not input.
	DependOn []string
}

type StepInstance added in v0.5.0

type StepInstance[T any] struct {
	Definition  *StepDefinition[T]
	JobInstance JobInstanceMeta
	// contains filtered or unexported fields
}

StepInstance is the instance of a step, within a job instance.

func (*StepInstance[T]) DotSpec added in v0.5.0

func (si *StepInstance[T]) DotSpec() *graph.DotNodeSpec

func (*StepInstance[T]) EnrichContext added in v0.5.0

func (si *StepInstance[T]) EnrichContext(ctx context.Context) (result context.Context)

func (*StepInstance[T]) ExecutionData added in v0.5.0

func (si *StepInstance[T]) ExecutionData() *StepExecutionData

func (*StepInstance[T]) GetJobInstance added in v0.5.0

func (si *StepInstance[T]) GetJobInstance() JobInstanceMeta

func (*StepInstance[T]) GetName added in v0.5.0

func (si *StepInstance[T]) GetName() string

func (*StepInstance[T]) GetState added in v0.5.0

func (si *StepInstance[T]) GetState() StepState

func (*StepInstance[T]) GetStepDefinition added in v0.5.0

func (si *StepInstance[T]) GetStepDefinition() StepDefinitionMeta

func (*StepInstance[T]) Waitable added in v0.5.0

func (si *StepInstance[T]) Waitable() asynctask.Waitable

type StepInstanceMeta added in v0.5.0

type StepInstanceMeta interface {
	GetName() string
	ExecutionData() *StepExecutionData
	GetState() StepState
	GetJobInstance() JobInstanceMeta
	GetStepDefinition() StepDefinitionMeta
	Waitable() asynctask.Waitable

	DotSpec() *graph.DotNodeSpec
}

StepInstanceMeta is the interface for a step instance

type StepState

type StepState string
const StepStateCompleted StepState = "completed"
const StepStateFailed StepState = "failed"
const StepStatePending StepState = "pending"
const StepStateRunning StepState = "running"

Directories

Path Synopsis
graph module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL