runner

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2021 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Package runner provides the public API to kafmesh-generated code.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConfigureTopics

func ConfigureTopics(ctx context.Context, brokers []string, topics []Topic) error

ConfigureTopics configures and checks topics in the slice passed.

func WaitTillServiceIsRunning

func WaitTillServiceIsRunning(ctx context.Context, url string) error

WaitTillServiceIsRunning waits till the kafmesh service is running

Types

type Codec

type Codec struct {
	// contains filtered or unexported fields
}

Codec is a goka codec for proto schema objects

func (*Codec) Decode

func (w *Codec) Decode(data []byte) (interface{}, error)

Decode decodes the bytes into a proto object

func (*Codec) Encode

func (w *Codec) Encode(message interface{}) ([]byte, error)

Encode encodes the proto object into bytes

type ComponentDiscovery added in v0.8.0

type ComponentDiscovery struct {
	Name        string
	Description string
}

ComponentDiscovery provides component information for discovery

type EmitMessage

type EmitMessage interface {
	Key() string
	Value() interface{}
}

EmitMessage is the message to be emitted

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

Emitter is the emitter for a goka stream

func NewEmitter

func NewEmitter(emitter *goka.Emitter) *Emitter

NewEmitter creates a new wrapped goka emitter

func (*Emitter) Delete

func (e *Emitter) Delete(key string) error

Delete produces a nil for the key to kafka

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) error

Emit emits a message and waits for the ack

func (*Emitter) EmitBulk

func (e *Emitter) EmitBulk(ctx context.Context, msgs []EmitMessage) error

EmitBulk emits messages and waits for them all to complete

func (*Emitter) Watch

func (e *Emitter) Watch(ctx context.Context) func() error

Watch watches for critical errors on the emitter

type InputDiscovery added in v0.8.0

type InputDiscovery struct {
	TopicDiscovery
}

InputDiscovery provides input information for discovery

type JoinDiscovery added in v0.8.0

type JoinDiscovery struct {
	TopicDiscovery
}

JoinDiscovery provides join information for discovery

type KafaConfigurator

type KafaConfigurator func(ctx context.Context, brokers []string) error

KafaConfigurator configures the kafka topics required to run the service

type LookupDiscovery added in v0.8.0

type LookupDiscovery struct {
	TopicDiscovery
}

LookupDiscovery provides lookup information for discovery

type Message

type Message interface {
	descriptor.Message
}

Message is a protobuf object

type MessageContext

type MessageContext struct {
	Partition int32
	Topic     string
	Offset    int64
	Timestamp time.Time
}

MessageContext is the extra kafka context data for the message

type MessageType added in v0.8.0

type MessageType int

MessageType is the type of serialization used for the kafka topic

const (
	// MessageTypeProtobuf uses protobuf serialization
	MessageTypeProtobuf MessageType = iota
)

type Metrics added in v0.14.0

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics handles prometheus metrics

func NewMetrics added in v0.14.0

func NewMetrics() *Metrics

NewMetrics creates a new metrics handler

func (*Metrics) SourceError added in v0.14.0

func (m *Metrics) SourceError(service, component, topic string)

SourceError records an error publishing to a source topic

func (*Metrics) SourceHit added in v0.14.0

func (m *Metrics) SourceHit(service, component, topic string, count int)

SourceHit records how many messages are published to a source topic

type OutputDiscovery added in v0.8.0

type OutputDiscovery struct {
	TopicDiscovery
}

OutputDiscovery provides output information for discovery

type PersistentDiscovery added in v0.8.0

type PersistentDiscovery struct {
	TopicDiscovery
}

PersistentDiscovery provides persistence information for discovery

type ProcessorContext added in v0.15.0

type ProcessorContext struct {
	context.Context
	// contains filtered or unexported fields
}

ProcessorContext is a context for processor observability

func (*ProcessorContext) Finish added in v0.15.0

func (c *ProcessorContext) Finish()

Finish sends the operation to observers

func (*ProcessorContext) GetState added in v0.15.0

func (c *ProcessorContext) GetState(topic, message, value string)

GetState registers a persistence get state

func (*ProcessorContext) Input added in v0.15.0

func (c *ProcessorContext) Input(topic, message, value string)

Input registers an input

func (*ProcessorContext) Join added in v0.15.0

func (c *ProcessorContext) Join(topic, message, value string)

Join registers a join

func (*ProcessorContext) Lookup added in v0.15.0

func (c *ProcessorContext) Lookup(topic, message, key, value string)

Lookup registers a lookup

func (*ProcessorContext) Output added in v0.15.0

func (c *ProcessorContext) Output(topic, message, key, value string)

Output registers an output

func (*ProcessorContext) SetState added in v0.15.0

func (c *ProcessorContext) SetState(topic, message, value string)

SetState registers a persistence set state

type ProcessorDiscovery added in v0.8.0

type ProcessorDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery

	Name        string
	Description string
	GroupName   string

	Inputs      []InputDiscovery
	Joins       []JoinDiscovery
	Lookups     []LookupDiscovery
	Outputs     []OutputDiscovery
	Persistence *PersistentDiscovery
}

ProcessorDiscovery provides processor information for discovery

type ProtoViewSourceJob added in v0.6.1

type ProtoViewSourceJob struct {
	context.Context
	// contains filtered or unexported fields
}

ProtoViewSourceJob executes a protobuf synchronize

func NewProtoViewSourceJob added in v0.6.1

func NewProtoViewSourceJob(ctx context.Context, view *goka.View, emitter *Emitter) *ProtoViewSourceJob

NewProtoViewSourceJob creates a new proto view source job

func (*ProtoViewSourceJob) Finish added in v0.6.1

func (s *ProtoViewSourceJob) Finish() error

Finish the job and run deletes

func (*ProtoViewSourceJob) Update added in v0.6.1

func (s *ProtoViewSourceJob) Update(key string, msg proto.Message) error

Update adds a key/value pair to the job

type ProtoWrapper

type ProtoWrapper struct {
	// contains filtered or unexported fields
}

ProtoWrapper is a codec generator for proto schema codecs

func NewProtoWrapper

func NewProtoWrapper(registry *Registry) *ProtoWrapper

NewProtoWrapper creates a new proto schema codec ProtoWrapper

func (*ProtoWrapper) Codec

func (w *ProtoWrapper) Codec(topic string, message Message) (*Codec, error)

Codec returns a codec for the protobuf object

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the proto schema registry api

func NewRegistry

func NewRegistry(url string) (*Registry, error)

NewRegistry creates a new proto schema registry

func (*Registry) RegisterSchema

func (r *Registry) RegisterSchema(topic, schema string) (uint32, error)

RegisterSchema registers the proto schema with the proto schema registry

func (*Registry) WaitForRegistryToBeReady

func (r *Registry) WaitForRegistryToBeReady(timeout time.Duration) error

WaitForRegistryToBeReady waits for the registry to start responding

type Service

type Service struct {
	Metrics *Metrics

	DiscoverInfo *discoveryv1.Service
	// contains filtered or unexported fields
}

Service is the kafmesh service

func NewService

func NewService(brokers []string, protoRegistry *Registry, grpcServer *grpc.Server) *Service

NewService creates a new kafmesh service

func (*Service) ConfigureKafka

func (s *Service) ConfigureKafka(ctx context.Context, configurator KafaConfigurator) error

ConfigureKafka waits for kafka to be ready and configures the topics for this service. It also checks that the topics it does not create exist with the correct configuration.

func (*Service) Options

func (s *Service) Options() ServiceOptions

Options returns service options for runners

func (*Service) ProcessorContext added in v0.15.0

func (s *Service) ProcessorContext(ctx context.Context, component, processor, key string) *ProcessorContext

ProcessorContext creates a processor context

func (*Service) RegisterProcessor added in v0.8.0

func (s *Service) RegisterProcessor(processor ProcessorDiscovery) error

RegisterProcessor registers a processor with the discovery service

func (*Service) RegisterRunner

func (s *Service) RegisterRunner(runner func(context.Context) func() error) error

RegisterRunner registers a runner with the service. It returns an error if the service is running.

func (*Service) RegisterSink added in v0.8.0

func (s *Service) RegisterSink(sink SinkDiscovery) error

RegisterSink registers a sink with the discovery service

func (*Service) RegisterSource added in v0.8.0

func (s *Service) RegisterSource(source SourceDiscovery) error

RegisterSource registers a source with the discovery service

func (*Service) RegisterView added in v0.8.0

func (s *Service) RegisterView(view ViewDiscovery) error

RegisterView registers a view with the discovery service

func (*Service) RegisterViewSink added in v0.8.0

func (s *Service) RegisterViewSink(viewSink ViewSinkDiscovery) error

RegisterViewSink registers a view sink with the discovery service

func (*Service) RegisterViewSource added in v0.8.0

func (s *Service) RegisterViewSource(viewSource ViewSourceDiscovery) error

RegisterViewSource registers a view source with the discovery service

func (*Service) Run

func (s *Service) Run(ctx context.Context) func() error

Run executes the kafmesh services

type ServiceDiscovery added in v0.8.0

type ServiceDiscovery struct {
	Name        string
	Description string
}

ServiceDiscovery provides service information for discovery

type ServiceOptions

type ServiceOptions struct {
	Brokers      []string
	ProtoWrapper *ProtoWrapper
}

ServiceOptions are the options passed to services

type SinkDefinition

type SinkDefinition interface {
	Codec() goka.Codec
	Group() string
	Topic() string
	MaxBufferSize() int
	Interval() time.Duration
	Flush() error
	Collect(ctx MessageContext, key string, msg interface{}) error
}

SinkDefinition is the definition of a sink that runs at an interval and will also flush if the buffer is full

type SinkDiscovery added in v0.8.0

type SinkDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery
	TopicDiscovery

	Name        string
	Description string
}

SinkDiscovery provides sink information for discovery

type SinkRunner

type SinkRunner struct {
	// contains filtered or unexported fields
}

SinkRunner is a sink runner for kafmesh

func NewSinkRunner

func NewSinkRunner(definition SinkDefinition, brokers []string) *SinkRunner

NewSinkRunner creates a new sink runner

func (*SinkRunner) Run

func (r *SinkRunner) Run(ctx context.Context) func() error

Run runs the sink

type SourceDiscovery added in v0.8.0

type SourceDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery
	TopicDiscovery
}

SourceDiscovery provides source information for discovery

type Topic

type Topic struct {
	Name       string
	Partitions int
	Replicas   int
	Compact    bool
	Retention  time.Duration
	Segment    time.Duration
	Create     bool
}

Topic is a definition for a kafka topic

type TopicDiscovery added in v0.8.0

type TopicDiscovery struct {
	Message string
	Topic   string
	Type    MessageType
}

TopicDiscovery provides topic information for discovery

type ViewDiscovery added in v0.8.0

type ViewDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery
	TopicDiscovery
}

ViewDiscovery adds view information for discovery

type ViewSinkDiscovery added in v0.8.0

type ViewSinkDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery
	TopicDiscovery

	Name        string
	Description string
}

ViewSinkDiscovery provides view sink information for discovery

type ViewSourceDiscovery added in v0.8.0

type ViewSourceDiscovery struct {
	ServiceDiscovery
	ComponentDiscovery
	TopicDiscovery

	Name        string
	Description string
}

ViewSourceDiscovery provides view source information for discovery

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL