segmentio

package module
v3.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2025 License: Apache-2.0 Imports: 10 Imported by: 1

README

Go build Coverage duplicated_lines_density vulnerabilities bugs code_smells

segmentio

This library provides helpers that provision segmentio kafka-go structs from the topic information received from MaaS. The provided Writer and Reader receive all the configuration required to communicate with the Kafka server.

1. Writer. To create a kafka-go Writer struct based on the response from MaaS, the following code can be used:
import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  kafkago "github.com/segmentio/kafka-go"
)

// producer shows how to publish a single message to a MaaS-managed Kafka
// topic through a kafka-go Writer built by this library.
//
// The example has no error return, so every failing step prints the error
// and returns instead of continuing with an unusable value.
func producer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()
	kafkaClient := maas.NewKafkaClient()

	topicAddress, err := kafkaClient.GetOrCreateTopic(ctx, classifier.New("test"))
	if err != nil {
		fmt.Printf("get or create topic: %v\n", err)
		return
	}
	// The address is dereferenced below, so guard against a nil pointer.
	if topicAddress == nil {
		fmt.Println("get or create topic: no topic address returned")
		return
	}

	writer, err := segmentio.NewWriter(*topicAddress)
	if err != nil {
		fmt.Printf("create writer: %v\n", err)
		return
	}
	// Close the writer on every exit path so its connections are released.
	defer func() {
		if err := writer.Close(); err != nil {
			fmt.Printf("close writer: %v\n", err)
		}
	}()

	ctxData, err := ctxmanager.GetResponsePropagatableContextData(ctx)
	if err != nil {
		fmt.Printf("get propagatable context data: %v\n", err)
		return
	}

	message := kafkago.Message{
		Key:     []byte("price"),
		Value:   []byte("10USD"),
		Headers: segmentioHelper.BuildHeaders(ctxData),
	}
	if err := writer.WriteMessages(ctx, message); err != nil {
		fmt.Printf("write message: %v\n", err)
	}
}
2. Reader. To create a kafka-go Reader struct based on the response from MaaS, the following code can be used:
GetTopic example:
import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  "github.com/segmentio/kafka-go"
  "time"
)

// consumer shows how to read messages from an existing MaaS-managed Kafka
// topic with a kafka-go Reader configured by this library.
//
// The example has no error return, so every failing step prints the error
// and returns. The read loop ends on the first ReadMessage error instead of
// spinning forever on a closed reader or a cancelled context.
func consumer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()
	kafkaClient := maas.NewKafkaClient()

	topicAddress, err := kafkaClient.GetTopic(ctx, classifier.New("test"))
	if err != nil {
		fmt.Printf("get topic: %v\n", err)
		return
	}
	// The address is dereferenced below, so guard against a nil pointer.
	if topicAddress == nil {
		fmt.Println("get topic: no topic address returned")
		return
	}

	readerConfig, err := segmentioHelper.NewReaderConfig(*topicAddress, "prices-group-id")
	if err != nil {
		fmt.Printf("create reader config: %v\n", err)
		return
	}

	reader := kafka.NewReader(*readerConfig)
	defer reader.Close()

	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			fmt.Printf("read message: %v\n", err)
			return
		}
		// Restore the propagated request context from the message headers.
		localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
		fmt.Printf("message=%v, ctx=%v", msg, localCtx)
	}
}
WatchTopicCreate example:
import (
  "context"
  "fmt"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/baseproviders"
  "github.com/netcracker/qubership-core-lib-go/context-propagation/ctxmanager"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
  "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
  maas "github.com/netcracker/qubership-core-lib-go-maas-core"
  segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
  "github.com/segmentio/kafka-go"
  "time"
)

// watchingConsumer shows how to start consuming from a topic once MaaS
// reports that it has been created.
//
// The callback builds a Reader for the reported topic address and reads
// messages until ReadMessage fails, at which point the error is printed and
// the callback returns (closing the reader).
//
// NOTE(review): the result of WatchTopicCreate is not inspected here; if it
// returns an error, it should be checked — confirm against the client API.
func watchingConsumer() {
	ctxmanager.Register(baseproviders.Get())
	ctx := context.Background()
	kafkaClient := maas.NewKafkaClient()
	kafkaClient.WatchTopicCreate(ctx, classifier.New("test"), func(topicAddress model.TopicAddress) {
		readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, "prices-group-id")
		if err != nil {
			fmt.Printf("create reader config: %v\n", err)
			return
		}

		reader := kafka.NewReader(*readerConfig)
		defer reader.Close()

		for {
			msg, err := reader.ReadMessage(ctx)
			if err != nil {
				// Stop instead of looping forever on a closed reader or a
				// cancelled context.
				fmt.Printf("read message: %v\n", err)
				return
			}
			// Restore the propagated request context from the message headers.
			localCtx := ctxmanager.InitContext(ctx, segmentioHelper.ExtractHeaders(msg.Headers))
			fmt.Printf("message=%v, ctx=%v", msg, localCtx)
		}
	})
}
WatchTenantTopics example:
import (
	"context"
	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/classifier"
	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
	kafkaCl "github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka"
	segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
	"github.com/segmentio/kafka-go"
)

// watchingTenantTopics shows how to keep one Reader per tenant topic in sync
// with the set of tenant topics reported by MaaS.
//
// On every callback the readers from the previous round are closed and
// dropped, and a fresh reader is created for each reported topic address
// using a tenant-specific consumer group id. The callback panics if a reader
// config cannot be built.
func watchingTenantTopics(ctx context.Context, readers []*kafka.Reader, kafkaClient kafkaCl.MaasClient) {
	kafkaClient.WatchTenantTopics(ctx, classifier.New("test"), func(topicAddresses []model.TopicAddress) {
		// close current readers
		for _, reader := range readers {
			// Best-effort close: the reader is discarded either way.
			_ = reader.Close()
		}
		// Drop the closed readers; otherwise the next callback would close
		// them again and the slice would grow on every topic update. A new
		// slice is allocated so the caller's backing array is not overwritten.
		readers = make([]*kafka.Reader, 0, len(topicAddresses))

		// create new readers
		for _, topicAddress := range topicAddresses {
			tenantId := topicAddress.Classifier[classifier.TenantId]
			groupId := "tenant-prices-group-id-" + tenantId
			readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, groupId)
			if err != nil {
				panic(err.Error())
			}
			readers = append(readers, kafka.NewReader(*readerConfig))
		}
		// start consuming messages from readers
		// ...
	})
}
3. Customize underlying segmentio structs:
import (
	"time"

	"github.com/netcracker/qubership-core-lib-go-maas-client/v3/kafka/model"
	segmentioHelper "github.com/netcracker/qubership-core-lib-go-maas-segmentio/v3"
	"github.com/segmentio/kafka-go"
)

// writerWithOptions shows how to customize the kafka-go Transport used by the
// Writer that NewWriter builds for the given topic address.
//
// The AlterTransport hook adjusts the transport timeouts and returns it. The
// example panics if the writer cannot be created and closes the writer before
// returning.
func writerWithOptions(topicAddress model.TopicAddress) {
	writer, err := segmentioHelper.NewWriter(topicAddress,
		segmentioHelper.WriterOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
			transport.MetadataTTL = time.Minute
			transport.IdleTimeout = 2 * time.Minute
			transport.DialTimeout = 3 * time.Minute
			return transport, nil
		}})
	if err != nil {
		panic(err.Error())
	}
	defer writer.Close()

	// produce messages with writer
	// ...
}

// readerConfigWithOptions shows how to customize the kafka-go Dialer used by
// the ReaderConfig that NewReaderConfig builds for the given topic address.
//
// The AlterDialer hook adjusts the dialer timings and returns it. The example
// panics if the config cannot be created, then builds a Reader from the
// config and closes it before returning.
func readerConfigWithOptions(topicAddress model.TopicAddress) {
	readerConfig, err := segmentioHelper.NewReaderConfig(topicAddress, "group-id",
		segmentioHelper.ReaderOptions{AlterDialer: func(dialer *kafka.Dialer) (*kafka.Dialer, error) {
			dialer.Timeout = time.Minute
			dialer.KeepAlive = 2 * time.Minute
			dialer.FallbackDelay = 3 * time.Minute
			return dialer, nil
		}})
	if err != nil {
		panic(err.Error())
	}

	reader := kafka.NewReader(*readerConfig)
	defer reader.Close()

	// consume messages with reader
	// ...
}

// clientWithOptions shows how to customize the kafka-go Transport used by the
// Client that NewClient builds for the given topic address.
//
// The AlterTransport hook adjusts the transport timeouts and returns it. The
// example panics if the client cannot be created.
func clientWithOptions(topicAddress model.TopicAddress) {
	client, err := segmentioHelper.NewClient(topicAddress,
		segmentioHelper.ClientOptions{AlterTransport: func(transport *kafka.Transport) (*kafka.Transport, error) {
			transport.MetadataTTL = time.Minute
			transport.IdleTimeout = 2 * time.Minute
			transport.DialTimeout = 3 * time.Minute
			return transport, nil
		}})
	if err != nil {
		panic(err.Error())
	}

	// Placeholder so the example compiles; issue requests with client here.
	_ = client
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildHeaders

func BuildHeaders(ctxData map[string]string) []kafkago.Header

func ExtractHeaders

func ExtractHeaders(headers []kafkago.Header) map[string]interface{}

func NewClient

func NewClient(topic maasModel.TopicAddress, options ...ClientOptions) (*kafka.Client, error)

func NewDialerAndServers

func NewDialerAndServers(topic maasModel.TopicAddress) (*kafka.Dialer, []string, error)

func NewReaderConfig

func NewReaderConfig(topic maasModel.TopicAddress, groupId string, options ...ReaderOptions) (*kafka.ReaderConfig, error)

func NewWriter

func NewWriter(topic maasModel.TopicAddress, options ...WriterOptions) (*kafka.Writer, error)

Types

type ClientOptions

// ClientOptions holds optional customization accepted by NewClient.
type ClientOptions struct {
	// AlterTransport takes a *kafka.Transport and returns the transport to
	// use, or an error.
	// NOTE(review): presumably invoked with the transport the library built
	// for the topic before the client is created — confirm in the implementation.
	AlterTransport func(transport *kafka.Transport) (*kafka.Transport, error)
}

type ReaderOptions

// ReaderOptions holds optional customization accepted by NewReaderConfig.
type ReaderOptions struct {
	// AlterDialer takes a *kafka.Dialer and returns the dialer to use, or an
	// error.
	// NOTE(review): presumably invoked with the dialer the library built for
	// the topic before the reader config is returned — confirm in the implementation.
	AlterDialer func(dialer *kafka.Dialer) (*kafka.Dialer, error)
}

type WriterOptions

// WriterOptions holds optional customization accepted by NewWriter.
type WriterOptions struct {
	// AlterTransport takes a *kafka.Transport and returns the transport to
	// use, or an error.
	// NOTE(review): presumably invoked with the transport the library built
	// for the topic before the writer is created — confirm in the implementation.
	AlterTransport func(transport *kafka.Transport) (*kafka.Transport, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL