LLM

package
v1.0.31 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultSubmitter = "default"
	VariedSubmitter  = "varied"
	DirectSubmitter  = "direct" // NEW: Bypasses worker queue, calls HTTP client directly
)

Variables

View Source
var GptClient *gogpt.Client

--- API Clients ---

Functions

This section is empty.

Types

type BackoffManager

type BackoffManager interface {
	ApplyBackoff(workerID int)
	ActivateBackoff(workerID int, retryAfter time.Duration)
	ResetBackoff(workerID int)
}

BackoffManager defines the contract for different backoff strategies.

type BatchJobEntry

type BatchJobEntry struct {
	Job          *Job
	CustomID     string
	AddedAt      time.Time
	EstimatedMem int64 // Estimated memory usage in bytes
}

BatchJobEntry wraps a Job with metadata for batch processing

type BatchManagerConfig

type BatchManagerConfig struct {
	MaxRequestsPerBatch int
	MaxMemoryBytes      int64
	FlushInterval       time.Duration
	BatchClient         *client.BatchClient
	WebhookNotifier     IBatchWebhookNotifier // Optional: for batch completion notifications
}

BatchManagerConfig holds configuration for the batch manager

type BatchManagerStats

type BatchManagerStats struct {
	PendingJobs      int
	TotalJobsQueued  int64
	TotalBatchesSent int64
	CurrentMemoryMB  float64
	LastFlushTime    time.Time
}

BatchManagerStats provides statistics about the batch manager

type BatchWebhookConfig

type BatchWebhookConfig struct {
	WebhookURL    string
	APIKey        string
	HTTPClient    *http.Client
	RetryAttempts int
	RetryDelay    time.Duration
}

BatchWebhookConfig holds configuration for the webhook notifier

type BatchWebhookPayload

type BatchWebhookPayload struct {
	BatchID       string                 `json:"batch_id"`
	Status        string                 `json:"status"`
	JobCount      int                    `json:"job_count"`
	CompletedAt   int64                  `json:"completed_at"`
	Responses     []WebhookResponse      `json:"responses"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	RequestCounts *client.RequestCounts  `json:"request_counts,omitempty"`
}

BatchWebhookPayload represents the data sent to the webhook endpoint

type DefaultBatchReqManager

type DefaultBatchReqManager struct {
	// contains filtered or unexported fields
}

DefaultBatchReqManager implements batch request management

func (*DefaultBatchReqManager) AddJob

func (m *DefaultBatchReqManager) AddJob(job *Job) error

AddJob adds a job to the batch queue

func (*DefaultBatchReqManager) FlushBatch

func (m *DefaultBatchReqManager) FlushBatch(ctx context.Context) error

FlushBatch sends all pending jobs as a batch request

func (*DefaultBatchReqManager) GetStats

GetStats returns statistics about the batch manager

func (*DefaultBatchReqManager) Start

func (m *DefaultBatchReqManager) Start(ctx context.Context)

Start begins the background flush routine

func (*DefaultBatchReqManager) Stop

Stop gracefully stops the manager and flushes pending requests

type DefaultBatchWebhookNotifier

type DefaultBatchWebhookNotifier struct {
	// contains filtered or unexported fields
}

DefaultBatchWebhookNotifier implements webhook notifications for batch completions

func (*DefaultBatchWebhookNotifier) IsEnabled

func (n *DefaultBatchWebhookNotifier) IsEnabled() bool

IsEnabled returns true if webhook notifications are configured

func (*DefaultBatchWebhookNotifier) NotifyBatchComplete

func (n *DefaultBatchWebhookNotifier) NotifyBatchComplete(
	ctx context.Context,
	batch *client.Batch,
	responses []client.BatchResponse,
	jobs []*BatchJobEntry,
) error

NotifyBatchComplete sends a notification when a batch is completed

type DefaultJobSubmitter

type DefaultJobSubmitter struct{}

func NewDefaultJobSubmitter

func NewDefaultJobSubmitter() *DefaultJobSubmitter

func (*DefaultJobSubmitter) SubmitJob

func (d *DefaultJobSubmitter) SubmitJob(job *Job, workerChannel chan *Job) (any, *gogpt.Usage, error)

type DirectJobSubmitter

type DirectJobSubmitter struct {
	// contains filtered or unexported fields
}

func GetDirectSubmitter

func GetDirectSubmitter() *DirectJobSubmitter

GetDirectSubmitter returns the singleton DirectJobSubmitter instance. This submitter bypasses the worker queue and calls the HTTP client directly.

func InitDirectSubmitter

func InitDirectSubmitter(adapters []clientManager.ClientAdapter, maxConcurrent int, verbose bool) *DirectJobSubmitter

InitDirectSubmitter initializes the DirectSubmitter singleton eagerly at startup.

func NewDirectJobSubmitter

func NewDirectJobSubmitter(adapters []clientManager.ClientAdapter, maxConcurrent int, verbose bool) *DirectJobSubmitter

NewDirectJobSubmitter creates a submitter that calls the HTTP client directly. maxConcurrent: Maximum number of concurrent HTTP requests (default: 3000)

func (*DirectJobSubmitter) GetMaxConcurrent

func (d *DirectJobSubmitter) GetMaxConcurrent() int64

GetMaxConcurrent returns the maximum concurrent requests allowed

func (*DirectJobSubmitter) SubmitJob

func (d *DirectJobSubmitter) SubmitJob(job *Job, _ chan *Job) (any, *gogpt.Usage, error)

SubmitJob processes the job directly without going through the worker queue.

type FIFOQueueManager

type FIFOQueueManager struct {
	// contains filtered or unexported fields
}

FIFOQueueManager implements JobQueue interface with FIFO ordering

func NewFIFOQueueManager

func NewFIFOQueueManager() *FIFOQueueManager

func (*FIFOQueueManager) Dequeue

func (f *FIFOQueueManager) Dequeue() *Job

func (*FIFOQueueManager) Enqueue

func (f *FIFOQueueManager) Enqueue(request *Job)

func (*FIFOQueueManager) Size

func (f *FIFOQueueManager) Size() int

Size returns the current queue size

type IBatchReqManager

type IBatchReqManager interface {
	// AddJob adds a job to the batch queue
	AddJob(job *Job) error
	// FlushBatch forces the current batch to be sent immediately
	FlushBatch(ctx context.Context) error
	// Start begins the background flush routine
	Start(ctx context.Context)
	// Stop gracefully stops the manager and flushes pending requests
	Stop(ctx context.Context) error
	// GetStats returns statistics about the batch manager
	GetStats() *BatchManagerStats
}

IBatchReqManager defines the interface for managing batch requests

func NewBatchReqManager

func NewBatchReqManager(batchClient *client.BatchClient) (IBatchReqManager, error)

NewBatchReqManager creates a new batch request manager with default settings

func NewBatchReqManagerWithConfig

func NewBatchReqManagerWithConfig(config *BatchManagerConfig) (IBatchReqManager, error)

NewBatchReqManagerWithConfig creates a new batch request manager with custom configuration

type IBatchWebhookNotifier

type IBatchWebhookNotifier interface {
	// NotifyBatchComplete sends a notification when a batch is completed
	NotifyBatchComplete(ctx context.Context, batch *client.Batch, responses []client.BatchResponse, jobs []*BatchJobEntry) error
	// IsEnabled returns true if webhook notifications are configured
	IsEnabled() bool
}

IBatchWebhookNotifier defines the interface for sending batch completion notifications

func NewBatchWebhookNotifier

func NewBatchWebhookNotifier(config *BatchWebhookConfig) IBatchWebhookNotifier

NewBatchWebhookNotifier creates a new webhook notifier with custom configuration

func NewBatchWebhookNotifierFromEnv

func NewBatchWebhookNotifierFromEnv() IBatchWebhookNotifier

NewBatchWebhookNotifierFromEnv creates a webhook notifier from environment variables

type IJobQueue

type IJobQueue interface {
	Enqueue(request *Job)
	Dequeue() *Job
	Size() int
}

IJobQueue defines the contract for different queue implementations

func NewJobQueueByType

func NewJobQueueByType(queueType QueueType) IJobQueue

NewJobQueueByType creates a new JobQueueInterface based on the specified type

type IJobQueueManager

type IJobQueueManager interface {
	StartManager(wg *sync.WaitGroup)
	Jobs() <-chan *Job
	StopManager()
	Enqueue(request *Job)
	Dequeue() *Job
}

type Job

type Job struct {
	Result   chan *domain.JobResult
	Tokens   int
	Inputs   *llmManagement.Inputs
	Error    chan error
	Retries  int   // Tracks the number of retry attempts for transient errors.
	Priority int32 // For batch processing: <0 = send to batch queue, >=0 = direct processing
}

type JobQueue

type JobQueue struct {
	// contains filtered or unexported fields
}

JobQueue manages the flow of jobs from ingestion to workers.

func NewJobQueue

func NewJobQueue(concurrency, maxQueueSize int) *JobQueue

func (*JobQueue) Enqueue

func (q *JobQueue) Enqueue(job *Job)

Enqueue adds a job to the end of the queue.

func (*JobQueue) Jobs

func (q *JobQueue) Jobs() <-chan *Job

func (*JobQueue) Prepend

func (q *JobQueue) Prepend(job *Job)

Prepend adds a job to the front of the queue for immediate retry.

func (*JobQueue) StartManager

func (q *JobQueue) StartManager(wg *sync.WaitGroup)

StartManager begins moving jobs from the internal FIFO queue to the worker channel. PERFORMANCE FIX: Removed 100ms timeout that was causing 18-second delays under load

func (*JobQueue) StopManager

func (q *JobQueue) StopManager()

type JobQueueManager

type JobQueueManager struct {
	// contains filtered or unexported fields
}

JobQueueManager manages the flow of jobs from ingestion to workers.

func NewJobQueueManager

func NewJobQueueManager(concurrency, maxQueueSize int, jobQueue IJobQueue) *JobQueueManager

func (*JobQueueManager) Dequeue

func (q *JobQueueManager) Dequeue() *Job

func (*JobQueueManager) Enqueue

func (q *JobQueueManager) Enqueue(request *Job)

func (*JobQueueManager) Jobs

func (q *JobQueueManager) Jobs() <-chan *Job

func (*JobQueueManager) StartManager

func (q *JobQueueManager) StartManager(wg *sync.WaitGroup)

StartManager begins moving jobs from the internal FIFO queue to the worker channel.

func (*JobQueueManager) StopManager

func (q *JobQueueManager) StopManager()

type JobSumitter

type JobSumitter interface {
	SubmitJob(job *Job, workerChannel chan *Job) (any, *openai.Usage, error)
}

JobSumitter defines the component that actually processes the job's data.

func JobSubmitterFactory

func JobSubmitterFactory(submitterType string) JobSumitter

type OrchestrationService

type OrchestrationService interface {
	Stop()
	GetJobQueueManager() IJobQueueManager
}

OrchestrationService defines the public interface for the rate limiting and job processing service. This allows us to hide the internal Orchestrator implementation from the consumer.

var (
	// OpenAI Services & Channels
	Orchestator   OrchestrationService
	WorkerChannel chan *Job
)

func NewOrchestrationService

func NewOrchestrationService(
	wg *sync.WaitGroup,
	maxTokensPerMinute int,
	maxRequestsPerMinute int,
	clientAdapter clientManager.ClientAdapter,
	strategy backoff.BackoffStrategy,
) OrchestrationService

NewOrchestrationService creates, configures, and starts the entire job processing pipeline. It wires together all the necessary components based on the provided configuration.

type Orchestrator

type Orchestrator struct {
	// contains filtered or unexported fields
}

Orchestrator coordinates workers, queues, rate limiting, and error handling.

func NewOrchestrator

func NewOrchestrator(
	config OrchestratorConfig,
	handler clientManager.ClientAdapter,
	queue IJobQueueManager,
	backoffManager BackoffManager,
	retryHandler *RetryHandler,
	classifier *backoff.ErrorClassifier,
) *Orchestrator

NewOrchestrator creates and wires up a new Orchestrator instance.

func (*Orchestrator) GetBatchManager

func (o *Orchestrator) GetBatchManager() IBatchReqManager

GetBatchManager returns the batch manager if configured

func (*Orchestrator) GetJobQueueManager

func (o *Orchestrator) GetJobQueueManager() IJobQueueManager

func (*Orchestrator) IsBatchProcessingEnabled

func (o *Orchestrator) IsBatchProcessingEnabled() bool

IsBatchProcessingEnabled returns whether batch processing is enabled

func (*Orchestrator) SetBatchManager

func (o *Orchestrator) SetBatchManager(batchManager IBatchReqManager)

SetBatchManager sets the batch manager for handling low-priority jobs. This should be called after creating the orchestrator if batch processing is enabled.

func (*Orchestrator) StartProcessing

func (o *Orchestrator) StartProcessing()

StartProcessing begins the worker pool and the queue manager.

func (*Orchestrator) Stop

func (o *Orchestrator) Stop()

Stop gracefully shuts down the orchestrator and waits for workers to finish.

func (*Orchestrator) SubmitJobWithRouting

func (o *Orchestrator) SubmitJobWithRouting(job *Job) error

SubmitJobWithRouting routes jobs to either batch processing or real-time processing based on the job's priority and configuration.

type OrchestratorConfig

type OrchestratorConfig struct {
	Concurrency            int
	MaxTokensPerMinute     int
	MaxRequestsPerMinute   int
	MaxQueueSize           int
	Verbose                bool
	EnableBatchProcessing  bool  // Enable batch processing for low-priority jobs
	BatchPriorityThreshold int32 // Jobs with priority below this go to batch (default: 0)
}

OrchestratorConfig holds the configuration for the Orchestrator.

type PriorityQueue

type PriorityQueue []*Job

func (PriorityQueue) Len

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less

func (pq PriorityQueue) Less(i, j int) bool

func (*PriorityQueue) Pop

func (pq *PriorityQueue) Pop() any

func (*PriorityQueue) Push

func (pq *PriorityQueue) Push(x any)

func (PriorityQueue) Swap

func (pq PriorityQueue) Swap(i, j int)

type QueueType

type QueueType string

QueueType represents the type of queue to create

const (
	QueueTypeFIFO     QueueType = "fifo"
	QueueTypePriority QueueType = "priority"
)

type RequestQueueManager

type RequestQueueManager struct {
	// contains filtered or unexported fields
}

RequestQueueManager implements JobQueue interface with priority-based ordering

func NewRequestQueueManager

func NewRequestQueueManager() *RequestQueueManager

func (*RequestQueueManager) Dequeue

func (r *RequestQueueManager) Dequeue() *Job

func (*RequestQueueManager) Enqueue

func (r *RequestQueueManager) Enqueue(request *Job)

func (*RequestQueueManager) Size

func (r *RequestQueueManager) Size() int

Size returns the current queue size

type RetryHandler

type RetryHandler struct {
	MaxTransientRetries int
	Verbose             bool
}

RetryHandler manages retry logic for non-rate-limit errors.

func NewRetryHandler

func NewRetryHandler(maxRetries int, verbose bool) *RetryHandler

func (*RetryHandler) HandlePermanentError

func (rh *RetryHandler) HandlePermanentError(job *Job, workerID int, err error)

func (*RetryHandler) HandleTransientError

func (rh *RetryHandler) HandleTransientError(job *Job, queue IJobQueueManager, workerID int, err error)

type WebhookError

type WebhookError struct {
	Code    string `json:"code"`
	Message string `json:"message"`
}

WebhookError represents an error in the webhook payload

type WebhookResponse

type WebhookResponse struct {
	CustomID  string                 `json:"custom_id"`
	Success   bool                   `json:"success"`
	Response  map[string]interface{} `json:"response,omitempty"`
	Error     *WebhookError          `json:"error,omitempty"`
	Timestamp int64                  `json:"timestamp"`
}

WebhookResponse represents a single response in the webhook payload

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL