Documentation ¶
Index ¶
- Variables
- type Batcher
- type BatcherOption
- type MockBatcher
- func (_m *MockBatcher[P, T]) Append(ctx context.Context, payload P) async.Task[T]
- func (_m *MockBatcher[P, T]) BuyTicket(ctx context.Context) context.Context
- func (_m *MockBatcher[P, T]) DiscardTicket(ctx context.Context)
- func (_m *MockBatcher[P, T]) Process(ctx context.Context)
- func (_m *MockBatcher[P, T]) Shutdown()
- func (_m *MockBatcher[P, T]) Size() int
- type MockSilentBatcher
- func (_m *MockSilentBatcher[P]) Append(ctx context.Context, payload P) async.SilentTask
- func (_m *MockSilentBatcher[P]) BuyTicket(ctx context.Context) context.Context
- func (_m *MockSilentBatcher[P]) DiscardTicket(ctx context.Context)
- func (_m *MockSilentBatcher[P]) Process(ctx context.Context)
- func (_m *MockSilentBatcher[P]) Shutdown()
- func (_m *MockSilentBatcher[P]) Size() int
- type SilentBatcher
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var (
ErrBatchProcessorNotActive = errors.New("batch processor has already shut down")
)
Functions ¶
This section is empty.
Types ¶
type Batcher ¶
type Batcher[P any, T any] interface { // Append adds a new payload to the batch and returns a task for that particular payload. // Clients MUST execute the returned task before blocking and waiting for it to complete // to extract result. Append(ctx context.Context, payload P) async.Task[T] // contains filtered or unexported methods }
Batcher is a batch processor which is suitable for sitting in the back to receive tasks from callers, execute them in one go, and then return an individual result to each caller.
Example ¶
ctx := context.Background() b := NewBatcher( func(input []int) ([]int, error) { fmt.Println(input) result := make([]int, len(input)) for idx, number := range input { result[idx] = number * 2 } return result, nil }, nil, ) defer b.Shutdown() t1 := b.Append(ctx, 1) t2 := b.Append(ctx, 2) b.Process(ctx) async.ContinueWithNoResult( t1, func(_ context.Context, v int, err error) error { fmt.Println(v) return nil }, ).ExecuteSync(ctx) async.ContinueWithNoResult( t2, func(_ context.Context, v int, err error) error { fmt.Println(v) return nil }, ).ExecuteSync(ctx)
Output: [1 2] 2 4
func NewBatcher ¶
func NewBatcher[P any, T any]( processFn func([]P) ([]T, error), payloadKeyExtractor func(P) any, options ...BatcherOption, ) Batcher[P, T]
NewBatcher returns a new Batcher that uses processFn to process each batch of payloads and is configured by the given options.
type BatcherOption ¶
type BatcherOption func(*batcherConfigs)
func WithAutoProcessByTicketBooth ¶
func WithAutoProcessByTicketBooth() BatcherOption
WithAutoProcessByTicketBooth creates a synchronous ticket booth that can track how many ticket owners have arrived in order to start batch processing.
func WithAutoProcessInterval ¶
func WithAutoProcessInterval(autoProcessIntervalInMilliseconds time.Duration) BatcherOption
WithAutoProcessInterval sets the interval at which Batcher will automatically process the pending tasks. If `autoProcessIntervalInMilliseconds <= 0`, the default behavior applies: no periodic auto processing will be done.
Note: if periodic auto processing is ON, clients MUST call Batcher.Shutdown() to clean up the timer goroutine properly in order to avoid a memory leak.
func WithAutoProcessSize ¶
func WithAutoProcessSize(autoProcessSize int) BatcherOption
WithAutoProcessSize sets the limit at which Batcher will automatically process the pending tasks. If `autoProcessSize <= 0`, the default behavior applies: no auto processing will be done based on size.
func WithShutdownGraceDuration ¶
func WithShutdownGraceDuration(shutdownDurationInMilliseconds time.Duration) BatcherOption
WithShutdownGraceDuration specifies how long Batcher will wait for the Shutdown operation to complete before returning. If `shutdownDurationInMilliseconds <= 0`, Batcher will block and wait until the shutdown operation fully completes.
type MockBatcher ¶
MockBatcher is an autogenerated mock type for the Batcher type
func NewMockBatcher ¶
func NewMockBatcher[P interface{}, T interface{}](t mockConstructorTestingTNewMockBatcher) *MockBatcher[P, T]
NewMockBatcher creates a new instance of MockBatcher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
func (*MockBatcher[P, T]) Append ¶
func (_m *MockBatcher[P, T]) Append(ctx context.Context, payload P) async.Task[T]
Append provides a mock function with given fields: ctx, payload
func (*MockBatcher[P, T]) BuyTicket ¶
func (_m *MockBatcher[P, T]) BuyTicket(ctx context.Context) context.Context
BuyTicket provides a mock function with given fields: ctx
func (*MockBatcher[P, T]) DiscardTicket ¶
func (_m *MockBatcher[P, T]) DiscardTicket(ctx context.Context)
DiscardTicket provides a mock function with given fields: ctx
func (*MockBatcher[P, T]) Process ¶
func (_m *MockBatcher[P, T]) Process(ctx context.Context)
Process provides a mock function with given fields: ctx
func (*MockBatcher[P, T]) Shutdown ¶
func (_m *MockBatcher[P, T]) Shutdown()
Shutdown provides a mock function that takes no arguments.
func (*MockBatcher[P, T]) Size ¶
func (_m *MockBatcher[P, T]) Size() int
Size provides a mock function that takes no arguments.
type MockSilentBatcher ¶
MockSilentBatcher is an autogenerated mock type for the SilentBatcher type
func NewMockSilentBatcher ¶
func NewMockSilentBatcher[P interface{}](t mockConstructorTestingTNewMockSilentBatcher) *MockSilentBatcher[P]
NewMockSilentBatcher creates a new instance of MockSilentBatcher. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
func (*MockSilentBatcher[P]) Append ¶
func (_m *MockSilentBatcher[P]) Append(ctx context.Context, payload P) async.SilentTask
Append provides a mock function with given fields: ctx, payload
func (*MockSilentBatcher[P]) BuyTicket ¶
func (_m *MockSilentBatcher[P]) BuyTicket(ctx context.Context) context.Context
BuyTicket provides a mock function with given fields: ctx
func (*MockSilentBatcher[P]) DiscardTicket ¶
func (_m *MockSilentBatcher[P]) DiscardTicket(ctx context.Context)
DiscardTicket provides a mock function with given fields: ctx
func (*MockSilentBatcher[P]) Process ¶
func (_m *MockSilentBatcher[P]) Process(ctx context.Context)
Process provides a mock function with given fields: ctx
func (*MockSilentBatcher[P]) Shutdown ¶
func (_m *MockSilentBatcher[P]) Shutdown()
Shutdown provides a mock function that takes no arguments.
func (*MockSilentBatcher[P]) Size ¶
func (_m *MockSilentBatcher[P]) Size() int
Size provides a mock function that takes no arguments.
type SilentBatcher ¶
type SilentBatcher[P any] interface { // Append adds a new payload to the batch and returns a task for that particular payload. // Clients MUST execute the returned task before blocking and waiting for it to complete // to extract result. Append(ctx context.Context, payload P) async.SilentTask // contains filtered or unexported methods }
SilentBatcher is a batch processor which is suitable for sitting in the back to accumulate tasks and then execute them all in one go silently.
Example ¶
var wg sync.WaitGroup wg.Add(2) ctx := context.Background() b := NewSilentBatcher( func(input []interface{}) error { fmt.Println(input) return nil }, ) async.ContinueInSilence( b.Append(ctx, 1), func(_ context.Context, err error) error { wg.Done() return nil }, ).Execute(ctx) async.ContinueInSilence( b.Append(ctx, 2), func(_ context.Context, err error) error { wg.Done() return nil }, ).Execute(ctx) b.Process(ctx) wg.Wait()
Output: [1 2]
func NewSilentBatcher ¶
func NewSilentBatcher[P any](processFn func([]P) error, options ...BatcherOption) SilentBatcher[P]
NewSilentBatcher returns a new SilentBatcher that uses processFn to process each batch of payloads and is configured by the given options.