messagepipeline

package
v0.0.0-...-c69eee4

This package is not in the latest version of its module.

Published: Jun 23, 2025 | License: Apache-2.0 | Imports: 13 | Imported by: 3

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GooglePubsubConsumer

type GooglePubsubConsumer struct {
	// contains filtered or unexported fields
}

func (*GooglePubsubConsumer) Done

func (c *GooglePubsubConsumer) Done() <-chan struct{}

func (*GooglePubsubConsumer) Messages

func (c *GooglePubsubConsumer) Messages() <-chan types.ConsumedMessage

func (*GooglePubsubConsumer) Start

func (c *GooglePubsubConsumer) Start(ctx context.Context) error

func (*GooglePubsubConsumer) Stop

func (c *GooglePubsubConsumer) Stop() error

type GooglePubsubConsumerConfig

type GooglePubsubConsumerConfig struct {
	ProjectID              string
	SubscriptionID         string
	CredentialsFile        string // Optional
	MaxOutstandingMessages int
	NumGoroutines          int
}

func LoadGooglePubsubConsumerConfigFromEnv

func LoadGooglePubsubConsumerConfigFromEnv() (*GooglePubsubConsumerConfig, error)

LoadGooglePubsubConsumerConfigFromEnv loads consumer configuration from environment variables. It was renamed from LoadConsumerConfigFromEnv.
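This reference does not list the environment variables the loader reads. The following is only a sketch of how such a loader could be shaped: the variable names (`PUBSUB_PROJECT_ID` and so on), the defaults, the `ConsumerConfig` struct, and the `loadConsumerConfig` helper are all hypothetical stand-ins, not the package's actual behavior.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// ConsumerConfig mirrors the documented GooglePubsubConsumerConfig fields.
type ConsumerConfig struct {
	ProjectID              string
	SubscriptionID         string
	CredentialsFile        string // optional
	MaxOutstandingMessages int
	NumGoroutines          int
}

// intFromEnv reads an integer through lookup, falling back to def when unset.
func intFromEnv(lookup func(string) string, key string, def int) (int, error) {
	raw := lookup(key)
	if raw == "" {
		return def, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("%s: %w", key, err)
	}
	return n, nil
}

// loadConsumerConfig is a hypothetical loader. The variable names and the
// default values below are assumptions made for this sketch.
func loadConsumerConfig(lookup func(string) string) (*ConsumerConfig, error) {
	cfg := &ConsumerConfig{
		ProjectID:       lookup("PUBSUB_PROJECT_ID"),
		SubscriptionID:  lookup("PUBSUB_SUBSCRIPTION_ID"),
		CredentialsFile: lookup("PUBSUB_CREDENTIALS_FILE"),
	}
	if cfg.ProjectID == "" || cfg.SubscriptionID == "" {
		return nil, errors.New("project ID and subscription ID are required")
	}
	var err error
	if cfg.MaxOutstandingMessages, err = intFromEnv(lookup, "PUBSUB_MAX_OUTSTANDING", 100); err != nil {
		return nil, err
	}
	if cfg.NumGoroutines, err = intFromEnv(lookup, "PUBSUB_NUM_GOROUTINES", 5); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConsumerConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("%+v\n", *cfg)
}
```

Passing the lookup function as a parameter keeps the sketch testable without touching the process environment.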

type GooglePubsubProducer

type GooglePubsubProducer[T any] struct {
	// contains filtered or unexported fields
}

GooglePubsubProducer implements MessageProcessor for publishing to Google Cloud Pub/Sub.

func NewGooglePubsubProducer

func NewGooglePubsubProducer[T any](
	ctx context.Context,
	client *pubsub.Client,
	cfg *GooglePubsubProducerConfig,
	logger zerolog.Logger,
) (*GooglePubsubProducer[T], error)

NewGooglePubsubProducer creates a new GooglePubsubProducer. It takes an existing *pubsub.Client instance, allowing for dependency injection.

func (*GooglePubsubProducer[T]) Done

func (p *GooglePubsubProducer[T]) Done() <-chan struct{}

Done returns a channel that is closed when the producer has fully stopped.

func (*GooglePubsubProducer[T]) Input

func (p *GooglePubsubProducer[T]) Input() chan<- *types.BatchedMessage[T]

Input returns the channel to send messages to the producer.

func (*GooglePubsubProducer[T]) Start

func (p *GooglePubsubProducer[T]) Start()

Start initiates the producer's internal processing loop.

func (*GooglePubsubProducer[T]) Stop

func (p *GooglePubsubProducer[T]) Stop()

Stop gracefully shuts down the producer.

type GooglePubsubProducerConfig

type GooglePubsubProducerConfig struct {
	ProjectID  string
	TopicID    string
	BatchSize  int           // New: for internal buffering
	BatchDelay time.Duration // New: for internal buffering
}

GooglePubsubProducerConfig holds configuration for the Google Pub/Sub producer.
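The BatchSize and BatchDelay fields suggest size-or-time buffering. How the producer actually buffers is not shown in this reference, so the following is a generic sketch of that pattern under the assumption that a batch is flushed when it reaches a size limit or when a delay has elapsed since its first item. The `batch` helper is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// batch reads from in and emits slices on the returned channel whenever
// batchSize items are buffered or batchDelay has elapsed since the first
// buffered item. The final partial batch is flushed when in is closed.
// batchSize is assumed to be positive.
func batch[T any](in <-chan T, batchSize int, batchDelay time.Duration) <-chan []T {
	out := make(chan []T)
	go func() {
		defer close(out)
		buf := make([]T, 0, batchSize)
		var timeout <-chan time.Time // nil (never fires) while the buffer is empty
		flush := func() {
			if len(buf) > 0 {
				out <- buf
				buf = make([]T, 0, batchSize)
			}
			timeout = nil
		}
		for {
			select {
			case item, ok := <-in:
				if !ok {
					flush()
					return
				}
				if len(buf) == 0 {
					timeout = time.After(batchDelay)
				}
				buf = append(buf, item)
				if len(buf) >= batchSize {
					flush()
				}
			case <-timeout:
				flush()
			}
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	for b := range batch(in, 2, time.Second) {
		fmt.Println(b)
	}
}
```

A nil timer channel blocks forever in a `select`, which is what keeps the delay branch inactive while nothing is buffered.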

func LoadGooglePubsubProducerConfigFromEnv

func LoadGooglePubsubProducerConfigFromEnv() (*GooglePubsubProducerConfig, error)

LoadGooglePubsubProducerConfigFromEnv loads producer configuration from environment variables.

type MessageConsumer

type MessageConsumer interface {
	// Messages returns a read-only channel from which raw messages can be consumed.
	Messages() <-chan types.ConsumedMessage // Use the locally defined type.
	// Start initiates the consumption of messages.
	Start(ctx context.Context) error
	// Stop gracefully ceases message consumption.
	Stop() error
	// Done returns a channel that is closed when the consumer has fully stopped.
	Done() <-chan struct{}
}

MessageConsumer defines the interface for a message source (e.g., Pub/Sub, Kafka). It is responsible for fetching raw messages from the broker.
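As an illustration of the contract, here is a sketch of a consumer that replays a fixed slice. It is not part of the package: `ConsumedMessage` is a local stand-in for types.ConsumedMessage (whose fields are not shown in this reference), the interface is repeated against that stand-in, and `sliceConsumer` and `drain` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// ConsumedMessage is a stand-in for types.ConsumedMessage; these fields are assumed.
type ConsumedMessage struct {
	ID      string
	Payload []byte
}

// MessageConsumer repeats the documented interface against the stand-in type.
type MessageConsumer interface {
	Messages() <-chan ConsumedMessage
	Start(ctx context.Context) error
	Stop() error
	Done() <-chan struct{}
}

// sliceConsumer is a hypothetical consumer that replays a fixed slice.
type sliceConsumer struct {
	items  []ConsumedMessage
	out    chan ConsumedMessage
	done   chan struct{}
	cancel context.CancelFunc
}

func newSliceConsumer(items []ConsumedMessage) *sliceConsumer {
	return &sliceConsumer{
		items: items,
		out:   make(chan ConsumedMessage),
		done:  make(chan struct{}),
	}
}

func (c *sliceConsumer) Messages() <-chan ConsumedMessage { return c.out }
func (c *sliceConsumer) Done() <-chan struct{}            { return c.done }

// Start launches the delivery goroutine and returns immediately.
func (c *sliceConsumer) Start(ctx context.Context) error {
	ctx, c.cancel = context.WithCancel(ctx)
	go func() {
		defer close(c.done) // runs last: signals full shutdown
		defer close(c.out)  // runs first: ends any range over Messages()
		for _, m := range c.items {
			select {
			case c.out <- m:
			case <-ctx.Done():
				return
			}
		}
	}()
	return nil
}

// Stop cancels delivery and waits for the goroutine to exit.
// It is a no-op if Start was never called.
func (c *sliceConsumer) Stop() error {
	if c.cancel == nil {
		return nil
	}
	c.cancel()
	<-c.done
	return nil
}

// drain starts c, collects every delivered ID, and waits for shutdown.
func drain(c MessageConsumer) ([]string, error) {
	if err := c.Start(context.Background()); err != nil {
		return nil, err
	}
	var ids []string
	for m := range c.Messages() {
		ids = append(ids, m.ID)
	}
	<-c.Done()
	return ids, nil
}

func main() {
	ids, err := drain(newSliceConsumer([]ConsumedMessage{{ID: "1"}, {ID: "2"}}))
	fmt.Println(ids, err)
}
```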

type MessageProcessor

type MessageProcessor[T any] interface {
	// Input returns a write-only channel for sending transformed messages to the processor.
	Input() chan<- *types.BatchedMessage[T]
	// Start begins the processor's operations (e.g., its batching worker).
	Start()
	// Stop gracefully shuts down the processor, ensuring any buffered items are handled.
	Stop()
}

MessageProcessor defines the contract for any component that receives and handles transformed messages. Both `bqstore.BatchInserter` and `icestore.Batcher` implement this interface.
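A minimal sketch of a type that satisfies the same three-method shape may help. Nothing here comes from the package itself: `BatchedMessage` is a local stand-in for types.BatchedMessage[T] with an assumed `Payload` field, and `collector` is a hypothetical processor that just records what it receives.

```go
package main

import (
	"fmt"
	"sync"
)

// BatchedMessage is a stand-in for types.BatchedMessage[T]; the field is assumed.
type BatchedMessage[T any] struct {
	Payload *T
}

// MessageProcessor repeats the documented interface against the stand-in type.
type MessageProcessor[T any] interface {
	Input() chan<- *BatchedMessage[T]
	Start()
	Stop()
}

// collector is a hypothetical processor that stores every message it receives.
type collector[T any] struct {
	in   chan *BatchedMessage[T]
	wg   sync.WaitGroup
	mu   sync.Mutex
	seen []*BatchedMessage[T]
}

func newCollector[T any](buffer int) *collector[T] {
	return &collector[T]{in: make(chan *BatchedMessage[T], buffer)}
}

func (c *collector[T]) Input() chan<- *BatchedMessage[T] { return c.in }

// Start launches the single worker that drains the input channel.
func (c *collector[T]) Start() {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for m := range c.in {
			c.mu.Lock()
			c.seen = append(c.seen, m)
			c.mu.Unlock()
		}
	}()
}

// Stop closes the input and waits until buffered items have been handled.
// Callers must stop sending before calling Stop.
func (c *collector[T]) Stop() {
	close(c.in)
	c.wg.Wait()
}

// Count reports how many messages have been handled so far.
func (c *collector[T]) Count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.seen)
}

func main() {
	c := newCollector[string](4)
	var p MessageProcessor[string] = c // compile-time proof that the shape matches
	p.Start()
	for _, s := range []string{"a", "b", "c"} {
		s := s
		p.Input() <- &BatchedMessage[string]{Payload: &s}
	}
	p.Stop()
	fmt.Println("handled:", c.Count())
}
```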

type MessageTransformer

type MessageTransformer[T any] func(msg types.ConsumedMessage) (payload *T, skip bool, err error)

MessageTransformer defines a function that transforms a whole ConsumedMessage into a structured payload of type T. This function type replaces the older `PayloadDecoder`. It provides access to all message metadata, such as PublishTime, enabling more robust processing logic.

It returns the transformed payload, a boolean to indicate if the message should be skipped, and an error if the transformation fails.
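The three return values can be illustrated with a small JSON transformer. This is a sketch under stated assumptions: `ConsumedMessage` is a local stand-in for types.ConsumedMessage (PublishTime is mentioned above, while the `Payload` field is assumed), and `Reading` and `decodeReading` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ConsumedMessage is a stand-in for types.ConsumedMessage; Payload is an assumed field.
type ConsumedMessage struct {
	Payload     []byte
	PublishTime time.Time
}

// MessageTransformer repeats the documented signature against the stand-in type.
type MessageTransformer[T any] func(msg ConsumedMessage) (payload *T, skip bool, err error)

// Reading is a hypothetical structured payload.
type Reading struct {
	DeviceID   string    `json:"device_id"`
	Value      float64   `json:"value"`
	ReceivedAt time.Time `json:"-"` // filled from message metadata, not from JSON
}

// decodeReading skips empty messages, fails on malformed JSON, and otherwise
// returns the decoded payload stamped with the message's publish time.
func decodeReading(msg ConsumedMessage) (*Reading, bool, error) {
	if len(msg.Payload) == 0 {
		return nil, true, nil // nothing to decode: skip without an error
	}
	var r Reading
	if err := json.Unmarshal(msg.Payload, &r); err != nil {
		return nil, false, fmt.Errorf("decode reading: %w", err)
	}
	r.ReceivedAt = msg.PublishTime
	return &r, false, nil
}

// Compile-time check that decodeReading matches the transformer signature.
var _ MessageTransformer[Reading] = decodeReading

func main() {
	msg := ConsumedMessage{
		Payload:     []byte(`{"device_id":"d1","value":21.5}`),
		PublishTime: time.Now(),
	}
	r, skip, err := decodeReading(msg)
	if err != nil || skip {
		fmt.Println("not processed:", skip, err)
		return
	}
	fmt.Println(r.DeviceID, r.Value)
}
```

Keeping skip separate from err lets a caller distinguish messages that are intentionally ignored from messages that failed to decode.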

type MockMessageConsumer

type MockMessageConsumer struct {
	// contains filtered or unexported fields
}

MockMessageConsumer is a mock implementation of the MessageConsumer interface. It is designed to be used in unit tests to simulate a message source.

func NewMockMessageConsumer

func NewMockMessageConsumer(bufferSize int) *MockMessageConsumer

NewMockMessageConsumer creates a new mock consumer with a buffered channel.

func (*MockMessageConsumer) Done

func (m *MockMessageConsumer) Done() <-chan struct{}

Done returns the channel that signals when the consumer has fully stopped.

func (*MockMessageConsumer) GetStartCount

func (m *MockMessageConsumer) GetStartCount() int

GetStartCount returns the number of times Start() was called.

func (*MockMessageConsumer) GetStopCount

func (m *MockMessageConsumer) GetStopCount() int

GetStopCount returns the number of times Stop() was called.

func (*MockMessageConsumer) Messages

func (m *MockMessageConsumer) Messages() <-chan types.ConsumedMessage

Messages returns the read-only channel for consuming messages.

func (*MockMessageConsumer) Push

Push is a test helper to inject a message into the mock consumer's channel.

func (*MockMessageConsumer) SetStartError

func (m *MockMessageConsumer) SetStartError(err error)

SetStartError configures the mock to return an error on Start().

func (*MockMessageConsumer) Start

func (m *MockMessageConsumer) Start(ctx context.Context) error

Start simulates the startup of a real consumer.

func (*MockMessageConsumer) Stop

func (m *MockMessageConsumer) Stop() error

Stop gracefully closes the message and done channels. It simulates a real consumer by draining its internal buffer and nacking any outstanding messages on shutdown.
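The drain-and-nack step can be pictured with a short sketch. The mock's internals are not shown in this reference, so `message`, its `Nack` callback, and `drainAndNack` are hypothetical stand-ins that only illustrate the general idea.

```go
package main

import "fmt"

// message is a stand-in with a Nack callback; the real message type's API
// is not shown in this reference.
type message struct {
	ID   string
	Nack func()
}

// drainAndNack closes ch, then nacks every message still buffered in it and
// returns how many were drained. Nothing may send on ch after this call.
func drainAndNack(ch chan message) int {
	close(ch)
	n := 0
	for m := range ch { // a closed channel still yields its buffered items
		if m.Nack != nil {
			m.Nack()
		}
		n++
	}
	return n
}

func main() {
	ch := make(chan message, 3)
	for _, id := range []string{"a", "b"} {
		id := id
		ch <- message{ID: id, Nack: func() { fmt.Println("nack", id) }}
	}
	fmt.Println("drained:", drainAndNack(ch))
}
```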

type MockMessageProcessor

type MockMessageProcessor[T any] struct {
	InputChan chan *types.BatchedMessage[T]
	Received  []*types.BatchedMessage[T]
	// contains filtered or unexported fields
}

MockMessageProcessor is a mock implementation of the MessageProcessor interface.

func NewMockMessageProcessor

func NewMockMessageProcessor[T any](bufferSize int) *MockMessageProcessor[T]

NewMockMessageProcessor creates a new mock processor.

func (*MockMessageProcessor[T]) GetReceived

func (m *MockMessageProcessor[T]) GetReceived() []*types.BatchedMessage[T]

GetReceived returns a copy of the messages received by the processor.

func (*MockMessageProcessor[T]) Input

func (m *MockMessageProcessor[T]) Input() chan<- *types.BatchedMessage[T]

Input returns the write-only channel for sending messages to the processor.

func (*MockMessageProcessor[T]) SetAckOnProcess

func (m *MockMessageProcessor[T]) SetAckOnProcess(b bool)

SetAckOnProcess configures the mock processor to call Ack() on the original message.

func (*MockMessageProcessor[T]) SetMessageProcessedSignal

func (m *MockMessageProcessor[T]) SetMessageProcessedSignal(ch chan struct{})

SetMessageProcessedSignal sets a channel that will be signaled each time a message is processed.

func (*MockMessageProcessor[T]) SetProcessDelay

func (m *MockMessageProcessor[T]) SetProcessDelay(d time.Duration)

SetProcessDelay introduces a delay for every message processed.

func (*MockMessageProcessor[T]) SetProcessingHook

func (m *MockMessageProcessor[T]) SetProcessingHook(hook func(msg *types.BatchedMessage[T]))

SetProcessingHook sets a function to be called for each message processed.

func (*MockMessageProcessor[T]) Start

func (m *MockMessageProcessor[T]) Start()

Start begins the processor's operations.

func (*MockMessageProcessor[T]) Stop

func (m *MockMessageProcessor[T]) Stop()

Stop gracefully shuts down the processor.

type ProcessingService

type ProcessingService[T any] struct {
	// contains filtered or unexported fields
}

ProcessingService orchestrates the pipeline of consuming, transforming, and processing messages. It is generic and can be used with any combination of consumers and processors.

func NewProcessingService

func NewProcessingService[T any](
	numWorkers int,
	consumer MessageConsumer,
	processor MessageProcessor[T],
	transformer MessageTransformer[T],
	logger zerolog.Logger,
) (*ProcessingService[T], error)

NewProcessingService creates a new, generic ProcessingService. It requires a consumer to get messages, a transformer to give them structure, and a processor to handle the structured data.

func (*ProcessingService[T]) Start

func (s *ProcessingService[T]) Start() error

Start begins the service operation. It starts the processor and the consumer, then spins up a pool of workers to process messages.
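The worker-pool stage described above can be sketched generically. The service's real implementation is not shown in this reference; `runWorkers` and `parseInt` are hypothetical, and plain channels stand in for the consumer's Messages() output and the processor's Input().

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// runWorkers starts numWorkers goroutines that read from in, apply transform,
// and forward successful results to out. It returns once in is closed and all
// workers have exited, reporting how many messages were skipped or failed.
func runWorkers[In, Out any](
	numWorkers int,
	in <-chan In,
	out chan<- *Out,
	transform func(In) (*Out, bool, error),
) (skipped, failed int) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards the two counters
	)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in {
				payload, skip, err := transform(msg)
				mu.Lock()
				switch {
				case err != nil:
					failed++
				case skip:
					skipped++
				}
				mu.Unlock()
				if err == nil && !skip {
					out <- payload
				}
			}
		}()
	}
	wg.Wait()
	return skipped, failed
}

// parseInt is a hypothetical transformer: empty strings are skipped and
// non-numeric strings are reported as errors.
func parseInt(s string) (*int, bool, error) {
	if s == "" {
		return nil, true, nil
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		return nil, false, err
	}
	return &n, false, nil
}

func main() {
	in := make(chan string, 4)
	out := make(chan *int, 4)
	for _, s := range []string{"1", "", "x", "41"} {
		in <- s
	}
	close(in)
	skipped, failed := runWorkers(2, in, out, parseInt)
	close(out)
	sum := 0
	for n := range out {
		sum += *n
	}
	fmt.Println(sum, skipped, failed) // prints "42 1 1"
}
```

In this sketch the output channel is buffered large enough to hold every result, so the workers never block on a reader that has not started yet.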

func (*ProcessingService[T]) Stop

func (s *ProcessingService[T]) Stop()

Stop gracefully shuts down the entire service in the correct order.
