
segmentio
This library provides helpers that build segmentio kafka-go structs from the topic information received from MaaS.
The provided Writer and Reader receive all the configuration required to communicate with the Kafka server.
1. Writer. To create a kafka-go Writer struct based on the response from MaaS, the following code can be used:
import (
"context"
"fmt"
"github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
"github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
maas "github.com/netcracker/qubership-core-lib-go-maas-core"
"github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
kafkago "github.com/segmentio/kafka-go"
)
// producer resolves (or creates) the "test" topic through the MaaS Kafka
// client, builds a kafka-go writer for it and publishes a single message whose
// headers are built from the propagatable context data.
//
// Every failure panics with the error text, the same way the
// WatchTenantTopics example in this document reports errors.
func producer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()

	kafkaClient := maas.NewKafkaClient()
	topicAddress, err := kafkaClient.GetOrCreateTopic(ctx, classifier.New("test"))
	if err != nil {
		// Without this check a failed lookup would surface as a nil
		// dereference on the next line.
		panic(err.Error())
	}

	writer, err := segmentio.NewWriter(*topicAddress)
	if err != nil {
		panic(err.Error())
	}
	// Release the writer when the example returns.
	// NOTE(review): assumes NewWriter returns a *kafkago.Writer — confirm.
	defer writer.Close()

	ctxData, err := ctxmanager.GetResponsePropagatableContextData(ctx)
	if err != nil {
		panic(err.Error())
	}

	message := kafkago.Message{
		Key:     []byte("price"),
		Value:   []byte("10USD"),
		Headers: segmentioHelper.BuildHeaders(ctxData),
	}
	// A dropped WriteMessages error would silently lose the message.
	if err := writer.WriteMessages(ctx, message); err != nil {
		panic(err.Error())
	}
}
2. Reader. To create a kafka-go Reader struct based on the response from MaaS, the following code can be used:
GetTopic example:
import (
"context"
"fmt"
"github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
"github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
maas "github.com/netcracker/qubership-core-lib-go-maas-core"
segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
"github.com/segmentio/kafka-go"
"time"
)
// consumer looks up the "test" topic through the MaaS Kafka client, creates a
// kafka-go reader in the "prices-group-id" consumer group and prints every
// message together with a context initialised from the message headers.
//
// Setup failures panic with the error text. The read loop ends on the first
// ReadMessage error.
func consumer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()

	kafkaClient := maas.NewKafkaClient()
	topicAddress, err := kafkaClient.GetTopic(ctx, classifier.New("test"))
	if err != nil {
		panic(err.Error())
	}
	if topicAddress == nil {
		// NOTE(review): presumably GetTopic can return a nil address for a
		// missing topic — confirm; either way avoid a nil dereference below.
		panic("topic address for classifier \"test\" is nil")
	}

	readerConfig, err := segmentioHelper.NewReaderConfig(*topicAddress, "prices-group-id")
	if err != nil {
		panic(err.Error())
	}

	reader := kafka.NewReader(*readerConfig)
	defer reader.Close()

	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			// Stop instead of spinning forever on zero-value messages once
			// the reader is closed or ctx is done.
			fmt.Printf("stop consuming: %v\n", err)
			return
		}
		localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
		fmt.Printf("message=%v, ctx=%v", msg, localCtx)
	}
}
WatchTopicCreate example:
import (
"context"
"fmt"
"github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
"github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
maas "github.com/netcracker/qubership-core-lib-go-maas-core"
segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
"github.com/segmentio/kafka-go"
"time"
)
// watchingConsumer registers a WatchTopicCreate callback for the "test"
// classifier. When the callback receives a topic address it creates a kafka-go
// reader in the "prices-group-id" consumer group and prints every message
// together with a context initialised from the message headers.
//
// Errors inside the callback are printed and end the callback.
func watchingConsumer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()

	kafkaClient := maas.NewKafkaClient()
	// NOTE(review): if WatchTopicCreate returns an error it should be checked
	// here — confirm against the MaaS client API.
	kafkaClient.WatchTopicCreate(ctx, classifier.New("test"), func(topicAddress model.TopicAddress) {
		readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, "prices-group-id")
		if err != nil {
			// Bail out before dereferencing readerConfig.
			fmt.Printf("cannot build reader config: %v\n", err)
			return
		}

		reader := kafka.NewReader(*readerConfig)
		defer reader.Close()

		for {
			msg, err := reader.ReadMessage(ctx)
			if err != nil {
				// Stop instead of spinning forever on zero-value messages
				// once the reader is closed or ctx is done.
				fmt.Printf("stop consuming: %v\n", err)
				return
			}
			localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
			fmt.Printf("message=%v, ctx=%v", msg, localCtx)
		}
	})
}
WatchTenantTopics example:
import (
"context"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
kafkaCl "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka"
segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
"github.com/segmentio/kafka-go"
)
// watchingTenantTopics registers a WatchTenantTopics callback for the "test"
// classifier. On every invocation the callback closes the readers it currently
// holds and creates one new reader per received topic address, using a
// consumer group id derived from the tenant id in the topic classifier.
//
// readers is the initial set of readers to be closed on the first invocation.
// A NewReaderConfig failure panics with the error text.
func watchingTenantTopics(ctx context.Context, readers []*kafka.Reader, kafkaClient kafkaCl.MaasClient) {
	kafkaClient.WatchTenantTopics(ctx, classifier.New("test"), func(topicAddresses []model.TopicAddress) {
		// close current readers
		for _, reader := range readers {
			reader.Close()
		}
		// Forget the closed readers; otherwise every later invocation would
		// close them again and the slice would grow without bound. A fresh
		// slice is allocated so the caller's backing array is not overwritten.
		readers = make([]*kafka.Reader, 0, len(topicAddresses))

		// create new readers
		for _, topicAddress := range topicAddresses {
			tenantID := topicAddress.Classifier[classifier.TenantId]
			groupID := "tenant-prices-group-id-" + tenantID
			readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, groupID)
			if err != nil {
				panic(err.Error())
			}
			readers = append(readers, kafka.NewReader(*readerConfig))
		}
		// start consuming messages from readers
		// ...
	})
}
3. Customize underlying segmentio structs:
import (
	"time"

	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
	segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
	"github.com/segmentio/kafka-go"
)
// writerWithOptions builds a writer for topicAddress and customises the
// underlying kafka-go transport through WriterOptions.AlterTransport, which
// sets MetadataTTL, IdleTimeout and DialTimeout before the writer is created.
//
// A NewWriter failure panics with the error text.
func writerWithOptions(topicAddress model.TopicAddress) {
	writer, err := segmentioHelper.NewWriter(topicAddress,
		segmentioHelper.WriterOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
			transport.MetadataTTL = time.Minute
			transport.IdleTimeout = 2 * time.Minute
			transport.DialTimeout = 3 * time.Minute
			return transport, nil
		}})
	if err != nil {
		panic(err.Error())
	}
	// Release the writer when the example returns.
	// NOTE(review): assumes NewWriter returns a *kafka.Writer — confirm.
	defer writer.Close()

	// write messages with writer
	// ...
}
// readerConfigWithOptions builds a reader config for topicAddress in the
// "group-id" consumer group and customises the underlying kafka-go dialer
// through ReaderOptions.AlterDialer, which sets Timeout, KeepAlive and
// FallbackDelay. The resulting config is then used to create a reader.
//
// A NewReaderConfig failure panics with the error text.
func readerConfigWithOptions(topicAddress model.TopicAddress) {
	readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, "group-id",
		segmentioHelper.ReaderOptions{AlterDialer: func(dialer *kafka.Dialer) (*kafka.Dialer, error) {
			dialer.Timeout = time.Minute
			dialer.KeepAlive = 2 * time.Minute
			dialer.FallbackDelay = 3 * time.Minute
			return dialer, nil
		}})
	if err != nil {
		// Bail out before dereferencing readerConfig.
		panic(err.Error())
	}

	reader := kafka.NewReader(*readerConfig)
	defer reader.Close()

	// read messages with reader
	// ...
}
// clientWithOptions builds a client for topicAddress and customises the
// underlying kafka-go transport through ClientOptions.AlterTransport, which
// sets MetadataTTL, IdleTimeout and DialTimeout before the client is created.
//
// A NewClient failure panics with the error text.
func clientWithOptions(topicAddress model.TopicAddress) {
	client, err := segmentioHelper.NewClient(topicAddress,
		segmentioHelper.ClientOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
			transport.MetadataTTL = time.Minute
			transport.IdleTimeout = 2 * time.Minute
			transport.DialTimeout = 3 * time.Minute
			return transport, nil
		}})
	if err != nil {
		panic(err.Error())
	}

	// Placeholder use so the example compiles; issue requests with client here.
	_ = client
}