Documentation
¶
Index ¶
- Constants
- Variables
- type BackoffManager
- type BatchJobEntry
- type BatchManagerConfig
- type BatchManagerStats
- type BatchWebhookConfig
- type BatchWebhookPayload
- type DefaultBatchReqManager
- func (m *DefaultBatchReqManager) AddJob(job *Job) error
- func (m *DefaultBatchReqManager) FlushBatch(ctx context.Context) error
- func (m *DefaultBatchReqManager) GetStats() *BatchManagerStats
- func (m *DefaultBatchReqManager) Start(ctx context.Context)
- func (m *DefaultBatchReqManager) Stop(ctx context.Context) error
- type DefaultBatchWebhookNotifier
- type DefaultJobSubmitter
- type DirectJobSubmitter
- type FIFOQueueManager
- type IBatchReqManager
- type IBatchWebhookNotifier
- type IJobQueue
- type IJobQueueManager
- type Job
- type JobQueue
- type JobQueueManager
- type JobSumitter
- type OrchestrationService
- type Orchestrator
- func (o *Orchestrator) GetBatchManager() IBatchReqManager
- func (o *Orchestrator) GetJobQueueManager() IJobQueueManager
- func (o *Orchestrator) IsBatchProcessingEnabled() bool
- func (o *Orchestrator) SetBatchManager(batchManager IBatchReqManager)
- func (o *Orchestrator) StartProcessing()
- func (o *Orchestrator) Stop()
- func (o *Orchestrator) SubmitJobWithRouting(job *Job) error
- type OrchestratorConfig
- type PriorityQueue
- type QueueType
- type RequestQueueManager
- type RetryHandler
- type WebhookError
- type WebhookResponse
Constants ¶
const ( DefaultSubmitter = "default" VariedSubmitter = "varied" DirectSubmitter = "direct" // NEW: Bypasses worker queue, calls HTTP client directly )
Variables ¶
var GptClient *gogpt.Client
--- API Clients ---
Functions ¶
This section is empty.
Types ¶
type BackoffManager ¶
type BackoffManager interface {
ApplyBackoff(workerID int)
ActivateBackoff(workerID int, retryAfter time.Duration)
ResetBackoff(workerID int)
}
BackoffManager defines the contract for different backoff strategies.
type BatchJobEntry ¶
type BatchJobEntry struct {
Job *Job
CustomID string
AddedAt time.Time
EstimatedMem int64 // Estimated memory usage in bytes
}
BatchJobEntry wraps a Job with metadata for batch processing
type BatchManagerConfig ¶
type BatchManagerConfig struct {
MaxRequestsPerBatch int
MaxMemoryBytes int64
FlushInterval time.Duration
BatchClient *client.BatchClient
WebhookNotifier IBatchWebhookNotifier // Optional: for batch completion notifications
}
BatchManagerConfig holds configuration for the batch manager
type BatchManagerStats ¶
type BatchManagerStats struct {
PendingJobs int
TotalJobsQueued int64
TotalBatchesSent int64
CurrentMemoryMB float64
LastFlushTime time.Time
}
BatchManagerStats provides statistics about the batch manager
type BatchWebhookConfig ¶
type BatchWebhookConfig struct {
WebhookURL string
APIKey string
HTTPClient *http.Client
RetryAttempts int
RetryDelay time.Duration
}
BatchWebhookConfig holds configuration for the webhook notifier
type BatchWebhookPayload ¶
type BatchWebhookPayload struct {
BatchID string `json:"batch_id"`
Status string `json:"status"`
JobCount int `json:"job_count"`
CompletedAt int64 `json:"completed_at"`
Responses []WebhookResponse `json:"responses"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
RequestCounts *client.RequestCounts `json:"request_counts,omitempty"`
}
BatchWebhookPayload represents the data sent to the webhook endpoint
type DefaultBatchReqManager ¶
type DefaultBatchReqManager struct {
// contains filtered or unexported fields
}
DefaultBatchReqManager implements batch request management
func (*DefaultBatchReqManager) AddJob ¶
func (m *DefaultBatchReqManager) AddJob(job *Job) error
AddJob adds a job to the batch queue
func (*DefaultBatchReqManager) FlushBatch ¶
func (m *DefaultBatchReqManager) FlushBatch(ctx context.Context) error
FlushBatch sends all pending jobs as a batch request
func (*DefaultBatchReqManager) GetStats ¶
func (m *DefaultBatchReqManager) GetStats() *BatchManagerStats
GetStats returns statistics about the batch manager
func (*DefaultBatchReqManager) Start ¶
func (m *DefaultBatchReqManager) Start(ctx context.Context)
Start begins the background flush routine
type DefaultBatchWebhookNotifier ¶
type DefaultBatchWebhookNotifier struct {
// contains filtered or unexported fields
}
DefaultBatchWebhookNotifier implements webhook notifications for batch completions
func (*DefaultBatchWebhookNotifier) IsEnabled ¶
func (n *DefaultBatchWebhookNotifier) IsEnabled() bool
IsEnabled returns true if webhook notifications are configured
func (*DefaultBatchWebhookNotifier) NotifyBatchComplete ¶
func (n *DefaultBatchWebhookNotifier) NotifyBatchComplete( ctx context.Context, batch *client.Batch, responses []client.BatchResponse, jobs []*BatchJobEntry, ) error
NotifyBatchComplete sends a notification when a batch is completed
type DefaultJobSubmitter ¶
type DefaultJobSubmitter struct{}
func NewDefaultJobSubmitter ¶
func NewDefaultJobSubmitter() *DefaultJobSubmitter
type DirectJobSubmitter ¶
type DirectJobSubmitter struct {
// contains filtered or unexported fields
}
func GetDirectSubmitter ¶
func GetDirectSubmitter() *DirectJobSubmitter
GetDirectSubmitter returns the singleton DirectJobSubmitter instance. This submitter bypasses the worker queue and calls the HTTP client directly.
func InitDirectSubmitter ¶
func InitDirectSubmitter(adapters []clientManager.ClientAdapter, maxConcurrent int, verbose bool) *DirectJobSubmitter
InitDirectSubmitter initializes the DirectSubmitter singleton eagerly at startup.
func NewDirectJobSubmitter ¶
func NewDirectJobSubmitter(adapters []clientManager.ClientAdapter, maxConcurrent int, verbose bool) *DirectJobSubmitter
NewDirectJobSubmitter creates a submitter that calls the HTTP client directly. maxConcurrent: Maximum number of concurrent HTTP requests (default: 3000)
func (*DirectJobSubmitter) GetMaxConcurrent ¶
func (d *DirectJobSubmitter) GetMaxConcurrent() int64
GetMaxConcurrent returns the maximum concurrent requests allowed
type FIFOQueueManager ¶
type FIFOQueueManager struct {
// contains filtered or unexported fields
}
FIFOQueueManager implements JobQueue interface with FIFO ordering
func NewFIFOQueueManager ¶
func NewFIFOQueueManager() *FIFOQueueManager
func (*FIFOQueueManager) Dequeue ¶
func (f *FIFOQueueManager) Dequeue() *Job
func (*FIFOQueueManager) Enqueue ¶
func (f *FIFOQueueManager) Enqueue(request *Job)
func (*FIFOQueueManager) Size ¶
func (f *FIFOQueueManager) Size() int
Size returns the current queue size
type IBatchReqManager ¶
type IBatchReqManager interface {
// AddJob adds a job to the batch queue
AddJob(job *Job) error
// FlushBatch forces the current batch to be sent immediately
FlushBatch(ctx context.Context) error
// Start begins the background flush routine
Start(ctx context.Context)
// Stop gracefully stops the manager and flushes pending requests
Stop(ctx context.Context) error
// GetStats returns statistics about the batch manager
GetStats() *BatchManagerStats
}
IBatchReqManager defines the interface for managing batch requests
func NewBatchReqManager ¶
func NewBatchReqManager(batchClient *client.BatchClient) (IBatchReqManager, error)
NewBatchReqManager creates a new batch request manager with default settings
func NewBatchReqManagerWithConfig ¶
func NewBatchReqManagerWithConfig(config *BatchManagerConfig) (IBatchReqManager, error)
NewBatchReqManagerWithConfig creates a new batch request manager with custom configuration
type IBatchWebhookNotifier ¶
type IBatchWebhookNotifier interface {
// NotifyBatchComplete sends a notification when a batch is completed
NotifyBatchComplete(ctx context.Context, batch *client.Batch, responses []client.BatchResponse, jobs []*BatchJobEntry) error
// IsEnabled returns true if webhook notifications are configured
IsEnabled() bool
}
IBatchWebhookNotifier defines the interface for sending batch completion notifications
func NewBatchWebhookNotifier ¶
func NewBatchWebhookNotifier(config *BatchWebhookConfig) IBatchWebhookNotifier
NewBatchWebhookNotifier creates a new webhook notifier with custom configuration
func NewBatchWebhookNotifierFromEnv ¶
func NewBatchWebhookNotifierFromEnv() IBatchWebhookNotifier
NewBatchWebhookNotifierFromEnv creates a webhook notifier from environment variables
type IJobQueue ¶
IJobQueue defines the contract for different queue implementations
func NewJobQueueByType ¶
NewJobQueueByType creates a new JobQueueInterface based on the specified type
type IJobQueueManager ¶
type JobQueue ¶
type JobQueue struct {
// contains filtered or unexported fields
}
JobQueue manages the flow of jobs from ingestion to workers.
func NewJobQueue ¶
func (*JobQueue) StartManager ¶
StartManager begins moving jobs from the internal FIFO queue to the worker channel. PERFORMANCE FIX: Removed 100ms timeout that was causing 18-second delays under load
func (*JobQueue) StopManager ¶
func (q *JobQueue) StopManager()
type JobQueueManager ¶
type JobQueueManager struct {
// contains filtered or unexported fields
}
JobQueueManager manages the flow of jobs from ingestion to workers.
func NewJobQueueManager ¶
func NewJobQueueManager(concurrency, maxQueueSize int, jobQueue IJobQueue) *JobQueueManager
func (*JobQueueManager) Dequeue ¶
func (q *JobQueueManager) Dequeue() *Job
func (*JobQueueManager) Enqueue ¶
func (q *JobQueueManager) Enqueue(request *Job)
func (*JobQueueManager) Jobs ¶
func (q *JobQueueManager) Jobs() <-chan *Job
func (*JobQueueManager) StartManager ¶
func (q *JobQueueManager) StartManager(wg *sync.WaitGroup)
StartManager begins moving jobs from the internal FIFO queue to the worker channel.
func (*JobQueueManager) StopManager ¶
func (q *JobQueueManager) StopManager()
type JobSumitter ¶
type JobSumitter interface {
SubmitJob(job *Job, workerChannel chan *Job) (any, *openai.Usage, error)
}
JobSumitter defines the component that actually processes the job's data.
func JobSubmitterFactory ¶
func JobSubmitterFactory(submitterType string) JobSumitter
type OrchestrationService ¶
type OrchestrationService interface {
Stop()
GetJobQueueManager() IJobQueueManager
}
OrchestrationService defines the public interface for the rate limiting and job processing service. This allows us to hide the internal Orchestrator implementation from the consumer.
var ( // OpenAI Services & Channels Orchestator OrchestrationService WorkerChannel chan *Job )
func NewOrchestrationService ¶
func NewOrchestrationService( wg *sync.WaitGroup, maxTokensPerMinute int, maxRequestsPerMinute int, clientAdapter clientManager.ClientAdapter, strategy backoff.BackoffStrategy, ) OrchestrationService
NewOrchestrationService creates, configures, and starts the entire job processing pipeline. It wires together all the necessary components based on the provided configuration.
type Orchestrator ¶
type Orchestrator struct {
// contains filtered or unexported fields
}
Orchestrator coordinates workers, queues, rate limiting, and error handling.
func NewOrchestrator ¶
func NewOrchestrator( config OrchestratorConfig, handler clientManager.ClientAdapter, queue IJobQueueManager, backoffManager BackoffManager, retryHandler *RetryHandler, classifier *backoff.ErrorClassifier, ) *Orchestrator
NewOrchestrator creates and wires up a new Orchestrator instance.
func (*Orchestrator) GetBatchManager ¶
func (o *Orchestrator) GetBatchManager() IBatchReqManager
GetBatchManager returns the batch manager if configured
func (*Orchestrator) GetJobQueueManager ¶
func (o *Orchestrator) GetJobQueueManager() IJobQueueManager
func (*Orchestrator) IsBatchProcessingEnabled ¶
func (o *Orchestrator) IsBatchProcessingEnabled() bool
IsBatchProcessingEnabled returns whether batch processing is enabled
func (*Orchestrator) SetBatchManager ¶
func (o *Orchestrator) SetBatchManager(batchManager IBatchReqManager)
SetBatchManager sets the batch manager for handling low-priority jobs. This should be called after creating the orchestrator if batch processing is enabled.
func (*Orchestrator) StartProcessing ¶
func (o *Orchestrator) StartProcessing()
StartProcessing begins the worker pool and the queue manager.
func (*Orchestrator) Stop ¶
func (o *Orchestrator) Stop()
Stop gracefully shuts down the orchestrator and waits for workers to finish.
func (*Orchestrator) SubmitJobWithRouting ¶
func (o *Orchestrator) SubmitJobWithRouting(job *Job) error
SubmitJobWithRouting routes jobs to either batch processing or real-time processing based on the job's priority and configuration.
type OrchestratorConfig ¶
type OrchestratorConfig struct {
Concurrency int
MaxTokensPerMinute int
MaxRequestsPerMinute int
MaxQueueSize int
Verbose bool
EnableBatchProcessing bool // Enable batch processing for low-priority jobs
BatchPriorityThreshold int32 // Jobs with priority below this go to batch (default: 0)
}
OrchestratorConfig holds the configuration for the Orchestrator.
type PriorityQueue ¶
type PriorityQueue []*Job
func (PriorityQueue) Len ¶
func (pq PriorityQueue) Len() int
func (PriorityQueue) Less ¶
func (pq PriorityQueue) Less(i, j int) bool
func (*PriorityQueue) Pop ¶
func (pq *PriorityQueue) Pop() any
func (*PriorityQueue) Push ¶
func (pq *PriorityQueue) Push(x any)
func (PriorityQueue) Swap ¶
func (pq PriorityQueue) Swap(i, j int)
type RequestQueueManager ¶
type RequestQueueManager struct {
// contains filtered or unexported fields
}
RequestQueueManager implements JobQueue interface with priority-based ordering
func NewRequestQueueManager ¶
func NewRequestQueueManager() *RequestQueueManager
func (*RequestQueueManager) Dequeue ¶
func (r *RequestQueueManager) Dequeue() *Job
func (*RequestQueueManager) Enqueue ¶
func (r *RequestQueueManager) Enqueue(request *Job)
func (*RequestQueueManager) Size ¶
func (r *RequestQueueManager) Size() int
Size returns the current queue size
type RetryHandler ¶
RetryHandler manages retry logic for non-rate-limit errors.
func NewRetryHandler ¶
func NewRetryHandler(maxRetries int, verbose bool) *RetryHandler
func (*RetryHandler) HandlePermanentError ¶
func (rh *RetryHandler) HandlePermanentError(job *Job, workerID int, err error)
func (*RetryHandler) HandleTransientError ¶
func (rh *RetryHandler) HandleTransientError(job *Job, queue IJobQueueManager, workerID int, err error)
type WebhookError ¶
WebhookError represents an error in the webhook payload
type WebhookResponse ¶
type WebhookResponse struct {
CustomID string `json:"custom_id"`
Success bool `json:"success"`
Response map[string]interface{} `json:"response,omitempty"`
Error *WebhookError `json:"error,omitempty"`
Timestamp int64 `json:"timestamp"`
}
WebhookResponse represents a single response in the webhook payload