ssp

package module
v0.0.0-...-07d9e02 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

README

Actions Status

SSP (Simple Stream Processor)

TODO

  • graph builder lib
  • walk graph
  • abstractions on top
  • serious engine
    • parallel operators
    • partitioned streams
    • remove type checks and input stream check
    • fix API to be cleaner
    • distinguish records from different streams
    • deeper testing
    • word count benchmark
  • manage time
    • add timestamps to records
    • watermarks
    • windows
  • abstractions on top
  • add some simple planning

Known Issues

  • Watermarks are global, but windows close on a per-key basis.
    This means that a value for any key/source advances the operator's global watermark, but the new watermark only reaches a key's windows once a record for that key enters the node.
    As a result, it is hard to reason about what happens, especially for out-of-order values.
    For example:

    Window: size 5, slide 2.
    Watermark: fixed offset 5.
    
    Windows: [0,  5), [2, 7), [4, 9), [6, 11), [8, 13), [10, 15), [12, 17), ...
    
    Records:
    {ts: 2, value: "buz"}
    {ts: 13, value: "bar"}
    {ts: 3, value: "buz"}
    {ts: 10, value: "buz"}
    
    Output: count of values per window.
    

    Record 13 advances the WindowNode's watermark to 8 (13 minus the fixed offset of 5) and should, in theory, close [0, 5) and [2, 7). The watermark advances for both bar and buz, but it propagates to buz only when record 3 is processed.
    Thus, the result will be:

    [0, 5) - buz: 2
    [2, 7) - buz: 2
    ...
    

    If the input, instead, is:

    {ts: 2, value: "buz"}
    {ts: 13, value: "bar"}
    {ts: 10, value: "buz"}
    {ts: 3, value: "buz"}
    

    then the output will be:

    [0, 5) - buz: 1
    [0, 5) - buz: 1
    [2, 7) - buz: 1
    ...
    

    This is because record 10 makes buz aware of watermark 8, which closes [0, 5) before record 3 can be added to it.

    The goal is to make the two outputs consistent.

  • FixedWindowManager stores windows in a map.
    This makes iteration order non-deterministic and makes some tests flaky. For example, we cannot determine the order in which windows are closed (i.e., the order in which the close function is called).

Optional

  • generate graph as command?
  • multiple outputs for nodes (with tags?)
  • custom triggers (time)

Code Examples

NOTE: These examples show snippets of code, but they may not reflect the exact state of master.

Word Count

ctx := Context()
p := NewNode(func(collector Collector, v values.Value) error {
    in := []string{
        "hello",
        "this",
        "is",
        "ssp",
        "hello",
        "this",
        "is",
        "sparta",
        "sparta",
        "is",
        "leonida",
    }
    for _, v := range in {
        collector.Collect(values.New(v))
    }
    return nil
}).SetName("source").
    Out().
    KeyBy(NewStringValueKeySelector(func(v values.Value) string {
        return v.String()
    })).
    Connect(ctx, NewStatefulNode(values.New(int64(0)),
        func(state values.Value, collector Collector, v values.Value) (values.Value, error) {
            count := state.Int64() + 1
            collector.Collect(values.New(fmt.Sprintf("%v: %d", v, count)))
            return values.New(count), nil
        })).
    SetName("wordCounter").
    SetParallelism(4).
    Out()

sink, log := NewLogSink(values.String)
p.Connect(ctx, sink.SetName("sink"))

if err := Execute(ctx); err != nil {
    panic(err)
}

fmt.Println(log.GetValues())

Align

ctx := Context()
source := NewNode(func(collector Collector, v values.Value) error {
    in := []string{
        "hello",
        "this",
        "is",
        "ssp",
    }
    for _, v := range in {
        collector.Collect(values.New(v))
    }
    return nil
}).SetName("source").Out()

upper := source.
    Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
        collector.Collect(values.New(strings.ToUpper(v.String())))
        return nil
    })).SetName("upper")

count := source.
    Connect(ctx, NewNode(func(collector Collector, v values.Value) error {
        collector.Collect(values.New(len(v.String())))
        return nil
    })).SetName("count")

type state struct {
    s1 []values.Value
    s2 []values.Value
}
align := NewStatefulNode(values.New(&state{}), func(sv values.Value, collector Collector, v values.Value) (values.Value, error) {
    s := sv.Get().(*state)
    source := values.GetSource(v)
    if source == 0 {
        if len(s.s2) > 0 {
            ov := s.s2[0]
            s.s2 = s.s2[1:]
            collector.Collect(values.New(fmt.Sprintf("%v: %v", v, ov)))
        } else {
            s.s1 = append(s.s1, v)
        }
    } else {
        if len(s.s1) > 0 {
            ov := s.s1[0]
            s.s1 = s.s1[1:]
            collector.Collect(values.New(fmt.Sprintf("%v: %v", ov, v)))
        } else {
            s.s2 = append(s.s2, v)
        }
    }
    return sv, nil
}).SetName("aligner")

upper.Out().Connect(ctx, align)
aligned := count.Out().Connect(ctx, align).Out()

sink, log := NewLogSink(values.String)
aligned.Connect(ctx, sink.SetName("sink"))

if err := Execute(ctx); err != nil {
    panic(err)
}

fmt.Println(log.GetValues())

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Context

func Context() context.Context

func Execute

func Execute(ctx context.Context) error

func NewInfiniteStream

func NewInfiniteStream() *infiniteStream

func NewIntValues

func NewIntValues(ints ...int) []values.Value

func NewPartitionedStream

func NewPartitionedStream(par int, ks KeySelector, ds DataStream, f func() Transport) *partitionedStream

func SendClose

func SendClose(c Collector)

func Walk

func Walk(g Graph, f Visitor)

Types

type AnonymousNode

type AnonymousNode struct {
	// contains filtered or unexported fields
}

func NewNode

func NewNode(do func(collector Collector, v values.Value) error) *AnonymousNode

func NewStatefulNode

func NewStatefulNode(state0 values.Value, do NodeFunc) *AnonymousNode

func (*AnonymousNode) Clone

func (n *AnonymousNode) Clone() Node

func (*AnonymousNode) Do

func (n *AnonymousNode) Do(collector Collector, v values.Value) error

func (AnonymousNode) GetName

func (n AnonymousNode) GetName() string

func (AnonymousNode) GetParallelism

func (n AnonymousNode) GetParallelism() int

func (*AnonymousNode) Out

func (n *AnonymousNode) Out() *Arch

func (*AnonymousNode) SetName

func (n *AnonymousNode) SetName(name string) Node

func (*AnonymousNode) SetParallelism

func (n *AnonymousNode) SetParallelism(par int) Node

func (AnonymousNode) String

func (n AnonymousNode) String() string

type Arch

type Arch struct {
	// contains filtered or unexported fields
}
func NewLink(from Node) *Arch

func (*Arch) Connect

func (a *Arch) Connect(ctx context.Context, node Node) Node

func (*Arch) From

func (a *Arch) From() Node

func (*Arch) KeyBy

func (a *Arch) KeyBy(ks KeySelector) *Arch

func (*Arch) String

func (a *Arch) String() string

func (*Arch) To

func (a *Arch) To() Node

type Collector

type Collector interface {
	Collect(v values.Value)
}

type DataStream

type DataStream interface {
	Next() values.Value
}

func NewStreamFromElements

func NewStreamFromElements(elems ...values.Value) DataStream

type Engine

type Engine struct{}

func (*Engine) Execute

func (e *Engine) Execute(ctx context.Context) error

type FixedWindowManager

type FixedWindowManager struct {
	// contains filtered or unexported fields
}

func NewFixedWindowManager

func NewFixedWindowManager(size int, slide int, state values.Value) *FixedWindowManager

func (*FixedWindowManager) ForEachClosedWindow

func (m *FixedWindowManager) ForEachClosedWindow(wm values.Timestamp, f func(w *Window) error) error

func (*FixedWindowManager) ForEachWindow

func (m *FixedWindowManager) ForEachWindow(ts values.Timestamp, f func(w *Window) error) error

type FnKeySelector

type FnKeySelector func(v values.Value) values.Key

func (FnKeySelector) GetKey

func (s FnKeySelector) GetKey(v values.Value) values.Key

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

func GetGraph

func GetGraph(ctx context.Context) Graph

func (Graph) Adjacents

func (g Graph) Adjacents(n Node) []*Arch

func (Graph) Roots

func (g Graph) Roots() (roots []Node)

func (Graph) String

func (g Graph) String() string

func (Graph) Walk

func (g Graph) Walk(v Visitor)

type KeySelector

type KeySelector interface {
	GetKey(v values.Value) values.Key
}

func NewFixedKeySelector

func NewFixedKeySelector() KeySelector

func NewRoundRobinKeySelector

func NewRoundRobinKeySelector(n int) KeySelector

func NewStringValueKeySelector

func NewStringValueKeySelector(f func(v values.Value) string) KeySelector
type Link interface {
	Connect(ctx context.Context, node Node) Node
}

type Node

type Node interface {
	Do(collector Collector, v values.Value) error
	Out() *Arch
	Clone() Node

	// Options.
	SetParallelism(par int) Node
	GetParallelism() int
	SetName(name string) Node
	GetName() string
}

func AssignTimestamp

func AssignTimestamp(tse TimestampExtractorFn) Node

func NewLogSink

func NewLogSink(t values.Type) (Node, *values.List)

func NewWindowedNode

func NewWindowedNode(size, slide int, state values.Value, fn WindowFn, closeFn WindowCloseFn) Node

type NodeFunc

type NodeFunc func(state values.Value, collector Collector, v values.Value) (values.Value, error)

type Operator

type Operator struct {
	// contains filtered or unexported fields
}

func NewOperator

func NewOperator(n Node) *Operator

func (*Operator) Close

func (o *Operator) Close() error

func (*Operator) In

func (o *Operator) In(ds DataStream)

func (*Operator) Open

func (o *Operator) Open()

func (*Operator) Out

func (o *Operator) Out(c Collector)

type OperatorOption

type OperatorOption func(options *operatorOptions)

func WithInKeySelector

func WithInKeySelector(ks KeySelector) OperatorOption

type ParallelOperator

type ParallelOperator struct {
	// contains filtered or unexported fields
}

func NewParallelOperator

func NewParallelOperator(par int, f func() *Operator, opts ...OperatorOption) *ParallelOperator

func (*ParallelOperator) Close

func (o *ParallelOperator) Close() error

func (*ParallelOperator) In

func (o *ParallelOperator) In(ds DataStream, f func() Transport)

func (*ParallelOperator) Open

func (o *ParallelOperator) Open()

func (*ParallelOperator) Out

func (o *ParallelOperator) Out(cs []Collector)

type TimestampExtractor

type TimestampExtractor interface {
	ExtractTime(v values.Value) (ts values.Timestamp, wm values.Timestamp)
}

type TimestampExtractorFn

type TimestampExtractorFn func(v values.Value) (ts values.Timestamp, wm values.Timestamp)

type Transport

type Transport interface {
	Collector
	DataStream
}

Transport enables collecting on a DataStream.

type Visitor

type Visitor func(a *Arch)

type Window

type Window struct {
	State values.Value
	// contains filtered or unexported fields
}

func NewWindow

func NewWindow(start values.Timestamp, stop values.Timestamp, state values.Value) *Window

func (*Window) AddElement

func (w *Window) AddElement(v values.TimestampedValue)

func (*Window) IsEmpty

func (w *Window) IsEmpty() bool

func (*Window) Range

func (w *Window) Range(f func(v values.TimestampedValue) error) error

func (*Window) Start

func (w *Window) Start() values.Timestamp

func (*Window) Stop

func (w *Window) Stop() values.Timestamp

func (*Window) String

func (w *Window) String() string

type WindowCloseFn

type WindowCloseFn func(w *Window, collector Collector) error

type WindowFn

type WindowFn func(w *Window, collector Collector, v values.TimestampedValue) error

type WindowManager

type WindowManager interface {
	ForEachWindow(ts values.Timestamp, f func(w *Window) error) error
	ForEachClosedWindow(wm values.Timestamp, f func(w *Window) error) error
}

WindowManager provides active windows for a given time instant, and closes windows as time progresses.

Directories

Path Synopsis
lib module
ssp module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL