go4data

Version: v0.0.0-...-4214274 (this package is not in the latest version of its module)
Published: Feb 4, 2021 | License: MIT | Imports: 17 | Imported by: 0

README

Go4Data

Automate all things

About Go4Data

Go4Data is a data processing tool.
The idea behind Go4Data is that you should be able to create automated concurrent data processing flows.

Go4Data is still under heavy development!

There are a few components you need to know more about to develop with Go4Data, but a regular user should be able to use Go4Data without much in-depth knowledge. To learn more about the components and what they do, see Components.

Go4Data is built around Processors, the component Go4Data uses to handle the data pipeline.
The processor handles starting, stopping and making things work. The idea in Go4Data is to make this as seamless as possible.

Every Processor must have a Handler assigned before it can start processing any data. The Handler provides the processing capabilities, and the goal is to make handlers as seamless as possible.

An example of how Go4Data is intended to work, with its pub/sub system and handlers processing data seamlessly: Screenshot

Some people like UML, so I've used Dumels (great job to those who made it) to produce a UML diagram of the project: Dumels

Installation

go get github.com/percybolmer/go4data

Usage

There are currently three different ways of using Go4Data. You can load processors from a YAML file or initialize them by hand. Loading from YAML is the recommended way to avoid a lot of coding.

  1. Use the tooling: there is a Go4Data runner that loads a YAML file. This is the easiest way, but it offers limited flexibility.
  2. Use the package in a custom codebase. You can create processors, apply handlers to them and use those to do things. For example, if you are interested in monitoring a directory for new files and reading their contents, you could do that and subscribe to the output.
  3. Use the loader to load a YAML file in your code and run the processors.

See the examples folder for examples:
Csv files to Elasticsearch with Filtering
Creating processor and subscribing to output
Using Redis as the Pub/Sub engine

Components in Go4Data

Below is a more in-depth explanation of all the components found in Go4Data.

Processors

Processor is the default component. It provides a standardized way of handling the data flow, error handling and metrics.

A processor consists of the following fields:

ID - a unique ID that each processor must have. It is assigned automatically by NewProcessor.
Name - the name of the processor. It does not have to be unique; it is mainly used by the upcoming UI.
FailureHandler - the function that handles errors occurring during processing. See FailureHandler.
Handler - the processing action to apply; it determines what the processor does. See Handler for more information, and see HandlerList.
Subscriptions - all the topics to listen for data on.
Topics - the topics to send data to after processing it.
QueueSize - how many payloads are allowed to be queued in the Processor. This limits memory use if a topic is not drained.
Metric - stored by both the Handler and the Processor; the Handler inherits the Processor's metric provider. The default is Prometheus, but this can be changed by setting a new metric provider.
Workers - how many concurrent workers the handler is allowed to run. Increase this only if you want your handler to run more goroutines. More workers can make certain handlers faster, but setting too many can also slow things down.

Handler

Handler is the data processing unit that does the actual work. Any struct that fulfills the Handler interface can be used by a Processor.
Handler is a Go interface that looks like this:

// Handler is an interface that allows users to create structs with certain functions attached that can be used inside a processor
// to handle payloads between them.
type Handler interface {
	// Handle is the function that will be performed on the incoming Payloads.
	// topics are the topics to push output onto.
	Handle(ctx context.Context, payload payload.Payload, topics ...string) error
	// ValidateConfiguration is used to make sure everything that is needed by the handler is set
	ValidateConfiguration() (bool, []string)
	// GetConfiguration will return the handler's configuration
	GetConfiguration() *property.Configuration
	// GetHandlerName will return the name used to reference a handler
	GetHandlerName() string
	// Subscriptionless should return true if the handler is self-generating.
	// This is true for handlers like ListDirectory, which do not need
	// any inputs to function.
	// Returning true removes the processor's need for subscriptions; the Handler publishes on its own.
	Subscriptionless() bool
	// SetMetricProvider is used to set a metric provider on a handler.
	// Use it if you want to output metrics from your handler. By default the Prometheus provider is used.
	// A unique prefix also has to be attached so that handler metrics don't collide. By default most Processors use Processor.Name + Processor.ID
	SetMetricProvider(p metric.Provider, prefix string) error
	// GetErrorChannel returns the channel the handler reports errors on
	GetErrorChannel() chan error
}

Users can write their own Handlers to add functionality.
The easiest way to start writing a handler is to look at handlergenerator.

Payload

Payloads are the items sent through the data pipeline.
Items transferred between Processors are called Payloads. Payload is interface based, so it is highly customizable and new payloads are easy to create. To see the currently available payloads, see payload.

The Payload interface looks like this:

// Payload is an interface that allows different Processors to send data between them in a unified fashion
type Payload interface {
	// GetPayloadLength returns the payload length as a float64
	GetPayloadLength() float64
	// GetPayload will return a byte slice with the Payload from the ingress
	// Payload should be limited to 512 MB since that's the max size of a Redis payload
	// Also note that JSON payloads will be base64 encoded
	GetPayload() []byte
	// SetPayload will change the value of the payload
	SetPayload([]byte)
	// GetSource should return a string containing the name of the source, e.g. the filename for a file or the topic for a Redis queue
	GetSource() string
	// SetSource should change the value of the source
	SetSource(string)
	// GetMetaData should return a configuration object that contains metadata about the payload
	GetMetaData() *property.Configuration
}
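A minimal payload implementation, assuming a hypothetical `BasePayload` type. To stay dependency-free it replaces GetMetaData, which returns a *property.Configuration, with a `MetaData` method returning a plain map, so it does not satisfy the full interface as written.

```go
package main

import "fmt"

// BasePayload is a hypothetical minimal payload: raw bytes, a source name
// and free-form metadata (a map stands in for property.Configuration).
type BasePayload struct {
	data     []byte
	source   string
	metadata map[string]string
}

func (b *BasePayload) GetPayloadLength() float64 { return float64(len(b.data)) }
func (b *BasePayload) GetPayload() []byte        { return b.data }
func (b *BasePayload) SetPayload(d []byte)       { b.data = d }
func (b *BasePayload) GetSource() string         { return b.source }
func (b *BasePayload) SetSource(s string)        { b.source = s }

// MetaData lazily creates the metadata map so a zero BasePayload is usable.
func (b *BasePayload) MetaData() map[string]string {
	if b.metadata == nil {
		b.metadata = make(map[string]string)
	}
	return b.metadata
}

func main() {
	p := &BasePayload{}
	p.SetPayload([]byte("col1,col2"))
	p.SetSource("input.csv")
	p.MetaData()["content-type"] = "text/csv"
	fmt.Println(p.GetSource(), p.GetPayloadLength()) // input.csv 9
}
```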

Payloads can be run through the Filter, a handler that removes payloads that do not fulfill any of the filter requirements. Filters are regular expressions that are run on the payload.
To be passed through the Filter, a payload has to implement the Filterable interface.

// Filterable is an interface that is used to apply Filters to payloads
type Filterable interface {
	ApplyFilter(f *Filter) bool
}

Properties

Properties are configurations that are applied to Handlers.
They provide a standard way of configuring Handlers, and each Handler sets its own Properties.
A property can be required, which means that the Handler will not start unless the property contains a correct value. A property can of course also be optional.

It is up to each Handler to make sure that all properties are accounted for; this is done in the Handler's ValidateConfiguration.

The property package also contains a struct called Configuration.
Configuration makes Properties easier to handle inside a Handler.
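A sketch of required versus optional properties and the validation step, assuming a hypothetical `Property`/`Configuration` pair (the real property.Configuration API is not reproduced). `ValidateProperties` returns the same (bool, []string) shape as Handler.ValidateConfiguration, listing the missing required properties.

```go
package main

import "fmt"

// Property is a hypothetical named setting that may be required.
type Property struct {
	Name     string
	Required bool
	Value    interface{}
}

// Configuration is a hypothetical, simplified property container.
type Configuration struct {
	Properties []*Property
}

// AddProperty registers a property with no value yet.
func (c *Configuration) AddProperty(name string, required bool) {
	c.Properties = append(c.Properties, &Property{Name: name, Required: required})
}

// SetValue sets an existing property's value; it reports false for unknown names.
func (c *Configuration) SetValue(name string, v interface{}) bool {
	for _, p := range c.Properties {
		if p.Name == name {
			p.Value = v
			return true
		}
	}
	return false
}

// ValidateProperties returns false plus the names of required properties without a value.
func (c *Configuration) ValidateProperties() (bool, []string) {
	var missing []string
	for _, p := range c.Properties {
		if p.Required && p.Value == nil {
			missing = append(missing, p.Name)
		}
	}
	return len(missing) == 0, missing
}

func main() {
	cfg := &Configuration{}
	cfg.AddProperty("path", true)
	cfg.AddProperty("interval", false)
	ok, missing := cfg.ValidateProperties()
	fmt.Println(ok, missing) // false [path]
	cfg.SetValue("path", "/tmp/input")
	ok, _ = cfg.ValidateProperties()
	fmt.Println(ok) // true
}
```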

Metrics

Each Processor has a MetricProvider that is inherited by its Handlers. The metric provider lets Handlers and Processors publish metrics about their processing.
The default is Prometheus metrics, unless changed.
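The sketch below illustrates why a per-processor prefix matters when several handlers share one provider. `MemoryProvider`, its `Add`/`Get` methods and the `name_id_` prefix format are all hypothetical; go4data's metric.Provider interface and Prometheus provider are not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// MemoryProvider is a hypothetical in-memory metric store shared by processors.
type MemoryProvider struct {
	mu       sync.Mutex
	counters map[string]float64
}

func NewMemoryProvider() *MemoryProvider {
	return &MemoryProvider{counters: make(map[string]float64)}
}

// Add increases the named counter by v; safe for concurrent use.
func (m *MemoryProvider) Add(name string, v float64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counters[name] += v
}

// Get returns the current value of the named counter.
func (m *MemoryProvider) Get(name string) float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.counters[name]
}

// metricPrefix combines processor name and ID (format assumed for this sketch).
func metricPrefix(name string, id uint) string {
	return fmt.Sprintf("%s_%d_", name, id)
}

func main() {
	shared := NewMemoryProvider()
	// Two processors with the same name still get distinct metrics thanks to the ID.
	a, b := metricPrefix("csvreader", 1), metricPrefix("csvreader", 2)
	shared.Add(a+"payloads", 3)
	shared.Add(b+"payloads", 5)
	fmt.Println(shared.Get(a+"payloads"), shared.Get(b+"payloads")) // 3 5
}
```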

Pubsub

Payloads are transported between Handlers by using a Publish/Subscription model.
The main idea is that when processing is done, a payload is published onto a Topic, or Topics. The topics to publish to are assigned when initializing the Processor with NewProcessor.

For another Processor to receive the published payloads, they have to Subscribe on the topics.

Currently there are two supported Pub/Sub engines that Go4Data can use. It has a DefaultEngine that is set by default and no configuration is needed. There is also a RedisEngine that allows the user to instead use Redis.

DefaultEngine - Used by default; works great for single-node data flows.
RedisEngine - Must be configured before use; works best if you have multiple Go4Data nodes that should all Pub/Sub on the same Topics.

Failures

Once in a while, a Processor or Handler may experience errors, which of course should be noticed.
Go4Data wraps the regular error to add some context and make it possible to recreate errors.

// Failure is Go4Data's custom error handling struct
// It contains the Error and some metadata about which Processor triggered the error
type Failure struct {
	// Err is the error that occurred
	Err error `json:"error"`
	// Payload is the payload that was being processed when a Failure occurred
	Payload payload.Payload `json:"payload"`
	// Processor is the ID of the processor that triggered the Error
	Processor uint `json:"processor"`
}

Failures are handled by the Processor's assigned FailureHandler.
A FailureHandler is a simple function that can easily be replaced by the user. The default FailureHandler is PrintFailure, which outputs the Payload to stdout.

The FailureHandler signature is:

FailureHandler func(f Failure)

Loader

The loader is used to load go4data yaml configurations into ready-to-use processors. It can also be used to Save configured processors.

Usage is straightforward. The following example loads a YAML file and then saves it again:

    // inside a test function (t is a *testing.T)
    loadedProcessors, err := go4data.Load("testing/loader/loadthis.yml")
    if err != nil {
        t.Fatal(err)
    }

    if err := go4data.Save("testing/loader/loadthis.yml", loadedProcessors); err != nil {
        t.Fatal(err)
    }

Tooling

Running a Go4Data yaml

If you are only interested in using go4data as a CLI tool, use runner.

After downloading go4data, go into the runner folder and run:

go build -o runner
./runner -go4data /path/to/go4data.yml -port 2112

The port is where the Prometheus metrics are hosted; currently runner only supports Prometheus.

Building a new Handler

To build a handler, first read Handler to learn what a Handler is. Any struct that fulfills the Handler interface can be assigned to a Processor.

To help build new handlers there is a tool that generates a fresh handler for you; the tool can be found here.

This code generator builds a template Handler for you. To compile it, go into the tooling folder after downloading the source and run:

go build -o handlergenerator
./handlergenerator -package $YOURHANDLERPACKAGE -location $HANDLERPACKAGEPATH -templatepath $PATHTOHANDLERTEMPLATE -handler $HANDLERNAME

You should now see a newly generated Handler that you can use. Of course, you still have to do some coding: the generated handler only prints to stdout. Look at the other handlers to see how they are set up.

For example, when creating the pcap reader I ran:

handlergenerator -package network -location handlers/network -handler OpenPcap

If -templatepath is not specified, the tool uses the HANDLERGENERATORPATH environment variable to find the templates. The templates can be found at Template Location.

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Documentation

Overview

Package go4data has failures that can be sent by Processors to logging, stdout, etc.

Package go4data is used to create processors that run any kind of handler on a payload flow. The payloads are transferred between processors that have a relationship assigned.

Index

Constants

This section is empty.

Variables

var (
	// IDCounter is used to make sure no processors are generated with an ID that already exists
	IDCounter uint = 1
	// DefaultQueueSize is a limit that defines how many payloads can be queued
	DefaultQueueSize = 1000

	// ErrProcessorHasNoHandlerApplied is returned when starting a processor that has a nil Handler
	ErrProcessorHasNoHandlerApplied = errors.New("the processor has no Handler set. Please assign a Handler to it before running")
	// ErrNilContext is returned when a processor is started with a nil context, which is not allowed
	ErrNilContext = errors.New("nil context is not allowed when starting a processor")
	// ErrProcessorAlreadyStopped is returned when trying to stop a processor that is already stopped
	ErrProcessorAlreadyStopped = errors.New("the processor is already stopped")
	// ErrRequiredPropertiesNotFulfilled is returned when trying to start a Handler that needs additional properties
	ErrRequiredPropertiesNotFulfilled = errors.New("the Handler needs additional properties to work, see the Handlers documentation")
	// ErrHandlerDoesNotAcceptPublishers is returned when trying to register a publisher to a processor that has a self-publishing Handler
	ErrHandlerDoesNotAcceptPublishers = errors.New("the used Handler does not allow publishers")
	// ErrDuplicateTopic is returned when trying to register a duplicate topic to publish to
	ErrDuplicateTopic = errors.New("the topic is already registered")
	// ErrFailedToUnmarshal is returned when unmarshalling go4data configurations fails
	ErrFailedToUnmarshal = errors.New("failed to unmarshal since data provided is not correct")
)
var (
	//ErrIngressRelationshipNeeded is when a processor isn't getting the needed ingress
	ErrIngressRelationshipNeeded = errors.New("the processor needs an ingress to properly run")
)

Functions

func NewID

func NewID() uint

NewID is used to generate a new ID
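How NewID uses IDCounter internally is not shown here. If IDs may be requested from several goroutines, an atomic counter is one way to keep them unique; `idCounter` and `newID` below are hypothetical, and this is not necessarily how go4data implements NewID.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// idCounter is a hypothetical counter; it is only accessed atomically.
var idCounter uint64

// newID returns 1, 2, 3, ... and is safe to call from multiple goroutines.
func newID() uint {
	return uint(atomic.AddUint64(&idCounter, 1))
}

func main() {
	var wg sync.WaitGroup
	ids := make([]uint, 100)
	for i := range ids {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ids[i] = newID()
		}(i)
	}
	wg.Wait()

	unique := make(map[uint]bool)
	for _, id := range ids {
		unique[id] = true
	}
	fmt.Println(len(unique)) // 100: no ID was handed out twice
}
```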

func PrintFailure

func PrintFailure(f Failure)

PrintFailure is a FailureHandler

func Save

func Save(path string, data interface{}) error

Save stores data as a YAML config at path.

Types

type Failure

type Failure struct {
	// Err is the error that occurred
	Err error `json:"error"`
	// Payload is the payload that was being processed when a Failure occurred
	Payload payload.Payload `json:"payload"`
	// Processor is the ID of the processor that triggered the Error
	Processor uint `json:"processor"`
}

Failure is go4data's custom error handling struct. It contains the Error and some metadata about which Processor triggered the error.

type LoaderHandler

type LoaderHandler struct {
	Cfg  *property.Configuration `json:"configs" yaml:"configs"`
	Name string                  `json:"handler" yaml:"handler_name"`
}

LoaderHandler is a Handler that's easier to save/load.

type LoaderProccessor

type LoaderProccessor struct {
	// ID is a unique identifier for each processor,
	ID uint `json:"id" yaml:"id"`
	// Name is a user-configured Name for a processor that makes it easier to refer to than an ID. It cannot be a duplicate yet;
	// duplicates will be allowed later, but for now PrometheusMetrics crashes on them.
	Name string `json:"name" yaml:"name"`
	// Running is a boolean indicator if the processor is currently Running
	Running bool `json:"running" yaml:"running"`
	// Workers is an int that represents how many concurrent handlers to run
	Workers int `json:"workers" yaml:"workers"`
	// Topics is the Topics to publish payload onto
	Topics []string `json:"topics" yaml:"topics"`
	// Subscriptions is the Topics to subscribe to
	Subscriptions []string `json:"subscriptions" yaml:"subscriptions"`
	// QueueSize is an integer of how many payloads are accepted on the Output channels to Subscribers
	QueueSize int `json:"queuesize" yaml:"queuesize"`
	// LoaderHandler is a Handler that can be loaded/saved
	Handler LoaderHandler `json:"loaderhandler" yaml:"handler"`
}

LoaderProccessor is used to load/save processors

func (*LoaderProccessor) ConvertToProcessor

func (la *LoaderProccessor) ConvertToProcessor() (*Processor, error)

ConvertToProcessor is used to convert a Loader back into a runnable Processor.

type Processor

type Processor struct {
	// ID is a unique identifier for each processor,
	ID uint `json:"id" yaml:"id"`
	// Name is a user-configured Name for a processor that makes it easier to refer to than an ID
	Name string `json:"name" yaml:"name"`
	// Running is a boolean indicator if the processor is currently Running
	Running bool `json:"running" yaml:"running"`
	// Workers is an int that determines how many concurrent workers the processor should run
	Workers int `json:"workers" yaml:"workers"`
	// FailureHandler is the failurehandler to use with the Processor
	FailureHandler func(f Failure) `json:"-" yaml:"-"`
	// Handler is the handler to perform on the received Payload
	Handler handlers.Handler `json:"handler" yaml:"handler"`

	// Topics is the Topics to publish payload onto
	Topics []string `json:"topics" yaml:"topics"`
	// QueueSize is an integer of how many payloads are accepted on the Output channels to Subscribers
	QueueSize int `json:"queuesize" yaml:"queuesize"`
	// Metric is used to store metrics
	Metric metric.Provider `json:"-" yaml:"-"`

	sync.Mutex `json:"-" yaml:"-"`
	// contains filtered or unexported fields
}

Processor is used to perform a Handler on each item that is ingested.

func Load

func Load(path string) ([]*Processor, error)

Load will return a slice of processors loaded from a config

func NewProcessor

func NewProcessor(name string, topics ...string) *Processor

NewProcessor is used to spawn a new processor. You need to set a registered Handler before starting it, or Start will return an error. Topics is a variadic argument that lets you pass any topics you want the processor to publish its payloads to.

func (*Processor) AddTopics

func (p *Processor) AddTopics(topics ...string) error

AddTopics will add Topics to publish onto

func (*Processor) ConvertToLoader

func (p *Processor) ConvertToLoader() *LoaderProccessor

ConvertToLoader converts the Processor into a savable format.

func (*Processor) GetConfiguration

func (p *Processor) GetConfiguration() *property.Configuration

GetConfiguration is a shortcut to the Handler's GetConfiguration.

func (*Processor) HandleSubscriptionless

func (p *Processor) HandleSubscriptionless(ctx context.Context)

HandleSubscriptionless is used to handle Handlers that have no requirement of subscriptions.

func (*Processor) MonitorErrChannel

func (p *Processor) MonitorErrChannel(ctx context.Context)

MonitorErrChannel is used to monitor the error channel of a handler if it is not nil.

func (*Processor) SetHandler

func (p *Processor) SetHandler(a handlers.Handler)

SetHandler will change the Handler the Processor performs on incoming payloads. Should hot reloading like this be OK? Do we need to Stop / Start the processor after?

func (*Processor) SetID

func (p *Processor) SetID(i uint)

SetID is a way to overwrite the generated ID, this is mostly used when Loading Processors from a Daisy file

func (*Processor) SetName

func (p *Processor) SetName(n string)

SetName will change the name of the processor

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start will run a Processor and execute the given Handler on any incoming payloads.

func (*Processor) Stop

func (p *Processor) Stop() error

Stop will cancel the running goroutines.

func (*Processor) Subscribe

func (p *Processor) Subscribe(topics ...string) error

Subscribe will subscribe to the given topics and make the Processor ingest their payloads.

Directories

Path Synopsis
examples
redisEngine
package runner tool to run go4data files
databases
Package databases is generated by Handlergenerator tooling Make sure to insert real Description here
files
Package files is generated by Handlergenerator tooling. This Handler is used to print Payloads onto a file.
filters
Package filters contains all there is to the Filterable interface.
network
Package network is generated by Handlergenerator tooling. OpenPcap will open up a pcap and output all network packets to the next processor.
parsers
Package parsers is generated by Handlergenerator tooling Make sure to insert real Description here
terminal
Package terminal is generated by Handlergenerator tooling and contains Handlers related to STDOUT.
Package payload contains structs that fulfill the payload interface. Payload is used to hold data between Processors; it is just an interface, so each Processor could create its own struct to handle data as long as it fulfills the interface.
Package pubsub contains defaultEngine, the default built-in channel-based engine used for pub/sub.
tooling
handlergenerator
This tool is used to generate a new Handler and test file. It will generate a basic struct based on the super simple template.
runner
package runner tool to run go4data files
