Package rafthttp implements the HTTP transport layer for the etcd/raft package.



    View Source
    const (
    	// ConnReadTimeout and ConnWriteTimeout are the i/o timeout set on each connection rafthttp pkg creates.
    	// A 5-second timeout is good enough for recycling bad connections. Otherwise we would have to
    	// wait for TCP keepalive to detect a bad connection, which takes minutes.
    	// For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
    	// to keep the connection alive.
    	// For short term pipeline connections, the connection MUST be killed to avoid it being
    	// put back to http pkg connection pool.
    	ConnReadTimeout  = 5 * time.Second
    	ConnWriteTimeout = 5 * time.Second
    View Source
    const (
    	// RoundTripperNameRaftMessage is the name of round-tripper that sends
    	// all other Raft messages, other than "snap.Message".
    	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
    	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
    	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"


    View Source
    var (
    	RaftPrefix         = "/raft"
    	ProbingPrefix      = path.Join(RaftPrefix, "probing")
    	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
    	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
    View Source
    var (
    	ErrExceedSizeLimit = errors.New("rafthttp: error limit exceeded")


    func NewListener

    func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error)

      NewListener returns a listener for raft message transfer between peers. It uses timeout listener to identify broken streams promptly.

      func NewRoundTripper

      func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error)

        NewRoundTripper returns a roundTripper used to send requests to rafthttp listener of remote peers.


        type Pausable

        type Pausable interface {

          Pausable is a testing interface for pausing transport traffic.

          type Peer

          type Peer interface {
          	// contains filtered or unexported methods

          type Raft

          type Raft interface {
          	Process(ctx context.Context, m raftpb.Message) error
          	IsIDRemoved(id uint64) bool
          	ReportUnreachable(id uint64)
          	ReportSnapshot(id uint64, status raft.SnapshotStatus)

          type Transport

          type Transport struct {
          	DialTimeout time.Duration // maximum duration before timing out dial of the request
          	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
          	// a distinct rate limiter is created per every peer (default value: 10 events/sec)
          	DialRetryFrequency rate.Limit
          	TLSInfo transport.TLSInfo // TLS information used when creating connection
          	ID          types.ID   // local member ID
          	URLs        types.URLs // local peer URLs
          	ClusterID   types.ID   // raft cluster ID for request validation
          	Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
          	Snapshotter *snap.Snapshotter
          	ServerStats *stats.ServerStats // used to record general transportation statistics
          	// used to record transportation statistics with followers when
          	// performing as leader in raft protocol
          	LeaderStats *stats.LeaderStats
          	// ErrorC is used to report detected critical errors, e.g.,
          	// the member has been permanently removed from the cluster
          	// When an error is received from ErrorC, user should stop raft state
          	// machine and thus stop the Transport.
          	ErrorC chan error
          	// contains filtered or unexported fields

            Transport implements Transporter interface. It provides the functionality to send raft messages to peers, and receive raft messages from peers. User should call Handler method to get a handler to serve requests received from peerURLs. User needs to call Start before calling other functions, and call Stop when the Transport is no longer used.

            func (*Transport) ActivePeers

            func (t *Transport) ActivePeers() (cnt int)

              ActivePeers returns the number of active peers.

              func (*Transport) ActiveSince

              func (t *Transport) ActiveSince(id types.ID) time.Time

              func (*Transport) AddPeer

              func (t *Transport) AddPeer(id types.ID, us []string)

              func (*Transport) AddRemote

              func (t *Transport) AddRemote(id types.ID, us []string)

              func (*Transport) CutPeer

              func (t *Transport) CutPeer(id types.ID)

                CutPeer drops messages to the specified peer.

                func (*Transport) Get

                func (t *Transport) Get(id types.ID) Peer

                func (*Transport) Handler

                func (t *Transport) Handler() http.Handler

                func (*Transport) MendPeer

                func (t *Transport) MendPeer(id types.ID)

                  MendPeer recovers the message dropping behavior of the given peer.

                  func (*Transport) Pause

                  func (t *Transport) Pause()

                  func (*Transport) RemoveAllPeers

                  func (t *Transport) RemoveAllPeers()

                  func (*Transport) RemovePeer

                  func (t *Transport) RemovePeer(id types.ID)

                  func (*Transport) Resume

                  func (t *Transport) Resume()

                  func (*Transport) Send

                  func (t *Transport) Send(msgs []raftpb.Message)

                  func (*Transport) SendSnapshot

                  func (t *Transport) SendSnapshot(m snap.Message)

                  func (*Transport) Start

                  func (t *Transport) Start() error

                  func (*Transport) Stop

                  func (t *Transport) Stop()

                  func (*Transport) UpdatePeer

                  func (t *Transport) UpdatePeer(id types.ID, us []string)

                  type Transporter

                  type Transporter interface {
                  	// Start starts the given Transporter.
                  	// Start MUST be called before calling other functions in the interface.
                  	Start() error
                  	// Handler returns the HTTP handler of the transporter.
                  	// A transporter HTTP handler handles the HTTP requests
                  	// from remote peers.
                  	// The handler MUST be used to handle RaftPrefix(/raft)
                  	// endpoint.
                  	Handler() http.Handler
                  	// Send sends out the given messages to the remote peers.
                  	// Each message has a To field, which is an id that maps
                  	// to an existing peer in the transport.
                  	// If the id cannot be found in the transport, the message
                  	// will be ignored.
                  	Send(m []raftpb.Message)
                  	// SendSnapshot sends out the given snapshot message to a remote peer.
                  	// The behavior of SendSnapshot is similar to Send.
                  	SendSnapshot(m snap.Message)
                  	// AddRemote adds a remote with given peer urls into the transport.
                  	// A remote helps newly joined member to catch up the progress of cluster,
                  	// and will not be used after that.
                  	// It is the caller's responsibility to ensure the urls are all valid,
                  	// or it panics.
                  	AddRemote(id types.ID, urls []string)
                  	// AddPeer adds a peer with given peer urls into the transport.
                  	// It is the caller's responsibility to ensure the urls are all valid,
                  	// or it panics.
                  	// Peer urls are used to connect to the remote peer.
                  	AddPeer(id types.ID, urls []string)
                  	// RemovePeer removes the peer with given id.
                  	RemovePeer(id types.ID)
                  	// RemoveAllPeers removes all the existing peers in the transport.
                  	RemoveAllPeers()
                  	// UpdatePeer updates the peer urls of the peer with the given id.
                  	// It is the caller's responsibility to ensure the urls are all valid,
                  	// or it panics.
                  	UpdatePeer(id types.ID, urls []string)
                  	// ActiveSince returns the time that the connection with the peer
                  	// of the given id becomes active.
                  	// If the connection is active since peer was added, it returns the adding time.
                  	// If the connection is currently inactive, it returns zero time.
                  	ActiveSince(id types.ID) time.Time
                  	// ActivePeers returns the number of active peers.
                  	ActivePeers() int
                  	// Stop closes the connections and stops the transporter.
                  	Stop()

                  func NewNopTransporter

                  func NewNopTransporter() Transporter

                  func NewSnapTransporter

                  func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message)