Documentation ¶
Index ¶
Constants ¶
const (
// DefaultHealthCheckPort is the default port for checking sync pool health.
DefaultHealthCheckPort = 8080
)
Variables ¶
var ( DefaultCEClientOpts = []ceclient.Option{ ceclient.WithUUIDs(), ceclient.WithTimeNow(), ceclient.WithTracePropagation(), } DefaultHTTPClient = &http.Client{ Transport: &ochttp.Transport{ Base: &http.Transport{ MaxIdleConns: 1000, MaxIdleConnsPerHost: 500, MaxConnsPerHost: 500, IdleConnTimeout: 30 * time.Second, }, Propagation: tracecontextb3.TraceContextEgress, }, } // ProviderSet provides the fanout and retry sync pools using the default client options. In // order to inject either pool, ProjectID, []Option, and config.ReadonlyTargets must be // externally provided. ProviderSet = wire.NewSet( NewFanoutPool, NewRetryPool, clients.NewPubsubClient, NewRetryClient, wire.Value(DefaultHTTPClient), wire.Value(DefaultCEClientOpts), ) )
Functions ¶
This section is empty.
Types ¶
type FanoutPool ¶
type FanoutPool struct {
// contains filtered or unexported fields
}
FanoutPool is the sync pool for fanout handlers. For each broker in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding broker is deleted in the config.
func InitializeTestFanoutPool ¶
func InitializeTestFanoutPool(ctx context.Context, podName metrics.PodName, containerName metrics.ContainerName, targets config.ReadonlyTargets, pubsubClient *pubsub.Client, opts ...Option) (*FanoutPool, error)
func NewFanoutPool ¶
func NewFanoutPool( targets config.ReadonlyTargets, pubsubClient *pubsub.Client, deliverClient *http.Client, retryClient RetryClient, statsReporter *metrics.DeliveryReporter, opts ...Option, ) (*FanoutPool, error)
NewFanoutPool creates a new fanout handler pool.
type Handler ¶
type Handler struct { // Subscription is the pubsub subscription that messages will be // received from. Subscription *pubsub.Subscription // Processor is the processor to process events. Processor processors.Interface // Timeout is the timeout for processing each individual event. Timeout time.Duration // contains filtered or unexported fields }
Handler pulls Pubsub messages as events and processes them with a chain of processors.
func NewHandler ¶
func NewHandler( sub *pubsub.Subscription, processor processors.Interface, timeout time.Duration, ) *Handler
NewHandler creates a new Handler.
type Option ¶
type Option func(*Options)
Option is for providing an individual option.
func WithDeliveryTimeout ¶
WithDeliveryTimeout sets the DeliveryTimeout.
func WithHandlerConcurrency ¶
WithHandlerConcurrency sets HandlerConcurrency.
func WithMaxConcurrentPerEvent ¶
WithMaxConcurrentPerEvent sets MaxConcurrencyPerEvent.
func WithPubsubReceiveSettings ¶
func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option
WithPubsubReceiveSettings sets PubsubReceiveSettings.
func WithTimeoutPerEvent ¶
WithTimeoutPerEvent sets TimeoutPerEvent.
type Options ¶
type Options struct { // HandlerConcurrency is the number of goroutines // that will be spawned in each handler. HandlerConcurrency int // MaxConcurrencyPerEvent is the max number of goroutines // that will be spawned to handle an event. MaxConcurrencyPerEvent int // TimeoutPerEvent is the timeout for handling an event. TimeoutPerEvent time.Duration // DeliveryTimeout is the timeout for delivering an event to a consumer. DeliveryTimeout time.Duration // PubsubReceiveSettings is the pubsub receive settings. PubsubReceiveSettings pubsub.ReceiveSettings }
Options holds all the options for creating a handler pool.
type RetryClient ¶
func NewRetryClient ¶
func NewRetryClient(ctx context.Context, client *pubsub.Client, opts ...ceclient.Option) (RetryClient, error)
NewRetryClient provides a retry CE client from a PubSub client and list of CE client options.
type RetryPool ¶
type RetryPool struct {
// contains filtered or unexported fields
}
RetryPool is the sync pool for retry handlers. For each trigger in the config, it will attempt to create a handler. It will also stop/delete the handler if the corresponding trigger is deleted in the config.
func InitializeTestRetryPool ¶
func NewRetryPool ¶
func NewRetryPool( targets config.ReadonlyTargets, pubsubClient *pubsub.Client, deliverClient *http.Client, statsReporter *metrics.DeliveryReporter, opts ...Option) (*RetryPool, error)
NewRetryPool creates a new retry handler pool.