Documentation ¶
Overview ¶
Carbo provides building blocks for composing concurrent data processing pipelines.
Carbo's core component is task.Task, which represents a process that takes inputs and produces outputs.
You can build an entire data pipeline from a number of tasks.
Although it is possible to define a task directly with task.FromFn, it is recommended to use the sub-types of task.Task: source, pipe and sink.
Sub-types of task.Task ¶
A data pipeline is built with three sub-types of task.Task: source, pipe and sink.
The basic form of a data pipeline should look like: source -> pipe -> ... -> pipe -> sink
Source ¶
A source is a special type of task.Task that takes an empty input, which is closed immediately after the data pipeline starts.
A source is used as the entry point of a data pipeline. For example, source.FromSlice takes a slice as an argument and produces each element of the slice as an output.
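To make the idea of a source concrete, here is a minimal sketch that uses only standard-library channels. It is not Carbo's implementation: sliceSource and runSource are hypothetical names, and the function shape (a context, an input channel, an output channel) is an assumption made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// sliceSource is a hypothetical helper, not Carbo's source.FromSlice.
// It returns a task-shaped function that ignores its (empty) input,
// sends every element of items to out, and closes out when done.
func sliceSource[T any](items []T) func(context.Context, <-chan struct{}, chan<- T) error {
	return func(ctx context.Context, _ <-chan struct{}, out chan<- T) error {
		defer close(out)
		for _, it := range items {
			select {
			case out <- it:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// runSource drives sliceSource with an input channel that carries
// nothing and is closed right away, then collects what was emitted.
func runSource[T any](items []T) ([]T, error) {
	in := make(chan struct{})
	close(in)
	out := make(chan T, len(items)) // buffered so the source never blocks here
	if err := sliceSource(items)(context.Background(), in, out); err != nil {
		return nil, err
	}
	var got []T
	for v := range out {
		got = append(got, v)
	}
	return got, nil
}

func main() {
	got, err := runSource([]string{"a", "b", "c"})
	fmt.Println(got, err) // prints "[a b c] <nil>"
}
```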
Pipe ¶
A pipe is similar to a plain task.Task, except that the output channel passed to it is closed automatically once its process finishes.
A pipe is used to convert inputs into outputs. For example, pipe.Map processes inputs one by one and produces a corresponding output for each. This is useful when inputs and outputs have a one-to-one correspondence.
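The one-to-one mapping and the automatic closing of the output channel can be sketched with plain channels as follows. This is an illustration under assumptions, not Carbo's pipe.Map: mapPipe and doubleAll are hypothetical names, and the mapping function signature simply mirrors the one used in the Flow example.

```go
package main

import (
	"context"
	"fmt"
)

// mapPipe is a hypothetical helper, not Carbo's pipe.Map.
// It applies fn to each input, sends the result to out, and closes out
// once the input is exhausted, fn fails, or the context is cancelled.
func mapPipe[S, T any](fn func(context.Context, S) (T, error)) func(context.Context, <-chan S, chan<- T) error {
	return func(ctx context.Context, in <-chan S, out chan<- T) error {
		defer close(out) // the caller of fn never has to close out
		for s := range in {
			t, err := fn(ctx, s)
			if err != nil {
				return err
			}
			select {
			case out <- t:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// doubleAll feeds items through a mapping pipe that concatenates
// each string with itself, and collects the results.
func doubleAll(items []string) ([]string, error) {
	double := mapPipe(func(_ context.Context, s string) (string, error) {
		return s + s, nil
	})
	in := make(chan string, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	out := make(chan string, len(items))
	if err := double(context.Background(), in, out); err != nil {
		return nil, err
	}
	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got, nil
}

func main() {
	got, err := doubleAll([]string{"a", "b", "c"})
	fmt.Println(got, err) // prints "[aa bb cc] <nil>"
}
```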
Sink ¶
A sink is a special type of task.Task that takes an empty output. It is the opposite of a source.
A sink is used as the last component of a data pipeline. For example, sink.ToSlice takes a pointer to a slice and appends the elements it receives as inputs.
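A sink that appends to a slice can be sketched in the same spirit. The names sliceSink and runSink are hypothetical, and closing the unused output channel at the end is an assumption of this sketch rather than documented Carbo behavior.

```go
package main

import (
	"context"
	"fmt"
)

// sliceSink is a hypothetical helper, not Carbo's sink.ToSlice.
// It drains in, appends every element to *dst, and never sends on out.
func sliceSink[T any](dst *[]T) func(context.Context, <-chan T, chan<- struct{}) error {
	return func(ctx context.Context, in <-chan T, out chan<- struct{}) error {
		defer close(out) // assumption: the empty output is closed when the sink is done
		for {
			select {
			case v, ok := <-in:
				if !ok {
					return nil // input exhausted
				}
				*dst = append(*dst, v)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}
}

// runSink pushes items into a sliceSink and returns what it collected.
func runSink[T any](items []T) ([]T, error) {
	in := make(chan T, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	out := make(chan struct{})
	var dst []T
	if err := sliceSink(&dst)(context.Background(), in, out); err != nil {
		return nil, err
	}
	return dst, nil
}

func main() {
	got, err := runSink([]int{1, 2, 3})
	fmt.Println(got, err) // prints "[1 2 3] <nil>"
}
```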
Connecting tasks ¶
Two tasks can be connected with task.Connect, provided that the upstream task's output type matches the downstream task's input type.
task.Connect also returns a task.Task, so calls can be chained. An entire data pipeline can be built by connecting multiple tasks in this way.
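The sketch below shows one way such a connection could work with plain channels: the upstream stage runs in its own goroutine and hands values to the downstream stage through an intermediate channel. The stage type, connect, mapStage, runPipeline and the buf parameter are all hypothetical names chosen for this illustration; this is not how task.Connect is necessarily implemented.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// stage is a hypothetical task shape: read from in, write to out,
// and close out when done.
type stage[S, T any] func(ctx context.Context, in <-chan S, out chan<- T) error

// connect joins two stages whose types line up (S -> M, M -> T) into a
// single stage (S -> T). Because the result is again a stage, calls chain.
func connect[S, M, T any](up stage[S, M], down stage[M, T], buf int) stage[S, T] {
	return func(ctx context.Context, in <-chan S, out chan<- T) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		mid := make(chan M, buf) // intermediate channel between the stages
		var (
			wg    sync.WaitGroup
			upErr error
		)
		wg.Add(1)
		go func() {
			defer wg.Done()
			upErr = up(ctx, in, mid)
		}()

		downErr := down(ctx, mid, out)
		cancel() // unblock the upstream stage if downstream stopped early
		wg.Wait()
		if downErr != nil {
			return downErr
		}
		return upErr
	}
}

// mapStage builds a stage that applies fn to every input.
func mapStage[S, T any](fn func(S) T) stage[S, T] {
	return func(ctx context.Context, in <-chan S, out chan<- T) error {
		defer close(out)
		for v := range in {
			select {
			case out <- fn(v):
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

// runPipeline chains three stages: double the string, take its length, add one.
func runPipeline(items []string) ([]int, error) {
	double := mapStage(func(s string) string { return s + s })
	length := mapStage(func(s string) int { return len(s) })
	plusOne := mapStage(func(n int) int { return n + 1 })
	p := connect(connect(double, length, 1), plusOne, 1)

	in := make(chan string, len(items))
	for _, it := range items {
		in <- it
	}
	close(in)
	out := make(chan int, len(items))
	if err := p(context.Background(), in, out); err != nil {
		return nil, err
	}
	var got []int
	for n := range out {
		got = append(got, n)
	}
	return got, nil
}

func main() {
	got, err := runPipeline([]string{"a", "bb"})
	fmt.Println(got, err) // prints "[3 5] <nil>"
}
```

The type parameters carry the matching rule: connecting a stage that outputs strings to one that expects integers would be rejected at compile time.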
Running a data pipeline ¶
Carbo has a Flow component, which wraps a task that takes an empty input and an empty output. Such a task is typically built as a chain of tasks that starts with a source and ends with a sink.
Example (Flow) ¶
Build a flow and run it directly.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/pipe"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

func main() {
	ss := source.FromSlice([]string{"a", "b", "c"})
	ds := task.Connect(
		ss.AsTask(),
		pipe.Map(func(ctx context.Context, s string) (string, error) {
			return s + s, nil
		}).AsTask(),
		1,
	)
	pr := task.Connect(
		ds,
		sink.ElementWise(func(ctx context.Context, s string) error {
			fmt.Println(s)
			return nil
		}).AsTask(),
		1,
	)

	err := flow.FromTask(pr).Run(context.Background())
	if err != nil {
		log.Fatal(err)
	}
}

Output:
aa
bb
cc
Example (FlowFactory) ¶
Define a flow factory function that builds a flow from a config struct, and run the flow.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]string{cfg.StringField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}

	err := flow.RunWithConfig(context.Background(), fac, "testdata/config.yaml")
	if err != nil {
		log.Fatal(err)
	}
}

Output:
value-from-string-field
Example (Registry) ¶
Define multiple flow factories, register them in a registry, and run a flow. This is useful for building an executable that takes a subcommand.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/hiroara/carbo/flow"
	"github.com/hiroara/carbo/registry"
	"github.com/hiroara/carbo/sink"
	"github.com/hiroara/carbo/source"
	"github.com/hiroara/carbo/task"
)

type MyConfig struct {
	StringField string `yaml:"string_field"`
	IntField    int    `yaml:"int_field"`
}

func main() {
	fac1 := func() (*flow.Flow, error) {
		ss := source.FromSlice([]string{"item1"})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, s string) error {
				fmt.Println(s)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}

	fac2 := func(cfg *MyConfig) (*flow.Flow, error) {
		ss := source.FromSlice([]int{cfg.IntField})
		pr := task.Connect(
			ss.AsTask(),
			sink.ElementWise(func(ctx context.Context, i int) error {
				fmt.Println(i)
				return nil
			}).AsTask(),
			1,
		)
		return flow.FromTask(pr), nil
	}

	r := registry.New()
	r.Register("flow1", flow.NewFactory(fac1))
	r.Register("flow2", flow.NewFactoryWithConfig(fac2, "testdata/config.yaml"))

	err := r.Run(context.Background(), "flow2")
	if err != nil {
		log.Fatal(err)
	}
}

Output:
100
Directories ¶
Path | Synopsis |
---|---|
cache | Package cache provides a way to define caching behavior used in a data pipeline. |
cache/store | Package defines Store interface for cache. |
 | Package implements parsing YAML file as a configuration struct. |
 | Package provides a struct-embeddable implementation of Defer that is required by task.Task interface. |
flow | Package defines a Flow type that represents an entire data pipeline. |
internal | |
 | Package defines Spec which is a type to define marshaling behavior. |
pipe | Package defines Pipe which is a type of a task. |
registry | Package defines Registry that is a place to register flows. |
sink | Package defines Sink which is a type of a task. |
source | Package defines Source which is a type of a task. |
task | Package defines Task interface which is a core component to build a data pipeline. |
 | Package implements utilities to execute tasks. |