batchmapper

package
v0.9.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)

Functions

func NewBatchResponse

func NewBatchResponse(id string) batchResponse

NewBatchResponse is a utility function used to create a new batchResponse object Specifying an id is a mandatory requirement, as it is required to reference the responses back to a request.

func NewServer

func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server

NewServer creates a new batch map server.

Types

type BatchMapper

type BatchMapper interface {
	// BatchMap is the function which processes a list of input messages
	BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses
}

BatchMapper is the interface for a Batch Map mode where the user is given a list of messages, and they return the consolidated response for all of them together.

type BatchMapperFunc

type BatchMapperFunc func(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses

BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper.

func (BatchMapperFunc) BatchMap

func (mf BatchMapperFunc) BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses

BatchMap implements the functionality of BatchMap function.

type BatchResponses

type BatchResponses []batchResponse

BatchResponses is a list of batchResponse which signify the consolidated results for a batch of input messages.

func BatchResponsesBuilder

func BatchResponsesBuilder() BatchResponses

func (BatchResponses) Append

func (m BatchResponses) Append(msg batchResponse) BatchResponses

Append appends a batchResponse

func (BatchResponses) Items

func (m BatchResponses) Items() []batchResponse

Items returns the batchResponse list

type Datum

type Datum interface {
	// Value returns the payload of the message.
	Value() []byte
	// EventTime returns the event time of the message.
	EventTime() time.Time
	// Watermark returns the watermark of the message.
	Watermark() time.Time
	// Headers returns the headers of the message.
	Headers() map[string]string
	// Id returns the unique ID set for the given message
	Id() string
	// Keys returns the keys associated with a given datum
	Keys() []string
}

Datum contains methods to get the payload information.

func NewHandlerDatum

func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string, keys []string) Datum

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is used to wrap the data return by Batch Map functions

func MessageToDrop

func MessageToDrop() Message

MessageToDrop creates a Message to be dropped

func NewMessage

func NewMessage(value []byte) Message

NewMessage creates a Message with value

func (Message) Keys

func (m Message) Keys() []string

Keys returns message keys

func (Message) Tags

func (m Message) Tags() []string

Tags returns message tags

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithKeys

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to the message

func (Message) WithTags

func (m Message) WithTags(tags []string) Message

WithTags is used to assign the tags to the message tags will be used for conditional forwarding

type Messages

type Messages []Message

type Option

type Option func(*options)

Option is the interface to apply options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(f string) Option

WithServerInfoFilePath sets the server info file path to the given path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes.

type Service

type Service struct {
	mappb.UnimplementedMapServer
	BatchMapper BatchMapper
	// contains filtered or unexported fields
}

Service implements the proto gen server interface and contains the map operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) MapFn added in v0.9.0

func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error

MapFn applies a user defined function to a stream of request elements and streams back the responses for them.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL