connect

package
v0.0.0-...-e54daf9 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 25, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type IKafkaMessageListener

type IKafkaMessageListener interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(kafka.ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(kafka.ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(kafka.ConsumerGroupSession, kafka.ConsumerGroupClaim) error

	// Ready returns the channel that receives a signal once the consumer has started.
	Ready() chan bool

	// SetReady sets a new channel used to send the ready signal.
	SetReady(chFlag chan bool)
}
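
The ready-signal half of the interface can be sketched with the standard library alone. The `readyListener` type below is a hypothetical stand-in, not part of this package: its `Setup` takes no session argument so the sketch compiles without the Kafka client, and closing the channel in `Setup` is an assumed convention rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// readyListener is a hypothetical stand-in that shows only the
// Ready/SetReady part of IKafkaMessageListener.
type readyListener struct {
	mu    sync.Mutex
	ready chan bool
}

func newReadyListener() *readyListener {
	return &readyListener{ready: make(chan bool)}
}

// Ready returns the channel that is closed once the consumer has started.
func (l *readyListener) Ready() chan bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.ready
}

// SetReady installs a fresh channel, for example before a new session.
func (l *readyListener) SetReady(ch chan bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.ready = ch
}

// Setup signals readiness by closing the current channel (assumed convention).
func (l *readyListener) Setup() error {
	close(l.Ready())
	return nil
}

func main() {
	l := newReadyListener()
	go l.Setup() // in real use the consumer group would call Setup
	<-l.Ready()  // blocks until Setup has run
	fmt.Println("consumer is ready")

	// Re-arm the signal for the next session.
	l.SetReady(make(chan bool))
}
```

Closing the channel rather than sending on it lets any number of waiters observe the signal, which is why the channel has to be replaced through `SetReady` before it can be reused.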

type KafkaConnection

type KafkaConnection struct {

	// The logger.
	Logger *clog.CompositeLogger
	// The connection resolver.
	ConnectionResolver *KafkaConnectionResolver
	// The configuration options.
	Options *cconf.ConfigParams
	// contains filtered or unexported fields
}

KafkaConnection is a Kafka connection that uses the plain driver. By defining a connection and sharing it across multiple message queues, you can reduce the number of connections used.

### Configuration parameters ###

  • client_id: (optional) name of the client id
  • connection(s):
      • discovery_key: (optional) a key to retrieve the connection from IDiscovery
      • host: host name or IP address
      • port: port number (default: 27017)
      • uri: resource URI or connection string with all parameters in it
  • credential(s):
      • store_key: (optional) a key to retrieve the credentials from ICredentialStore
      • username: user name
      • password: user password
  • options:
      • acks: (optional) control the number of required acks: -1 - all, 0 - none, 1 - only leader (default: -1)
      • num_partitions: (optional) number of partitions of the created topic (default: 1)
      • replication_factor: (optional) kafka replication factor of the topic (default: 1)
      • log_level: (optional) log level 0 - None, 1 - Error, 2 - Warn, 3 - Info, 4 - Debug (default: 1)
      • connect_timeout: (optional) number of milliseconds to connect to broker (default: 1000)
      • max_retries: (optional) maximum retry attempts (default: 5)
      • retry_timeout: (optional) number of milliseconds to wait on each reconnection attempt (default: 30000)
      • request_timeout: (optional) number of milliseconds to wait on flushing messages (default: 30000)
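
As a rough illustration of how the optional settings fall back to their documented defaults, the sketch below merges user-supplied values over a table of defaults. The flat `options.<name>` key spelling, the `withDefaults` helper, and the host and port values are assumptions made for this example; the package itself reads its settings from `*cconf.ConfigParams`.

```go
package main

import "fmt"

// defaults mirrors the default values listed above. The flat
// "options.<name>" key spelling is an assumption of this sketch.
var defaults = map[string]string{
	"options.acks":               "-1",
	"options.num_partitions":     "1",
	"options.replication_factor": "1",
	"options.log_level":          "1",
	"options.connect_timeout":    "1000",
	"options.max_retries":        "5",
	"options.retry_timeout":      "30000",
	"options.request_timeout":    "30000",
}

// withDefaults returns a copy of cfg with every missing option filled in.
func withDefaults(cfg map[string]string) map[string]string {
	out := make(map[string]string, len(defaults)+len(cfg))
	for k, v := range defaults {
		out[k] = v
	}
	for k, v := range cfg {
		out[k] = v // explicit settings win over defaults
	}
	return out
}

func main() {
	cfg := withDefaults(map[string]string{
		"connection.host": "localhost", // hypothetical broker address
		"connection.port": "9092",      // hypothetical broker port
		"options.acks":    "1",         // wait for the leader only
	})
	fmt.Println(cfg["options.acks"], cfg["options.connect_timeout"]) // prints "1 1000"
}
```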

### References ###

  • *:logger:*:*:1.0 (optional) ILogger components to pass log messages
  • *:discovery:*:*:1.0 (optional) IDiscovery services
  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

func NewKafkaConnection

func NewKafkaConnection() *KafkaConnection

NewKafkaConnection creates a new instance of the connection component.

func (*KafkaConnection) Close

func (c *KafkaConnection) Close(ctx context.Context) error
Closes the component and frees used resources.
Parameters:
	- ctx context.Context	operation context used to trace execution through the call chain.

Returns: error, or nil if no errors occurred.

func (*KafkaConnection) Configure

func (c *KafkaConnection) Configure(ctx context.Context, config *cconf.ConfigParams)
	Configures the component by passing configuration parameters.
	Parameters:
		- ctx context.Context	operation context
		- config *cconf.ConfigParams	configuration parameters to be set.

func (*KafkaConnection) CreateQueue

func (c *KafkaConnection) CreateQueue(name string) error

Creates a message queue. If the connection doesn't support this function, it exits without error.

Parameters:

  • name string the name of the queue to be created.

Returns: error or nil for success.

func (*KafkaConnection) DeleteQueue

func (c *KafkaConnection) DeleteQueue(name string) error

Deletes a message queue. If the connection doesn't support this function, it exits without error. Parameters:

  • name string the name of the queue to be deleted.

func (*KafkaConnection) GetConnection

func (c *KafkaConnection) GetConnection() kafka.SyncProducer

GetConnection returns the underlying connection object, a kafka.SyncProducer.

func (*KafkaConnection) IsOpen

func (c *KafkaConnection) IsOpen() bool

Checks if the component is opened. Returns: true if the component has been opened and false otherwise.

func (*KafkaConnection) Open

func (c *KafkaConnection) Open(ctx context.Context) error
	Opens the component.
	Parameters:
		- ctx context.Context	operation context used to trace execution through the call chain.
	Returns: error, or nil if no errors occurred.

func (*KafkaConnection) Publish

func (c *KafkaConnection) Publish(ctx context.Context, topic string, messages []*kafka.ProducerMessage) error

Publish publishes messages to the specified topic.

Parameters:

  • ctx context.Context operation context
  • topic a topic name
  • messages messages to be published

Returns: error or nil for success

func (*KafkaConnection) ReadQueueNames

func (c *KafkaConnection) ReadQueueNames() ([]string, error)

Reads a list of registered queue names. If the connection doesn't support this function, it returns an empty list. Returns: queue names.

func (*KafkaConnection) SetReferences

func (c *KafkaConnection) SetReferences(ctx context.Context, references cref.IReferences)
	Sets references to dependent components.
	Parameters:
		- ctx context.Context	operation context
		- references cref.IReferences	references to locate the component dependencies.

func (*KafkaConnection) Subscribe

func (c *KafkaConnection) Subscribe(ctx context.Context, topic string, groupId string, config *kafka.Config, listener IKafkaMessageListener) error

Subscribe to a topic

Parameters:

  • ctx context.Context operation context
  • topic a subject (topic) name
  • groupId (optional) a consumer group id
  • config consumer configuration parameters
  • listener a message listener

Returns: err or nil for success

func (*KafkaConnection) Unsubscribe

func (c *KafkaConnection) Unsubscribe(ctx context.Context, topic string, groupId string, listener IKafkaMessageListener) error

Unsubscribe from a previously subscribed topic

Parameters:

  • ctx context.Context operation context
  • topic a topic name
  • groupId (optional) a consumer group id
  • listener a message listener

Returns: err or nil for success

type KafkaConnectionResolver

type KafkaConnectionResolver struct {
	//	The connections resolver.
	ConnectionResolver *ccon.ConnectionResolver
	//	The credentials resolver.
	CredentialResolver *cauth.CredentialResolver
}

KafkaConnectionResolver is a helper class that resolves Kafka connection and credential parameters, validates them, and generates connection options.

Configuration parameters:

  • connection(s):
      • discovery_key: (optional) a key to retrieve the connection from IDiscovery
      • host: host name or IP address
      • port: port number
      • uri: resource URI or connection string with all parameters in it
  • credential(s):
      • store_key: (optional) a key to retrieve the credentials from ICredentialStore
      • username: user name
      • password: user password

References:

  • *:discovery:*:*:1.0 (optional) IDiscovery services to resolve connections
  • *:credential-store:*:*:1.0 (optional) Credential stores to resolve credentials

func NewKafkaConnectionResolver

func NewKafkaConnectionResolver() *KafkaConnectionResolver

func (*KafkaConnectionResolver) Compose

func (c *KafkaConnectionResolver) Compose(ctx context.Context, connections []*ccon.ConnectionParams, credential *cauth.CredentialParams) (*cconf.ConfigParams, error)

Compose composes Kafka connection options from connection and credential parameters. Parameters:

  • ctx context.Context operation context used to trace execution through the call chain.
  • connections []*ccon.ConnectionParams connection parameters
  • credential *cauth.CredentialParams credential parameters

Returns: options *cconf.ConfigParams, err error: the resolved options or an error.
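
One plausible shape for this composition step is sketched below. It validates each connection, joins the `host:port` pairs into a comma-separated broker list, and copies the credentials into the result. The `connParams` and `credParams` types, the `compose` function, and the `servers`, `username`, and `password` output keys are all hypothetical stand-ins; the options actually produced by `Compose` may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// connParams and credParams are hypothetical stand-ins for
// ccon.ConnectionParams and cauth.CredentialParams.
type connParams struct {
	Host string
	Port int
}

type credParams struct {
	Username string
	Password string
}

// compose builds a flat option map from connections and credentials.
// The output key names are assumptions made for this sketch.
func compose(conns []connParams, cred *credParams) (map[string]string, error) {
	if len(conns) == 0 {
		return nil, errors.New("no connections configured")
	}
	servers := make([]string, 0, len(conns))
	for _, c := range conns {
		if c.Host == "" {
			return nil, errors.New("connection host is not set")
		}
		servers = append(servers, fmt.Sprintf("%s:%d", c.Host, c.Port))
	}
	opts := map[string]string{"servers": strings.Join(servers, ",")}
	if cred != nil {
		opts["username"] = cred.Username
		opts["password"] = cred.Password
	}
	return opts, nil
}

func main() {
	opts, err := compose(
		[]connParams{{Host: "broker1", Port: 9092}, {Host: "broker2", Port: 9092}},
		&credParams{Username: "user", Password: "secret"},
	)
	if err != nil {
		fmt.Println("compose failed:", err)
		return
	}
	fmt.Println(opts["servers"])
}
```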

func (*KafkaConnectionResolver) Configure

func (c *KafkaConnectionResolver) Configure(ctx context.Context, config *cconf.ConfigParams)

Configure configures the component by passing configuration parameters. Parameters:

  • ctx context.Context operation context
  • config *cconf.ConfigParams configuration parameters to be set.

func (*KafkaConnectionResolver) Resolve

Resolve resolves Kafka connection options from connection and credential parameters. Parameters:

  • ctx context.Context operation context used to trace execution through the call chain.

Returns: options *cconf.ConfigParams, err error: the resolved options or an error.

func (*KafkaConnectionResolver) SetReferences

func (c *KafkaConnectionResolver) SetReferences(ctx context.Context, references cref.IReferences)

SetReferences sets references to dependent components.

Parameters:
	- ctx context.Context	operation context
	- references cref.IReferences	references to locate the component dependencies.

type KafkaMessage

type KafkaMessage struct {
	// Kafka consumer message
	Message *kafka.ConsumerMessage
	// Consumer session
	Session kafka.ConsumerGroupSession
}

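KafkaMessage is the Kafka message structure: a consumed message together with the consumer group session it was received in.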

type KafkaSubscription

type KafkaSubscription struct {
	Topic    string
	GroupId  string
	Listener IKafkaMessageListener
	Handler  *kafka.ConsumerGroup
}

Subscription structure
