processor

package module
v0.0.0-...-3f7829a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2023 License: Apache-2.0 Imports: 5 Imported by: 0

README

Batch Processor

This Go package provides a flexible and configurable batch processing framework for concurrent execution of tasks in a controlled manner. It allows for efficient handling of batches of items using a supplier, processor, and finalizer.

Usage

To use the batch processor, follow these steps:

  1. Create a Supplier:

    • Implement the Supplier interface to fetch the next batch of items.
  2. Create a Processor:

    • Implement the Processor interface to define how to process a batch of items.
  3. Create a Finalizer (Optional):

    • Implement the Finalizer interface if you need to perform actions after each batch is processed.
  4. Instantiate the Batch Processor:

    processor := processor.NewBatchProcessorImpl(maxWorkersCount, yourSupplier, yourProcessor).
        WithFinalizer(yourFinalizer).
        WithNoBatchSleepIntervalInMilliseconds(sleepIntervalInMillis).
        WithProcessorTimeout(processorTimeoutInMillis)
    
  5. Start the Batch Processor:

    processor.Start()
    
  6. Stop the Batch Processor:

    processor.Stop()
    

Interfaces

BatchProcessor[T any]
  • Start(): Initiates the batch processing loop.
  • Stop(): Stops the batch processor, waiting for all processing to finish.
Supplier[T any]
  • FetchNextBatch() ([]*T, error): Fetches the next batch of items for processing.
Processor[T any]
  • ProcessBatch(ctx context.Context, batch []*T) ([]*T, error): Processes a batch of items with optional context and returns the processed items or an error.
Finalizer[T any] (Optional)
  • OnBatchProcessed(processedBatch []*T, err error): Performs actions after process

Mocking

Mocks were generated using https://vektra.github.io/mockery/latest/

Documentation

Index

Constants

View Source
const DefaultTimeout = time.Duration(2147483647) * time.Millisecond

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchProcessor

type BatchProcessor[T any] interface {
	Start()
	Stop()
}

type BatchProcessorImpl

type BatchProcessorImpl[T any] struct {
	// contains filtered or unexported fields
}

func NewBatchProcessorImpl

func NewBatchProcessorImpl[T any](maxWorkersCount int, supplier Supplier[T], processor Processor[T]) *BatchProcessorImpl[T]

func (*BatchProcessorImpl[T]) Start

func (s *BatchProcessorImpl[T]) Start()

func (*BatchProcessorImpl[T]) Stop

func (s *BatchProcessorImpl[T]) Stop()

Stop /* Stops the processor. It will wait until all the processing is finished.

func (*BatchProcessorImpl[T]) WithFinalizer

func (s *BatchProcessorImpl[T]) WithFinalizer(finalizer Finalizer[T]) *BatchProcessorImpl[T]

WithFinalizer /* Finalizers are useful if there is a need to run code after the batch is processed

func (*BatchProcessorImpl[T]) WithNoBatchSleepIntervalInMilliseconds

func (s *BatchProcessorImpl[T]) WithNoBatchSleepIntervalInMilliseconds(millis int64) *BatchProcessorImpl[T]

WithNoBatchSleepIntervalInMilliseconds /* For how long should the processor wait before trying to get the next batch of messages in case the previous fetch was empty

func (*BatchProcessorImpl[T]) WithProcessorTimeout

func (s *BatchProcessorImpl[T]) WithProcessorTimeout(processorTimeoutMillis int64) *BatchProcessorImpl[T]

type Finalizer

type Finalizer[T any] interface {
	OnBatchProcessed(processedBatch []*T, err error)
}

type MockFinalizer

type MockFinalizer[T interface{}] struct {
	mock.Mock
}

MockFinalizer is an autogenerated mock type for the Finalizer type

func NewMockFinalizer

func NewMockFinalizer[T interface{}](t interface {
	mock.TestingT
	Cleanup(func())
}) *MockFinalizer[T]

NewMockFinalizer creates a new instance of MockFinalizer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockFinalizer[T]) EXPECT

func (_m *MockFinalizer[T]) EXPECT() *MockFinalizer_Expecter[T]

func (*MockFinalizer[T]) OnBatchProcessed

func (_m *MockFinalizer[T]) OnBatchProcessed(processedBatch []*T, err error)

OnBatchProcessed provides a mock function with given fields: processedBatch, err

type MockFinalizer_Expecter

type MockFinalizer_Expecter[T interface{}] struct {
	// contains filtered or unexported fields
}

func (*MockFinalizer_Expecter[T]) OnBatchProcessed

func (_e *MockFinalizer_Expecter[T]) OnBatchProcessed(processedBatch interface{}, err interface{}) *MockFinalizer_OnBatchProcessed_Call[T]

OnBatchProcessed is a helper method to define mock.On call

  • processedBatch []*T
  • err error

type MockFinalizer_OnBatchProcessed_Call

type MockFinalizer_OnBatchProcessed_Call[T interface{}] struct {
	*mock.Call
}

MockFinalizer_OnBatchProcessed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnBatchProcessed'

func (*MockFinalizer_OnBatchProcessed_Call[T]) Return

func (*MockFinalizer_OnBatchProcessed_Call[T]) Run

func (_c *MockFinalizer_OnBatchProcessed_Call[T]) Run(run func(processedBatch []*T, err error)) *MockFinalizer_OnBatchProcessed_Call[T]

func (*MockFinalizer_OnBatchProcessed_Call[T]) RunAndReturn

func (_c *MockFinalizer_OnBatchProcessed_Call[T]) RunAndReturn(run func([]*T, error)) *MockFinalizer_OnBatchProcessed_Call[T]

type MockProcessor

type MockProcessor[T interface{}] struct {
	mock.Mock
}

MockProcessor is an autogenerated mock type for the Processor type

func NewMockProcessor

func NewMockProcessor[T interface{}](t interface {
	mock.TestingT
	Cleanup(func())
}) *MockProcessor[T]

NewMockProcessor creates a new instance of MockProcessor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockProcessor[T]) EXPECT

func (_m *MockProcessor[T]) EXPECT() *MockProcessor_Expecter[T]

func (*MockProcessor[T]) ProcessBatch

func (_m *MockProcessor[T]) ProcessBatch(ctx context.Context, batch []*T) ([]*T, error)

ProcessBatch provides a mock function with given fields: ctx, batch

type MockProcessor_Expecter

type MockProcessor_Expecter[T interface{}] struct {
	// contains filtered or unexported fields
}

func (*MockProcessor_Expecter[T]) ProcessBatch

func (_e *MockProcessor_Expecter[T]) ProcessBatch(ctx interface{}, batch interface{}) *MockProcessor_ProcessBatch_Call[T]

ProcessBatch is a helper method to define mock.On call

  • ctx context.Context
  • batch []*T

type MockProcessor_ProcessBatch_Call

type MockProcessor_ProcessBatch_Call[T interface{}] struct {
	*mock.Call
}

MockProcessor_ProcessBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBatch'

func (*MockProcessor_ProcessBatch_Call[T]) Return

func (*MockProcessor_ProcessBatch_Call[T]) Run

func (_c *MockProcessor_ProcessBatch_Call[T]) Run(run func(ctx context.Context, batch []*T)) *MockProcessor_ProcessBatch_Call[T]

func (*MockProcessor_ProcessBatch_Call[T]) RunAndReturn

func (_c *MockProcessor_ProcessBatch_Call[T]) RunAndReturn(run func(context.Context, []*T) ([]*T, error)) *MockProcessor_ProcessBatch_Call[T]

type MockSupplier

type MockSupplier[T interface{}] struct {
	mock.Mock
}

MockSupplier is an autogenerated mock type for the Supplier type

func NewMockSupplier

func NewMockSupplier[T interface{}](t interface {
	mock.TestingT
	Cleanup(func())
}) *MockSupplier[T]

NewMockSupplier creates a new instance of MockSupplier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. The first argument is typically a *testing.T value.

func (*MockSupplier[T]) EXPECT

func (_m *MockSupplier[T]) EXPECT() *MockSupplier_Expecter[T]

func (*MockSupplier[T]) FetchNextBatch

func (_m *MockSupplier[T]) FetchNextBatch() ([]*T, error)

FetchNextBatch provides a mock function with given fields:

type MockSupplier_Expecter

type MockSupplier_Expecter[T interface{}] struct {
	// contains filtered or unexported fields
}

func (*MockSupplier_Expecter[T]) FetchNextBatch

func (_e *MockSupplier_Expecter[T]) FetchNextBatch() *MockSupplier_FetchNextBatch_Call[T]

FetchNextBatch is a helper method to define mock.On call

type MockSupplier_FetchNextBatch_Call

type MockSupplier_FetchNextBatch_Call[T interface{}] struct {
	*mock.Call
}

MockSupplier_FetchNextBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FetchNextBatch'

func (*MockSupplier_FetchNextBatch_Call[T]) Return

func (*MockSupplier_FetchNextBatch_Call[T]) Run

func (*MockSupplier_FetchNextBatch_Call[T]) RunAndReturn

func (_c *MockSupplier_FetchNextBatch_Call[T]) RunAndReturn(run func() ([]*T, error)) *MockSupplier_FetchNextBatch_Call[T]

type Processor

type Processor[T any] interface {
	ProcessBatch(ctx context.Context, batch []*T) ([]*T, error)
}

type Supplier

type Supplier[T any] interface {
	FetchNextBatch() ([]*T, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL