workmq

package module
v0.0.0-...-ba85578 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2017 License: MIT Imports: 10 Imported by: 0

README

WorkMQ

A message queue system written in Go.

This is a message queue implementation written in Go. It allows to declare some queues, number of workers and processors that will process data sent in these queues.

Initially used for a Golang workshop study case, we've decided to put it open-source.

Schema

Installation

First, install it:

$ go get -u github.com/unikorp/workmq

Then, import it in your application code:

import (
    "github.com/unikorp/workmq"
)

Configuration

Queues and workers configuration is managed using a config.json file in the root directory.

Here is an example JSON with 2 queues, listening on UDP port and exposing the given HTTP port:

{
  "ports": {
    "udp": ":10001",
    "http": ":8080"
  },
  "queues": {
    "queue.1s": {
      "processor": "processor.logger.1s",
      "num_workers": 150
    },
    "queue.2s": {
      "processor": "processor.logger.2s",
      "num_workers": 200
    }
  }
}

Here, we have 2 queues:

  • queue.1s that will be processed by registered processor processor.logger.1s and will use 150 workers (goroutines),
  • queue.2s that will be processed by registered processor processor.logger.2s and will use 200 workers (goroutines).

Usage

Here is a code example that initializes WorkMQ, registers a processor and start handling messages:

package main

import (
	"fmt"
	"time"

	"github.com/unikorp/workmq"
)

func main() {
	app := workmq.Init()

	app.AddProcessor("processor.logger.1s", func(worker *workmq.Worker, message workmq.Message) {
		time.Sleep(time.Second * 1)

		fmt.Printf("Worker #%d (queue: \"%s\") manages message %s\n", worker.ID, worker.Queue, message.Body)
	})

	app.Handle()
}

Send data

You can send message data over UDP by sending a JSON string with the following structure:

{ "queue": "queue.1s", "body": "<your data>" }

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Ports  PortsConfig            `json:"ports"`
	Queues map[string]QueueConfig `json:"queues"`
}

Config is the configuration type definition.

func GetConfig

func GetConfig() Config

GetConfig returns the configuration object that can be used anywhere in application.

type Message

type Message struct {
	Queue string `json:"queue"`
	Body  string `json:"body"`
}

Message struct

func TransformStringToMessage

func TransformStringToMessage(value []byte) Message

TransformStringToMessage transforms a string value to a Message struct

type PortsConfig

type PortsConfig struct {
	UDP  string `json:"udp"`
	HTTP string `json:"http"`
}

PortsConfig is the "port" configuration section type definition.

type Processor

type Processor func(worker *Worker, message Message)

Processor type

type QueueConfig

type QueueConfig struct {
	Processor  string `json:"processor"`
	NumWorkers int    `json:"num_workers"`
}

QueueConfig is the "queues" configuration section type definition.

type RateCounters

type RateCounters map[string]*ratecounter.RateCounter

RateCounters containers counters

type Worker

type Worker struct {
	ID        int
	Queue     string
	Message   <-chan Message
	Processor Processor
	Counter   *ratecounter.RateCounter
}

Worker struct

func NewWorker

func NewWorker(id int, queue string, processor Processor, message <-chan Message, counter *ratecounter.RateCounter) Worker

NewWorker creates a new Worker instance

func (*Worker) Process

func (w *Worker) Process()

Process listens for a processor on the worker.

type Workmq

type Workmq struct {
	Config     Config
	Queues     map[string]chan Message
	Processors map[string]Processor
	Counters   RateCounters
	Workers    []Worker
	Wg         sync.WaitGroup
}

Workmq type

func Init

func Init() *Workmq

Init initializes processor part

func (*Workmq) AddProcessor

func (w *Workmq) AddProcessor(name string, processor Processor)

AddProcessor adds a processor into the processors list

func (*Workmq) GetProcessor

func (w *Workmq) GetProcessor(name string) (Processor, error)

GetProcessor retrieves a processor from its name

func (*Workmq) Handle

func (w *Workmq) Handle()

Handle handles the configuration and runs workers

func (*Workmq) ListenHTTP

func (w *Workmq) ListenHTTP()

ListenHTTP creates a HTTP server to expose statistics information

func (*Workmq) ListenUDP

func (w *Workmq) ListenUDP()

ListenUDP creates a UDP server that listens for new messages

func (*Workmq) RemoveProcessor

func (w *Workmq) RemoveProcessor(name string)

RemoveProcessor removes a processor from its name

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL