execution

package
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2020 License: MIT Imports: 25 Imported by: 7

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEndOfStream = errors.New("end of stream")
View Source
var ErrNewTransactionRequired = fmt.Errorf("new transaction required")
View Source
var MaxWatermark = time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)

Based on protocol buffer max timestamp value.

View Source
var SystemSource string = "sys"

Functions

func AreStreamsEqual

func AreStreamsEqual(ctx context.Context, first, second RecordStream) error

func AreStreamsEqualNoOrdering

func AreStreamsEqualNoOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualNoOrderingWithCount added in v0.3.0

func AreStreamsEqualNoOrderingWithCount(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, count int) error

func AreStreamsEqualNoOrderingWithIDCheck added in v0.3.0

func AreStreamsEqualNoOrderingWithIDCheck(ctx context.Context, stateStorage storage.Storage, gotStream, wantStream RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking added in v0.3.0

func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking(ctx context.Context, stateStorage storage.Storage, got, want RecordStream, opts ...AreEqualOpt) error

func AreStreamsEqualWithOrdering added in v0.3.0

func AreStreamsEqualWithOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream) error

func DefaultEquality added in v0.3.0

func DefaultEquality(record1 *Record, record2 *Record) error

func EqualityOfAll added in v0.3.0

func EqualityOfAll(fs ...RecordEqualityFunc) func(record1 *Record, record2 *Record) error

func EqualityOfEventTimeField added in v0.3.0

func EqualityOfEventTimeField(record1 *Record, record2 *Record) error

func EqualityOfEverythingButIDs added in v0.3.0

func EqualityOfEverythingButIDs(record1 *Record, record2 *Record) error

func EqualityOfFieldsAndValues added in v0.3.0

func EqualityOfFieldsAndValues(record1 *Record, record2 *Record) error

func EqualityOfID added in v0.3.0

func EqualityOfID(record1 *Record, record2 *Record) error

func EqualityOfUndo added in v0.3.0

func EqualityOfUndo(record1 *Record, record2 *Record) error

func GetAndStartAllShuffles added in v0.3.0

func GetAndStartAllShuffles(ctx context.Context, stateStorage storage.Storage, rootStreamID *StreamID, nodes []Node, variables octosql.Variables) ([]RecordStream, []*ExecutionOutput, error)

This is used to start the whole plan. It starts each phase (separated by shuffles) one by one and takes care to properly pass shuffle ID's to shuffle receivers and senders.

func GetRawStringID added in v0.3.0

func GetRawStringID() string

func GetSourceStringID added in v0.3.0

func GetSourceStringID(tx storage.StateTransaction, inputName octosql.Value) (string, error)

GetSourceStreamID loads the StreamID of the given input stream in case it exists (from a previous run maybe?) Otherwise it allocates a new StreamID and saves it.

func NewErrWaitForChanges added in v0.3.0

func NewErrWaitForChanges(subscription *storage.Subscription) error

func ParseType

func ParseType(str string) octosql.Value

ParseType tries to parse the given string into any type it succeeds to. Returns back the string on failure.

func SystemField added in v0.3.0

func SystemField(field string) octosql.VariableName

Types

type Aggregate

type Aggregate interface {
	docs.Documented
	AddValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error
	RetractValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error
	GetValue(ctx context.Context, tx storage.StateTransaction) (octosql.Value, error)
	String() string
}

type AggregatePrototype

type AggregatePrototype func() Aggregate

type AliasedExpression

type AliasedExpression struct {
	// contains filtered or unexported fields
}

func NewAliasedExpression

func NewAliasedExpression(name octosql.VariableName, expr Expression) *AliasedExpression

func (*AliasedExpression) ExpressionValue

func (alExpr *AliasedExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*AliasedExpression) Name

func (alExpr *AliasedExpression) Name() octosql.VariableName

type And

type And struct {
	Left, Right Formula
}

func NewAnd

func NewAnd(left Formula, right Formula) *And

func (*And) Evaluate

func (f *And) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type AreEqualConfig added in v0.3.0

type AreEqualConfig struct {
	Equality RecordEqualityFunc
}

type AreEqualOpt added in v0.3.0

type AreEqualOpt func(*AreEqualConfig)

func WithEqualityBasedOn added in v0.3.0

func WithEqualityBasedOn(fs ...RecordEqualityFunc) AreEqualOpt

type BatchSizeManager added in v0.3.0

type BatchSizeManager struct {
	// contains filtered or unexported fields
}

The batch size manager decides if a batch should take more records. It tries to satisfy the target latency and will try not to ever surpass it. It will also grow the batch size on successful commit by at least 1. In case the commit is too big to finalize, it will drastically reduce the batch size.

func NewBatchSizeManager added in v0.3.0

func NewBatchSizeManager(latencyTarget time.Duration) *BatchSizeManager

func (*BatchSizeManager) CommitAborted added in v0.3.0

func (bsm *BatchSizeManager) CommitAborted()

func (*BatchSizeManager) CommitSuccessful added in v0.3.0

func (bsm *BatchSizeManager) CommitSuccessful()

func (*BatchSizeManager) CommitTooBig added in v0.3.0

func (bsm *BatchSizeManager) CommitTooBig()

func (*BatchSizeManager) MarkRecordsProcessed added in v0.3.0

func (bsm *BatchSizeManager) MarkRecordsProcessed(count int)

You can use this to process records in a way other than one by one.

func (*BatchSizeManager) RecordsLeftToTake added in v0.3.0

func (bsm *BatchSizeManager) RecordsLeftToTake() int

You can use this to process records in a way other than one by one.

func (*BatchSizeManager) Reset added in v0.3.0

func (bsm *BatchSizeManager) Reset()

func (*BatchSizeManager) ShouldTakeNextRecord added in v0.3.0

func (bsm *BatchSizeManager) ShouldTakeNextRecord() bool

type Constant

type Constant struct {
	Value bool
}

func NewConstant

func NewConstant(value bool) *Constant

func (Constant) Evaluate

func (f Constant) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type ConstantStrategy added in v0.3.0

type ConstantStrategy struct {
	// contains filtered or unexported fields
}

func (*ConstantStrategy) CalculatePartition added in v0.3.0

func (s *ConstantStrategy) CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)

type ConstantStrategyPrototype added in v0.3.0

type ConstantStrategyPrototype struct {
	// contains filtered or unexported fields
}

func (*ConstantStrategyPrototype) Get added in v0.3.0

type ConstantValue added in v0.3.0

type ConstantValue struct {
	// contains filtered or unexported fields
}

func NewConstantValue added in v0.3.0

func NewConstantValue(value octosql.Value) *ConstantValue

func (*ConstantValue) ExpressionValue added in v0.3.0

func (dv *ConstantValue) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type CountingTrigger added in v0.3.0

type CountingTrigger struct {
	// contains filtered or unexported fields
}

func NewCountingTrigger added in v0.3.0

func NewCountingTrigger(count Expression) *CountingTrigger

func (*CountingTrigger) Get added in v0.3.0

func (c *CountingTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type Datatype

type Datatype string
const (
	DatatypeBoolean Datatype = "boolean"
	DatatypeInt     Datatype = "int"
	DatatypeFloat64 Datatype = "float64"
	DatatypeString  Datatype = "string"
	DatatypeTuple   Datatype = "octosql.Value"
)

type DelayTrigger added in v0.3.0

type DelayTrigger struct {
	// contains filtered or unexported fields
}

func NewDelayTrigger added in v0.3.0

func NewDelayTrigger(delay Expression) *DelayTrigger

func (*DelayTrigger) Get added in v0.3.0

func (c *DelayTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type Distinct

type Distinct struct {
	// contains filtered or unexported fields
}

func NewDistinct

func NewDistinct(storage storage.Storage, source Node, eventTimeField octosql.VariableName) *Distinct

func (*Distinct) Get

func (node *Distinct) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type DistinctStream

type DistinctStream struct {
	// contains filtered or unexported fields
}

func (*DistinctStream) AddRecord added in v0.3.0

func (ds *DistinctStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*DistinctStream) Trigger added in v0.3.0

type DummyNode

type DummyNode struct {
	// contains filtered or unexported fields
}

func NewDummyNode

func NewDummyNode(data []*Record) *DummyNode

func (*DummyNode) Get

func (dn *DummyNode) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Equal

type Equal struct {
}

func (*Equal) Apply

func (rel *Equal) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type ErrWaitForChanges added in v0.3.0

type ErrWaitForChanges struct {
	*storage.Subscription
}

func GetErrWaitForChanges added in v0.3.0

func GetErrWaitForChanges(err error) *ErrWaitForChanges

func (*ErrWaitForChanges) Error added in v0.3.0

func (e *ErrWaitForChanges) Error() string

type ExecutionOutput added in v0.3.0

type ExecutionOutput struct {
	// Watermark source is the highest (in the execution tree)
	// watermark source available, which the record consumer should consume.
	WatermarkSource WatermarkSource

	// Next shuffles contains information about the next shuffles down the execution plan
	// which need to be started.
	NextShuffles map[string]ShuffleData

	// Tasks to run are functions which need to be run asynchronously,
	// after the storage initialization has been committed (and will thus be available for reading).
	TasksToRun []Task
}

This struct represents additional metadata to be returned with Get() and used recursively (like WatermarkSource)

func NewExecutionOutput added in v0.3.0

func NewExecutionOutput(ws WatermarkSource, nextShuffles map[string]ShuffleData, tasksToRun []Task) *ExecutionOutput

type Expression

type Expression interface {
	ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)
}

func NewRecordExpression added in v0.3.0

func NewRecordExpression() Expression

type Field

type Field struct {
	Name octosql.VariableName
}

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

func NewFilter

func NewFilter(formula Formula, child Node) *Filter

func (*Filter) Get

func (node *Filter) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type FilteredStream

type FilteredStream struct {
	// contains filtered or unexported fields
}

func (*FilteredStream) Close

func (stream *FilteredStream) Close(ctx context.Context, storage storage.Storage) error

func (*FilteredStream) Next

func (stream *FilteredStream) Next(ctx context.Context) (*Record, error)

type Formula

type Formula interface {
	Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)
}

type Function

type Function struct {
	Name          string
	ArgumentNames [][]string
	Description   docs.Documentation
	Validator     Validator
	Logic         func(...octosql.Value) (octosql.Value, error)
}

func (*Function) Document

func (f *Function) Document() docs.Documentation

type FunctionExpression

type FunctionExpression struct {
	// contains filtered or unexported fields
}

func NewFunctionExpression

func NewFunctionExpression(fun *Function, args []Expression) *FunctionExpression

func (*FunctionExpression) ExpressionValue

func (fe *FunctionExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type GetTestStreamOption added in v0.3.0

type GetTestStreamOption func(*StreamID)

func GetTestStreamWithStreamID added in v0.3.0

func GetTestStreamWithStreamID(id *StreamID) GetTestStreamOption

type GreaterEqual

type GreaterEqual struct {
}

func (*GreaterEqual) Apply

func (rel *GreaterEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type GroupBy

type GroupBy struct {
	// contains filtered or unexported fields
}

func NewGroupBy

func NewGroupBy(storage storage.Storage, source Node, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy

func (*GroupBy) Get

func (node *GroupBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type GroupByStream

type GroupByStream struct {
	// contains filtered or unexported fields
}

func (*GroupByStream) AddRecord added in v0.3.0

func (gb *GroupByStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*GroupByStream) Trigger added in v0.3.0

func (gb *GroupByStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type HashMap

type HashMap struct {
	// contains filtered or unexported fields
}

func NewHashMap

func NewHashMap() *HashMap

func (*HashMap) Get

func (hm *HashMap) Get(key octosql.Value) (interface{}, bool, error)

func (*HashMap) GetIterator

func (hm *HashMap) GetIterator() *Iterator

func (*HashMap) Set

func (hm *HashMap) Set(key octosql.Value, value interface{}) error

type In

type In struct {
}

func (*In) Apply

func (rel *In) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type InMemoryStream

type InMemoryStream struct {
	// contains filtered or unexported fields
}

func NewInMemoryStream

func NewInMemoryStream(ctx context.Context, data []*Record) *InMemoryStream

func (*InMemoryStream) Close

func (ims *InMemoryStream) Close(ctx context.Context, storage storage.Storage) error

func (*InMemoryStream) Next

func (ims *InMemoryStream) Next(ctx context.Context) (*Record, error)

type IntermediateRecordStore added in v0.3.0

type IntermediateRecordStore interface {
	// ReadyForMore is used to check if the intermediate record store is able to consume more data.
	// This allows it to communicate back-pressure.
	ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
	AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
	Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
	UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
	GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
	TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
	MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
	MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
	Close(ctx context.Context, storage storage.Storage) error
}

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

func (*Iterator) Next

func (iter *Iterator) Next() (octosql.Value, interface{}, bool)

Next returns next key, value, exists

type JobOutputQueueIntermediateRecordStore added in v0.3.0

type JobOutputQueueIntermediateRecordStore struct {
	// contains filtered or unexported fields
}

func (*JobOutputQueueIntermediateRecordStore) AddRecord added in v0.3.0

func (j *JobOutputQueueIntermediateRecordStore) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*JobOutputQueueIntermediateRecordStore) Close added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) GetWatermark added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) MarkEndOfStream added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) MarkError added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) Next added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) ReadyForMore added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) TriggerKeys added in v0.3.0

func (*JobOutputQueueIntermediateRecordStore) UpdateWatermark added in v0.3.0

type JoinType added in v0.3.0

type JoinType int
const (
	INNER_JOIN JoinType = 0
	LEFT_JOIN  JoinType = 1
	OUTER_JOIN JoinType = 2
)

func (JoinType) String added in v0.3.0

func (j JoinType) String() string

type JoinedStream added in v0.3.0

type JoinedStream struct {
	// contains filtered or unexported fields
}

func (*JoinedStream) AddRecord added in v0.3.0

func (js *JoinedStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*JoinedStream) Trigger added in v0.3.0

func (js *JoinedStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type KeyHashingStrategy added in v0.3.0

type KeyHashingStrategy struct {
	// contains filtered or unexported fields
}

func (*KeyHashingStrategy) CalculatePartition added in v0.3.0

func (s *KeyHashingStrategy) CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)

TODO: The key should really be calculated by the preceding map. Like all group by values.

type KeyHashingStrategyPrototype added in v0.3.0

type KeyHashingStrategyPrototype struct {
	// contains filtered or unexported fields
}

func (*KeyHashingStrategyPrototype) Get added in v0.3.0

type LessEqual

type LessEqual struct {
}

func (*LessEqual) Apply

func (rel *LessEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type LessThan

type LessThan struct {
}

func (*LessThan) Apply

func (rel *LessThan) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Like

type Like struct {
}

func (*Like) Apply

func (rel *Like) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type LogicExpression

type LogicExpression struct {
	// contains filtered or unexported fields
}

func NewLogicExpression

func NewLogicExpression(formula Formula) *LogicExpression

func (*LogicExpression) ExpressionValue

func (le *LogicExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type LookupJoin added in v0.3.0

type LookupJoin struct {
	// contains filtered or unexported fields
}

func NewLookupJoin added in v0.3.0

func NewLookupJoin(maxJobsCount int, stateStorage storage.Storage, source Node, joined Node, isLeftJoin bool) *LookupJoin

func (*LookupJoin) Get added in v0.3.0

func (node *LookupJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type LookupJoinStream added in v0.3.0

type LookupJoinStream struct {
	// contains filtered or unexported fields
}

func (*LookupJoinStream) AddRecord added in v0.3.0

func (rs *LookupJoinStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*LookupJoinStream) Close added in v0.3.0

func (rs *LookupJoinStream) Close(ctx context.Context, storage storage.Storage) error

func (*LookupJoinStream) GetNextRecord added in v0.3.0

func (rs *LookupJoinStream) GetNextRecord(ctx context.Context, tx storage.StateTransaction) (*Record, error)

func (*LookupJoinStream) GetWatermark added in v0.3.0

func (rs *LookupJoinStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*LookupJoinStream) HandleControlMessages added in v0.3.0

func (rs *LookupJoinStream) HandleControlMessages(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) MarkEndOfStream added in v0.3.0

func (rs *LookupJoinStream) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) MarkError added in v0.3.0

func (rs *LookupJoinStream) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*LookupJoinStream) Next added in v0.3.0

func (*LookupJoinStream) ReadyForMore added in v0.3.0

func (rs *LookupJoinStream) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*LookupJoinStream) RunScheduler added in v0.3.0

func (rs *LookupJoinStream) RunScheduler(ctx context.Context)

The scheduler takes records from the toBeJoined queue, and starts jobs to do joins. Control messages (records too, to satisfy the initial ordering of messages) are put on a controlMessages queue, where they will be handled by the receiver.

func (*LookupJoinStream) RunWorker added in v0.3.0

func (rs *LookupJoinStream) RunWorker(ctx context.Context, id *RecordID) error

The worker drives streams to completion, puts received records to output queues scoped by record id. In the end, it puts an EndOfStream message on the queue.

func (*LookupJoinStream) TriggerKeys added in v0.3.0

func (j *LookupJoinStream) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*LookupJoinStream) UpdateWatermark added in v0.3.0

func (rs *LookupJoinStream) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type Map

type Map struct {
	// contains filtered or unexported fields
}

func NewMap

func NewMap(expressions []NamedExpression, child Node, keep bool) *Map

func (*Map) Get

func (node *Map) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type MappedStream

type MappedStream struct {
	// contains filtered or unexported fields
}

func (*MappedStream) Close

func (stream *MappedStream) Close(ctx context.Context, storage storage.Storage) error

func (*MappedStream) Next

func (stream *MappedStream) Next(ctx context.Context) (*Record, error)

type Metadata added in v0.3.0

type Metadata struct {
	Id                   *RecordID `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	Undo                 bool      `protobuf:"varint,2,opt,name=undo,proto3" json:"undo,omitempty"`
	EventTimeField       string    `protobuf:"bytes,3,opt,name=eventTimeField,proto3" json:"eventTimeField,omitempty"`
	XXX_NoUnkeyedLiteral struct{}  `json:"-"`
	XXX_unrecognized     []byte    `json:"-"`
	XXX_sizecache        int32     `json:"-"`
}

func (*Metadata) Descriptor added in v0.3.0

func (*Metadata) Descriptor() ([]byte, []int)

func (*Metadata) GetEventTimeField added in v0.3.0

func (m *Metadata) GetEventTimeField() string

func (*Metadata) GetId added in v0.3.0

func (m *Metadata) GetId() *RecordID

func (*Metadata) GetUndo added in v0.3.0

func (m *Metadata) GetUndo() bool

func (*Metadata) ProtoMessage added in v0.3.0

func (*Metadata) ProtoMessage()

func (*Metadata) Reset added in v0.3.0

func (m *Metadata) Reset()

func (*Metadata) String added in v0.3.0

func (m *Metadata) String() string

func (*Metadata) XXX_DiscardUnknown added in v0.3.0

func (m *Metadata) XXX_DiscardUnknown()

func (*Metadata) XXX_Marshal added in v0.3.0

func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Metadata) XXX_Merge added in v0.3.0

func (m *Metadata) XXX_Merge(src proto.Message)

func (*Metadata) XXX_Size added in v0.3.0

func (m *Metadata) XXX_Size() int

func (*Metadata) XXX_Unmarshal added in v0.3.0

func (m *Metadata) XXX_Unmarshal(b []byte) error

type MoreThan

type MoreThan struct {
}

func (*MoreThan) Apply

func (rel *MoreThan) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type MultiTrigger added in v0.3.0

type MultiTrigger struct {
	// contains filtered or unexported fields
}

func NewMultiTrigger added in v0.3.0

func NewMultiTrigger(triggers ...TriggerPrototype) *MultiTrigger

func (*MultiTrigger) Get added in v0.3.0

func (m *MultiTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type NamedExpression

type NamedExpression interface {
	Expression
	Name() octosql.VariableName
}

type NextShuffleMetadataChange added in v0.3.0

type NextShuffleMetadataChange struct {
	ShuffleIDAddSuffix string
	Partition          int
	Source             Node
}

func NewNextShuffleMetadataChange added in v0.3.0

func NewNextShuffleMetadataChange(shuffleIDAddSuffix string, partition int, source Node) *NextShuffleMetadataChange

func (*NextShuffleMetadataChange) Get added in v0.3.0

type Node

type Node interface {
	Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
}

type NodeExpression

type NodeExpression struct {
	// contains filtered or unexported fields
}

func NewNodeExpression

func NewNodeExpression(node Node, stateStorage storage.Storage) *NodeExpression

func (*NodeExpression) ExpressionValue

func (ne *NodeExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type Not

type Not struct {
	Child Formula
}

func NewNot

func NewNot(child Formula) *Not

func (*Not) Evaluate

func (f *Not) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type NotEqual

type NotEqual struct {
}

func (*NotEqual) Apply

func (rel *NotEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type NotIn

type NotIn struct {
}

func (*NotIn) Apply

func (rel *NotIn) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Or

type Or struct {
	Left, Right Formula
}

func NewOr

func NewOr(left Formula, right Formula) *Or

func (*Or) Evaluate

func (f *Or) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type OrderBy

type OrderBy struct {
	// contains filtered or unexported fields
}

func NewOrderBy

func NewOrderBy(storage storage.Storage, source Node, exprs []Expression, directions []OrderDirection, eventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *OrderBy

func (*OrderBy) Get

func (node *OrderBy) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type OrderByKey added in v0.3.0

type OrderByKey struct {
	// contains filtered or unexported fields
}

func NewOrderByKey added in v0.3.0

func NewOrderByKey(key []byte) *OrderByKey

func (*OrderByKey) MonotonicMarshal added in v0.3.0

func (k *OrderByKey) MonotonicMarshal() []byte

func (*OrderByKey) MonotonicUnmarshal added in v0.3.0

func (k *OrderByKey) MonotonicUnmarshal(data []byte) error

type OrderByStream added in v0.3.0

type OrderByStream struct {
	// contains filtered or unexported fields
}

func (*OrderByStream) AddRecord added in v0.3.0

func (ob *OrderByStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error

func (*OrderByStream) Trigger added in v0.3.0

func (ob *OrderByStream) Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error)

type OrderDirection

type OrderDirection string
const (
	Ascending  OrderDirection = "asc"
	Descending OrderDirection = "desc"
)

type OutputOptions added in v0.3.0

type OutputOptions struct {
	OrderByExpressions []Expression
	OrderByDirections  []OrderDirection
	Limit              Expression
	Offset             Expression
}

func NewOutputOptions added in v0.3.0

func NewOutputOptions(
	orderByExpressions []Expression,
	orderByDirections []OrderDirection,
	limit Expression,
	offset Expression,
) *OutputOptions

type OutputQueue added in v0.3.0

type OutputQueue struct {
	// contains filtered or unexported fields
}

func NewOutputQueue added in v0.3.0

func NewOutputQueue(tx storage.StateTransaction) *OutputQueue

func (*OutputQueue) Peek added in v0.3.0

func (q *OutputQueue) Peek(ctx context.Context, msg proto.Message) error

func (*OutputQueue) Pop added in v0.3.0

func (q *OutputQueue) Pop(ctx context.Context, msg proto.Message) error

func (*OutputQueue) Push added in v0.3.0

func (q *OutputQueue) Push(ctx context.Context, element proto.Message) error

type PipelineMetadata added in v0.3.0

type PipelineMetadata struct {
	// The ID for the next shuffle.
	NextShuffleID *ShuffleID

	// The partition of the current stream.
	Partition int
}

type Predicate

type Predicate struct {
	Left     Expression
	Relation Relation
	Right    Expression
}

func NewPredicate

func NewPredicate(left Expression, relation Relation, right Expression) *Predicate

func (*Predicate) Evaluate

func (f *Predicate) Evaluate(ctx context.Context, variables octosql.Variables) (bool, error)

type ProcessByKey added in v0.3.0

type ProcessByKey struct {
	// contains filtered or unexported fields
}

func (*ProcessByKey) AddRecord added in v0.3.0

func (p *ProcessByKey) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*ProcessByKey) Close added in v0.3.0

func (p *ProcessByKey) Close(ctx context.Context, storage storage.Storage) error

func (*ProcessByKey) GetWatermark added in v0.3.0

func (p *ProcessByKey) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ProcessByKey) MarkEndOfStream added in v0.3.0

func (p *ProcessByKey) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*ProcessByKey) MarkError added in v0.3.0

func (p *ProcessByKey) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*ProcessByKey) Next added in v0.3.0

func (*ProcessByKey) ReadyForMore added in v0.3.0

func (p *ProcessByKey) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*ProcessByKey) TriggerKeys added in v0.3.0

func (p *ProcessByKey) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*ProcessByKey) UpdateWatermark added in v0.3.0

func (p *ProcessByKey) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type ProcessFunction added in v0.3.0

type ProcessFunction interface {
	AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, key octosql.Value, record *Record) error
	Trigger(ctx context.Context, tx storage.StateTransaction, key octosql.Value) ([]*Record, error) // New Records and Retractions
}

type PullEngine added in v0.3.0

type PullEngine struct {
	// contains filtered or unexported fields
}

func NewPullEngine added in v0.3.0

func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, sources []RecordStream, streamID *StreamID, watermarkSource WatermarkSource, shouldPrefixStreamID bool, ctx context.Context) *PullEngine

func (*PullEngine) Close added in v0.3.0

func (engine *PullEngine) Close(ctx context.Context, storage storage.Storage) error

func (*PullEngine) GetWatermark added in v0.3.0

func (engine *PullEngine) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*PullEngine) Next added in v0.3.0

func (engine *PullEngine) Next(ctx context.Context) (*Record, error)

func (*PullEngine) Run added in v0.3.0

func (engine *PullEngine) Run()

type QueueElement added in v0.3.0

type QueueElement struct {
	// Types that are valid to be assigned to Type:
	//	*QueueElement_Record
	//	*QueueElement_Watermark
	//	*QueueElement_EndOfStream
	//	*QueueElement_Error
	Type                 isQueueElement_Type `protobuf_oneof:"type"`
	XXX_NoUnkeyedLiteral struct{}            `json:"-"`
	XXX_unrecognized     []byte              `json:"-"`
	XXX_sizecache        int32               `json:"-"`
}

func (*QueueElement) Descriptor added in v0.3.0

func (*QueueElement) Descriptor() ([]byte, []int)

func (*QueueElement) GetEndOfStream added in v0.3.0

func (m *QueueElement) GetEndOfStream() bool

func (*QueueElement) GetError added in v0.3.0

func (m *QueueElement) GetError() string

func (*QueueElement) GetRecord added in v0.3.0

func (m *QueueElement) GetRecord() *Record

func (*QueueElement) GetType added in v0.3.0

func (m *QueueElement) GetType() isQueueElement_Type

func (*QueueElement) GetWatermark added in v0.3.0

func (m *QueueElement) GetWatermark() *timestamp.Timestamp

func (*QueueElement) ProtoMessage added in v0.3.0

func (*QueueElement) ProtoMessage()

func (*QueueElement) Reset added in v0.3.0

func (m *QueueElement) Reset()

func (*QueueElement) String added in v0.3.0

func (m *QueueElement) String() string

func (*QueueElement) XXX_DiscardUnknown added in v0.3.0

func (m *QueueElement) XXX_DiscardUnknown()

func (*QueueElement) XXX_Marshal added in v0.3.0

func (m *QueueElement) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*QueueElement) XXX_Merge added in v0.3.0

func (m *QueueElement) XXX_Merge(src proto.Message)

func (*QueueElement) XXX_OneofWrappers added in v0.3.0

func (*QueueElement) XXX_OneofWrappers() []interface{}

XXX_OneofWrappers is for the internal use of the proto package.

func (*QueueElement) XXX_Size added in v0.3.0

func (m *QueueElement) XXX_Size() int

func (*QueueElement) XXX_Unmarshal added in v0.3.0

func (m *QueueElement) XXX_Unmarshal(b []byte) error

type QueueElement_EndOfStream added in v0.3.0

type QueueElement_EndOfStream struct {
	EndOfStream bool `protobuf:"varint,3,opt,name=endOfStream,proto3,oneof"`
}

type QueueElement_Error added in v0.3.0

type QueueElement_Error struct {
	Error string `protobuf:"bytes,4,opt,name=error,proto3,oneof"`
}

type QueueElement_Record added in v0.3.0

type QueueElement_Record struct {
	Record *Record `protobuf:"bytes,1,opt,name=record,proto3,oneof"`
}

type QueueElement_Watermark added in v0.3.0

type QueueElement_Watermark struct {
	Watermark *timestamp.Timestamp `protobuf:"bytes,2,opt,name=watermark,proto3,oneof"`
}

type Record

type Record struct {
	Metadata             *Metadata        `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"`
	FieldNames           []string         `protobuf:"bytes,2,rep,name=fieldNames,proto3" json:"fieldNames,omitempty"`
	Data                 []*octosql.Value `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"`
	XXX_NoUnkeyedLiteral struct{}         `json:"-"`
	XXX_unrecognized     []byte           `json:"-"`
	XXX_sizecache        int32            `json:"-"`
}

func NewRecord

func NewRecord(fields []octosql.VariableName, data map[octosql.VariableName]octosql.Value, opts ...RecordOption) *Record

func NewRecordFromRecord added in v0.3.0

func NewRecordFromRecord(record *Record, opts ...RecordOption) *Record

func NewRecordFromSlice

func NewRecordFromSlice(fields []octosql.VariableName, data []octosql.Value, opts ...RecordOption) *Record

func NewRecordFromSliceWithNormalize

func NewRecordFromSliceWithNormalize(fields []octosql.VariableName, data []interface{}, opts ...RecordOption) *Record

func Normalize

func Normalize(rec *Record) *Record

func ReadAll added in v0.3.0

func ReadAll(ctx context.Context, stateStorage storage.Storage, stream RecordStream) ([]*Record, error)

func ReadAllWithCount added in v0.3.0

func ReadAllWithCount(ctx context.Context, stateStorage storage.Storage, stream RecordStream, count int) ([]*Record, error)

func (*Record) AsTuple

func (r *Record) AsTuple() octosql.Value

func (*Record) AsVariables

func (r *Record) AsVariables() octosql.Variables

func (*Record) Descriptor added in v0.3.0

func (*Record) Descriptor() ([]byte, []int)

func (*Record) Equal

func (r *Record) Equal(other *Record) bool

func (*Record) EventTime added in v0.2.0

func (r *Record) EventTime() octosql.Value

func (*Record) EventTimeField added in v0.3.0

func (r *Record) EventTimeField() octosql.VariableName

func (*Record) Fields

func (r *Record) Fields() []Field

func (*Record) GetData added in v0.3.0

func (m *Record) GetData() []*octosql.Value

func (*Record) GetFieldNames added in v0.3.0

func (m *Record) GetFieldNames() []string

func (*Record) GetMetadata added in v0.3.0

func (m *Record) GetMetadata() *Metadata

func (*Record) GetVariableNames added in v0.3.0

func (r *Record) GetVariableNames() []octosql.VariableName

func (*Record) Hash added in v0.3.0

func (r *Record) Hash() (uint64, error)

func (*Record) ID added in v0.3.0

func (r *Record) ID() *RecordID

func (*Record) IsUndo added in v0.2.0

func (r *Record) IsUndo() bool

func (*Record) ProtoMessage added in v0.3.0

func (*Record) ProtoMessage()

func (*Record) Reset added in v0.3.0

func (m *Record) Reset()

func (*Record) Show added in v0.3.0

func (r *Record) Show() string

func (*Record) ShowFields added in v0.3.0

func (r *Record) ShowFields() []Field

func (*Record) String added in v0.2.0

func (m *Record) String() string

func (*Record) Value

func (r *Record) Value(field octosql.VariableName) octosql.Value

func (*Record) XXX_DiscardUnknown added in v0.3.0

func (m *Record) XXX_DiscardUnknown()

func (*Record) XXX_Marshal added in v0.3.0

func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*Record) XXX_Merge added in v0.3.0

func (m *Record) XXX_Merge(src proto.Message)

func (*Record) XXX_Size added in v0.3.0

func (m *Record) XXX_Size() int

func (*Record) XXX_Unmarshal added in v0.3.0

func (m *Record) XXX_Unmarshal(b []byte) error

type RecordEqualityFunc added in v0.3.0

type RecordEqualityFunc func(record1 *Record, record2 *Record) error

type RecordExpression added in v0.3.0

type RecordExpression struct{}

func (*RecordExpression) ExpressionValue added in v0.3.0

func (re *RecordExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type RecordID added in v0.3.0

type RecordID struct {
	ID                   string   `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func NewRecordID added in v0.3.0

func NewRecordID(id string) *RecordID

GetRandomRecordID can be used to get a new random RecordID.

func NewRecordIDFromStreamIDWithOffset added in v0.3.0

func NewRecordIDFromStreamIDWithOffset(streamID *StreamID, offset int) *RecordID

NewRecordIDFromStreamIDWithOffset can be used to get a new RecordID deterministically based on the streamID and record offset.

func (*RecordID) AsPrefix added in v0.3.0

func (id *RecordID) AsPrefix() []byte

This is a helper function to use a record ID as a storage prefix.

func (*RecordID) Descriptor added in v0.3.0

func (*RecordID) Descriptor() ([]byte, []int)

func (*RecordID) GetID added in v0.3.0

func (m *RecordID) GetID() string

func (*RecordID) MonotonicMarshal added in v0.3.0

func (id *RecordID) MonotonicMarshal() []byte

func (*RecordID) MonotonicUnmarshal added in v0.3.0

func (id *RecordID) MonotonicUnmarshal(data []byte) error

func (*RecordID) ProtoMessage added in v0.3.0

func (*RecordID) ProtoMessage()

func (*RecordID) Reset added in v0.3.0

func (m *RecordID) Reset()

func (RecordID) Show added in v0.3.0

func (id RecordID) Show() string

func (*RecordID) String added in v0.3.0

func (m *RecordID) String() string

func (*RecordID) XXX_DiscardUnknown added in v0.3.0

func (m *RecordID) XXX_DiscardUnknown()

func (*RecordID) XXX_Marshal added in v0.3.0

func (m *RecordID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*RecordID) XXX_Merge added in v0.3.0

func (m *RecordID) XXX_Merge(src proto.Message)

func (*RecordID) XXX_Size added in v0.3.0

func (m *RecordID) XXX_Size() int

func (*RecordID) XXX_Unmarshal added in v0.3.0

func (m *RecordID) XXX_Unmarshal(b []byte) error

type RecordOption added in v0.2.0

type RecordOption func(stream *Record)

func WithEventTimeField added in v0.3.0

func WithEventTimeField(field octosql.VariableName) RecordOption

func WithID added in v0.3.0

func WithID(id *RecordID) RecordOption

func WithMetadataFrom added in v0.2.0

func WithMetadataFrom(base *Record) RecordOption

func WithNoUndo added in v0.3.0

func WithNoUndo() RecordOption

func WithUndo added in v0.2.0

func WithUndo() RecordOption

type RecordStream

type RecordStream interface {
	Next(ctx context.Context) (*Record, error)
	Close(ctx context.Context, storage storage.Storage) error
}

func GetTestStream added in v0.3.0

func GetTestStream(t *testing.T, stateStorage storage.Storage, variables octosql.Variables, node Node, opts ...GetTestStreamOption) RecordStream

type Regexp added in v0.3.0

type Regexp struct {
}

func (*Regexp) Apply added in v0.3.0

func (rel *Regexp) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)

type Relation

type Relation interface {
	Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)
}

func NewEqual

func NewEqual() Relation

func NewGreaterEqual

func NewGreaterEqual() Relation

func NewIn

func NewIn() Relation

func NewLessEqual

func NewLessEqual() Relation

func NewLessThan

func NewLessThan() Relation

func NewLike

func NewLike() Relation

func NewMoreThan

func NewMoreThan() Relation

func NewNotEqual

func NewNotEqual() Relation

func NewNotIn

func NewNotIn() Relation

func NewRegexp added in v0.3.0

func NewRegexp() Relation

type RequalifiedStream

type RequalifiedStream struct {
	// contains filtered or unexported fields
}

func (*RequalifiedStream) Close

func (stream *RequalifiedStream) Close(ctx context.Context, storage storage.Storage) error

func (*RequalifiedStream) Next

func (stream *RequalifiedStream) Next(ctx context.Context) (*Record, error)

type Requalifier

type Requalifier struct {
	// contains filtered or unexported fields
}

func NewRequalifier

func NewRequalifier(qualifier string, child Node) *Requalifier

func (*Requalifier) Get

func (node *Requalifier) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Shuffle added in v0.3.0

type Shuffle struct {
	// contains filtered or unexported fields
}

func NewShuffle added in v0.3.0

func NewShuffle(outputPartitionCount int, strategyPrototype ShuffleStrategyPrototype, sources []Node) *Shuffle

func (*Shuffle) Get added in v0.3.0

func (s *Shuffle) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

func (*Shuffle) StartSources added in v0.3.0

func (s *Shuffle) StartSources(ctx context.Context, stateStorage storage.Storage, shuffleID *ShuffleID, variables octosql.Variables) (map[string]ShuffleData, error)

type ShuffleData added in v0.3.0

type ShuffleData struct {
	ShuffleID *ShuffleID
	Shuffle   *Shuffle
	Variables octosql.Variables
}

type ShuffleID added in v0.3.0

type ShuffleID struct {
	Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

func GetSourceShuffleID added in v0.3.0

func GetSourceShuffleID(tx storage.StateTransaction, inputName octosql.Value) (*ShuffleID, error)

func NewShuffleID added in v0.3.0

func NewShuffleID(id string) *ShuffleID

func (*ShuffleID) AsMapKey added in v0.3.0

func (id *ShuffleID) AsMapKey() string

func (*ShuffleID) AsPrefix added in v0.3.0

func (id *ShuffleID) AsPrefix() []byte

func (*ShuffleID) Descriptor added in v0.3.0

func (*ShuffleID) Descriptor() ([]byte, []int)

func (*ShuffleID) GetId added in v0.3.0

func (m *ShuffleID) GetId() string

func (*ShuffleID) ProtoMessage added in v0.3.0

func (*ShuffleID) ProtoMessage()

func (*ShuffleID) Reset added in v0.3.0

func (m *ShuffleID) Reset()

func (*ShuffleID) String added in v0.3.0

func (m *ShuffleID) String() string

func (*ShuffleID) XXX_DiscardUnknown added in v0.3.0

func (m *ShuffleID) XXX_DiscardUnknown()

func (*ShuffleID) XXX_Marshal added in v0.3.0

func (m *ShuffleID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ShuffleID) XXX_Merge added in v0.3.0

func (m *ShuffleID) XXX_Merge(src proto.Message)

func (*ShuffleID) XXX_Size added in v0.3.0

func (m *ShuffleID) XXX_Size() int

func (*ShuffleID) XXX_Unmarshal added in v0.3.0

func (m *ShuffleID) XXX_Unmarshal(b []byte) error

type ShuffleReceiver added in v0.3.0

type ShuffleReceiver struct {
	// contains filtered or unexported fields
}

ShuffleReceiver is a RecordStream abstraction on a shuffle and receives records from it for a partition.

func NewShuffleReceiver added in v0.3.0

func NewShuffleReceiver(streamID *StreamID, shuffleID *ShuffleID, sourcePartitionCount int, partition int) *ShuffleReceiver

func (*ShuffleReceiver) Close added in v0.3.0

func (rs *ShuffleReceiver) Close(ctx context.Context, storage storage.Storage) error

func (*ShuffleReceiver) GetWatermark added in v0.3.0

func (rs *ShuffleReceiver) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ShuffleReceiver) Next added in v0.3.0

func (rs *ShuffleReceiver) Next(ctx context.Context) (*Record, error)

type ShuffleSender added in v0.3.0

type ShuffleSender struct {
	// contains filtered or unexported fields
}

ShuffleSender is used to send data to a shuffle from a given partition.

func NewShuffleSender added in v0.3.0

func NewShuffleSender(streamID *StreamID, shuffleID *ShuffleID, shuffleStrategy ShuffleStrategy, outputPartitionCount int, partition int) *ShuffleSender

func (*ShuffleSender) AddRecord added in v0.3.0

func (node *ShuffleSender) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error

func (*ShuffleSender) Close added in v0.3.0

func (node *ShuffleSender) Close(ctx context.Context, storage storage.Storage) error

func (*ShuffleSender) GetWatermark added in v0.3.0

func (node *ShuffleSender) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)

func (*ShuffleSender) MarkEndOfStream added in v0.3.0

func (node *ShuffleSender) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error

func (*ShuffleSender) MarkError added in v0.3.0

func (node *ShuffleSender) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error

func (*ShuffleSender) Next added in v0.3.0

func (*ShuffleSender) ReadyForMore added in v0.3.0

func (node *ShuffleSender) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error

func (*ShuffleSender) TriggerKeys added in v0.3.0

func (node *ShuffleSender) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)

func (*ShuffleSender) UpdateWatermark added in v0.3.0

func (node *ShuffleSender) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error

type ShuffleStrategy added in v0.3.0

type ShuffleStrategy interface {
	// Return output partition index based on the record and output partition count.
	CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)
}

func NewConstantStrategy added in v0.3.0

func NewConstantStrategy(partition int) ShuffleStrategy

func NewKeyHashingStrategy added in v0.3.0

func NewKeyHashingStrategy(variables octosql.Variables, key []Expression) ShuffleStrategy

type ShuffleStrategyPrototype added in v0.3.0

type ShuffleStrategyPrototype interface {
	Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)
}

func NewConstantStrategyPrototype added in v0.3.0

func NewConstantStrategyPrototype(partition int) ShuffleStrategyPrototype

func NewKeyHashingStrategyPrototype added in v0.3.0

func NewKeyHashingStrategyPrototype(key []Expression) ShuffleStrategyPrototype

type StarExpression added in v0.3.0

type StarExpression struct {
	// contains filtered or unexported fields
}

func NewStarExpression added in v0.3.0

func NewStarExpression(qualifier string) *StarExpression

func (*StarExpression) ExpressionValue added in v0.3.0

func (se *StarExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*StarExpression) Fields added in v0.3.0

func (se *StarExpression) Fields(variables octosql.Variables) []octosql.VariableName

func (*StarExpression) Name added in v0.3.0

func (se *StarExpression) Name() octosql.VariableName

type StreamID added in v0.3.0

type StreamID struct {
	Id                   string   `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

StreamID is a unique identifier for a RecordStream node. This StreamID should prefix all state storage keys this node uses.

func GetRawStreamID added in v0.3.0

func GetRawStreamID() *StreamID

GetRawStreamID can be used to get a new random StreamID without saving it.

func GetSourceStreamID added in v0.3.0

func GetSourceStreamID(tx storage.StateTransaction, inputName octosql.Value) (*StreamID, error)

func NewStreamID added in v0.3.0

func NewStreamID(str string) *StreamID

NewStreamID can be used to create a StreamID without saving it.

func (*StreamID) AsPrefix added in v0.3.0

func (id *StreamID) AsPrefix() []byte

A RecordStream node should use its StreamID as a prefix to all storage operations. This is a helper function to make that easier.

func (*StreamID) Descriptor added in v0.3.0

func (*StreamID) Descriptor() ([]byte, []int)

func (*StreamID) GetId added in v0.3.0

func (m *StreamID) GetId() string

func (*StreamID) ProtoMessage added in v0.3.0

func (*StreamID) ProtoMessage()

func (*StreamID) Reset added in v0.3.0

func (m *StreamID) Reset()

func (*StreamID) String added in v0.3.0

func (m *StreamID) String() string

func (*StreamID) XXX_DiscardUnknown added in v0.3.0

func (m *StreamID) XXX_DiscardUnknown()

func (*StreamID) XXX_Marshal added in v0.3.0

func (m *StreamID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*StreamID) XXX_Merge added in v0.3.0

func (m *StreamID) XXX_Merge(src proto.Message)

func (*StreamID) XXX_Size added in v0.3.0

func (m *StreamID) XXX_Size() int

func (*StreamID) XXX_Unmarshal added in v0.3.0

func (m *StreamID) XXX_Unmarshal(b []byte) error

type StreamJoin added in v0.3.0

type StreamJoin struct {
	// contains filtered or unexported fields
}

func NewStreamJoin added in v0.3.0

func NewStreamJoin(leftSource, rightSource Node, leftKey, rightKey []Expression, storage storage.Storage, eventTimeField octosql.VariableName, joinType JoinType, triggerPrototype TriggerPrototype) *StreamJoin

func (*StreamJoin) Get added in v0.3.0

func (node *StreamJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)

type Task added in v0.3.0

type Task func() error

type Trigger added in v0.3.0

type Trigger interface {
	docs.Documented
	RecordReceived(ctx context.Context, tx storage.StateTransaction, key octosql.Value, eventTime time.Time) error
	UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
	PollKeysToFire(ctx context.Context, tx storage.StateTransaction, batchSize int) ([]octosql.Value, error)
	KeysFired(ctx context.Context, tx storage.StateTransaction, key []octosql.Value) error
}

type TriggerPrototype added in v0.3.0

type TriggerPrototype interface {
	Get(ctx context.Context, variables octosql.Variables) (Trigger, error)
}

type TupleExpression

type TupleExpression struct {
	// contains filtered or unexported fields
}

func NewTuple

func NewTuple(expressions []Expression) *TupleExpression

func (*TupleExpression) ExpressionValue

func (tup *TupleExpression) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

type UnionWatermarkGenerator added in v0.3.0

type UnionWatermarkGenerator struct {
	// contains filtered or unexported fields
}

func NewUnionWatermarkGenerator added in v0.3.0

func NewUnionWatermarkGenerator(sources []WatermarkSource) *UnionWatermarkGenerator

func (*UnionWatermarkGenerator) GetWatermark added in v0.3.0

type Validator

type Validator interface {
	docs.Documented
	Validate(args ...octosql.Value) error
}

type Variable

type Variable struct {
	// contains filtered or unexported fields
}

func NewVariable

func NewVariable(name octosql.VariableName) *Variable

func (*Variable) ExpressionValue

func (v *Variable) ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)

func (*Variable) Name

func (v *Variable) Name() octosql.VariableName

type WatermarkSource added in v0.3.0

type WatermarkSource interface {
	GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
}

type WatermarkTrigger added in v0.3.0

type WatermarkTrigger struct {
}

func NewWatermarkTrigger added in v0.3.0

func NewWatermarkTrigger() *WatermarkTrigger

func (*WatermarkTrigger) Get added in v0.3.0

func (c *WatermarkTrigger) Get(ctx context.Context, variables octosql.Variables) (Trigger, error)

type ZeroWatermarkGenerator added in v0.3.0

type ZeroWatermarkGenerator struct {
}

func NewZeroWatermarkGenerator added in v0.3.0

func NewZeroWatermarkGenerator() *ZeroWatermarkGenerator

func (*ZeroWatermarkGenerator) GetWatermark added in v0.3.0

Directories

Path Synopsis
tvf

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL