ami

package module
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2018 License: MIT Imports: 5 Imported by: 0

README

Ami

Go client to reliable queues based on Redis Cluster Streams.

Ami docs.

Consume/produce perfomance

Perfomance is dependent from:

  • Redis Cluster nodes count;
  • ping RTT from client to Redis Cluster master nodes;
  • Ami configuration.

As example, 10-nodes Redis Cluster with half of nodes in other datacenter (50 msec ping) got near 80000 rps produced from one client. Also, consume/produce rps is not maximally optimal now and can be a little bit better.

Producer example

	pr, err := ami.NewProducer(
		ami.ProducerOptions{
			Name:              "ruthie",
			ShardsCount:       10,
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
		},
		&redis.ClusterOptions{
			Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"},
		},
	)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10000; i++ {
		pr.Send("{}")
	}

	pr.Close()

Consumer example

	cn, err := ami.NewConsumer(
		ami.ConsumerOptions{
			Name:              "ruthie",
			Consumer:          "alice",
			ShardsCount:       10,
			PrefetchCount:     100,
			Block:             time.Second,
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
		},
		&redis.ClusterOptions{
			Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"},
		},
	)
	if err != nil {
		panic(err)
	}

	c := cn.Start()

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		for {
			m, more := <-c
			if !more {
				break
			}
			println("Got", m.Body, "ID", m.ID)
			cn.Ack(m)
		}
		wg.Done()
	}()

	time.Sleep(time.Second)

	cn.Stop()
	wg.Wait()

	cn.Close()

Documentation

Overview

Package ami - Go client to reliable queues based on Redis Cluster Streams https://redis.io/topics/streams-intro.

Producer example

pr, err := ami.NewProducer(
	ami.ProducerOptions{
		Name:              "ruthie",
		ShardsCount:       10,
		PendingBufferSize: 10000000,
		PipeBufferSize:    50000,
		PipePeriod:        time.Microsecond * 1000,
	},
	&redis.ClusterOptions{
		Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"},
	},
)
if err != nil {
	panic(err)
}

for i := 0; i < 10000; i++ {
	pr.Send("{}")
}

pr.Close()

Consumer example

cn, err := ami.NewConsumer(
	ami.ConsumerOptions{
		Name:              "ruthie",
		Consumer:          "alice",
		ShardsCount:       10,
		PrefetchCount:     100,
		Block:             time.Second,
		PendingBufferSize: 10000000,
		PipeBufferSize:    50000,
		PipePeriod:        time.Microsecond * 1000,
	},
	&redis.ClusterOptions{
		Addrs: []string{"172.17.0.1:7001", "172.17.0.1:7002"},
	},
)
if err != nil {
	panic(err)
}

c := cn.Start()

wg := sync.WaitGroup{}
wg.Add(1)

go func() {
	for {
		m, more := <-c
		if !more {
			break
		}
		println("Got", m.Body, "ID", m.ID)
		cn.Ack(m)
	}
	wg.Done()
}()

time.Sleep(time.Second)

cn.Stop()
wg.Wait()

cn.Close()

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer added in v0.1.6

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer client for Ami

func NewConsumer added in v0.1.6

func NewConsumer(opt ConsumerOptions, ropt *redis.ClusterOptions) (*Consumer, error)

NewConsumer creates new consumer client for Ami

func (*Consumer) Ack added in v0.1.6

func (c *Consumer) Ack(m Message)

Ack acknowledges message

func (*Consumer) Close added in v0.1.6

func (c *Consumer) Close()

Close queue client

func (*Consumer) Start added in v0.1.6

func (c *Consumer) Start() chan Message

Start consume from queue

func (*Consumer) Stop added in v0.1.6

func (c *Consumer) Stop()

Stop queue client

type ConsumerOptions added in v0.1.6

type ConsumerOptions struct {
	Name              string
	Consumer          string
	ShardsCount       int8
	PrefetchCount     int64
	Block             time.Duration
	PendingBufferSize int64
	PipeBufferSize    int64
	PipePeriod        time.Duration
}

ConsumerOptions - options for consumer client for Ami

type Message

type Message struct {
	Body   string
	ID     string
	Stream string
	Group  string
}

Message from queue

type Producer added in v0.1.6

type Producer struct {
	// contains filtered or unexported fields
}

Producer client for Ami

func NewProducer added in v0.1.6

func NewProducer(opt ProducerOptions, ropt *redis.ClusterOptions) (*Producer, error)

NewProducer creates new producer client for Ami

func (*Producer) Close added in v0.1.6

func (p *Producer) Close()

Close queue client

func (*Producer) Send added in v0.1.6

func (p *Producer) Send(m string)

Send message

type ProducerOptions added in v0.1.6

type ProducerOptions struct {
	Name              string
	ShardsCount       int8
	PendingBufferSize int64
	PipeBufferSize    int64
	PipePeriod        time.Duration
}

ProducerOptions - options for producer client for Ami

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL