kafka

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2021 License: MIT Imports: 19 Imported by: 0

README

Kafka CLI

Kafka cli client

Install

Build from source

Require Go version 1.10 or higher

$ git clone https://github.com/musobarlab/kafka-cli.git

$ make build

$ kafka-cli --version

Mac OS

$ brew tap wuriyanto48/tool

$ brew install kafka-cli

$ kafka-cli --version

Linux

$ wget https://github.com/musobarlab/kafka-cli/releases/download/v0.0.0/kafka-cli-v0.0.0.linux-amd64.tar.gz

$ tar -zxvf kafka-cli-v0.0.0.linux-amd64.tar.gz

$ kafka-cli --version

Windows

Download latest version https://github.com/musobarlab/kafka-cli/releases

Usage

Publish message to Kafka broker and topic

$ kafka-cli pub -broker localhost:9092 -topic wurys -m "hahahaha" -V

JSON

$ kafka-cli pub -broker localhost:9092 -topic wurys -m "{"hello":"hello", "world":"world"}" -V

or multiple broker

$ kafka-cli pub -broker localhost:9092,localhost:9093,localhost:9094 -topic wurys -m "hahahaha" -V

Subscribe to Kafka broker and topic

$ kafka-cli sub -broker localhost:9092 -topic wurys

or multiple broker

$ kafka-cli sub -broker localhost:9092,localhost:9093,localhost:9094 -topic wurys

SASL auth mechanism you need add -auth flag for prompting username and password

$ kafka-cli sub -broker localhost:9092,localhost:9093,localhost:9094 -topic wurys -auth
$ username: your-username
$ password: your-password

Documentation

Index

Constants

View Source
const (
	// Version const
	Version = "v0.0.0"
)

Variables

View Source
var (
	// ErrorInvalidCommand error
	ErrorInvalidCommand = errors.New("invalid command")

	// ErrorRequiredOneArgument error
	ErrorRequiredOneArgument = errors.New("require at least one arguments")

	// ErrorRequiredOneBroker error
	ErrorRequiredOneBroker = errors.New("require at least one broker")

	// ErrorPubRequredMessage error
	ErrorPubRequredMessage = errors.New("pub command require a message")

	// ErrorRequiredTopicName error
	ErrorRequiredTopicName = errors.New("require a topic name")
)

Functions

This section is empty.

Types

type Argument

type Argument struct {
	Brokers     []string
	Topic       string
	Command     Command
	ShowVersion bool
	Help        func()
	Message     []byte
	Verbose     bool
	Auth        bool
	Username    string
	Password    string
}

Argument struct

func ParseArgument

func ParseArgument() (*Argument, error)

ParseArgument function

type Command

type Command int

Command type

const (
	// PublishCommand command
	PublishCommand Command = iota

	//SubscribeCommand command
	SubscribeCommand
)

func CommandFromString

func CommandFromString(c string) Command

CommandFromString function

func (Command) String

func (c Command) String() string

String function

type KafkaGoPublisherImpl added in v1.0.0

type KafkaGoPublisherImpl struct {
	// contains filtered or unexported fields
}

KafkaGoPublisherImpl struct

func NewKafkaGoPublisher added in v1.0.0

func NewKafkaGoPublisher(args *Argument) (*KafkaGoPublisherImpl, error)

NewKafkaGoPublisherImpl constructor of KafkaGoPublisherImpl

func (*KafkaGoPublisherImpl) Publish added in v1.0.0

func (publisher *KafkaGoPublisherImpl) Publish(ctx context.Context, topic string, message []byte) error

Publish function

type KafkaGoSubscriberImpl added in v1.0.0

type KafkaGoSubscriberImpl struct {
	// contains filtered or unexported fields
}

KafkaGoSubscriberImpl struct

func NewKafkaGoSubscriber added in v1.0.0

func NewKafkaGoSubscriber(args *Argument) (*KafkaGoSubscriberImpl, error)

NewKafkaGoSubscriber constructor of KafkaGoSubscriberImpl

func (*KafkaGoSubscriberImpl) Subscribe added in v1.0.0

func (s *KafkaGoSubscriberImpl) Subscribe(ctx context.Context, topics ...string) error

Subscribe function

type Publisher

type Publisher interface {
	Publish(context.Context, string, []byte) error
}

Publisher interface

type Runner

type Runner struct {
	Publisher  Publisher
	Subscriber Subscriber
	Argument   *Argument
}

Runner type

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run function

type SaramScramClient added in v1.0.0

type SaramScramClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*SaramScramClient) Begin added in v1.0.0

func (x *SaramScramClient) Begin(userName, password, authzID string) (err error)

func (*SaramScramClient) Done added in v1.0.0

func (x *SaramScramClient) Done() bool

func (*SaramScramClient) Step added in v1.0.0

func (x *SaramScramClient) Step(challenge string) (response string, err error)

type SaramaPublisherImpl added in v1.0.0

type SaramaPublisherImpl struct {
	// contains filtered or unexported fields
}

SaramaPublisherImpl struct

func NewSaramaPublisher added in v1.0.0

func NewSaramaPublisher(args *Argument) (*SaramaPublisherImpl, error)

NewSaramaPublisher constructor of SaramaPublisherImpl

func (*SaramaPublisherImpl) Publish added in v1.0.0

func (publisher *SaramaPublisherImpl) Publish(ctx context.Context, topic string, message []byte) error

Publish function

type SaramaSubscriberImpl added in v1.0.0

type SaramaSubscriberImpl struct {
	// contains filtered or unexported fields
}

SaramaSubscriberImpl struct

func NewSaramaSubscriber added in v1.0.0

func NewSaramaSubscriber(args *Argument) (*SaramaSubscriberImpl, error)

NewSaramaSubscriber constructor of SaramaSubscriberImpl

func (*SaramaSubscriberImpl) Subscribe added in v1.0.0

func (s *SaramaSubscriberImpl) Subscribe(ctx context.Context, topics ...string) error

Subscribe function

type Subscriber

type Subscriber interface {
	Subscribe(context.Context, ...string) error
}

Subscriber interface

type SubscriberHandler

type SubscriberHandler struct {
	// contains filtered or unexported fields
}

SubscriberHandler struct will implement ConsumerGroupHandler

func (*SubscriberHandler) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*SubscriberHandler) ConsumeClaim

func (handler *SubscriberHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*SubscriberHandler) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL