pipe

package module
v0.0.0-...-1b2fc92 Latest
Published: Oct 29, 2017 License: MIT Imports: 1 Imported by: 0

README

pipe

A simple stream processing library that works like Unix pipes. This library is fully asynchronous. Create a Pipe from a Reader, add some transformation functions, and get the result written to a Writer.

To install:

$ go get github.com/hyperboloide/pipe

Then add the following import:

import "github.com/hyperboloide/pipe"
Example

Below is a very basic example that:

  1. Opens a file
  2. Compresses it
  3. Saves it
package main

import (
    "compress/gzip"
    "github.com/hyperboloide/pipe"
    "io"
    "log"
    "os"
)

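// zip is a Filter that gzip-compresses everything read from r into w.
func zip(r io.Reader, w io.Writer) error {
    gzw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
    if err != nil {
        return err
    }
    if _, err := io.Copy(gzw, r); err != nil {
        gzw.Close()
        return err
    }
    // Close flushes the remaining compressed data, so report its error.
    return gzw.Close()
}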

func main() {

    // pipe input
    in, err := os.Open("test.txt")
    if err != nil {
        log.Fatal(err)
    }
    defer in.Close()

    // pipe output
    out, err := os.Create("test.txt.gz")
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    // Create a new pipe from an io.Reader,
    // push a transformation function,
    // set the output,
    // then Exec and check for errors.
    if err := pipe.New(in).Push(zip).To(out).Exec(); err != nil {
        log.Fatal(err)
    }
}
Readers and Writers

Pipe also provides a set of Readers and Writers, in the rw subpackage, to read from and write to.

Here is an example:

import (
	"github.com/hyperboloide/pipe"
	"github.com/hyperboloide/pipe/rw"
	"log"
	"os"
)

func DemoRW() {

	in, err := os.Open("test.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()

	file := &rw.File{AllowSub: true}

	// Always call Start before use. Note that once started, an RW can be reused.
	if err := file.Start(); err != nil {
		log.Fatal(err)
	}

	// Obtain a writer
	w, err := file.NewWriter("copy.txt")
	if err != nil {
		log.Fatal(err)
	}

	// ToCloser() closes the connection at the end of the write.
	if err := pipe.New(in).ToCloser(w).Exec(); err != nil {
		log.Fatal(err)
	}
}

It's also easy to create your own: just implement the ReadWriteDeleter interface.

Documentation

Overview

Package pipe is a simple stream processing library that works like Unix pipes. This library has no external dependencies and is fully asynchronous. Create a Pipe from a Reader, add some transformation functions and get the result on an io.Writer.

Types

type Filter

type Filter func(io.Reader, io.Writer) error

Filters are functions that transform the stream. Add them to the Pipe with Push.
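
To illustrate the shape of a Filter, here is a minimal sketch of a custom one that upper-cases ASCII letters as the stream passes through. The Filter type is redeclared locally with the documented signature so the sketch compiles without the library; upperASCII and apply are hypothetical names introduced only for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// Filter mirrors the documented signature of pipe.Filter so that this
// sketch builds with the standard library only.
type Filter func(io.Reader, io.Writer) error

// upperASCII is an illustrative filter: it upper-cases ASCII letters chunk
// by chunk and leaves every other byte (including UTF-8 sequences) untouched.
func upperASCII(r io.Reader, w io.Writer) error {
	buf := make([]byte, 4096)
	for {
		n, err := r.Read(buf)
		for i := 0; i < n; i++ {
			if c := buf[i]; c >= 'a' && c <= 'z' {
				buf[i] = c - 'a' + 'A'
			}
		}
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// apply is a hypothetical helper that runs one filter over a string.
func apply(f Filter, s string) (string, error) {
	var out bytes.Buffer
	err := f(strings.NewReader(s), &out)
	return out.String(), err
}

func main() {
	out, err := apply(upperASCII, "hello pipe")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Because upperASCII has the same signature as Filter, a function like it should be usable anywhere the package expects a Filter.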

type Pipe

type Pipe struct {

	// Total number of bytes read at the origin of the Pipe.
	TotalIn int64
	// Total number of bytes written at the end of the Pipe.
	TotalOut int64
	// contains filtered or unexported fields
}

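Pipe represents a processing pipeline: it reads from an io.Reader, passes the stream through the pushed Filters in order, and writes the result to an io.Writer.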
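
The TotalIn and TotalOut fields count bytes at the two ends of the Pipe. How the library maintains them is not shown here; the sketch below only illustrates the general idea, assuming counting wrappers around the source and the destination. countingReader, countingWriter, dropSpaces and measure are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// countingReader counts the bytes read through it (the "in" side).
type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

// countingWriter counts the bytes written through it (the "out" side).
type countingWriter struct {
	w io.Writer
	n int64
}

func (c *countingWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.n += int64(n)
	return n, err
}

// dropSpaces is an illustrative filter that removes space characters,
// so the input and output byte counts differ.
func dropSpaces(r io.Reader, w io.Writer) error {
	buf := make([]byte, 1024)
	for {
		n, err := r.Read(buf)
		kept := buf[:0] // filter in place; the write index never passes the read index
		for _, c := range buf[:n] {
			if c != ' ' {
				kept = append(kept, c)
			}
		}
		if len(kept) > 0 {
			if _, werr := w.Write(kept); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// measure runs dropSpaces over input and reports the bytes read and written.
func measure(input string) (totalIn, totalOut int64, err error) {
	src := &countingReader{r: strings.NewReader(input)}
	dst := &countingWriter{w: io.Discard}
	err = dropSpaces(src, dst)
	return src.n, dst.n, err
}

func main() {
	in, out, err := measure("a b c")
	fmt.Println(in, out, err) // prints: 5 3 <nil>
}
```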

Example
package main

import (
	"compress/gzip"
	"github.com/hyperboloide/pipe"
	"io"
	"log"
	"os"
)

func main() {

	// A Filter that gzip-compresses the stream.
	var zip = func(r io.Reader, w io.Writer) error {
		gzw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
		if err != nil {
			return err
		}
		if _, err := io.Copy(gzw, r); err != nil {
			gzw.Close()
			return err
		}
		// Close flushes the remaining compressed data, so report its error.
		return gzw.Close()
	}

	// pipe input
	in, err := os.Open("test.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()

	// Create a new pipe from an io.Reader
	p := pipe.New(in)

	// Push a transformation function
	p.Push(zip)

	// pipe output
	out, err := os.Create("test.txt.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	// Set pipe output io.Writer
	p.To(out)

	// Wait for pipe process to complete
	if err := p.Exec(); err != nil {
		log.Fatal(err)
	}

}

func New

func New(reader io.Reader) *Pipe

New creates a new Pipe that reads from reader.

func (*Pipe) Exec

func (p *Pipe) Exec() error

Exec waits for the Pipe to complete and returns an error if any of the functions failed.

func (*Pipe) Push

func (p *Pipe) Push(procs ...Filter) *Pipe

Push appends one or more functions to the Pipe. You can add as many functions as you like, all at once or separately. They are processed in the order they were pushed.
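
To see why the order of pushed functions matters, here is a rough sketch of one way filters can be run concurrently and in sequence, connecting each stage to the next with io.Pipe from the standard library. This is not the library's implementation; chain, run, upper and suffix are hypothetical names, and the Filter type is redeclared locally with the documented signature.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

type Filter func(io.Reader, io.Writer) error

// chain runs every filter in its own goroutine; stage i writes into an
// io.Pipe that stage i+1 reads from. It returns the first error received.
func chain(src io.Reader, dst io.Writer, filters ...Filter) error {
	if len(filters) == 0 {
		_, err := io.Copy(dst, src)
		return err
	}
	errs := make(chan error, len(filters))
	var in io.Reader = src
	var inPipe *io.PipeReader // non-nil when the stage reads from a previous stage
	for i, f := range filters {
		var out io.Writer = dst
		var outPipe *io.PipeWriter
		var nextPipe *io.PipeReader
		if i < len(filters)-1 {
			nextPipe, outPipe = io.Pipe()
			out = outPipe
		}
		go func(f Filter, in io.Reader, out io.Writer, pr *io.PipeReader, pw *io.PipeWriter) {
			err := f(in, out)
			if pw != nil {
				pw.CloseWithError(err) // a nil err reads as EOF downstream
			}
			if pr != nil {
				pr.CloseWithError(err) // unblocks an upstream stage that is still writing
			}
			errs <- err
		}(f, in, out, inPipe, outPipe)
		if nextPipe != nil {
			in, inPipe = nextPipe, nextPipe
		}
	}
	var first error
	for range filters {
		if err := <-errs; err != nil && first == nil {
			first = err
		}
	}
	return first
}

// upper upper-cases ASCII letters in the stream.
func upper(r io.Reader, w io.Writer) error {
	buf := make([]byte, 4096)
	for {
		n, err := r.Read(buf)
		for i := 0; i < n; i++ {
			if c := buf[i]; c >= 'a' && c <= 'z' {
				buf[i] = c - 'a' + 'A'
			}
		}
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return werr
			}
		}
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// suffix returns a filter that copies the stream and then appends s.
func suffix(s string) Filter {
	return func(r io.Reader, w io.Writer) error {
		if _, err := io.Copy(w, r); err != nil {
			return err
		}
		_, err := io.WriteString(w, s)
		return err
	}
}

// run feeds a string through the filters and returns the result.
func run(s string, filters ...Filter) (string, error) {
	var out bytes.Buffer
	err := chain(strings.NewReader(s), &out, filters...)
	return out.String(), err
}

func main() {
	a, _ := run("abc", suffix("-end"), upper)
	b, _ := run("abc", upper, suffix("-end"))
	fmt.Println(a, b) // prints: ABC-END ABC-end
}
```

Swapping the two filters changes the result because each stage only sees what the previous stage produced.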

func (*Pipe) Tee

func (p *Pipe) Tee() *Pipe

Tee creates a new Pipe to duplicate the stream. The stream will pass through all previously pushed functions before going through the tee Pipe. Functions pushed to the original Pipe after a call to Tee will not alter the new Tee Pipe.

func (*Pipe) To

func (p *Pipe) To(w io.Writer) *Pipe

To writes the output of the Pipe to w.

func (*Pipe) ToCloser

func (p *Pipe) ToCloser(w io.WriteCloser) *Pipe

ToCloser writes the output of the Pipe to the io.WriteCloser w and closes it at the end.
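
The difference from To is the final Close. As a minimal sketch of that write-then-close pattern, assuming nothing about the library's internals, the helper below copies a stream and always closes the destination, reporting the first error. trackedWriter, copyAndClose and demo are hypothetical names used only here.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// trackedWriter is an in-memory io.WriteCloser that records whether
// Close was called.
type trackedWriter struct {
	bytes.Buffer
	closed bool
}

func (t *trackedWriter) Close() error {
	t.closed = true
	return nil
}

// copyAndClose copies src to dst and always closes dst. A copy error
// takes precedence; otherwise the error from Close is returned.
func copyAndClose(dst io.WriteCloser, src io.Reader) error {
	_, err := io.Copy(dst, src)
	if cerr := dst.Close(); err == nil {
		err = cerr
	}
	return err
}

// demo returns what was written and whether the writer ended up closed.
func demo(s string) (string, bool, error) {
	w := &trackedWriter{}
	err := copyAndClose(w, strings.NewReader(s))
	return w.String(), w.closed, err
}

func main() {
	out, closed, err := demo("data")
	fmt.Println(out, closed, err) // prints: data true <nil>
}
```

Returning the error from Close matters for writers that buffer or upload on close, where a failed Close means the data was not fully stored.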

Directories

Path Synopsis
aes
rw
gcs
s3
std
