pipeline

package
v0.0.0-...-38a333d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2024 License: GPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsoleSink

type ConsoleSink struct {
}

ConsoleSink writes its output to the command line.

func NewConsoleSink

func NewConsoleSink() *ConsoleSink

func (*ConsoleSink) Process

func (s *ConsoleSink) Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error)

type ErrorPolicyExit

type ErrorPolicyExit struct {
}

Error Handling

func NewErrorPolicyExit

func NewErrorPolicyExit() *ErrorPolicyExit

func (*ErrorPolicyExit) Process

func (p *ErrorPolicyExit) Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc)

type IError

type IError interface {
	Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error, cancel context.CancelFunc)
}

type IProcessor

type IProcessor interface {
	Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error) <-chan any
}

type ISink

type ISink interface {
	Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error)
	Output(ctx context.Context, wg *sync.WaitGroup, errChan chan error)
}

type ISource

type ISource interface {
	Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error) <-chan any
}

type ModelSink

type ModelSink struct {
	Fn SinkFunc
	Ws map[string]*Writer
}

func NewModelSink

func NewModelSink(fn SinkFunc, ws map[string]*Writer) *ModelSink

func (*ModelSink) Output

func (s *ModelSink) Output(ctx context.Context, wg *sync.WaitGroup, errChan chan error)

func (*ModelSink) Process

func (s *ModelSink) Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error)

type MultiProcessor

type MultiProcessor struct {
	Proc StageFunc
}

Concurrent processor

func (*MultiProcessor) Process

func (p *MultiProcessor) Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error) <-chan any

type ProcessorManager

type ProcessorManager struct {
	// contains filtered or unexported fields
}

func NewProcessorManager

func NewProcessorManager() *ProcessorManager

func (*ProcessorManager) AddError

func (m *ProcessorManager) AddError(err IError)

func (*ProcessorManager) AddProcessor

func (m *ProcessorManager) AddProcessor(processor IProcessor)

func (*ProcessorManager) AddSink

func (m *ProcessorManager) AddSink(sink ISink)

func (*ProcessorManager) AddSource

func (m *ProcessorManager) AddSource(source ISource)

func (*ProcessorManager) Run

func (m *ProcessorManager) Run(ctx context.Context)

type SingleProcessor

type SingleProcessor struct {
	Proc StageFunc
}

Single processor

func (*SingleProcessor) Process

func (p *SingleProcessor) Process(ctx context.Context, wg *sync.WaitGroup, dataChan <-chan any, errChan chan error) <-chan any

type SinkFunc

type SinkFunc func(s any) string

type Source

type Source struct {
	Nums []any
}

Generator: sends the input data to the output channel one item at a time, in order.

func NewSource

func NewSource(nums []any) *Source

func (*Source) Process

func (t *Source) Process(ctx context.Context, wg *sync.WaitGroup, errChan chan error) <-chan any

type StageFunc

type StageFunc func(s any)

type Writer

type Writer struct {
	File *os.File
	Chn  chan string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL