contube

package
Version: v0.3.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2024 | License: Apache-2.0 | Imports: 9 | Imported by: 0

Documentation

Index

Constants

View Source
// String keys for queue tube settings.
// NOTE(review): presumably these are ConfigMap keys, with TopicKey read by
// NewSinkQueueConfig and TopicListKey/SubNameKey read by NewSourceQueueConfig
// (the names line up with the Topic, Topics, and SubName fields) — confirm
// against the package source.
const (
	TopicKey     = "topic"
	TopicListKey = "topicList"
	SubNameKey   = "subName"
)
View Source
// EndpointKey is the string "endpoint".
// NOTE(review): presumably the ConfigMap key naming the endpoint of an HTTP
// tube — confirm against the package source.
const (
	EndpointKey = "endpoint"
)
View Source
// PulsarURLKey is the string "pulsarURL".
// NOTE(review): presumably the ConfigMap key that maps to
// PulsarTubeFactoryConfig.PulsarURL — confirm against the package source.
const (
	PulsarURLKey = "pulsarURL"
)

Variables

View Source
// Sentinel errors related to endpoints. Each is a distinct value created with
// errors.New, so callers can match them with errors.Is.
// NOTE(review): ErrorEndpointAlreadyExists uses an "Error" prefix while its
// siblings use "Err"; renaming it would break importers, so it is left as is.
var (
	ErrEndpointNotFound        = errors.New("endpoint not found")
	ErrEndpointClosed          = errors.New("endpoint closed")
	ErrorEndpointAlreadyExists = errors.New("endpoint already exists")
)
View Source
// ErrSinkTubeNotImplemented is a sentinel error with the message
// "sink tube not implemented".
// NOTE(review): presumably returned by a factory's NewSinkTube when sinking
// is unsupported — confirm which factories return it.
var (
	ErrSinkTubeNotImplemented = errors.New("sink tube not implemented")
)

Functions

This section is empty.

Types

type ConfigMap

// ConfigMap is a set of configuration values keyed by string; values may be
// of any type. It is the configuration argument accepted by the tube
// factories' NewSourceTube and NewSinkTube methods.
type ConfigMap map[string]interface{}

func MergeConfig

func MergeConfig(configs ...ConfigMap) ConfigMap

MergeConfig merges multiple ConfigMaps into a single ConfigMap.

type EndpointHandler

// EndpointHandler is a function that receives a context, an endpoint name,
// and a payload, and returns an error. HttpTubeFactory.Handle has the same
// parameter and result types.
type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error

type HttpTubeFactory

// HttpTubeFactory embeds a TubeFactory and adds HTTP-facing methods:
// GetHandleFunc returns a func(http.ResponseWriter, *http.Request), and
// Handle accepts an endpoint name and payload. Create one with
// NewHttpTubeFactory.
type HttpTubeFactory struct {
	TubeFactory
	// contains filtered or unexported fields
}

func NewHttpTubeFactory

func NewHttpTubeFactory(ctx context.Context) *HttpTubeFactory

func (*HttpTubeFactory) GetHandleFunc

func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error), logger *slog.Logger) func(http.ResponseWriter, *http.Request)

func (*HttpTubeFactory) Handle

func (f *HttpTubeFactory) Handle(ctx context.Context, endpoint string, payload []byte) error

func (*HttpTubeFactory) NewSinkTube

func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Record, error)

func (*HttpTubeFactory) NewSourceTube

func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)

type MemoryQueueFactory

// MemoryQueueFactory has no exported fields. *MemoryQueueFactory declares
// NewSourceTube and NewSinkTube with the signatures required by TubeFactory.
// NOTE(review): presumably the concrete type returned by
// NewMemoryQueueFactory — confirm against the package source.
type MemoryQueueFactory struct {
	// contains filtered or unexported fields
}

func (*MemoryQueueFactory) NewSinkTube

func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*MemoryQueueFactory) NewSourceTube

func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarEventQueueFactory

// PulsarEventQueueFactory has no exported fields. *PulsarEventQueueFactory
// declares NewSourceTube and NewSinkTube with the signatures required by
// TubeFactory.
// NOTE(review): presumably the concrete type returned by
// NewPulsarEventQueueFactory — confirm against the package source.
type PulsarEventQueueFactory struct {
	// contains filtered or unexported fields
}

func (*PulsarEventQueueFactory) NewSinkTube

func (f *PulsarEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*PulsarEventQueueFactory) NewSourceTube

func (f *PulsarEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarTubeFactoryConfig

// PulsarTubeFactoryConfig holds the settings for a Pulsar tube factory.
// It can be built from a ConfigMap with NewPulsarTubeFactoryConfig and turned
// back into one with ToConfigMap.
type PulsarTubeFactoryConfig struct {
	// PulsarURL is the Pulsar URL string.
	// NOTE(review): presumably stored under PulsarURLKey in a ConfigMap — confirm.
	PulsarURL string
}

func NewPulsarTubeFactoryConfig

func NewPulsarTubeFactoryConfig(configMap ConfigMap) *PulsarTubeFactoryConfig

func (*PulsarTubeFactoryConfig) ToConfigMap

func (c *PulsarTubeFactoryConfig) ToConfigMap() ConfigMap

type Record

// Record is the element type carried by source and sink tube channels.
type Record interface {
	// GetPayload returns the record's payload bytes.
	GetPayload() []byte
	// Commit takes no arguments and returns nothing. The SinkTubeFactory
	// documentation states that it is invoked after the record has been sunk
	// successfully.
	Commit()
}

type RecordImpl

// RecordImpl has no exported fields. *RecordImpl declares GetPayload and
// Commit, the two methods of the Record interface. Create one with
// NewRecordImpl, which takes the payload and an ack function.
type RecordImpl struct {
	// contains filtered or unexported fields
}

func NewRecordImpl

func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl

func (*RecordImpl) Commit

func (e *RecordImpl) Commit()

func (*RecordImpl) GetPayload

func (e *RecordImpl) GetPayload() []byte

type SinkQueueConfig

// SinkQueueConfig holds the settings for a sink queue. It can be built from a
// ConfigMap with NewSinkQueueConfig and turned back into one with ToConfigMap.
type SinkQueueConfig struct {
	// Topic is a single topic name.
	// NOTE(review): presumably stored under TopicKey in a ConfigMap — confirm.
	Topic string
}

func NewSinkQueueConfig

func NewSinkQueueConfig(config ConfigMap) *SinkQueueConfig

func (*SinkQueueConfig) ToConfigMap

func (c *SinkQueueConfig) ToConfigMap() ConfigMap

type SinkTubeFactory

// SinkTubeFactory creates sink tubes: send-only channels of Record.
type SinkTubeFactory interface {
	// NewSinkTube returns a new channel that can be used to sink records.
	// Commit is invoked on each record after it has been sunk successfully.
	// The caller should close the channel when it is done sending.
	NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error)
}

type SourceQueueConfig

// SourceQueueConfig holds the settings for a source queue. It can be built
// from a ConfigMap with NewSourceQueueConfig and turned back into one with
// ToConfigMap.
type SourceQueueConfig struct {
	// Topics is a list of topic names.
	// NOTE(review): presumably stored under TopicListKey in a ConfigMap — confirm.
	Topics  []string
	// SubName is a string.
	// NOTE(review): presumably a subscription name stored under SubNameKey — confirm.
	SubName string
}

func NewSourceQueueConfig

func NewSourceQueueConfig(config ConfigMap) *SourceQueueConfig

func (*SourceQueueConfig) ToConfigMap

func (c *SourceQueueConfig) ToConfigMap() ConfigMap

type SourceTubeFactory

// SourceTubeFactory creates source tubes: receive-only channels of Record.
type SourceTubeFactory interface {
	// NewSourceTube returns a new channel from which records can be received.
	// The channel is closed when ctx is done.
	NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)
}

type TubeFactory

// TubeFactory combines SourceTubeFactory and SinkTubeFactory: an implementation
// can create both source (receive-only) and sink (send-only) tubes.
type TubeFactory interface {
	SourceTubeFactory
	SinkTubeFactory
}

func NewMemoryQueueFactory

func NewMemoryQueueFactory(ctx context.Context) TubeFactory

func NewPulsarEventQueueFactory

func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL