process

package
v0.0.0-...-bf83fb3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ECode030101 = e.Code0301 + "01"
	ECode030102 = e.Code0301 + "02"
	ECode030103 = e.Code0301 + "03"
	ECode030104 = e.Code0301 + "04"
	ECode030105 = e.Code0301 + "05"
	ECode030106 = e.Code0301 + "06"
	ECode030107 = e.Code0301 + "07"
	ECode030108 = e.Code0301 + "08"
	ECode030109 = e.Code0301 + "09"
	ECode03010A = e.Code0301 + "0A"
	ECode03010B = e.Code0301 + "0B"
	ECode03010C = e.Code0301 + "0C"
	ECode03010D = e.Code0301 + "0D"
	ECode03010E = e.Code0301 + "0E"
	ECode03010F = e.Code0301 + "0F"
	ECode03010G = e.Code0301 + "0G"
	ECode03010H = e.Code0301 + "0H"
	ECode03010I = e.Code0301 + "0I"
)
View Source
const (
	ECode030401 = e.Code0304 + "01"
	ECode030402 = e.Code0304 + "02"
	ECode030403 = e.Code0304 + "03"
	ECode030404 = e.Code0304 + "04"
	ECode030405 = e.Code0304 + "05"
	ECode030406 = e.Code0304 + "06"
	ECode030407 = e.Code0304 + "07"
	ECode030408 = e.Code0304 + "08"
	ECode030409 = e.Code0304 + "09"
	ECode03040A = e.Code0304 + "0A"
	ECode03040B = e.Code0304 + "0B"
	ECode03040C = e.Code0304 + "0C"
	ECode03040D = e.Code0304 + "0D"
	ECode03040E = e.Code0304 + "0E"
	ECode03040F = e.Code0304 + "0F"
	ECode03040G = e.Code0304 + "0G"
)
View Source
const (
	MIGRATION_CODE = "process"
)

Variables

This section is empty.

Functions

func GetMigrationList

func GetMigrationList() (ml *migration.List)

GetMigrationList returns this packages migration list

Types

type Logger

type Logger struct {
	Code     string    // The process code
	ShowTime bool      // Whether to show process run times when logging
	Time     time.Time // Baseline for how long the process has been running
}

Logger helper for logging process messages

func NewLogger

func NewLogger(code string) (l *Logger)

NewLogger initialize a new process log. Defaults to show time in logs

func (*Logger) Error

func (l *Logger) Error(msg string, args ...interface{})

Error helper to use zerolog error

func (*Logger) Info

func (l *Logger) Info(msg string, args ...interface{})

Info helper to use zerolog info

func (*Logger) Log

func (l *Logger) Log(ze *zerolog.Event, msg string, args ...interface{})

Log writes to the logger

func (*Logger) ResetTime

func (l *Logger) ResetTime()

ResetTime resets the time to now

func (*Logger) Warn

func (l *Logger) Warn(msg string, args ...interface{})

Warn helper to use zerolog warn

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is used to create a singleton process. It ensures only one process is running at a time.

func NewProcessor

func NewProcessor(db *sql.Connection) (p *Processor)

NewDataProcess returns a new instance of a processor

func (*Processor) Deregister

func (p *Processor) Deregister(code string) (err error)

Deregister will permanently remove the specified code from the process table

func (*Processor) Register

func (p *Processor) Register(code, name string, f func() error) (err error)

Register will register the process. If the process is already registered, it will return an error. If the process does not exist, it will create it. The application using this package should register all processes on start to ensure they exist before trying to call them.

The run function will be invoked when the process is called later. It creates a lock on the process (in the database) to ensure only one can run at a time.The run func should define all data processing that needs to occur for this run.

func (*Processor) RegisterWithInterval

func (p *Processor) RegisterWithInterval(code, name string, interval time.Duration, f func() error) (err error)

RegisterWithInterval will register the process with the specified interval. If the process is already registered, it will return an error. If the process does not exist, it will create it. The application using this package should register all processes on start to ensure they exist before trying to call them.

The run function will be invoked when the process is called later. It creates a lock on the process (in the database) to ensure only one can run at a time. The run func should define all data processing that needs to occur for this process.

func (*Processor) Run

func (p *Processor) Run(code string) (rr *RunResponse, err error)

Run executes the registered process. If it has not been registered, it will return an error. It will return a response indicating if it was skipped. If yes, it will include a reason. If no, it will include the run details.

func (*Processor) SetRunInterval

func (p *Processor) SetRunInterval(code string, interval time.Duration) (err error)

SetRunInterval sets the interval, defining how often a process should run. It will also reset the next run time to the last run time + the new interval

type Queue

type Queue struct {
	MaxGoRoutines uint // Defines the maximum number of go routines to run at a time
	// contains filtered or unexported fields
}

Queue used to process data in parallel using a queue to limit the number of go routines allowed to run at a given time. A function to get the data (getQueue) and a function to handle each record of the getQueue must be set.

func NewQueue

func NewQueue(qc *QueueConfig) (q *Queue)

NewQueue creates a new queue to run data processing in parallel and uses the max queue to define the maximum number of go routines allowed

func (*Queue) Add

func (q *Queue) Add(item interface{}) (err error)

Add adds the passed item to the queue. This func must be called in the QueueGenerator's FillQueue in order to populate the queue.

func (*Queue) Run

func (q *Queue) Run() (err error)

Run processes the queue, calling the queue generator's FillQueue method to get all of the records to process and the generator's HandleQueueItem to process each record in the queue.

type QueueConfig

type QueueConfig struct {
	Generator     QueueGenerator
	MaxGoRoutines uint
}

type QueueGenerator

type QueueGenerator interface {
	// SetQueue is called in the NewQueue func. It should assign the queue to a property
	// of the QueueGenerator, so that the FillQueue func of the generator can call the queue's
	// Add func
	SetQueue(*Queue)
	// FillQueue defines how the queue is populated. It must retrieve the records to be
	// added and call the Queue's Add func to add the records to the queue.
	FillQueue() error
	// HandleQueueItem defines what to do with each record in the queue. It will
	// be called as the queue is read
	HandleQueueItem(item interface{}) (err error)
}

QueueGenerator defines how the queue gets populated and how each record is handled

type RunResponse

type RunResponse struct {
	Skipped    bool              // Indicates if skipped
	SkipReason string            // Indicates why it was skipped
	Run        *model.ProcessRun // The run itself
}

RunResponse the response returned after running a process

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL