Documentation ¶
Overview ¶
Package messenger is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
Message Path
Index ¶
- Variables
- type KafkaMessenger
- type KafkaMessengerConfig
- type Message
- func (m *Message) ConvertToSaramaProducerMessage(topic string) (*sarama.ProducerMessage, error)
- func (*Message) Descriptor() ([]byte, []int)
- func (m *Message) GetDestination() *Path
- func (m *Message) GetMetadata() map[string][]byte
- func (m *Message) GetOrigin() *Path
- func (m *Message) GetPayload() []byte
- func (m *Message) GetReturn() *Path
- func (*Message) ProtoMessage()
- func (m *Message) Reset()
- func (m *Message) SetMetadataFromConsumerMessage(consumerMessage *sarama.ConsumerMessage)
- func (m *Message) String() string
- type Messenger
- type Path
Constants ¶
This section is empty.
Variables ¶
var Logger = logrus.New()
Logger logs but can be replaced
Functions ¶
This section is empty.
Types ¶
type KafkaMessenger ¶
type KafkaMessenger struct {
// contains filtered or unexported fields
}
KafkaMessenger implements Messenger using Kafka
func NewKafkaMessenger ¶
func NewKafkaMessenger(broker string, config *KafkaMessengerConfig) (*KafkaMessenger, error)
NewKafkaMessenger returns a new KafkaMessenger
func (*KafkaMessenger) Acknowledge ¶
func (km *KafkaMessenger) Acknowledge(message *Message) error
Acknowledge tells Kafka that the message has been received and processed
func (*KafkaMessenger) Close ¶
func (km *KafkaMessenger) Close() error
Close stops the Kafka Messenger from sending and receiving messages
func (*KafkaMessenger) Receive ¶
func (km *KafkaMessenger) Receive() <-chan *Message
Receive returns messages from Kafka
type KafkaMessengerConfig ¶
type Message ¶
type Message struct { Origin *Path `protobuf:"bytes,1,opt,name=origin" json:"origin,omitempty"` Return *Path `protobuf:"bytes,2,opt,name=return" json:"return,omitempty"` Destination *Path `protobuf:"bytes,3,opt,name=destination" json:"destination,omitempty"` Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` Metadata map[string][]byte `` /* 143-byte string literal not displayed */ }
Message is passed between connectors and Conduction A Message should not have origin AND destination. Origin will be in Messages from connectors to Conduction and destination will be from Conduction to connectors
func NewMessageFromSaramaConsumerMessage ¶
func NewMessageFromSaramaConsumerMessage(consumerMessage *sarama.ConsumerMessage) (*Message, error)
NewMessageFromSaramaConsumerMessage returns new Message
func (*Message) ConvertToSaramaProducerMessage ¶
func (m *Message) ConvertToSaramaProducerMessage(topic string) (*sarama.ProducerMessage, error)
ConvertToSaramaProducerMessage returns a Kafka Producer Message from Message
func (*Message) Descriptor ¶
func (*Message) GetDestination ¶
func (*Message) GetMetadata ¶
func (*Message) GetPayload ¶
func (*Message) ProtoMessage ¶
func (*Message) ProtoMessage()
func (*Message) SetMetadataFromConsumerMessage ¶
func (m *Message) SetMetadataFromConsumerMessage(consumerMessage *sarama.ConsumerMessage)
SetMetadataFromConsumerMessage sets the Kafka metadata like partiion, offset and topic into Message
type Messenger ¶
type Messenger interface { Send(topic string, message *Message) error Receive() <-chan *Message Acknowledge(message *Message) error Close() error }
Messenger orchestrates communication between conduction modules
type Path ¶
type Path struct { Route string `protobuf:"bytes,1,opt,name=route" json:"route,omitempty"` Type string `protobuf:"bytes,2,opt,name=type" json:"type,omitempty"` Identity string `protobuf:"bytes,3,opt,name=identity" json:"identity,omitempty"` Metadata map[string][]byte `` /* 143-byte string literal not displayed */ }
func (*Path) Descriptor ¶
func (*Path) GetIdentity ¶
func (*Path) GetMetadata ¶
func (*Path) ProtoMessage ¶
func (*Path) ProtoMessage()