kafka

v0.0.0-...-311a170
Published: Feb 8, 2024 License: BSD-3-Clause Imports: 12 Imported by: 0

README

Kafka Activity

This activity publishes messages on a topic in a Kafka cluster.

Flogo CLI
flogo install github.com/AiRISTAFlowInc/fs-contrib/activity/kafka

Configuration

Settings:

| Name       | Type   | Description |
|------------|--------|-------------|
| brokerUrls | string | The brokers of the Kafka cluster to connect to - REQUIRED |
| topic      | string | The Kafka topic on which to place the message - REQUIRED |
| user       | string | If connecting to a SASL enabled port, the user id to use for authentication |
| password   | string | If connecting to a SASL enabled port, the password to use for authentication |
| trustStore | string | If connecting to a TLS secured port, the directory containing the certificates representing the trust chain for the connection. This is usually just the CACert used to sign the server's certificate |

Input:

| Name    | Type   | Description |
|---------|--------|-------------|
| message | string | The message to send |

Output:

| Name      | Type  | Description |
|-----------|-------|-------------|
| partition | int32 | Documents the partition that the message was placed on |
| offSet    | int64 | Documents the offset for the message |
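
The required settings above can be checked before a flow is deployed. The following is a minimal sketch, not the activity's own code: the `settings` type, `brokerList`, and `validate` are hypothetical names, and it assumes that multiple brokers in `brokerUrls` are separated by commas, which the table does not state.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// settings is a hypothetical stand-in for the documented activity settings.
// It is not the package's own Settings type.
type settings struct {
	BrokerUrls string
	Topic      string
	User       string
	Password   string
	TrustStore string
}

// brokerList splits brokerUrls into individual host:port entries.
// Assumption: multiple brokers are separated by commas.
func brokerList(brokerUrls string) []string {
	var brokers []string
	for _, b := range strings.Split(brokerUrls, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	return brokers
}

// validate checks the two settings that the table marks as REQUIRED.
func validate(s settings) error {
	if len(brokerList(s.BrokerUrls)) == 0 {
		return errors.New("brokerUrls is required")
	}
	if strings.TrimSpace(s.Topic) == "" {
		return errors.New("topic is required")
	}
	return nil
}

func main() {
	s := settings{BrokerUrls: "localhost:9092, localhost:9093", Topic: "syslog"}
	if err := validate(s); err != nil {
		fmt.Println("invalid settings:", err)
		return
	}
	fmt.Println("brokers:", brokerList(s.BrokerUrls), "topic:", s.Topic)
}
```

The optional `user`, `password`, and `trustStore` settings are left unchecked here because they only apply to SASL or TLS ports.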

Examples

The example below sends the message "Hello From Flogo" to the syslog topic on a Kafka broker running on localhost:

{
  "id": "publish_kafka_message",
  "name": "Publish Message to Kafka",
  "activity": {
    "ref": "github.com/AiRISTAFlowInc/fs-contrib/activity/kafka",
    "settings": {
      "brokerUrls" : "localhost:9092",
      "topic"      : "syslog"
    },
    "input": {
      "message"    : "Hello From Flogo"
    }
  }
}
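
To see how the pieces of this configuration relate, the sketch below decodes the example with the standard library and reads back the settings and input. The `task` struct and `parseTask` function are hypothetical helpers written for this illustration. They model only the fields shown in the example and are not part of Flogo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// task models only the fields that appear in the example configuration.
// It is a hypothetical type for illustration, not a Flogo type.
type task struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	Activity struct {
		Ref      string                 `json:"ref"`
		Settings map[string]interface{} `json:"settings"`
		Input    map[string]interface{} `json:"input"`
	} `json:"activity"`
}

// example is the configuration from the README, copied verbatim.
const example = `{
  "id": "publish_kafka_message",
  "name": "Publish Message to Kafka",
  "activity": {
    "ref": "github.com/AiRISTAFlowInc/fs-contrib/activity/kafka",
    "settings": {
      "brokerUrls" : "localhost:9092",
      "topic"      : "syslog"
    },
    "input": {
      "message"    : "Hello From Flogo"
    }
  }
}`

// parseTask decodes a task definition from JSON.
func parseTask(data []byte) (task, error) {
	var tk task
	err := json.Unmarshal(data, &tk)
	return tk, err
}

func main() {
	tk, err := parseTask([]byte(example))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// Settings are fixed when the activity is created; input is supplied per evaluation.
	fmt.Println("settings:", tk.Activity.Settings["brokerUrls"], tk.Activity.Settings["topic"])
	fmt.Println("input:", tk.Activity.Input["message"])
}
```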

Development

Testing

To run the tests, first start a Kafka broker using the docker-compose file below:

version: '2'
  
services:

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

Then run the following command:

go test 
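
The broker can take a while to start accepting connections after the containers come up, so it may help to confirm that the port is open before running the tests. The following is a minimal sketch of such a check. `waitForBroker` is a hypothetical helper, and the retry counts are arbitrary. It only shows that something is listening on the port, not that Kafka is fully ready.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// waitForBroker tries to open a TCP connection to addr up to attempts times,
// sleeping for delay between failed tries. A nil result means the port
// accepted a connection. It does not prove that the broker is ready for use.
func waitForBroker(addr string, attempts int, delay time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		conn, err := net.DialTimeout("tcp", addr, 2*time.Second)
		if err == nil {
			conn.Close()
			return nil
		}
		lastErr = err
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return fmt.Errorf("broker %s not reachable after %d attempts: %w", addr, attempts, lastErr)
}

func main() {
	// localhost:9092 matches the port published by the docker-compose file.
	if err := waitForBroker("localhost:9092", 3, time.Second); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("broker port is open")
}
```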

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

New creates a new Kafka activity.

Types

type Activity

type Activity struct {
	// contains filtered or unexported fields
}

Activity is a kafka activity

func (*Activity) Eval

func (act *Activity) Eval(ctx activity.Context) (done bool, err error)

Eval implements the evaluation of the kafka activity

func (*Activity) Metadata

func (*Activity) Metadata() *activity.Metadata

Metadata returns the metadata for the kafka activity

type Input

type Input struct {
	Message string `md:"message,required"` // The message to send
}

func (*Input) FromMap

func (i *Input) FromMap(values map[string]interface{}) error

func (*Input) ToMap

func (i *Input) ToMap() map[string]interface{}
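
FromMap and ToMap convert between the typed struct and the generic map that flows pass around. The source shows only the signatures, so the sketch below is an assumption about the general shape of such a pair, written against a hypothetical lowercase `input` type. It rejects non-string values, whereas the real implementation may coerce them instead.

```go
package main

import "fmt"

// input is a hypothetical mirror of the documented Input type.
type input struct {
	Message string
}

// FromMap fills the struct from a generic map.
// Assumption: a missing or non-string "message" is treated as an error.
func (i *input) FromMap(values map[string]interface{}) error {
	msg, ok := values["message"].(string)
	if !ok {
		return fmt.Errorf("message: expected string, got %T", values["message"])
	}
	i.Message = msg
	return nil
}

// ToMap returns the struct as a generic map keyed by the documented input name.
func (i *input) ToMap() map[string]interface{} {
	return map[string]interface{}{"message": i.Message}
}

func main() {
	in := &input{}
	if err := in.FromMap(map[string]interface{}{"message": "Hello From Flogo"}); err != nil {
		fmt.Println("conversion failed:", err)
		return
	}
	fmt.Println(in.ToMap())
}
```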

type KafkaConnection

type KafkaConnection struct {
	// contains filtered or unexported fields
}

func (*KafkaConnection) Connection

func (c *KafkaConnection) Connection() sarama.SyncProducer

func (*KafkaConnection) Stop

func (c *KafkaConnection) Stop() error

type Output

type Output struct {
	Partition int32 `md:"partition"` // Documents the partition that the message was placed on
	OffSet    int64 `md:"offset"`    // Documents the offset for the message
}

func (*Output) FromMap

func (o *Output) FromMap(values map[string]interface{}) error

func (*Output) ToMap

func (o *Output) ToMap() map[string]interface{}

type Settings

type Settings struct {
	BrokerUrls string `md:"brokerUrls,required"` // The Kafka cluster to connect to
	User       string `md:"user"`                // If connecting to a SASL enabled port, the user id to use for authentication
	Password   string `md:"password"`            // If connecting to a SASL enabled port, the password to use for authentication
	TrustStore string `md:"trustStore"`          // If connecting to a TLS secured port, the directory containing the certificates representing the trust chain for the connection. This is usually just the CACert used to sign the server's certificate
	Topic      string `md:"topic,required"`      // The Kafka topic on which to place the message
}
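
The TrustStore setting names a directory of certificates rather than a single file. As an illustration of what loading such a directory could look like, the sketch below builds a `tls.Config` from every PEM certificate it finds there, using only the standard library. `loadTrustStore` is a hypothetical helper and the path in `main` is a placeholder. How the activity itself reads the directory is not shown in the source.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"os"
	"path/filepath"
)

// loadTrustStore reads every regular file in dir and adds any PEM
// certificates it contains to a new certificate pool.
func loadTrustStore(dir string) (*tls.Config, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("read trust store %q: %w", dir, err)
	}
	pool := x509.NewCertPool()
	loaded := 0
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		data, err := os.ReadFile(filepath.Join(dir, e.Name()))
		if err != nil {
			return nil, fmt.Errorf("read %q: %w", e.Name(), err)
		}
		// AppendCertsFromPEM reports whether at least one certificate was parsed.
		if pool.AppendCertsFromPEM(data) {
			loaded++
		}
	}
	if loaded == 0 {
		return nil, fmt.Errorf("no PEM certificates found in %q", dir)
	}
	return &tls.Config{RootCAs: pool}, nil
}

func main() {
	// Placeholder path: replace with the directory holding the CA certificate.
	cfg, err := loadTrustStore("/path/to/truststore")
	if err != nil {
		fmt.Println("trust store not loaded:", err)
		return
	}
	fmt.Println("custom root CAs configured:", cfg.RootCAs != nil)
}
```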
