rkasync

package module
v0.0.9 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 17, 2023 License: Apache-2.0 Imports: 8 Imported by: 3

README

rk-async

Cloud native async job framework

Documentation

Index

Constants

View Source
// String values naming the states a job can be in.
// NOTE(review): presumably these are the values accepted as the state
// arguments of JobNewStateAllowed and UpdateJobState, and stored in
// JobMeta.State — confirm against the implementation.
const (
	// JobStateCreated is the "created" state.
	JobStateCreated  = "created"
	// JobStateRunning is the "running" state.
	JobStateRunning  = "running"
	// JobStateCanceled is the "canceled" state.
	JobStateCanceled = "canceled"
	// JobStateSuccess is the "success" state.
	JobStateSuccess  = "success"
	// JobStateFailed is the "failed" state.
	JobStateFailed   = "failed"
)
View Source
// Well-known key strings for the logger and the event.
// NOTE(review): presumably the context keys that GetLoggerFromCtx and
// GetEventFromCtx look up — confirm against the implementation.
const (
	// LoggerKey is the key string associated with the logger.
	LoggerKey = "rk-async-logger"
	// EventKey is the key string associated with the event.
	EventKey  = "rk-async-event"
)

Variables

This section is empty.

Functions

func GetEventFromCtx added in v0.0.3

func GetEventFromCtx(ctx context.Context) rkquery.Event

func GetLoggerFromCtx added in v0.0.3

func GetLoggerFromCtx(ctx context.Context) *zap.Logger

func JobNewStateAllowed

func JobNewStateAllowed(oldState, newState string) bool

func RegisterDatabaseRegFunc

func RegisterDatabaseRegFunc(dbType string, f func(map[string]string) Database)

func RegisterEntriesFromConfig

func RegisterEntriesFromConfig(raw []byte) map[string]rkentry.Entry

Types

type BootConfig

// BootConfig mirrors the "async" configuration section. Every field carries
// both json and yaml tags, so the same structure can be decoded from either
// format.
type BootConfig struct {
	// Async is the top-level "async" section.
	Async struct {
		// Enabled toggles the section as a whole (key "enabled").
		Enabled  bool   `json:"enabled" yaml:"enabled"`
		// Logger is a string under key "logger".
		// NOTE(review): presumably the name of a logger entry — confirm.
		Logger   string `json:"logger" yaml:"logger"`
		// Event is a string under key "event".
		// NOTE(review): presumably the name of an event entry — confirm.
		Event    string `json:"event" yaml:"event"`
		// Database holds one sub-section per supported backend
		// ("mySql" and "postgres"), each with the same three fields.
		Database struct {
			// MySql is the "mySql" backend sub-section.
			MySql struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"mySql" json:"mySql"`
			// Postgres is the "postgres" backend sub-section.
			Postgres struct {
				Enabled   bool   `json:"enabled" yaml:"enabled"`
				EntryName string `json:"entryName" yaml:"entryName"`
				Database  string `json:"database" yaml:"database"`
			} `yaml:"postgres" json:"postgres"`
		} `yaml:"database" json:"database"`
		// Worker holds worker settings; only a "local" worker
		// sub-section is declared here.
		Worker struct {
			// Local is the "local" worker sub-section.
			Local struct {
				Enabled bool `json:"enabled" yaml:"enabled"`
			} `yaml:"local" json:"local"`
		} `yaml:"worker" json:"worker"`
	} `yaml:"async" json:"async"`
}

type Database

// Database is the storage abstraction for jobs. Implementations are supplied
// through RegisterDatabaseRegFunc, which maps a database type string to a
// constructor returning a Database.
type Database interface {
	// Type returns a string identifying the implementation.
	// NOTE(review): presumably matches the dbType passed to
	// RegisterDatabaseRegFunc — confirm.
	Type() string

	// RegisterJob registers a job with the database. It returns nothing,
	// so failures are not reported to the caller through this method.
	RegisterJob(job Job)

	// AddJob adds the given job and reports any error.
	AddJob(job Job) error

	// PickJobToWork returns a job to be worked on, or an error.
	// NOTE(review): selection order and behavior when no job is available
	// are not visible here — check the concrete implementation.
	PickJobToWork() (Job, error)

	// UpdateJobState sets the state of the given job to state.
	// NOTE(review): presumably state is one of the JobState* constants.
	UpdateJobState(job Job, state string) error

	// ListJobs returns the jobs selected by filter.
	ListJobs(filter *JobFilter) ([]Job, error)

	// GetJob returns the job with the given id.
	GetJob(id string) (Job, error)

	// CancelJobsOverdue cancels jobs selected by filter using a threshold
	// expressed in days.
	// NOTE(review): which timestamp the days threshold is measured against
	// is not visible here — confirm.
	CancelJobsOverdue(days int, filter *JobFilter) error

	// CleanJobs cleans up jobs selected by filter using a threshold
	// expressed in days.
	// NOTE(review): exact cleanup semantics (delete vs. archive, which
	// states qualify) are not visible here — confirm.
	CleanJobs(days int, filter *JobFilter) error
}

type Entry

// Entry is the package's entry type. Its listed methods include job
// operations with the same names and signatures as the Database interface
// (AddJob, GetJob, ListJobs, UpdateJobState, CancelJobsOverdue, CleanJobs,
// RegisterJob), worker control (StartWorker, StopWorker), accessors for the
// underlying Database and Worker, and Bootstrap/Interrupt lifecycle methods.
// An instance is obtained with GetEntry.
type Entry struct {
	// contains filtered or unexported fields
}

func GetEntry

func GetEntry() *Entry

func (*Entry) AddJob

func (e *Entry) AddJob(job Job) error

func (*Entry) Bootstrap

func (e *Entry) Bootstrap(ctx context.Context)

func (*Entry) CancelJobsOverdue

func (e *Entry) CancelJobsOverdue(days int, filter *JobFilter) error

func (*Entry) CleanJobs

func (e *Entry) CleanJobs(days int, filter *JobFilter) error

func (*Entry) Database

func (e *Entry) Database() Database

func (*Entry) GetDescription

func (e *Entry) GetDescription() string

func (*Entry) GetJob

func (e *Entry) GetJob(id string) (Job, error)

func (*Entry) GetName

func (e *Entry) GetName() string

func (*Entry) GetType

func (e *Entry) GetType() string

func (*Entry) Interrupt

func (e *Entry) Interrupt(ctx context.Context)

func (*Entry) ListJobs

func (e *Entry) ListJobs(filter *JobFilter) ([]Job, error)

func (*Entry) RegisterJob

func (e *Entry) RegisterJob(job Job)

func (*Entry) StartWorker added in v0.0.3

func (e *Entry) StartWorker()

func (*Entry) StopWorker added in v0.0.3

func (e *Entry) StopWorker()

func (*Entry) String

func (e *Entry) String() string

func (*Entry) UpdateJobState

func (e *Entry) UpdateJobState(job Job, state string) error

func (*Entry) Worker

func (e *Entry) Worker() Worker

type Job

// Job is the unit of work handled by the framework. A job can be processed,
// exposes its metadata, and can be converted to and from bytes.
type Job interface {
	// Process runs the job with the given context and reports any error.
	Process(context.Context) error

	// Meta returns a pointer to the job's JobMeta.
	Meta() *JobMeta

	// Marshal encodes the job into bytes.
	Marshal() ([]byte, error)

	// Unmarshal builds a Job from encoded bytes plus a JobMeta. It has the
	// same parameter and result types as UnmarshalerFunc.
	Unmarshal([]byte, *JobMeta) (Job, error)
}

type JobFilter

// JobFilter carries selection criteria passed to ListJobs, CancelJobsOverdue
// and CleanJobs. The list fields have matching AddType, AddUser, AddClass and
// AddCategory methods, and share their names with the Type, User, Class and
// Category fields of JobMeta. Create one with NewJobFilter.
// NOTE(review): how the lists combine (AND/OR) and how an empty list or a
// zero Limit is treated are not visible here — confirm in the Database
// implementations.
type JobFilter struct {
	// TypeList holds job type values to filter on.
	TypeList     []string
	// UserList holds user values to filter on.
	UserList     []string
	// ClassList holds class values to filter on.
	ClassList    []string
	// CategoryList holds category values to filter on.
	CategoryList []string
	// Limit is an integer limit.
	// NOTE(review): presumably the maximum number of jobs returned — confirm.
	Limit        int
}

func NewJobFilter

func NewJobFilter() *JobFilter

func (*JobFilter) AddCategory

func (f *JobFilter) AddCategory(in string)

func (*JobFilter) AddClass

func (f *JobFilter) AddClass(in string)

func (*JobFilter) AddType

func (f *JobFilter) AddType(in string)

func (*JobFilter) AddUser

func (f *JobFilter) AddUser(in string)

type JobMeta

// JobMeta is the metadata attached to every Job (see Job.Meta). Fields carry
// json and yaml tags, and most carry gorm tags, so the struct is also shaped
// for use as a GORM model.
type JobMeta struct {
	// Bookkeeping fields; the original comment marks these "do not edit".
	// Id is tagged as the GORM primary key and State as an indexed column.
	Id        string    `json:"id" yaml:"id" gorm:"primaryKey"`
	State     string    `json:"state" yaml:"state" gorm:"index"`
	CreatedAt time.Time `yaml:"createdAt" json:"createdAt" attr:"-"`
	UpdatedAt time.Time `yaml:"updatedAt" json:"updatedAt" attr:"-"`

	// Descriptive fields; the original comment marks these "edit".
	// Each is tagged as an indexed column and has a same-named list in
	// JobFilter.
	Type     string `json:"type" yaml:"type" gorm:"index"`
	User     string `json:"user" yaml:"user" gorm:"index"`
	Class    string `json:"class" yaml:"class" gorm:"index"`
	Category string `json:"category" yaml:"category" gorm:"index"`

	// Error holds an error string for the job.
	// NOTE(review): the tag is gorm:"text", not gorm:"type:text"; GORM's
	// documented way to set a column type uses the "type:" prefix, so this
	// tag may not have the intended effect — confirm before changing, since
	// altering it can affect schema migration.
	Error string `json:"error" yaml:"error" gorm:"text"`
}

type LocalWorker

// LocalWorker is a worker type constructed with NewLocalWorker from a
// Database, a logger entry and an event entry. Its listed methods (Start,
// Stop, Database) match the method set of the Worker interface.
type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(db Database, logger *rkentry.LoggerEntry, event *rkentry.EventEntry) *LocalWorker

func (*LocalWorker) Database

func (w *LocalWorker) Database() Database

func (*LocalWorker) Start

func (w *LocalWorker) Start()

func (*LocalWorker) Stop

func (w *LocalWorker) Stop()

type UnmarshalerFunc

// UnmarshalerFunc builds a Job from encoded bytes and a JobMeta. It has the
// same parameter and result types as the Job.Unmarshal method.
type UnmarshalerFunc func([]byte, *JobMeta) (Job, error)

type Worker

// Worker is the interface for a component that can be started and stopped
// and that exposes the Database it works against. LocalWorker's listed
// methods match this method set.
type Worker interface {
	// Start starts the worker. It returns nothing, so start-up failures
	// are not reported through this method.
	Start()

	// Stop stops the worker.
	// NOTE(review): whether Stop waits for in-flight jobs is not visible
	// here — confirm in the implementation.
	Stop()

	// Database returns the Database associated with the worker.
	Database() Database
}

Directories

Path Synopsis
database
mysql module
postgres module
example module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL