tpack

package module
v0.0.0-...-f5018bf Latest
Warning

This package is not in the latest version of its module.

Published: Jan 4, 2021 License: MIT Imports: 4 Imported by: 0

README

tpack

Pack a Go workflow/function as a Unix-style pipeline command.

Wiki
In Unix-like computer operating systems, a pipeline is a mechanism for inter-process communication using message passing. A pipeline is a set of processes chained together by their standard streams, so that the output text of each process (stdout) is passed directly as input (stdin) to the next one.

Use tpack to write Go applications that act as pipeline commands. Employ channels, goroutines, regular expressions and more to build powerful concurrent workflows.

Examples

See the ETL workflow in the examples folder.

package main

import "github.com/reugn/tpack"

func main() {
	tpack.NewPackerStd(tpack.NewFunctionProcessor(
		doETL,
	)).Execute()
}

Test command

cat input.txt | go run *.go 2>/dev/null | wc -l

License

Licensed under the MIT License.

Documentation

Overview

Package tpack provides tools to pack a Go workflow as a Unix-style pipeline command.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type FunctionProcessor

type FunctionProcessor struct {
	// contains filtered or unexported fields
}

FunctionProcessor implements Processor using a function to process messages.

func NewFunctionProcessor

func NewFunctionProcessor(function func(in []byte) ([][]byte, error)) *FunctionProcessor

NewFunctionProcessor returns a new FunctionProcessor with the specified function.
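A function passed to NewFunctionProcessor only needs to match the signature `func(in []byte) ([][]byte, error)`. The sketch below shows one such function using the standard library alone. `splitFields` is a hypothetical name, and the comments about how the processor uses the results are assumptions, not documented behavior: the sketch assumes the function is called once per input message and that each returned slice becomes one output message.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// splitFields is a hypothetical function with the signature accepted by
// NewFunctionProcessor. It splits a comma-separated input message into
// trimmed fields and reports an error for blank input.
// Assumption: each returned element is emitted as a separate output message.
func splitFields(in []byte) ([][]byte, error) {
	line := bytes.TrimSpace(in)
	if len(line) == 0 {
		return nil, errors.New("empty input")
	}
	fields := bytes.Split(line, []byte(","))
	out := make([][]byte, 0, len(fields))
	for _, f := range fields {
		out = append(out, bytes.TrimSpace(f))
	}
	return out, nil
}

func main() {
	// Call the function directly to inspect what it would hand back.
	out, err := splitFields([]byte("a, b ,c\n"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, msg := range out {
		fmt.Printf("%s\n", msg)
	}
}
```

With tpack imported, a function like this would presumably be wired in as `tpack.NewFunctionProcessor(splitFields)`, in the same way the README example passes `doETL`.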

func (*FunctionProcessor) ErrChannel

func (fp *FunctionProcessor) ErrChannel() chan []byte

ErrChannel returns the error output communication channel.

func (*FunctionProcessor) InChannel

func (fp *FunctionProcessor) InChannel() chan []byte

InChannel returns the input communication channel.

func (*FunctionProcessor) OutChannel

func (fp *FunctionProcessor) OutChannel() chan []byte

OutChannel returns the output communication channel.

type Packer

type Packer struct {
	sync.WaitGroup
	// contains filtered or unexported fields
}

Packer is the representation of a packed processing unit.

func NewPacker

func NewPacker(in io.Reader, out io.Writer, err io.Writer, processor Processor) *Packer

NewPacker returns a new Packer.

func NewPackerStd

func NewPackerStd(processor Processor) *Packer

NewPackerStd returns a new Packer with the standard streams as communication channels.

func NewPackerStdOut

func NewPackerStdOut(in io.Reader, processor Processor) *Packer

NewPackerStdOut returns a new Packer with the standard output and standard error as output communication channels and the specified input io.Reader.

func (*Packer) Execute

func (p *Packer) Execute()

Execute starts processing the incoming messages.

type Processor

type Processor interface {

	// InChannel returns the input communication channel.
	InChannel() chan []byte

	// OutChannel returns the output communication channel.
	OutChannel() chan []byte

	// ErrChannel returns the error output communication channel.
	ErrChannel() chan []byte
}

Processor represents a generic stream processor.
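Because Processor is an interface, a custom type can stand in for FunctionProcessor. The following is a minimal sketch of such a type. It redeclares the interface locally so that it compiles without importing tpack, and `upperProcessor` and `newUpperProcessor` are hypothetical names. How Packer opens, drains and closes these channels is not documented here, so the close-on-end-of-input behavior shown is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// Processor mirrors the interface documented above so that this sketch
// compiles without importing tpack.
type Processor interface {
	InChannel() chan []byte
	OutChannel() chan []byte
	ErrChannel() chan []byte
}

// upperProcessor is a hypothetical Processor that upper-cases each message.
type upperProcessor struct {
	in, out, err chan []byte
}

// newUpperProcessor creates the channels and starts the worker goroutine.
func newUpperProcessor() *upperProcessor {
	p := &upperProcessor{
		in:  make(chan []byte),
		out: make(chan []byte),
		err: make(chan []byte),
	}
	go p.run()
	return p
}

// run forwards transformed messages until the input channel is closed,
// then closes both output channels. This sketch never writes to err.
func (p *upperProcessor) run() {
	defer close(p.out)
	defer close(p.err)
	for msg := range p.in {
		p.out <- bytes.ToUpper(msg)
	}
}

func (p *upperProcessor) InChannel() chan []byte  { return p.in }
func (p *upperProcessor) OutChannel() chan []byte { return p.out }
func (p *upperProcessor) ErrChannel() chan []byte { return p.err }

func main() {
	var p Processor = newUpperProcessor()
	// Feed one message and signal the end of input.
	go func() {
		p.InChannel() <- []byte("hello")
		close(p.InChannel())
	}()
	for msg := range p.OutChannel() {
		fmt.Printf("%s\n", msg)
	}
}
```

The channels are unbuffered, so every send blocks until a reader is ready. A real implementation would need to match whatever buffering and shutdown protocol Packer expects.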

Directories

Path            Synopsis
examples/etl
