process

package
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2023 License: MIT Imports: 33 Imported by: 0

README

process

This package contains interfaces and methods for processing data. Processors provide methods for processing single items of data and batches (slices) of data.

Documentation

Overview

todo(v1.0.0): remove this processor

Example (Applier)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates applying a single "copy" processor to one capsule.
func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// applier is retrieved from the factory
	applier, err := process.NewApplier(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// applier is applied to capsule
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"foo":"fizz"}`))

	capsule, err = applier.Apply(ctx, capsule)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(string(capsule.Data()))
}
Output:

{"foo":"fizz","bar":"fizz"}
Example (Batcher)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates applying a single "copy" processor to a batch of capsules.
func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// batcher is retrieved from the factory
	batcher, err := process.NewBatcher(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// batcher is applied to capsules
	var capsules []config.Capsule

	// two capsules are created, each containing {"foo":"fizz","bar":"buzz"}.
	// a fresh capsule is used per iteration so that the batch entries do not
	// depend on state left over from a previous iteration.
	for i := 0; i < 2; i++ {
		capsule := config.NewCapsule()

		if err := capsule.Set("foo", "fizz"); err != nil {
			// handle err
			panic(err)
		}

		if err := capsule.Set("bar", "buzz"); err != nil {
			// handle err
			panic(err)
		}

		capsules = append(capsules, capsule)
	}

	// the existing value of "bar" is overwritten by the copied value of "foo"
	capsules, err = batcher.Batch(ctx, capsules...)
	if err != nil {
		// handle err
		panic(err)
	}

	for _, c := range capsules {
		fmt.Println(string(c.Data()))
	}
}
Output:

{"foo":"fizz","bar":"fizz"}
{"foo":"fizz","bar":"fizz"}
Example (DNS)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates enriching a capsule with a reverse DNS lookup.
func main() {
	ctx := context.TODO()

	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"addr":"8.8.8.8"}`))

	// apply a reverse_lookup DNS query to addr
	cfg := []config.Config{
		{
			Type: "dns",
			Settings: map[string]interface{}{
				"key":     "addr",
				"set_key": "domains",
				"options": map[string]interface{}{
					"function": "reverse_lookup",
				},
			},
		},
	}

	appliers, err := process.NewAppliers(ctx, cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// the same ctx is reused for every call so the whole example shares one context
	for _, app := range appliers {
		capsule, err = app.Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}
Output:

Example (HTTP)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates enriching a capsule with the body of an HTTP response.
func main() {
	ctx := context.TODO()

	// the capsule carries the IP address that is sent to the API
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"addr":"8.8.8.8"}`))

	// enriches the IP address by querying the GreyNoise Community API.
	// authenticating to GreyNoise is accomplished by interpolating a
	// secret inside an HTTP header. refer to the GreyNoise documentation
	// for more information:
	// https://docs.greynoise.io/reference/get_v3-community-ip.
	//
	// the secret must be stored in the environment variable GREYNOISE_API
	headers := []map[string]interface{}{
		{
			"key":   "key",
			"value": "${SECRETS_ENV:GREYNOISE_API}",
		},
	}

	// the value from "addr" is interpolated into ${data} in the URL
	options := map[string]interface{}{
		"method":  "get",
		"url":     "https://api.greynoise.io/v3/community/${data}",
		"headers": headers,
	}

	// the HTTP response body is written to the key "greynoise"
	cfg := []config.Config{
		{
			Type: "http",
			Settings: map[string]interface{}{
				"key":     "addr",
				"set_key": "greynoise",
				"options": options,
			},
		},
	}

	appliers, err := process.NewAppliers(ctx, cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// each applier receives the capsule produced by the previous one
	for i := range appliers {
		capsule, err = appliers[i].Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	// sample output (which may change day to day)
	// {"addr":"8.8.8.8","greynoise":{"ip":"8.8.8.8","noise":false,"riot":true,"classification":"benign","name":"Google Public DNS","link":"https://viz.greynoise.io/riot/8.8.8.8","last_seen":"2023-01-30","message":"Success"}}
	fmt.Println(string(capsule.Data()))
}
Output:

Example (IPDatabase)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates enriching a capsule with a lookup in an IP database.
func main() {
	ctx := context.TODO()

	// the capsule carries the IP address that is looked up
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"addr":"8.8.8.8"}`))

	// the location of the IP enrichment database can be either a path
	// on local disk, an HTTP(S) URL, or an AWS S3 URL
	dbSettings := map[string]interface{}{
		"database": "location://path/to/maxmind.mmdb",
		"language": "en",
	}

	// lookup addr in MaxMind City database
	options := map[string]interface{}{
		"type":     "maxmind_city",
		"settings": dbSettings,
	}

	cfg := []config.Config{
		{
			Type: "ip_database",
			Settings: map[string]interface{}{
				"key":     "addr",
				"set_key": "geo",
				"options": options,
			},
		},
	}

	appliers, err := process.NewAppliers(ctx, cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// each applier receives the capsule produced by the previous one
	for i := range appliers {
		capsule, err = appliers[i].Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}
Output:

Example (KVStore)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates writing a value to a KV store and reading it back
// using two kv_store processors that are applied in series.
func main() {
	ctx := context.TODO()

	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"foo":"bar"}`))

	// the value from key "foo" is first set into the KV store and
	// then retrieved from the KV store and set into key "baz". if
	// the KV options are identical across processors, then the same
	// KV store is used in each call. this also allows for the use of
	// multiple KV stores.
	cfg := []config.Config{
		{
			Type: "kv_store",
			Settings: map[string]interface{}{
				"key":     "foo",
				"set_key": "setter",
				"options": map[string]interface{}{
					"type": "set",
					"kv_options": map[string]interface{}{
						"type": "memory",
						"settings": map[string]interface{}{
							// presumably the maximum number of items kept
							// in the memory store -- confirm against the
							// KV store settings
							"capacity": 10,
						},
					},
				},
			},
		},
		{
			Type: "kv_store",
			Settings: map[string]interface{}{
				"key":     "setter",
				"set_key": "baz",
				"options": map[string]interface{}{
					"type": "get",
					// these options are kept identical to the options of
					// the "set" processor so that both use the same store
					"kv_options": map[string]interface{}{
						"type": "memory",
						"settings": map[string]interface{}{
							"capacity": 10,
						},
					},
				},
			},
		},
	}

	appliers, err := process.NewAppliers(ctx, cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	for _, app := range appliers {
		capsule, err = app.Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}
Output:

Example (Streamer)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
	"golang.org/x/sync/errgroup"
)

// main streams a single capsule through a "copy" streamer and prints every
// capsule that is received on the output channel.
func main() {
	ctx := context.TODO()

	// copies the value of key "a" into key "z"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "a",
			"set_key": "z",
		},
	}

	// streamer is retrieved from the factory
	streamer, err := process.NewStreamer(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// setup an error group and channels for streaming data. ctx is replaced
	// by the context derived from the error group.
	group, ctx := errgroup.WithContext(ctx)
	in, out := config.NewChannel(), config.NewChannel()

	// first, the streamer goroutine is started
	group.Go(func() error {
		// NOTE(review): a stream error panics here instead of being returned,
		// so it never reaches group.Wait -- confirm this is intended.
		if err := streamer.Stream(ctx, in, out); err != nil {
			panic(err)
		}

		return nil
	})

	// second, the reader goroutine is started. reading must precede
	// writing to prevent blocking the main goroutine.
	group.Go(func() error {
		// the loop ends when out.C is closed; presumably the streamer closes
		// out after in is closed and drained -- TODO confirm.
		for capsule := range out.C {
			fmt.Println(string(capsule.Data()))
		}

		return nil
	})

	// last, writer sends data to the streamer.
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"a":"b"}`))

	// in is closed after the only capsule is sent to signal that no more
	// data will be written.
	in.Send(capsule)
	in.Close()

	// an error in any streamer will cause the entire pipeline, including sending
	// to the sink, to fail.
	if err := group.Wait(); err != nil {
		panic(err)
	}

}
Output:

{"a":"b","z":"b"}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Apply added in v0.4.0

func Apply(ctx context.Context, capsule config.Capsule, appliers ...Applier) (config.Capsule, error)

Apply applies processors in series to encapsulated data.

func ApplyBytes added in v0.8.0

func ApplyBytes(ctx context.Context, data []byte, appliers ...Applier) ([]byte, error)

ApplyBytes is a convenience function for applying processors in series to bytes.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates applying a single "copy" processor directly to bytes.
func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// applier is retrieved from the factory
	applier, err := process.NewApplier(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// applier is applied to bytes. the same ctx is reused so the whole
	// example shares one context.
	b := []byte(`{"foo":"fizz"}`)
	b, err = process.ApplyBytes(ctx, b, applier)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(string(b))
}
Output:

{"foo":"fizz","bar":"fizz"}

func Batch added in v0.8.0

func Batch(ctx context.Context, batch []config.Capsule, batchers ...Batcher) ([]config.Capsule, error)

Batch accepts one or more batchers and applies them in series to a batch of encapsulated data.

func BatchBytes added in v0.8.0

func BatchBytes(ctx context.Context, data [][]byte, batchers ...Batcher) ([][]byte, error)

BatchBytes is a convenience function for applying processors in series to a slice of bytes (one element per item of data).

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates applying a single "copy" processor directly to a
// slice of bytes.
func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// batcher is retrieved from the factory
	batcher, err := process.NewBatcher(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// batcher is applied to slice of bytes. the same ctx is reused so the
	// whole example shares one context.
	b := [][]byte{[]byte(`{"foo":"fizz"}`)}
	b, err = process.BatchBytes(ctx, b, batcher)
	if err != nil {
		// handle err
		panic(err)
	}

	for _, e := range b {
		fmt.Println(string(e))
	}
}
Output:

{"foo":"fizz","bar":"fizz"}

func CloseAppliers added in v0.8.0

func CloseAppliers(ctx context.Context, appliers ...Applier) error

CloseAppliers closes all appliers and returns an error if any close fails.

func CloseBatchers added in v0.8.0

func CloseBatchers(ctx context.Context, batchers ...Batcher) error

CloseBatchers closes all batchers and returns an error if any close fails.

func CloseStreamers added in v0.9.1

func CloseStreamers(ctx context.Context, streamers ...Streamer) error

CloseStreamers closes all streamers and returns an error if any close fails.

Types

type Applier added in v0.8.0

// Applier is the interface for processors that operate on one capsule at a
// time.
type Applier interface {
	// Apply accepts a capsule and returns a capsule or an error.
	Apply(context.Context, config.Capsule) (config.Capsule, error)
	// Close returns an error if closing fails. NOTE(review): presumably this
	// releases resources held by the processor -- confirm with implementers.
	Close(context.Context) error
}

func NewApplier added in v0.8.0

func NewApplier(ctx context.Context, cfg config.Config) (Applier, error)

NewApplier returns a configured Applier from a processor configuration.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// applier is retrieved from the factory
	applier, err := process.NewApplier(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(applier)
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

func NewAppliers added in v0.8.0

func NewAppliers(ctx context.Context, cfg ...config.Config) ([]Applier, error)

NewAppliers accepts one or more processor configurations and returns configured appliers.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// one or more appliers are created
	appliers, err := process.NewAppliers(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	for _, app := range appliers {
		fmt.Println(app)
	}
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

type Batcher added in v0.8.0

// Batcher is the interface for processors that operate on a batch (slice) of
// capsules.
type Batcher interface {
	// Batch accepts zero or more capsules and returns a slice of capsules or
	// an error.
	Batch(context.Context, ...config.Capsule) ([]config.Capsule, error)
	// Close returns an error if closing fails. NOTE(review): presumably this
	// releases resources held by the processor -- confirm with implementers.
	Close(context.Context) error
}

func NewBatcher added in v0.8.0

func NewBatcher(ctx context.Context, cfg config.Config) (Batcher, error)

NewBatcher returns a configured Batcher from a processor configuration.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates creating a single batcher from a processor configuration.
func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// batcher is retrieved from the factory
	batcher, err := process.NewBatcher(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(batcher)
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

func NewBatchers added in v0.8.0

func NewBatchers(ctx context.Context, cfg ...config.Config) ([]Batcher, error)

NewBatchers accepts one or more processor configurations and returns configured batchers.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

func main() {
	ctx := context.TODO()

	// copies the value of key "foo" into key "bar"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// one or more batchers are created
	batchers, err := process.NewBatchers(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	for _, bat := range batchers {
		fmt.Println(bat)
	}
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

type Streamer added in v0.9.1

// Streamer is the interface for processors that operate on channels of
// capsules.
type Streamer interface {
	// Stream accepts two channels and returns an error. NOTE(review): the
	// package example passes the input channel first and the output channel
	// second -- confirm with implementers.
	Stream(context.Context, *config.Channel, *config.Channel) error
	// Close returns an error if closing fails. NOTE(review): presumably this
	// releases resources held by the processor -- confirm with implementers.
	Close(context.Context) error
}

func NewStreamer added in v0.9.1

func NewStreamer(ctx context.Context, cfg config.Config) (Streamer, error)

NewStreamer returns a configured Streamer from a processor configuration.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

func main() {
	ctx := context.TODO()

	// copies the value of key "a" into key "z"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "a",
			"set_key": "z",
		},
	}

	// create a streamer from the config
	streamer, err := process.NewStreamer(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(streamer)
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"a","set_key":"z","ignore_close":false,"ignore_errors":false}

func NewStreamers added in v0.9.1

func NewStreamers(ctx context.Context, cfg ...config.Config) ([]Streamer, error)

NewStreamers accepts one or more processor configurations and returns configured streamers.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

func main() {
	ctx := context.TODO()

	// copies the value of key "a" into key "z"
	cfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "a",
			"set_key": "z",
		},
	}

	// one or more streamers are created
	streamers, err := process.NewStreamers(ctx, cfg)
	if err != nil {
		// handle err
		panic(err)
	}

	for _, s := range streamers {
		fmt.Println(s)
	}
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"a","set_key":"z","ignore_close":false,"ignore_errors":false}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL