gokafkaesque

package module
v0.0.0-...-0729c0f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2019 License: MIT Imports: 2 Imported by: 1

README

go-kafakesque

Go Documentation Build status

A Go binding for Kafka Admin Service Since I couldn't manage to find one, hence, write a new one. One of the intention of having this package is to allow me to easily write a Terraform provider.

Usage:

Import package

go get github.com/packetloop/go-kafkaesque

For package dependency management, we use dep:

go get -u github.com/golang/dep/cmd/dep

If new package is required, pls run below command after go get. For more information about dep, please visit this URL https://github.com/golang/dep.

dep ensure

Run test:

make test

To maintain codebase quality with static checks and analysis:

make run

Examples:

package main

import (
	"fmt"

	gokafkaesqueue "github.com/packetloop/go-kafkaesque"
)

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllTopics

type AllTopics struct {
	Topics []string `json:"topics"`
}

AllTopics is a list of topic names.

func (AllTopics) Count

func (t AllTopics) Count() int

Count is a method that returns total size of topics.

func (AllTopics) TopicsToString

func (t AllTopics) TopicsToString() []string

TopicsToString is a method that returns a slice of topics.

type Client

type Client struct {
	Rest *resty.Client
}

Client contains Singularity endpoint for http requests

func NewClient

func NewClient(c serverConfig) *Client

NewClient returns Singularity HTTP endpoint.

Example
package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	a, _ := client.GetStatus()
	fmt.Println(a.GetMessage())

}
Output:

Ok

func (*Client) CreateTopic

func (client *Client) CreateTopic(t Topic) (Response, error)

CreateTopic accepts a Topic and returns an "Ok" response or error.

Example
package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	t := gokafkaesque.NewTopic("foo").SetPartitions("2").SetReplicationFactor("5").BuildTopic()
	a, _ := client.CreateTopic(t)
	fmt.Println(a.GetMessage())

}
Output:

foo created.

func (*Client) DeleteTopic

func (client *Client) DeleteTopic(t string) (Response, error)

DeleteTopic method accepts a string topic, deletes this Kafka topic and returns a string response or error.

Example
package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	a, _ := client.DeleteTopic("foo")
	fmt.Println(a.GetMessage())

}
Output:

foo deleted.

func (*Client) GetStatus

func (client *Client) GetStatus() (Response, error)

GetStatus returns status kafka-admin-service /health endpoint.

func (*Client) GetTopic

func (client *Client) GetTopic(t string) (Topic, error)

GetTopic is a method that return a Kafka topic

Example
package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	a, _ := client.GetTopic("foo")
	fmt.Printf("partition: %s\nreplication_factor: %s\n", a.GetPartitions(), a.GetReplicationFactor())

}
Output:

partition: 2
replication_factor: 5

func (*Client) GetTopics

func (client *Client) GetTopics() (AllTopics, error)

GetTopics is a method that returns all Kafka topics.

Example

This example is meant to fail because the output is too verbose We will improve it later

package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	a, _ := client.GetTopics()
	fmt.Println(a.TopicsToString())

}
Output:

[__confluent.support.metrics __consumer_offsets]

func (*Client) UpdateTopic

func (client *Client) UpdateTopic(t Topic) (Response, error)

UpdateTopic is a method that update a Kafka topic. This requires complete config parameters set. If we want to allow only update optional params, we need to implement PATCH request instead.

Example

This is a PUT request, which means it requires complete parameters set. To only update optional params, we need to implement PATCH request instead.

package main

import (
	"fmt"

	gokafkaesque "github.com/packetloop/go-kafkaesque"
)

func main() {
	config := gokafkaesque.NewConfig().
		SetURL("http://localhost:8080").
		SetRetry(3).
		Build()
	client := gokafkaesque.NewClient(config)
	t := gokafkaesque.NewTopic("foo").SetPartitions("2").SetReplicationFactor("5").BuildTopic()
	_, err := client.CreateTopic(t)
	if err != nil {
		fmt.Println(err)
	}
	t.Config = &gokafkaesque.Config{
		RetentionMs:       "1000",
		SegmentBytes:      "10000000",
		CleanupPolicy:     "delete",
		MinInsyncReplicas: "1",
		RetentionBytes:    "10",
		SegmentMs:         "10",
	}
	a, err := client.UpdateTopic(t)
	if err != nil {
		fmt.Println(err)
	}

	fmt.Println(a.GetMessage())

}
Output:

foo updated.

type Config

type Config struct {
	RetentionMs       string `json:"retention.ms"`
	SegmentBytes      string `json:"segment.bytes"`
	CleanupPolicy     string `json:"cleanup.policy"` // Accepted values are: "deleted", "compact"
	MinInsyncReplicas string `json:"min.insync.replicas"`
	RetentionBytes    string `json:"retention.bytes"`
	SegmentMs         string `json:"segment.ms"`
}

Config contains a Kafka topic retention config in ms.

func (*Config) GetCleanupPolicy

func (c *Config) GetCleanupPolicy() string

GetCleanupPolicy is a method that returns cleanup policy of a topic.

func (*Config) GetMinInSyncReplicas

func (c *Config) GetMinInSyncReplicas() string

GetMinInSyncReplicas is a method that returns the minimum number of insync replicas. force the log to rollof a topic.

func (*Config) GetRetentionBytes

func (c *Config) GetRetentionBytes() string

GetRetentionBytes is a method that returns the retention bytes for the topic. force the log to rollof a topic.

func (*Config) GetRetentionMs

func (c *Config) GetRetentionMs() string

GetRetentionMs is a method that returns partitions of a topic.

func (*Config) GetSegmentBytes

func (c *Config) GetSegmentBytes() string

GetSegmentBytes is a method that returns partitions of a topic.

func (*Config) GetSegmentMs

func (c *Config) GetSegmentMs() string

GetSegmentMs is a method that returns the time after which Kafka will force the log to rollof a topic.

type Response

type Response struct {
	Message string `json:"message"`
}

Response returns a response of OK.

func (*Response) GetMessage

func (h *Response) GetMessage() string

GetMessage is a method that returns actual health status of "ok".

type ServerConfigBuilder

type ServerConfigBuilder interface {
	SetURL(string) ServerConfigBuilder
	SetRetry(int) ServerConfigBuilder
	Build() serverConfig
}

ServerConfigBuilder sets port, host, http retry count config to be passed to create a NewClient.

func NewConfig

func NewConfig() ServerConfigBuilder

NewConfig returns an empty ServerConfigBuilder.

type Topic

type Topic struct {
	*Config           `json:"config"`
	Partitions        string  `json:"partitions"`
	ReplicationFactor string  `json:"replicationFactor"`
	Name              *string `json:"topic"`
}

Topic includes Kafka topic config, partitions, replication factor and name.

func (*Topic) BuildTopic

func (t *Topic) BuildTopic() Topic

BuildTopic is a method that builds a Topic parameters and pass this as an argument when calling CreateTopic method.

func (Topic) GetPartitions

func (t Topic) GetPartitions() string

GetPartitions is a method that returns partitions of a topic.

func (Topic) GetReplicationFactor

func (t Topic) GetReplicationFactor() string

GetReplicationFactor is a method that returns partitions of a topic.

func (*Topic) SetConfig

func (t *Topic) SetConfig(c Config) TopicBuilder

SetConfig is a method that accepts a Config struct and

sets Topic config such as retention periond in ms.

func (*Topic) SetPartitions

func (t *Topic) SetPartitions(p string) TopicBuilder

SetPartitions is a method that accepts an int64 and sets Topic partition.

func (*Topic) SetReplicationFactor

func (t *Topic) SetReplicationFactor(r string) TopicBuilder

SetReplicationFactor is a method that accepts an int64 and sets Topic replication factor.

type TopicBuilder

type TopicBuilder interface {
	SetPartitions(string) TopicBuilder
	SetReplicationFactor(string) TopicBuilder
	SetConfig(Config) TopicBuilder
	BuildTopic() Topic
}

TopicBuilder is an interface that builds a Kafka Topic Config.

func NewTopic

func NewTopic(name string) TopicBuilder

NewTopic accepts a string topic name and returns a TopicBuilder interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL