datacommunicator

package
v0.0.0-...-695150b
Warning

This package is not in the latest version of its module.

Published: Jul 24, 2020 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package datacommunicator ...


Index

Constants

const (
	KAFKA = iota // KAFKA as Messaging Platform, Please use this ID
)

BrokerType defines the underlying MQ platform to be selected for the messages. KAFKA is the platform supported as part of odimra phase 1.

Variables

This section is empty.

Functions

func Decode

func Decode(d []byte, a interface{}) error

Decode converts the byte stream d into data (DECODE). The decoded data is exposed as an interface value before it is sent to the Consumer or Requester.

func Encode

func Encode(d interface{}) ([]byte, error)

Encode converts the interface value into a byte stream (ENCODE).
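
This documentation does not state which wire format Encode and Decode use. As a minimal sketch, assuming a JSON encoding (an assumption, not confirmed by the package), two functions with the same signatures could round-trip a value as shown. The names encode, decode, roundTrip, and event are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical payload type used only for this illustration.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// encode mirrors the signature of Encode. JSON is an assumed wire format.
func encode(d interface{}) ([]byte, error) {
	return json.Marshal(d)
}

// decode mirrors the signature of Decode. a must be a pointer to the target.
func decode(d []byte, a interface{}) error {
	return json.Unmarshal(d, a)
}

// roundTrip encodes in and decodes the bytes back into a fresh event.
func roundTrip(in event) (event, error) {
	var out event
	b, err := encode(in)
	if err != nil {
		return out, err
	}
	err = decode(b, &out)
	return out, err
}

func main() {
	out, err := roundTrip(event{ID: 7, Name: "fan-alert"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Note that the second argument to a decoder of this shape has to be a pointer; passing a value would leave the caller's variable unchanged.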

func KafkaConnect

func KafkaConnect(kp *KafkaPacket, messageQueueConfigPath string) error

KafkaConnect defines the connection procedure for the KAFKA server. For now, only one server is taken as input. The client TLS settings are built into a TLS object, which is passed to the server with the connection request. A common Dialer object is used for both Reader and Writer objects. When a request arrives for a specific Pipe, the Pipe name and its connection object are stored as a pair in the corresponding map.

func SetConfiguration

func SetConfiguration(filePath string) error

SetConfiguration reads the client-side configuration file. Any configuration data that needs to be provided by the MQ user is taken directly from the user by asking them to fill in a structure. THESE DATA DETAILS SHOULD BE DEFINED AS PART OF THE INTERFACE DEFINITION.

func TLS

func TLS(cCert, cKey, caCert string) (*tls.Config, error)

TLS creates the TLS configuration object to be used by any Broker for authentication and encryption. The certificate and key files are created from JKS-format files generated by Java Keytool; see the README for more information. In the case of Kafka, the server certificate files are generated in JKS format, and the same is done for clients. Those files are then converted into PEM format.
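
How a client TLS configuration can be assembled from PEM files is sketched below using only the standard library. This is not the package's implementation: buildTLSConfig is a hypothetical stand-in with the same signature as TLS, the file names in main are placeholders, and the minimum TLS version is an assumed value.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical stand-in with the same signature as TLS.
// All three arguments are paths to PEM-encoded files.
func buildTLSConfig(cCert, cKey, caCert string) (*tls.Config, error) {
	// Load the client certificate together with its private key.
	pair, err := tls.LoadX509KeyPair(cCert, cKey)
	if err != nil {
		return nil, fmt.Errorf("load client key pair: %w", err)
	}

	// Read the CA certificate and add it to a fresh pool.
	caPEM, err := os.ReadFile(caCert)
	if err != nil {
		return nil, fmt.Errorf("read CA file: %w", err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid certificate found in CA file")
	}

	return &tls.Config{
		Certificates: []tls.Certificate{pair},
		RootCAs:      pool,
		MinVersion:   tls.VersionTLS12, // assumed floor, not taken from the package
	}, nil
}

func main() {
	// Placeholder paths; replace them with the PEM files converted from JKS.
	if _, err := buildTLSConfig("client.crt", "client.key", "ca.crt"); err != nil {
		fmt.Println("TLS setup failed:", err)
	}
}
```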

Types

type KafkaF

type KafkaF struct {

	// KServer defines the Kafka Server URI/Nodename. DEFAULT = localhost
	KServer string `toml:"KServers"`
	// KLport defines the Server listening port for Kafka. DEFAULT = 9092
	KLport int `toml:"KLPort"`
	// KTimeout defines the timeout for Kafka Server connection.
	// DEFAULT = 10 (in seconds)
	KTimeout int `toml:"KTimeout"`
	// KAFKACertFile defines the TLS Certificate File for KAFKA. No DEFAULT
	KAFKACertFile string `toml:"KAFKACertFile"`
	// KAFKAKeyFile defines the TLS Key File for KAFKA. No DEFAULT
	KAFKAKeyFile string `toml:"KAFKAKeyFile"`
	// KAFKACAFile defines the KAFKA Certification Authority. No DEFAULT
	KAFKACAFile string `toml:"KAFKACAFile"`
}

KafkaF defines the KAFKA server connection configuration. This structure will be extended once TLS authentication and message encoding capability are added.
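
The field comments above document defaults for the server name, port, and timeout. Whether the package applies those defaults itself or expects them in the configuration file is not stated, so the following is only a sketch of one way to do it. The names kafkaConf, withDefaults, and address are hypothetical.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// kafkaConf copies the three KafkaF fields that document a default.
type kafkaConf struct {
	KServer  string
	KLport   int
	KTimeout int // seconds
}

// withDefaults fills zero-valued fields with the documented defaults.
func withDefaults(c kafkaConf) kafkaConf {
	if c.KServer == "" {
		c.KServer = "localhost"
	}
	if c.KLport == 0 {
		c.KLport = 9092
	}
	if c.KTimeout == 0 {
		c.KTimeout = 10
	}
	return c
}

// address joins the host and port into the "host:port" form.
func address(c kafkaConf) string {
	return net.JoinHostPort(c.KServer, strconv.Itoa(c.KLport))
}

func main() {
	c := withDefaults(kafkaConf{})
	fmt.Println(address(c), c.KTimeout) // prints "localhost:9092 10"
}
```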

type KafkaPacket

type KafkaPacket struct {

	// All common base function objects are defined in this object. This
	// object will support only Publishing and Subscriptions based on KAFKA
	// support. We use KAFKA 2.2.0 with Scala 2.12.
	Packet

	// Readers would maintain a mapping between the Kafka Reader pointer
	// and the Topic which is handled in that reader.
	Readers map[string]*kafka.Reader

	// Writers defines the mapping between KAFKA Writer pointer reference
	// and the Topic which is handled in that Writer
	Writers map[string]*kafka.Writer

	// DialerConn defines the member which can be used for single connection
	// towards KAFKA
	DialerConn *kafka.Dialer

	// Server defines the KAFKA server with port
	Server string
}

KafkaPacket defines the KAFKA message object. It contains all the kafka-go related identifiers required to maintain a connection with KAFKA servers. Publishing and consuming use two different connections towards Kafka, because the Reader and Writer IO streams are integrated with the RPC call. Because of the way Kafka communication works, these IO objects are stored as values in a map, keyed by the channel name for which they were created. Apart from the Reader and Writer maps, it also maintains the Dialer object for the initial Kafka connection. The currently active server name is also maintained as part of the KafkaPacket object.

func (*KafkaPacket) Accept

func (kp *KafkaPacket) Accept(pipe string, fn MsgProcess) error

Accept defines the Consumer or Subscriber functionality for KAFKA. If a Reader object for the specified Pipe is not available, a new Reader object is created. This function invokes the goroutine "Read" to handle the incoming messages.

func (*KafkaPacket) Close

func (kp *KafkaPacket) Close()

Close disconnects the KAFKA connection. This API should be called when the client is completely closing the Kafka connection, for both Reader and Writer objects. It does not close a single channel subscription; a different API, "Remove", is defined for that.

func (*KafkaPacket) Distribute

func (kp *KafkaPacket) Distribute(pipe string, d interface{}) error

Distribute defines the Producer / Publisher role and functionality. A Writer is created for each Pipe that comes in for communication. If a Writer already exists, that connection is used for the call. Before the message is published to the specified Pipe, it is converted into a byte stream using the "Encode" API. Encryption is enabled for the message via TLS.
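
The "create a Writer on first use, reuse it afterwards" behaviour described for Distribute can be sketched as a get-or-create lookup on a map keyed by Pipe name. The writer type below is a hypothetical stand-in for *kafka.Writer, and the mutex is an assumption added for safe concurrent use; this documentation does not say how the package guards its maps.

```go
package main

import (
	"fmt"
	"sync"
)

// writer is a hypothetical stand-in for *kafka.Writer.
type writer struct {
	topic string
}

// registry keeps one writer per pipe and creates it on first use.
type registry struct {
	mu      sync.Mutex // assumed guard, not confirmed by the package
	writers map[string]*writer
}

// writerFor returns the existing writer for pipe or creates a new one.
func (r *registry) writerFor(pipe string) *writer {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.writers == nil {
		r.writers = make(map[string]*writer)
	}
	if w, ok := r.writers[pipe]; ok {
		return w
	}
	w := &writer{topic: pipe}
	r.writers[pipe] = w
	return w
}

func main() {
	var r registry
	first := r.writerFor("events")
	second := r.writerFor("events")
	fmt.Println(first == second) // prints "true"
}
```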

func (*KafkaPacket) Get

func (kp *KafkaPacket) Get(pipe string, d interface{}) interface{}

Get is not supported for now in Kafka from the Message Bus side, due to limitations in the quality of the Go library implementation. It will be taken up in the future.

func (*KafkaPacket) Read

func (kp *KafkaPacket) Read(p string, fn MsgProcess) error

Read accesses the KAFKA messages in an infinite loop. Callback method access exists only in the "goka" library; it is not available in "kafka-go".
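
The loop-plus-callback shape described for Read can be sketched without a broker. In the sketch below, fetchFunc is a hypothetical stand-in for a blocking read on a Kafka reader, and sliceFetcher and drain are helpers invented for the illustration. The real method may decode the payload before invoking the callback; here the raw bytes are passed through.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// MsgProcess matches the callback type defined by the package.
type MsgProcess func(d interface{})

// fetchFunc is a hypothetical stand-in for a blocking read on a Kafka reader.
type fetchFunc func(ctx context.Context) ([]byte, error)

// errDrained is returned by sliceFetcher when no payloads are left.
var errDrained = errors.New("no more messages")

// readLoop calls fetch until it fails and hands every payload to fn.
func readLoop(ctx context.Context, fetch fetchFunc, fn MsgProcess) error {
	for {
		b, err := fetch(ctx)
		if err != nil {
			return err
		}
		fn(b)
	}
}

// sliceFetcher serves the queued payloads in order, then reports errDrained.
func sliceFetcher(msgs [][]byte) fetchFunc {
	i := 0
	return func(ctx context.Context) ([]byte, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		if i >= len(msgs) {
			return nil, errDrained
		}
		m := msgs[i]
		i++
		return m, nil
	}
}

// drain runs the loop over msgs and returns how many callbacks fired.
func drain(msgs [][]byte) (int, error) {
	n := 0
	err := readLoop(context.Background(), sliceFetcher(msgs), func(d interface{}) {
		n++
	})
	return n, err
}

func main() {
	n, err := drain([][]byte{[]byte("a"), []byte("b")})
	fmt.Println(n, err)
}
```

Because the loop only ends when the read fails or the context is cancelled, a caller would normally run it in its own goroutine, as Accept is described as doing.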

func (*KafkaPacket) Remove

func (kp *KafkaPacket) Remove(pipe string) error

Remove removes only the existing subscription. This API checks just the Reader map, because no subscription is needed to Distribute / Publish messages.

type MQBus

type MQBus interface {
	Distribute(pipe string, data interface{}) error
	Accept(pipe string, fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove(pipe string) error
	Close()
}

MQBus defines the process interface functions (the only functions the user should call). These functions are implemented as part of the Packet struct.

Distribute - API to publish messages into the specified Pipe (Topic / Subject)
Accept - Consumes the incoming message if subscribed by that component
Get - Initiates a blocking call to the remote process to get a response
Close - Disconnects the connection with the middleware
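
Because callers depend only on the MQBus interface, code that uses the bus can be exercised without a Kafka server. The following is a minimal sketch of an in-memory implementation for tests. memBus and newMemBus are hypothetical and not part of the package; the interface and callback type are copied from the definitions above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// MsgProcess and MQBus are copied from the package definitions.
type MsgProcess func(d interface{})

type MQBus interface {
	Distribute(pipe string, data interface{}) error
	Accept(pipe string, fn MsgProcess) error
	Get(pipe string, d interface{}) interface{}
	Remove(pipe string) error
	Close()
}

// memBus is a hypothetical in-memory bus intended only for tests.
type memBus struct {
	mu   sync.Mutex
	subs map[string]MsgProcess
}

func newMemBus() *memBus {
	return &memBus{subs: make(map[string]MsgProcess)}
}

// Distribute delivers data synchronously to the pipe's subscriber, if any.
// Messages on a pipe without a subscriber are dropped.
func (b *memBus) Distribute(pipe string, data interface{}) error {
	b.mu.Lock()
	fn := b.subs[pipe]
	b.mu.Unlock()
	if fn != nil {
		fn(data)
	}
	return nil
}

// Accept registers fn as the single subscriber for pipe.
func (b *memBus) Accept(pipe string, fn MsgProcess) error {
	if fn == nil {
		return errors.New("nil callback")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[pipe] = fn
	return nil
}

// Get is left unimplemented, matching the Kafka note above.
func (b *memBus) Get(pipe string, d interface{}) interface{} { return nil }

// Remove drops the subscription for pipe.
func (b *memBus) Remove(pipe string) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.subs[pipe]; !ok {
		return fmt.Errorf("no subscription on pipe %q", pipe)
	}
	delete(b.subs, pipe)
	return nil
}

// Close drops every subscription.
func (b *memBus) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs = make(map[string]MsgProcess)
}

// Compile-time check that memBus satisfies MQBus.
var _ MQBus = (*memBus)(nil)

func main() {
	var bus MQBus = newMemBus()
	defer bus.Close()
	_ = bus.Accept("events", func(d interface{}) { fmt.Println("received:", d) })
	_ = bus.Distribute("events", "hello")
}
```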

func Communicator

func Communicator(bt int, messageQueueConfigPath string) (MQBus, error)

Communicator selects the Broker platform middleware and creates the corresponding communication object to send / receive messages. The Broker type is stored as part of the connection object "Packet". TODO: Look into the Kafka synchronous communication API to support the Sync Communication Model in MessageBus.
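
The selection step can be pictured as a switch on the broker type. The sketch below is a reduced, hypothetical version: newPacket is not part of the package, it skips the Kafka connection and configuration loading entirely, and returning an error for unknown broker types is an assumption about how such a call would behave.

```go
package main

import "fmt"

const (
	KAFKA = iota // same constant as declared by the package
)

// Packet is copied from the package definition.
type Packet struct {
	BrokerType int
}

// newPacket is a hypothetical, reduced form of the broker selection:
// it records the broker type and rejects anything it does not know.
func newPacket(bt int) (*Packet, error) {
	switch bt {
	case KAFKA:
		return &Packet{BrokerType: bt}, nil
	default:
		return nil, fmt.Errorf("broker type %d is not supported", bt)
	}
}

func main() {
	if _, err := newPacket(42); err != nil {
		fmt.Println(err) // prints "broker type 42 is not supported"
	}
}
```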

type MQF

type MQF struct {
	KafkaF `toml:"KAFKA"`
}

MQF defines the configuration file content for KAFKA as a Go structure. The configurations are embedded into the MQF structure for direct access to the data.

type MsgProcess

type MsgProcess func(d interface{})

MsgProcess defines the function type for processing accepted messages. Any client that wants to accept and handle events / notifications / messages should implement this function as part of its procedure. That same function should be passed to MessageBus as the callback for handling incoming messages.

type Packet

type Packet struct {
	// BrokerType defines the underlying MQ platform
	BrokerType int
}

Packet defines all the message-related information that a Producer or Consumer should know for message transactions. Both Producer and Consumer use this same structure.

BrokerType - Refer to the Constants defined above for possible values
DataResponder - Refer to the HandleResponse type description
