kafka

package
v0.0.0-...-3b037d6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2020 License: MIT Imports: 13 Imported by: 0

Documentation

Overview

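Package kafka provides Kafka-backed command sending and dispatching (Client, Sender, Listener, Dispatcher). It also contains generated GoMock mocks (MockClient, MockListener) for the Client and Listener interfaces.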

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors of the kafka package. Each value is created with
// errors.New, so it is a distinct error that callers can match by identity
// (for example with errors.Is).
var (
	// ErrReplyTimeout has the message "kafka: reply timeout".
	// NOTE(review): presumably related to the ReplyTimeout sender option — confirm where it is returned.
	ErrReplyTimeout       = errors.New("kafka: reply timeout")
	// ErrNoReceivers has the message "kafka: no receivers".
	// NOTE(review): the condition that produces it is not visible here — confirm.
	ErrNoReceivers        = errors.New("kafka: no receivers")
	// ErrUnknownCommandType has the message "kafka: unknown command type".
	// NOTE(review): presumably raised when a command's type has no registered handler — confirm.
	ErrUnknownCommandType = errors.New("kafka: unknown command type")
	// ErrUnknownStreamType has the message "kafka: unknown stream type".
	// NOTE(review): presumably raised when a stream type has no registered receiver — confirm.
	ErrUnknownStreamType  = errors.New("kafka: unknown stream type")
	// ErrNoCorrelation has the message "kafka: no correlation id".
	// NOTE(review): presumably raised when a message lacks a correlation ID — confirm.
	ErrNoCorrelation      = errors.New("kafka: no correlation id")
)

Functions

func DefaultSaramaConfig

func DefaultSaramaConfig() *sarama.Config

func DefaultTopicBuilder

func DefaultTopicBuilder(streamType string) string

func NewListenerHandler

func NewListenerHandler(
	errorHandler ErrorHandler,
	commandHandler CommandFunc,
	onSetup []Hook,
	onCleanUp []Hook,
) *handler

func NewSender

func NewSender(c Client, codec es.CommandCoder, options ...SenderOption) es.CommandSendReplier

Types

type Client

// Client is the interface accepted by NewSender and returned by
// NewDefaultClient.
// NOTE(review): the per-method notes below are inferred from names and
// signatures only; confirm them against the DefaultClient implementation.
type Client interface {
	// ReplyTo returns a string; presumably the topic on which replies are expected — TODO confirm.
	ReplyTo() string
	// SendCommand takes a topic, a correlation ID, a route key and the command
	// as raw bytes, and reports failure through the returned error.
	// NOTE(review): presumably routeKey is used as the message key — confirm.
	SendCommand(ctx context.Context, topic string, cid es.CorrelationID, routeKey string, command []byte) error
	// ListenReply associates fn with the correlation ID cid; it returns nothing.
	// NOTE(review): presumably fn is invoked when a reply for cid arrives — confirm.
	ListenReply(ctx context.Context, cid es.CorrelationID, fn ReplyFunc)
	// RemoveListener takes a correlation ID and returns nothing.
	// NOTE(review): presumably drops the callback registered via ListenReply — confirm.
	RemoveListener(ctx context.Context, cid es.CorrelationID)
	// HasListener reports a boolean for the given correlation ID.
	// NOTE(review): presumably true while a ListenReply callback is registered for cid — confirm.
	HasListener(ctx context.Context, cid es.CorrelationID) bool
	// Close returns an error; presumably releases the client's resources — TODO confirm.
	Close() error
}

func NewDefaultClient

func NewDefaultClient(ctx context.Context, addr []string, replyTo string, conf *sarama.Config, options ...ClientOption) (Client, error)

type ClientOption

type ClientOption func(c *DefaultClient)

func ClientGroupID

func ClientGroupID(gid string) ClientOption

func ClientListenerStartHook

func ClientListenerStartHook(hook ...Hook) ClientOption

func ClientListenerStopHook

func ClientListenerStopHook(hook ...Hook) ClientOption

type CommandFunc

type CommandFunc func(ctx context.Context, command []byte) error

type DefaultClient

type DefaultClient struct {
	// contains filtered or unexported fields
}

func (*DefaultClient) Close

func (c *DefaultClient) Close() (err error)

func (*DefaultClient) HasListener

func (c *DefaultClient) HasListener(ctx context.Context, cid es.CorrelationID) bool

func (*DefaultClient) ListenReply

func (c *DefaultClient) ListenReply(_ context.Context, cid es.CorrelationID, fn ReplyFunc)

func (*DefaultClient) RemoveListener

func (c *DefaultClient) RemoveListener(_ context.Context, cid es.CorrelationID)

func (*DefaultClient) ReplyTo

func (c *DefaultClient) ReplyTo() string

func (*DefaultClient) SendCommand

func (c *DefaultClient) SendCommand(_ context.Context, topic string, cid es.CorrelationID, routeKey string, command []byte) (err error)

type DefaultClientHandler

type DefaultClientHandler struct {
	// contains filtered or unexported fields
}

func NewClientHandler

func NewClientHandler(
	errorHandler ErrorHandler,
	replyHandler ReplyFunc,
	onSetup []Hook,
	onCleanUp []Hook,
) *DefaultClientHandler

func (*DefaultClientHandler) Cleanup

func (*DefaultClientHandler) ConsumeClaim

func (*DefaultClientHandler) Setup

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(
	newListener func() Listener,
	codec es.CommandCoder,
	options ...DispatcherOption,
) *Dispatcher

func (*Dispatcher) Close

func (d *Dispatcher) Close() (err error)

func (*Dispatcher) Listen

func (d *Dispatcher) Listen(ctx context.Context) error

func (*Dispatcher) Stream

func (d *Dispatcher) Stream(streamType string) *StreamReceiver

type DispatcherOption

type DispatcherOption func(d *Dispatcher)

func DispatcherTopicBuilder

func DispatcherTopicBuilder(builder TopicBuilder) DispatcherOption

type ErrorHandler

type ErrorHandler func(context.Context, error)

type Hook

type Hook func(sarama.ConsumerGroupSession)

type Listener

// Listener is the interface returned by NewListener and produced by the
// factory function passed to NewDispatcher.
// NOTE(review): the per-method notes below are inferred from names and
// signatures only; confirm them against the listener implementation.
type Listener interface {
	// Connect takes a topic and a group name and reports failure through the returned error.
	// NOTE(review): presumably joins a consumer group for the topic — confirm.
	Connect(topic string, group string) error
	// ListenCommand takes a topic and a CommandFunc; it returns nothing.
	// NOTE(review): presumably fn is invoked with the raw bytes of each received command — confirm.
	ListenCommand(ctx context.Context, topic string, fn CommandFunc)
	// SendReply takes a topic, a correlation ID, a route key and the reply as
	// raw bytes, and reports failure through the returned error.
	SendReply(ctx context.Context, topic string, cid es.CorrelationID, routeKey string, reply []byte) error
	// Close returns an error; presumably releases the listener's resources — TODO confirm.
	Close() error
}

func NewListener

func NewListener(addr []string, conf *sarama.Config, options ...ListenerOption) Listener

type ListenerOption

type ListenerOption func(l *listener)

func ListenerErrorHandler

func ListenerErrorHandler(handler ErrorHandler) ListenerOption

func ListenerStartHook

func ListenerStartHook(hook ...Hook) ListenerOption

func ListenerStopHook

func ListenerStopHook(hook ...Hook) ListenerOption

type MockClient

type MockClient struct {
	// contains filtered or unexported fields
}

MockClient is a mock of the Client interface.

func NewMockClient

func NewMockClient(ctrl *gomock.Controller) *MockClient

NewMockClient creates a new mock instance

func (*MockClient) Close

func (m *MockClient) Close() error

Close mocks base method

func (*MockClient) EXPECT

func (m *MockClient) EXPECT() *MockClientMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockClient) HasListener

func (m *MockClient) HasListener(ctx context.Context, cid es.CorrelationID) bool

HasListener mocks base method

func (*MockClient) ListenReply

func (m *MockClient) ListenReply(ctx context.Context, cid es.CorrelationID, fn ReplyFunc)

ListenReply mocks base method

func (*MockClient) RemoveListener

func (m *MockClient) RemoveListener(ctx context.Context, cid es.CorrelationID)

RemoveListener mocks base method

func (*MockClient) ReplyTo

func (m *MockClient) ReplyTo() string

ReplyTo mocks base method

func (*MockClient) SendCommand

func (m *MockClient) SendCommand(ctx context.Context, topic string, cid es.CorrelationID, routeKey string, command []byte) error

SendCommand mocks base method

type MockClientMockRecorder

type MockClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockClientMockRecorder is the mock recorder for MockClient

func (*MockClientMockRecorder) Close

func (mr *MockClientMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockClientMockRecorder) HasListener

func (mr *MockClientMockRecorder) HasListener(ctx, cid interface{}) *gomock.Call

HasListener indicates an expected call of HasListener

func (*MockClientMockRecorder) ListenReply

func (mr *MockClientMockRecorder) ListenReply(ctx, cid, fn interface{}) *gomock.Call

ListenReply indicates an expected call of ListenReply

func (*MockClientMockRecorder) RemoveListener

func (mr *MockClientMockRecorder) RemoveListener(ctx, cid interface{}) *gomock.Call

RemoveListener indicates an expected call of RemoveListener

func (*MockClientMockRecorder) ReplyTo

func (mr *MockClientMockRecorder) ReplyTo() *gomock.Call

ReplyTo indicates an expected call of ReplyTo

func (*MockClientMockRecorder) SendCommand

func (mr *MockClientMockRecorder) SendCommand(ctx, topic, cid, routeKey, command interface{}) *gomock.Call

SendCommand indicates an expected call of SendCommand

type MockListener

type MockListener struct {
	// contains filtered or unexported fields
}

MockListener is a mock of the Listener interface.

func NewMockListener

func NewMockListener(ctrl *gomock.Controller) *MockListener

NewMockListener creates a new mock instance

func (*MockListener) Close

func (m *MockListener) Close() error

Close mocks base method

func (*MockListener) Connect

func (m *MockListener) Connect(topic, group string) error

Connect mocks base method

func (*MockListener) EXPECT

func (m *MockListener) EXPECT() *MockListenerMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockListener) ListenCommand

func (m *MockListener) ListenCommand(ctx context.Context, topic string, fn CommandFunc)

ListenCommand mocks base method

func (*MockListener) SendReply

func (m *MockListener) SendReply(ctx context.Context, topic string, cid es.CorrelationID, routeKey string, reply []byte) error

SendReply mocks base method

type MockListenerMockRecorder

type MockListenerMockRecorder struct {
	// contains filtered or unexported fields
}

MockListenerMockRecorder is the mock recorder for MockListener

func (*MockListenerMockRecorder) Close

func (mr *MockListenerMockRecorder) Close() *gomock.Call

Close indicates an expected call of Close

func (*MockListenerMockRecorder) Connect

func (mr *MockListenerMockRecorder) Connect(topic, group interface{}) *gomock.Call

Connect indicates an expected call of Connect

func (*MockListenerMockRecorder) ListenCommand

func (mr *MockListenerMockRecorder) ListenCommand(ctx, topic, fn interface{}) *gomock.Call

ListenCommand indicates an expected call of ListenCommand

func (*MockListenerMockRecorder) SendReply

func (mr *MockListenerMockRecorder) SendReply(ctx, topic, cid, routeKey, reply interface{}) *gomock.Call

SendReply indicates an expected call of SendReply

type ReplyFunc

type ReplyFunc func(ctx context.Context, cid es.CorrelationID, reply []byte) error

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

func (*Sender) Close

func (a *Sender) Close() (err error)

func (*Sender) HasReplier

func (a *Sender) HasReplier(ctx context.Context, cid es.CorrelationID) bool

func (*Sender) ReceiveReply

func (a *Sender) ReceiveReply(ctx context.Context, fn es.ReplyReceiverFunc)

func (*Sender) RemoveReplier

func (a *Sender) RemoveReplier(ctx context.Context, cid es.CorrelationID)

func (*Sender) SendAndWaitReply

func (a *Sender) SendAndWaitReply(ctx context.Context, c *es.Command) (*es.Reply, error)

func (*Sender) SendCommand

func (a *Sender) SendCommand(ctx context.Context, command *es.Command) error

func (*Sender) SendReply

func (a *Sender) SendReply(ctx context.Context, r *es.Reply) error

func (*Sender) SendWithReply

func (a *Sender) SendWithReply(ctx context.Context, c *es.Command, fn es.ReplyReceiverFunc) error

type SenderOption

type SenderOption func(s *Sender)

func ReplyTimeout

func ReplyTimeout(timeout time.Duration) SenderOption

func SenderTopicBuilder

func SenderTopicBuilder(tb TopicBuilder) SenderOption

type StreamReceiver

type StreamReceiver struct {
	// contains filtered or unexported fields
}

func (*StreamReceiver) Endpoint

func (a *StreamReceiver) Endpoint(name string, endpoint es.Endpoint) *StreamReceiver

func (*StreamReceiver) Receive

func (a *StreamReceiver) Receive(name string, recv es.CommandReceiverFunc) *StreamReceiver

type TopicBuilder

type TopicBuilder func(string) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL