udf

package
v0.7.1
Warning

This package is not in the latest version of its module.

Published: Aug 18, 2017 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func MustRegisterGlobalUDF

func MustRegisterGlobalUDF(name string, f UDF)

MustRegisterGlobalUDF is like RegisterGlobalUDF but panics if an error occurred.

func MustRegisterGlobalUDSCreator

func MustRegisterGlobalUDSCreator(typeName string, c UDSCreator)

MustRegisterGlobalUDSCreator is like RegisterGlobalUDSCreator but panics if an error occurred.

func MustRegisterGlobalUDSFCreator

func MustRegisterGlobalUDSFCreator(typeName string, c UDSFCreator)

MustRegisterGlobalUDSFCreator is like RegisterGlobalUDSFCreator but panics if an error occurred.

func RegisterGlobalUDF

func RegisterGlobalUDF(name string, f UDF) error

RegisterGlobalUDF adds a UDF that is visible to all topologies. UDFs registered after a topology has started running might not be visible to that topology. Call it from an init function to avoid this.
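The Register/MustRegister pair and the advice to register from init follow a common Go pattern. The sketch below mimics it with a hypothetical local registry and a simplified `udfFunc` type instead of the real `udf.UDF` interface, so it runs without SensorBee. It assumes, as the UDSCreatorRegistry documentation states for creators, that registering a duplicate name is an error.

```go
package main

import (
	"fmt"
	"sync"
)

// udfFunc is a simplified, hypothetical stand-in for udf.UDF.
type udfFunc func(args ...int) int

// globalRegistry is a hypothetical process-wide registry guarded by a mutex.
var globalRegistry = struct {
	sync.RWMutex
	funcs map[string]udfFunc
}{funcs: map[string]udfFunc{}}

// registerGlobal mirrors the shape of RegisterGlobalUDF: it fails on duplicates.
func registerGlobal(name string, f udfFunc) error {
	globalRegistry.Lock()
	defer globalRegistry.Unlock()
	if _, ok := globalRegistry.funcs[name]; ok {
		return fmt.Errorf("udf %q is already registered", name)
	}
	globalRegistry.funcs[name] = f
	return nil
}

// mustRegisterGlobal mirrors MustRegisterGlobalUDF: it panics instead of
// returning the error.
func mustRegisterGlobal(name string, f udfFunc) {
	if err := registerGlobal(name, f); err != nil {
		panic(err)
	}
}

// Registering from init guarantees the function exists before main starts
// anything that could look it up.
func init() {
	mustRegisterGlobal("add", func(args ...int) int {
		sum := 0
		for _, a := range args {
			sum += a
		}
		return sum
	})
}

func main() {
	globalRegistry.RLock()
	f := globalRegistry.funcs["add"]
	globalRegistry.RUnlock()
	fmt.Println(f(1, 2, 3))                 // 6
	fmt.Println(registerGlobal("add", nil)) // udf "add" is already registered
}
```

The Must variant is convenient in init, where there is no caller to return an error to and a failed registration is a programming error.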

func RegisterGlobalUDSCreator

func RegisterGlobalUDSCreator(typeName string, c UDSCreator) error

RegisterGlobalUDSCreator adds a UDSCreator that can be referred to from all topologies. UDSCreators registered after a topology has started running might not be visible to that topology. Call it from an init function to avoid this.

func RegisterGlobalUDSFCreator

func RegisterGlobalUDSFCreator(typeName string, c UDSFCreator) error

RegisterGlobalUDSFCreator adds a UDSFCreator that can be referred to from all topologies. UDSFCreators registered after a topology has started running might not be visible to that topology. Call it from an init function to avoid this.

Types

type FunctionManager

type FunctionManager interface {
	FunctionRegistry

	// Register adds a function to the registry.
	Register(name string, f UDF) error
}

FunctionManager is a FunctionRegistry that allows registering additional functions.

func CopyGlobalUDFRegistry

func CopyGlobalUDFRegistry(ctx *core.Context) FunctionManager

CopyGlobalUDFRegistry creates a new FunctionManager that contains all UDFs registered with the global function manager.

func NewDefaultFunctionRegistry

func NewDefaultFunctionRegistry(ctx *core.Context) FunctionManager

NewDefaultFunctionRegistry returns a new instance of the default FunctionRegistry implementation.

type FunctionRegistry

type FunctionRegistry interface {
	// Context returns a core.Context associated with the registry.
	Context() *core.Context

	// Lookup will return a function with the given name and arity
	// or core.NotExistError if there is none. Note that this interface allows
	// multiple functions with the same name but different arity,
	// and it also allows functions with an arbitrary number of
	// parameters. However, a function returned must never be used
	// with a different arity than the one given in the Lookup call.
	Lookup(name string, arity int) (UDF, error)
}

FunctionRegistry is an interface to look up functions by name for use in BQL statements.

type UDF

type UDF interface {
	// Call calls the UDF. data.Values must not be nil.
	Call(*core.Context, ...data.Value) (data.Value, error)

	// Accept checks if the function accepts the given number of arguments
	// excluding core.Context.
	Accept(arity int) bool

	// IsAggregationParameter returns true if the k-th parameter expects
	// aggregated values. A UDF with Accept(n) == true is an aggregate
	// function if and only if this function returns true for one or more
	// values of k in the range 0, ..., n-1.
	IsAggregationParameter(k int) bool
}

UDF is an interface representing a user-defined function.
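The rule in the IsAggregationParameter comment can be written as a small helper. The sketch below uses a hypothetical local interface with only Accept and IsAggregationParameter (Call is omitted), and two hypothetical functions, `countLike` and `plusLike`. Returning false for an unsupported arity is a choice of this sketch; the documentation only defines the rule when Accept(n) is true.

```go
package main

import "fmt"

// aggregationInfo mirrors two methods of udf.UDF.
type aggregationInfo interface {
	Accept(arity int) bool
	IsAggregationParameter(k int) bool
}

// isAggregate reports whether f, called with n arguments, is an aggregate:
// at least one parameter k in [0, n) expects aggregated values.
func isAggregate(f aggregationInfo, n int) bool {
	if !f.Accept(n) {
		return false
	}
	for k := 0; k < n; k++ {
		if f.IsAggregationParameter(k) {
			return true
		}
	}
	return false
}

// countLike is a hypothetical one-argument aggregate such as count(x).
type countLike struct{}

func (countLike) Accept(arity int) bool             { return arity == 1 }
func (countLike) IsAggregationParameter(k int) bool { return k == 0 }

// plusLike is a hypothetical two-argument scalar function.
type plusLike struct{}

func (plusLike) Accept(arity int) bool           { return arity == 2 }
func (plusLike) IsAggregationParameter(int) bool { return false }

func main() {
	fmt.Println(isAggregate(countLike{}, 1)) // true
	fmt.Println(isAggregate(plusLike{}, 2))  // false
}
```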

func BinaryFunc

func BinaryFunc(f func(*core.Context, data.Value, data.Value) (data.Value, error)) UDF

BinaryFunc creates a UDF that takes two data.Value parameters.

func ConvertGeneric

func ConvertGeneric(function interface{}) (UDF, error)

ConvertGeneric creates a new UDF from various forms of functions. The function's arguments don't have to be tuple types; some standard types are also allowed. The returned UDF provides weak type conversion, that is, it uses the data.To{Type} functions to convert values. Therefore, a string may be passed as an integer or vice versa. If strict type conversion is needed, create the UDF with Func instead.

Acceptable types:

  • bool
  • standard integers
  • standard floats
  • string
  • time.Time
  • data.Bool, data.Int, data.Float, data.String, data.Blob, data.Timestamp, data.Array, data.Map, data.Value
  • a slice of types above
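Weak conversion means each argument is coerced to the parameter type before the wrapped function runs. The sketch below illustrates the idea without SensorBee: `toInt` is a hypothetical, simplified converter in the spirit of data.ToInt (its exact rules are assumptions), and `weakBinaryInt` is a hypothetical wrapper, not the reflection-based code ConvertGeneric actually uses.

```go
package main

import (
	"fmt"
	"strconv"
)

// toInt is a hypothetical weak converter: it accepts integers, floats
// (truncated) and decimal strings.
func toInt(v interface{}) (int64, error) {
	switch x := v.(type) {
	case int64:
		return x, nil
	case float64:
		return int64(x), nil
	case string:
		return strconv.ParseInt(x, 10, 64)
	default:
		return 0, fmt.Errorf("cannot convert %T to int", v)
	}
}

// weakBinaryInt wraps a typed Go function so it can be called with loosely
// typed arguments, converting each one before the call.
func weakBinaryInt(f func(a, b int64) int64) func(args ...interface{}) (interface{}, error) {
	return func(args ...interface{}) (interface{}, error) {
		if len(args) != 2 {
			return nil, fmt.Errorf("expected 2 arguments, got %d", len(args))
		}
		a, err := toInt(args[0])
		if err != nil {
			return nil, err
		}
		b, err := toInt(args[1])
		if err != nil {
			return nil, err
		}
		return f(a, b), nil
	}
}

func main() {
	add := weakBinaryInt(func(a, b int64) int64 { return a + b })
	v, err := add("40", int64(2))
	fmt.Println(v, err) // 42 <nil>
	_, err = add("forty", int64(2))
	fmt.Println(err != nil) // true
}
```

The trade-off is convenience versus strictness: "40" is silently accepted as 40, which is exactly what Func-based UDFs avoid.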

func ConvertGenericAggregate

func ConvertGenericAggregate(function interface{}, aggParams []bool) (UDF, error)

ConvertGenericAggregate creates a new aggregate UDF from various forms of functions. The aggParams argument indicates which of the function's arguments are aggregation parameters, that is, which ones receive aggregated values. Supported types are the same as for ConvertGeneric.
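One way to picture aggParams is as the table behind IsAggregationParameter. The hypothetical `aggWrapper` below assumes one flag per function argument, with true marking an argument that receives aggregated values; tying Accept to len(aggParams) is also an assumption of this sketch, not documented behavior.

```go
package main

import "fmt"

// aggWrapper is a hypothetical illustration: aggParams[k] reports whether
// argument k receives an aggregated (array) value.
type aggWrapper struct {
	aggParams []bool
	call      func(values []int, threshold int) int
}

func (w aggWrapper) Accept(arity int) bool { return arity == len(w.aggParams) }

func (w aggWrapper) IsAggregationParameter(k int) bool {
	return k >= 0 && k < len(w.aggParams) && w.aggParams[k]
}

func main() {
	// countAbove(values, threshold): the first argument is aggregated,
	// the second is a plain scalar.
	countAbove := aggWrapper{
		aggParams: []bool{true, false},
		call: func(values []int, threshold int) int {
			n := 0
			for _, v := range values {
				if v > threshold {
					n++
				}
			}
			return n
		},
	}
	fmt.Println(countAbove.IsAggregationParameter(0), countAbove.IsAggregationParameter(1)) // true false
	fmt.Println(countAbove.call([]int{1, 5, 7, 2}, 3))                                      // 2
}
```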

func Func

func Func(f func(*core.Context, ...data.Value) (data.Value, error), arity int) UDF

Func creates a UDF that accepts exactly `arity` data.Value parameters.

func MustConvertGeneric

func MustConvertGeneric(function interface{}) UDF

MustConvertGeneric is like ConvertGeneric, but panics on errors.

func MustConvertGenericAggregate

func MustConvertGenericAggregate(function interface{}, aggParams []bool) UDF

MustConvertGenericAggregate is like ConvertGenericAggregate, but panics on errors.

func NullaryFunc

func NullaryFunc(f func(*core.Context) (data.Value, error)) UDF

NullaryFunc creates a UDF that takes no parameters (except the Context).

func TernaryFunc

func TernaryFunc(f func(*core.Context, data.Value, data.Value, data.Value) (data.Value, error)) UDF

TernaryFunc creates a UDF that takes three data.Value parameters.

func UnaryFunc

func UnaryFunc(f func(*core.Context, data.Value) (data.Value, error)) UDF

UnaryFunc creates a UDF that takes one data.Value parameter.

func VariadicFunc

func VariadicFunc(f func(*core.Context, ...data.Value) (data.Value, error)) UDF

VariadicFunc creates a UDF from a function that receives a variable number of data.Value arguments.
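Func and VariadicFunc wrap the same function shape; the difference shows up in Accept. The sketch below uses hypothetical local analogues (`fixedArity`, `variadic`) with `interface{}` standing in for data.Value. That the variadic form accepts any non-negative arity, including zero, is an assumption.

```go
package main

import "fmt"

// value is a hypothetical stand-in for data.Value.
type value = interface{}

// wrapped pairs a function with the arity check a UDF would expose.
type wrapped struct {
	f      func(args ...value) (value, error)
	accept func(arity int) bool
}

// fixedArity is analogous to udf.Func: only one arity is accepted.
func fixedArity(f func(args ...value) (value, error), arity int) wrapped {
	return wrapped{f: f, accept: func(n int) bool { return n == arity }}
}

// variadic is analogous to udf.VariadicFunc: any arity is accepted.
func variadic(f func(args ...value) (value, error)) wrapped {
	return wrapped{f: f, accept: func(n int) bool { return n >= 0 }}
}

// count returns the number of arguments it received.
func count(args ...value) (value, error) { return len(args), nil }

func main() {
	two := fixedArity(count, 2)
	anyArity := variadic(count)
	fmt.Println(two.accept(2), two.accept(3))           // true false
	fmt.Println(anyArity.accept(0), anyArity.accept(5)) // true true
}
```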

type UDSCreator

type UDSCreator interface {
	// CreateState creates an instance of the state type. CreateState must not
	// call core.SharedState.Init.
	CreateState(ctx *core.Context, params data.Map) (core.SharedState, error)
}

UDSCreator creates a User Defined State based on core.SharedState.

func UDSCreatorFunc

func UDSCreatorFunc(f func(*core.Context, data.Map) (core.SharedState, error)) UDSCreator

UDSCreatorFunc creates a UDSCreator from a function.
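Adapting a plain function to a one-method interface is the same idea as net/http's HandlerFunc. The sketch below uses hypothetical simplified stand-ins for core.SharedState and udf.UDSCreator. Note that the real UDSCreatorFunc is a function returning a UDSCreator; this sketch uses a named function type, which is one common way to build such an adapter.

```go
package main

import "fmt"

// sharedState and creator are hypothetical stand-ins for core.SharedState
// and udf.UDSCreator.
type sharedState interface{ Name() string }

type creator interface {
	CreateState(params map[string]interface{}) (sharedState, error)
}

// creatorFunc lets a plain function satisfy creator.
type creatorFunc func(params map[string]interface{}) (sharedState, error)

func (f creatorFunc) CreateState(params map[string]interface{}) (sharedState, error) {
	return f(params)
}

// counter is a hypothetical state type.
type counter struct{ start int }

func (c *counter) Name() string { return fmt.Sprintf("counter(start=%d)", c.start) }

func main() {
	var c creator = creatorFunc(func(params map[string]interface{}) (sharedState, error) {
		start, _ := params["start"].(int) // missing or non-int params default to 0
		return &counter{start: start}, nil
	})
	s, err := c.CreateState(map[string]interface{}{"start": 10})
	fmt.Println(s.Name(), err) // counter(start=10) <nil>
}
```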

type UDSCreatorRegistry

type UDSCreatorRegistry interface {
	// Register adds a UDS creator to the registry. It returns an error if
	// the type name is already registered.
	Register(typeName string, c UDSCreator) error

	// Lookup returns a UDS creator having the type name. It returns
	// core.NotExistError if it doesn't have the creator.
	Lookup(typeName string) (UDSCreator, error)

	// List returns all creators the registry has. The caller can safely modify
	// the map returned from this method.
	List() (map[string]UDSCreator, error)

	// Unregister removes a creator from the registry. It returns core.NotExistError
	// when the registry doesn't have a creator having the type name.
	//
	// The registry itself doesn't support cascading delete. Cascading deletes
	// should be done properly by the caller.
	Unregister(typeName string) error
}

UDSCreatorRegistry manages creators of UDSs.

func CopyGlobalUDSCreatorRegistry

func CopyGlobalUDSCreatorRegistry() (UDSCreatorRegistry, error)

CopyGlobalUDSCreatorRegistry creates a new independent copy of the global UDSCreatorRegistry.

func NewDefaultUDSCreatorRegistry

func NewDefaultUDSCreatorRegistry() UDSCreatorRegistry

NewDefaultUDSCreatorRegistry returns a new UDSCreatorRegistry backed by the default implementation.

type UDSF

type UDSF interface {
	// Process sends a tuple to the UDSF.
	//
	// When the UDSF is running in the stream mode, this method must not block
	// and must return immediately after it finishes processing the received
	// tuple. It is called every time a tuple is received from streams. It
	// behaves just like core.Box.
	//
	// When the UDSF is running in the source mode, this method is only called
	// once. It can block until it generates all tuples or Terminate method
	// is called. A tuple passed to this method in the source mode doesn't
	// contain anything meaningful. It behaves like core.Source although the
	// interface is like core.Box. The core.Writer returns core.ErrSourceStopped
	// when the UDSF running in the source mode is stopped. Therefore, if
	// Process returns when it receives that error, Terminate only needs to
	// deallocate resources.
	Process(ctx *core.Context, t *core.Tuple, w core.Writer) error

	// Terminate terminates the UDSF. Resources allocated when the UDSF is
	// created by UDSFCreator should be released in this method. Also, when
	// the UDSF is running in the source mode, Process method must return as
	// soon as Terminate is called.
	Terminate(ctx *core.Context) error
}

UDSF is a user-defined stream-generating function. While a regular UDF generates a tuple from an input tuple, a UDSF can generate multiple tuples from a tuple or even drop tuples. It also differs from a plain function in that it can have state. Thus, a UDSF is created by a UDSFCreator for each invocation in a BQL statement.

UDSF doesn't have an Init method because initialization should be done in UDSFCreator.CreateUDSF.

UDSFs have two processing modes: stream mode and source mode. A UDSF is processed in the stream mode when it has at least one input stream; otherwise, it is processed in the source mode. The Process and Terminate methods change their behavior based on the mode. See the documentation of each method for details.
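In the source mode, Process may block while it generates tuples, and it must return promptly once Terminate is called or the writer reports that the source was stopped. The sketch below shows that shape for a hypothetical sequence generator, using a close-once stop channel; `writer` and `errSourceStopped` are simplified stand-ins for core.Writer and core.ErrSourceStopped, and tuples are reduced to plain ints.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSourceStopped is a hypothetical stand-in for core.ErrSourceStopped.
var errSourceStopped = errors.New("source stopped")

// writer is a simplified stand-in for core.Writer.
type writer func(v int) error

// seqUDSF is a hypothetical source-mode UDSF that emits 0, 1, 2, ...
type seqUDSF struct {
	stop     chan struct{}
	stopOnce sync.Once
}

func newSeqUDSF() *seqUDSF { return &seqUDSF{stop: make(chan struct{})} }

// Process blocks and keeps writing. It returns as soon as Terminate closes
// the stop channel or the writer reports that the source was stopped.
func (s *seqUDSF) Process(w writer) error {
	for i := 0; ; i++ {
		select {
		case <-s.stop:
			return nil
		default:
		}
		if err := w(i); err != nil {
			if errors.Is(err, errSourceStopped) {
				return nil
			}
			return err
		}
	}
}

// Terminate only signals Process; sync.Once makes repeated calls safe.
func (s *seqUDSF) Terminate() error {
	s.stopOnce.Do(func() { close(s.stop) })
	return nil
}

func main() {
	s := newSeqUDSF()
	var got []int
	done := make(chan error)
	go func() {
		done <- s.Process(func(v int) error {
			got = append(got, v)
			if v == 4 {
				s.Terminate() // simulate an external stop after five tuples
			}
			return nil
		})
	}()
	fmt.Println(<-done, got) // <nil> [0 1 2 3 4]
}
```

Because Process exits on either signal, Terminate itself stays trivial, which matches the suggestion in the Process documentation.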

type UDSFCreator

type UDSFCreator interface {
	// CreateUDSF returns a new UDSF instance. UDSF can be configured (via decl)
	// to accept inputs from multiple existing streams. It also receives
	// arguments passed to the UDSF in a BQL statement. The caller will call
	// UDSF.Terminate when the UDSF becomes unnecessary.
	//
	// When a UDSF needs inputs from other sources or streams, call Input method
	// of UDSFDeclarer. For example, assume the following UDSF:
	//
	//	func createMyUDSF(decl udf.UDSFDeclarer, input1, input2 string) (udf.UDSF, error) {
	//		...
	//		decl.Input(input1, &udf.UDSFInputConfig{
	//			InputName: "custom_input_name_1",
	//		})
	//		decl.Input(input2, &udf.UDSFInputConfig{
	//			InputName: "custom_input_name_2",
	//		})
	//		...
	//	}
	//
	//	func init() {
	//		udf.MustRegisterGlobalUDSFCreator("my_udsf",
	//			udf.MustConvertToUDSFCreator(createMyUDSF))
	//	}
	//
	// A user can then specify the input streams with the following statements:
	//
	//	CREATE STREAM stream1 AS SELECT ...;
	//	CREATE STREAM stream2 AS SELECT ...;
	//	CREATE STREAM join_by_udsf AS
	//	  SELECT RSTREAM * FROM my_udsf("stream1", "stream2") [RANGE 1 TUPLES];
	//
	// In this example, my_udsf receives two streams: stream1 and stream2
	// created in advance.
	//
	// A UDSF doesn't have to have an input. For example, there can be a UDSF
	// which generates sequential numbers and doesn't depend on any stream.
	// Such UDSFs run in the source mode. See UDSF for more details.
	CreateUDSF(ctx *core.Context, decl UDSFDeclarer, args ...data.Value) (UDSF, error)

	// Accept returns true if the UDSF supports the given arity.
	Accept(arity int) bool
}

UDSFCreator creates a new UDSF instance.

func ConvertToUDSFCreator

func ConvertToUDSFCreator(function interface{}) (UDSFCreator, error)

ConvertToUDSFCreator converts a function to a UDSFCreator.

func MustConvertToUDSFCreator

func MustConvertToUDSFCreator(function interface{}) UDSFCreator

MustConvertToUDSFCreator converts a function to a UDSFCreator. It panics if there is an error during conversion.

type UDSFCreatorRegistry

type UDSFCreatorRegistry interface {
	// Register adds a UDSF creator to the registry. It returns an error if
	// the type name is already registered.
	Register(typeName string, c UDSFCreator) error

	// Lookup returns a UDSF creator having the type name. It returns
	// core.NotExistError if it doesn't have the creator.
	Lookup(typeName string, arity int) (UDSFCreator, error)

	// List returns all creators the registry has. The caller can safely modify
	// the map returned from this method.
	List() (map[string]UDSFCreator, error)

	// Unregister removes a creator from the registry. It returns core.NotExistError
	// when the registry doesn't have a creator having the type name.
	//
	// The registry itself doesn't support cascading delete. Cascading deletes
	// should be done properly by the caller.
	Unregister(typeName string) error
}

UDSFCreatorRegistry manages creators of UDSFs.

func CopyGlobalUDSFCreatorRegistry

func CopyGlobalUDSFCreatorRegistry() (UDSFCreatorRegistry, error)

CopyGlobalUDSFCreatorRegistry creates a new independent copy of the global UDSFCreatorRegistry.

func NewDefaultUDSFCreatorRegistry

func NewDefaultUDSFCreatorRegistry() UDSFCreatorRegistry

NewDefaultUDSFCreatorRegistry returns a new UDSFCreatorRegistry backed by the default implementation.

type UDSFDeclarer

type UDSFDeclarer interface {
	// Input adds an input from an existing stream.
	Input(name string, config *UDSFInputConfig) error

	// ListInputs returns all inputs declared by a UDSF. The caller can safely
	// modify the map returned from this method.
	ListInputs() map[string]*UDSFInputConfig
}

UDSFDeclarer allows UDSFs to customize their behavior.

func NewUDSFDeclarer

func NewUDSFDeclarer() UDSFDeclarer

NewUDSFDeclarer creates a new declarer.

type UDSFInputConfig

type UDSFInputConfig struct {
	// InputName is a custom name attached to incoming tuples. If this name is
	// empty, "*" will be used.
	InputName string
}

UDSFInputConfig has input configuration parameters for UDSF.
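The declarer records which streams a UDSF reads from and under which name tuples arrive. The hypothetical map-based declarer below applies the documented default of "*" for an empty InputName and returns a copy from ListInputs. Treating a nil config like an empty one and rejecting duplicate inputs are assumptions of this sketch.

```go
package main

import "fmt"

// inputConfig mirrors udf.UDSFInputConfig.
type inputConfig struct {
	InputName string
}

// declarer is a hypothetical map-based UDSFDeclarer.
type declarer struct {
	inputs map[string]*inputConfig
}

func newDeclarer() *declarer { return &declarer{inputs: map[string]*inputConfig{}} }

// Input records an input stream and fills in the default name "*".
func (d *declarer) Input(name string, config *inputConfig) error {
	if _, ok := d.inputs[name]; ok {
		return fmt.Errorf("input %q is already declared", name)
	}
	c := inputConfig{}
	if config != nil {
		c = *config
	}
	if c.InputName == "" {
		c.InputName = "*"
	}
	d.inputs[name] = &c
	return nil
}

// ListInputs returns a deep copy so callers may modify it freely.
func (d *declarer) ListInputs() map[string]*inputConfig {
	m := make(map[string]*inputConfig, len(d.inputs))
	for k, v := range d.inputs {
		c := *v
		m[k] = &c
	}
	return m
}

func main() {
	d := newDeclarer()
	d.Input("stream1", &inputConfig{InputName: "left"})
	d.Input("stream2", nil)
	in := d.ListInputs()
	fmt.Println(in["stream1"].InputName, in["stream2"].InputName) // left *
}
```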

type UDSLoader

type UDSLoader interface {
	UDSCreator

	// LoadState loads a state from saved data. The saved data can be read from
	// io.Reader. Parameters given in the SET clause are passed as params.
	LoadState(ctx *core.Context, r io.Reader, params data.Map) (core.SharedState, error)
}

UDSLoader loads a User Defined State from saved data. A UDS cannot be loaded if a UDSCreator doesn't implement UDSLoader even if the UDS implements core.LoadableSharedState.

When a UDS hasn't been created yet, UDSLoader.LoadState is used to load the state and core.LoadableSharedState.Load is not used.

When a UDS is already created or loaded and it implements core.LoadableSharedState, its Load method is called to load a model and UDSLoader.LoadState will not be called. If a UDS doesn't implement core.LoadableSharedState but UDSLoader is provided for its type, then UDSLoader.LoadState creates a new instance and the previous instance is replaced with it, which means loading the UDS could consume twice as much memory as core.LoadableSharedState.Load does. When a UDS doesn't implement core.LoadableSharedState and its UDSCreator doesn't implement UDSLoader, the UDS cannot be loaded.

type UDSStorage

type UDSStorage interface {
	// Save returns a writer to write the state data. Save doesn't discard the
	// previously saved data until UDSStorageWriter.Commit is called.
	//
	// Save can be called while a state is being loaded. In such a case, the
	// behavior is up to each storage. Some possible implementations are: (1) Save
	// creates a new entry for the state and conflicting Loads continue to read
	// the previous data, (2) Save blocks until conflicting Loads finish, and
	// (3) Save returns an error. Implementation (3) isn't recommended because
	// it might result in starvation in highly concurrent workloads.
	//
	// Either Commit or Abort of a UDSStorageWriter returned from this method
	// has to be called. When Commit is called, the data is persisted. When
	// Abort is called, the data is discarded and the previous data remains.
	//
	// Save can write header information or other data such as a space for
	// storing checksum later to UDSStorageWriter before returning it. Save can
	// also manipulate the written data as long as the data can be loaded again.
	//
	// A caller can assign a tag to the saved state so that multiple versions of
	// the UDS can be managed with unique names. When a tag is an empty string,
	// "default" will be used. Valid tags have the same format as node names,
	// which is validated by core.ValidateSymbol.
	Save(topology, state, tag string) (UDSStorageWriter, error)

	// Load loads the previously saved data of the state. io.ReadCloser.Close
	// has to be called when the reader is no longer needed.
	//
	// Load can be called while a state is being saved. In such a case, the
	// behavior is up to each storage. A storage's Load can block until Save is done,
	// can return an error, or can even return a reader of the previously saved
	// data.
	//
	// Load returns core.NotExistError when the state doesn't exist.
	//
	// When a tag is an empty string, "default" will be used.
	Load(topology, state, tag string) (io.ReadCloser, error)

	// ListTopologies returns a list of topologies that have saved states.
	ListTopologies() ([]string, error)

	// List returns a list of names of saved states in a topology as a map
	// whose key is a name of a UDS. Each value contains tags assigned to
	// the state as an array.
	List(topology string) (map[string][]string, error)
}

UDSStorage is an interface to support saving and loading UDSs.

func NewInMemoryUDSStorage

func NewInMemoryUDSStorage() UDSStorage

NewInMemoryUDSStorage creates a new UDSStorage that stores all data in memory. It should only be used for experiments or tests.

type UDSStorageWriter

type UDSStorageWriter interface {
	io.Writer

	// Commit persists the data written to the writer so far and closes it.
	// Write cannot be called once the data is committed.
	Commit() error

	// Abort discards the data written to the writer. Write cannot be called
	// after calling Abort.
	Abort() error
}

UDSStorageWriter is used to save a state. An instance of UDSStorageWriter doesn't have to be thread-safe, so a single instance may not be safe to use from multiple goroutines. However, different instances can be used concurrently, so multiple states can be saved simultaneously.
