process

package
v0.5.2-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RunFeatureExtraction

func RunFeatureExtraction(config *config.Config, pool *pgxpool.Pool) error

func RunJobQueue

func RunJobQueue(queue *JobQueue, conn *pgxpool.Pool, cfg *config.Config) error

RunJobQueue drains a JobQueue into a channel and processes it with workers.

Types

type Job

type Job struct {
	JobID       uuid.UUID `json:"job_id"`
	BuildingIDs []int64   `json:"building_ids"`
	Tasks       []*Task   `json:"tasks"`
	EnqueuedAt  time.Time `json:"enqueued_at"` // Timestamp when the job was enqueued
	CreatedAt   time.Time `json:"created_at"`  // Creation timestamp
}

Job represents a batch of building IDs and the sequence of tasks to execute for them

func NewJob

func NewJob(buildingIDs []int64, tasks []*Task) *Job

NewJob creates a new Job instance

func (*Job) AddTask

func (j *Job) AddTask(task *Task)

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue represents a queue of jobs to be processed by the worker pool.

func BuildFeatureExtractionQueue

func BuildFeatureExtractionQueue(
	config *config.Config,
	lod2Batches [][]int64,
	lod3Batches [][]int64,
) (*JobQueue, error)

BuildFeatureExtractionQueue creates a queue of jobs (one per batch) for both LOD2 and LOD3 building IDs.

func MainDBSetupJobQueue

func MainDBSetupJobQueue(config *config.Config) (*JobQueue, error)

MainDBSetupJobQueue creates a queue with function scripts and main table scripts

func NewJobQueue

func NewJobQueue() *JobQueue

NewJobQueue initializes an empty queue

func SupplementaryDBSetupJobQueue

func SupplementaryDBSetupJobQueue(config *config.Config) (*JobQueue, error)

SupplementaryDBSetupJobQueue creates a queue with supplementary table scripts

func SupplementaryJobQueue

func SupplementaryJobQueue(config *config.Config) (*JobQueue, error)

SupplementaryJobQueue creates a queue with supplementary processing scripts

func (*JobQueue) Clear

func (q *JobQueue) Clear()

Clear removes all jobs from the queue

func (*JobQueue) Dequeue

func (q *JobQueue) Dequeue() *Job

Dequeue removes and returns the first job

func (*JobQueue) Enqueue

func (q *JobQueue) Enqueue(job *Job)

Enqueue adds a job to the queue

func (*JobQueue) IsEmpty

func (q *JobQueue) IsEmpty() bool

IsEmpty checks if the job queue is empty

func (*JobQueue) Len

func (q *JobQueue) Len() int

Len returns the number of jobs in the queue

func (*JobQueue) Peek

func (q *JobQueue) Peek() *Job

Peek returns the first job without removing it

type JobType

type JobType string

JobType represents the category of a job

const (
	LOD2               JobType = "lod2"
	LOD3               JobType = "lod3"
	Function           JobType = "function"
	MainTable          JobType = "main_table"
	Supplementary      JobType = "supplementary"
	SupplementaryTable JobType = "supplementary_table"
)

type Params

type Params struct {
	BuildingIDs []int64 `json:"building_ids"`
}

All required parameters for any SQL task

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner handles execution of tasks and jobs

func NewRunner

func NewRunner(config *config.Config) *Runner

NewRunner creates a new runner instance

func (*Runner) RunJob

func (r *Runner) RunJob(job *Job, conn *pgxpool.Pool, workerID int) error

RunJob executes all tasks in a job in priority order

func (*Runner) RunTaskWithRetry

func (r *Runner) RunTaskWithRetry(task *Task, conn *pgxpool.Pool, config *config.Config, workerID int) error

RunTaskWithRetry executes a single task with retry logic

type Task

type Task struct {
	TaskID    uuid.UUID `json:"task_id"`    // Unique identifier for the task
	TaskType  string    `json:"task_type"`  // e.g. "LOD2: 01_extract.sql"
	Params    Params    `json:"params"`     // Parameters for the task
	SQLFile   string    `json:"sql_file"`   // SQL file path
	Priority  int       `json:"priority"`   // Task priority (lower number = higher priority)
	CreatedAt time.Time `json:"created_at"` // Creation timestamp
}

Task represents a single parameterised SQL script execution

func NewTask

func NewTask(taskType string, params Params, SQLFile string, priority int) *Task

NewTask creates a new Task instance

type Worker

type Worker struct {
	ID int
}

func NewWorker

func NewWorker(id int) *Worker

func (*Worker) Start

func (w *Worker) Start(jobChan <-chan *Job, conn *pgxpool.Pool, wg *sync.WaitGroup, config *config.Config)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL