rkasync

package module
v0.0.32 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 11 Imported by: 3

README

rk-async

Cloud native async job framework

Documentation

Index

Constants

View Source
const (
	JobStateCreated  = "created"
	JobStateRunning  = "running"
	JobStateCanceled = "canceled"
	JobStateSuccess  = "success"
	JobStateFailed   = "failed"
)
View Source
const (
	LoggerKey = "rk-async-logger"
	EventKey  = "rk-async-event"
)

Variables

This section is empty.

Functions

func GetEventFromCtx added in v0.0.3

func GetEventFromCtx(ctx context.Context) rkquery.Event

func GetLoggerFromCtx added in v0.0.3

func GetLoggerFromCtx(ctx context.Context) *zap.Logger

func RegisterDatabaseRegFunc

func RegisterDatabaseRegFunc(dbType string, f func(map[string]string, *zap.Logger) Database)

func RegisterEntriesFromConfig

func RegisterEntriesFromConfig(raw []byte) map[string]rkentry.Entry

Types

type BootConfig

type BootConfig struct {
	Async struct {
		Enabled  bool   `json:"enabled" yaml:"enabled"`
		Logger   string `json:"logger" yaml:"logger"`
		Event    string `json:"event" yaml:"event"`
		Database struct {
			MySql struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"mySql" json:"mySql"`
			Postgres struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"postgres" json:"postgres"`
		} `yaml:"database" json:"database"`
		Worker struct {
			Local struct {
				Enabled bool `json:"enabled" yaml:"enabled"`
			} `yaml:"local" json:"local"`
		} `yaml:"worker" json:"worker"`
	} `yaml:"async" json:"async"`
}

type Database

type Database interface {
	Type() string

	AddJob(job *Job) error

	DeleteJob(jobId string) error

	RegisterProcessor(jobType string, processor Processor)

	GetProcessor(jobType string) Processor

	PickJobToWorkWithId(jobId string) (*Job, error)

	PickJobToWork() (*Job, error)

	UpdateJobState(job *Job) error

	UpdateJobPayloadAndStep(job *Job) error

	ListJobs(filter *JobFilter) ([]*Job, int, error)

	GetJob(id string) (*Job, error)

	CancelJobsOverdue(days int, filter *JobFilter) error

	CleanJobs(days int, filter *JobFilter) error
}

type Entry

type Entry struct {
	// contains filtered or unexported fields
}

func GetEntry

func GetEntry() *Entry

func (*Entry) AddJob

func (e *Entry) AddJob(job *Job) error

func (*Entry) Bootstrap

func (e *Entry) Bootstrap(ctx context.Context)

func (*Entry) CancelJob added in v0.0.24

func (e *Entry) CancelJob(job *Job) error

func (*Entry) CancelJobsOverdue

func (e *Entry) CancelJobsOverdue(days int, filter *JobFilter) error

func (*Entry) CleanJobs

func (e *Entry) CleanJobs(days int, filter *JobFilter) error

func (*Entry) Database

func (e *Entry) Database() Database

func (*Entry) DeleteJob added in v0.0.29

func (e *Entry) DeleteJob(jobId string) error

func (*Entry) FinishJob added in v0.0.24

func (e *Entry) FinishJob(job *Job, success bool) error

func (*Entry) GetDescription

func (e *Entry) GetDescription() string

func (*Entry) GetJob

func (e *Entry) GetJob(id string) (*Job, error)

func (*Entry) GetName

func (e *Entry) GetName() string

func (*Entry) GetType

func (e *Entry) GetType() string

func (*Entry) Interrupt

func (e *Entry) Interrupt(ctx context.Context)

func (*Entry) ListJobs

func (e *Entry) ListJobs(filter *JobFilter) ([]*Job, int, error)

func (*Entry) StartJob added in v0.0.24

func (e *Entry) StartJob(job *Job) error

func (*Entry) StartWorker added in v0.0.3

func (e *Entry) StartWorker()

func (*Entry) StopWorker added in v0.0.3

func (e *Entry) StopWorker(force bool, waitSec int)

func (*Entry) String

func (e *Entry) String() string

func (*Entry) UpdateJobPayloadAndStep added in v0.0.24

func (e *Entry) UpdateJobPayloadAndStep(job *Job) error

func (*Entry) Worker

func (e *Entry) Worker() Worker

type Job

type Job struct {
	// do not edit
	Id              string    `json:"id" yaml:"id" gorm:"primaryKey"`
	InvokedRole     string    `json:"invokedRole" yaml:"invokedRole" gorm:"index"`
	InvokedInstance string    `json:"invokedInstance" yaml:"invokedInstance" gorm:"index"`
	State           string    `json:"state" yaml:"state" gorm:"index"`
	CreatedAt       time.Time `yaml:"createdAt" json:"createdAt" attr:"-"`
	UpdatedAt       time.Time `yaml:"updatedAt" json:"updatedAt"`

	// edit
	Type   string `json:"type" yaml:"type" gorm:"index"`
	UserId string `json:"userId" yaml:"userId" gorm:"index"`

	Filter string `json:"filter" yaml:"filter" gorm:"text"`

	Steps   datatypes.JSONType[[]*Step]     `json:"steps" yaml:"steps"`
	Payload datatypes.JSONType[interface{}] `json:"payload" yaml:"payload"`
}

func (*Job) TableName added in v0.0.10

func (j *Job) TableName() string

type JobFilter

type JobFilter struct {
	ClauseList []clause.Expression
	Limit      int
	Offset     int
	Order      string
}

type LocalWorker

type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(db Database, logger *rkentry.LoggerEntry, event *rkentry.EventEntry) *LocalWorker

func (*LocalWorker) Database

func (w *LocalWorker) Database() Database

func (*LocalWorker) Start

func (w *LocalWorker) Start()

func (*LocalWorker) Stop

func (w *LocalWorker) Stop(force bool, waitSec int)

type NewRecorderF added in v0.0.20

type NewRecorderF func() *Recorder

type Processor added in v0.0.10

type Processor interface {
	Process(context.Context, *Job, UpdateJobFunc) error
}

type Recorder added in v0.0.20

type Recorder struct {
	// contains filtered or unexported fields
}

func (*Recorder) Info added in v0.0.20

func (r *Recorder) Info(s string)

func (*Recorder) Title added in v0.0.20

func (r *Recorder) Title(s string)

func (*Recorder) Warn added in v0.0.20

func (r *Recorder) Warn(s string)

type Step added in v0.0.10

type Step struct {
	Index      int       `json:"index" yaml:"index"`
	Name       string    `json:"id" yaml:"id"`
	State      string    `json:"state" yaml:"state"`
	StartedAt  time.Time `yaml:"startedAt" json:"startedAt"`
	ElapsedSec float64   `yaml:"elapsedSec" json:"elapsedSec"`
	Output     []string  `yaml:"output" json:"output"`

	PersistFunc func() `json:"-" yaml:"-"`

	Lock sync.Mutex `json:"-" yaml:"-"`
}

func (*Step) Finish added in v0.0.10

func (s *Step) Finish(state string)

func (*Step) NewRecorder added in v0.0.20

func (s *Step) NewRecorder() *Recorder

type UpdateJobFunc added in v0.0.10

type UpdateJobFunc func(j *Job) error

type Worker

type Worker interface {
	Start()

	Stop(force bool, waitSec int)

	Database() Database
}

Directories

Path Synopsis
database
mysql Module
postgres Module
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL