selina

package module
Version: v0.3.0
Published: May 1, 2020 License: MIT Imports: 6 Imported by: 0

README

selina


A simple pipeline library for Go, inspired by ratchet https://github.com/dailyburn/ratchet

The API is unstable; please use Go modules to pin a version.

Installation

go get github.com/licaonfee/selina

Usage

package main

import (
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/regex"
    "github.com/licaonfee/selina/workers/text"
)

const sample = `this is a sample text
this is second line
#lines started with # will be skipped
this line pass`

func main() {
    rd := strings.NewReader(sample)
    input := selina.NewNode("Read", text.NewReader(text.ReaderOptions{Reader: rd}))
    // Keep only lines that do not start with '#'; see https://regex101.com/r/7ZS3Uw/1
    filter := selina.NewNode("Filter", regex.NewFilter(regex.FilterOptions{Pattern: "^[^#].+"}))
    output := selina.NewNode("Write", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))
    pipe := selina.NewSimplePipeline(input, filter, output)
    if err := pipe.Run(context.Background()); err != nil {
        fmt.Printf("ERR: %v\n", err)
    }
    for name, stat := range pipe.Stats() {
        fmt.Printf("Node:%s=%v\n", name, stat)
    }
}

Documentation

Index

Constants

This section is empty.

Variables

var ErrAlreadyStarted = errors.New("node already started")

ErrAlreadyStarted is returned if the Start method is called more than once.

var ErrStopNotStarted = errors.New("stopping a not started worker")

ErrStopNotStarted is returned when Stop is called before the Start method.

Functions

func ATPipelineContextCancel

func ATPipelineContextCancel(p Pipeliner) error

ATPipelineContextCancel checks that the context is propagated to all Nodes.

func ATPipelineStartAll

func ATPipelineStartAll(p Pipeliner) error

ATPipelineStartAll checks that all Nodes in a pipeline are started when the pipeline is started.

func ATPipelineStats added in v0.3.0

func ATPipelineStats(p Pipeliner) error

func ChannelAsSlice

func ChannelAsSlice(in <-chan []byte) []string

ChannelAsSlice reads from the in channel until it is closed and returns a slice with all messages received.

func SendContext added in v0.3.0

func SendContext(ctx context.Context, msg []byte, output chan<- []byte) error

SendContext tries to send msg to output; it returns an error if the context is canceled before msg is sent.

func SliceAsChannel

func SliceAsChannel(data []string, autoClose bool) chan []byte

SliceAsChannel returns a channel that reads from a slice. If autoClose is true, the channel is closed after the last message is consumed.

func SliceAsChannelRaw

func SliceAsChannelRaw(data [][]byte, autoClose bool) chan []byte

SliceAsChannelRaw is the same as SliceAsChannel, but takes a [][]byte instead of a []string.
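These helpers are mostly useful for feeding and draining workers in tests. The sketch below shows what functions with these signatures might do, using only the standard library; the lowercase names are hypothetical stand-ins, and using an unbuffered channel is an assumption.

```go
package main

import "fmt"

// sliceAsChannel is a hypothetical stand-in for SliceAsChannel: it sends
// each element from a goroutine and optionally closes the channel after
// the last one has been consumed.
func sliceAsChannel(data []string, autoClose bool) chan []byte {
	out := make(chan []byte)
	go func() {
		for _, s := range data {
			out <- []byte(s)
		}
		if autoClose {
			close(out)
		}
	}()
	return out
}

// channelAsSlice is a hypothetical stand-in for ChannelAsSlice: it drains
// in until the channel is closed and returns the messages as strings.
func channelAsSlice(in <-chan []byte) []string {
	var out []string
	for msg := range in {
		out = append(out, string(msg))
	}
	return out
}

func main() {
	ch := sliceAsChannel([]string{"a", "b", "c"}, true)
	fmt.Println(channelAsSlice(ch)) // prints [a b c]
}
```

Note that draining a channel created with autoClose set to false would block forever in this sketch, because nothing ever closes it.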

Types

type Broadcaster

type Broadcaster struct {
	DataCounter
	// contains filtered or unexported fields
}

Broadcaster allows the same value to be written to multiple goroutines.

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(input <-chan []byte)

Broadcast reads values from input and sends them to the output channels.

func (*Broadcaster) Client

func (b *Broadcaster) Client() <-chan []byte

Client creates an output channel; it panics if Broadcast has already been called.
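The Client/Broadcast pair describes a fan-out. The sketch below illustrates that pattern with the standard library only; fanOut, client, broadcast and run are hypothetical names, and closing every output once the input is closed is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut is a hypothetical analogue of Broadcaster.
type fanOut struct {
	outs    []chan []byte
	started bool
}

// client registers a new output channel; it must be called before broadcast.
func (f *fanOut) client() <-chan []byte {
	if f.started {
		panic("client called after broadcast")
	}
	ch := make(chan []byte)
	f.outs = append(f.outs, ch)
	return ch
}

// broadcast copies every message from input to all registered outputs and
// closes the outputs when input is closed (an assumption of this sketch).
func (f *fanOut) broadcast(input <-chan []byte) {
	f.started = true
	for msg := range input {
		for _, out := range f.outs {
			out <- msg
		}
	}
	for _, out := range f.outs {
		close(out)
	}
}

// run broadcasts msgs to n clients and returns what each client received.
func run(msgs []string, n int) [][]string {
	var f fanOut
	results := make([][]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		ch := f.client()
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for msg := range ch {
				results[i] = append(results[i], string(msg))
			}
		}(i)
	}
	input := make(chan []byte)
	go func() {
		for _, m := range msgs {
			input <- []byte(m)
		}
		close(input)
	}()
	f.broadcast(input)
	wg.Wait()
	return results
}

func main() {
	fmt.Println(run([]string{"a", "b"}, 2)) // prints [[a b] [a b]]
}
```

With unbuffered outputs, as here, a slow client holds back every other client, which is one reason to register all clients before broadcasting starts.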

type DataCounter added in v0.3.0

type DataCounter struct {
	// contains filtered or unexported fields
}

DataCounter is a simple atomic wrapper.

func (*DataCounter) Stats added in v0.3.0

func (c *DataCounter) Stats() (count int64, data int64)

func (*DataCounter) SumData added in v0.3.0

func (c *DataCounter) SumData(msg []byte)

SumData increments count by 1 and data by len(msg). Each value is incremented atomically, but a call to Stats while the object is in use can still return an inconsistent pair of values.
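The caveat above follows from using two independent atomic operations. The sketch below reproduces the idea with sync/atomic; the counter type and its methods are hypothetical, since the real fields are unexported.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a hypothetical analogue of DataCounter.
type counter struct {
	count int64
	data  int64
}

// sumData updates both values atomically, but not as a single unit.
func (c *counter) sumData(msg []byte) {
	atomic.AddInt64(&c.count, 1)
	// A concurrent stats() call at this point sees the new count
	// together with the old data total.
	atomic.AddInt64(&c.data, int64(len(msg)))
}

// stats reads both values; the pair is only guaranteed to be consistent
// once all writers have finished.
func (c *counter) stats() (count, data int64) {
	return atomic.LoadInt64(&c.count), atomic.LoadInt64(&c.data)
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.sumData([]byte("abcd"))
		}()
	}
	wg.Wait()
	fmt.Println(c.stats()) // prints 100 400
}
```

Avoiding a mutex keeps the hot path cheap; the cost is that mid-run statistics are approximate.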

type MultiError

type MultiError struct {
	InnerErrors map[string]error
}

MultiError is an error that contains all of the pipeline's Node.Start errors.

func (*MultiError) Error

func (e *MultiError) Error() string

Error implements the error interface.
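An aggregate error keyed by node name could be built and inspected as sketched below. The multiError type only mirrors the exported shape of MultiError; the message format, the sorting of names and the sample function are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// multiError mirrors the exported shape of MultiError.
type multiError struct {
	InnerErrors map[string]error
}

// Error joins the inner errors in name order so the message is stable
// (map iteration order in Go is random).
func (e *multiError) Error() string {
	names := make([]string, 0, len(e.InnerErrors))
	for name := range e.InnerErrors {
		names = append(names, name)
	}
	sort.Strings(names)
	parts := make([]string, 0, len(names))
	for _, name := range names {
		parts = append(parts, name+": "+e.InnerErrors[name].Error())
	}
	return strings.Join(parts, "; ")
}

// sample returns a hypothetical aggregate error for two failed nodes.
func sample() error {
	return &multiError{InnerErrors: map[string]error{
		"Write": errors.New("disk full"),
		"Read":  errors.New("no input"),
	}}
}

func main() {
	err := sample()
	fmt.Println(err) // prints Read: no input; Write: disk full

	// errors.As recovers the concrete type to inspect per-node errors.
	var me *multiError
	if errors.As(err, &me) {
		fmt.Println(len(me.InnerErrors)) // prints 2
	}
}
```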

type Node

type Node struct {
	Name string
	// contains filtered or unexported fields
}

Node is a node that can send and receive data.

func NewNode

func NewNode(name string, w Worker) *Node

NewNode creates a new node that wraps a Worker.

func (*Node) Chain

func (n *Node) Chain(next *Node) *Node

Chain sends messages emitted by the worker to the next node; it returns the next node so that it can be chained again.

func (*Node) Running

func (n *Node) Running() bool

Running returns true if the Start() method was called.

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

Start initializes the worker. worker.Process should be called multiple times until the Node is stopped or worker.Process returns an error.

func (*Node) Stats added in v0.3.0

func (n *Node) Stats() Stats

Stats returns the Worker's channel stats.

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the worker in the node. It must be called after Start; successive calls to Stop do nothing.

type Pipeliner

type Pipeliner interface {
	Run(context.Context) error
	Stats() map[string]Stats
	Nodes() []*Node
}

All Pipeliner implementations must meet the following conditions: Run must call Node.Start on all Nodes; the context passed to Run must be propagated to all Node.Start methods; Nodes() must return a slice with all instances of *Node.

func NewSimplePipeline

func NewSimplePipeline(n ...*Node) Pipeliner

NewSimplePipeline creates a linear pipeline.

type ProcessArgs added in v0.3.0

type ProcessArgs struct {
	Input  <-chan []byte
	Output chan<- []byte
}

type Receiver

type Receiver struct {
	DataCounter
	// contains filtered or unexported fields
}

Receiver joins multiple channels into a single output channel. This allows new channels to be added after Receive is called.

func (*Receiver) Receive

func (r *Receiver) Receive() <-chan []byte

Receive listens to all channels configured with Watch. When all channels are closed, the output channel is closed too. If there are no channels in the watch list, this method returns a nil channel.

func (*Receiver) Watch

func (r *Receiver) Watch(input <-chan []byte)

Watch adds a new channel to be joined. Calling Watch after Receive panics.
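Watch and Receive together describe a fan-in. The sketch below shows one common way to implement that pattern with a sync.WaitGroup; fanIn, watch, receive, feed and merged are hypothetical names, and the code is not taken from the package.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// fanIn is a hypothetical analogue of Receiver.
type fanIn struct {
	inputs  []<-chan []byte
	started bool
}

// watch registers a channel to be joined; it must be called before receive.
func (f *fanIn) watch(input <-chan []byte) {
	if f.started {
		panic("watch called after receive")
	}
	f.inputs = append(f.inputs, input)
}

// receive merges all watched channels into one. The output is closed once
// every input is closed; with no inputs it returns a nil channel.
func (f *fanIn) receive() <-chan []byte {
	f.started = true
	if len(f.inputs) == 0 {
		return nil
	}
	out := make(chan []byte)
	var wg sync.WaitGroup
	for _, in := range f.inputs {
		wg.Add(1)
		go func(in <-chan []byte) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// feed returns a closed, pre-filled channel holding the given items.
func feed(items ...string) <-chan []byte {
	ch := make(chan []byte, len(items))
	for _, s := range items {
		ch <- []byte(s)
	}
	close(ch)
	return ch
}

// merged joins two inputs and returns the messages sorted, because the
// arrival order across inputs is not deterministic.
func merged() []string {
	var f fanIn
	f.watch(feed("a", "b"))
	f.watch(feed("c"))
	var got []string
	for msg := range f.receive() {
		got = append(got, string(msg))
	}
	sort.Strings(got)
	return got
}

func main() {
	fmt.Println(merged()) // prints [a b c]
	var empty fanIn
	fmt.Println(empty.receive() == nil) // prints true
}
```

Callers should check for the nil channel case, since receiving from a nil channel blocks forever.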

type SimplePipeline

type SimplePipeline struct {
	// contains filtered or unexported fields
}

The default value of SimplePipeline is unusable; you must create it with NewSimplePipeline.

func (*SimplePipeline) Nodes

func (p *SimplePipeline) Nodes() []*Node

Nodes returns all instances of *Node.

func (*SimplePipeline) Run

func (p *SimplePipeline) Run(ctx context.Context) error

Run starts pipeline processing and returns a non-nil error if any Node fails.

func (*SimplePipeline) Stats

func (p *SimplePipeline) Stats() map[string]Stats

Stats returns a map with the Stats object of every node.

type Stats added in v0.3.0

type Stats struct {
	Sent          int64
	SentBytes     int64
	Received      int64
	ReceivedBytes int64
}

type Worker

type Worker interface {
	// Process must close the write-only (output) channel
	Process(ctx context.Context, args ProcessArgs) error
}

Worker is the standard interface implemented by processors; it is used to build pipeline nodes. All Worker implementations must meet the following conditions: when the input channel is closed, Process must finish its work gracefully and return nil; on context cancellation, Process must finish as soon as possible and return context.Canceled; on finish, Process must close the output channel and return an error or nil.
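A custom worker that follows the three conditions above might look like the sketch below. ProcessArgs and Worker are redeclared locally, copied from the listing, so that the example compiles without the module; the upper type and the runUpper helper are hypothetical.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
)

// ProcessArgs and Worker are local copies of the documented types.
type ProcessArgs struct {
	Input  <-chan []byte
	Output chan<- []byte
}

type Worker interface {
	Process(ctx context.Context, args ProcessArgs) error
}

// upper is a hypothetical Worker that upper-cases every message.
type upper struct{}

func (upper) Process(ctx context.Context, args ProcessArgs) error {
	defer close(args.Output) // on finish, always close the output channel
	for {
		select {
		case msg, ok := <-args.Input:
			if !ok {
				return nil // input closed: finish gracefully
			}
			select {
			case args.Output <- bytes.ToUpper(msg):
			case <-ctx.Done():
				return ctx.Err()
			}
		case <-ctx.Done():
			return ctx.Err() // context.Canceled after a cancellation
		}
	}
}

// runUpper feeds lines through the worker and collects its output.
// Buffered channels let Process run to completion in the same goroutine.
func runUpper(lines ...string) []string {
	in := make(chan []byte, len(lines))
	for _, l := range lines {
		in <- []byte(l)
	}
	close(in)
	out := make(chan []byte, len(lines))
	var w Worker = upper{}
	if err := w.Process(context.Background(), ProcessArgs{Input: in, Output: out}); err != nil {
		return nil
	}
	var got []string
	for msg := range out {
		got = append(got, string(msg))
	}
	return got
}

func main() {
	fmt.Println(runUpper("foo", "bar")) // prints [FOO BAR]
}
```

Selecting on ctx.Done() around both the receive and the send keeps the worker responsive to cancellation even when a neighboring node is blocked.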

Directories

Path Synopsis
examples
sql
