Documentation ¶
Overview ¶
Package datapipe provides a generic, extensible framework for building multi-source data-fetching pipelines with built-in support for:
- Multi-source fetching: Chain multiple data sources with intelligent fallback
- Parallel enrichment: Concurrent data augmentation from multiple services
- Pipeline pattern: Process data through discrete, composable stages
- Observability: Built-in hooks for metrics, tracing, and logging
For more information, see https://github.com/nulllvoid/datapipe
Index ¶
- Variables
- type AggregateMode
- type AuthenticatedRequest
- type BaseEnricher
- type BaseFetcher
- type CollectionResponse
- type CompositeEnricher
- type CompositeEnricherOption
- type Config
- type DefaultResponseBuilder
- type EnrichStage
- type Enricher
- type EnricherOption
- type EnrichmentError
- type Entity
- type ExpandableRequest
- type FallbackMode
- type FetchError
- type FetchStage
- type Fetcher
- type FetcherChain
- type FetcherChainOption
- func ChainWithAggregateMode[T Entity, Req Request](mode AggregateMode) FetcherChainOption[T, Req]
- func ChainWithFallbackMode[T Entity, Req Request](mode FallbackMode) FetcherChainOption[T, Req]
- func ChainWithFetcher[T Entity, Req Request](f Fetcher[T, Req]) FetcherChainOption[T, Req]
- func ChainWithMinResults[T Entity, Req Request](min int) FetcherChainOption[T, Req]
- type FilterableRequest
- type FunctionalStage
- type Logger
- type Metrics
- type Middleware
- type NoopMetrics
- type Option
- func WithConfig[T Entity, Req Request, Resp Response[T]](cfg Config) Option[T, Req, Resp]
- func WithEnricherComposite[T Entity, Req Request, Resp Response[T]](enricher *CompositeEnricher[T]) Option[T, Req, Resp]
- func WithEnrichers[T Entity, Req Request, Resp Response[T]](enrichers ...Enricher[T]) Option[T, Req, Resp]
- func WithFailFast[T Entity, Req Request, Resp Response[T]](enabled bool) Option[T, Req, Resp]
- func WithFetcherChain[T Entity, Req Request, Resp Response[T]](chain *FetcherChain[T, Req]) Option[T, Req, Resp]
- func WithFetchers[T Entity, Req Request, Resp Response[T]](fetchers ...Fetcher[T, Req]) Option[T, Req, Resp]
- func WithFetchersAndMode[T Entity, Req Request, Resp Response[T]](mode FallbackMode, fetchers ...Fetcher[T, Req]) Option[T, Req, Resp]
- func WithMetrics[T Entity, Req Request, Resp Response[T]](m Metrics) Option[T, Req, Resp]
- func WithMiddleware[T Entity, Req Request, Resp Response[T]](m Middleware[T, Req]) Option[T, Req, Resp]
- func WithParallelEnrichment[T Entity, Req Request, Resp Response[T]](parallel bool, maxWorkers int, enrichers ...Enricher[T]) Option[T, Req, Resp]
- func WithResponseBuilder[T Entity, Req Request, Resp Response[T]](builder ResponseBuilder[T, Req, Resp]) Option[T, Req, Resp]
- func WithStage[T Entity, Req Request, Resp Response[T]](stage Stage[T, Req]) Option[T, Req, Resp]
- func WithTimeout[T Entity, Req Request, Resp Response[T]](d time.Duration) Option[T, Req, Resp]
- func WithValidation[T Entity, Req Request, Resp Response[T]](fn func(Req) error) Option[T, Req, Resp]
- type Pipeline
- func (p *Pipeline[T, Req, Resp]) AddStage(stage Stage[T, Req])
- func (p *Pipeline[T, Req, Resp]) Execute(ctx context.Context, req Req) (Resp, error)
- func (p *Pipeline[T, Req, Resp]) Name() string
- func (p *Pipeline[T, Req, Resp]) StageCount() int
- func (p *Pipeline[T, Req, Resp]) Use(m Middleware[T, Req])
- type PipelineError
- type Request
- type Response
- type ResponseBuilder
- type SearchableRequest
- type SortOrder
- type SortableRequest
- type Stage
- type StageFunc
- type State
- func (s *State[T, Req]) AddError(err error)
- func (s *State[T, Req]) AppendResults(results []T)
- func (s *State[T, Req]) Errors() []error
- func (s *State[T, Req]) GetMetadata(key string) (any, bool)
- func (s *State[T, Req]) HasErrors() bool
- func (s *State[T, Req]) HasMore() bool
- func (s *State[T, Req]) Request() Req
- func (s *State[T, Req]) ResultCount() int
- func (s *State[T, Req]) Results() []T
- func (s *State[T, Req]) SetMetadata(key string, value any)
- func (s *State[T, Req]) SetRequest(req Req)
- func (s *State[T, Req]) SetResults(results []T, hasMore bool, source string)
- func (s *State[T, Req]) Source() string
- type ValidationError
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	ErrNoFetcherAvailable = errors.New("no fetcher available for request")
	ErrValidationFailed   = errors.New("validation failed")
	ErrFetchFailed        = errors.New("fetch operation failed")
	ErrEnrichmentFailed   = errors.New("enrichment failed")
	ErrTimeout            = errors.New("operation timed out")
	ErrCanceled           = errors.New("operation canceled")
)
Functions ¶
This section is empty.
Types ¶
type AggregateMode ¶
type AggregateMode int
const (
	AggregateFirst AggregateMode = iota
	AggregateMerge
	AggregateDedupe
)
type AuthenticatedRequest ¶
type BaseEnricher ¶
type BaseEnricher[T Entity] struct {
	// contains filtered or unexported fields
}
func NewBaseEnricher ¶
func NewBaseEnricher[T Entity](name string, opts ...EnricherOption[T]) BaseEnricher[T]
func (BaseEnricher[T]) AllowedAuths ¶
func (e BaseEnricher[T]) AllowedAuths() []string
func (BaseEnricher[T]) CanEnrich ¶
func (e BaseEnricher[T]) CanEnrich(authType string, expandParams []string) bool
func (BaseEnricher[T]) ExpandKey ¶
func (e BaseEnricher[T]) ExpandKey() string
func (BaseEnricher[T]) Name ¶
func (e BaseEnricher[T]) Name() string
func (BaseEnricher[T]) Required ¶
func (e BaseEnricher[T]) Required() bool
type BaseFetcher ¶
func NewBaseFetcher ¶
func NewBaseFetcher[T Entity, Req Request](name string, priority int) BaseFetcher[T, Req]
func (BaseFetcher[T, Req]) CanHandle ¶
func (f BaseFetcher[T, Req]) CanHandle(Req) bool
func (BaseFetcher[T, Req]) Name ¶
func (f BaseFetcher[T, Req]) Name() string
func (BaseFetcher[T, Req]) Priority ¶
func (f BaseFetcher[T, Req]) Priority() int
type CollectionResponse ¶
type CollectionResponse[T Entity] struct {
	Entity string `json:"entity"`
	Count  int    `json:"count"`
	More   bool   `json:"has_more"`
	Items  []T    `json:"items"`
}
func NewCollectionResponse ¶
func NewCollectionResponse[T Entity](items []T, hasMore bool) *CollectionResponse[T]
func (*CollectionResponse[T]) GetCount ¶
func (r *CollectionResponse[T]) GetCount() int
func (*CollectionResponse[T]) GetItems ¶
func (r *CollectionResponse[T]) GetItems() []T
func (*CollectionResponse[T]) HasMore ¶
func (r *CollectionResponse[T]) HasMore() bool
type CompositeEnricher ¶
type CompositeEnricher[T Entity] struct {
	// contains filtered or unexported fields
}
func NewCompositeEnricher ¶
func NewCompositeEnricher[T Entity](opts ...CompositeEnricherOption[T]) *CompositeEnricher[T]
func (*CompositeEnricher[T]) EnricherCount ¶
func (c *CompositeEnricher[T]) EnricherCount() int
func (*CompositeEnricher[T]) IsParallel ¶
func (c *CompositeEnricher[T]) IsParallel() bool
type CompositeEnricherOption ¶
type CompositeEnricherOption[T Entity] func(*CompositeEnricher[T])
func CompositeWithEnricher ¶
func CompositeWithEnricher[T Entity](e Enricher[T]) CompositeEnricherOption[T]
func CompositeWithMaxWorkers ¶
func CompositeWithMaxWorkers[T Entity](max int) CompositeEnricherOption[T]
func CompositeWithParallel ¶
func CompositeWithParallel[T Entity](parallel bool) CompositeEnricherOption[T]
type Config ¶
func DefaultConfig ¶
func DefaultConfig() Config
type DefaultResponseBuilder ¶
func (*DefaultResponseBuilder[T, Req]) Build ¶
func (b *DefaultResponseBuilder[T, Req]) Build(ctx context.Context, state *State[T, Req]) (*CollectionResponse[T], error)
type EnrichStage ¶
func NewEnrichStage ¶
func NewEnrichStage[T Entity, Req Request](enricher *CompositeEnricher[T]) *EnrichStage[T, Req]
func (*EnrichStage[T, Req]) Execute ¶
func (s *EnrichStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error
func (*EnrichStage[T, Req]) Name ¶
func (s *EnrichStage[T, Req]) Name() string
func (*EnrichStage[T, Req]) Required ¶
func (s *EnrichStage[T, Req]) Required() bool
type EnricherOption ¶
type EnricherOption[T Entity] func(*BaseEnricher[T])
func EnricherWithAllowedAuths ¶
func EnricherWithAllowedAuths[T Entity](auths ...string) EnricherOption[T]
func EnricherWithExpandKey ¶
func EnricherWithExpandKey[T Entity](key string) EnricherOption[T]
func EnricherWithRequired ¶
func EnricherWithRequired[T Entity](required bool) EnricherOption[T]
type EnrichmentError ¶
func NewEnrichmentError ¶
func NewEnrichmentError(enricher, message string, err error) *EnrichmentError
func (*EnrichmentError) Error ¶
func (e *EnrichmentError) Error() string
func (*EnrichmentError) Unwrap ¶
func (e *EnrichmentError) Unwrap() error
type ExpandableRequest ¶
type FallbackMode ¶
type FallbackMode int
const (
	FallbackSequential FallbackMode = iota
	FallbackFirstSuccess
	FallbackParallel
)
type FetchError ¶
func NewFetchError ¶
func NewFetchError(source, message string, err error) *FetchError
func (*FetchError) Error ¶
func (e *FetchError) Error() string
func (*FetchError) Unwrap ¶
func (e *FetchError) Unwrap() error
type FetchStage ¶
func NewFetchStage ¶
func NewFetchStage[T Entity, Req Request](chain *FetcherChain[T, Req]) *FetchStage[T, Req]
func (*FetchStage[T, Req]) Execute ¶
func (s *FetchStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error
func (*FetchStage[T, Req]) Name ¶
func (s *FetchStage[T, Req]) Name() string
func (*FetchStage[T, Req]) Required ¶
func (s *FetchStage[T, Req]) Required() bool
type FetcherChain ¶
func NewFetcherChain ¶
func NewFetcherChain[T Entity, Req Request](opts ...FetcherChainOption[T, Req]) *FetcherChain[T, Req]
func (*FetcherChain[T, Req]) FetcherCount ¶
func (c *FetcherChain[T, Req]) FetcherCount() int
type FetcherChainOption ¶
type FetcherChainOption[T Entity, Req Request] func(*FetcherChain[T, Req])
func ChainWithAggregateMode ¶
func ChainWithAggregateMode[T Entity, Req Request](mode AggregateMode) FetcherChainOption[T, Req]
func ChainWithFallbackMode ¶
func ChainWithFallbackMode[T Entity, Req Request](mode FallbackMode) FetcherChainOption[T, Req]
func ChainWithFetcher ¶
func ChainWithFetcher[T Entity, Req Request](f Fetcher[T, Req]) FetcherChainOption[T, Req]
func ChainWithMinResults ¶
func ChainWithMinResults[T Entity, Req Request](min int) FetcherChainOption[T, Req]
type FilterableRequest ¶
type FunctionalStage ¶
func (*FunctionalStage[T, Req]) Execute ¶
func (s *FunctionalStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error
func (*FunctionalStage[T, Req]) Name ¶
func (s *FunctionalStage[T, Req]) Name() string
func (*FunctionalStage[T, Req]) Required ¶
func (s *FunctionalStage[T, Req]) Required() bool
type Middleware ¶
type Middleware[T Entity, Req Request] func(stageName string, next StageFunc[T, Req]) StageFunc[T, Req]
func LoggingMiddleware ¶
func LoggingMiddleware[T Entity, Req Request](logger Logger) Middleware[T, Req]
func RecoveryMiddleware ¶
func RecoveryMiddleware[T Entity, Req Request]() Middleware[T, Req]
type NoopMetrics ¶
type NoopMetrics struct{}
func (NoopMetrics) RecordError ¶
func (NoopMetrics) RecordError(string, string, string)
func (NoopMetrics) RecordFetchSource ¶
func (NoopMetrics) RecordFetchSource(string, string)
func (NoopMetrics) RecordResultCount ¶
func (NoopMetrics) RecordResultCount(string, int)
func (NoopMetrics) RecordStageDuration ¶
func (NoopMetrics) RecordStageDuration(string, string, time.Duration)
type Option ¶
func WithConfig ¶
func WithEnricherComposite ¶
func WithEnricherComposite[T Entity, Req Request, Resp Response[T]](enricher *CompositeEnricher[T]) Option[T, Req, Resp]
func WithEnrichers ¶
func WithFailFast ¶
func WithFetcherChain ¶
func WithFetcherChain[T Entity, Req Request, Resp Response[T]](chain *FetcherChain[T, Req]) Option[T, Req, Resp]
func WithFetchers ¶
func WithFetchersAndMode ¶
func WithMetrics ¶
func WithMiddleware ¶
func WithMiddleware[T Entity, Req Request, Resp Response[T]](m Middleware[T, Req]) Option[T, Req, Resp]
func WithParallelEnrichment ¶
func WithResponseBuilder ¶
func WithResponseBuilder[T Entity, Req Request, Resp Response[T]](builder ResponseBuilder[T, Req, Resp]) Option[T, Req, Resp]
func WithTimeout ¶
type Pipeline ¶
type Pipeline[T Entity, Req Request, Resp Response[T]] struct {
	// contains filtered or unexported fields
}
func (*Pipeline[T, Req, Resp]) StageCount ¶
func (*Pipeline[T, Req, Resp]) Use ¶
func (p *Pipeline[T, Req, Resp]) Use(m Middleware[T, Req])
type PipelineError ¶
func NewPipelineError ¶
func NewPipelineError(pipeline, stage, op string, err error) *PipelineError
func (*PipelineError) Error ¶
func (e *PipelineError) Error() string
func (*PipelineError) Unwrap ¶
func (e *PipelineError) Unwrap() error
type ResponseBuilder ¶
type SearchableRequest ¶
type SortableRequest ¶
type State ¶
func (*State[T, Req]) AppendResults ¶
func (s *State[T, Req]) AppendResults(results []T)
func (*State[T, Req]) ResultCount ¶
func (*State[T, Req]) SetMetadata ¶
func (*State[T, Req]) SetRequest ¶
func (s *State[T, Req]) SetRequest(req Req)
func (*State[T, Req]) SetResults ¶
type ValidationError ¶
func NewValidationError ¶
func NewValidationError(field, message string) *ValidationError
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
Source Files ¶
Click to show internal directories.
Click to hide internal directories.