qstreamer

v0.1.0
Published: May 31, 2024 License: MIT Imports: 4 Imported by: 0

README

Queue Streamer

Queue Streamer is a Go package that transfers data between Kafka topics with exactly-once delivery guarantees. It consumes messages from an origin topic on the Kafka brokers and produces them to the specified destination topics. This document explains how to install and use Queue Streamer.

Installation

To install Queue Streamer, use Go modules:

go get github.com/violetpay-org/queue-streamer

Usage

The following example shows how to use Queue Streamer.

Example
package main

import (
	"sync"

	qstreamer "github.com/violetpay-org/queue-streamer"
)

func main() {
	wg := &sync.WaitGroup{}
	
	brokers := []string{"localhost:9092", "localhost:9093", "localhost:9094"}
	origin := qstreamer.NewTopic("origin-topic", 3)           // Topic name and partition

	// The serializer converts each consumed message into the message to produce.
	// A pass-through serializer forwards the message unchanged.
	serializer := qstreamer.NewPassThroughSerializer()
	
	destination1 := qstreamer.NewTopic("destination-topic-1", 5) // Topic name and partition
	destination2 := qstreamer.NewTopic("destination-topic-2", 3)
	
	streamer := qstreamer.NewTopicStreamer(brokers, origin)

	cfg := qstreamer.NewStreamConfig(serializer, destination1)
	streamer.AddConfig(cfg)

	cfg = qstreamer.NewStreamConfig(serializer, destination2)
	streamer.AddConfig(cfg)
	
	streamer.Run() // Non-blocking
	defer streamer.Stop()
	wg.Add(1)

	wg.Wait()
}
Explanation
  1. Set Topics: Use the NewTopic function to define the origin topic and the destination topics.

  2. Use PassThroughSerializer: Create a pass-through serializer using NewPassThroughSerializer which does not alter the message.

    • If you want to convert the message, you can create a custom serializer that implements the Serializer interface.
  3. Set StreamConfig: Use the NewStreamConfig function to configure the stream settings.

  4. Create and Configure TopicStreamer: Use the NewTopicStreamer function to create the topic streamer and the AddConfig method to add the stream configuration.

  5. Run and Stop Streamer: Call the Run method to start the streamer and the Stop method to stop it.

  6. Use WaitGroup: Use sync.WaitGroup to prevent the main goroutine from exiting.
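
As a rough illustration of the custom serializer mentioned in step 2, the sketch below defines a serializer that upper-cases each message. It assumes the serializer interface consists of the single method that PassThroughSerializer exposes in the documentation, MessageToProduceMessage(value string) string. The local MessageSerializer interface and the UpperCaseSerializer type are hypothetical stand-ins so that the example runs without a Kafka broker; they are not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// MessageSerializer is a local stand-in for the interface accepted by
// NewStreamConfig. Its single method mirrors the documented
// PassThroughSerializer method; the real interface is assumed, not confirmed.
type MessageSerializer interface {
	MessageToProduceMessage(value string) string
}

// UpperCaseSerializer is a hypothetical custom serializer that upper-cases
// each consumed message before it is produced to the destination topic.
type UpperCaseSerializer struct{}

// MessageToProduceMessage returns the upper-cased form of the input message.
func (s *UpperCaseSerializer) MessageToProduceMessage(value string) string {
	return strings.ToUpper(value)
}

// Compile-time check that UpperCaseSerializer satisfies the stand-in interface.
var _ MessageSerializer = (*UpperCaseSerializer)(nil)

func main() {
	var s MessageSerializer = &UpperCaseSerializer{}
	fmt.Println(s.MessageToProduceMessage("order-created")) // prints "ORDER-CREATED"
}
```

If the assumption about the interface holds, a value such as &UpperCaseSerializer{} could be passed to NewStreamConfig in place of the pass-through serializer.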

Contribution

Contributions are welcome! You can contribute to the project by reporting bugs, requesting features, and submitting pull requests.

License

Queue Streamer is distributed under the MIT License. See the LICENSE file for more details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewTopic

func NewTopic(name string, partition int32) shared.Topic

Types

type PassThroughSerializer

type PassThroughSerializer struct {
}

func NewPassThroughSerializer

func NewPassThroughSerializer() *PassThroughSerializer

func (*PassThroughSerializer) MessageToProduceMessage

func (ts *PassThroughSerializer) MessageToProduceMessage(value string) string

type StreamConfig

type StreamConfig struct {
	// contains filtered or unexported fields
}

func NewStreamConfig

func NewStreamConfig(ms shared.MessageSerializer, topic shared.Topic) StreamConfig

func (StreamConfig) MessageSerializer

func (ss StreamConfig) MessageSerializer() shared.MessageSerializer

func (StreamConfig) Topic

func (ss StreamConfig) Topic() shared.Topic

type TopicStreamer

type TopicStreamer struct {
	// contains filtered or unexported fields
}

func NewTopicStreamer

func NewTopicStreamer(brokers []string, topic shared.Topic, args ...interface{}) *TopicStreamer

NewTopicStreamer creates a new topic streamer that streams messages from a topic to other topics. The streamer is configured with a list of brokers and a topic to stream from. If you want to override the default configuration of the sarama consumer and producer, you can pass additional arguments.

  • ts := NewTopicStreamer(brokers, topic)
  • ts := NewTopicStreamer(brokers, topic, consumerConfig, producerConfig)
  • ts := NewTopicStreamer(brokers, topic, nil, producerConfig)
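
The three call forms above suggest that the optional arguments are positional, with nil or a missing argument meaning "use the default". The sketch below shows one way such variadic arguments could be resolved. It is not the package's implementation: the Config type and the resolveConfigs function are hypothetical stand-ins, and the real arguments are presumably Sarama configuration values.

```go
package main

import "fmt"

// Config is a hypothetical stand-in for a client configuration type.
type Config struct {
	Name string
}

// resolveConfigs interprets optional positional arguments as
// (consumerConfig, producerConfig). A missing argument, a nil argument, or
// an argument of another type leaves the corresponding default in place.
func resolveConfigs(args ...interface{}) (consumer, producer *Config) {
	consumer = &Config{Name: "default-consumer"}
	producer = &Config{Name: "default-producer"}

	if len(args) > 0 {
		if c, ok := args[0].(*Config); ok && c != nil {
			consumer = c
		}
	}
	if len(args) > 1 {
		if p, ok := args[1].(*Config); ok && p != nil {
			producer = p
		}
	}
	return consumer, producer
}

func main() {
	custom := &Config{Name: "custom-producer"}

	// Mirrors the NewTopicStreamer(brokers, topic, nil, producerConfig) form:
	// the consumer keeps its default and only the producer is overridden.
	c, p := resolveConfigs(nil, custom)
	fmt.Println(c.Name, p.Name) // prints "default-consumer custom-producer"
}
```

Positional interface{} arguments keep the constructor signature short, at the cost of compile-time type checking; callers need to pass the arguments in the documented order.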

func (*TopicStreamer) AddConfig

func (ts *TopicStreamer) AddConfig(spec StreamConfig)

func (*TopicStreamer) Run

func (ts *TopicStreamer) Run()

func (*TopicStreamer) Stop

func (ts *TopicStreamer) Stop()
