amqprpc

package module
v0.0.0-...-61310a7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2018 License: MIT Imports: 8 Imported by: 0

README

go-amqprpc

Go net/rpc codec implementation for AMQP. Small rework of amqprpc.

Updates:

  • Fixed a memory leak (server requests were not removed from the map after processing)
  • Fixed a concurrency issue: the serverCodec.requests map cannot be keyed by Delivery.Seq, because the same sequence number can arrive from different clients and the colliding entries would overwrite each other, causing data loss. Instead, a single per-server atomic counter is used.

Installation

$ go get github.com/vbogretsov/go-amqprpc

Usage

Server
package main

import (
    "log"
    "net/rpc"

    "github.com/streadway/amqp"
    "github.com/vbogretsov/go-amqprpc"
)

// Args carries the two operands of a multiplication request.
type Args struct {
    A, B int
}

// Test is the receiver type whose exported methods are served over RPC.
type Test struct{}

// Mul stores the product of args.A and args.B in *reply and always returns a
// nil error.
func (t *Test) Mul(args *Args, reply *int) error {
    product := args.A * args.B
    *reply = product
    return nil
}

// main dials the local AMQP broker, builds a MsgPack server codec named
// "testrpc" on that connection, registers the Test service with net/rpc and
// serves requests until rpc.ServeCodec returns.
func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    // Release the broker connection once serving stops.
    defer conn.Close()

    serverCodec, err := amqprpc.NewServerCodec(conn, "testrpc", amqprpc.MsgPack)
    if err != nil {
        log.Fatal(err)
    }

    // Register returns an error when the receiver cannot be served; surface
    // it instead of starting a server that would reject every call.
    if err := rpc.Register(&Test{}); err != nil {
        log.Fatal(err)
    }
    rpc.ServeCodec(serverCodec)
}
Client
package main

import (
    "log"
    "math/rand"
    "net/rpc"
    "sync"
    "time"

    "github.com/streadway/amqp"
    "github.com/vbogretsov/go-amqprpc"
)

// Args carries the two operands sent with a Test.Mul call.
type Args struct {
    A, B int
}

// main dials the local AMQP broker, wraps the connection in a MsgPack client
// codec named "testrpc", and issues numCalls "Test.Mul" calls with at most 100
// in flight at a time. Every reply is checked against a locally computed
// product; on success the achieved requests-per-second rate is logged.
func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatal(err)
    }
    // Deferred first so it runs last, after the codec has been closed.
    defer conn.Close()

    clientCodec, err := amqprpc.NewClientCodec(conn, "testrpc", amqprpc.MsgPack)
    if err != nil {
        log.Fatal(err)
    }
    defer clientCodec.Close()

    client := rpc.NewClientWithCodec(clientCodec)

    numCalls := 10000
    var wg sync.WaitGroup
    wg.Add(numCalls)
    // Buffered channel used as a counting semaphore: a send blocks once 100
    // calls are in flight.
    sem := make(chan int, 100)

    t0 := time.Now()
    for i := 0; i < numCalls; i++ {
        sem <- 1
        go func() {
            // Deferred calls run in reverse order: mark the call done first,
            // then free the semaphore slot, on every return path.
            defer func() { <-sem }()
            defer wg.Done()

            args := Args{rand.Int() % 100, rand.Int() % 100}
            var result int

            if err := client.Call("Test.Mul", args, &result); err != nil {
                log.Fatal(err)
            }

            if result != args.A*args.B {
                log.Printf("%v * %v != %v", args.A, args.B, result)
                log.Fatal("FAIL")
            }
        }()
    }
    wg.Wait()
    log.Printf("SUCCESS, rps: %v", float64(numCalls)/time.Since(t0).Seconds())
}

License

See the LICENSE file.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewClientCodec

func NewClientCodec(conn *amqp.Connection, name string, formatter Formatter) (rpc.ClientCodec, error)

func NewServerCodec

func NewServerCodec(conn *amqp.Connection, name string, formatter Formatter) (rpc.ServerCodec, error)

Types

type Formatter

// Formatter converts message bodies to and from their byte representation.
type Formatter interface {
	// Marshal encodes v into a byte slice.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal decodes data into v.
	Unmarshal(data []byte, v interface{}) error
}

Formatter marshals and unmarshals the message body.

// MsgPack is the Formatter value backed by msgPackFormatter.
var MsgPack Formatter = msgPackFormatter{}

// Json is the Formatter value backed by jsonFormatter.
var Json Formatter = jsonFormatter{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL