sourcetransformer

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 20 Imported by: 4

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__

Functions

func NewServer

func NewServer(m SourceTransformer, inputOptions ...Option) numaflow.Server

NewServer creates a new SourceTransformer server.

Types

type Datum

type Datum interface {
	// Value returns the payload of the message.
	Value() []byte
	// EventTime returns the event time of the message.
	EventTime() time.Time
	// Watermark returns the watermark of the message.
	Watermark() time.Time
	// Headers returns the headers of the message.
	Headers() map[string]string
}

Datum contains methods to get the payload information.

func NewHandlerDatum

func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string) Datum

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is used to wrap the data return by SourceTransformer functions. Compared with Message of other UDFs, source transformer Message contains one more field, the event time, usually extracted from the payload.

func MessageToDrop

func MessageToDrop(eventTime time.Time) Message

MessageToDrop creates a Message to be dropped with eventTime. eventTime is required because, even though a message is dropped, it is still considered as being processed, hence the watermark should be updated accordingly using the provided event time.

func NewMessage

func NewMessage(value []byte, eventTime time.Time) Message

NewMessage creates a Message with eventTime and value

func (Message) EventTime

func (m Message) EventTime() time.Time

EventTime returns message eventTime

func (Message) Keys

func (m Message) Keys() []string

Keys returns message keys

func (Message) Tags

func (m Message) Tags() []string

Tags returns message tags

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithKeys

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to message

func (Message) WithTags

func (m Message) WithTags(tags []string) Message

WithTags is used to assign the tags to message tags will be used for conditional forwarding

type Messages

type Messages []Message

func MessagesBuilder

func MessagesBuilder() Messages

MessagesBuilder returns an empty instance of Messages

func (Messages) Append

func (m Messages) Append(msg Message) Messages

Append appends a Message

func (Messages) Items

func (m Messages) Items() []Message

Items returns the Message list

type Option

type Option func(*options)

Option is the interface to apply options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(f string) Option

WithServerInfoFilePath sets the server info file path to the given path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes.

type Service

type Service struct {
	v1.UnimplementedSourceTransformServer
	Transformer SourceTransformer
	// contains filtered or unexported fields
}

Service implements the proto gen server interface and contains the transformer operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) SourceTransformFn

func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFnServer) error

SourceTransformFn applies a function to each request element. In addition to map function, SourceTransformFn also supports assigning a new event time to response. SourceTransformFn can be used only at source vertex by source data transformer.

type SourceTransformFunc

type SourceTransformFunc func(ctx context.Context, keys []string, datum Datum) Messages

SourceTransformFunc is a utility type used to convert a function to a SourceTransformer.

func (SourceTransformFunc) Transform

func (mf SourceTransformFunc) Transform(ctx context.Context, keys []string, datum Datum) Messages

Transform implements the function of source transformer function.

type SourceTransformer

type SourceTransformer interface {
	// Transform is the function to transform each coming message.
	Transform(ctx context.Context, keys []string, datum Datum) Messages
}

SourceTransformer is the interface of SourceTransformer function implementation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL