vega

Version: v0.0.0-...-8b480f7
Published: Feb 26, 2015 License: BSD-3-Clause Imports: 24 Imported by: 1

README

Vega

A distributed mailbox system for building distributed systems.

Features

  • Operations are distributed by default
  • No centralized point of failure
  • All messages are durable, stored to disk via LevelDB
  • Delivery to components is at-least-once
    • Components are required to ack or nack each message
  • Agents on each machine use routing to discover how to deliver to mailboxes
    • Consul provides a consistent, distributed routing table
  • Native Go API
  • Simple HTTP interface
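
The at-least-once guarantee with ack/nack can be illustrated with a minimal in-memory sketch. The `message` and `mailbox` types below are hypothetical stand-ins written for this example, not Vega's own types, and the redelivery order (a nacked message goes back to the front) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a stand-in for a mailbox message; it is not the vega Message type.
type message struct {
	ID   string
	Body string
}

// mailbox is a hypothetical in-memory mailbox that tracks in-flight messages.
type mailbox struct {
	queue    []message          // waiting to be delivered
	inFlight map[string]message // delivered but not yet acked or nacked
}

func newMailbox() *mailbox {
	return &mailbox{inFlight: make(map[string]message)}
}

// push appends a message to the end of the queue.
func (m *mailbox) push(msg message) { m.queue = append(m.queue, msg) }

// poll hands out the next message and remembers it until it is acked or nacked.
func (m *mailbox) poll() (message, bool) {
	if len(m.queue) == 0 {
		return message{}, false
	}
	msg := m.queue[0]
	m.queue = m.queue[1:]
	m.inFlight[msg.ID] = msg
	return msg, true
}

// ack marks a message as handled so it is not delivered again.
func (m *mailbox) ack(id string) error {
	if _, ok := m.inFlight[id]; !ok {
		return errors.New("unknown message id")
	}
	delete(m.inFlight, id)
	return nil
}

// nack returns a message to the front of the queue so it is delivered again.
func (m *mailbox) nack(id string) error {
	msg, ok := m.inFlight[id]
	if !ok {
		return errors.New("unknown message id")
	}
	delete(m.inFlight, id)
	m.queue = append([]message{msg}, m.queue...)
	return nil
}

func main() {
	mb := newMailbox()
	mb.push(message{ID: "1", Body: "hello"})

	first, _ := mb.poll()
	_ = mb.nack(first.ID) // processing failed, ask for redelivery

	second, _ := mb.poll()
	_ = mb.ack(second.ID) // processing succeeded

	// prints "true 0 0": the same body was delivered twice, nothing is left over
	fmt.Println(first.Body == second.Body, len(mb.queue), len(mb.inFlight))
}
```

The point of the sketch is that a consumer may see the same message more than once, so handlers should be written to tolerate duplicates.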

Eventual Features

  • AMQP API support
  • STOMP API support

Requirements

  • Consul 0.5.0 or greater

Installation

Linux

Darwin

Via Go

  • go get github.com/vektra/vega/cmd/vegad

Focus

The focus of Vega is to provide a resilient, trustworthy message delivery system.

Within an organization, it would form the backbone for how components communicate with each other.

At present, the focus is not on providing an ultra-fast message passing system. Many projects aim to provide that, and when an ultra-fast message system is needed, you generally want different guarantees on messages. For instance, you're OK with losing messages in exchange for continuing to run quickly. Vega focuses on reliability and resilience because those are the most important aspects of a backbone communication system.

What is a distributed mailbox system?

It's common when building a distributed system (any distributed system) that components running on different machines need to talk to each other. A common way to model that communication is via passing messages between the components. Perhaps you've heard of this idea; it's pretty cool. Once a team decides they want to exchange messages between components, they have to figure out how to do that.

Also check out our Mailbox vs Queue thoughts

Queue Brokers

A common approach is to use a central service that all components talk to. This service typically provides named queues which components put messages into and pull them out from. This central service acts as an arbitrator for all the queues.

A centralized service that all components talk to means that the reliability of your whole system is now the reliability of this single, central service. It's typical for these queue brokers to be difficult to configure in a way that provides high availability as well, only making it more difficult for users to have confidence in this very important service.

Brokerless Queues

Another approach is to use software within each component that allows the components to talk directly to each other. This means there is no central broker that the reliability is pinned on, which is great.

A common issue with this approach is that each component is now significantly more complicated. Each component has to keep track of all other components that are interested in its messages. A bigger issue is how the availability of a component affects the availability of its messages. If a component crashes or needs to restart, it could lose any messages it hadn't fully sent out. In addition, any component that wants to talk to it might time out sending while the component is unavailable. All of those problems can be solved, but solving them adds complexity to a component. For some applications, that's a problem worth solving, but for many it's not.

Distributed Mailbox

Vega attempts to reconcile these various needs by providing a system with the following characteristics:

  • Named mailboxes that messages are pushed and pulled from
  • Each machine runs its own broker
  • Components talk to the broker on their machine only
  • Brokers use a distributed routing table (provided by Consul only right now) to pass messages between machines

This means that Vega mailboxes differ heavily from a centralized queue system: two different components cannot share a mailbox. Thus a Vega mailbox cannot be used as a work queue directly (though it could be used to build one).
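
The characteristics listed above can be sketched as follows. This is a rough single-process illustration, not Vega's implementation: the `broker` type and its methods are hypothetical, and a plain Go map stands in for the distributed routing table that Consul provides.

```go
package main

import (
	"errors"
	"fmt"
)

// broker is a hypothetical per-machine broker holding that machine's mailboxes.
type broker struct {
	machine   string
	mailboxes map[string][]string // mailbox name -> queued message bodies
	routes    map[string]*broker  // shared routing table: mailbox name -> owning broker
}

func newBroker(machine string, routes map[string]*broker) *broker {
	return &broker{machine: machine, mailboxes: make(map[string][]string), routes: routes}
}

// declare creates a local mailbox and advertises it in the shared routing table.
func (b *broker) declare(name string) {
	b.mailboxes[name] = nil
	b.routes[name] = b
}

// push looks up the owning broker in the routing table and hands the message to it.
// In a real deployment the hand-off would cross the network; here it is a direct append.
func (b *broker) push(name, body string) error {
	owner, ok := b.routes[name]
	if !ok {
		return errors.New("no such mailbox available")
	}
	owner.mailboxes[name] = append(owner.mailboxes[name], body)
	return nil
}

func main() {
	routes := make(map[string]*broker) // stands in for the distributed routing table
	a := newBroker("machine-a", routes)
	b := newBroker("machine-b", routes)

	// A component on machine-b owns the "orders" mailbox.
	b.declare("orders")

	// A component on machine-a talks only to its local broker.
	if err := a.push("orders", "order-1"); err != nil {
		fmt.Println("push failed:", err)
		return
	}
	fmt.Println(b.machine, b.mailboxes["orders"]) // prints "machine-b [order-1]"
}
```

Each mailbox has exactly one owning broker in this sketch, which mirrors the point that two components cannot share a mailbox.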

Documentation

Index

Constants

const DefaultPort = 8475

Variables

var DefaultHTTPPort = 8477
var ENoMailbox = errors.New("No such mailbox available")
var EProtocolError = errors.New("protocol error")
var ETimeout = errors.New("operation timeout")
var EUnknownMessage = errors.New("Unknown message id")
var ErrUknownSystemMailbox = errors.New("unknown system mailbox")
var NullStorage = &nullStorage{}

Functions

func GetPrivateIP

func GetPrivateIP() (net.IP, error)

GetPrivateIP returns the first private IP address associated with an interface on the machine.

func NewReliableStorage

func NewReliableStorage(s Storage) *reliableStorage

func RandomID

func RandomID() string

func RandomIV

func RandomIV(size int) []byte

func RandomKey

func RandomKey(size int) []byte

func RandomMailbox

func RandomMailbox() string

func XORBytes

func XORBytes(dst, a, b []byte) int

Types

type Abandon

type Abandon struct {
	Name string
}

type AckMessage

type AckMessage struct {
	MessageId MessageId
}

type Acker

type Acker func() error

type Byter

type Byter interface {
	Bytes() []byte
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(addr string) (*Client, error)

func NewInsecureClient

func NewInsecureClient(addr string) (*Client, error)

func (*Client) Abandon

func (c *Client) Abandon(name string) error

func (*Client) Close

func (c *Client) Close() (err error)

func (*Client) Declare

func (c *Client) Declare(name string) error

func (*Client) EphemeralDeclare

func (c *Client) EphemeralDeclare(name string) error

func (*Client) LongPoll

func (c *Client) LongPoll(name string, til time.Duration) (*Delivery, error)

func (*Client) LongPollCancelable

func (c *Client) LongPollCancelable(name string, til time.Duration, done chan struct{}) (*Delivery, error)
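
The signature suggests a wait that ends when a message arrives, when the duration elapses, or when the done channel is closed. The page does not document the exact behavior, so the following is only a sketch of that general pattern using a plain channel. The `longPoll` function, `shortWait`, and both error values are hypothetical names for this example, and returning an error on timeout is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Hypothetical errors for this sketch; they are not the package's exported values.
var (
	errTimeout  = errors.New("operation timeout")
	errCanceled = errors.New("canceled")
)

// shortWait keeps the demo fast.
const shortWait = 20 * time.Millisecond

// longPoll waits for the next value on msgs, giving up after til or when done is closed.
func longPoll(msgs <-chan string, til time.Duration, done <-chan struct{}) (string, error) {
	timer := time.NewTimer(til)
	defer timer.Stop()

	select {
	case m := <-msgs:
		return m, nil
	case <-timer.C:
		return "", errTimeout
	case <-done:
		return "", errCanceled
	}
}

func main() {
	msgs := make(chan string, 1)
	done := make(chan struct{})

	// A queued message is returned without waiting for the timer.
	msgs <- "hello"
	got, err := longPoll(msgs, shortWait, done)
	fmt.Println(got, err)

	// Nothing is queued, so the call gives up after shortWait.
	_, err = longPoll(msgs, shortWait, done)
	fmt.Println(err)

	// Closing done lets another goroutine abort a pending wait.
	close(done)
	_, err = longPoll(msgs, time.Hour, done)
	fmt.Println(err)
}
```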

func (*Client) Poll

func (c *Client) Poll(name string) (*Delivery, error)

func (*Client) Push

func (c *Client) Push(name string, body *Message) error

func (*Client) Session

func (c *Client) Session() (*yamux.Session, error)

func (*Client) Stats

func (c *Client) Stats() (*ClientStats, error)

type ClientStats

type ClientStats struct {
	InFlight int
}

type Declare

type Declare struct {
	Name string
}

type Delivery

type Delivery struct {
	Message *Message
	Ack     Acker
	Nack    Nacker
}

func NewDelivery

func NewDelivery(m Mailbox, msg *Message) *Delivery

type Error

type Error struct {
	Error string
}

type FeatureClient

type FeatureClient struct {
	*Client
	// contains filtered or unexported fields
}

Wraps Client to provide high-level behaviors that build on the basics of the distributed mailboxes. Should only be used by one goroutine at a time.

func Dial

func Dial(addr string) (*FeatureClient, error)

func Local

func Local() (*FeatureClient, error)

func NewFeatureClient

func NewFeatureClient(c *Client) *FeatureClient

Create a new FeatureClient wrapping an explicit Client.

func (*FeatureClient) Clone

func (fc *FeatureClient) Clone() *FeatureClient

Create a new FeatureClient that wraps the same Client as this one. Useful for creating a new instance to use in a new goroutine.

func (*FeatureClient) ConnectPipe

func (fc *FeatureClient) ConnectPipe(name string) (*PipeConn, error)

func (*FeatureClient) Declare

func (fc *FeatureClient) Declare(name string) error

func (*FeatureClient) HandleRequests

func (fc *FeatureClient) HandleRequests(name string, h Handler) error

func (*FeatureClient) ListenPipe

func (fc *FeatureClient) ListenPipe(name string) (*PipeConn, error)

func (*FeatureClient) LocalMailbox

func (fc *FeatureClient) LocalMailbox() string

Return the name of an ephemeral mailbox used only by this instance.

func (*FeatureClient) Receive

func (fc *FeatureClient) Receive(name string) *Receiver

func (*FeatureClient) Request

func (fc *FeatureClient) Request(name string, msg *Message) (*Delivery, error)
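
Request and HandleRequests suggest a request/reply pattern built on mailboxes, and the Message type carries ReplyTo and CorrelationId fields that fit such a pattern. How the package actually wires these together is not documented on this page, so the sketch below only shows the general idea with hypothetical in-memory types (`message`, `mailboxes`, `serveOne`).

```go
package main

import (
	"fmt"
	"strings"
)

// message is a stand-in carrying only the fields this sketch needs.
type message struct {
	ReplyTo       string
	CorrelationId string
	Body          string
}

// mailboxes is a hypothetical in-memory set of named mailboxes.
type mailboxes map[string][]message

// push appends a message to the named mailbox.
func (mb mailboxes) push(name string, m message) { mb[name] = append(mb[name], m) }

// poll removes and returns the oldest message in the named mailbox, if any.
func (mb mailboxes) poll(name string) (message, bool) {
	q := mb[name]
	if len(q) == 0 {
		return message{}, false
	}
	mb[name] = q[1:]
	return q[0], true
}

// serveOne handles one pending request on name and pushes the reply to the
// mailbox the requester named in ReplyTo, echoing the correlation id.
func serveOne(mb mailboxes, name string, handle func(string) string) bool {
	req, ok := mb.poll(name)
	if !ok {
		return false
	}
	mb.push(req.ReplyTo, message{CorrelationId: req.CorrelationId, Body: handle(req.Body)})
	return true
}

func main() {
	mb := mailboxes{}

	// The requester names its own mailbox in ReplyTo and tags the request.
	mb.push("upcase", message{ReplyTo: "client-1", CorrelationId: "req-7", Body: "hello"})

	// The service answers whatever is waiting in its mailbox.
	serveOne(mb, "upcase", strings.ToUpper)

	// The requester matches the reply to its request by correlation id.
	reply, _ := mb.poll("client-1")
	fmt.Println(reply.CorrelationId, reply.Body) // prints "req-7 HELLO"
}
```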

type HTTPService

type HTTPService struct {
	Address  string
	Registry Storage
	// contains filtered or unexported fields
}

func NewHTTPService

func NewHTTPService(port string, reg Storage) *HTTPService

func (*HTTPService) Accept

func (h *HTTPService) Accept() error

func (*HTTPService) BackgroundTimeouts

func (h *HTTPService) BackgroundTimeouts()

func (*HTTPService) CheckTimeouts

func (h *HTTPService) CheckTimeouts()

func (*HTTPService) Close

func (h *HTTPService) Close()

func (*HTTPService) Listen

func (h *HTTPService) Listen() error

type Handler

type Handler interface {
	HandleMessage(*Message) *Message
}

func HandlerFunc

func HandlerFunc(h func(*Message) *Message) Handler
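
HandlerFunc has the shape of an adapter that turns a plain function into a Handler, much like the adapter idiom in net/http. A minimal sketch of that idiom follows, assuming this is how it behaves. The lowercase `message`, `handler`, `funcHandler`, and `handlerFunc` names are local stand-ins, not the package's definitions.

```go
package main

import "fmt"

// message is a stand-in for the package's Message type.
type message struct{ Body []byte }

// handler mirrors the shape of the Handler interface listed above.
type handler interface {
	HandleMessage(*message) *message
}

// funcHandler adapts a plain function to the handler interface.
type funcHandler func(*message) *message

// HandleMessage calls the wrapped function.
func (f funcHandler) HandleMessage(m *message) *message { return f(m) }

// handlerFunc wraps a function so it can be used wherever a handler is expected.
func handlerFunc(f func(*message) *message) handler { return funcHandler(f) }

func main() {
	echo := handlerFunc(func(m *message) *message {
		return &message{Body: append([]byte("echo: "), m.Body...)}
	})

	reply := echo.HandleMessage(&message{Body: []byte("ping")})
	fmt.Println(string(reply.Body)) // prints "echo: ping"
}
```

The adapter saves callers from declaring a named type just to satisfy a one-method interface.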

type LongPoll

type LongPoll struct {
	Name     string
	Duration string
}

type Mailbox

type Mailbox interface {
	Abandon() error
	Push(*Message) error
	Poll() (*Message, error)
	Ack(MessageId) error
	Nack(MessageId) error
	AddWatcher() <-chan *Message
	AddWatcherCancelable(chan struct{}) <-chan *Message
	Stats() *MailboxStats
}

func NewMemMailbox

func NewMemMailbox(name string) Mailbox

type MailboxStats

type MailboxStats struct {
	Size     int
	InFlight int
}

type MemMailbox

type MemMailbox struct {
	// contains filtered or unexported fields
}

func (*MemMailbox) Abandon

func (mm *MemMailbox) Abandon() error

func (*MemMailbox) Ack

func (mm *MemMailbox) Ack(id MessageId) error

func (*MemMailbox) AddWatcher

func (mm *MemMailbox) AddWatcher() <-chan *Message

func (*MemMailbox) AddWatcherCancelable

func (mm *MemMailbox) AddWatcherCancelable(done chan struct{}) <-chan *Message

func (*MemMailbox) Nack

func (mm *MemMailbox) Nack(id MessageId) error

func (*MemMailbox) Poll

func (mm *MemMailbox) Poll() (*Message, error)

func (*MemMailbox) Push

func (mm *MemMailbox) Push(value *Message) error

func (*MemMailbox) Stats

func (mm *MemMailbox) Stats() *MailboxStats

type MemRouteTable

type MemRouteTable map[string]Pusher

func (MemRouteTable) Get

func (ht MemRouteTable) Get(name string) (Pusher, bool)

func (MemRouteTable) Remove

func (ht MemRouteTable) Remove(name string) error

func (MemRouteTable) Set

func (ht MemRouteTable) Set(name string, st Pusher) error

type Message

type Message struct {
	// Simple generic headers available to be used by the application
	Headers map[string]interface{} `codec:"headers,omitempty" json:"headers,omitempty"`

	// Properties
	ContentType     string     `codec:"content_type,omitempty" json:"content_type,omitempty"`         // MIME content type
	ContentEncoding string     `codec:"content_encoding,omitempty" json:"content_encoding,omitempty"` // MIME content encoding
	Priority        uint8      `codec:"priority,omitempty" json:"priority,omitempty"`                 // 0 to 9
	CorrelationId   string     `codec:"correlation_id,omitempty" json:"correlation_id,omitempty"`     // correlation identifier
	ReplyTo         string     `codec:"reply_to,omitempty" json:"reply_to,omitempty"`                 // address to reply to
	MessageId       MessageId  `codec:"message_id,omitempty" json:"message_id,omitempty"`             // message identifier
	Timestamp       *time.Time `codec:"timestamp,omitempty" json:"timestamp,omitempty"`               // message timestamp
	Type            string     `codec:"type,omitempty" json:"type,omitempty"`                         // message type name
	UserId          string     `codec:"user_id,omitempty" json:"user_id,omitempty"`                   // creating user id
	AppId           string     `codec:"app_id,omitempty" json:"app_id,omitempty"`                     // creating application id

	Body []byte `codec:"body,omitempty" json:"body,omitempty"`
}
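
The json struct tags above determine how a Message looks when passed through Go's encoding/json. The sketch below uses a trimmed local copy of the struct (a subset of the fields, with the same json tags) and a hypothetical `encode` helper. Whether the package's HTTP interface emits exactly this form is not stated on this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// message copies a subset of the Message fields and their json tags from the listing above.
type message struct {
	Headers       map[string]interface{} `json:"headers,omitempty"`
	ContentType   string                 `json:"content_type,omitempty"`
	Priority      uint8                  `json:"priority,omitempty"`
	CorrelationId string                 `json:"correlation_id,omitempty"`
	ReplyTo       string                 `json:"reply_to,omitempty"`
	Body          []byte                 `json:"body,omitempty"`
}

// encode marshals a message to its JSON text.
func encode(m message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	out, err := encode(message{
		ContentType: "text/plain",
		ReplyTo:     "replies",
		Body:        []byte("hi"),
	})
	if err != nil {
		panic(err)
	}
	// prints {"content_type":"text/plain","reply_to":"replies","body":"aGk="}
	fmt.Println(out)
}
```

Fields left at their zero value are dropped because of omitempty, and encoding/json writes a []byte body as base64 text.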

func DecodeMessage

func DecodeMessage(b []byte) *Message

func Msg

func Msg(body interface{}) *Message

Create a message with a body

func (*Message) AddHeader

func (m *Message) AddHeader(name string, val interface{})

Add an application header

func (*Message) AsBytes

func (m *Message) AsBytes() (ret []byte)

func (*Message) Equal

func (m *Message) Equal(m2 *Message) bool

func (*Message) FromBytes

func (m *Message) FromBytes(b []byte) error

func (*Message) GetHeader

func (m *Message) GetHeader(name string) (interface{}, bool)

Retrieve an application header

type MessageId

type MessageId string

func NextMessageID

func NextMessageID() MessageId

func (MessageId) AppendLocalIndex

func (id MessageId) AppendLocalIndex(idxStr string) MessageId

func (MessageId) LocalIndex

func (id MessageId) LocalIndex() string

type MessageType

type MessageType int

const (
	NoopType MessageType = iota
	SuccessType
	ErrorType
	DeclareType
	EphemeralDeclareType
	AbandonType
	PollType
	PollResultType
	LongPollType
	PushType
	CloseType
	NackType
	AckType
	StatsType
	StatsResultType
)

type MultiPusher

type MultiPusher struct {
	Pushers []Pusher
}

func NewMultiPusher

func NewMultiPusher() *MultiPusher

func (*MultiPusher) Add

func (mp *MultiPusher) Add(p Pusher)

func (*MultiPusher) Push

func (mp *MultiPusher) Push(name string, msg *Message) error
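
MultiPusher holds a slice of Pushers and itself has a Push method, which suggests a fan-out: one push forwarded to every member. The page does not say how errors are handled, so the sketch below makes an assumption (try every target, return the first error). All types here are local stand-ins with a simplified string body.

```go
package main

import (
	"errors"
	"fmt"
)

// pusher is a simplified stand-in for the Pusher interface (string body instead of *Message).
type pusher interface {
	Push(name, body string) error
}

// recorder remembers what was pushed to it.
type recorder struct{ got []string }

func (r *recorder) Push(name, body string) error {
	r.got = append(r.got, name+":"+body)
	return nil
}

// failing always rejects pushes.
type failing struct{}

func (failing) Push(name, body string) error { return errors.New("push failed") }

// multiPusher forwards each push to every pusher it holds.
type multiPusher struct{ pushers []pusher }

// Add registers another target.
func (mp *multiPusher) Add(p pusher) { mp.pushers = append(mp.pushers, p) }

// Push tries every target and returns the first error seen, if any.
// This error policy is an assumption made for the sketch.
func (mp *multiPusher) Push(name, body string) error {
	var first error
	for _, p := range mp.pushers {
		if err := p.Push(name, body); err != nil && first == nil {
			first = err
		}
	}
	return first
}

func main() {
	a, b := &recorder{}, &recorder{}
	mp := &multiPusher{}
	mp.Add(a)
	mp.Add(failing{})
	mp.Add(b)

	err := mp.Push("events", "started")
	fmt.Println(a.got, b.got, err) // prints "[events:started] [events:started] push failed"
}
```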

type NackMessage

type NackMessage struct {
	MessageId MessageId
}

type Nacker

type Nacker func() error

type PipeConn

type PipeConn struct {
	// contains filtered or unexported fields
}

func (*PipeConn) Close

func (p *PipeConn) Close() error

func (*PipeConn) LocalAddr

func (p *PipeConn) LocalAddr() net.Addr

func (*PipeConn) Read

func (p *PipeConn) Read(b []byte) (int, error)

func (*PipeConn) RemoteAddr

func (p *PipeConn) RemoteAddr() net.Addr

func (*PipeConn) SendBulk

func (p *PipeConn) SendBulk(data io.Reader) (int64, error)

func (*PipeConn) SetDeadline

func (p *PipeConn) SetDeadline(t time.Time) error

func (*PipeConn) SetReadDeadline

func (p *PipeConn) SetReadDeadline(t time.Time) error

func (*PipeConn) SetWriteDeadline

func (p *PipeConn) SetWriteDeadline(t time.Time) error

func (*PipeConn) Write

func (p *PipeConn) Write(b []byte) (int, error)

type Poll

type Poll struct {
	Name string
}

type PollResult

type PollResult struct {
	Message *Message
}

type Push

type Push struct {
	Name    string
	Message *Message
}

type Pusher

type Pusher interface {
	Push(string, *Message) error
}

type Receiver

type Receiver struct {
	// channel that messages are sent to
	Channel <-chan *Delivery

	// Any error detected while receiving
	Error error
	// contains filtered or unexported fields
}

func (*Receiver) Close

func (rec *Receiver) Close() error

type Registry

type Registry struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewMemRegistry

func NewMemRegistry() *Registry

func NewRegistry

func NewRegistry(create func(string) Mailbox) *Registry

func (*Registry) Abandon

func (r *Registry) Abandon(name string) error

func (*Registry) Declare

func (r *Registry) Declare(name string) error

func (*Registry) LongPoll

func (r *Registry) LongPoll(name string, til time.Duration) (*Delivery, error)

func (*Registry) LongPollCancelable

func (r *Registry) LongPollCancelable(name string, til time.Duration, done chan struct{}) (*Delivery, error)

func (*Registry) Poll

func (r *Registry) Poll(name string) (*Delivery, error)

func (*Registry) Push

func (r *Registry) Push(name string, value *Message) error

type RouteTable

type RouteTable interface {
	Set(string, Pusher) error
	Remove(string) error
	Get(string) (Pusher, bool)
}

type Router

type Router struct {
	// contains filtered or unexported fields
}

func MemRouter

func MemRouter() *Router

func NewRouter

func NewRouter(rt RouteTable) *Router

func (*Router) Add

func (r *Router) Add(name string, reg Pusher)

func (*Router) DiscoverEndpoint

func (r *Router) DiscoverEndpoint(name string) (Pusher, bool)

func (*Router) Push

func (r *Router) Push(name string, body *Message) error

func (*Router) Remove

func (r *Router) Remove(name string) error

type Service

type Service struct {
	Address  string
	Registry Storage
	// contains filtered or unexported fields
}

func NewMemService

func NewMemService(addr string) (*Service, error)

func NewService

func NewService(addr string, reg Storage) (*Service, error)

func (*Service) Accept

func (s *Service) Accept() error

func (*Service) AcceptInsecure

func (s *Service) AcceptInsecure() error

func (*Service) Close

func (s *Service) Close() error

func (*Service) Port

func (s *Service) Port() int

type Storage

type Storage interface {
	Declare(string) error
	Abandon(string) error
	Push(string, *Message) error
	Poll(string) (*Delivery, error)
	LongPoll(string, time.Duration) (*Delivery, error)
	LongPollCancelable(string, time.Duration, chan struct{}) (*Delivery, error)
}

type Subscription

type Subscription struct {
	Pattern string
	Parts   []string
	Strict  bool
	Mailbox string
}

func ParseSubscription

func ParseSubscription(pattern string) *Subscription

func (*Subscription) Match

func (s *Subscription) Match(lit string) bool

Directories

Path Synopsis
cmd
