rkasync

package module
v0.0.11 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2023 License: Apache-2.0 Imports: 10 Imported by: 3

README

rk-async

Cloud-native async job framework

Documentation

Index

Constants

View Source
// Job state values. Job.State and the state arguments of JobNewStateAllowed
// and UpdateJobState are plain strings; presumably they are expected to hold
// one of these values (confirm against the implementation).
const (
	JobStateCreated  = "created"
	JobStateRunning  = "running"
	JobStateCanceled = "canceled"
	JobStateSuccess  = "success"
	JobStateFailed   = "failed"
)
View Source
// Key names for the logger and the event.
// NOTE(review): presumably these are the context keys that GetLoggerFromCtx
// and GetEventFromCtx look up; confirm against the implementation.
const (
	LoggerKey = "rk-async-logger"
	EventKey  = "rk-async-event"
)

Variables

This section is empty.

Functions

func GetEventFromCtx added in v0.0.3

func GetEventFromCtx(ctx context.Context) rkquery.Event

func GetLoggerFromCtx added in v0.0.3

func GetLoggerFromCtx(ctx context.Context) *zap.Logger

func JobNewStateAllowed

func JobNewStateAllowed(oldState, newState string) bool

func RegisterDatabaseRegFunc

func RegisterDatabaseRegFunc(dbType string, f func(map[string]string) Database)

func RegisterEntriesFromConfig

func RegisterEntriesFromConfig(raw []byte) map[string]rkentry.Entry

Types

type BootConfig

// BootConfig is the configuration schema of this package. Every field carries
// both json and yaml tags, and all settings live under the top-level "async"
// key.
type BootConfig struct {
	Async struct {
		// Enabled, Logger and Event are the section-wide settings.
		// NOTE(review): Logger and Event are presumably names of logger/event
		// entries to look up; confirm against RegisterEntriesFromConfig.
		Enabled  bool   `json:"enabled" yaml:"enabled"`
		Logger   string `json:"logger" yaml:"logger"`
		Event    string `json:"event" yaml:"event"`
		// Database groups per-backend settings. MySql and Postgres have the
		// same three fields: an enable flag, an entry name and a database name.
		Database struct {
			MySql struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"mySql" json:"mySql"`
			Postgres struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"postgres" json:"postgres"`
		} `yaml:"database" json:"database"`
		// Worker groups worker settings; only a "local" worker with an enable
		// flag is configurable here.
		Worker struct {
			Local struct {
				Enabled bool `json:"enabled" yaml:"enabled"`
			} `yaml:"local" json:"local"`
		} `yaml:"worker" json:"worker"`
	} `yaml:"async" json:"async"`
}

type Database

// Database is the job storage interface of this package. A constructor
// returning a Database can be registered per database type with
// RegisterDatabaseRegFunc.
// NOTE(review): the semantics noted on each method below are inferred from
// the signatures only; confirm against a concrete implementation.
type Database interface {
	// Type returns a string identifying the database type.
	Type() string

	// AddJob takes a job and reports failure through the returned error.
	AddJob(job *Job) error

	// RegisterProcessor associates a Processor with a job type string.
	RegisterProcessor(jobType string, processor Processor)

	// GetProcessor returns the Processor for a job type string.
	GetProcessor(jobType string) Processor

	// PickJobToWork returns a job or an error; the selection criteria are
	// not visible from the interface.
	PickJobToWork() (*Job, error)

	// UpdateJobState takes a job and a new state string.
	UpdateJobState(job *Job, state string) error

	// ListJobs returns jobs for the given filter.
	ListJobs(filter *JobFilter) ([]*Job, error)

	// GetJob returns the job for the given id.
	GetJob(id string) (*Job, error)

	// CancelJobsOverdue takes a number of days and a filter.
	CancelJobsOverdue(days int, filter *JobFilter) error

	// CleanJobs takes a number of days and a filter.
	CleanJobs(days int, filter *JobFilter) error
}

type Entry

// Entry is the package's entry type; all of its fields are unexported.
// GetEntry returns a *Entry. Its methods include job operations with the same
// signatures as the Database interface (AddJob, GetJob, ListJobs,
// UpdateJobState, CancelJobsOverdue, CleanJobs), accessors for the underlying
// Database and Worker, and Bootstrap, Interrupt, StartWorker and StopWorker.
type Entry struct {
	// contains filtered or unexported fields
}

func GetEntry

func GetEntry() *Entry

func (*Entry) AddJob

func (e *Entry) AddJob(job *Job) error

func (*Entry) Bootstrap

func (e *Entry) Bootstrap(ctx context.Context)

func (*Entry) CancelJobsOverdue

func (e *Entry) CancelJobsOverdue(days int, filter *JobFilter) error

func (*Entry) CleanJobs

func (e *Entry) CleanJobs(days int, filter *JobFilter) error

func (*Entry) Database

func (e *Entry) Database() Database

func (*Entry) GetDescription

func (e *Entry) GetDescription() string

func (*Entry) GetJob

func (e *Entry) GetJob(id string) (*Job, error)

func (*Entry) GetName

func (e *Entry) GetName() string

func (*Entry) GetType

func (e *Entry) GetType() string

func (*Entry) Interrupt

func (e *Entry) Interrupt(ctx context.Context)

func (*Entry) ListJobs

func (e *Entry) ListJobs(filter *JobFilter) ([]*Job, error)

func (*Entry) StartWorker added in v0.0.3

func (e *Entry) StartWorker()

func (*Entry) StopWorker added in v0.0.3

func (e *Entry) StopWorker()

func (*Entry) String

func (e *Entry) String() string

func (*Entry) UpdateJobState

func (e *Entry) UpdateJobState(job *Job, state string) error

func (*Entry) Worker

func (e *Entry) Worker() Worker

type Job

// Job is the job record. Its gorm tags and the TableName method indicate it
// is used as a gorm model; every field also has json and yaml tags.
type Job struct {
	// do not edit
	// Id is the gorm primary key and State is indexed. CreatedAt and
	// UpdatedAt additionally carry an attr:"-" tag.
	Id        string    `json:"id" yaml:"id" gorm:"primaryKey"`
	State     string    `json:"state" yaml:"state" gorm:"index"`
	CreatedAt time.Time `yaml:"createdAt" json:"createdAt" attr:"-"`
	UpdatedAt time.Time `yaml:"updatedAt" json:"updatedAt" attr:"-"`

	// edit
	// Type and UserId are both indexed.
	Type   string `json:"type" yaml:"type" gorm:"index"`
	UserId string `json:"userId" yaml:"userId" gorm:"index"`

	// NOTE(review): the gorm tag here is "text", not "type:text"; gorm's
	// documented way to set a column type is the "type" setting. Confirm
	// this tag has the intended effect.
	Filter string `json:"filter" yaml:"filter" gorm:"text"`

	// Steps and Payload are wrapped in datatypes.JSONType: Steps holds a
	// slice of *Step, Payload holds an arbitrary value.
	Steps   datatypes.JSONType[[]*Step]     `json:"steps" yaml:"steps"`
	Payload datatypes.JSONType[interface{}] `json:"payload" yaml:"payload"`
}

func (*Job) TableName added in v0.0.10

func (j *Job) TableName() string

type JobFilter

// JobFilter is the filter argument accepted by ListJobs, CancelJobsOverdue
// and CleanJobs. It carries a list of clause expressions, a limit and an
// order string.
// NOTE(review): the format of Order and the meaning of a zero Limit are not
// visible here; confirm against the database implementations.
type JobFilter struct {
	ClauseList []clause.Expression
	Limit      int
	Order      string
}

type LocalWorker

// LocalWorker is a worker type with unexported fields, constructed by
// NewLocalWorker from a Database, a logger entry and an event entry. Its
// listed methods (Start, Stop, Database) match the Worker interface.
type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(db Database, logger *rkentry.LoggerEntry, event *rkentry.EventEntry) *LocalWorker

func (*LocalWorker) Database

func (w *LocalWorker) Database() Database

func (*LocalWorker) Start

func (w *LocalWorker) Start()

func (*LocalWorker) Stop

func (w *LocalWorker) Stop()

type Processor added in v0.0.10

// Processor is the interface registered per job type through
// Database.RegisterProcessor.
type Processor interface {
	// Process receives a context, the job, and an UpdateJobFunc, and
	// reports failure through the returned error.
	// NOTE(review): presumably the UpdateJobFunc is for persisting job
	// changes during processing; confirm against the worker implementation.
	Process(context.Context, *Job, UpdateJobFunc) error
}

type Step added in v0.0.10

// Step is one element of Job.Steps. It records an index, a name, a state,
// timestamps, an elapsed time in seconds and a list of outputs.
//
// Note that Name is serialized under the key "id" in both JSON and YAML,
// not "name".
type Step struct {
	Index      int           `json:"index" yaml:"index"`
	Name       string        `json:"id" yaml:"id"`
	State      string        `json:"state" yaml:"state"`
	StartedAt  time.Time     `yaml:"startedAt" json:"startedAt"`
	UpdatedAt  time.Time     `yaml:"updatedAt" json:"updatedAt"`
	ElapsedSec float64       `yaml:"elapsedSec" json:"elapsedSec"`
	Output     []*StepOutput `yaml:"output" json:"output"`
}

func (*Step) FailedOutput added in v0.0.10

func (s *Step) FailedOutput(output *StepOutput, startTime time.Time)

func (*Step) Finish added in v0.0.10

func (s *Step) Finish()

func (*Step) SuccessOutput added in v0.0.10

func (s *Step) SuccessOutput(output *StepOutput, startTime time.Time)

type StepOutput added in v0.0.11

// StepOutput is one element of Step.Output and the argument type of
// Step.SuccessOutput and Step.FailedOutput. Unlike Step, its fields carry
// json tags only (no yaml tags).
//
// NOTE(review): Error is an interface value with a json tag. encoding/json
// marshals an interface through its dynamic type, so an error value without
// exported fields or a MarshalJSON method (e.g. one from errors.New) encodes
// as {} and the message is lost. Confirm this is intended.
type StepOutput struct {
	Message    string  `json:"message,omitempty"`
	ElapsedSec float64 `json:"elapsedSec"`
	Success    bool    `json:"success"`
	RetryCount int     `json:"retryCount"`
	Error      error   `json:"error,omitempty"`
}

type UpdateJobFunc added in v0.0.10

// UpdateJobFunc is the callback type passed to Processor.Process. It has the
// same parameter and result types as Database.UpdateJobState.
type UpdateJobFunc func(j *Job, state string) error

type Worker

// Worker is the worker interface returned by Entry.Worker. The LocalWorker
// type in this package has the same three methods.
type Worker interface {
	// Start starts the worker. Whether it blocks is not visible here.
	Start()

	// Stop stops the worker.
	Stop()

	// Database returns the Database associated with the worker.
	Database() Database
}

Directories

Path Synopsis
database
mysql module
postgres module
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL