join

package
Version: v1.3.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 16, 2021 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type GlobalTableJoiner

// GlobalTableJoiner is a join node configuration. Its exported fields are:
//
//   - Id: an int32 identifier (presumably the value returned by ID — confirm).
//   - Typ: the join Type (LeftJoin or InnerJoin).
//   - Store: a string (presumably the name of the store to look up in
//     Registry — confirm against the implementation).
//   - KeyMapper: callback that maps an incoming key/value pair to a key.
//   - ValueMapper: callback that combines a left and a right value.
//   - Registry: a store.Registry.
//
// NOTE(review): the type name suggests a stream-to-global-table join; the
// implementation is not visible here, so verify before relying on that.
type GlobalTableJoiner struct {
	//Topic string
	Id          int32
	Typ         Type
	Store       string
	KeyMapper   KeyMapper
	ValueMapper ValueMapper

	Registry store.Registry
	// contains filtered or unexported fields
}

func (*GlobalTableJoiner) AddChild

func (j *GlobalTableJoiner) AddChild(node topology.Node)

func (*GlobalTableJoiner) AddChildBuilder

func (j *GlobalTableJoiner) AddChildBuilder(builder topology.NodeBuilder)

func (*GlobalTableJoiner) Build

func (j *GlobalTableJoiner) Build() (topology.Node, error)

func (*GlobalTableJoiner) ChildBuilders

func (j *GlobalTableJoiner) ChildBuilders() []topology.NodeBuilder

func (*GlobalTableJoiner) Childs

func (j *GlobalTableJoiner) Childs() []topology.Node

func (*GlobalTableJoiner) ID

func (j *GlobalTableJoiner) ID() int32

func (*GlobalTableJoiner) Join

func (j *GlobalTableJoiner) Join(ctx context.Context, key interface{}, leftVal interface{}) (joinedVal interface{}, err error)

func (*GlobalTableJoiner) Name

func (j *GlobalTableJoiner) Name() string

func (*GlobalTableJoiner) Next

func (j *GlobalTableJoiner) Next() bool

func (*GlobalTableJoiner) Run

func (j *GlobalTableJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, next bool, err error)

func (*GlobalTableJoiner) Type

func (j *GlobalTableJoiner) Type() topology.Type

type Joiner

// Joiner is a topology.Node that additionally exposes a Join method.
type Joiner interface {
	topology.Node
	// Join takes a context, a key and a value and returns a joined value or
	// an error.
	Join(ctx context.Context, key, val interface{}) (joinedVal interface{}, err error)
}

type KeyMapper

// KeyMapper is a callback that receives a key and a value and returns a
// mapped key, or an error. GlobalTableJoiner carries one in its KeyMapper
// field (presumably to derive the lookup key for the join — confirm).
type KeyMapper func(key, value interface{}) (mappedKey interface{}, err error)

type Repartition

// Repartition groups repartitioning settings. RepartitionOptions holds one
// value of this type per side (LeftRepartition and RightRepartition).
type Repartition struct {
	// Enable is a boolean switch (presumably turns repartitioning on — confirm).
	Enable       bool
	// StreamSide is the Side (LeftSide or RightSide) these settings refer to.
	StreamSide   Side
	// KeyEncoder is the encoding.Builder for keys.
	KeyEncoder   encoding.Builder
	// ValueEncoder is the encoding.Builder for values.
	ValueEncoder encoding.Builder
	// Topic holds the repartition topic settings.
	Topic        RepartitionTopic
}

func (Repartition) Validate

func (r Repartition) Validate(s Side) error

type RepartitionOption

// RepartitionOption is a function that receives a *RepartitionOptions.
// RepartitionLeftStream and RepartitionRightStream return values of this
// type, and RepartitionOptions.Apply accepts them as variadic arguments
// (presumably the functional-options pattern — confirm in Apply).
type RepartitionOption func(sink *RepartitionOptions)

func RepartitionLeftStream

func RepartitionLeftStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption

func RepartitionRightStream

func RepartitionRightStream(keyEncodingBuilder, valueEncodingBuilder encoding.Builder) RepartitionOption

type RepartitionOptions

// RepartitionOptions collects the repartition settings for both sides of a
// join: a string-to-string function and a Repartition value per side.
type RepartitionOptions struct {
	// LeftTopic maps a string to a string (presumably derives the left
	// repartition topic name from a base name — confirm against callers).
	LeftTopic        func(string) string
	// RightTopic is the right-side counterpart of LeftTopic.
	RightTopic       func(string) string
	// LeftRepartition holds the Repartition settings for the left side.
	LeftRepartition  Repartition
	// RightRepartition holds the Repartition settings for the right side.
	RightRepartition Repartition
}

func (*RepartitionOptions) Apply

func (iOpts *RepartitionOptions) Apply(options ...RepartitionOption)

type RepartitionTopic

// RepartitionTopic describes a topic by name, suffix and three integer
// settings. It is used as the Topic field of Repartition.
type RepartitionTopic struct {
	Name              string
	Suffix            string
	ReplicationFactor int
	NumOfPartitions   int
	// NOTE(review): the name looks like a misspelling of "MinInSyncReplicas".
	// It is exported, so renaming it would break callers; if it is ever
	// corrected, keep this name as a deprecated alias.
	MinInSycReplicas  int
}

type Side

// Side identifies one side of a join. The constants start at iota + 1, so
// the zero value of Side matches neither LeftSide nor RightSide.
type Side int
const (
	// LeftSide has the value 1.
	LeftSide Side = iota + 1
	// RightSide has the value 2.
	RightSide
)

type SideJoiner

// SideJoiner is a join node configuration whose exported fields are an
// int32 Id, a Side label, pointers to a left and a right Window, and a
// ValueMapper callback.
//
// NOTE(review): the Side field is a plain string, not the package's Side
// type; the accepted values are not visible here — confirm against callers.
type SideJoiner struct {
	Id          int32
	Side        string
	LeftWindow  *Window
	RightWindow *Window
	ValueMapper ValueMapper
	// contains filtered or unexported fields
}

func (*SideJoiner) AddChild

func (sj *SideJoiner) AddChild(node topology.Node)

func (*SideJoiner) AddChildBuilder

func (sj *SideJoiner) AddChildBuilder(builder topology.NodeBuilder)

func (*SideJoiner) Build

func (sj *SideJoiner) Build() (topology.Node, error)

func (*SideJoiner) ChildBuilders

func (sj *SideJoiner) ChildBuilders() []topology.NodeBuilder

func (*SideJoiner) Childs

func (sj *SideJoiner) Childs() []topology.Node

func (*SideJoiner) ID

func (sj *SideJoiner) ID() int32

func (*SideJoiner) Next

func (sj *SideJoiner) Next() bool

func (*SideJoiner) Run

func (sj *SideJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*SideJoiner) Type

func (sj *SideJoiner) Type() topology.Type

type StreamJoiner

// StreamJoiner is a node type whose only exported field is an int32 Id
// (presumably the value returned by ID — confirm). All other state is
// unexported and not shown here.
type StreamJoiner struct {
	Id int32
	// contains filtered or unexported fields
}

func (*StreamJoiner) AddChild

func (j *StreamJoiner) AddChild(node topology.Node)

func (*StreamJoiner) AddChildBuilder

func (j *StreamJoiner) AddChildBuilder(builder topology.NodeBuilder)

func (*StreamJoiner) Build

func (j *StreamJoiner) Build() (topology.Node, error)

func (*StreamJoiner) ChildBuilders

func (j *StreamJoiner) ChildBuilders() []topology.NodeBuilder

func (*StreamJoiner) Childs

func (j *StreamJoiner) Childs() []topology.Node

func (*StreamJoiner) ID

func (j *StreamJoiner) ID() int32

func (*StreamJoiner) Name

func (j *StreamJoiner) Name() string

func (*StreamJoiner) Next

func (j *StreamJoiner) Next() bool

func (*StreamJoiner) Run

func (j *StreamJoiner) Run(ctx context.Context, kIn, vIn interface{}) (kOut, vOut interface{}, cont bool, err error)

func (*StreamJoiner) Type

func (j *StreamJoiner) Type() topology.Type

type Type

// Type selects the kind of join. The constants start at iota, so the zero
// value of Type is LeftJoin.
type Type int
const (
	// LeftJoin has the value 0.
	LeftJoin Type = iota
	// InnerJoin has the value 1.
	InnerJoin
)

type ValueMapper

// ValueMapper is a callback that receives a left and a right value and
// returns the joined value, or an error. GlobalTableJoiner and SideJoiner
// each carry one in their ValueMapper field.
type ValueMapper func(left, right interface{}) (joined interface{}, err error)

type Window

// Window has no exported fields. It is created with NewWindow and accessed
// through Read (key -> value, bool) and Write (key, value); SideJoiner holds
// two of them as LeftWindow and RightWindow.
// NOTE(review): presumably a keyed buffer of recently seen records; storage
// and concurrency behaviour are not visible here.
type Window struct {
	// contains filtered or unexported fields
}

func NewWindow

func NewWindow() *Window

func (*Window) Read

func (w *Window) Read(key interface{}) (interface{}, bool)

func (*Window) Write

func (w *Window) Write(key, value interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL