wsqueue

package module
v0.0.0-...-2b0d443 Latest
Published: Jun 12, 2016 License: Apache-2.0 Imports: 14 Imported by: 0

README

go-wsqueue

Introduction

  • Golang API over gorilla/mux and gorilla/websocket
  • Kind of AMQP but over websockets

Publish & Subscribe Pattern

Publish and subscribe semantics are implemented by Topics.

When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
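The fan-out behaviour described above can be illustrated without any network at all. The following is a minimal in-process sketch using only the standard library; the `topic` type and its `subscribe` and `publish` methods are hypothetical names invented for this illustration and are not how go-wsqueue is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// topic is a hypothetical in-process stand-in for a publish/subscribe topic.
type topic struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribe registers a new subscriber and returns its channel.
func (tp *topic) subscribe() <-chan string {
	tp.mu.Lock()
	defer tp.mu.Unlock()
	ch := make(chan string, 8) // small buffer so publish does not block in this demo
	tp.subs = append(tp.subs, ch)
	return ch
}

// publish copies msg to every subscriber registered at call time and
// returns how many copies were sent.
func (tp *topic) publish(msg string) int {
	tp.mu.Lock()
	defer tp.mu.Unlock()
	for _, ch := range tp.subs {
		ch <- msg
	}
	return len(tp.subs)
}

func main() {
	tp := &topic{}
	fmt.Println(tp.publish("nobody is listening")) // 0: no active subscription yet
	a := tp.subscribe()
	b := tp.subscribe()
	fmt.Println(tp.publish("hello")) // 2: one copy per subscriber
	fmt.Println(<-a, <-b)            // hello hello
}
```

The first message is lost for `a` and `b` because they subscribed after it was published, which mirrors the "active subscription" rule stated above.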

Work queues Pattern

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

Queues implement load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
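As a rough illustration of the "exactly one consumer" rule, here is a standard-library sketch in which several workers read from one buffered channel. The `runWorkers` helper is a hypothetical name for this example only; it does not model acknowledgement or redelivery, and it is not the library's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n workers on a shared channel and returns, per worker,
// the tasks it handled. The channel hands each task to exactly one worker.
func runWorkers(n int, tasks []string) [][]string {
	queue := make(chan string, len(tasks)) // buffered: tasks wait until a worker is free
	for _, task := range tasks {
		queue <- task
	}
	close(queue)

	handled := make([][]string, n)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for task := range queue {
				// Each worker only touches its own slot, so no lock is needed.
				handled[id] = append(handled[id], task)
			}
		}(w)
	}
	wg.Wait()
	return handled
}

func main() {
	for id, tasks := range runWorkers(2, []string{"a", "b", "c", "d"}) {
		fmt.Printf("worker %d handled %v\n", id, tasks)
	}
}
```

How the tasks are split between workers varies from run to run, but every task shows up in exactly one worker's list.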

Getting started

$ go get github.com/fsamin/go-wsqueue

Publish & Subscribe

Run this in three terminals:

go run main.go -server

go run main.go -client1

go run main.go -client2

main.go:

package main

import (
	"flag"
	"log"
	"net/http"
	"time"

	"github.com/fsamin/go-wsqueue"
	"github.com/gorilla/mux"

	"github.com/jmcvetta/randutil"
)

var fServer = flag.Bool("server", false, "Run server")
var fClient1 = flag.Bool("client1", false, "Run client #1")
var fClient2 = flag.Bool("client2", false, "Run client #2")

func main() {
	flag.Parse()
	forever := make(chan bool)

	if *fServer {
		server()
	}
	if *fClient1 {
		client("1")
	}
	if *fClient2 {
		client("2")
	}

	<-forever
}

func server() {
	r := mux.NewRouter()
	s := wsqueue.NewServer(r, "")
	q := s.CreateTopic("topic1")

	q.OpenedConnectionHandler = func(c *wsqueue.Conn) {
		log.Println("Welcome " + c.ID)
		q.Publish("Welcome " + c.ID)
	}

	q.ClosedConnectionHandler = func(c *wsqueue.Conn) {
		log.Println("Bye bye " + c.ID)
	}
	http.Handle("/", r)
	go http.ListenAndServe("0.0.0.0:9000", r)

	// Start publishing messages to the topic
	go func() {
		for {
			time.Sleep(5 * time.Second)
			s, _ := randutil.AlphaString(10)
			q.Publish("message from goroutine 1 : " + s)
		}
	}()

	go func() {
		for {
			time.Sleep(10 * time.Second)
			s, _ := randutil.AlphaString(10)
			q.Publish("message from goroutine 2 : " + s)
		}
	}()
}

func client(ID string) {
	// Connect a client and subscribe to the topic
	go func() {
		c := &wsqueue.Client{
			Protocol: "ws",
			Host:     "localhost:9000",
			Route:    "/",
		}
		cMessage, cError, err := c.Subscribe("topic1")
		if err != nil {
			panic(err)
		}
		for {
			select {
			case m := <-cMessage:
				log.Println("\n\n********* Client " + ID + " *********" + m.String() + "\n******************")
			case e := <-cError:
				log.Println("\n\n********* Client " + ID + "  *********" + e.Error() + "\n******************")
			}
		}
	}()
}

Load balanced Work Queues

Run this in three terminals:

go run main.go -server

go run main.go -client1

go run main.go -client2

main.go:

package main

import (
	"flag"
	"log"
	"net/http"
	"time"

	"github.com/fsamin/go-wsqueue"
	"github.com/gorilla/mux"

	"github.com/jmcvetta/randutil"
)

var fServer = flag.Bool("server", false, "Run server")
var fClient1 = flag.Bool("client1", false, "Run client #1")
var fClient2 = flag.Bool("client2", false, "Run client #2")

func main() {
	flag.Parse()
	forever := make(chan bool)

	if *fServer {
		server()
	}
	if *fClient1 {
		client("1")
	}
	if *fClient2 {
		client("2")
	}

	<-forever
}

func server() {
	r := mux.NewRouter()
	s := wsqueue.NewServer(r, "")
	q := s.CreateQueue("queue1", 10)

	http.Handle("/", r)
	go http.ListenAndServe("0.0.0.0:9000", r)

	// Start sending messages to the queue
	go func() {
		for {
			time.Sleep(5 * time.Second)
			s, _ := randutil.AlphaString(10)
			q.Send("message from goroutine 1 : " + s)
		}
	}()

	go func() {
		for {
			time.Sleep(6 * time.Second)
			s, _ := randutil.AlphaString(10)
			q.Send("message from goroutine 2 : " + s)
		}
	}()
}

func client(ID string) {
	// Connect a client and listen on the queue
	go func() {
		c := &wsqueue.Client{
			Protocol: "ws",
			Host:     "localhost:9000",
			Route:    "/",
		}
		cMessage, cError, err := c.Listen("queue1")
		if err != nil {
			panic(err)
		}
		for {
			select {
			case m := <-cMessage:
				log.Println("\n\n********* Client " + ID + " *********" + m.String() + "\n******************")
			case e := <-cError:
				log.Println("\n\n********* Client " + ID + "  *********" + e.Error() + "\n******************")
			}
		}
	}()
}

Documentation

Overview

Package wsqueue provides a framework over gorilla/mux and gorilla/websocket for AMQP-like messaging over websockets. It offers a Server, a Client and two kinds of messaging protocols: Topics and Queues.

Topics : Publish & Subscribe Pattern

Publish and subscribe semantics are implemented by Topics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

Start a server and handle a topic

    // Server side
    r := mux.NewRouter()
    s := wsqueue.NewServer(r, "")
    q := s.CreateTopic("myTopic")
    http.Handle("/", r)
    go http.ListenAndServe("0.0.0.0:9000", r)

    ...
    // Publish a message
    q.Publish("This is a message")

Start a client and listen on a topic

    // Client side
    go func() {
        c := &wsqueue.Client{
            Protocol: "ws",
            Host:     "localhost:9000",
            Route:    "/",
        }
        cMessage, cError, err := c.Subscribe("myTopic")
        if err != nil {
            panic(err)
        }
        for {
            select {
            case m := <-cMessage:
                fmt.Println(m.String())
            case e := <-cError:
                fmt.Println(e.Error())
            }
        }
    }()

Queues : Work queues Pattern

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

Queues implement load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.

Examples

See samples/queue/main.go and samples/topic/main.go.

Index

Constants

const (
	//ACLSSchemeWorld scheme is a fully open scheme
	ACLSSchemeWorld ACLScheme = "WORLD"
	//ACLSSchemeDigest scheme represents a "manually" set group of authenticated users.
	ACLSSchemeDigest = "DIGEST"
	//ACLSSchemeIP scheme represents a "manually" set group of  user authenticated by their IP address
	ACLSSchemeIP = "IP"
)

Variables

var Logfunc = log.Printf

Logfunc is a function that logs the provided message with optional fmt.Sprintf-style arguments. By default, it logs to the default log.Logger. Setting it to nil disables logging for this package. This avoids coupling with any specific external package, and the signature is already widely supported by existing loggers.

var Warnfunc = log.Printf

Warnfunc is a function that logs the provided message with optional fmt.Sprintf-style arguments. By default, it logs to the default log.Logger. Setting it to nil disables logging for this package. This avoids coupling with any specific external package, and the signature is already widely supported by existing loggers.
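The hook pattern described for Logfunc and Warnfunc can be sketched as follows. To stay self-contained, the example uses a local `logfunc` variable and a hypothetical `emit` helper as stand-ins; with the real package one would presumably assign to `wsqueue.Logfunc` or `wsqueue.Warnfunc` in the same way, but that wiring is an assumption here.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// logfunc is a local stand-in for a package-level hook such as wsqueue.Logfunc.
var logfunc = log.Printf

// emit logs through the hook unless it has been set to nil.
func emit(format string, args ...interface{}) {
	if logfunc != nil {
		logfunc(format, args...)
	}
}

func main() {
	var buf bytes.Buffer
	custom := log.New(&buf, "[wsqueue] ", 0)

	logfunc = custom.Printf // route messages to a custom logger
	emit("client %s connected", "42")

	logfunc = nil // disable logging entirely
	emit("this line is dropped")

	fmt.Print(buf.String()) // [wsqueue] client 42 connected
}
```

Any function with a Printf-style signature fits the hook, which is why no specific logging package has to be imported.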

Functions

This section is empty.

Types

type ACE

type ACE interface {
	Scheme() ACLScheme
}

ACE stands for Access Control Entity

type ACEDigest

type ACEDigest struct {
	Username string `json:"username,omitempty"`
	Password string `json:"password,omitempty"`
}

ACEDigest aims to authenticate a user with a username and a password

func (*ACEDigest) Scheme

func (a *ACEDigest) Scheme() ACLScheme

Scheme returns WORLD, DIGEST or IP

type ACEIP

type ACEIP struct {
	IP string `json:"ip,omitempty"`
}

ACEIP aims to authenticate a user with an IP address

func (*ACEIP) Scheme

func (a *ACEIP) Scheme() ACLScheme

Scheme is IP

type ACEWorld

type ACEWorld struct{}

ACEWorld -> everyone

func (*ACEWorld) Scheme

func (a *ACEWorld) Scheme() ACLScheme

Scheme is World

type ACL

type ACL []ACE

ACL stands for Access Control List. It's a slice of permissions for a queue or a topic
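To show how the ACE interface and the ACL slice fit together, here is a sketch built on local lower-case copies of the documented types. The `schemes` helper is hypothetical, and the assumption that the digest entry reports DIGEST is inferred from the constant names rather than confirmed by the documentation.

```go
package main

import "fmt"

type aclScheme string

const (
	schemeWorld  aclScheme = "WORLD"
	schemeDigest aclScheme = "DIGEST"
	schemeIP     aclScheme = "IP"
)

// ace mirrors the documented ACE interface.
type ace interface{ Scheme() aclScheme }

type aceWorld struct{}
type aceDigest struct{ Username, Password string }
type aceIP struct{ IP string }

func (*aceWorld) Scheme() aclScheme  { return schemeWorld }
func (*aceDigest) Scheme() aclScheme { return schemeDigest } // assumption: digest entries report DIGEST
func (*aceIP) Scheme() aclScheme     { return schemeIP }

// acl mirrors the documented ACL type: a slice of entries.
type acl []ace

// schemes returns the scheme of each entry, in order.
func schemes(l acl) []aclScheme {
	out := make([]aclScheme, 0, len(l))
	for _, e := range l {
		out = append(out, e.Scheme())
	}
	return out
}

func main() {
	l := acl{
		&aceDigest{Username: "alice", Password: "secret"},
		&aceIP{IP: "127.0.0.1"},
	}
	fmt.Println(schemes(l)) // [DIGEST IP]
}
```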

type ACLScheme

type ACLScheme string

ACLScheme: there are three different schemes

type Client

type Client struct {
	Protocol string
	Host     string
	Route    string
	// contains filtered or unexported fields
}

Client is the wsqueue entry point

func (*Client) Ack

func (c *Client) Ack(msg *Message) error

func (*Client) Listen

func (c *Client) Listen(q string) (chan Message, chan error, error)

Listen aims to connect to a Queue

func (*Client) Reply

func (c *Client) Reply(msg *Message, response *Message) error

func (*Client) Subscribe

func (c *Client) Subscribe(q string) (chan Message, chan error, error)

Subscribe aims to connect to a Topic

type Conn

type Conn struct {
	ID     ConnID
	WSConn *websocket.Conn
}

Conn is a connection: a websocket connection together with its ID

type ConnID

type ConnID string

ConnID is a connection ID

type Fibonacci

type Fibonacci struct {
	// contains filtered or unexported fields
}

Fibonacci generates Fibonacci numbers; see https://en.wikipedia.org/wiki/Fibonacci_number

func NewFibonacci

func NewFibonacci() Fibonacci

NewFibonacci returns a new Fibonacci value

func (*Fibonacci) Next

func (f *Fibonacci) Next() int

Next returns the next value

func (*Fibonacci) NextDuration

func (f *Fibonacci) NextDuration(timeUnit time.Duration) time.Duration

NextDuration returns the next Fibonacci number expressed in the given time unit
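A Fibonacci generator of this shape is commonly used for growing retry delays. The sketch below is a guess at the idea, not the package's code: the `fib` type, its starting values and the multiplication by the time unit are all assumptions, since the real type's fields are unexported.

```go
package main

import (
	"fmt"
	"time"
)

// fib is a hypothetical generator holding two consecutive Fibonacci numbers.
type fib struct{ a, b int }

func newFib() fib { return fib{a: 0, b: 1} }

// next advances the sequence and returns 1, 1, 2, 3, 5, 8, ...
func (f *fib) next() int {
	f.a, f.b = f.b, f.a+f.b
	return f.a
}

// nextDuration scales the next Fibonacci number by the given unit.
func (f *fib) nextDuration(unit time.Duration) time.Duration {
	return time.Duration(f.next()) * unit
}

func main() {
	f := newFib()
	for i := 0; i < 6; i++ {
		fmt.Println(f.nextDuration(time.Second)) // 1s 1s 2s 3s 5s 8s, one per line
	}
}
```

Compared with exponential backoff, Fibonacci delays grow more slowly, which can suit reconnection loops that should retry fairly often at first.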

func (*Fibonacci) WaitForIt

func (f *Fibonacci) WaitForIt(timeUnit time.Duration)

WaitForIt ... HIMYM

type Header

type Header map[string]string

type Message

type Message struct {
	Header Header `json:"metadata,omitempty"`
	Body   string `json:"data"`
}

Message is a message made of a header and a body
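The struct tags above determine the JSON shape of a message. The following sketch marshals a local copy of the struct to show that shape; the `content-type` header key and the `encode` helper are illustrative assumptions, and the actual wire format used by the library may include more than this.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// header and message are local copies of the documented Header and Message types.
type header map[string]string

type message struct {
	Header header `json:"metadata,omitempty"`
	Body   string `json:"data"`
}

// encode returns the JSON form of m, or an empty string on error.
func encode(m message) string {
	b, err := json.Marshal(m)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	fmt.Println(encode(message{
		Header: header{"content-type": "text/plain"},
		Body:   "hello",
	}))
	// {"metadata":{"content-type":"text/plain"},"data":"hello"}

	fmt.Println(encode(message{Body: "no header"}))
	// {"data":"no header"}
}
```

Note that the header travels under the key `metadata` and the body under `data`, and that an empty header is omitted altogether.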

func (*Message) ApplicationType

func (m *Message) ApplicationType() string

ApplicationType returns the application-type. It is empty if the content-type is not application/json

func (*Message) ContentType

func (m *Message) ContentType() string

ContentType returns content-type

func (*Message) ID

func (m *Message) ID() string

ID returns the message ID

func (*Message) String

func (m *Message) String() string

type Options

type Options struct {
	ACL     ACL            `json:"acl,omitempty"`
	Storage StorageOptions `json:"storage,omitempty"`
}

Options holds the options of a topic or a queue

type Queue

type Queue struct {
	Options *Options `json:"options,omitempty"`
	Queue   string   `json:"topic,omitempty"`
	// contains filtered or unexported fields
}

Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.

func (*Queue) Send

func (q *Queue) Send(data interface{}) error

Send sends a message to the queue

type Server

type Server struct {
	Router          *mux.Router
	RoutePrefix     string
	QueuesCounter   *expvar.Int
	TopicsCounter   *expvar.Int
	ClientsCounter  *expvar.Int
	MessagesCounter *expvar.Int
}

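Server is a wsqueue server. It exposes its router, its route prefix and expvar counters for queues, topics, clients and messages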

func NewServer

func NewServer(router *mux.Router, routePrefix string) *Server

NewServer initializes a new WSQueue server

func (*Server) CreateQueue

func (s *Server) CreateQueue(name string, bufferSize int) *Queue

CreateQueue creates a queue

func (*Server) CreateTopic

func (s *Server) CreateTopic(topic string) *Topic

CreateTopic creates a topic

func (*Server) RegisterQueue

func (s *Server) RegisterQueue(q *Queue)

RegisterQueue registers a queue on the server

func (*Server) RegisterTopic

func (s *Server) RegisterTopic(t *Topic)

RegisterTopic registers a topic on the server

type Stack

type Stack struct {
	// contains filtered or unexported fields
}

Stack is a thread-safe "First In First Out" stack

func NewStack

func NewStack() *Stack

NewStack initializes a brand new Stack

func (*Stack) Dump

func (s *Stack) Dump()

Dump prints the content of the stack.

func (*Stack) Get

func (s *Stack) Get(index int) (interface{}, error)

Get peeks at the n-th item in the stack. Unlike other operations, this one costs O(n).

func (*Stack) Len

func (s *Stack) Len() int

Len returns current length of the stack

func (*Stack) Open

func (s *Stack) Open(o *Options)

Open the connection to the storage driver

func (*Stack) Peek

func (s *Stack) Peek() interface{}

Peek returns but doesn't remove the top of the stack

func (*Stack) Pop

func (s *Stack) Pop() interface{}

Pop returns and removes the bottom of the stack

func (*Stack) Push

func (s *Stack) Push(item interface{})

Push adds an item at the top of the stack

type StorageDriver

type StorageDriver interface {
	Open(options *Options)
	Push(data interface{})
	Pop() interface{}
}

StorageDriver is either an in-memory Stack or a Redis server
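For illustration, a type satisfying an interface of this shape could look like the sketch below. It uses local lower-case copies of the interface and options type, and the `fifo` driver is a hypothetical first-in-first-out store, chosen to match the "push at the top, pop from the bottom" behaviour documented for Stack. Returning nil from an empty store is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// options and storageDriver are local copies of the documented shapes.
type options struct{}

type storageDriver interface {
	Open(o *options)
	Push(data interface{})
	Pop() interface{}
}

// fifo is a hypothetical thread-safe in-memory driver.
type fifo struct {
	mu    sync.Mutex
	items []interface{}
}

// Open has nothing to connect to for in-memory storage.
func (f *fifo) Open(o *options) {}

// Push appends an item at the top.
func (f *fifo) Push(data interface{}) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.items = append(f.items, data)
}

// Pop removes and returns the oldest item, or nil when the store is empty.
func (f *fifo) Pop() interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	if len(f.items) == 0 {
		return nil
	}
	item := f.items[0]
	f.items = f.items[1:]
	return item
}

func main() {
	var d storageDriver = &fifo{}
	d.Open(nil)
	d.Push("first")
	d.Push("second")
	fmt.Println(d.Pop(), d.Pop(), d.Pop()) // first second <nil>
}
```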

type StorageOptions

type StorageOptions map[string]interface{}

StorageOptions is a collection of options, see storage documentation

type Topic

type Topic struct {
	Options                 *Options                    `json:"options,omitempty"`
	Topic                   string                      `json:"topic,omitempty"`
	OpenedConnectionHandler func(*Conn)                 `json:"-"`
	ClosedConnectionHandler func(*Conn)                 `json:"-"`
	OnMessageHandler        func(*Conn, *Message) error `json:"-"`
	// contains filtered or unexported fields
}

Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

func (*Topic) Publish

func (t *Topic) Publish(data interface{}) error

Publish sends a message to every subscriber

Directories

Path Synopsis
samples
