messaging

package
v0.9.0-rc5 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 27, 2015 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package messaging implements a distributed, raft-backed messaging system.

Basics

The broker writes every configuration change and data insert and replicates those changes to data nodes across the cluster. These changes are segmented into multiple topics so that they can be parallelized. Configuration changes are placed in a single "config" topic that is replicated to all data nodes. Each shard's data is placed in its own topic so that it can be parallelized across the cluster.

Index

Constants

const (
	InternalMessageType = BrokerMessageType | MessageType(0x00)

	CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
	DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)

	SubscribeMessageType   = BrokerMessageType | MessageType(0x20)
	UnsubscribeMessageType = BrokerMessageType | MessageType(0x21)
)
const BroadcastTopicID = uint64(0)

BroadcastTopicID is the topic used to communicate with all replicas.

const (
	BrokerMessageType = 0x8000
)
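The broker-internal message types are all built by OR-ing a low value with the BrokerMessageType flag (0x8000). The following is a minimal sketch of that bit layout using local copies of the documented constants. The isBrokerMessage helper is hypothetical and not part of the package; it only illustrates how the high bit could be tested.

```go
package main

import "fmt"

// MessageType mirrors the package's documented uint16 message type.
type MessageType uint16

// Local copies of the documented constants so the sketch compiles on its own.
const (
	BrokerMessageType = 0x8000

	InternalMessageType      = BrokerMessageType | MessageType(0x00)
	CreateReplicaMessageType = BrokerMessageType | MessageType(0x10)
	DeleteReplicaMessageType = BrokerMessageType | MessageType(0x11)
	SubscribeMessageType     = BrokerMessageType | MessageType(0x20)
	UnsubscribeMessageType   = BrokerMessageType | MessageType(0x21)
)

// isBrokerMessage is a hypothetical helper (not part of the package) that
// reports whether the broker flag bit is set on t.
func isBrokerMessage(t MessageType) bool {
	return t&BrokerMessageType != 0
}

func main() {
	fmt.Printf("%#x broker=%v\n", uint16(SubscribeMessageType), isBrokerMessage(SubscribeMessageType))
	fmt.Printf("%#x broker=%v\n", uint16(MessageType(0x0001)), isBrokerMessage(MessageType(0x0001)))
}
```

Reserving the high bit this way leaves the lower 15 bits of the type free for application-defined message types.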
const DefaultReconnectTimeout = 100 * time.Millisecond

DefaultReconnectTimeout is the default time to wait after a broker stream disconnects before another connection is attempted.
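A reconnect timeout of this kind is typically used as the pause between connection attempts. The sketch below illustrates that pattern only; connectWithRetry, errDisconnected, and errNoAttempts are hypothetical names, and the package's own reconnect logic is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copy of the documented default.
const DefaultReconnectTimeout = 100 * time.Millisecond

// Hypothetical errors used only by this sketch.
var (
	errDisconnected = errors.New("stream disconnected")
	errNoAttempts   = errors.New("maxAttempts must be at least 1")
)

// connectWithRetry is a hypothetical helper: it calls connect until it
// succeeds or maxAttempts is reached, sleeping for wait between attempts.
// It returns the number of attempts made and the last error, if any.
func connectWithRetry(connect func() error, wait time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		return 0, errNoAttempts
	}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = connect(); err == nil {
			return attempt, nil
		}
		// Only sleep when another attempt will follow.
		if attempt < maxAttempts {
			time.Sleep(wait)
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	// flaky simulates a stream that fails twice before connecting.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errDisconnected
		}
		return nil
	}
	attempts, err := connectWithRetry(flaky, DefaultReconnectTimeout, 5)
	fmt.Println("attempts:", attempts, "err:", err)
}
```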

Variables

var (
	// ErrPathRequired is returned when opening a broker without a path.
	ErrPathRequired = errors.New("path required")

	// ErrConnectionAddressRequired is returned when opening a broker without a connection address.
	ErrConnectionAddressRequired = errors.New("connection address required")

	// ErrClosed is returned when closing a broker that's already closed.
	ErrClosed = errors.New("broker already closed")

	// ErrSubscribed is returned when a stream is already subscribed to a topic.
	ErrSubscribed = errors.New("already subscribed")

	// ErrTopicExists is returned when creating a duplicate topic.
	ErrTopicExists = errors.New("topic already exists")

	// ErrReplicaExists is returned when creating a duplicate replica.
	ErrReplicaExists = errors.New("replica already exists")

	// ErrReplicaNotFound is returned when referencing a replica that doesn't exist.
	ErrReplicaNotFound = errors.New("replica not found")

	// ErrReplicaIDRequired is returned when creating a replica without an id.
	ErrReplicaIDRequired = errors.New("replica id required")

	// ErrClientOpen is returned when opening an already open client.
	ErrClientOpen = errors.New("client already open")

	// ErrClientClosed is returned when closing an already closed client.
	ErrClientClosed = errors.New("client closed")

	// ErrBrokerURLRequired is returned when opening a broker without URLs.
	ErrBrokerURLRequired = errors.New("broker url required")

	// ErrMessageTypeRequired is returned when publishing a message without a type.
	ErrMessageTypeRequired = errors.New("message type required")

	// ErrTopicRequired is returned when publishing a message without a topic ID.
	ErrTopicRequired = errors.New("topic required")
)

Functions

This section is empty.

Types

type Broker

type Broker struct {
	Logger *log.Logger
	// contains filtered or unexported fields
}

Broker represents a distributed messaging system segmented into topics. Each topic represents a linear series of events.

func NewBroker

func NewBroker() *Broker

NewBroker returns a new instance of a Broker with default values.

func (*Broker) Close

func (b *Broker) Close() error

Close closes the broker and all topics.

func (*Broker) CreateReplica

func (b *Broker) CreateReplica(id uint64, connectURL *url.URL) error

CreateReplica creates a new replica with the given id.

func (*Broker) DeleteReplica

func (b *Broker) DeleteReplica(id uint64) error

DeleteReplica deletes an existing replica by id.

func (*Broker) Index

func (b *Broker) Index() uint64

Index returns the highest index seen by the broker across all topics. Returns 0 if the broker is closed.

func (*Broker) Initialize

func (b *Broker) Initialize() error

Initialize creates a new cluster.

func (*Broker) IsLeader

func (b *Broker) IsLeader() bool

IsLeader returns true if the broker is the current leader.

func (*Broker) Join

func (b *Broker) Join(u *url.URL) error

Join joins an existing cluster.

func (*Broker) LeaderURL

func (b *Broker) LeaderURL() *url.URL

LeaderURL returns the connection url for the leader broker.

func (*Broker) Log

func (b *Broker) Log() *raft.Log

func (*Broker) Open

func (b *Broker) Open(path string, u *url.URL) error

Open initializes the log. The broker then must be initialized or join a cluster before it can be used.

func (*Broker) Path

func (b *Broker) Path() string

Path returns the path used when opening the broker. Returns empty string if the broker is not open.

func (*Broker) Publish

func (b *Broker) Publish(m *Message) (uint64, error)

Publish writes a message. On success it returns the index of the message; otherwise it returns an error.

func (*Broker) PublishSync

func (b *Broker) PublishSync(m *Message) error

PublishSync writes a message and waits until the change is applied.

func (*Broker) Replica

func (b *Broker) Replica(id uint64) *Replica

Replica returns a replica by id.

func (*Broker) Replicas

func (b *Broker) Replicas() []*Replica

Replicas returns a list of the replicas in the system.

func (*Broker) SetLogOutput

func (b *Broker) SetLogOutput(w io.Writer)

SetLogOutput sets the writer for all Broker log output.

func (*Broker) Subscribe

func (b *Broker) Subscribe(replicaID, topicID uint64) error

Subscribe adds a subscription to a topic from a replica.

func (*Broker) Sync

func (b *Broker) Sync(index uint64) error

Sync pauses until the given index has been applied.
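Blocking until an index has been applied is commonly built on a condition variable. The sketch below shows that general technique with a hypothetical indexWaiter type; it is not the broker's implementation, and the names apply and waitFor are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// indexWaiter is a hypothetical stand-in for applied-index tracking.
type indexWaiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	applied uint64 // highest index applied so far
}

func newIndexWaiter() *indexWaiter {
	w := &indexWaiter{}
	w.cond = sync.NewCond(&w.mu)
	return w
}

// apply records that entries up to index have been applied and wakes
// every goroutine blocked in waitFor.
func (w *indexWaiter) apply(index uint64) {
	w.mu.Lock()
	if index > w.applied {
		w.applied = index
	}
	w.mu.Unlock()
	w.cond.Broadcast()
}

// waitFor blocks until the applied index is at least index, then returns
// the applied index observed at that moment.
func (w *indexWaiter) waitFor(index uint64) uint64 {
	w.mu.Lock()
	defer w.mu.Unlock()
	for w.applied < index {
		w.cond.Wait()
	}
	return w.applied
}

func main() {
	w := newIndexWaiter()
	go func() {
		for i := uint64(1); i <= 5; i++ {
			w.apply(i)
		}
	}()
	fmt.Println("applied through at least index", w.waitFor(5))
}
```

PublishSync can be read as the combination of Publish followed by a wait of this kind on the returned index.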

func (*Broker) URL

func (b *Broker) URL() *url.URL

URL returns the connection url for the broker.

func (*Broker) Unsubscribe

func (b *Broker) Unsubscribe(replicaID, topicID uint64) error

Unsubscribe removes a subscription for a topic from a replica.

type Client

type Client struct {

	// The amount of time to wait before reconnecting to a broker stream.
	ReconnectTimeout time.Duration

	// The logging interface used by the client for out-of-band errors.
	Logger *log.Logger
	// contains filtered or unexported fields
}

Client represents a client for the broker's HTTP API. Once opened, the client will stream down all messages that

func NewClient

func NewClient(replicaID uint64) *Client

NewClient returns a new instance of Client.

func (*Client) C

func (c *Client) C() <-chan *Message

C returns the streaming channel. Messages can be duplicated, so it is important to check the index of each incoming message to make sure it has not already been processed.
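One way to guard against duplicate deliveries is to remember the highest index processed per topic and skip anything at or below it. The sketch below assumes that indexes within a topic only increase and start above zero; the dedup type is hypothetical, and Message is a local copy of the documented fields rather than the package type.

```go
package main

import "fmt"

// Message is a local copy of the documented fields needed for this sketch.
type Message struct {
	TopicID uint64
	Index   uint64
	Data    []byte
}

// dedup is a hypothetical consumer-side filter that remembers the highest
// index processed for each topic.
type dedup struct {
	last map[uint64]uint64 // topic ID -> highest index processed
}

func newDedup() *dedup {
	return &dedup{last: make(map[uint64]uint64)}
}

// accept reports whether m has not been seen before and, if so, records
// its index as processed. It assumes indexes increase within a topic.
func (d *dedup) accept(m *Message) bool {
	if m.Index <= d.last[m.TopicID] {
		return false
	}
	d.last[m.TopicID] = m.Index
	return true
}

func main() {
	// A buffered channel stands in for the channel returned by Client.C.
	c := make(chan *Message, 4)
	c <- &Message{TopicID: 1, Index: 10, Data: []byte("a")}
	c <- &Message{TopicID: 1, Index: 10, Data: []byte("a")} // duplicate delivery
	c <- &Message{TopicID: 2, Index: 11, Data: []byte("b")}
	c <- &Message{TopicID: 1, Index: 12, Data: []byte("c")}
	close(c)

	d := newDedup()
	for m := range c {
		if !d.accept(m) {
			continue // already processed
		}
		fmt.Printf("topic=%d index=%d data=%s\n", m.TopicID, m.Index, m.Data)
	}
}
```

In a real consumer the per-topic index would need to be persisted alongside the applied data so that the check survives restarts.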

func (*Client) Close

func (c *Client) Close() error

Close disconnects the client from the broker cluster.

func (*Client) CreateReplica

func (c *Client) CreateReplica(id uint64, u *url.URL) error

CreateReplica creates a replica on the broker.

func (*Client) DeleteReplica

func (c *Client) DeleteReplica(id uint64) error

DeleteReplica removes a replica on the broker.

func (*Client) LeaderURL

func (c *Client) LeaderURL() *url.URL

LeaderURL returns the URL of the broker leader.

func (*Client) Open

func (c *Client) Open(path string, urls []*url.URL) error

Open initializes and opens the connection to the cluster. The URLs used to contact the cluster are either those supplied to the function or, if none are supplied, read from the file at "path". These must be URLs of actual brokers. Regardless of the URL source, at least one URL must be available for the client to be opened successfully.

func (*Client) Publish

func (c *Client) Publish(m *Message) (uint64, error)

Publish sends a message to the broker and returns an index or error.

func (*Client) ReplicaID

func (c *Client) ReplicaID() uint64

ReplicaID returns the replica id that the client was opened with.

func (*Client) SetLogOutput

func (c *Client) SetLogOutput(w io.Writer)

SetLogOutput sets the writer for all Client log output.

func (*Client) Subscribe

func (c *Client) Subscribe(replicaID, topicID uint64) error

Subscribe subscribes a replica to a topic on the broker.

func (*Client) URLs

func (c *Client) URLs() []*url.URL

URLs returns a list of broker URLs to connect to.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(replicaID, topicID uint64) error

Unsubscribe unsubscribes a replica from a topic on the broker.

type ClientConfig

type ClientConfig struct {
	Brokers []*url.URL `json:"brokers"`
}

ClientConfig represents the Client configuration that must be persisted across restarts.

func NewClientConfig

func NewClientConfig(u []*url.URL) *ClientConfig

NewClientConfig returns a new instance of ClientConfig.

type CreateReplicaCommand

type CreateReplicaCommand struct {
	ID  uint64 `json:"id"`
	URL string `json:"url"`
}

CreateReplicaCommand creates a new replica.

type DeleteReplicaCommand

type DeleteReplicaCommand struct {
	ID uint64 `json:"id"`
}

DeleteReplicaCommand removes a replica.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler represents an HTTP handler for the broker.

func NewHandler

func NewHandler(b *Broker) *Handler

NewHandler returns a new instance of Handler.

func (*Handler) Broker

func (h *Handler) Broker() *Broker

Broker returns the broker on the handler.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP serves an HTTP request.

func (*Handler) SetBroker

func (h *Handler) SetBroker(b *Broker)

SetBroker sets the broker on the handler.

type Message

type Message struct {
	Type    MessageType
	TopicID uint64
	Index   uint64
	Data    []byte
}

Message represents a single item in a topic.

func (*Message) MarshalBinary

func (m *Message) MarshalBinary() ([]byte, error)

MarshalBinary returns a binary representation of the message. This implements encoding.BinaryMarshaler. The returned error is always nil.

func (*Message) UnmarshalBinary

func (m *Message) UnmarshalBinary(b []byte) error

UnmarshalBinary reads a message from a binary encoded slice. This implements encoding.BinaryUnmarshaler.

func (*Message) WriteTo

func (m *Message) WriteTo(w io.Writer) (n int64, err error)

WriteTo encodes and writes the message to a writer. Implements io.WriterTo.

type MessageDecoder

type MessageDecoder struct {
	// contains filtered or unexported fields
}

MessageDecoder decodes messages from a reader.

func NewMessageDecoder

func NewMessageDecoder(r io.Reader) *MessageDecoder

NewMessageDecoder returns a new instance of the MessageDecoder.

func (*MessageDecoder) Decode

func (dec *MessageDecoder) Decode(m *Message) error

Decode reads a message from the decoder's reader.
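Writing messages to a stream and decoding them back usually relies on a length-prefixed frame. The sketch below assumes a fixed big-endian header (2-byte type, 8-byte topic ID, 8-byte index, 4-byte data length) followed by the data. That layout is an assumption made for illustration, not the package's documented wire format, and writeMessage, readMessage, decodeAll, and roundTrip are hypothetical helpers.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

type MessageType uint16

// Message is a local copy of the documented struct.
type Message struct {
	Type    MessageType
	TopicID uint64
	Index   uint64
	Data    []byte
}

// headerSize is the size of the ASSUMED header: type, topic ID, index, length.
const headerSize = 2 + 8 + 8 + 4

// writeMessage encodes m with the assumed layout and writes it to w.
func writeMessage(w io.Writer, m *Message) (int64, error) {
	b := make([]byte, headerSize+len(m.Data))
	binary.BigEndian.PutUint16(b[0:2], uint16(m.Type))
	binary.BigEndian.PutUint64(b[2:10], m.TopicID)
	binary.BigEndian.PutUint64(b[10:18], m.Index)
	binary.BigEndian.PutUint32(b[18:22], uint32(len(m.Data)))
	copy(b[headerSize:], m.Data)
	n, err := w.Write(b)
	return int64(n), err
}

// readMessage decodes one message from r. It returns io.EOF only when the
// stream ends cleanly between messages.
func readMessage(r io.Reader, m *Message) error {
	var hdr [headerSize]byte
	if _, err := io.ReadFull(r, hdr[:]); err != nil {
		return err
	}
	m.Type = MessageType(binary.BigEndian.Uint16(hdr[0:2]))
	m.TopicID = binary.BigEndian.Uint64(hdr[2:10])
	m.Index = binary.BigEndian.Uint64(hdr[10:18])
	m.Data = make([]byte, binary.BigEndian.Uint32(hdr[18:22]))
	_, err := io.ReadFull(r, m.Data)
	if err == io.EOF {
		err = io.ErrUnexpectedEOF // header promised data that never arrived
	}
	return err
}

// decodeAll reads messages from r until the stream ends.
func decodeAll(r io.Reader) ([]Message, error) {
	var out []Message
	for {
		var m Message
		if err := readMessage(r, &m); err == io.EOF {
			return out, nil
		} else if err != nil {
			return out, err
		}
		out = append(out, m)
	}
}

// roundTrip writes msgs to an in-memory buffer and decodes them back.
func roundTrip(msgs ...Message) ([]Message, error) {
	var buf bytes.Buffer
	for i := range msgs {
		if _, err := writeMessage(&buf, &msgs[i]); err != nil {
			return nil, err
		}
	}
	return decodeAll(&buf)
}

func main() {
	out, err := roundTrip(
		Message{Type: 1, TopicID: 2, Index: 3, Data: []byte("hello")},
		Message{Type: 1, TopicID: 2, Index: 4, Data: []byte("world")},
	)
	fmt.Println(len(out), err)
}
```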

type MessageType

type MessageType uint16

MessageType represents the type of message.

type Replica

type Replica struct {
	URL *url.URL
	// contains filtered or unexported fields
}

Replica represents a collection of subscriptions to topics on the broker. The replica maintains the highest index read for each topic so that the broker can use this high water mark for trimming the topic logs.
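If each replica tracks the highest index it has read per topic, a safe trim point for a topic is the lowest such index among its subscribers. The sketch below illustrates that calculation with a hypothetical replica model and trimIndex function; the broker's actual trimming logic is not described in this documentation.

```go
package main

import "fmt"

// replica is a hypothetical model of a replica's subscriptions:
// topic ID -> highest index read for that topic.
type replica struct {
	read map[uint64]uint64
}

// trimIndex returns the highest index that every replica subscribed to
// topicID has already read. ok is false when no replica subscribes to
// the topic, in which case no trim point can be derived.
func trimIndex(replicas []*replica, topicID uint64) (index uint64, ok bool) {
	for _, r := range replicas {
		hw, subscribed := r.read[topicID]
		if !subscribed {
			continue
		}
		if !ok || hw < index {
			index, ok = hw, true
		}
	}
	return index, ok
}

func main() {
	replicas := []*replica{
		{read: map[uint64]uint64{1: 10, 2: 7}},
		{read: map[uint64]uint64{1: 4}},
	}
	for _, topic := range []uint64{1, 2, 3} {
		idx, ok := trimIndex(replicas, topic)
		fmt.Printf("topic=%d trim=%d ok=%v\n", topic, idx, ok)
	}
}
```

Taking the minimum means a slow replica holds back trimming for the topics it subscribes to, which is the cost of never discarding entries a subscriber still needs.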

func (*Replica) Topics

func (r *Replica) Topics() []uint64

Topics returns a list of the IDs of the topics that the replica is subscribed to.

func (*Replica) Write

func (r *Replica) Write(p []byte) (int, error)

Write writes a byte slice to the underlying writer. If no writer is available then ErrReplicaUnavailable is returned.

func (*Replica) WriteTo

func (r *Replica) WriteTo(w io.Writer) (int64, error)

WriteTo begins writing messages to a named stream. Only one writer is allowed on a stream at a time.

type SubscribeCommand

type SubscribeCommand struct {
	ReplicaID uint64 `json:"replicaID"` // replica id
	TopicID   uint64 `json:"topicID"`   // topic id
	Index     uint64 `json:"index"`     // index
}

SubscribeCommand subscribes a replica to a new topic.
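The struct tags indicate that the command is meant to be JSON encoded. The sketch below shows the resulting JSON shape using a local copy of the struct; encodeSubscribe and decodeSubscribe are hypothetical helpers, and how the broker transports the encoded command (for example as the Data of a message) is an assumption not confirmed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SubscribeCommand is a local copy of the documented struct.
type SubscribeCommand struct {
	ReplicaID uint64 `json:"replicaID"` // replica id
	TopicID   uint64 `json:"topicID"`   // topic id
	Index     uint64 `json:"index"`     // index
}

// encodeSubscribe is a hypothetical helper that returns the JSON form of c.
func encodeSubscribe(c SubscribeCommand) (string, error) {
	b, err := json.Marshal(c)
	return string(b), err
}

// decodeSubscribe is a hypothetical helper that parses the JSON form.
func decodeSubscribe(s string) (SubscribeCommand, error) {
	var c SubscribeCommand
	err := json.Unmarshal([]byte(s), &c)
	return c, err
}

func main() {
	s, err := encodeSubscribe(SubscribeCommand{ReplicaID: 1, TopicID: 2, Index: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"replicaID":1,"topicID":2,"index":3}
}
```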

type UnsubscribeCommand

type UnsubscribeCommand struct {
	ReplicaID uint64 `json:"replicaID"` // replica id
	TopicID   uint64 `json:"topicID"`   // topic id
}

UnsubscribeCommand removes a subscription for a topic from a replica.
