rafthttp

package module
v0.0.0-...-c97137f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2019 License: Apache-2.0 Imports: 16 Imported by: 1

README

raft-http Build Status Coverage Status Go Report Card GoDoc

This repository provides the rafthttp package, which can be used to establish a network connection between to raft nodes using HTTP. Once the HTTP connection is established, the Upgrade header will be used to switch it to raw TCP mode, and the regular TCP-based network transport of the raft package can take it from there.

Documentation

The documentation for this package can be found on Godoc.

Documentation

Overview

Package rafthttp provides an extension for the github.com/hashicorp/raft package.

It implements a raft.StreamLayer that a raft.NetworkTransport can use to connect to and accept connections from other raft.Transport's using HTTP/WebSocket rather than straight TCP.

This is handy for applications that expose an HTTP endpoint and don't want to open an extra TCP port for handling raft-level traffic.

In addition to the regular raft.StreamLayer interface, rafthttp.Layer implements extra methods to join and leave a cluster.

Typical usage of this package is as follows:

  • Create a rafthttp.Handler object which implements the standard http.Handler interface.
  • Create a standard http.Server and configure it to route an endpoint path of your choice to the rafthttp.Handler above. All your raft servers must use the same endpoint path. You'll probably want to gate the rafthttp.Handler behind some authorization mechanism of your choice.
  • Create a net.Listener and use it to start a the http.Server create above. From this point the rafthttp.Handler will start accepting raft-related requests.

- Create a rafthttp.Layer object passing it:

  1. The endpoint path you chose above, which will be used to establish outbound raft.Transport connections to other raft servers over HTTP/WebSocket.

  2. The network address of the net.Listener you used to start the http.Server, which will be used by the local raft server to know its own network address.

  3. The rafthttp.Handler object you created above, which will be used to accept inbound raft.NetworkTransport connections from other raft servers over HTTP/WebSocket.

  4. A rafthttp.Dial function, which will be used to establish outbound raft.NetworkTransport connections to other raft servers over HTTP/WebSocket (the rafthttp.Layer will use it to perform HTTP requests to other servers using your chosen endpoint path).

  • Create a raft.NetworkTransport passing it the rafthttp.Layer you created above.

- Create a raft.Raft server using the raft.NetworkTransport created above.

  • Spawn a goroutine running the raftmembership.HandleChangeRequests function from the github.com/Canonical/raft-membership package, passing it the raft.Raft server you created above and the channel returned by Request() method of the rafthttp.Handler created above. This will process join and leave requests, that you can perform using the Join() and Leave() methods of the rafthttp.Layer object you created above. This goroutine will terminate automatically when you shutdown your raft.Raft server, since that will close your raft.NetworkTransport, which in turn closes the your rafttest.Layer, which closes your rafttest.Handler, which will ultimately close the channel returned by its Requests() method and signal the raftmembership.HandleChangeRequests function to return.

To cleanly shutdown the service, first shutdown your raft.Raft instance, then call the CloseStreams() method of your raft.NetworkTransport instance (to close all connections) and then stop your http.Server.

Example

Connect threed raft nodes using HTTP network layers.

package main

import (
	"bytes"
	"fmt"
	"log"
	"net"
	"net/http"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/CanonicalLtd/raft-http"
	"github.com/CanonicalLtd/raft-membership"
	"github.com/CanonicalLtd/raft-test"
	"github.com/hashicorp/raft"
)

// Connect threed raft nodes using HTTP network layers.
func main() {
	t := &testing.T{}

	// Create a set of transports using HTTP layers.
	handlers := make([]*rafthttp.Handler, 3)
	layers := make([]*rafthttp.Layer, 3)
	transports := make([]*raft.NetworkTransport, 3)
	out := bytes.NewBuffer(nil)
	layersCleanup := make([]func(), 3)
	for i := range layers {
		handler := rafthttp.NewHandler()
		layer, cleanup := newExampleLayer(handler)

		transport := raft.NewNetworkTransport(layer, 2, time.Second, out)

		layers[i] = layer
		layersCleanup[i] = cleanup
		handlers[i] = handler
		transports[i] = transport
	}

	// Create a raft.Transport factory that uses the above layers.
	transport := rafttest.Transport(func(i int) raft.Transport { return transports[i] })
	servers := rafttest.Servers(0)

	// Create a 3-node cluster with default test configuration.
	rafts, control := rafttest.Cluster(t, rafttest.FSMs(3), transport, servers)
	defer func() {
		control.Close()
		for _, transport := range transports {
			transport.CloseStreams()
		}
		for _, cleanup := range layersCleanup {
			defer cleanup()
		}
	}()

	// Start handling membership change requests on all nodes.
	for i, handler := range handlers {
		id := raft.ServerID(strconv.Itoa(i))
		go raftmembership.HandleChangeRequests(rafts[id], handler.Requests())
	}

	// Node 0 is the one supposed to get leadership, since it's currently
	// the only one in the cluster.
	control.Elect("0")

	// Request that the second node joins the cluster.
	if err := layers[1].Join("1", transports[0].LocalAddr(), time.Second); err != nil {
		log.Fatalf("joining server 1 failed: %v", err)
	}

	// Request that the third node joins the cluster, contacting
	// the non-leader node 1. The request will be automatically
	// redirected to node 0.
	if err := layers[2].Join("2", transports[1].LocalAddr(), time.Second); err != nil {
		log.Fatal(err)
	}

	// Rquest that the third node leaves the cluster.
	if err := layers[2].Leave("2", transports[2].LocalAddr(), time.Second); err != nil {
		log.Fatal(err)
	}

	fmt.Println("has logged connections:", strings.Contains(out.String(), "accepted connection from"))
	fmt.Println("peers:", rafts["0"].Stats()["num_peers"])
}

// Create a new Layer using a new Handler attached to a running HTTP
// server.
func newExampleLayer(handler *rafthttp.Handler) (*rafthttp.Layer, func()) {
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatalf("listening to local port failed: %v", err)
	}
	layer := rafthttp.NewLayer("/", listener.Addr(), handler, rafthttp.NewDialTCP())
	server := &http.Server{Handler: handler}
	go server.Serve(listener)

	cleanup := func() {
		if err := server.Close(); err != nil {
			log.Fatalf("closing server failed: %v", err)
		}
	}

	return layer, cleanup
}
Output:

has logged connections: true
peers: 1

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func ChangeMembership

func ChangeMembership(
	kind raftmembership.ChangeRequestKind,
	path string,
	dial Dial,
	id raft.ServerID,
	address, target string,
	timeout time.Duration) error

ChangeMembership can be used to join or leave a cluster over HTTP.

Types

type Dial

type Dial func(addr string, timeout time.Duration) (net.Conn, error)

Dial is a function that given an address and a timeout returns a new network connection (typically TCP or TLS over TCP).

func NewDialTCP

func NewDialTCP() Dial

NewDialTCP returns a Dial function that establishes a network connection using raw TCP.

func NewDialTLS

func NewDialTLS(config *tls.Config) Dial

NewDialTLS returns a Dial function that enstablishes a network connection using TLS over TCP.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler implements an HTTP handler that will look for an Upgrade header in the request to switch the HTTP connection to raw TCP mode, so it can be used as raft.NetworkTransport stream.

func NewHandler

func NewHandler() *Handler

NewHandler returns a new Handler.

Incoming raft membership requests (received via POST and DELETE) are forwarded to the given channel, which is supposed to be processed using raftmembership.HandleChangeRequests().

func NewHandlerWithLogger

func NewHandlerWithLogger(logger *log.Logger) *Handler

NewHandlerWithLogger returns a new Handler configured with the given logger.

func (*Handler) Close

func (h *Handler) Close()

Close stops handling incoming requests.

func (*Handler) Requests

func (h *Handler) Requests() <-chan *raftmembership.ChangeRequest

Requests returns a channel of inbound Raft membership change requests received over HTTP. Consumer code is supposed to process this channel by invoking raftmembership.HandleChangeRequests.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServerHTTP upgrades the given HTTP connection to a raw TCP one for use by raft.

func (*Handler) Timeout

func (h *Handler) Timeout(timeout time.Duration)

Timeout sets the maximum amount of time for a request to be processed. It defaults to 10 seconds if not set.

type Layer

type Layer struct {
	// contains filtered or unexported fields
}

Layer represents the connection between raft nodes.

func NewLayer

func NewLayer(path string, localAddr net.Addr, handler *Handler, dial Dial) *Layer

NewLayer returns a new raft stream layer that initiates connections with HTTP and then uses Upgrade to switch them into raw TCP.

func NewLayerWithLogger

func NewLayerWithLogger(path string, localAddr net.Addr, handler *Handler, dial Dial, logger *log.Logger) *Layer

NewLayerWithLogger returns a Layer using the specified logger.

func (*Layer) Accept

func (l *Layer) Accept() (net.Conn, error)

Accept waits for the next connection.

func (*Layer) Addr

func (l *Layer) Addr() net.Addr

Addr returns the local address for the layer.

func (*Layer) Close

func (l *Layer) Close() error

Close closes the layer.

func (*Layer) Dial

func (l *Layer) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial creates a new network connection.

func (*Layer) Join

func (l *Layer) Join(id raft.ServerID, addr raft.ServerAddress, timeout time.Duration) error

Join tries to join the cluster by contacting the leader at the given address. The raft node associated with this layer must have the given server identity.

func (*Layer) Leave

func (l *Layer) Leave(id raft.ServerID, addr raft.ServerAddress, timeout time.Duration) error

Leave tries to leave the cluster by contacting the leader at the given address. The raft node associated with this layer must have the given server identity.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL