Documentation
¶
Index ¶
- Constants
- Variables
- func NewFakeTransport(failCnt int, rateLimitCnt int) *fakeTransport
- type BoolFunc
- type Handler
- func NewEventTrackHandler(client *api.Events, logger logger.Logger) Handler
- func NewListSubscribeHandler(client *api.Lists, logger logger.Logger) Handler
- func NewListUnSubscribeBatchHandler(client *api.Lists, logger logger.Logger) Handler
- func NewSubscriptionUpdateHandler(client *api.Users, logger logger.Logger) Handler
- func NewUserEmailUpdateHandler(client *api.Users, logger logger.Logger) Handler
- func NewUserUpdateHandler(client *api.Users, logger logger.Logger) Handler
- type Message
- type Processor
- type ProcessorConfig
- type Response
- type TestBatchHandler
Constants ¶
// Error-code strings related to event tracking requests.
const (
	// FieldTypeMismatchErr is the "RequestFieldsTypesMismatched" code string.
	// NOTE(review): presumably matched against the code Iterable returns when
	// request field types do not match the project schema; confirm.
	FieldTypeMismatchErr = "RequestFieldsTypesMismatched"
	// DisallowedEventNameErrStr is the code string used as the IterableCode
	// of DisallowedEventNameErr.
	DisallowedEventNameErrStr = "Disallowed Event Name"
)
// Marker strings for test data.
// NOTE(review): presumably placed in a Message by tests to select the
// outcome TestBatchHandler should simulate; confirm against its implementation.
const (
	SuccessfulTestData        = "success"
	IndividualSuccessTestData = "individual_success"
	RetryTestData             = "retry"
	FailTestData              = "fail"
)
// Error strings for per-user and per-list failures. Most come in
// Email/UserId pairs; InvalidDataUserIdsErr deliberately aliases
// InvalidDataEmailsErr, so both identifiers carry the same text.
// NOTE(review): presumably these label the failure categories reported by
// Iterable's bulk list/user endpoints; confirm against the handlers.
const (
	ConflictEmailsErr     = "Email conflicts"
	ConflictUserIdsErr    = "UserId conflicts"
	ForgottenEmailsErr    = "Email Forgotten"
	ForgottenUserIdsErr   = "UserId Forgotten"
	InvalidDataEmailsErr  = "Invalid Data"
	InvalidDataUserIdsErr = InvalidDataEmailsErr
	InvalidEmailsErr      = "Malformed Email"
	InvalidUserIdsErr     = "Malformed UserId"
	NotFoundEmailsErr     = "Email not found"
	NotFoundUserIdsErr    = "UserId not found"
	ValidEmailFailures    = "Internal Error with Email"
	ValidUserIdFailures   = "Internal Error with UserId"
	InvalidDataTypeErrStr = "Invalid data type in batch request"
	InvalidListId         = "List ID not valid for Iterable project"
	UnknownError          = "Subscribe request failed with unknown error"
)
Variables ¶
// DisallowedEventNameErr is a pre-built *iterable_errors.ApiError with
// stage STAGE_REQUEST, type TYPE_HTTP_STATUS, HTTP status 400 and
// IterableCode DisallowedEventNameErrStr.
// NOTE(review): presumably compared against or returned for events whose
// name Iterable rejects; confirm against the event track handler.
var DisallowedEventNameErr = &iterable_errors.ApiError{
	Stage:          iterable_errors.STAGE_REQUEST,
	Type:           iterable_errors.TYPE_HTTP_STATUS,
	HttpStatusCode: 400,
	IterableCode:   DisallowedEventNameErrStr,
}
// InvalidDataErr is a pre-built *iterable_errors.ApiError with stage
// STAGE_BEFORE_REQUEST, type TYPE_INVALID_DATA, HTTP status 500 and
// IterableCode InvalidDataTypeErrStr.
// NOTE(review): presumably reported when a Message carries data of an
// unexpected type, before any API request is made; confirm against the handlers.
var InvalidDataErr = &iterable_errors.ApiError{
	Stage:          iterable_errors.STAGE_BEFORE_REQUEST,
	Type:           iterable_errors.TYPE_INVALID_DATA,
	HttpStatusCode: 500,
	IterableCode:   InvalidDataTypeErrStr,
}
// NoIndividualRetryError is a pre-built *iterable_errors.ApiError with
// stage STAGE_BEFORE_REQUEST, a zero HTTP status code, and the custom type
// and code strings shown below.
// NOTE(review): presumably signals that a list-subscribe message must not
// be retried individually; confirm against the list subscribe handler.
var NoIndividualRetryError = &iterable_errors.ApiError{
	Stage:          iterable_errors.STAGE_BEFORE_REQUEST,
	Type:           "no_list_subscribe_individual_retry",
	HttpStatusCode: 0,
	IterableCode:   "no_individual_retry",
}
Functions ¶
func NewFakeTransport ¶
Types ¶
type Handler ¶
type Handler interface {
// ProcessBatch processes multiple messages in a single batch operation
// for efficiency. It returns, in order: a slice of Response objects
// (one per input message), an error if the entire batch failed, and
// a boolean indicating whether the batch operation can be retried.
// Note that the error is the second result, not the last one, so callers
// must destructure all three values: responses, err, retry.
ProcessBatch(batch []Message) ([]Response, error, bool)
// ProcessOne processes a single message individually. It is typically used
// for retry scenarios when batch processing fails, or when individual
// messages need special handling.
// It returns a single Response carrying the result of processing the
// message; there is no separate error result, so failures are reported
// through the Response itself.
ProcessOne(message Message) Response
}
Handler defines the contract for implementing batch processing logic for specific API operations. Implementations should handle both batch processing (for efficiency) and individual processing (for retry scenarios). The handler is responsible for making actual API calls and converting results into Response objects.
Usage Example:
type MyHandler struct {
apiClient *api.Events
}
func (h *MyHandler) ProcessBatch(messages []Message) ([]Response, error, bool) {
// Convert messages to a types.EventTrackBulkRequest (conversion elided)
results, err := h.apiClient.TrackBulk(bulkRequest)
if err != nil {
return nil, err, true // error, can retry
}
// Convert results to Response objects
return responses, nil, false
}
func (h *MyHandler) ProcessOne(message Message) Response {
// Convert message to types.EventTrackRequest
result, err := h.apiClient.Track(request)
return Response{Data: result, OriginalReq: message, Error: err}
}
func NewEventTrackHandler ¶
func NewListSubscribeHandler ¶
type Message ¶
type Message struct {
// Data contains the actual request payload to be sent to Iterable
// (e.g., user data, event data, etc.).
// NOTE(review): the field is typed any; the concrete type each Handler
// expects is not visible here; confirm against the handler in use.
Data any
// MetaData holds optional contextual information that
// can be used for tracking, correlation, or response handling.
// It is typed any, so any value (for example an identifier string) fits.
MetaData any
}
Message represents a generic request to be processed in a batch operation. It serves as a wrapper that carries both the actual data to be sent to Iterable and optional metadata that can be used for tracking or response correlation.
Usage Example:
message := batch.Message{
Data: userUpdateRequest, // The actual API request data
MetaData: "user-123", // Optional tracking identifier
}
processor.Add(message)
type Processor ¶
type Processor interface {
// Start begins the batch processing loop. The processor
// will start listening for messages and automatically flush batches
// when FlushQueueSize is reached or FlushInterval elapses.
// This method is idempotent: calling Start() multiple times
// has no effect if the processor is already running.
Start()
// Stop gracefully shuts down the processor. It closes the message channel,
// waits for all in-flight batches to complete (both sync and async),
// and prepares for a potential restart.
// This method is idempotent: calling Stop() multiple times
// has no effect if the processor is already stopped.
// NOTE(review): the behavior of Add after Stop (and before the next
// Start) is not documented here; confirm before relying on it.
Stop()
// Add queues a message for batch processing. Messages are accumulated
// until FlushQueueSize is reached or FlushInterval elapses,
// then processed as a batch by the configured Handler.
// This method is thread-safe and will block if the internal buffer is full;
// the size of that buffer is set by ProcessorConfig.MaxBufferSize.
Add(req Message)
}
Processor provides a batching mechanism for processing messages efficiently. It accumulates individual messages and processes them in batches based on size or time thresholds, with support for retries, async processing, and response handling.
Usage Example:
// Create a processor with a handler and configuration
processor := batch.NewProcessor(
myHandler, // Handler that implements ProcessBatch and ProcessOne
responseChan, // Optional channel to receive processing results
batch.ProcessorConfig{
FlushQueueSize: 100, // Process when 100 messages accumulate
FlushInterval: 5*time.Second, // Or process every 5 seconds
MaxRetries: 3, // Retry failed batches up to 3 times
Async: batch.Async, // Process batches asynchronously
},
)
// Start the processor (begins listening for messages)
processor.Start()
// Add messages for batch processing
processor.Add(message1)
processor.Add(message2)
// ... messages will be automatically batched and processed
// Stop the processor (waits for in-flight batches to complete)
processor.Stop()
func NewProcessor ¶
func NewProcessor( handler Handler, respChan chan<- Response, config ProcessorConfig, ) Processor
type ProcessorConfig ¶
type ProcessorConfig struct {
// FlushQueueSize defines the maximum number of messages
// to accumulate before triggering a batch flush.
// default: 100
FlushQueueSize int
// FlushInterval specifies the maximum time to wait
// before flushing a batch, even if FlushQueueSize hasn't been reached.
// default: 5 seconds
FlushInterval time.Duration
// MaxRetries sets the maximum number of retry attempts
// for failed batch operations.
// default: 1
MaxRetries int
// Retry configures the retry strategy (exponential backoff, delays, etc.)
// for failed requests.
// default: retry.NewExponentialRetry
Retry retry.Retry
// SendIndividual is a function that determines whether the processor
// needs to send messages one-by-one (individually) if batch request succeeds,
// but some messages in the batch have errors.
// This is useful when there's 1 "bad" message in the batch which fails
// the entire batch.
// NOTE(review): the two sentences above describe different situations
// (a batch that succeeds with per-message errors vs. a batch that fails
// entirely); confirm which one(s) actually trigger individual sends.
// default: true (presumably a BoolFunc returning true; confirm)
SendIndividual BoolFunc
// MaxBufferSize determines the buffer size of the internal request channel
// to prevent blocking on Add() calls.
// default: 2000
MaxBufferSize int
// Async is a function that determines whether batch processing
// should run asynchronously or synchronously.
// default: true (presumably a BoolFunc returning true; confirm)
Async BoolFunc
// MaxAsyncRequests limits the number of concurrent goroutines
// when processing batches asynchronously.
// default: 50
MaxAsyncRequests int
// Logger provides logging functionality for debugging
// and monitoring batch processing operations.
// default: logger.Noop
Logger logger.Logger
// contains filtered or unexported fields
}
type Response ¶
type Response struct {
// Data contains the successful response data from the API operation,
// or nil if an error occurred.
Data any
// OriginalReq holds the original Message that was processed. It is stored
// by value (a Message, not a *Message), so it is a copy of the request.
OriginalReq Message
// Error contains any error that occurred during processing,
// or nil if processing was successful.
Error error
// Retry indicates whether this failed request should be retried
// (only relevant when Error is not nil).
Retry bool
}
Response represents the result of processing a single Message, whether it was handled as part of a batch or individually. It carries either the successful result data or the error that occurred. It maintains a reference to the original request for correlation and includes retry information for error handling.
Usage Example:
// Successful response
response := batch.Response{
Data: apiResponseData,
OriginalReq: originalMessage,
Error: nil,
Retry: false,
}
// Failed response that should be retried
response := batch.Response{
Data: nil,
OriginalReq: originalMessage,
Error: networkError,
Retry: true,
}
type TestBatchHandler ¶
// TestBatchHandler is a field-less type that declares ProcessBatch and
// ProcessOne methods with the same signatures as the Handler interface.
// NOTE(review): presumably a Handler stub for tests driven by the
// *TestData constants; confirm against its method implementations.
type TestBatchHandler struct{}
func NewTestBatchHandler ¶
func NewTestBatchHandler() *TestBatchHandler
func (*TestBatchHandler) ProcessBatch ¶
func (h *TestBatchHandler) ProcessBatch(batch []Message) ([]Response, error, bool)
func (*TestBatchHandler) ProcessOne ¶
func (h *TestBatchHandler) ProcessOne(message Message) Response