batch

package
v0.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 17, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	FieldTypeMismatchErr      = "RequestFieldsTypesMismatched"
	DisallowedEventNameErrStr = "Disallowed Event Name"
)
View Source
const (
	SuccessfulTestData        = "success"
	IndividualSuccessTestData = "individual_success"
	RetryTestData             = "retry"
	FailTestData              = "fail"
)
View Source
const (
	ConflictEmailsErr     = "Email conflicts"
	ConflictUserIdsErr    = "UserId conflicts"
	ForgottenEmailsErr    = "Email Forgotten"
	ForgottenUserIdsErr   = "UserId Forgotten"
	InvalidDataEmailsErr  = "Invalid Data"
	InvalidDataUserIdsErr = InvalidDataEmailsErr
	InvalidEmailsErr      = "Malformed Email"
	InvalidUserIdsErr     = "Malformed UserId"
	NotFoundEmailsErr     = "Email not found"
	NotFoundUserIdsErr    = "UserId not found"
	ValidEmailFailures    = "Internal Error with Email"
	ValidUserIdFailures   = "Internal Error with UserId"

	InvalidDataTypeErrStr = "Invalid data type in batch request"
	InvalidListId         = "List ID not valid for Iterable project"
	UnknownError          = "Subscribe request failed with unknown error"
)

Variables

View Source
var (
	DisallowedEventNameErr = &iterable_errors.ApiError{
		Stage:          iterable_errors.STAGE_REQUEST,
		Type:           iterable_errors.TYPE_HTTP_STATUS,
		HttpStatusCode: 400,
		IterableCode:   DisallowedEventNameErrStr,
	}
)
View Source
var (
	InvalidDataErr = &iterable_errors.ApiError{
		Stage:          iterable_errors.STAGE_BEFORE_REQUEST,
		Type:           iterable_errors.TYPE_INVALID_DATA,
		HttpStatusCode: 500,
		IterableCode:   InvalidDataTypeErrStr,
	}
)
View Source
var (
	NoIndividualRetryError = &iterable_errors.ApiError{
		Stage:          iterable_errors.STAGE_BEFORE_REQUEST,
		Type:           "no_list_subscribe_individual_retry",
		HttpStatusCode: 0,
		IterableCode:   "no_individual_retry",
	}
)

Functions

func NewFakeTransport

func NewFakeTransport(failCnt int, rateLimitCnt int) *fakeTransport

Types

type BoolFunc

type BoolFunc func() bool
var IsFalse BoolFunc = func() bool { return false }
var IsTrue BoolFunc = func() bool { return true }

type Handler

type Handler interface {
	// ProcessBatch processes multiple messages in a single batch operation
	// for efficiency. Returns a slice of Response objects (one per input message),
	// an error if the entire batch failed, and a boolean indicating
	// if the batch operation can be retried.
	ProcessBatch(batch []Message) ([]Response, error, bool)

	// ProcessOne processes a single message individually, typically used
	// for retry scenarios when batch processing fails or when individual messages
	// need special handling.
	// Returns a single Response object with the result of processing the message.
	ProcessOne(message Message) Response
}

Handler defines the contract for implementing batch processing logic for specific API operations. Implementations should handle both batch processing (for efficiency) and individual processing (for retry scenarios). The handler is responsible for making actual API calls and converting results into Response objects.

Usage Example:

type MyHandler struct {
    apiClient *api.Events
}

func (h *MyHandler) ProcessBatch(messages []Message) ([]Response, error, bool) {
    // Convert messages to a types.EventTrackBulkRequest (bulkRequest)
    results, err := h.apiClient.TrackBulk(bulkRequest)
    if err != nil {
        return nil, err, true // error, can retry
    }
    // Convert results to Response objects
    return responses, nil, false
}

func (h *MyHandler) ProcessOne(message Message) Response {
    // Convert message to types.EventTrackRequest
    result, err := h.apiClient.Track(request)
    return Response{Data: result, OriginalReq: message, Error: err}
}

func NewEventTrackHandler

func NewEventTrackHandler(client *api.Events, logger logger.Logger) Handler

func NewListSubscribeHandler

func NewListSubscribeHandler(
	client *api.Lists,
	logger logger.Logger,
) Handler

func NewListUnSubscribeBatchHandler

func NewListUnSubscribeBatchHandler(
	client *api.Lists,
	logger logger.Logger,
) Handler

func NewSubscriptionUpdateHandler

func NewSubscriptionUpdateHandler(
	client *api.Users,
	logger logger.Logger,
) Handler

func NewUserEmailUpdateHandler

func NewUserEmailUpdateHandler(
	client *api.Users,
	logger logger.Logger,
) Handler

func NewUserUpdateHandler

func NewUserUpdateHandler(
	client *api.Users,
	logger logger.Logger,
) Handler

type Message

type Message struct {
	// Data contains the actual request payload to be sent to Iterable
	// (e.g., user data, event data, etc.)
	Data any
	// MetaData holds optional contextual information that
	// can be used for tracking, correlation, or response handling
	MetaData any
}

Message represents a generic request to be processed in a batch operation. It serves as a wrapper that carries both the actual data to be sent to Iterable and optional metadata that can be used for tracking or response correlation.

Usage Example:

message := batch.Message{
    Data:     userUpdateRequest,  // The actual API request data
    MetaData: "user-123",         // Optional tracking identifier
}
processor.Add(message)

type Processor

type Processor interface {
	// Start begins the batch processing loop. The processor
	// will start listening for messages and automatically flush batches
	// when FlushQueueSize is reached or FlushInterval elapses.
	// This method is idempotent - calling Start() multiple times
	// has no effect if already running.
	Start()

	// Stop gracefully shuts down the processor. It closes the message channel,
	// waits for all in-flight batches to complete (both sync and async),
	// and prepares for potential restart.
	// This method is idempotent - calling Stop() multiple times
	// has no effect if already stopped.
	Stop()

	// Add queues a message for batch processing. Messages are accumulated
	// until FlushQueueSize is reached or FlushInterval elapses,
	// then processed as a batch by the configured Handler.
	// This method is thread-safe and will block if the internal buffer is full.
	Add(req Message)
}

Processor provides a batching mechanism for processing messages efficiently. It accumulates individual messages and processes them in batches based on size or time thresholds, with support for retries, async processing, and response handling.

Usage Example:

// Create a processor with a handler and configuration
processor := batch.NewProcessor(
    myHandler,           // Handler that implements ProcessBatch and ProcessOne
    responseChan,        // Optional channel to receive processing results
    batch.ProcessorConfig{
        FlushQueueSize: 100,           // Process when 100 messages accumulate
        FlushInterval:  5*time.Second, // Or process every 5 seconds
        MaxRetries:     3,             // Retry failed batches up to 3 times
        Async:          batch.Async,   // Process batches asynchronously
    },
)

// Start the processor (begins listening for messages)
processor.Start()

// Add messages for batch processing
processor.Add(message1)
processor.Add(message2)
// ... messages will be automatically batched and processed

// Stop the processor (waits for in-flight batches to complete)
processor.Stop()

func NewProcessor

func NewProcessor(
	handler Handler,
	respChan chan<- Response,
	config ProcessorConfig,
) Processor

type ProcessorConfig

type ProcessorConfig struct {
	// FlushQueueSize defines the maximum number of messages
	// to accumulate before triggering a batch flush
	// default: 100
	FlushQueueSize int

	// FlushInterval specifies the maximum time to wait
	// before flushing a batch, even if FlushQueueSize hasn't been reached
	// default: 5 seconds
	FlushInterval time.Duration

	// MaxRetries sets the maximum number of retry attempts
	// for failed batch operations
	// default: 1
	MaxRetries int

	// Retry configures the retry strategy (exponential backoff, delays, etc.)
	// for failed requests
	// default: retry.NewExponentialRetry
	Retry retry.Retry

	// SendIndividual is a function that determines whether the processor
	// should send messages one-by-one (individually) when the batch request
	// succeeds but some messages in the batch have errors.
	// This is useful when a single "bad" message in the batch would otherwise
	// fail the entire batch.
	// default: true
	SendIndividual BoolFunc

	// MaxBufferSize determines the buffer size of the internal request channel
	// to prevent blocking on Add() calls
	// default: 2000
	MaxBufferSize int

	// Async is a function that determines whether batch processing
	// should run asynchronously or synchronously
	// default: true
	Async BoolFunc

	// MaxAsyncRequests limits the number of concurrent goroutines
	// when processing batches asynchronously.
	// default: 50
	MaxAsyncRequests int

	// Logger provides logging functionality for debugging
	// and monitoring batch processing operations
	// default: logger.Noop
	Logger logger.Logger
	// contains filtered or unexported fields
}

type Response

type Response struct {
	// Data contains the successful response data from the API operation,
	// or nil if an error occurred
	Data any
	// OriginalReq holds a reference to the original Message that was processed
	OriginalReq Message
	// Error contains any error that occurred during processing
	// or nil if successful
	Error error
	// Retry indicates whether this failed request should be retried
	// (only relevant when Error is not nil)
	Retry bool
}

Response represents the result of processing a batch request, containing both successful results and error information. It maintains a reference to the original request for correlation and includes retry information for error handling.

Usage Example:

// Successful response
response := batch.Response{
    Data:        apiResponseData,
    OriginalReq: originalMessage,
    Error:       nil,
    Retry:       false,
}

// Failed response that should be retried
response := batch.Response{
    Data:        nil,
    OriginalReq: originalMessage,
    Error:       networkError,
    Retry:       true,
}

type TestBatchHandler

type TestBatchHandler struct{}

func NewTestBatchHandler

func NewTestBatchHandler() *TestBatchHandler

func (*TestBatchHandler) ProcessBatch

func (h *TestBatchHandler) ProcessBatch(batch []Message) ([]Response, error, bool)

func (*TestBatchHandler) ProcessOne

func (h *TestBatchHandler) ProcessOne(message Message) Response

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL