Documentation
¶
Index ¶
- func NewRecordContext(record kafka.Record) context.Context
- func RecordFromContext(ctx context.Context) kafka.Record
- type Builder
- type BuilderContext
- type ChangeLogger
- type ChangelogSyncer
- type ChangelogSyncerBuilder
- type CloseableNode
- type DefaultNode
- func (n *DefaultNode) AddEdge(node Node)
- func (n *DefaultNode) Edges() []Node
- func (n *DefaultNode) Err(message string) error
- func (n *DefaultNode) Forward(ctx context.Context, kIn, vIn interface{}, cont bool) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error)
- func (n *DefaultNode) Id() NodeId
- func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error)
- func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error)
- func (n *DefaultNode) NameAs(name string)
- func (n *DefaultNode) NodeName() string
- func (n *DefaultNode) ReadsFrom() []string
- func (n *DefaultNode) SetId(id NodeId)
- func (n *DefaultNode) Setup(ctx SubTopologySetupContext) error
- func (n *DefaultNode) String() string
- func (n *DefaultNode) WrapErr(err error) error
- func (n *DefaultNode) WrapErrWith(err error, message string) error
- func (n *DefaultNode) WritesAt() []string
- type Edge
- type InitableNode
- type KeyValPair
- type LoggableStateStore
- type LoggableStoreBuilder
- type Node
- type NodeBuilder
- type NodeContext
- type NodeId
- type NodeInfo
- type RecodeContext
- type Sink
- type SinkBuilder
- type SinkEncoder
- type Source
- type SourceEncoder
- type State
- type StateBuilder
- type StateStore
- type StateStoreNameFunc
- type StateStoreProvider
- type StateType
- type SubTopology
- type SubTopologyBuilder
- type SubTopologyBuilderContext
- type SubTopologyBuilders
- type SubTopologyContext
- type SubTopologyId
- type SubTopologyKind
- type SubTopologySetupContext
- type Topology
- type Type
- type Visualizer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Builder ¶
type Builder interface {
NewKSubTopologyBuilder(kind SubTopologyKind) SubTopologyBuilder
RemoveSubTopology(builder SubTopologyBuilder)
SubTopologies() SubTopologyBuilders
Build(ctx BuilderContext) (Topology, error)
Describe() string
Reset(ctx BuilderContext) error
}
type BuilderContext ¶
type BuilderContext interface {
StoreRegistry() stores.Registry
ProducerBuilder() kafka.ProducerBuilder
Admin() kafka.Admin
ApplicationId() string
Logger() log.Logger
MetricsReporter() metrics.Reporter
}
func NewBuilderContext ¶
type ChangeLogger ¶
type ChangelogSyncer ¶
type ChangelogSyncerBuilder ¶
type ChangelogSyncerBuilder interface {
// Setup setups the changelog by creating changelog topics and offset stores
Setup(ctx SubTopologyBuilderContext) error
Build(ctx SubTopologyContext, store stores.Store) (ChangelogSyncer, error)
BuildLogger(ctx SubTopologyContext, store string) (ChangeLogger, error)
Internal() bool
Topic() string
}
type CloseableNode ¶
type DefaultNode ¶
type DefaultNode struct {
// contains filtered or unexported fields
}
func (*DefaultNode) AddEdge ¶
func (n *DefaultNode) AddEdge(node Node)
func (*DefaultNode) Edges ¶
func (n *DefaultNode) Edges() []Node
func (*DefaultNode) Err ¶
func (n *DefaultNode) Err(message string) error
func (*DefaultNode) ForwardAll ¶
func (n *DefaultNode) ForwardAll(ctx context.Context, kvs []KeyValPair, cont bool) (kOut interface{}, vOut interface{}, next bool, err error)
func (*DefaultNode) Id ¶
func (n *DefaultNode) Id() NodeId
func (*DefaultNode) Ignore ¶
func (n *DefaultNode) Ignore() (interface{}, interface{}, bool, error)
func (*DefaultNode) IgnoreAndWrapErrWith ¶
func (n *DefaultNode) IgnoreAndWrapErrWith(err error, message string) (interface{}, interface{}, bool, error)
func (*DefaultNode) IgnoreWithError ¶
func (n *DefaultNode) IgnoreWithError(err error) (interface{}, interface{}, bool, error)
func (*DefaultNode) NameAs ¶
func (n *DefaultNode) NameAs(name string)
func (*DefaultNode) NodeName ¶
func (n *DefaultNode) NodeName() string
func (*DefaultNode) ReadsFrom ¶
func (n *DefaultNode) ReadsFrom() []string
func (*DefaultNode) SetId ¶
func (n *DefaultNode) SetId(id NodeId)
func (*DefaultNode) Setup ¶
func (n *DefaultNode) Setup(ctx SubTopologySetupContext) error
func (*DefaultNode) String ¶
func (n *DefaultNode) String() string
func (*DefaultNode) WrapErr ¶
func (n *DefaultNode) WrapErr(err error) error
func (*DefaultNode) WrapErrWith ¶
func (n *DefaultNode) WrapErrWith(err error, message string) error
func (*DefaultNode) WritesAt ¶
func (n *DefaultNode) WritesAt() []string
type Edge ¶
type Edge struct {
// contains filtered or unexported fields
}
func NewEdge ¶
func NewEdge(parent, node NodeBuilder) Edge
func (Edge) Node ¶
func (e Edge) Node() NodeBuilder
func (Edge) Parent ¶
func (e Edge) Parent() NodeBuilder
type InitableNode ¶
type InitableNode interface {
Node
// Init is called once the Node build is completed and before message processing starts.
// Please refer SubTopology.Init()
Init(ctx NodeContext) error
}
type KeyValPair ¶
type KeyValPair struct {
Key interface{}
Value interface{}
}
type LoggableStateStore ¶
type LoggableStateStore interface {
StateStore
ChangeLogger
}
type LoggableStoreBuilder ¶
type LoggableStoreBuilder interface {
Name() string
NameFormatter(ctx SubTopologyContext) StateStoreNameFunc
KeyEncoder() encoding.Encoder
ValEncoder() encoding.Encoder
Build(ctx SubTopologyContext) (StateStore, error)
Changelog() ChangelogSyncerBuilder
}
type NodeBuilder ¶
type NodeBuilder interface {
NodeInfo
SetId(id NodeId)
// Setup is called once when stream app starting (SubTopologyBuilder.Setup())
// Eg usage: create changelog topics for node
Setup(ctx SubTopologySetupContext) error
// Build calls with every Consumer PartitionAssignEvent and this method is responsible to create a new instance of
// the node
Build(ctx SubTopologyContext) (Node, error)
}
type NodeContext ¶
type NodeContext interface {
SubTopologyContext
Store(name string) StateStore
}
func NewNodeContext ¶
func NewNodeContext(parent SubTopologyContext, subTopology SubTopology) NodeContext
type NodeId ¶
type NodeId struct {
// contains filtered or unexported fields
}
func (NodeId) SubTopologyId ¶
type RecodeContext ¶
type Sink ¶
type Sink interface {
Node
Encoder() SinkEncoder
Topic() string
// Close closes the source buffers
Close() error
}
type SinkBuilder ¶
type SinkBuilder interface {
NodeBuilder
Topic() string
AutoCreate() bool
}
type SinkEncoder ¶
type Source ¶
type Source interface {
Node
NodeBuilder
Encoder() SourceEncoder
Topic() string
ShouldCoPartitionedWith(source Source)
TopicConfigs() kafka.TopicConfig
CoPartitionedWith() Source
RePartitionedAs() Source
AutoCreate() bool
Internal() bool
InitialOffset() kafka.Offset
}
type SourceEncoder ¶
type State ¶
type State struct {
Type StateType
Store LoggableStateStore
}
type StateBuilder ¶
type StateBuilder struct {
Type StateType
Store LoggableStateStore
}
type StateStore ¶
type StateStore interface {
stores.Store
// Flush flashes the records in buffer to stores.Store
Flush() error
ResetCache()
ChangelogSyncer
}
type StateStoreNameFunc ¶
type StateStoreProvider ¶
type StateStoreProvider interface {
Store(ctx SubTopologyContext) LoggableStateStore
}
type SubTopology ¶
type SubTopology interface {
Id() SubTopologyId
Sinks() []Sink
Source(topic string) Source
Nodes() []Node
Init(ctx SubTopologyContext) error
Close() error
Store(name string) StateStore
StateStores() map[string]StateStore
}
type SubTopologyBuilder ¶
type SubTopologyBuilder interface {
Id() SubTopologyId
SetId(SubTopologyId)
AddNode(builder NodeBuilder)
AddNodeWithEdge(node, edge NodeBuilder)
Parent(node NodeBuilder) NodeBuilder
NodeSource(node NodeBuilder) Source
Setup(ctx SubTopologySetupContext) error
Build(ctx SubTopologyContext) (SubTopology, error)
AddEdge(parent, node NodeBuilder)
RemoveNode(node NodeBuilder)
RemoveAll()
Edges() []Edge
Sources() []Source
MergeSubTopology(subTp SubTopologyBuilder)
Nodes() []NodeBuilder
AddStore(builder LoggableStoreBuilder)
StateStores() map[string]LoggableStoreBuilder
AddSource(source Source)
Kind() SubTopologyKind
}
type SubTopologyBuilderContext ¶
type SubTopologyBuilderContext interface {
BuilderContext
MaxPartitionCount() int32
}
type SubTopologyBuilders ¶
type SubTopologyBuilders []SubTopologyBuilder
func (SubTopologyBuilders) SourceTopics ¶
func (b SubTopologyBuilders) SourceTopics() []string
func (SubTopologyBuilders) SourceTopicsFor ¶
func (b SubTopologyBuilders) SourceTopicsFor(kind SubTopologyKind) []string
func (SubTopologyBuilders) Topics ¶
func (b SubTopologyBuilders) Topics() []string
type SubTopologyContext ¶
type SubTopologyContext interface {
BuilderContext
context.Context
PartitionConsumer() kafka.PartitionConsumer
Partition() int32
Producer() kafka.Producer
Logger() log.Logger
TopicMeta() map[string]*kafka.Topic
}
func NewSubTopologyContext ¶
func NewSubTopologyContext( parent context.Context, partition int32, builderCtx BuilderContext, producer kafka.Producer, partitionConsumer kafka.PartitionConsumer, logger log.Logger, topicMeta map[string]*kafka.Topic, ) SubTopologyContext
type SubTopologyId ¶
type SubTopologyId struct {
// contains filtered or unexported fields
}
func NewSubTopologyId ¶
func NewSubTopologyId(id int, name string) SubTopologyId
func (SubTopologyId) Id ¶
func (id SubTopologyId) Id() int
func (SubTopologyId) Name ¶
func (id SubTopologyId) Name() string
func (SubTopologyId) String ¶
func (id SubTopologyId) String() string
type SubTopologyKind ¶
type SubTopologyKind string
const ( KindStream SubTopologyKind = `stream` KindTable SubTopologyKind = `table` KindGlobalTable SubTopologyKind = `global-table` )
type SubTopologySetupContext ¶
type SubTopologySetupContext interface {
BuilderContext
MaxPartitionCount() int32
TopicMeta() map[string]*kafka.TopicConfig
}
func NewSubTopologySetupContext ¶
func NewSubTopologySetupContext( builderCtx BuilderContext, topicMeta map[string]*kafka.TopicConfig, maxPartitions int32, ) SubTopologySetupContext
type Topology ¶
type Topology interface {
SubTopologies() []SubTopologyBuilder
SubTopology(id SubTopologyId) SubTopologyBuilder
SubTopologyByTopic(topic string) SubTopologyBuilder
StreamTopologies() SubTopologyBuilders
SourceByTopic(topic string) Source
GlobalTableTopologies() SubTopologyBuilders
Describe() string
}
type Visualizer ¶
func NewTopologyVisualizer ¶
func NewTopologyVisualizer() Visualizer
Click to show internal directories.
Click to hide internal directories.