transform

v1.1.0
Warning

This package is not in the latest version of its module.

Published: Jul 26, 2024 License: Apache-2.0 Imports: 6 Imported by: 0

README

Golang Data Transforms SDK

This module contains a library for writing Data Transforms within Redpanda. It supports both TinyGo and standard Go (built with GOOS=wasip1).

Documentation

Overview

Package transform is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.

This library provides a framework for transforming records written within Redpanda from an input to an output topic. This version of the SDK is compatible with Redpanda 24.1 or greater.

Example (IdentityTransform)

This example shows the basic usage of the package: a "transform" that does nothing but copy the same data to a new topic.

package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows the basic usage of the package:
// a "transform" that does nothing but copy the same data to a new
// topic.
func main() {
	// Other setup can happen here, such as setting up lookup tables,
	// initializing reusable buffers, reading environment variables, etc.

	// Make sure to register your callback so Redpanda knows which
	// function to invoke when records are written
	transform.OnRecordWritten(mirrorTransform)
}

// This will be called for each record in the source topic.
//
// Records written to w are output to the destination topic.
func mirrorTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	return w.Write(e.Record())
}

Example (RegularExpressionFilter)

This example shows a filter that uses a regular expression to copy matching records from one topic into another. The pattern is chosen at deploy time through the PATTERN environment variable; setting MATCH_VALUE=1 matches against the record value instead of the key.

package main

import (
	"os"
	"regexp"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

var (
	re         *regexp.Regexp
	checkValue bool
)

// This example shows a filter that uses a regexp to filter records from
// one topic into another. The filter can be determined when the transform
// is deployed by using environment variables to specify the pattern.
func main() {
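	// Set up the regexp from the PATTERN environment variable.
	pattern, ok := os.LookupEnv("PATTERN")
	if !ok {
		panic("Missing PATTERN variable")
	}
	re = regexp.MustCompile(pattern)
	// Match against the record value when MATCH_VALUE=1; otherwise match the key.
	mk, ok := os.LookupEnv("MATCH_VALUE")
	checkValue = ok && mk == "1"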

	transform.OnRecordWritten(doRegexFilter)
}

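// doRegexFilter forwards a record unchanged when the selected field (the key,
// or the value if MATCH_VALUE=1) matches the pattern; otherwise it drops it.
func doRegexFilter(e transform.WriteEvent, w transform.RecordWriter) error {
	var b []byte
	if checkValue {
		b = e.Record().Value
	} else {
		b = e.Record().Key
	}
	// Records without the selected field are dropped.
	if b == nil {
		return nil
	}
	if !re.Match(b) {
		return nil
	}
	return w.Write(e.Record())
}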
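Because doRegexFilter reads package-level state that main fills in from environment variables, its decision logic is hard to exercise outside a deployed transform. One option is to pull the decision into a pure function; the sketch below does that with only the standard library. The helper name `shouldForward` and the `^user-` pattern are hypothetical illustrations, not part of the SDK or the original example.

```go
package main

import (
	"fmt"
	"regexp"
)

// userPattern is a hypothetical pattern standing in for the PATTERN variable.
var userPattern = regexp.MustCompile(`^user-`)

// shouldForward mirrors the decision made in doRegexFilter: pick the value or
// the key depending on checkValue, drop records where that field is nil, and
// otherwise forward only when the pattern matches.
// (Hypothetical helper for illustration; not part of the SDK.)
func shouldForward(re *regexp.Regexp, checkValue bool, key, value []byte) bool {
	b := key
	if checkValue {
		b = value
	}
	if b == nil {
		return false
	}
	return re.Match(b)
}

func main() {
	fmt.Println(shouldForward(userPattern, false, []byte("user-42"), []byte("{}"))) // true
	fmt.Println(shouldForward(userPattern, true, []byte("user-42"), []byte("{}")))  // false
	fmt.Println(shouldForward(userPattern, false, nil, []byte("user-1")))           // false
}
```

In the transform, doRegexFilter would then reduce to calling shouldForward(re, checkValue, e.Record().Key, e.Record().Value) and writing the record when it returns true.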

Example (Transcoding)

This example shows a transform that converts CSV into JSON.

package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that converts CSV into JSON.
func main() {
	transform.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	// The input data is a CSV (without a header row) with the structure:
	// key, a, b
	// This transform emits each row in that CSV as JSON.
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		if len(row) != 3 {
			return errors.New("unexpected number of columns")
		}
		// Convert the last column into an int
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return err
		}
		// Marshal our JSON value
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return err
		}
		// Add our output record using the first column as the key.
		r := transform.Record{
			Key:   []byte(row[0]),
			Value: v,
		}
		if err := w.Write(r); err != nil {
			return err
		}
	}
	return nil
}
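The per-row conversion in csvToJsonTransform does not depend on the SDK, so it can be checked in isolation. The sketch below is a hypothetical refactoring that returns the key/value pairs instead of writing them; the `kv` type and `csvToJSON` function are illustrative names, and the record-writing step is left out so the code runs with the standard library alone.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
)

// Foo matches the JSON shape emitted by the Transcoding example.
type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

// kv is a hypothetical stand-in for the Key/Value pair of an output record.
type kv struct {
	Key   string
	Value string
}

// csvToJSON applies the same per-row conversion as csvToJsonTransform,
// but collects the results instead of writing them to a RecordWriter.
func csvToJSON(data []byte) ([]kv, error) {
	reader := csv.NewReader(bytes.NewReader(data))
	var out []kv
	for {
		row, err := reader.Read()
		if err == io.EOF {
			return out, nil
		} else if err != nil {
			return nil, err
		}
		if len(row) != 3 {
			return nil, errors.New("unexpected number of columns")
		}
		// Convert the last column into an int.
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return nil, err
		}
		v, err := json.Marshal(Foo{A: row[1], B: b})
		if err != nil {
			return nil, err
		}
		// The first column becomes the key, as in the original example.
		out = append(out, kv{Key: row[0], Value: string(v)})
	}
}

func main() {
	rows, err := csvToJSON([]byte("k1,hello,1\nk2,world,2\n"))
	fmt.Println(rows, err)
}
```

Note that encoding/csv fixes the expected field count from the first row by default (FieldsPerRecord is 0), so a later row with a different number of fields is reported as a csv error before the explicit length check runs.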

Example (Validation)

This example shows a transform that checks whether each record's value is valid JSON and routes invalid records to a dead letter queue.

package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that checks whether each record's value
// is valid JSON and routes invalid records to a dead letter queue.
func main() {
	transform.OnRecordWritten(jsonValidate)
}

// This will be called for each record in the source topic.
func jsonValidate(e transform.WriteEvent, w transform.RecordWriter) error {
	if json.Valid(e.Record().Value) {
		// Write valid records to the "default" output topic, which is the
		// first output topic specified in the configuration.
		return w.Write(e.Record())
	}
	// If a record does not contain valid JSON then route it to another topic for
	// triage and debugging.
	return w.Write(e.Record(), transform.ToTopic("invalid_json"))
}
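The routing rule in jsonValidate can likewise be expressed as a pure function. In the sketch below, the hypothetical helper `routeFor` returns an empty string for the default output topic and the dead letter topic name otherwise; in the transform, a non-empty result would correspond to passing transform.ToTopic(name) to Write.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// deadLetterTopic is the topic name used by the Validation example.
const deadLetterTopic = "invalid_json"

// routeFor returns "" when the value should go to the default (first)
// output topic, or the dead letter topic when the value is not valid JSON.
// (Hypothetical helper; the SDK expresses routing with transform.ToTopic.)
func routeFor(value []byte) string {
	if json.Valid(value) {
		return ""
	}
	return deadLetterTopic
}

func main() {
	for _, v := range []string{`{"ok":true}`, `{"ok":`, `42`} {
		fmt.Printf("%q -> %q\n", v, routeFor([]byte(v)))
	}
}
```

One consequence worth checking: json.Valid reports false for an empty or nil value, so records without a value (for example, tombstones) are also sent to invalid_json.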

Constants

This section is empty.

Variables

This section is empty.

Functions

func OnRecordWritten

func OnRecordWritten(fn OnRecordWrittenCallback)

OnRecordWritten registers a callback to be fired when a record is written to the input topic.

OnRecordWritten should be called in a package's `main` function to register the transform function that will be applied.

Types

type OnRecordWrittenCallback

type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error

OnRecordWrittenCallback is a callback to transform records after a write event happens in the input topic.
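Because OnRecordWrittenCallback is an ordinary function type, a callback can be a closure that captures configuration computed in main. The sketch below shows this with a hypothetical `newKeyPrefixFilter` factory. To keep it runnable without the SDK or a broker, it declares local stand-ins for Record, WriteEvent, RecordWriter, and OnRecordWrittenCallback; these are assumptions that mirror the documented shapes (the real Write also accepts ...WriteOpt), not the SDK's own types.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local stand-ins mirroring the documented SDK shapes, so the sketch
// compiles without the transform package. They are NOT the SDK's types.
type Record struct {
	Key   []byte
	Value []byte
}

type WriteEvent interface{ Record() Record }

type RecordWriter interface{ Write(Record) error }

type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error

// newKeyPrefixFilter is a hypothetical factory: it captures its configuration
// in a closure and returns a callback with the OnRecordWrittenCallback shape.
func newKeyPrefixFilter(prefix []byte) OnRecordWrittenCallback {
	return func(e WriteEvent, w RecordWriter) error {
		r := e.Record()
		if !bytes.HasPrefix(r.Key, prefix) {
			return nil // drop the record: nothing is written
		}
		return w.Write(r)
	}
}

// event and sliceWriter are minimal test doubles for WriteEvent and RecordWriter.
type event struct{ r Record }

func (e event) Record() Record { return e.r }

type sliceWriter struct{ out []Record }

func (s *sliceWriter) Write(r Record) error {
	s.out = append(s.out, r)
	return nil
}

func main() {
	cb := newKeyPrefixFilter([]byte("eu-"))
	w := &sliceWriter{}
	for _, k := range []string{"eu-1", "us-1", "eu-2"} {
		if err := cb(event{Record{Key: []byte(k)}}, w); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(w.out)) // 2
}
```

With the real SDK, the factory would return transform.OnRecordWrittenCallback and main would register it with transform.OnRecordWritten(newKeyPrefixFilter(prefix)).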

type Record

type Record struct {
	// Key is an optional field.
	Key []byte
	// Value is the blob of data that is written to Redpanda.
	Value []byte
	// Headers are client specified key/value pairs that are
	// attached to a record.
	Headers []RecordHeader
	// Attrs are the attributes of a record.
	//
	// Output records should leave these unset.
	Attrs RecordAttrs
	// The timestamp associated with this record.
	//
	// For output records this can be left unset as it will
	// always be the same value as the input record.
	Timestamp time.Time
	// The offset of this record in the partition.
	//
	// For output records this field is left unset,
	// as it will be set by Redpanda.
	Offset int64
}

Record is a record that has been written to Redpanda.
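The field comments above imply that an output record only needs Key, Value, and Headers. The sketch below builds an output record that way and appends one extra header. It uses local stand-in types (Attrs omitted) so it runs without the SDK, and the `transformed-by` header and `toOutput` helper are hypothetical. The headers are copied into a fresh slice because appending directly to in.Headers could write into a backing array shared with the input record.

```go
package main

import (
	"fmt"
	"time"
)

// Local stand-ins mirroring the documented Record and RecordHeader fields
// (Attrs omitted); they are NOT the SDK's types.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

type Record struct {
	Key       []byte
	Value     []byte
	Headers   []RecordHeader
	Timestamp time.Time
	Offset    int64
}

// toOutput builds an output record from an input record. Only Key, Value and
// Headers are set: per the field docs, Timestamp can be left unset for output
// records and Offset is set by Redpanda.
func toOutput(in Record, value []byte) Record {
	// Copy into a new slice so the append cannot touch the input's array.
	headers := make([]RecordHeader, 0, len(in.Headers)+1)
	headers = append(headers, in.Headers...)
	headers = append(headers, RecordHeader{Key: []byte("transformed-by"), Value: []byte("example")})
	return Record{Key: in.Key, Value: value, Headers: headers}
}

func main() {
	in := Record{
		Key:       []byte("k"),
		Value:     []byte("v"),
		Headers:   []RecordHeader{{Key: []byte("trace-id"), Value: []byte("abc")}},
		Timestamp: time.Now(),
		Offset:    7,
	}
	out := toOutput(in, []byte("V"))
	fmt.Println(len(in.Headers), len(out.Headers), out.Offset, out.Timestamp.IsZero()) // 1 2 0 true
}
```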

type RecordAttrs

type RecordAttrs struct {
	// contains filtered or unexported fields
}

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader is an optional key/value pair that is passed along with a record.
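Since Record.Headers is a slice of RecordHeader rather than a map, looking up a header means scanning it, and nothing in the type prevents two headers from sharing a key. A minimal sketch, assuming a hypothetical helper `headerValue` and a local stand-in for RecordHeader, that returns the first match:

```go
package main

import (
	"bytes"
	"fmt"
)

// RecordHeader is a local stand-in with the same fields as the SDK's type.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

// headerValue is a hypothetical helper that returns the value of the first
// header whose key equals key, and whether such a header exists.
func headerValue(headers []RecordHeader, key string) ([]byte, bool) {
	for _, h := range headers {
		if bytes.Equal(h.Key, []byte(key)) {
			return h.Value, true
		}
	}
	return nil, false
}

func main() {
	hs := []RecordHeader{
		{Key: []byte("content-type"), Value: []byte("application/json")},
		{Key: []byte("trace-id"), Value: []byte("abc")},
	}
	v, ok := headerValue(hs, "trace-id")
	fmt.Println(string(v), ok) // abc true
}
```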

type RecordWriter

type RecordWriter interface {
	// Write writes a record to the output topic.
	//
	// When writing a record, only the key, value and headers are
	// used; other information, such as the timestamp, is overridden
	// by the broker.
	//
	// WriteOpts can be added to control where records go, for example to another topic.
	Write(Record, ...WriteOpt) error
}

RecordWriter is an interface for writing transformed records to the destination topic.
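A RecordWriter may be called zero, one, or many times for a single input record: the filter example writes at most once, while the transcoding example writes once per CSV row. The sketch below shows a fan-out callback body plus an in-memory writer that collects records, which is one way to unit-test transform logic without a broker. Record, RecordWriter, collectingWriter, and splitLines are local, hypothetical stand-ins; the real Write also takes ...WriteOpt.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local stand-ins for the SDK's Record and RecordWriter. The real
// Write also accepts ...WriteOpt; the stand-in omits options.
type Record struct {
	Key   []byte
	Value []byte
}

type RecordWriter interface {
	Write(Record) error
}

// collectingWriter is a test double that keeps every written record.
type collectingWriter struct{ records []Record }

func (c *collectingWriter) Write(r Record) error {
	c.records = append(c.records, r)
	return nil
}

// splitLines is a hypothetical transform body: one newline-delimited value
// becomes one output record per non-empty line, all sharing the input key.
func splitLines(in Record, w RecordWriter) error {
	for _, line := range bytes.Split(in.Value, []byte("\n")) {
		if len(line) == 0 {
			continue
		}
		if err := w.Write(Record{Key: in.Key, Value: line}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	w := &collectingWriter{}
	if err := splitLines(Record{Key: []byte("k"), Value: []byte("a\nb\n\nc")}, w); err != nil {
		panic(err)
	}
	fmt.Println(len(w.records)) // 3
}
```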

type WriteEvent

type WriteEvent interface {
	// Record returns the record associated with this event.
	Record() Record
}

WriteEvent contains information about the write that took place, namely the record that was written.

type WriteOpt added in v1.0.0

type WriteOpt interface {
	// contains filtered or unexported methods
}

WriteOpt is an option to modify a Write.

func ToTopic added in v1.0.0

func ToTopic(topic string) WriteOpt

ToTopic specifies the output topic that the record will be written to.

Directories

Path              Synopsis
internal/rwbuf    A buffer with a reader and writer index that gives access to the underlying array.
sr                Package sr is a schema registry client for Redpanda for usage within inline Data Transforms.
