Documentation ¶
Index ¶
- Constants
- func NewJob(name string, steps ...Step) *jobBuilder
- func NewStep(name string, handler ...interface{}) *stepBuilder
- func Register(job Job) error
- func Restart(ctx context.Context, jobId interface{}, clientId int64) (int64, error)
- func RestartAsync(ctx context.Context, jobId interface{}, clientId int64) (int64, error)
- func SetDB(sqlDb *sql.DB)
- func SetLogger(l logs.Logger)
- func SetMaxRunningJobs(size int)
- func SetMaxRunningSteps(size int)
- func SetTransactionManager(txMgr TransactionManager)
- func Start(ctx context.Context, jobName string, params string, clientId int64) (int64, error)
- func StartAsync(ctx context.Context, jobName string, params string, clientId int64) (int64, error)
- func Stop(ctx context.Context, jobId interface{}) error
- func Unregister(job Job)
- type Aggregator
- type BatchContext
- func (ctx *BatchContext) DeepCopy() *BatchContext
- func (ctx *BatchContext) Exists(key string) bool
- func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
- func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
- func (ctx *BatchContext) GetInt(key string, def ...int) (int, error)
- func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
- func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
- func (ctx *BatchContext) MarshalJSON() ([]byte, error)
- func (ctx *BatchContext) Merge(other *BatchContext)
- func (ctx *BatchContext) Put(key string, value interface{})
- func (ctx *BatchContext) Remove(key string)
- func (ctx *BatchContext) UnmarshalJSON(b []byte) error
- type BatchError
- type ChunkContext
- type ChunkListener
- type DefaultTxManager
- type FilePath
- type Future
- type Handler
- type ItemReader
- type Job
- type JobExecution
- type JobListener
- type OpenCloser
- type PartitionListener
- type Partitioner
- type PartitionerFactory
- type Processor
- type Reader
- type Step
- type StepExecution
- type StepListener
- type Task
- type TransactionManager
- type Writer
Constants ¶
const ( //ErrCodeRetry an error indicating the caller should retry ErrCodeRetry = "retry" //ErrCodeStop an error indicating the job is to be stopped ErrCodeStop = "stop" //ErrCodeConcurrency an error indicating conflict modification ErrCodeConcurrency = "concurrency" //ErrCodeDbFail an error indicating database access failed ErrCodeDbFail = "db_fail" //ErrCodeGeneral general error ErrCodeGeneral = "general" )
const ( DefaultJobPoolSize = 10 DefaultStepTaskPoolSize = 1000 )
task pool
const ( //ItemReaderKeyList the key of keyList in StepContext ItemReaderKeyList = "gobatch.ItemReader.key.list" //ItemReaderCurrentIndex the key of current offset of step's keyList in StepContext ItemReaderCurrentIndex = "gobatch.ItemReader.current.index" //ItemReaderMaxIndex the key of max index of step's keyList in StepContext ItemReaderMaxIndex = "gobatch.ItemReader.max.index" )
const ( //DefaultChunkSize default number of record per chunk to read DefaultChunkSize = 10 //DefaultPartitions default number of partitions to construct a step DefaultPartitions = 1 //DefaultMinPartitionSize default min number of record to process in a sub step of a partitionStep DefaultMinPartitionSize = 1 //DefaultMaxPartitionSize default max number of record to process in a sub step of a partitionStep DefaultMaxPartitionSize = 2147483647 )
Variables ¶
This section is empty.
Functions ¶
func NewJob ¶
NewJob new instance of job builder
func NewStep ¶
func NewStep(name string, handler ...interface{}) *stepBuilder
NewStep initialize a step builder
func Restart ¶
Restart restart job by job name or job execution id
func RestartAsync ¶
RestartAsync restart job by job name or job execution id asynchronously
func SetMaxRunningJobs ¶
func SetMaxRunningJobs(size int)
SetMaxRunningJobs set max number of parallel jobs for GoBatch
func SetMaxRunningSteps ¶
func SetMaxRunningSteps(size int)
SetMaxRunningSteps set max number of parallel steps for GoBatch
func SetTransactionManager ¶
func SetTransactionManager(txMgr TransactionManager)
SetTransactionManager register a TransactionManager instance for GoBatch
func Start ¶
Start start job by job name and params
func StartAsync ¶
StartAsync start job by job name and params asynchronously
func Stop ¶
Stop stop job by job name or job execution id
Types ¶
type Aggregator ¶
type Aggregator interface { //Aggregate aggregate result from all sub step executions Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError }
Aggregator merge results of sub step executions of a chunk step
type BatchContext ¶
type BatchContext struct {
// contains filtered or unexported fields
}
BatchContext contains properties during a job or step execution
func (*BatchContext) DeepCopy ¶
func (ctx *BatchContext) DeepCopy() *BatchContext
func (*BatchContext) Exists ¶
func (ctx *BatchContext) Exists(key string) bool
func (*BatchContext) Get ¶
func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
func (*BatchContext) GetBool ¶
func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
func (*BatchContext) GetInt64 ¶
func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
func (*BatchContext) GetString ¶
func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
func (*BatchContext) MarshalJSON ¶
func (ctx *BatchContext) MarshalJSON() ([]byte, error)
func (*BatchContext) Merge ¶
func (ctx *BatchContext) Merge(other *BatchContext)
func (*BatchContext) Put ¶
func (ctx *BatchContext) Put(key string, value interface{})
func (*BatchContext) Remove ¶
func (ctx *BatchContext) Remove(key string)
func (*BatchContext) UnmarshalJSON ¶
func (ctx *BatchContext) UnmarshalJSON(b []byte) error
type BatchError ¶
type BatchError interface { //Code code of the error Code() string //Message readable message of the error Message() string //Error error interface Error() string //StackTrace goroutine stack trace StackTrace() string }
BatchError represent an error during GoBatch executing
func NewBatchError ¶
func NewBatchError(code string, msg string, args ...interface{}) BatchError
NewBatchError new instance
type ChunkContext ¶
type ChunkContext struct { StepExecution *StepExecution Tx interface{} End bool }
type ChunkListener ¶
type ChunkListener interface { //BeforeChunk execute before start of a chunk in a chunkStep BeforeChunk(context *ChunkContext) BatchError //AfterChunk execute after end of a chunk in a chunkStep AfterChunk(context *ChunkContext) BatchError //OnError execute when an error occurred during a chunk in a chunkStep OnError(context *ChunkContext, err BatchError) }
ChunkListener job listener
type DefaultTxManager ¶
type DefaultTxManager struct {
// contains filtered or unexported fields
}
DefaultTxManager default TransactionManager implementation
func (*DefaultTxManager) BeginTx ¶
func (tm *DefaultTxManager) BeginTx() (interface{}, BatchError)
BeginTx begin a transaction
func (*DefaultTxManager) Commit ¶
func (tm *DefaultTxManager) Commit(tx interface{}) BatchError
Commit commit a transaction
func (*DefaultTxManager) Rollback ¶
func (tm *DefaultTxManager) Rollback(tx interface{}) BatchError
Rollback rollback a transaction
type FilePath ¶
type FilePath struct {
NamePattern string
}
FilePath an abstract file path
func (*FilePath) Format ¶
func (f *FilePath) Format(execution *StepExecution) (string, error)
Format generate a real file path by formatting FilePath according to *StepExecution instance
type Handler ¶
type Handler interface { //Handle implement handler logic here Handle(execution *StepExecution) BatchError }
Handler is a interface for doing work in a simple step
type ItemReader ¶
type ItemReader interface { //ReadKeys read all keys of some kind of data ReadKeys() ([]interface{}, error) //ReadItem read value by one key from ReadKeys result ReadItem(key interface{}) (interface{}, error) }
ItemReader is for loading large amount of data from a datasource like database, used in a chunk step. When the step executing, it first loads all data keys by calling ReadKeys() once, then load full data by key one by one in every chunk.
type Job ¶
type Job interface { Name() string Start(ctx context.Context, execution *JobExecution) BatchError Stop(ctx context.Context, execution *JobExecution) BatchError GetSteps() []Step }
Job job interface used by GoBatch
type JobExecution ¶
type JobExecution struct { JobExecutionId int64 JobInstanceId int64 JobName string JobParams map[string]interface{} JobStatus status.BatchStatus StepExecutions []*StepExecution JobContext *BatchContext CreateTime time.Time StartTime time.Time EndTime time.Time FailError BatchError Version int64 }
JobExecution represents context of a job execution
func (*JobExecution) AddStepExecution ¶
func (e *JobExecution) AddStepExecution(execution *StepExecution)
AddStepExecution add a step execution in this job
type JobListener ¶
type JobListener interface { //BeforeJob execute before job start BeforeJob(execution *JobExecution) BatchError //AfterJob execute after job end either normally or abnormally AfterJob(execution *JobExecution) BatchError }
JobListener job listener
type OpenCloser ¶
type OpenCloser interface { //Open do initialization for Reader or Writer Open(execution *StepExecution) BatchError //Close do cleanups for Reader or Writer Close(execution *StepExecution) BatchError }
OpenCloser is used doing initialization and cleanups for Reader or Writer
type PartitionListener ¶
type PartitionListener interface { //BeforePartition execute before enter into Partitioner.Partition() in a partitionStep BeforePartition(execution *StepExecution) BatchError //AfterPartition execute after return from Partitioner.Partition() in a partitionStep AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError //OnError execute when an error return from Partitioner.Partition() in a partitionStep OnError(execution *StepExecution, err BatchError) }
PartitionListener job listener
type Partitioner ¶
type Partitioner interface { //Partition generate sub step executions from specified step execution and partitions count Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError) //GetPartitionNames generate sub step names from specified step execution and partitions count GetPartitionNames(execution *StepExecution, partitions uint) []string }
Partitioner split an execution of step into multiple sub executions.
type PartitionerFactory ¶
type PartitionerFactory interface {
GetPartitioner(minPartitionSize, maxPartitionSize uint) Partitioner
}
PartitionerFactory can create Partitioners, it is used by Reader usually.
type Processor ¶
type Processor interface { //Process process an item from reader and return a result item Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError) }
Processor is for processing data in a chunk step
type Reader ¶
type Reader interface { //Read each call of Read() will return a data item, if there is no more data, a nil item will be returned. //If there is an error, nil item and a BatchError will return Read(chunkCtx *ChunkContext) (interface{}, BatchError) }
Reader is for loading data in a chunk step
type Step ¶
type Step interface { Name() string Exec(ctx context.Context, execution *StepExecution) BatchError // contains filtered or unexported methods }
Step step interface
type StepExecution ¶
type StepExecution struct { StepExecutionId int64 StepName string StepStatus status.BatchStatus StepContext *BatchContext StepContextId int64 StepExecutionContext *BatchContext JobExecution *JobExecution CreateTime time.Time StartTime time.Time EndTime time.Time ReadCount int64 WriteCount int64 CommitCount int64 FilterCount int64 ReadSkipCount int64 WriteSkipCount int64 ProcessSkipCount int64 RollbackCount int64 FailError BatchError LastUpdated time.Time Version int64 }
StepExecution represents context of a step execution
type StepListener ¶
type StepListener interface { //BeforeStep execute before step start BeforeStep(execution *StepExecution) BatchError //AfterStep execute after step end either normally or abnormally AfterStep(execution *StepExecution) BatchError }
StepListener job listener
type Task ¶
type Task func(execution *StepExecution) BatchError
Task is a function type for doing work in a simple step
type TransactionManager ¶
type TransactionManager interface { BeginTx() (tx interface{}, err BatchError) Commit(tx interface{}) BatchError Rollback(tx interface{}) BatchError }
TransactionManager used by chunk step to execute chunk process in a transaction.
func NewTransactionManager ¶
func NewTransactionManager(db *sql.DB) TransactionManager
NewTransactionManager create a TransactionManager instance
type Writer ¶
type Writer interface { //Write write items generated by processor in a chunk Write(items []interface{}, chunkCtx *ChunkContext) BatchError }
Writer is for writing data in a chunk step