Documentation
Overview
Example
package main

import (
	"context"
	"io"
	"path"
	"strings"

	"github.com/bsm/accord"
	"github.com/bsm/octave"
)

func main() {
	ctx := context.Background()

	// assume a mock type
	type mockType struct {
		Name    string
		Phone   string
		Country string
	}

	// connect to accord
	acc, err := accord.DialClient(ctx, "10.0.0.1:8432", &accord.ClientOptions{Namespace: "/custom/namespace"})
	if err != nil {
		panic(err)
	}

	// initialize a pipeline
	pipe, err := octave.Create(ctx, "s3://source", "s3://target/to/dir", acc, &octave.Options{
		Glob: "**/*.ndjson",
		ProcessFile: func(name string) (bool, error) {
			return strings.Contains(name, ".ndjson"), nil
		},
	})
	if err != nil {
		panic(err)
	}
	defer pipe.Close()

	// run the pipeline (blocking)
	err = pipe.Run(func(emt octave.Emitter, snk octave.Sink) error {
		for {
			// decode the record
			rec := new(mockType)
			if err := emt.Decode(rec); err == io.EOF {
				break
			} else if err != nil {
				return err
			}

			// get the source file name (without extension)
			name := path.Base(emt.Name())
			if pos := strings.IndexByte(name, '.'); pos > -1 {
				name = name[:pos]
			}

			// write to output
			if err := snk.Encode(name+"-"+rec.Country+".ndjson", rec); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
}
Types
type ChannelFunc
ChannelFunc connects emitters to sinks. In the example above, the function passed to Run takes an Emitter and a Sink and returns an error.
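A function of this shape can be exercised without any remote storage by feeding it in-memory stand-ins. The sketch below is only an illustration: the Emitter interface is copied from this page, while the Sink interface is an assumption inferred from the snk.Encode(name, rec) call in the example, and memEmitter, memSink, splitByCountry and runSplit are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Emitter mirrors the Emitter interface documented on this page.
type Emitter interface {
	Name() string
	Decode(interface{}) error
}

// Sink is an assumption: only its Encode(name, value) call appears in the example.
type Sink interface {
	Encode(name string, v interface{}) error
}

// memEmitter is a hypothetical in-memory Emitter backed by a JSON stream.
type memEmitter struct {
	name string
	dec  *json.Decoder
}

func (e *memEmitter) Name() string               { return e.name }
func (e *memEmitter) Decode(v interface{}) error { return e.dec.Decode(v) }

// memSink is a hypothetical Sink that counts records per output name.
type memSink struct{ counts map[string]int }

func (s *memSink) Encode(name string, _ interface{}) error {
	s.counts[name]++
	return nil
}

type record struct {
	Name    string
	Country string
}

// splitByCountry has the same shape as the function passed to Run in the example:
// it drains the emitter until io.EOF and routes each record by country.
func splitByCountry(emt Emitter, snk Sink) error {
	for {
		rec := new(record)
		if err := emt.Decode(rec); err == io.EOF {
			return nil
		} else if err != nil {
			return err
		}
		if err := snk.Encode(rec.Country+".ndjson", rec); err != nil {
			return err
		}
	}
}

// runSplit wires the stand-ins together and returns the per-file record counts.
func runSplit(input string) (map[string]int, error) {
	emt := &memEmitter{name: "in.ndjson", dec: json.NewDecoder(strings.NewReader(input))}
	snk := &memSink{counts: make(map[string]int)}
	err := splitByCountry(emt, snk)
	return snk.counts, err
}

func main() {
	counts, err := runSplit("{\"Country\":\"DE\"}\n{\"Country\":\"FR\"}\n{\"Country\":\"DE\"}\n")
	if err != nil {
		panic(err)
	}
	fmt.Println(counts["DE.ndjson"], counts["FR.ndjson"])
}
```

Keeping the record-handling logic in a named function rather than an inline closure is one way to make it testable separately from the pipeline.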
type Decoder
type Decoder interface {
	// Decode decodes the next message into an interface.
	// It returns io.EOF once the end of the stream is reached.
	Decode(v interface{}) error

	io.Closer
}
Decoder reads successive messages from a stream and can be closed once the stream is exhausted.
type Emitter
type Emitter interface {
	// Name returns the name of the file the data is emitted from.
	Name() string

	// Decode decodes the next message into an interface.
	// It returns io.EOF once the end of the stream is reached.
	Decode(interface{}) error
}
Emitter is a minimal decoder.
type Options
type Options struct {
	// Number of concurrent worker threads.
	// Default: number of CPUs
	Concurrency int

	// A custom temporary directory.
	// Default: os.TempDir()
	TempDir string

	// File glob pattern.
	// Default: "**"
	Glob string

	// NewDecoder wraps the reader and returns a decoder for the given file name.
	// Default: json.NewDecoder(reader)
	NewDecoder func(name string, reader io.Reader) (Decoder, error)

	// NewEncoder wraps the writer and returns an encoder for the given file name.
	// Default: json.NewEncoder(writer)
	NewEncoder func(name string, writer io.Writer) (Encoder, error)

	// NewCompressionReader wraps the reader and returns an io.ReadCloser.
	// It may return nil to disable decompression and read the plain input.
	// Default: gzip.NewReader(reader) (if name's extension is .gz)
	NewCompressionReader func(name string, reader io.Reader) (io.ReadCloser, error)

	// NewCompressionWriter wraps the writer and returns an io.WriteCloser.
	// It may return nil to disable compression and write the plain output.
	// Default: gzip.NewWriter(writer) (if name's extension is .gz)
	NewCompressionWriter func(name string, writer io.Writer) (io.WriteCloser, error)

	// Pause between cycles. This is to prevent the Pipeline
	// from spinning and wasting resources on empty or processed
	// buckets.
	// Default: 5s.
	Pause time.Duration

	// BeforeCycle is a callback which is triggered before each cycle.
	BeforeCycle func() error

	// ProcessFile callback is triggered before processing to determine
	// if a file should be processed or skipped. Must return true to proceed.
	ProcessFile func(name string) (bool, error)
}
Options contains a list of options.
type Pipeline
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline processes data by running parallel worker threads.
func Create
func Create(ctx context.Context, srcURL, dstURL string, acc *accord.Client, opt *Options) (*Pipeline, error)
Create creates a new Pipeline from URLs.
func (*Pipeline) Run
func (p *Pipeline) Run(fn ChannelFunc) error
Run starts the Pipeline and blocks until an error occurs or it is manually stopped by calling Close().