Documentation
Constants ¶
This section is empty.
Variables ¶
var ErrJobNotFound = errors.New("microbatching: Job not found")
ErrJobNotFound is returned by the Service.JobResult method when the job is not found.
var ErrServiceClosed = errors.New("microbatching: Service closed")
ErrServiceClosed is returned by the Service.AddJob method after a call to Service.Shutdown.
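Because both variables are sentinel errors, callers can presumably match them with errors.Is, which also works when the error has been wrapped. The sketch below is self-contained: it declares local stand-ins with the same messages instead of importing the package, and the describe helper is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented sentinel errors. In real code these
// would be the package-level ErrJobNotFound and ErrServiceClosed.
var (
	ErrJobNotFound   = errors.New("microbatching: Job not found")
	ErrServiceClosed = errors.New("microbatching: Service closed")
)

// describe is a hypothetical helper that classifies an error returned by
// the service. errors.Is also matches wrapped errors.
func describe(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrJobNotFound):
		return "unknown job"
	case errors.Is(err, ErrServiceClosed):
		return "service closed"
	default:
		return "unexpected: " + err.Error()
	}
}

func main() {
	wrapped := fmt.Errorf("lookup job 42: %w", ErrJobNotFound)
	fmt.Println(describe(wrapped))          // unknown job
	fmt.Println(describe(ErrServiceClosed)) // service closed
}
```

Comparing with errors.Is rather than == keeps the check valid if a caller adds context with %w.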
Functions ¶
Types ¶
type BatchProcessor ¶
type BatchProcessor interface {
Process(jobs []Job) []ProcessingResult
}
BatchProcessor describes a batch processor interface.
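As an illustration, a processor could look like the sketch below. It is not taken from the package: Job is assumed to be an interface with an ID() string method (the Service example calls j.ID()), the fields of ProcessingResult are assumptions because they are not shown here, and textJob and upperProcessor are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Job is a stand-in. Only the ID method is implied by the package example.
type Job interface {
	ID() string
}

// ProcessingResult is a stand-in. Its fields are assumed for illustration.
type ProcessingResult struct {
	JobID  string
	Err    error
	Result interface{}
}

// BatchProcessor matches the documented interface.
type BatchProcessor interface {
	Process(jobs []Job) []ProcessingResult
}

// textJob is a hypothetical job carrying a string payload.
type textJob struct {
	id   string
	text string
}

func (j textJob) ID() string { return j.id }

// upperProcessor upper-cases the payload of every job in the batch and
// reports an error for jobs of any other type.
type upperProcessor struct{}

func (upperProcessor) Process(jobs []Job) []ProcessingResult {
	results := make([]ProcessingResult, 0, len(jobs))
	for _, j := range jobs {
		tj, ok := j.(textJob)
		if !ok {
			results = append(results, ProcessingResult{
				JobID: j.ID(),
				Err:   fmt.Errorf("unsupported job type %T", j),
			})
			continue
		}
		results = append(results, ProcessingResult{
			JobID:  tj.id,
			Result: strings.ToUpper(tj.text),
		})
	}
	return results
}

// Compile-time check that upperProcessor satisfies the interface.
var _ BatchProcessor = upperProcessor{}

func main() {
	out := upperProcessor{}.Process([]Job{textJob{"0", "a"}, textJob{"1", "b"}})
	for _, r := range out {
		fmt.Println(r.JobID, r.Result, r.Err)
	}
}
```

The sketch returns one result per job, in input order, so that each result can be matched back to its job by ID.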
type JobExtendedResult ¶
JobExtendedResult describes job result with state.
type JobResult ¶
type JobResult struct {
	Err    error
	Result interface{}
}
JobResult describes job result.
type ProcessingResult ¶
ProcessingResult describes job processing result by the batch processor.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner is a micro-batching runner. It reads batches from the input channel and stores them in a queue. On each ticker tick, it processes the queue in a batch and sends the results to the notification channel.
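The loop described above can be sketched with a time.Ticker and a select statement. This is a simplified illustration under stated assumptions, not the package's implementation: runLoop is a hypothetical function, jobs are plain strings, "processing" just prefixes the job name, and every queued batch is flushed on each tick and once more when the input channel is closed.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop is a hypothetical, simplified version of the described loop.
// It queues incoming batches and flushes the queue on every tick.
func runLoop(batches <-chan []string, results chan<- string, freq time.Duration) {
	ticker := time.NewTicker(freq)
	defer ticker.Stop()
	defer close(results)

	var queue [][]string

	// flush processes everything queued so far, in arrival order.
	flush := func() {
		for _, batch := range queue {
			for _, job := range batch {
				results <- "done:" + job
			}
		}
		queue = nil
	}

	for {
		select {
		case b, ok := <-batches:
			if !ok {
				// Input closed: process what is left, then stop.
				flush()
				return
			}
			queue = append(queue, b)
		case <-ticker.C:
			flush()
		}
	}
}

func main() {
	batches := make(chan []string)
	results := make(chan string, 8) // buffered so flush never blocks here

	go runLoop(batches, results, 5*time.Millisecond)

	batches <- []string{"a", "b"}
	batches <- []string{"c"}
	close(batches)

	for r := range results {
		fmt.Println(r)
	}
}
```

Keeping the queue local to the goroutine means it needs no mutex; the channels are the only shared state.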
func NewRunner ¶
func NewRunner(
	bp BatchProcessor,
	bc <-chan []Job,
	nc chan<- JobExtendedResult,
	freq time.Duration,
) *Runner
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is a micro-batching service that processes jobs in batches.
Example ¶
srv := mb.NewService(mb.WithFrequency(10 * time.Millisecond))
srv.Run(&mockBatchProcessor{})

jobsSize := 7
testJobs := makeMockJobs(jobsSize)
for _, j := range testJobs {
	srv.AddJob(j)
}

time.Sleep(50 * time.Millisecond)

for _, j := range testJobs {
	r, err := srv.JobResult(j.ID())
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Printf("Job ID: %s, State: %s\n", r.JobID, r.State)
	}
}

srv.Shutdown()

Output:

Job ID: 0, State: Completed
Job ID: 1, State: Completed
Job ID: 2, State: Completed
Job ID: 3, State: Completed
Job ID: 4, State: Completed
Job ID: 5, State: Completed
Job ID: 6, State: Completed
func NewService ¶
func NewService(opt ...ServiceOption) *Service
func (*Service) AddJob ¶
AddJob adds a job to the queue. It returns an error if the service is closed.
func (*Service) JobResult ¶
func (s *Service) JobResult(jobID string) (JobExtendedResult, error)
JobResult returns the result of a job. It returns an error if the job is not found.
func (*Service) Run ¶
func (s *Service) Run(bp BatchProcessor)
type ServiceOption ¶
type ServiceOption interface {
// contains filtered or unexported methods
}
ServiceOption sets service options such as batch size and frequency.
func WithBatchSize ¶
func WithBatchSize(v int) ServiceOption
WithBatchSize returns a ServiceOption that sets batch size.
func WithFrequency ¶
func WithFrequency(v time.Duration) ServiceOption
WithFrequency returns a ServiceOption that sets frequency.
func WithLogger ¶
func WithLogger(v Logger) ServiceOption
WithLogger returns a ServiceOption that sets service logger.
func WithQueueSize ¶
func WithQueueSize(v int) ServiceOption
WithQueueSize returns a ServiceOption that sets jobs queue size.
func WithShutdownTimeout ¶
func WithShutdownTimeout(v time.Duration) ServiceOption
WithShutdownTimeout returns a ServiceOption that sets service shutdown timeout.
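The With* functions follow what looks like the functional options pattern: an interface with an unexported method, applied in order over a set of defaults. The sketch below shows one common way to build that pattern. The config struct, the apply method, the optionFunc adapter, and the default values are all assumptions for illustration; the package's unexported details are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical settings struct filled in by options.
type config struct {
	batchSize int
	frequency time.Duration
}

// Option mirrors the shape of ServiceOption: an interface whose only
// method is unexported, so only this package can define options.
type Option interface {
	apply(*config)
}

// optionFunc adapts a plain function to the Option interface.
type optionFunc func(*config)

func (f optionFunc) apply(c *config) { f(c) }

// WithBatchSize returns an Option that sets the batch size.
func WithBatchSize(v int) Option {
	return optionFunc(func(c *config) { c.batchSize = v })
}

// WithFrequency returns an Option that sets the processing frequency.
func WithFrequency(v time.Duration) Option {
	return optionFunc(func(c *config) { c.frequency = v })
}

// newConfig starts from assumed defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...Option) config {
	c := config{batchSize: 10, frequency: time.Second}
	for _, o := range opts {
		o.apply(&c)
	}
	return c
}

func main() {
	c := newConfig(WithBatchSize(3), WithFrequency(10*time.Millisecond))
	fmt.Println(c.batchSize, c.frequency) // 3 10ms
}
```

With this shape, new settings can be added later without changing the constructor's signature.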