Documentation ¶
Overview ¶
Package transform is the SDK for Redpanda's inline Data Transforms, based on WebAssembly.
This library provides a framework for transforming records written within Redpanda from an input to an output topic. This version of the SDK is compatible with Redpanda 24.1 or greater.
Example (IdentityTransform) ¶
This example shows the basic usage of the package: a "transform" that does nothing but copy the same data to a new topic.
package main

import (
	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows the basic usage of the package:
// This is a "transform" that does nothing but copy the same data to a new
// topic.
func main() {
	// Other setup can happen here, such as setting up lookup tables,
	// initializing reusable buffers, reading environment variables, etc.

	// Make sure to register your callback so Redpanda knows which
	// function to invoke when records are written.
	transform.OnRecordWritten(mirrorTransform)
}

// This will be called for each record in the source topic.
//
// The records written to w are output to the destination topic.
func mirrorTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	return w.Write(e.Record())
}
Example (RegularExpressionFilter) ¶
This example shows a filter that uses a regexp to filter records from one topic into another. The filter can be determined when the transform is deployed by using environment variables to specify the pattern.
package main

import (
	"os"
	"regexp"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

var (
	re         *regexp.Regexp = nil
	checkValue bool           = false
)

// This example shows a filter that uses a regexp to filter records from
// one topic into another. The filter can be determined when the transform
// is deployed by using environment variables to specify the pattern.
func main() {
	// Set up the regexp from the PATTERN environment variable.
	pattern, ok := os.LookupEnv("PATTERN")
	if !ok {
		panic("Missing PATTERN variable")
	}
	re = regexp.MustCompile(pattern)
	// Match against the record value only when MATCH_VALUE is "1";
	// otherwise match against the key.
	mk, ok := os.LookupEnv("MATCH_VALUE")
	checkValue = ok && mk == "1"

	transform.OnRecordWritten(doRegexFilter)
}

func doRegexFilter(e transform.WriteEvent, w transform.RecordWriter) error {
	var b []byte
	if checkValue {
		b = e.Record().Value
	} else {
		b = e.Record().Key
	}
	if b == nil {
		return nil
	}
	pass := re.Match(b)
	if pass {
		return w.Write(e.Record())
	} else {
		return nil
	}
}
Example (Transcoding) ¶
This example shows a transform that converts CSV into JSON.
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"errors"
	"io"
	"strconv"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that converts CSV into JSON.
func main() {
	transform.OnRecordWritten(csvToJsonTransform)
}

type Foo struct {
	A string `json:"a"`
	B int    `json:"b"`
}

func csvToJsonTransform(e transform.WriteEvent, w transform.RecordWriter) error {
	// The input data is a CSV (without a header row) with the structure:
	// key, a, b
	// This transform emits each row in that CSV as JSON.
	reader := csv.NewReader(bytes.NewReader(e.Record().Value))
	// Improve performance by reusing the result slice.
	reader.ReuseRecord = true
	for {
		row, err := reader.Read()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}
		if len(row) != 3 {
			return errors.New("unexpected number of columns")
		}
		// Convert the last column into an int.
		b, err := strconv.Atoi(row[2])
		if err != nil {
			return err
		}
		// Marshal our JSON value.
		f := Foo{
			A: row[1],
			B: b,
		}
		v, err := json.Marshal(&f)
		if err != nil {
			return err
		}
		// Add our output record using the first column as the key.
		r := transform.Record{
			Key:   []byte(row[0]),
			Value: v,
		}
		if err := w.Write(r); err != nil {
			return err
		}
	}
	return nil
}
Example (Validation) ¶
This example shows a transform that checks whether each record's value is valid JSON and routes records with invalid JSON to a dead letter queue.
package main

import (
	"encoding/json"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// This example shows a transform that checks whether the data is valid JSON
// and outputs invalid JSON to a dead letter queue.
func main() {
	transform.OnRecordWritten(jsonValidate)
}

// This will be called for each record in the source topic.
func jsonValidate(e transform.WriteEvent, w transform.RecordWriter) error {
	if json.Valid(e.Record().Value) {
		// Write the valid records to the "default" output topic, which is
		// the first output topic specified in the configuration.
		return w.Write(e.Record())
	}
	// If a record does not contain valid JSON then route it to another topic
	// for triage and debugging.
	return w.Write(e.Record(), transform.ToTopic("invalid_json"))
}
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OnRecordWritten ¶
func OnRecordWritten(fn OnRecordWrittenCallback)
OnRecordWritten registers a callback to be fired when a record is written to the input topic.
OnRecordWritten should be called in a package's `main` function to register the transform function that will be applied.
Types ¶
type OnRecordWrittenCallback ¶
type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error
OnRecordWrittenCallback is a callback to transform records after a write event happens in the input topic.
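Because the callback is an ordinary function type, callbacks can be wrapped and composed before being registered. The following is a minimal sketch, not part of the SDK: it declares local stand-ins for Record, WriteOpt, WriteEvent, RecordWriter, and OnRecordWrittenCallback (mirroring the shapes documented on this page, with WriteOpt reduced to an empty placeholder) so that it runs without the transform package. The names skipEmpty, mirror, event, and countingWriter are hypothetical.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented shapes, so the sketch compiles
// without the SDK. In a real transform these come from the transform package.
type Record struct {
	Key   []byte
	Value []byte
}

// WriteOpt is a placeholder here (assumption for this sketch only).
type WriteOpt interface{}

type WriteEvent interface{ Record() Record }

type RecordWriter interface {
	Write(Record, ...WriteOpt) error
}

type OnRecordWrittenCallback func(e WriteEvent, w RecordWriter) error

// skipEmpty (hypothetical) wraps a callback so that records with an empty
// value are dropped before the wrapped callback runs.
func skipEmpty(next OnRecordWrittenCallback) OnRecordWrittenCallback {
	return func(e WriteEvent, w RecordWriter) error {
		if len(e.Record().Value) == 0 {
			return nil
		}
		return next(e, w)
	}
}

// mirror copies the input record to the output unchanged.
func mirror(e WriteEvent, w RecordWriter) error { return w.Write(e.Record()) }

// event and countingWriter are test doubles for this sketch.
type event struct{ rec Record }

func (ev event) Record() Record { return ev.rec }

type countingWriter struct{ n int }

func (c *countingWriter) Write(Record, ...WriteOpt) error { c.n++; return nil }

func main() {
	cb := skipEmpty(mirror)
	w := &countingWriter{}
	_ = cb(event{Record{Value: []byte("data")}}, w) // forwarded
	_ = cb(event{Record{Key: []byte("k")}}, w)      // dropped: empty value
	fmt.Println(w.n)                                // prints 1
}
```

With the real SDK, the wrapped function would be passed to OnRecordWritten in place of the bare callback.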
type Record ¶
type Record struct {
	// Key is an optional field.
	Key []byte
	// Value is the blob of data that is written to Redpanda.
	Value []byte
	// Headers are client specified key/value pairs that are
	// attached to a record.
	Headers []RecordHeader
	// Attrs is the attributes of a record.
	//
	// Output records should leave these unset.
	Attrs RecordAttrs
	// The timestamp associated with this record.
	//
	// For output records this can be left unset as it will
	// always be the same value as the input record.
	Timestamp time.Time
	// The offset of this record in the partition.
	//
	// For output records this field is left unset,
	// as it will be set by Redpanda.
	Offset int64
}
Record is a record that has been written to Redpanda.
type RecordAttrs ¶
type RecordAttrs struct {
	// contains filtered or unexported fields
}
type RecordHeader ¶
Headers are optional key/value pairs that are passed along with records.
type RecordWriter ¶
type RecordWriter interface {
	// Write writes a record to the output topic.
	//
	// When writing a record, only the key, value and headers are
	// used; other information, like the timestamp, will be overridden
	// by the broker.
	//
	// WriteOpts can be added to control where records go, for example to another topic.
	Write(Record, ...WriteOpt) error
}
RecordWriter is an interface for writing transformed records to the destination topic.
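Since RecordWriter is an interface, a transform callback can be exercised without a broker by handing it an in-memory test double. The sketch below is illustrative only: Record, WriteOpt, WriteEvent, and RecordWriter are local stand-ins that mirror the shapes documented on this page (WriteOpt is an empty placeholder), and memWriter, fakeEvent, and upperCase are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local stand-ins mirroring the documented shapes, so the sketch compiles
// without the SDK. In a real transform these come from the transform package.
type Record struct {
	Key   []byte
	Value []byte
}

// WriteOpt is a placeholder here (assumption for this sketch only).
type WriteOpt interface{}

type WriteEvent interface{ Record() Record }

type RecordWriter interface {
	Write(Record, ...WriteOpt) error
}

// memWriter (hypothetical) collects every written record in memory.
type memWriter struct {
	records []Record
}

func (m *memWriter) Write(r Record, _ ...WriteOpt) error {
	m.records = append(m.records, r)
	return nil
}

// fakeEvent (hypothetical) wraps a fixed record.
type fakeEvent struct{ rec Record }

func (f fakeEvent) Record() Record { return f.rec }

// upperCase is the callback under test: it keeps the key and writes an
// upper-cased copy of the value.
func upperCase(e WriteEvent, w RecordWriter) error {
	r := e.Record()
	return w.Write(Record{Key: r.Key, Value: bytes.ToUpper(r.Value)})
}

func main() {
	w := &memWriter{}
	ev := fakeEvent{rec: Record{Key: []byte("k"), Value: []byte("hello")}}
	if err := upperCase(ev, w); err != nil {
		panic(err)
	}
	fmt.Printf("%s=%s\n", w.records[0].Key, w.records[0].Value) // prints k=HELLO
}
```

The same pattern applies to callbacks written against the real interfaces: the test double only needs to implement Write.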
type WriteEvent ¶
type WriteEvent interface {
	// Access the record associated with this event
	Record() Record
}
WriteEvent contains information about the write that took place, namely it contains the record that was written.
Directories ¶
Path | Synopsis |
---|---|
internal/rwbuf | A buffer with a reader and writer index that gives access to the underlying array. |
sr | Package sr is a schema registry client for Redpanda for usage within inline Data Transforms. |