flow

package
v0.0.0-...-8619404 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 23, 2017 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrSerializeValue = errors.New("ErrSerializeValue")
	DefaultSerializer = &Serializer{
		Serialize:   defaultSerialize,
		Deserialize: defaultDeserialize,
	}
)
View Source
var (
	// Logger is used for debug logs; to suppress them, disable this Logger's output
	Logger = log.New(os.Stdout, "", log.LstdFlags)
	// Polling interval when EOF is reached
	TailPollInterval = 250 * time.Millisecond
)
View Source
var GraphName = "flow"

Functions

func Bytes

func Bytes(iv interface{}) []byte

func GetGraphString

func GetGraphString(tk Task) string

GetGraphString returns a graph string written in the DOT language.

func IsFileExists

func IsFileExists(path string) bool

func String

func String(iv interface{}) string

Types

type ChannelOutput

type ChannelOutput struct {
	// contains filtered or unexported fields
}

func NewChannelOutput

func NewChannelOutput(name string, ch chan interface{}) *ChannelOutput

func (*ChannelOutput) Channel

func (co *ChannelOutput) Channel() chan interface{}

func (*ChannelOutput) Close

func (co *ChannelOutput) Close() error

func (*ChannelOutput) Destroy

func (co *ChannelOutput) Destroy()

func (*ChannelOutput) IsSkip

func (co *ChannelOutput) IsSkip() bool

func (*ChannelOutput) Read

func (co *ChannelOutput) Read() (interface{}, error)

func (*ChannelOutput) Ready

func (co *ChannelOutput) Ready() chan struct{}

func (*ChannelOutput) String

func (co *ChannelOutput) String() string

func (*ChannelOutput) Write

func (co *ChannelOutput) Write(v interface{}) error

type DeserializeFunc

type DeserializeFunc func([]byte) (interface{}, error)

type EmptyInput

type EmptyInput struct{}

func (*EmptyInput) Channel

func (in *EmptyInput) Channel() chan interface{}

func (*EmptyInput) Read

func (in *EmptyInput) Read() (interface{}, error)

func (*EmptyInput) Ready

func (in *EmptyInput) Ready() chan struct{}

func (*EmptyInput) String

func (in *EmptyInput) String() string

type FileOutput

type FileOutput struct {
	// contains filtered or unexported fields
}

func NewFileOutput

func NewFileOutput(path string, srz *Serializer) (*FileOutput, error)

func (*FileOutput) Channel

func (out *FileOutput) Channel() chan interface{}

func (*FileOutput) Close

func (out *FileOutput) Close() error

func (*FileOutput) Destroy

func (out *FileOutput) Destroy()

func (*FileOutput) IsSkip

func (out *FileOutput) IsSkip() bool

func (*FileOutput) Read

func (out *FileOutput) Read() (interface{}, error)

func (*FileOutput) Ready

func (out *FileOutput) Ready() chan struct{}

func (*FileOutput) String

func (out *FileOutput) String() string

func (*FileOutput) Write

func (out *FileOutput) Write(v interface{}) error

type FileStreaming

type FileStreaming struct {
	// contains filtered or unexported fields
}

FileStreaming provides streaming I/O.

func NewFileStreaming

func NewFileStreaming(path string, srz *Serializer) (*FileStreaming, error)

func (*FileStreaming) Channel

func (fs *FileStreaming) Channel() chan interface{}

func (*FileStreaming) Close

func (fs *FileStreaming) Close() error

func (*FileStreaming) Destroy

func (fs *FileStreaming) Destroy()

func (*FileStreaming) IsSkip

func (fs *FileStreaming) IsSkip() bool

func (*FileStreaming) Read

func (fs *FileStreaming) Read() (interface{}, error)

func (*FileStreaming) Ready

func (fs *FileStreaming) Ready() chan struct{}

func (*FileStreaming) String

func (fs *FileStreaming) String() string

func (*FileStreaming) Write

func (fs *FileStreaming) Write(v interface{}) error

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func New

func New(tk Task) *Flow

func (*Flow) Run

func (fl *Flow) Run() (*Result, error)

func (*Flow) Stats

func (fl *Flow) Stats() *Stats

func (*Flow) StatsHandler

func (fl *Flow) StatsHandler(w http.ResponseWriter, r *http.Request)

type Input

type Input interface {
	Channel() chan interface{}
	Ready() chan struct{}
	Read() (interface{}, error)
	String() string
}

func CombineInputs

func CombineInputs(ins ...Input) Input

CombineInputs combines multiple inputs into a single input.

type Line

type Line struct {
	Text  []byte
	Error error
}

type Metric

type Metric struct {
	Name       string
	Time       time.Time
	BufferSize int
}

type Options

type Options func(*options)

func WithInputs

func WithInputs(ins ...Input) Options

func WithOutputs

func WithOutputs(outs ...Output) Options

func WithProcessor

func WithProcessor(processor func(Task) error) Options

func WithWorker

func WithWorker(workerNumber int) Options

type Output

type Output interface {
	Write(interface{}) error

	Close() error
	Destroy()

	Channel() chan interface{}
	Read() (interface{}, error)
	IsSkip() bool
	Ready() chan struct{}

	String() string
}

Output is the output interface.

type Result

type Result struct {
	// contains filtered or unexported fields
}

func Run

func Run(tk Task) (*Result, error)

Run resolves the dependency of the specified task and starts it

func (*Result) Graph

func (rs *Result) Graph() string

Graph returns the graph string.

type S3Output

type S3Output struct {
	// contains filtered or unexported fields
}

func NewS3Output

func NewS3Output(c client.ConfigProvider, bucket, path string, srz *Serializer) (*S3Output, error)

func (*S3Output) Channel

func (out *S3Output) Channel() chan interface{}

func (*S3Output) Close

func (out *S3Output) Close() error

func (*S3Output) Destroy

func (out *S3Output) Destroy()

func (*S3Output) IsSkip

func (out *S3Output) IsSkip() bool

func (*S3Output) Read

func (out *S3Output) Read() (interface{}, error)

func (*S3Output) Ready

func (out *S3Output) Ready() chan struct{}

func (*S3Output) String

func (out *S3Output) String() string

func (*S3Output) Write

func (out *S3Output) Write(v interface{}) error

type SerializeFunc

type SerializeFunc func(interface{}) ([]byte, error)

type Serializer

type Serializer struct {
	Serialize   SerializeFunc
	Deserialize DeserializeFunc
}

type Stats

type Stats struct {
	Metrics []*Metric
}

type Task

type Task interface {
	// Name returns task name
	Name() string
	// In returns a reader that can accept the output of another task
	In(...int) Input
	// Out returns a writer that can feed input to another task
	Out(...int) Output
	// Requires returns the list of tasks on which this task depends
	Requires() []Task
	// contains filtered or unexported methods
}

func NewTask

func NewTask(name string, opts ...Options) Task

NewTask returns a new task with the specified inputs, outputs, and processor.

type TaskInput

type TaskInput interface {
	Tasks() []*task
	Output
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL