contube

package
Version: v0.4.0

Warning: This package is not in the latest version of its module. Go to latest.
Published: May 7, 2024 | License: Apache-2.0 | Imports: 12 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	TopicKey     = "topic"
	TopicListKey = "topicList"
	SubNameKey   = "subName"
)
View Source
const (
	EndpointKey = "endpoint"
)
View Source
const (
	PulsarURLKey = "pulsar_url"
)

Variables

View Source
var (
	ErrEndpointNotFound        = errors.New("endpoint not found")
	ErrEndpointClosed          = errors.New("endpoint closed")
	ErrorEndpointAlreadyExists = errors.New("endpoint already exists")
)
View Source
var (
	ErrSinkTubeNotImplemented = errors.New("sink tube not implemented")
)

Functions

This section is empty.

Types

type ConfigMap

type ConfigMap map[string]interface{}

func MergeConfig

func MergeConfig(configs ...ConfigMap) ConfigMap

MergeConfig merges multiple ConfigMaps into one.

func (ConfigMap) ToConfigStruct (added in v0.4.0)

func (c ConfigMap) ToConfigStruct(v any) error

type EndpointHandler

type EndpointHandler func(ctx context.Context, endpoint string, payload []byte) error

type HttpTubeFactory

type HttpTubeFactory struct {
	TubeFactory
	// contains filtered or unexported fields
}

func NewHttpTubeFactory

func NewHttpTubeFactory(ctx context.Context) *HttpTubeFactory

func (*HttpTubeFactory) GetHandleFunc

func (f *HttpTubeFactory) GetHandleFunc(getEndpoint func(r *http.Request) (string, error),
	logger *slog.Logger) func(http.ResponseWriter, *http.Request)

func (*HttpTubeFactory) Handle

func (f *HttpTubeFactory) Handle(ctx context.Context, endpoint string, payload []byte) error

func (*HttpTubeFactory) NewSinkTube

func (f *HttpTubeFactory) NewSinkTube(_ context.Context, _ ConfigMap) (chan<- Record, error)

func (*HttpTubeFactory) NewSourceTube

func (f *HttpTubeFactory) NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)

type MemoryQueueFactory

type MemoryQueueFactory struct {
	// contains filtered or unexported fields
}

func (*MemoryQueueFactory) NewSinkTube

func (f *MemoryQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*MemoryQueueFactory) NewSourceTube

func (f *MemoryQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarEventQueueFactory

type PulsarEventQueueFactory struct {
	// contains filtered or unexported fields
}

func (*PulsarEventQueueFactory) NewSinkTube

func (f *PulsarEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error)

func (*PulsarEventQueueFactory) NewSourceTube

func (f *PulsarEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error)

type PulsarTubeFactoryConfig

type PulsarTubeFactoryConfig struct {
	PulsarURL string
}

func NewPulsarTubeFactoryConfig

func NewPulsarTubeFactoryConfig(configMap ConfigMap) (*PulsarTubeFactoryConfig, error)

func (*PulsarTubeFactoryConfig) ToConfigMap

func (c *PulsarTubeFactoryConfig) ToConfigMap() ConfigMap

type Record

type Record interface {
	GetPayload() []byte
	Commit()
}

type RecordImpl

type RecordImpl struct {
	// contains filtered or unexported fields
}

func NewRecordImpl

func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl

func (*RecordImpl) Commit

func (e *RecordImpl) Commit()

func (*RecordImpl) GetPayload

func (e *RecordImpl) GetPayload() []byte

type SinkQueueConfig

type SinkQueueConfig struct {
	Topic string
}

func NewSinkQueueConfig

func NewSinkQueueConfig(config ConfigMap) *SinkQueueConfig

func (*SinkQueueConfig) ToConfigMap

func (c *SinkQueueConfig) ToConfigMap() ConfigMap

type SinkTubeFactory

type SinkTubeFactory interface {
	// NewSinkTube returns a new channel that can be used to sink events.
	// Each event's Commit() is invoked after the event has been sunk successfully.
	// The caller should close the channel when it is done.
	NewSinkTube(ctx context.Context, config ConfigMap) (chan<- Record, error)
}

type SourceQueueConfig

type SourceQueueConfig struct {
	Topics  []string
	SubName string
}

func NewSourceQueueConfig

func NewSourceQueueConfig(config ConfigMap) (*SourceQueueConfig, error)

func (*SourceQueueConfig) ToConfigMap

func (c *SourceQueueConfig) ToConfigMap() ConfigMap

type SourceTubeFactory

type SourceTubeFactory interface {
	// NewSourceTube returns a new channel that can be used to receive events.
	// The channel is closed when the context is done.
	NewSourceTube(ctx context.Context, config ConfigMap) (<-chan Record, error)
}

type TubeFactory

type TubeFactory interface {
	SourceTubeFactory
	SinkTubeFactory
}

func NewMemoryQueueFactory

func NewMemoryQueueFactory(ctx context.Context) TubeFactory

func NewPulsarEventQueueFactory

func NewPulsarEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL