Documentation ¶
Index ¶
- type AllTopics
- type Client
- func (client *Client) CreateTopic(t Topic) (Response, error)
- func (client *Client) DeleteTopic(t string) (Response, error)
- func (client *Client) GetStatus() (Response, error)
- func (client *Client) GetTopic(t string) (Topic, error)
- func (client *Client) GetTopics() (AllTopics, error)
- func (client *Client) UpdateTopic(t Topic) (Response, error)
- type Config
- type Response
- type ServerConfigBuilder
- type Topic
- type TopicBuilder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AllTopics ¶
type AllTopics struct {
Topics []string `json:"topics"`
}
AllTopics is a list of topic names.
func (AllTopics) TopicsToString ¶
TopicsToString is a method that returns a slice of topics.
type Client ¶
Client contains Singularity endpoint for http requests
func NewClient ¶
func NewClient(c serverConfig) *Client
NewClient returns Singularity HTTP endpoint.
Example ¶
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) a, _ := client.GetStatus() fmt.Println(a.GetMessage()) }
Output: Ok
func (*Client) CreateTopic ¶
CreateTopic accepts a Topic and returns an "Ok" response or error.
Example ¶
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) t := gokafkaesque.NewTopic("foo").SetPartitions("2").SetReplicationFactor("5").BuildTopic() a, _ := client.CreateTopic(t) fmt.Println(a.GetMessage()) }
Output: foo created.
func (*Client) DeleteTopic ¶
DeleteTopic method accepts a string topic, deletes this Kafka topic and returns a string response or error.
Example ¶
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) a, _ := client.DeleteTopic("foo") fmt.Println(a.GetMessage()) }
Output: foo deleted.
func (*Client) GetTopic ¶
GetTopic is a method that return a Kafka topic
Example ¶
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) a, _ := client.GetTopic("foo") fmt.Printf("partition: %s\nreplication_factor: %s\n", a.GetPartitions(), a.GetReplicationFactor()) }
Output: partition: 2 replication_factor: 5
func (*Client) GetTopics ¶
GetTopics is a method that returns all Kafka topics.
Example ¶
This example is meant to fail because the output is too verbose We will improve it later
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) a, _ := client.GetTopics() fmt.Println(a.TopicsToString()) }
Output: [__confluent.support.metrics __consumer_offsets]
func (*Client) UpdateTopic ¶
UpdateTopic is a method that update a Kafka topic. This requires complete config parameters set. If we want to allow only update optional params, we need to implement PATCH request instead.
Example ¶
This is a PUT request, which means it requires complete parameters set. To only update optional params, we need to implement PATCH request instead.
package main import ( "fmt" gokafkaesque "github.com/packetloop/go-kafkaesque" ) func main() { config := gokafkaesque.NewConfig(). SetURL("http://localhost:8080"). SetRetry(3). Build() client := gokafkaesque.NewClient(config) t := gokafkaesque.NewTopic("foo").SetPartitions("2").SetReplicationFactor("5").BuildTopic() _, err := client.CreateTopic(t) if err != nil { fmt.Println(err) } t.Config = &gokafkaesque.Config{ RetentionMs: "1000", SegmentBytes: "10000000", CleanupPolicy: "delete", MinInsyncReplicas: "1", RetentionBytes: "10", SegmentMs: "10", } a, err := client.UpdateTopic(t) if err != nil { fmt.Println(err) } fmt.Println(a.GetMessage()) }
Output: foo updated.
type Config ¶
type Config struct { RetentionMs string `json:"retention.ms"` SegmentBytes string `json:"segment.bytes"` CleanupPolicy string `json:"cleanup.policy"` // Accepted values are: "deleted", "compact" MinInsyncReplicas string `json:"min.insync.replicas"` RetentionBytes string `json:"retention.bytes"` SegmentMs string `json:"segment.ms"` }
Config contains a Kafka topic retention config in ms.
func (*Config) GetCleanupPolicy ¶
GetCleanupPolicy is a method that returns cleanup policy of a topic.
func (*Config) GetMinInSyncReplicas ¶
GetMinInSyncReplicas is a method that returns the minimum number of insync replicas. force the log to rollof a topic.
func (*Config) GetRetentionBytes ¶
GetRetentionBytes is a method that returns the retention bytes for the topic. force the log to rollof a topic.
func (*Config) GetRetentionMs ¶
GetRetentionMs is a method that returns partitions of a topic.
func (*Config) GetSegmentBytes ¶
GetSegmentBytes is a method that returns partitions of a topic.
func (*Config) GetSegmentMs ¶
GetSegmentMs is a method that returns the time after which Kafka will force the log to rollof a topic.
type Response ¶
type Response struct {
Message string `json:"message"`
}
Response returns a response of OK.
func (*Response) GetMessage ¶
GetMessage is a method that returns actual health status of "ok".
type ServerConfigBuilder ¶
type ServerConfigBuilder interface { SetURL(string) ServerConfigBuilder SetRetry(int) ServerConfigBuilder Build() serverConfig }
ServerConfigBuilder sets port, host, http retry count config to be passed to create a NewClient.
func NewConfig ¶
func NewConfig() ServerConfigBuilder
NewConfig returns an empty ServerConfigBuilder.
type Topic ¶
type Topic struct { *Config `json:"config"` Partitions string `json:"partitions"` ReplicationFactor string `json:"replicationFactor"` Name *string `json:"topic"` }
Topic includes Kafka topic config, partitions, replication factor and name.
func (*Topic) BuildTopic ¶
BuildTopic is a method that builds a Topic parameters and pass this as an argument when calling CreateTopic method.
func (Topic) GetPartitions ¶
GetPartitions is a method that returns partitions of a topic.
func (Topic) GetReplicationFactor ¶
GetReplicationFactor is a method that returns partitions of a topic.
func (*Topic) SetConfig ¶
func (t *Topic) SetConfig(c Config) TopicBuilder
SetConfig is a method that accepts a Config struct and
sets Topic config such as retention periond in ms.
func (*Topic) SetPartitions ¶
func (t *Topic) SetPartitions(p string) TopicBuilder
SetPartitions is a method that accepts an int64 and sets Topic partition.
func (*Topic) SetReplicationFactor ¶
func (t *Topic) SetReplicationFactor(r string) TopicBuilder
SetReplicationFactor is a method that accepts an int64 and sets Topic replication factor.
type TopicBuilder ¶
type TopicBuilder interface { SetPartitions(string) TopicBuilder SetReplicationFactor(string) TopicBuilder SetConfig(Config) TopicBuilder BuildTopic() Topic }
TopicBuilder is an interface that builds a Kafka Topic Config.
func NewTopic ¶
func NewTopic(name string) TopicBuilder
NewTopic accepts a string topic name and returns a TopicBuilder interface.