Documentation
¶
Index ¶
- type GooglePubsubConsumer
- type GooglePubsubConsumerConfig
- type GooglePubsubProducer
- type GooglePubsubProducerConfig
- type MessageConsumer
- type MessageProcessor
- type MessageTransformer
- type MockMessageConsumer
- func (m *MockMessageConsumer) Done() <-chan struct{}
- func (m *MockMessageConsumer) GetStartCount() int
- func (m *MockMessageConsumer) GetStopCount() int
- func (m *MockMessageConsumer) Messages() <-chan types.ConsumedMessage
- func (m *MockMessageConsumer) Push(msg types.ConsumedMessage)
- func (m *MockMessageConsumer) SetStartError(err error)
- func (m *MockMessageConsumer) Start(ctx context.Context) error
- func (m *MockMessageConsumer) Stop() error
- type MockMessageProcessor
- func (m *MockMessageProcessor[T]) GetReceived() []*types.BatchedMessage[T]
- func (m *MockMessageProcessor[T]) Input() chan<- *types.BatchedMessage[T]
- func (m *MockMessageProcessor[T]) SetAckOnProcess(b bool)
- func (m *MockMessageProcessor[T]) SetMessageProcessedSignal(ch chan struct{})
- func (m *MockMessageProcessor[T]) SetProcessDelay(d time.Duration)
- func (m *MockMessageProcessor[T]) SetProcessingHook(hook func(msg *types.BatchedMessage[T]))
- func (m *MockMessageProcessor[T]) Start()
- func (m *MockMessageProcessor[T]) Stop()
- type ProcessingService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type GooglePubsubConsumer ¶
type GooglePubsubConsumer struct {
// contains filtered or unexported fields
}
func NewGooglePubsubConsumer ¶
func NewGooglePubsubConsumer(ctx context.Context, cfg *GooglePubsubConsumerConfig, opts []option.ClientOption, logger zerolog.Logger) (*GooglePubsubConsumer, error)
func (*GooglePubsubConsumer) Done ¶
func (c *GooglePubsubConsumer) Done() <-chan struct{}
func (*GooglePubsubConsumer) Messages ¶
func (c *GooglePubsubConsumer) Messages() <-chan types.ConsumedMessage
func (*GooglePubsubConsumer) Start ¶
func (c *GooglePubsubConsumer) Start(ctx context.Context) error
func (*GooglePubsubConsumer) Stop ¶
func (c *GooglePubsubConsumer) Stop() error
type GooglePubsubConsumerConfig ¶
type GooglePubsubConsumerConfig struct { ProjectID string SubscriptionID string CredentialsFile string // Optional MaxOutstandingMessages int NumGoroutines int }
func LoadGooglePubsubConsumerConfigFromEnv ¶
func LoadGooglePubsubConsumerConfigFromEnv() (*GooglePubsubConsumerConfig, error)
LoadGooglePubsubConsumerConfigFromEnv loads consumer configuration from environment variables. Renamed from LoadConsumerConfigFromEnv
type GooglePubsubProducer ¶
type GooglePubsubProducer[T any] struct { // contains filtered or unexported fields }
GooglePubsubProducer implements MessageProcessor for publishing to Google Cloud Pub/Sub.
func NewGooglePubsubProducer ¶
func NewGooglePubsubProducer[T any]( ctx context.Context, client *pubsub.Client, cfg *GooglePubsubProducerConfig, logger zerolog.Logger, ) (*GooglePubsubProducer[T], error)
NewGooglePubsubProducer creates a new GooglePubsubProducer. It takes an existing *pubsub.Client instance, allowing for dependency injection.
func (*GooglePubsubProducer[T]) Done ¶
func (p *GooglePubsubProducer[T]) Done() <-chan struct{}
Done returns a channel that is closed when the producer has fully stopped.
func (*GooglePubsubProducer[T]) Input ¶
func (p *GooglePubsubProducer[T]) Input() chan<- *types.BatchedMessage[T]
Input returns the channel to send messages to the producer.
func (*GooglePubsubProducer[T]) Start ¶
func (p *GooglePubsubProducer[T]) Start()
Start initiates the producer's internal processing loop.
func (*GooglePubsubProducer[T]) Stop ¶
func (p *GooglePubsubProducer[T]) Stop()
Stop gracefully shuts down the producer.
type GooglePubsubProducerConfig ¶
type GooglePubsubProducerConfig struct { ProjectID string TopicID string BatchSize int // New: for internal buffering BatchDelay time.Duration // New: for internal buffering }
GooglePubsubProducerConfig holds configuration for the Google Pub/Sub producer.
func LoadGooglePubsubProducerConfigFromEnv ¶
func LoadGooglePubsubProducerConfigFromEnv() (*GooglePubsubProducerConfig, error)
LoadGooglePubsubProducerConfigFromEnv loads producer configuration from environment variables.
type MessageConsumer ¶
type MessageConsumer interface { // Messages returns a read-only channel from which raw messages can be consumed. Messages() <-chan types.ConsumedMessage // Use the locally defined type. // Start initiates the consumption of messages. Start(ctx context.Context) error // Stop gracefully ceases message consumption. Stop() error // Done returns a channel that is closed when the consumer has fully stopped. Done() <-chan struct{} }
MessageConsumer defines the interface for a message source (e.g., Pub/Sub, Kafka). It is responsible for fetching raw messages from the broker.
type MessageProcessor ¶
type MessageProcessor[T any] interface { // Input returns a write-only channel for sending transformed messages to the processor. Input() chan<- *types.BatchedMessage[T] // Start begins the processor's operations (e.g., its batching worker). Start() // Stop gracefully shuts down the processor, ensuring any buffered items are handled. Stop() }
MessageProcessor defines the contract for any component that receives and handles transformed messages. Both `bqstore.BatchInserter` and `icestore.Batcher` are perfect implementations of this interface.
type MessageTransformer ¶
type MessageTransformer[T any] func(msg types.ConsumedMessage) (payload *T, skip bool, err error)
MessageTransformer defines a function that transforms a whole ConsumedMessage into a structured payload of type T. This new, more powerful interface REPLACES the old `PayloadDecoder`. It provides access to all message metadata, like PublishTime, enabling more robust processing logic.
It returns the transformed payload, a boolean to indicate if the message should be skipped, and an error if the transformation fails.
type MockMessageConsumer ¶
type MockMessageConsumer struct {
// contains filtered or unexported fields
}
MockMessageConsumer is a mock implementation of the MessageConsumer interface. It is designed to be used in unit tests to simulate a message source.
func NewMockMessageConsumer ¶
func NewMockMessageConsumer(bufferSize int) *MockMessageConsumer
NewMockMessageConsumer creates a new mock consumer with a buffered channel.
func (*MockMessageConsumer) Done ¶
func (m *MockMessageConsumer) Done() <-chan struct{}
Done returns the channel that signals when the consumer has fully stopped.
func (*MockMessageConsumer) GetStartCount ¶
func (m *MockMessageConsumer) GetStartCount() int
GetStartCount returns the number of times Start() was called.
func (*MockMessageConsumer) GetStopCount ¶
func (m *MockMessageConsumer) GetStopCount() int
GetStopCount returns the number of times Stop() was called.
func (*MockMessageConsumer) Messages ¶
func (m *MockMessageConsumer) Messages() <-chan types.ConsumedMessage
Messages returns the read-only channel for consuming messages.
func (*MockMessageConsumer) Push ¶
func (m *MockMessageConsumer) Push(msg types.ConsumedMessage)
Push is a test helper to inject a message into the mock consumer's channel.
func (*MockMessageConsumer) SetStartError ¶
func (m *MockMessageConsumer) SetStartError(err error)
SetStartError configures the mock to return an error on Start().
func (*MockMessageConsumer) Start ¶
func (m *MockMessageConsumer) Start(ctx context.Context) error
Start simulates the startup of a real consumer.
func (*MockMessageConsumer) Stop ¶
func (m *MockMessageConsumer) Stop() error
Stop gracefully closes the message and done channels. FIX: This now correctly simulates a real consumer by draining its internal buffer and Nacking any outstanding messages upon shutdown.
type MockMessageProcessor ¶
type MockMessageProcessor[T any] struct { InputChan chan *types.BatchedMessage[T] Received []*types.BatchedMessage[T] // contains filtered or unexported fields }
MockMessageProcessor is a mock implementation of the MessageProcessor interface.
func NewMockMessageProcessor ¶
func NewMockMessageProcessor[T any](bufferSize int) *MockMessageProcessor[T]
NewMockMessageProcessor creates a new mock processor.
func (*MockMessageProcessor[T]) GetReceived ¶
func (m *MockMessageProcessor[T]) GetReceived() []*types.BatchedMessage[T]
GetReceived returns a copy of the messages received by the processor.
func (*MockMessageProcessor[T]) Input ¶
func (m *MockMessageProcessor[T]) Input() chan<- *types.BatchedMessage[T]
Input returns the write-only channel for sending messages to the processor.
func (*MockMessageProcessor[T]) SetAckOnProcess ¶
func (m *MockMessageProcessor[T]) SetAckOnProcess(b bool)
SetAckOnProcess configures the mock processor to call Ack() on the original message.
func (*MockMessageProcessor[T]) SetMessageProcessedSignal ¶
func (m *MockMessageProcessor[T]) SetMessageProcessedSignal(ch chan struct{})
SetMessageProcessedSignal sets a channel that will be signaled each time a message is processed.
func (*MockMessageProcessor[T]) SetProcessDelay ¶
func (m *MockMessageProcessor[T]) SetProcessDelay(d time.Duration)
SetProcessDelay introduces a delay for every message processed.
func (*MockMessageProcessor[T]) SetProcessingHook ¶
func (m *MockMessageProcessor[T]) SetProcessingHook(hook func(msg *types.BatchedMessage[T]))
SetProcessingHook sets a function to be called for each message processed.
func (*MockMessageProcessor[T]) Start ¶
func (m *MockMessageProcessor[T]) Start()
Start begins the processor's operations.
func (*MockMessageProcessor[T]) Stop ¶
func (m *MockMessageProcessor[T]) Stop()
Stop gracefully shuts down the processor.
type ProcessingService ¶
type ProcessingService[T any] struct { // contains filtered or unexported fields }
ProcessingService orchestrates the pipeline of consuming, transforming, and processing messages. It is generic and can be used with any combination of consumers and processors.
func NewProcessingService ¶
func NewProcessingService[T any]( numWorkers int, consumer MessageConsumer, processor MessageProcessor[T], transformer MessageTransformer[T], logger zerolog.Logger, ) (*ProcessingService[T], error)
NewProcessingService creates a new, generic ProcessingService. It requires a consumer to get messages, a transformer to give them structure, and a processor to handle the structured data.
func (*ProcessingService[T]) Start ¶
func (s *ProcessingService[T]) Start() error
Start begins the service operation. It starts the processor and the consumer, then spins up a pool of workers to process messages.
func (*ProcessingService[T]) Stop ¶
func (s *ProcessingService[T]) Stop()
Stop gracefully shuts down the entire service in the correct order.