datapipe

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: MIT Imports: 6 Imported by: 0

README

DataPipe

Go Reference Go Report Card CI

A generic, extensible Go package for building multi-source data fetching pipelines with built-in support for fallback, enrichment, and observability.

Features

  • Multi-Source Fetching: Chain multiple data sources with intelligent fallback
  • Fallback Modes: Sequential, FirstSuccess, or Parallel fetching strategies
  • Pipeline Pattern: Process data through discrete, composable stages
  • Generic Types: Full Go 1.21+ generics support for type-safe pipelines
  • Middleware Support: Add cross-cutting concerns like logging and metrics
  • Thread-Safe State: Safe for parallel enrichment operations
  • Observability Ready: Built-in hooks for metrics and tracing

Installation

go get github.com/nulllvoid/datapipe

Requires Go 1.21 or later.

Quick Start

1. Define Your Entity and Request
type User struct {
    ID    string
    Name  string
}

func (u User) GetID() string { return u.ID }

type UserRequest struct {
    Limit  int
    Offset int
}

func (r *UserRequest) GetLimit() int           { return r.Limit }
func (r *UserRequest) GetOffset() int          { return r.Offset }
func (r *UserRequest) Clone() datapipe.Request { copy := *r; return &copy }
2. Create a Fetcher
type DBFetcher struct {
    datapipe.BaseFetcher[User, *UserRequest]
    db *sql.DB
}

func NewDBFetcher(db *sql.DB, priority int) *DBFetcher {
    return &DBFetcher{
        BaseFetcher: datapipe.NewBaseFetcher[User, *UserRequest]("database", priority),
        db:          db,
    }
}

func (f *DBFetcher) Fetch(ctx context.Context, req *UserRequest) ([]User, bool, error) {
    // Fetch from database
    users := []User{{ID: "1", Name: "Alice"}}
    return users, false, nil
}
3. Create a Pipeline
// Create fetchers
primaryDB := NewDBFetcher(primaryConn, 1)   // Priority 1 (tried first)
replicaDB := NewDBFetcher(replicaConn, 2)   // Priority 2 (fallback)

// Create fetcher chain with sequential fallback
chain := datapipe.NewFetcherChain[User, *UserRequest](
    datapipe.ChainWithFetcher(primaryDB),
    datapipe.ChainWithFetcher(replicaDB),
    datapipe.ChainWithFallbackMode(datapipe.FallbackSequential),
)

// Create pipeline
pipeline := datapipe.New[User, *UserRequest, *datapipe.CollectionResponse[User]](
    "user_fetch",
    datapipe.WithFetcherChain(chain),
    datapipe.WithTimeout(5*time.Second),
    datapipe.WithResponseBuilder(&datapipe.DefaultResponseBuilder[User, *UserRequest]{}),
)

// Execute
resp, err := pipeline.Execute(ctx, &UserRequest{Limit: 10})

Fallback Modes

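| Mode | Behavior |
|------|----------|
| FallbackSequential | Try fetchers in priority order, continuing to the next fetcher if the results so far are insufficient |
| FallbackFirstSuccess | Use the first fetcher that returns any results |
| FallbackParallel | Query all fetchers concurrently and use the highest-priority fetcher that returned results |
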
// Sequential: AppDB -> TiDB -> Elasticsearch
chain := datapipe.NewFetcherChain[T, Req](
    datapipe.ChainWithFetcher(appDBFetcher),
    datapipe.ChainWithFetcher(tiDBFetcher),
    datapipe.ChainWithFetcher(esFetcher),
    datapipe.ChainWithFallbackMode(datapipe.FallbackSequential),
)

// Parallel: Query all, use highest priority with results
chain := datapipe.NewFetcherChain[T, Req](
    datapipe.ChainWithFetcher(primaryDB),
    datapipe.ChainWithFetcher(replicaDB),
    datapipe.ChainWithFallbackMode(datapipe.FallbackParallel),
)

Conditional Fetching with CanHandle

Implement CanHandle on a fetcher to route each request only to the fetchers that are able to serve it:

type ESFetcher struct {
    datapipe.BaseFetcher[Payout, *PayoutRequest]
}

func (f *ESFetcher) CanHandle(req *PayoutRequest) bool {
    // Only handle search queries
    return req.HasSearchQuery()
}

func (f *ESFetcher) Fetch(ctx context.Context, req *PayoutRequest) ([]Payout, bool, error) {
    // Full-text search in Elasticsearch
}

Enrichment

Add parallel enrichment to augment the fetched data with information from multiple services:

// Define enrichers
fundAccountEnricher := &FundAccountEnricher{
    BaseEnricher: datapipe.NewBaseEnricher[*Payout]("fund_account",
        datapipe.EnricherWithExpandKey("fund_account"),
        datapipe.EnricherWithAllowedAuths("private", "admin"),
    ),
}

userEnricher := &UserEnricher{
    BaseEnricher: datapipe.NewBaseEnricher[*Payout]("user",
        datapipe.EnricherWithExpandKey("user"),
    ),
}

// Create pipeline with enrichment
pipeline := datapipe.New[*Payout, *PayoutRequest, *datapipe.CollectionResponse[*Payout]](
    "payout_fetch",
    datapipe.WithFetchers(dbFetcher),
    datapipe.WithParallelEnrichment(true, 5, fundAccountEnricher, userEnricher),
    datapipe.WithResponseBuilder(...),
)
Enricher Options
| Option | Description |
|--------|-------------|
| EnricherWithExpandKey(key) | Only enrich when key is in the expand params |
| EnricherWithAllowedAuths(auths...) | Only enrich for the specified auth types |
| EnricherWithRequired(bool) | If true, an enrichment failure stops the pipeline |

Pipeline Options

pipeline := datapipe.New[User, *UserRequest, *datapipe.CollectionResponse[User]](
    "my_pipeline",
    
    // Fetcher options
    datapipe.WithFetcherChain(chain),
    datapipe.WithFetchers(fetcher1, fetcher2),
    datapipe.WithFetchersAndMode(datapipe.FallbackParallel, fetcher1, fetcher2),
    
    // Enricher options
    datapipe.WithEnrichers(enricher1, enricher2),
    datapipe.WithParallelEnrichment(true, 5, enricher1, enricher2),
    datapipe.WithEnricherComposite(compositeEnricher),
    
    // Validation
    datapipe.WithValidation(validatorFunc),
    
    // Custom stages
    datapipe.WithStage(transformStage),
    
    // Configuration
    datapipe.WithTimeout(5*time.Second),
    datapipe.WithFailFast(true),
    
    // Observability
    datapipe.WithMetrics(metricsImpl),
    datapipe.WithMiddleware(loggingMiddleware),
    
    // Response
    datapipe.WithResponseBuilder(responseBuilder),
)

Examples

See the examples directory, which contains the basic, multi_source, and with_enrichment commands.

Current Implementation

Phase 1: Core Foundation
  • Entity, Request, Response interfaces
  • Pipeline with stage execution
  • State management
  • Middleware support
  • Error types
Phase 2: Fetcher System
  • Fetcher interface
  • FetcherChain with fallback modes (Sequential, FirstSuccess, Parallel)
  • FetchStage
  • Pipeline integration
Phase 3: Enrichment System
  • Enricher interface with auth/expand filtering
  • CompositeEnricher with parallel/sequential execution
  • EnrichStage
  • Worker-limited concurrent enrichment
Upcoming Phases
  • Phase 4: ResponseBuilder variations, query specification
  • Phase 5: Advanced middleware
  • Phase 6: Built-in fetchers (ES, DB, Cache)

License

MIT License - see LICENSE for details.

Documentation

Overview

Package datapipe provides a generic, extensible framework for building multi-source data fetching pipelines with built-in support for:

  • Multi-source fetching: Chain multiple data sources with intelligent fallback
  • Parallel enrichment: Concurrent data augmentation from multiple services
  • Pipeline pattern: Process data through discrete, composable stages
  • Observability: Built-in hooks for metrics, tracing, and logging

For more information, see https://github.com/nulllvoid/datapipe

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoFetcherAvailable = errors.New("no fetcher available for request")
	ErrValidationFailed   = errors.New("validation failed")
	ErrFetchFailed        = errors.New("fetch operation failed")
	ErrEnrichmentFailed   = errors.New("enrichment failed")
	ErrTimeout            = errors.New("operation timed out")
	ErrCanceled           = errors.New("operation canceled")
)

Functions

This section is empty.

Types

type AggregateMode

type AggregateMode int
const (
	AggregateFirst AggregateMode = iota
	AggregateMerge
	AggregateDedupe
)

type AuthenticatedRequest

type AuthenticatedRequest interface {
	Request
	GetAuthType() string
}

type BaseEnricher

type BaseEnricher[T Entity] struct {
	// contains filtered or unexported fields
}

func NewBaseEnricher

func NewBaseEnricher[T Entity](name string, opts ...EnricherOption[T]) BaseEnricher[T]

func (BaseEnricher[T]) AllowedAuths

func (e BaseEnricher[T]) AllowedAuths() []string

func (BaseEnricher[T]) CanEnrich

func (e BaseEnricher[T]) CanEnrich(authType string, expandParams []string) bool

func (BaseEnricher[T]) ExpandKey

func (e BaseEnricher[T]) ExpandKey() string

func (BaseEnricher[T]) Name

func (e BaseEnricher[T]) Name() string

func (BaseEnricher[T]) Required

func (e BaseEnricher[T]) Required() bool

type BaseFetcher

type BaseFetcher[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewBaseFetcher

func NewBaseFetcher[T Entity, Req Request](name string, priority int) BaseFetcher[T, Req]

func (BaseFetcher[T, Req]) CanHandle

func (f BaseFetcher[T, Req]) CanHandle(Req) bool

func (BaseFetcher[T, Req]) Name

func (f BaseFetcher[T, Req]) Name() string

func (BaseFetcher[T, Req]) Priority

func (f BaseFetcher[T, Req]) Priority() int

type CollectionResponse

type CollectionResponse[T Entity] struct {
	Entity string `json:"entity"`
	Count  int    `json:"count"`
	More   bool   `json:"has_more"`
	Items  []T    `json:"items"`
}

func NewCollectionResponse

func NewCollectionResponse[T Entity](items []T, hasMore bool) *CollectionResponse[T]

func (*CollectionResponse[T]) GetCount

func (r *CollectionResponse[T]) GetCount() int

func (*CollectionResponse[T]) GetItems

func (r *CollectionResponse[T]) GetItems() []T

func (*CollectionResponse[T]) HasMore

func (r *CollectionResponse[T]) HasMore() bool

type CompositeEnricher

type CompositeEnricher[T Entity] struct {
	// contains filtered or unexported fields
}

func NewCompositeEnricher

func NewCompositeEnricher[T Entity](opts ...CompositeEnricherOption[T]) *CompositeEnricher[T]

func (*CompositeEnricher[T]) Enrich

func (c *CompositeEnricher[T]) Enrich(ctx context.Context, items []T, authType string, expandParams []string) []error

func (*CompositeEnricher[T]) EnricherCount

func (c *CompositeEnricher[T]) EnricherCount() int

func (*CompositeEnricher[T]) IsParallel

func (c *CompositeEnricher[T]) IsParallel() bool

type CompositeEnricherOption

type CompositeEnricherOption[T Entity] func(*CompositeEnricher[T])

func CompositeWithEnricher

func CompositeWithEnricher[T Entity](e Enricher[T]) CompositeEnricherOption[T]

func CompositeWithMaxWorkers

func CompositeWithMaxWorkers[T Entity](max int) CompositeEnricherOption[T]

func CompositeWithParallel

func CompositeWithParallel[T Entity](parallel bool) CompositeEnricherOption[T]

type Config

type Config struct {
	Timeout    time.Duration
	FailFast   bool
	MaxRetries int
	RetryDelay time.Duration
}

func DefaultConfig

func DefaultConfig() Config

type DefaultResponseBuilder

type DefaultResponseBuilder[T Entity, Req Request] struct{}

func (*DefaultResponseBuilder[T, Req]) Build

func (b *DefaultResponseBuilder[T, Req]) Build(ctx context.Context, state *State[T, Req]) (*CollectionResponse[T], error)

type EnrichStage

type EnrichStage[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewEnrichStage

func NewEnrichStage[T Entity, Req Request](enricher *CompositeEnricher[T]) *EnrichStage[T, Req]

func (*EnrichStage[T, Req]) Execute

func (s *EnrichStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error

func (*EnrichStage[T, Req]) Name

func (s *EnrichStage[T, Req]) Name() string

func (*EnrichStage[T, Req]) Required

func (s *EnrichStage[T, Req]) Required() bool

type Enricher

type Enricher[T Entity] interface {
	Name() string
	CanEnrich(authType string, expandParams []string) bool
	Enrich(ctx context.Context, items []T) error
	Required() bool
}

type EnricherOption

type EnricherOption[T Entity] func(*BaseEnricher[T])

func EnricherWithAllowedAuths

func EnricherWithAllowedAuths[T Entity](auths ...string) EnricherOption[T]

func EnricherWithExpandKey

func EnricherWithExpandKey[T Entity](key string) EnricherOption[T]

func EnricherWithRequired

func EnricherWithRequired[T Entity](required bool) EnricherOption[T]

type EnrichmentError

type EnrichmentError struct {
	Enricher string
	Message  string
	Err      error
}

func NewEnrichmentError

func NewEnrichmentError(enricher, message string, err error) *EnrichmentError

func (*EnrichmentError) Error

func (e *EnrichmentError) Error() string

func (*EnrichmentError) Unwrap

func (e *EnrichmentError) Unwrap() error

type Entity

type Entity interface {
	GetID() string
}

type ExpandableRequest

type ExpandableRequest interface {
	Request
	GetExpandParams() []string
}

type FallbackMode

type FallbackMode int
const (
	FallbackSequential FallbackMode = iota
	FallbackFirstSuccess
	FallbackParallel
)

type FetchError

type FetchError struct {
	Source  string
	Message string
	Err     error
}

func NewFetchError

func NewFetchError(source, message string, err error) *FetchError

func (*FetchError) Error

func (e *FetchError) Error() string

func (*FetchError) Unwrap

func (e *FetchError) Unwrap() error

type FetchStage

type FetchStage[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewFetchStage

func NewFetchStage[T Entity, Req Request](chain *FetcherChain[T, Req]) *FetchStage[T, Req]

func (*FetchStage[T, Req]) Execute

func (s *FetchStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error

func (*FetchStage[T, Req]) Name

func (s *FetchStage[T, Req]) Name() string

func (*FetchStage[T, Req]) Required

func (s *FetchStage[T, Req]) Required() bool

type Fetcher

type Fetcher[T Entity, Req Request] interface {
	Name() string
	Priority() int
	CanHandle(req Req) bool
	Fetch(ctx context.Context, req Req) (results []T, hasMore bool, err error)
}

type FetcherChain

type FetcherChain[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewFetcherChain

func NewFetcherChain[T Entity, Req Request](opts ...FetcherChainOption[T, Req]) *FetcherChain[T, Req]

func (*FetcherChain[T, Req]) Fetch

func (c *FetcherChain[T, Req]) Fetch(ctx context.Context, req Req) ([]T, bool, string, error)

func (*FetcherChain[T, Req]) FetcherCount

func (c *FetcherChain[T, Req]) FetcherCount() int

type FetcherChainOption

type FetcherChainOption[T Entity, Req Request] func(*FetcherChain[T, Req])

func ChainWithAggregateMode

func ChainWithAggregateMode[T Entity, Req Request](mode AggregateMode) FetcherChainOption[T, Req]

func ChainWithFallbackMode

func ChainWithFallbackMode[T Entity, Req Request](mode FallbackMode) FetcherChainOption[T, Req]

func ChainWithFetcher

func ChainWithFetcher[T Entity, Req Request](f Fetcher[T, Req]) FetcherChainOption[T, Req]

func ChainWithMinResults

func ChainWithMinResults[T Entity, Req Request](min int) FetcherChainOption[T, Req]

type FilterableRequest

type FilterableRequest interface {
	Request
	GetFilters() map[string]any
}

type FunctionalStage

type FunctionalStage[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewStage

func NewStage[T Entity, Req Request](name string, required bool, fn StageFunc[T, Req]) *FunctionalStage[T, Req]

func (*FunctionalStage[T, Req]) Execute

func (s *FunctionalStage[T, Req]) Execute(ctx context.Context, state *State[T, Req]) error

func (*FunctionalStage[T, Req]) Name

func (s *FunctionalStage[T, Req]) Name() string

func (*FunctionalStage[T, Req]) Required

func (s *FunctionalStage[T, Req]) Required() bool

type Logger

type Logger interface {
	Info(msg string, keyvals ...interface{})
	Error(msg string, keyvals ...interface{})
}

type Metrics

type Metrics interface {
	RecordStageDuration(pipeline, stage string, duration time.Duration)
	RecordFetchSource(pipeline, source string)
	RecordResultCount(pipeline string, count int)
	RecordError(pipeline, stage, errorType string)
}

type Middleware

type Middleware[T Entity, Req Request] func(stageName string, next StageFunc[T, Req]) StageFunc[T, Req]

func LoggingMiddleware

func LoggingMiddleware[T Entity, Req Request](logger Logger) Middleware[T, Req]

func RecoveryMiddleware

func RecoveryMiddleware[T Entity, Req Request]() Middleware[T, Req]

type NoopMetrics

type NoopMetrics struct{}

func (NoopMetrics) RecordError

func (NoopMetrics) RecordError(string, string, string)

func (NoopMetrics) RecordFetchSource

func (NoopMetrics) RecordFetchSource(string, string)

func (NoopMetrics) RecordResultCount

func (NoopMetrics) RecordResultCount(string, int)

func (NoopMetrics) RecordStageDuration

func (NoopMetrics) RecordStageDuration(string, string, time.Duration)

type Option

type Option[T Entity, Req Request, Resp Response[T]] func(*Pipeline[T, Req, Resp])

func WithConfig

func WithConfig[T Entity, Req Request, Resp Response[T]](cfg Config) Option[T, Req, Resp]

func WithEnricherComposite

func WithEnricherComposite[T Entity, Req Request, Resp Response[T]](enricher *CompositeEnricher[T]) Option[T, Req, Resp]

func WithEnrichers

func WithEnrichers[T Entity, Req Request, Resp Response[T]](enrichers ...Enricher[T]) Option[T, Req, Resp]

func WithFailFast

func WithFailFast[T Entity, Req Request, Resp Response[T]](enabled bool) Option[T, Req, Resp]

func WithFetcherChain

func WithFetcherChain[T Entity, Req Request, Resp Response[T]](chain *FetcherChain[T, Req]) Option[T, Req, Resp]

func WithFetchers

func WithFetchers[T Entity, Req Request, Resp Response[T]](fetchers ...Fetcher[T, Req]) Option[T, Req, Resp]

func WithFetchersAndMode

func WithFetchersAndMode[T Entity, Req Request, Resp Response[T]](mode FallbackMode, fetchers ...Fetcher[T, Req]) Option[T, Req, Resp]

func WithMetrics

func WithMetrics[T Entity, Req Request, Resp Response[T]](m Metrics) Option[T, Req, Resp]

func WithMiddleware

func WithMiddleware[T Entity, Req Request, Resp Response[T]](m Middleware[T, Req]) Option[T, Req, Resp]

func WithParallelEnrichment

func WithParallelEnrichment[T Entity, Req Request, Resp Response[T]](parallel bool, maxWorkers int, enrichers ...Enricher[T]) Option[T, Req, Resp]

func WithResponseBuilder

func WithResponseBuilder[T Entity, Req Request, Resp Response[T]](builder ResponseBuilder[T, Req, Resp]) Option[T, Req, Resp]

func WithStage

func WithStage[T Entity, Req Request, Resp Response[T]](stage Stage[T, Req]) Option[T, Req, Resp]

func WithTimeout

func WithTimeout[T Entity, Req Request, Resp Response[T]](d time.Duration) Option[T, Req, Resp]

func WithValidation

func WithValidation[T Entity, Req Request, Resp Response[T]](fn func(Req) error) Option[T, Req, Resp]

type Pipeline

type Pipeline[T Entity, Req Request, Resp Response[T]] struct {
	// contains filtered or unexported fields
}

func New

func New[T Entity, Req Request, Resp Response[T]](name string, opts ...Option[T, Req, Resp]) *Pipeline[T, Req, Resp]

func (*Pipeline[T, Req, Resp]) AddStage

func (p *Pipeline[T, Req, Resp]) AddStage(stage Stage[T, Req])

func (*Pipeline[T, Req, Resp]) Execute

func (p *Pipeline[T, Req, Resp]) Execute(ctx context.Context, req Req) (Resp, error)

func (*Pipeline[T, Req, Resp]) Name

func (p *Pipeline[T, Req, Resp]) Name() string

func (*Pipeline[T, Req, Resp]) StageCount

func (p *Pipeline[T, Req, Resp]) StageCount() int

func (*Pipeline[T, Req, Resp]) Use

func (p *Pipeline[T, Req, Resp]) Use(m Middleware[T, Req])

type PipelineError

type PipelineError struct {
	Pipeline string
	Stage    string
	Op       string
	Err      error
}

func NewPipelineError

func NewPipelineError(pipeline, stage, op string, err error) *PipelineError

func (*PipelineError) Error

func (e *PipelineError) Error() string

func (*PipelineError) Unwrap

func (e *PipelineError) Unwrap() error

type Request

type Request interface {
	GetLimit() int
	GetOffset() int
	Clone() Request
}

type Response

type Response[T Entity] interface {
	GetItems() []T
	GetCount() int
	HasMore() bool
}

type ResponseBuilder

type ResponseBuilder[T Entity, Req Request, Resp Response[T]] interface {
	Build(ctx context.Context, state *State[T, Req]) (Resp, error)
}

type SearchableRequest

type SearchableRequest interface {
	Request
	GetSearchQuery() string
	HasSearchQuery() bool
}

type SortOrder

type SortOrder string
const (
	SortAsc  SortOrder = "asc"
	SortDesc SortOrder = "desc"
)

type SortableRequest

type SortableRequest interface {
	Request
	GetSortField() string
	GetSortOrder() SortOrder
}

type Stage

type Stage[T Entity, Req Request] interface {
	Name() string
	Execute(ctx context.Context, state *State[T, Req]) error
	Required() bool
}

type StageFunc

type StageFunc[T Entity, Req Request] func(ctx context.Context, state *State[T, Req]) error

type State

type State[T Entity, Req Request] struct {
	// contains filtered or unexported fields
}

func NewState

func NewState[T Entity, Req Request](req Req) *State[T, Req]

func (*State[T, Req]) AddError

func (s *State[T, Req]) AddError(err error)

func (*State[T, Req]) AppendResults

func (s *State[T, Req]) AppendResults(results []T)

func (*State[T, Req]) Errors

func (s *State[T, Req]) Errors() []error

func (*State[T, Req]) GetMetadata

func (s *State[T, Req]) GetMetadata(key string) (any, bool)

func (*State[T, Req]) HasErrors

func (s *State[T, Req]) HasErrors() bool

func (*State[T, Req]) HasMore

func (s *State[T, Req]) HasMore() bool

func (*State[T, Req]) Request

func (s *State[T, Req]) Request() Req

func (*State[T, Req]) ResultCount

func (s *State[T, Req]) ResultCount() int

func (*State[T, Req]) Results

func (s *State[T, Req]) Results() []T

func (*State[T, Req]) SetMetadata

func (s *State[T, Req]) SetMetadata(key string, value any)

func (*State[T, Req]) SetRequest

func (s *State[T, Req]) SetRequest(req Req)

func (*State[T, Req]) SetResults

func (s *State[T, Req]) SetResults(results []T, hasMore bool, source string)

func (*State[T, Req]) Source

func (s *State[T, Req]) Source() string

type ValidationError

type ValidationError struct {
	Field   string
	Message string
}

func NewValidationError

func NewValidationError(field, message string) *ValidationError

func (*ValidationError) Error

func (e *ValidationError) Error() string

Directories

| Path | Kind |
|------|------|
| examples/basic | command |
| examples/multi_source | command |
| examples/with_enrichment | command |

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL