selina

package module
v0.4.0

Note: this is not the latest version of this module.
Published: May 5, 2020 License: MIT Imports: 8 Imported by: 0

README

selina


Simple pipeline library for Go, inspired by ratchet (https://github.com/dailyburn/ratchet)

The API is unstable; please use Go modules.

Installation

go get github.com/licaonfee/selina

Usage

package main

import (
    "context"
    "fmt"
    "os"
    "strings"

    "github.com/licaonfee/selina"
    "github.com/licaonfee/selina/workers/regex"
    "github.com/licaonfee/selina/workers/text"
)

const sample = `this is a sample text
this is second line
#lines started with # will be skipped
this line pass`

func main() {
    rd := strings.NewReader(sample)
    input := selina.NewNode("Read", text.NewReader(text.ReaderOptions{Reader: rd}))
    //https://regex101.com/r/7ZS3Uw/1
    filter := selina.NewNode("Filter", regex.NewFilter(regex.FilterOptions{Pattern: "^[^#].+"}))
    output := selina.NewNode("Write", text.NewWriter(text.WriterOptions{Writer: os.Stdout}))
    pipe := selina.NewSimplePipeline(input, filter, output)
    if err := pipe.Run(context.Background()); err != nil {
        fmt.Printf("ERR: %v\n", err)
    }
    for name, stat := range pipe.Stats() {
        fmt.Printf("Node:%s=%v\n", name, stat)
    }
}

Design

Selina has three main components:

  • Pipeline
  • Node
  • Worker

Pipeline

Starts data processing and manages all chained nodes in a single object

Node

Contains methods to pass data from Worker to Worker and get metrics

Worker

All data Extraction/Transformation/Load logic is encapsulated in a Worker instance

Documentation

Index

Constants

This section is empty.

Variables

var ErrAlreadyStarted = errors.New("node already started")

ErrAlreadyStarted is returned if the Start method is called more than once

var ErrInconsistentStart = errors.New("Pipeliner does not start all nodes")

var ErrMissingStats = errors.New("missing nodes in Stats map")

var ErrNotHaveNodes = errors.New("Pipeliner does not have nodes")

var ErrStopNotStarted = errors.New("stopping a not started worker")

ErrStopNotStarted is returned when Stop is called before the Start method

Functions

func ATPipelineContextCancel

func ATPipelineContextCancel(p Pipeliner) error

ATPipelineContextCancel checks that the context is propagated to all Nodes

func ATPipelineStartAll

func ATPipelineStartAll(p Pipeliner) error

ATPipelineStartAll checks that all Nodes in a pipeline are started when pipeline.Start is called

func ATPipelineStats added in v0.3.0

func ATPipelineStats(p Pipeliner) error

func ChannelAsSlice

func ChannelAsSlice(in <-chan []byte) []string

ChannelAsSlice reads from the in channel until it is closed and returns a slice with all messages received

func SendContext added in v0.3.0

func SendContext(ctx context.Context, msg []byte, output chan<- []byte) error

SendContext tries to send msg to output; it returns an error if the context is canceled before msg is sent

func SliceAsChannel

func SliceAsChannel(data []string, autoClose bool) chan []byte

SliceAsChannel returns a channel fed from a slice; if autoClose is true, the channel is closed after the last message is consumed

func SliceAsChannelRaw

func SliceAsChannelRaw(data [][]byte, autoClose bool) chan []byte

SliceAsChannelRaw is the same as SliceAsChannel, but takes raw [][]byte data
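These two helpers are inverses of each other, which makes them handy in tests. A plausible stdlib-only sketch of the pair (an assumption about the implementation, matching the documented signatures):

```go
package main

import "fmt"

// sliceAsChannel returns a channel fed from data; when autoClose is
// true the channel is closed after the last message is sent.
func sliceAsChannel(data []string, autoClose bool) chan []byte {
	out := make(chan []byte, len(data))
	go func() {
		for _, s := range data {
			out <- []byte(s)
		}
		if autoClose {
			close(out)
		}
	}()
	return out
}

// channelAsSlice reads from in until it is closed and returns all
// messages received as strings.
func channelAsSlice(in <-chan []byte) []string {
	var got []string
	for msg := range in {
		got = append(got, string(msg))
	}
	return got
}

func main() {
	ch := sliceAsChannel([]string{"a", "b", "c"}, true)
	fmt.Println(channelAsSlice(ch)) // [a b c]
}
```

Note that with autoClose set to false, ranging over the channel would block forever after the last element, so the auto-closing form is the one to use with ChannelAsSlice.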

Types

type Broadcaster

type Broadcaster struct {
	DataCounter
	// contains filtered or unexported fields
}

Broadcaster allows writing the same value to multiple goroutines

func (*Broadcaster) Broadcast

func (b *Broadcaster) Broadcast(input <-chan []byte)

Broadcast reads values from input and sends them to all output channels

func (*Broadcaster) Client

func (b *Broadcaster) Client() <-chan []byte

Client creates an output channel; it panics if Broadcast was already called
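The Client/Broadcast contract described above is a classic fan-out. The following is a minimal stdlib sketch of that pattern (an assumption, not the library's code; the real Broadcaster also embeds DataCounter):

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster fans one input channel out to every registered client.
type broadcaster struct {
	clients []chan []byte
	started bool
}

// Client registers and returns a new output channel; per the docs it
// panics if Broadcast was already called.
func (b *broadcaster) Client() <-chan []byte {
	if b.started {
		panic("Client called after Broadcast")
	}
	c := make(chan []byte)
	b.clients = append(b.clients, c)
	return c
}

// Broadcast copies every value from input to all client channels,
// then closes them when input is closed.
func (b *broadcaster) Broadcast(input <-chan []byte) {
	b.started = true
	go func() {
		for msg := range input {
			for _, c := range b.clients {
				c <- msg
			}
		}
		for _, c := range b.clients {
			close(c)
		}
	}()
}

func main() {
	var b broadcaster
	c1, c2 := b.Client(), b.Client()
	in := make(chan []byte, 1)
	in <- []byte("hello")
	close(in)
	b.Broadcast(in)

	var wg sync.WaitGroup
	for i, c := range []<-chan []byte{c1, c2} {
		wg.Add(1)
		go func(i int, c <-chan []byte) {
			defer wg.Done()
			for msg := range c {
				fmt.Printf("client %d got %s\n", i, msg)
			}
		}(i, c)
	}
	wg.Wait()
}
```

Because client channels are registered before Broadcast starts, the goroutine never mutates the client list concurrently, which is why Client-after-Broadcast must panic.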

type DataCounter added in v0.3.0

type DataCounter struct {
	// contains filtered or unexported fields
}

DataCounter is a simple atomic counter wrapper

func (*DataCounter) Stats added in v0.3.0

func (c *DataCounter) Stats() (count int64, data int64)

func (*DataCounter) SumData added in v0.3.0

func (c *DataCounter) SumData(msg []byte)

SumData increments count by 1 and data by len(msg). While both values are incremented atomically, it is possible to get inconsistent reads when calling Stats while the object is in use
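The caveat above follows from the counters being two independent atomics rather than one locked pair. A stdlib sketch of this behavior (an assumption about the implementation based on the doc comment):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dataCounter mirrors the documented behavior: an atomic wrapper
// tracking message count and total bytes.
type dataCounter struct {
	count int64
	data  int64
}

// SumData increments count by 1 and data by len(msg).
func (c *dataCounter) SumData(msg []byte) {
	atomic.AddInt64(&c.count, 1)
	atomic.AddInt64(&c.data, int64(len(msg)))
}

// Stats returns the current counters. Each field is read atomically,
// but the pair is not a single snapshot, so a concurrent reader may
// observe a count that is ahead of data (or vice versa).
func (c *dataCounter) Stats() (count, data int64) {
	return atomic.LoadInt64(&c.count), atomic.LoadInt64(&c.data)
}

func main() {
	var c dataCounter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.SumData([]byte("abc")) // 3 bytes each
		}()
	}
	wg.Wait()
	count, data := c.Stats()
	fmt.Println(count, data) // 100 300
}
```

Once all writers have finished (as after wg.Wait here), Stats is exact; the inconsistency window only exists while SumData is running concurrently.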

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a pipeline element that can send and receive data

func NewNode

func NewNode(name string, w Worker) *Node

NewNode creates a new Node that wraps a Worker. name is a user-defined identifier; internally, Node generates a unique ID

func (*Node) Chain

func (n *Node) Chain(next *Node) *Node

Chain sends messages emitted by this node's worker to the next node; it returns next so it can be chained again. If next is already chained, this operation does nothing

func (*Node) ID added in v0.4.0

func (n *Node) ID() string

func (*Node) IsChained added in v0.4.0

func (n *Node) IsChained(other *Node) bool

IsChained returns true if Chain was called before with other

func (*Node) Name

func (n *Node) Name() string

func (*Node) Running

func (n *Node) Running() bool

Running returns true if the Start() method was called

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

Start initializes the worker; worker.Process is called repeatedly until the Node is stopped or worker.Process returns an error

func (*Node) Stats added in v0.3.0

func (n *Node) Stats() Stats

Stats returns the Worker's channel stats

func (*Node) Stop

func (n *Node) Stop() error

Stop stops the worker in this node; it must be called after Start. Successive calls to Stop do nothing

type Pipeliner

type Pipeliner interface {
	Run(context.Context) error
	Stats() map[string]Stats
	Nodes() []*Node
}

Pipeliner is the interface for pipeline runners. All implementations must meet the following conditions: Run must call Node.Start on all Nodes; the context passed to Run must be propagated to all Node.Start methods; Nodes() must return a slice with all instances of *Node

func FreePipeline added in v0.4.0

func FreePipeline(nodes ...*Node) Pipeliner

FreePipeline provides a way to run arbitrarily chained Nodes; this method does not call Node.Chain

func LinealPipeline added in v0.4.0

func LinealPipeline(nodes ...*Node) Pipeliner

LinealPipeline creates a Pipeliner whose Nodes are chained in a single-branch pipeline: Node(0)->Node(1)->Node(2)->...->Node(n)

type ProcessArgs added in v0.3.0

type ProcessArgs struct {
	Input  <-chan []byte
	Output chan<- []byte
}

type Receiver

type Receiver struct {
	DataCounter
	// contains filtered or unexported fields
}

Receiver joins multiple channels into a single output channel; new input channels are added with Watch

func (*Receiver) Receive

func (r *Receiver) Receive() <-chan []byte

Receive listens on all channels configured with Watch; when all of them are closed, the output channel is closed too. If there are no channels in the watch list, this method returns a nil channel

func (*Receiver) Watch

func (r *Receiver) Watch(input <-chan []byte)

Watch adds a new channel to be joined. Calling Watch after Receive panics
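Receiver is the fan-in counterpart to Broadcaster. The documented behavior (join watched channels, close output when all inputs close, nil channel for an empty watch list) can be sketched with a WaitGroup; this is an assumption about the implementation, not the library's code:

```go
package main

import (
	"fmt"
	"sync"
)

// receiver joins multiple input channels into one output channel.
type receiver struct {
	inputs []<-chan []byte
}

// Watch adds a channel to be joined; per the docs it must be
// called before Receive.
func (r *receiver) Watch(input <-chan []byte) {
	r.inputs = append(r.inputs, input)
}

// Receive returns a channel yielding every message from every
// watched channel; it closes once all of them are closed. With no
// watched channels it returns nil, matching the documented behavior.
func (r *receiver) Receive() <-chan []byte {
	if len(r.inputs) == 0 {
		return nil
	}
	out := make(chan []byte)
	var wg sync.WaitGroup
	for _, in := range r.inputs {
		wg.Add(1)
		go func(in <-chan []byte) {
			defer wg.Done()
			for msg := range in {
				out <- msg
			}
		}(in)
	}
	go func() {
		wg.Wait() // all forwarders done => all inputs closed
		close(out)
	}()
	return out
}

func main() {
	a := make(chan []byte, 1)
	b := make(chan []byte, 1)
	a <- []byte("from a")
	b <- []byte("from b")
	close(a)
	close(b)

	var r receiver
	r.Watch(a)
	r.Watch(b)
	for msg := range r.Receive() {
		fmt.Println(string(msg))
	}
}
```

Beware the nil-channel case: ranging over a nil channel blocks forever, so callers should Watch at least one channel before calling Receive.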

type SimplePipeline

type SimplePipeline struct {
	// contains filtered or unexported fields
}

SimplePipeline's zero value is unusable; you must create it with NewSimplePipeline

func (*SimplePipeline) Nodes

func (p *SimplePipeline) Nodes() []*Node

Nodes returns all instances of *Node

func (*SimplePipeline) Run

func (p *SimplePipeline) Run(ctx context.Context) error

Run starts pipeline processing and returns a non-nil error if any Node fails

func (*SimplePipeline) Stats

func (p *SimplePipeline) Stats() map[string]Stats

Stats returns a map with every node's Stats object

type Stats added in v0.3.0

type Stats struct {
	Time          time.Time
	Sent          int64
	SentBytes     int64
	Received      int64
	ReceivedBytes int64
}

type Worker

type Worker interface {
	// Process must close the write-only output channel
	Process(ctx context.Context, args ProcessArgs) error
}

Worker is the standard interface implemented by processors and is used to build pipeline nodes. All Worker implementations must meet the following conditions: when the input channel is closed, Process must finish its work gracefully and return nil; on context cancellation, Process must finish as soon as possible and return context.Canceled; on finish, Process must close the output channel and return an error or nil

Directories

Path Synopsis
examples
sql
