Documentation ¶
Overview ¶
Package transform is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.
This library provides a framework for transforming records written within Redpanda from an input to an output topic. This version of the SDK is compatible with Redpanda 24.1 or greater.
Example (IdentityTransform) ¶
This example shows the basic usage of the package: This is a "transform" that does nothing but copy the same data to a new topic.
package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows the basic usage of the package:
// This is a "transform" that does nothing but copy the same data to a new
// topic.
func main() {
	// Other setup can happen here, such as setting up lookup tables,
	// initializing reusable buffers, reading environment variables, etc.

	// Make sure to register your callback so Redpanda knows which
	// function to invoke when records are written.
	transform.OnRecordWritten(mirrorTransform)
}

// This will be called for each record in the source topic.
//
// The records written to w are output to the destination topic.
func mirrorTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	return w.Write(e.Record())
}
Example (RegularExpressionFilter) ¶
This example shows a filter that uses a regexp to filter records from one topic into another. The filter can be determined when the transform is deployed by using environment variables to specify the pattern.
package main

import (
	"os"
	"regexp"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

var (
	re         *regexp.Regexp
	checkValue bool
)

// This example shows a filter that uses a regexp to filter records from
// one topic into another. The filter can be determined when the transform
// is deployed by using environment variables to specify the pattern.
func main() {
	// Set up the regexp.
	pattern, ok := os.LookupEnv("PATTERN")
	if !ok {
		panic("Missing PATTERN variable")
	}
	re = regexp.MustCompile(pattern)
	// Match against the record value only when MATCH_VALUE is "1";
	// otherwise match against the key.
	mk, ok := os.LookupEnv("MATCH_VALUE")
	checkValue = ok && mk == "1"
	transform.OnRecordWritten(doRegexFilter)
}

func doRegexFilter(e transform.WriteEvent, w transform.RecordWriter) error {
	var b []byte
	if checkValue {
		b = e.Record().Value
	} else {
		b = e.Record().Key
	}
	// Records without the selected field are dropped.
	if b == nil {
		return nil
	}
	if re.Match(b) {
		return w.Write(e.Record())
	}
	return nil
}
Example (Transcoding) ¶
This example shows a transform that converts CSV into JSON.
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that converts CSV into JSON.
func main() {
	transform.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	// The input data is a CSV (without a header row) with the structure:
	// key, a, b
	// This transform emits each row in that CSV as JSON.
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		if len(row) != 3 {
			return errors.New("unexpected number of columns")
		}
		// Convert the last column into an int
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return err
		}
		// Marshal our JSON value
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return err
		}
		// Add our output record using the first column as the key.
		r := transform.Record{
			Key:   []byte(row[0]),
			Value: v,
		}
		if err := w.Write(r); err != nil {
			return err
		}
	}
	return nil
}
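To see what the conversion produces without deploying anything, the row handling can be pulled into a plain function and run locally. The following is a minimal sketch under that assumption: `convertCSV` and `keyedJSON` are hypothetical names introduced here, not part of the SDK, and the sketch deliberately does not import the transform package.

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strconv"
)

// Foo mirrors the JSON shape used by the Transcoding example.
type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

// keyedJSON is a hypothetical helper type for this sketch: one output
// key/value pair per CSV row.
type keyedJSON struct {
	Key   string
	Value string
}

// convertCSV parses headerless "key,a,b" rows and returns one JSON
// document per row, keyed by the first column.
func convertCSV(input []byte) ([]keyedJSON, error) {
	reader := csv.NewReader(bytes.NewReader(input))
	var out []keyedJSON
	for {
		row, err := reader.Read()
		if err == io.EOF {
			return out, nil
		} else if err != nil {
			return nil, err
		}
		if len(row) != 3 {
			return nil, errors.New("unexpected number of columns")
		}
		// Convert the last column into an int.
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return nil, err
		}
		v, err := json.Marshal(Foo{A: row[1], B: b})
		if err != nil {
			return nil, err
		}
		out = append(out, keyedJSON{Key: row[0], Value: string(v)})
	}
}

func main() {
	rows, err := convertCSV([]byte("k1,hello,1\nk2,world,2\n"))
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Printf("%s => %s\n", r.Key, r.Value)
	}
	// Prints:
	// k1 => {"a":"hello","b":1}
	// k2 => {"a":"world","b":2}
}
```

Keeping the parsing in a function that only touches byte slices makes it straightforward to cover with ordinary Go tests; the transform callback then only has to wrap each result in a record and write it.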
Example (Validation) ¶
This example shows a transform that checks whether each record's value is valid JSON and routes invalid records to a dead letter queue.
package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that checks whether each record's value
// is valid JSON and routes invalid records to a dead letter queue.
func main() {
	transform.OnRecordWritten(jsonValidate)
}

// This will be called for each record in the source topic.
func jsonValidate(e transform.WriteEvent, w transform.RecordWriter) error {
	if json.Valid(e.Record().Value) {
		// Write the valid records to the "default" output topic, which is
		// the first output topic specified in the configuration.
		return w.Write(e.Record())
	}
	// If a record does not contain valid JSON then route it to another topic for
	// triage and debugging.
	return w.Write(e.Record(), transform.ToTopic("invalid_json"))
}
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OnRecordWritten ¶
func OnRecordWritten(fn OnRecordWrittenCallback)
OnRecordWritten registers a callback to be fired when a record is written to the input topic.
OnRecordWritten should be called in a package's `main` function to register the transform function that will be applied.
Types ¶
type OnRecordWrittenCallback ¶
type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error
OnRecordWrittenCallback is a callback to transform records after a write event happens in the input topic.
type Record ¶
type Record struct {
	// Key is an optional field.
	Key []byte
	// Value is the blob of data that is written to Redpanda.
	Value []byte
	// Headers are client specified key/value pairs that are
	// attached to a record.
	Headers []RecordHeader
	// Attrs is the attributes of a record.
	//
	// Output records should leave these unset.
	Attrs RecordAttrs
	// The timestamp associated with this record.
	//
	// For output records this can be left unset as it will
	// always be the same value as the input record.
	Timestamp time.Time
	// The offset of this record in the partition.
	//
	// For output records this field is left unset,
	// as it will be set by Redpanda.
	Offset int64
}
Record is a record that has been written to Redpanda.
type RecordAttrs ¶
type RecordAttrs struct {
	// contains filtered or unexported fields
}
type RecordHeader ¶
Headers are optional key/value pairs that are passed along with records.
type RecordWriter ¶
type RecordWriter interface {
	// Write writes a record to the output topic.
	//
	// When writing a record, only the key, value and headers are
	// used; other information, like the timestamp, will be overridden
	// by the broker.
	//
	// WriteOpts can be added to control where records go, for example to another topic.
	Write(Record, ...WriteOpt) error
}
RecordWriter is an interface for writing transformed records to the destination topic.
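Because callbacks depend only on the WriteEvent and RecordWriter interfaces, their logic can be exercised with in-memory fakes. The sketch below assumes local stand-ins for every SDK type so that it runs with the standard library alone: `Record`, `WriteEvent`, `RecordWriter`, `WriteOpt` and `ToTopic` are simplified re-declarations, with `WriteOpt` modelled as a functional option (which may differ from the SDK's real definition), and `fakeEvent` and `memWriter` are hypothetical test helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for the SDK types described on this page.
type Record struct {
	Key   []byte
	Value []byte
}

type WriteEvent interface {
	Record() Record
}

// writeOptions and WriteOpt are assumptions made for this sketch.
type writeOptions struct{ topic string }

type WriteOpt func(*writeOptions)

// ToTopic mimics routing a record to a named output topic.
func ToTopic(topic string) WriteOpt {
	return func(o *writeOptions) { o.topic = topic }
}

type RecordWriter interface {
	Write(Record, ...WriteOpt) error
}

// fakeEvent is a WriteEvent that returns a fixed record.
type fakeEvent struct{ rec Record }

func (e fakeEvent) Record() Record { return e.rec }

// memWriter collects written records per topic; the empty string
// stands for the default output topic.
type memWriter struct {
	byTopic map[string][]Record
}

func (w *memWriter) Write(r Record, opts ...WriteOpt) error {
	var o writeOptions
	for _, opt := range opts {
		opt(&o)
	}
	if w.byTopic == nil {
		w.byTopic = make(map[string][]Record)
	}
	w.byTopic[o.topic] = append(w.byTopic[o.topic], r)
	return nil
}

// jsonValidate has the same logic as the Validation example, written
// against the local stand-in types.
func jsonValidate(e WriteEvent, w RecordWriter) error {
	if json.Valid(e.Record().Value) {
		return w.Write(e.Record())
	}
	return w.Write(e.Record(), ToTopic("invalid_json"))
}

func main() {
	w := &memWriter{}
	for _, v := range []string{`{"ok":true}`, `not json`} {
		ev := fakeEvent{rec: Record{Value: []byte(v)}}
		if err := jsonValidate(ev, w); err != nil {
			panic(err)
		}
	}
	fmt.Println(len(w.byTopic[""]), len(w.byTopic["invalid_json"]))
	// Prints: 1 1
}
```

In a real project the fakes would implement the SDK's own interfaces instead of these re-declarations; whether the SDK package builds outside a WebAssembly target is not covered on this page, so that is left as something to verify.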
type WriteEvent ¶
type WriteEvent interface {
	// Access the record associated with this event
	Record() Record
}
WriteEvent contains information about the write that took place; namely, the record that was written.
Directories ¶
| Path | Synopsis |
|---|---|
| internal | |
| internal/rwbuf | A buffer with a reader and writer index that gives access to the underlying array. |
| sr | Package sr is a schema registry client for Redpanda for usage within inline Data Transforms. |