federation

package
Version: v0.4.1
Published: Jul 21, 2021 License: MIT Imports: 41 Imported by: 1

README

Federation

Federation is a clustering mechanism that provides high availability and horizontal scaling. In Federation mode, multiple gmqtt brokers can be grouped together and "act as one". However, it is impossible to fulfill all requirements of the MQTT specification in a distributed environment. There are some limitations:

  1. A persistent session cannot be resumed from another node.
  2. Clients with the same client ID can connect to different nodes at the same time and will not be kicked out.

This is because session information is stored only on the local node and is not shared between nodes.

Quick Start

The following commands start a two-node federation; the configuration files can be found here.
Start node1 in Terminal1:

$ gmqttd start -c path/to/retry_join/node1_config.yml

Start node2 in Terminal2:

$ gmqttd start -c path/to/retry_join/node2_config2.yml

After node1 and node2 are started, they will join into one federation automatically.

We can test the federation with mosquitto_pub/sub:
Connect to node2 and subscribe topicA:

$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884

Connect to node1 and send a message to topicA:

$ mosquitto_pub -t topicA -m 123 -h 127.0.0.1 -p 1883

The mosquitto_sub will receive "123" and print it in the terminal.

$ mosquitto_sub -t topicA -h 127.0.0.1 -p 1884
123

Join Nodes via REST API

Federation provides a gRPC/REST API to join, leave, and query member information; see swagger for details. In addition to joining nodes at startup, you can join a node to the federation by using the Join API.

Start node3 with a configuration whose retry_join is empty, which means the node will not join any nodes at startup.

$ gmqttd start -c path/to/retry_join/join_node3_config.yml

We can send a Join request to any node in the federation to get node3 joined. For example, send a Join request to node1:

$ curl -X POST -d '{"hosts":["127.0.0.1:8932"]}' '127.0.0.1:8083/v1/federation/join'
{}                                                                                                
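
The same Join request can also be built from Go. The sketch below is only an illustration: it reuses the address, path, and body from the curl command above, and the helper names joinBody and newJoinRequest are hypothetical, not part of gmqtt.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// joinPayload mirrors the JSON body of the Join request shown above.
type joinPayload struct {
	Hosts []string `json:"hosts"`
}

// joinBody (hypothetical helper) encodes the gossip addresses to join.
func joinBody(hosts []string) (string, error) {
	b, err := json.Marshal(joinPayload{Hosts: hosts})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// newJoinRequest (hypothetical helper) builds the POST request for the Join API.
// apiAddr is the HTTP API address of a node that is already in the federation.
func newJoinRequest(apiAddr string, hosts []string) (*http.Request, error) {
	body, err := joinBody(hosts)
	if err != nil {
		return nil, err
	}
	url := "http://" + apiAddr + "/v1/federation/join"
	return http.NewRequest(http.MethodPost, url, strings.NewReader(body))
}

func main() {
	req, err := newJoinRequest("127.0.0.1:8083", []string{"127.0.0.1:8932"})
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	// Pass req to http.DefaultClient.Do to send it to a running node.
	fmt.Println(req.Method, req.URL.String())
}
```

The sketch stops at building the request so that it runs without a broker; sending it requires node1 to be listening on its HTTP API address.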

And check the members in federation:

$ curl http://127.0.0.1:8083/v1/federation/members
{
    "members": [
        {
            "name": "node1",
            "addr": "192.168.0.105:8902",
            "tags": {
                "fed_addr": "192.168.0.105:8901"
            },
            "status": "STATUS_ALIVE"
        },
        {
            "name": "node2",
            "addr": "192.168.0.105:8912",
            "tags": {
                "fed_addr": "192.168.0.105:8911"
            },
            "status": "STATUS_ALIVE"
        },
        {
            "name": "node3",
            "addr": "192.168.0.105:8932",
            "tags": {
                "fed_addr": "192.168.0.105:8931"
            },
            "status": "STATUS_ALIVE"
        }
    ]
}

You will see that there are 3 nodes alive in the federation.
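
A program can perform the same check by decoding the members response. The following is a minimal sketch that assumes the JSON shape shown above; the type and function names are hypothetical, and the sample data in main is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// member and membersResponse mirror the JSON fields of the members response.
type member struct {
	Name   string            `json:"name"`
	Addr   string            `json:"addr"`
	Tags   map[string]string `json:"tags"`
	Status string            `json:"status"`
}

type membersResponse struct {
	Members []member `json:"members"`
}

// aliveMembers returns the names of all members whose status is STATUS_ALIVE.
func aliveMembers(data []byte) ([]string, error) {
	var resp membersResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return nil, err
	}
	var names []string
	for _, m := range resp.Members {
		if m.Status == "STATUS_ALIVE" {
			names = append(names, m.Name)
		}
	}
	return names, nil
}

func main() {
	// Illustrative sample only: node2 is marked as left to show the filtering.
	sample := []byte(`{"members":[
		{"name":"node1","addr":"192.168.0.105:8902","status":"STATUS_ALIVE"},
		{"name":"node2","addr":"192.168.0.105:8912","status":"STATUS_LEFT"}]}`)
	names, err := aliveMembers(sample)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(names)
}
```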

Configuration

// Config is the configuration for the federation plugin.
type Config struct {
	// NodeName is the unique identifier for the node in the federation. Defaults to hostname.
	NodeName string `yaml:"node_name"`
	// FedAddr is the gRPC server listening address for the federation internal communication.
	// Defaults to :8901.
	// If the port is missing, the default federation port (8901) will be used.
	FedAddr string `yaml:"fed_addr"`
	// AdvertiseFedAddr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
	// Defaults to "FedAddr" or the private IP address of the node if the IP in "FedAddr" is 0.0.0.0.
	// However, in some cases, there may be a routable address that cannot be bound.
	// If the port is missing, the default federation port (8901) will be used.
	AdvertiseFedAddr string `yaml:"advertise_fed_addr"`
	// GossipAddr is the address that the gossip will listen on. It is used for both UDP and TCP gossip. Defaults to :8902.
	GossipAddr string `yaml:"gossip_addr"`
	// AdvertiseGossipAddr is used to change the gossip server address that we advertise to other nodes in the cluster.
	// Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
	// If the port is missing, the default gossip port (8902) will be used.
	AdvertiseGossipAddr string `yaml:"advertise_gossip_addr"`
	// RetryJoin is the address of other nodes to join upon starting up.
	// If port is missing, the default gossip port (8902) will be used.
	RetryJoin []string `yaml:"retry_join"`
	// RetryInterval is the time to wait between join attempts. Defaults to 5s.
	RetryInterval time.Duration `yaml:"retry_interval"`
	// RetryTimeout is the timeout to wait before joining all nodes in RetryJoin successfully.
	// If timeout expires, the server will exit with error. Defaults to 1m.
	RetryTimeout time.Duration `yaml:"retry_timeout"`
	// SnapshotPath is passed to "SnapshotPath" in the serf configuration.
	// When Serf is started with a snapshot,
	// it will attempt to join all the previously known nodes until one
	// succeeds and will also avoid replaying old user events.
	SnapshotPath string `yaml:"snapshot_path"`
	// RejoinAfterLeave is passed to "RejoinAfterLeave" in the serf configuration.
	// It controls our interaction with the snapshot file.
	// When set to false (default), a leave causes a Serf to not rejoin
	// the cluster until an explicit join is received. If this is set to
	// true, we ignore the leave, and rejoin the cluster on start.
	RejoinAfterLeave bool `yaml:"rejoin_after_leave"`
}
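
Several of the address fields above fall back to a default port when none is given. The sketch below shows one way such a rule could be applied; withDefaultPort is a hypothetical helper written for illustration and is not claimed to be how the plugin implements it.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// withDefaultPort (hypothetical helper) returns addr unchanged if it already
// carries a port, and appends defaultPort otherwise.
func withDefaultPort(addr, defaultPort string) string {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		// Assume addr is a bare host such as "127.0.0.1" or "[::1]".
		host = strings.TrimSuffix(strings.TrimPrefix(addr, "["), "]")
		return net.JoinHostPort(host, defaultPort)
	}
	if port == "" {
		return net.JoinHostPort(host, defaultPort)
	}
	return addr
}

func main() {
	// "8901" is the default federation port documented above.
	for _, addr := range []string{":8901", "127.0.0.1", "127.0.0.1:9000"} {
		fmt.Println(withDefaultPort(addr, "8901"))
	}
}
```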

Implementation Details

Inter-node Communication

Nodes in the same federation communicate with each other through a couple of gRPC APIs:

message Event {
    uint64 id = 1;
    oneof Event {
        Subscribe Subscribe = 2;
        Message message = 3;
        Unsubscribe unsubscribe = 4;
    }
}
service Federation {
    rpc Hello(ClientHello) returns (ServerHello){}
    rpc EventStream (stream Event) returns (stream Ack){}
}

In general, each node acts as both a Client and a Server of the Federation gRPC service.

  • As a Client, the node sends subscribe, unsubscribe and message-published events to other nodes when necessary.
    Each event has an EventID, which is incremental and unique within a session.
  • As a Server, when the node receives an event from a Client, it returns an acknowledgement after the event has been handled successfully.

Session State

Events are designed to be idempotent and are delivered at least once, like QoS 1 messages in the MQTT protocol. To implement the QoS 1 protocol flows, the Client and Server need to associate state with a SessionID; this is referred to as the Session State. The Server also stores the federation tree and retained messages as part of the Session State.

The Session State in the Client consists of:

  • Events which have been sent to the Server, but have not been acknowledged.
  • Events pending transmission to the Server.

The Session State in the Server consists of:

  • The existence of a Session, even if the rest of the Session State is empty.
  • The EventID of the next event that the Server is willing to accept.
  • Events which have been received from the Client, but have not been acknowledged yet.
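
The two halves of the Session State can be modelled roughly as follows. This is a simplified in-memory sketch under stated assumptions: the type names are hypothetical, events are assumed to arrive in order, and any event whose ID is below the next expected one is treated as a redelivery that is acknowledged but not applied again.

```go
package main

import "fmt"

// clientSession (hypothetical) tracks events sent but not yet acknowledged.
type clientSession struct {
	nextID  uint64
	unacked map[uint64]string
}

// send assigns the next EventID and remembers the event until it is acked.
func (c *clientSession) send(payload string) uint64 {
	id := c.nextID
	c.nextID++
	c.unacked[id] = payload
	return id
}

// ack forgets an event once the Server has acknowledged it.
func (c *clientSession) ack(id uint64) {
	delete(c.unacked, id)
}

// serverSession (hypothetical) tracks the next EventID the Server accepts.
type serverSession struct {
	nextEventID uint64
}

// accept reports whether the event should be applied. Redelivered events
// (id below nextEventID) are not applied a second time.
func (s *serverSession) accept(id uint64) bool {
	if id < s.nextEventID {
		return false
	}
	s.nextEventID = id + 1
	return true
}

func main() {
	c := &clientSession{unacked: map[uint64]string{}}
	s := &serverSession{}

	id := c.send("subscribe a/b")
	fmt.Println("first delivery applied:", s.accept(id))
	fmt.Println("redelivery applied:", s.accept(id))
	c.ack(id)
	fmt.Println("unacked events:", len(c.unacked))
}
```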

The Session State is stored in memory only. When the Client starts, it generates a random UUID as its SessionID. When the Client detects that a new node has joined, or when it reconnects to the Server, it sends a Hello request containing the SessionID to perform a handshake. During the handshake, the Server checks whether a session for that SessionID exists.

  • If the session does not exist, the Server sends a response with clean_start=true.
  • If the session exists, the Server sends a response with clean_start=false and sets next_event_id to the next EventID that it is willing to accept.

After the handshake succeeds, the Client starts the EventStream:

  • If the Client receives clean_start=true, it sends all local subscriptions and retained messages to the Server in order to sync the full state.
  • If the Client receives clean_start=false, it sends events of which the EventID is greater than or equal to next_event_id.
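
The handshake and the resend decision described above can be sketched as follows. All names here are hypothetical stand-ins rather than the generated gRPC types, and the sketch assumes that the Server creates an empty session when it sees an unknown SessionID.

```go
package main

import "fmt"

// serverHello (hypothetical) carries the two fields discussed above.
type serverHello struct {
	CleanStart  bool
	NextEventID uint64
}

// server keeps, per SessionID, the next EventID it is willing to accept.
type server struct {
	sessions map[string]uint64
}

// hello answers the handshake for the given SessionID.
func (s *server) hello(sessionID string) serverHello {
	next, ok := s.sessions[sessionID]
	if !ok {
		// Assumption: an unknown session is created empty on first contact.
		s.sessions[sessionID] = 0
		return serverHello{CleanStart: true}
	}
	return serverHello{CleanStart: false, NextEventID: next}
}

// eventsToResend returns the buffered EventIDs that must be sent again.
// With clean_start=true nothing is replayed; the Client performs a full
// sync of subscriptions and retained messages instead.
func eventsToResend(buffered []uint64, h serverHello) []uint64 {
	if h.CleanStart {
		return nil
	}
	var out []uint64
	for _, id := range buffered {
		if id >= h.NextEventID {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	s := &server{sessions: map[string]uint64{}}
	fmt.Println(s.hello("session-1")) // first contact

	s.sessions["session-1"] = 5 // pretend events 0..4 were handled
	h := s.hello("session-1")
	fmt.Println(h, eventsToResend([]uint64{3, 4, 5, 6}, h))
}
```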

Subscription Tree

Each node in the federation has two subscription trees: the local tree and the federation tree. The local tree stores subscriptions for local clients and is managed by the gmqtt core; the federation tree stores subscriptions for remote nodes and is managed by the federation plugin. The federation tree uses the node name as the subscriber identifier for subscriptions.

  • When it receives a sub/unsub packet from a local client, the node updates its local tree first and then broadcasts the event to other nodes.
  • When it receives a sub/unsub event from a remote node, the node only updates its federation tree.

With the federation tree, a node can determine which nodes an incoming message should be routed to. For example, suppose Node1 and Node2 are in the same federation, and Client1 connects to Node1 and subscribes to topic a/b. The subscription trees of the two nodes are then as follows:

Node1 local tree:

subscriber topic
client1 a/b

Node1 federation tree:
empty.

Node2 local tree:
empty.

Node2 federation tree:

subscriber topic
node1 a/b
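
The example state above can be reproduced with a small model. This is a sketch under simplifying assumptions: both trees are flat maps with exact topic matching (no wildcards), the broadcast is a direct function call rather than a gRPC event, and all names are hypothetical.

```go
package main

import "fmt"

// tree maps a topic to the set of subscriber identifiers.
type tree map[string]map[string]bool

func (t tree) add(topic, subscriber string) {
	if t[topic] == nil {
		t[topic] = map[string]bool{}
	}
	t[topic][subscriber] = true
}

// node holds the two subscription trees described above.
type node struct {
	name  string
	local tree // subscriber = client ID
	fed   tree // subscriber = remote node name
	peers []*node
}

func newNode(name string) *node {
	return &node{name: name, local: tree{}, fed: tree{}}
}

// subscribeLocal handles a subscribe packet from a local client:
// update the local tree first, then broadcast the event to the peers.
func (n *node) subscribeLocal(clientID, topic string) {
	n.local.add(topic, clientID)
	for _, p := range n.peers {
		p.onRemoteSubscribe(n.name, topic)
	}
}

// onRemoteSubscribe handles a subscribe event from a remote node:
// only the federation tree is updated.
func (n *node) onRemoteSubscribe(nodeName, topic string) {
	n.fed.add(topic, nodeName)
}

func main() {
	n1, n2 := newNode("node1"), newNode("node2")
	n1.peers = []*node{n2}
	n2.peers = []*node{n1}

	n1.subscribeLocal("client1", "a/b")
	fmt.Println("node1 local:", n1.local, "federation:", n1.fed)
	fmt.Println("node2 local:", n2.local, "federation:", n2.fed)
}
```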

Message Distribution Process

When an MQTT client publishes a message, the node it is connected to queries its federation tree and forwards the message to the relevant nodes according to the message topic. Each of those nodes then looks up its local subscription tree and sends the message to the matching subscribers.
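
The forwarding path can be illustrated with a short sketch. It models only the cross-node route described in the paragraph above, uses exact topic matching instead of MQTT wildcards, and all type and function names are hypothetical.

```go
package main

import "fmt"

// tree maps a topic to its subscribers (exact match only in this sketch).
type tree map[string][]string

// broker holds the local tree (client IDs) and federation tree (node names).
type broker struct {
	local tree
	fed   tree
}

// forward returns, for a message published on node "from", the clients that
// receive it on each remote node: the publishing node consults its federation
// tree, and every target node consults its own local tree.
func forward(cluster map[string]*broker, from, topic string) map[string][]string {
	deliveries := map[string][]string{}
	src, ok := cluster[from]
	if !ok {
		return deliveries
	}
	for _, nodeName := range src.fed[topic] {
		target, ok := cluster[nodeName]
		if !ok {
			continue // unknown node, nothing to deliver
		}
		if subs := target.local[topic]; len(subs) > 0 {
			deliveries[nodeName] = subs
		}
	}
	return deliveries
}

func main() {
	// State from the example above: client1 on node1 subscribes to a/b.
	cluster := map[string]*broker{
		"node1": {local: tree{"a/b": {"client1"}}, fed: tree{}},
		"node2": {local: tree{}, fed: tree{"a/b": {"node1"}}},
	}
	// A client connected to node2 publishes to a/b.
	fmt.Println(forward(cluster, "node2", "a/b"))
}
```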

Membership Management

Federation uses Serf to manage membership.

Documentation

Overview

Package federation is a reverse proxy.

It translates gRPC into RESTful JSON APIs.

Package federation is a generated GoMock package.

Index

Constants

const (
	DefaultFedPort       = "8901"
	DefaultGossipPort    = "8902"
	DefaultRetryInterval = 5 * time.Second
	DefaultRetryTimeout  = 1 * time.Minute
)

Default config.

const Name = "federation"

Variables

var (
	Status_name = map[int32]string{
		0: "STATUS_UNSPECIFIED",
		1: "STATUS_ALIVE",
		2: "STATUS_LEAVING",
		3: "STATUS_LEFT",
		4: "STATUS_FAILED",
	}
	Status_value = map[string]int32{
		"STATUS_UNSPECIFIED": 0,
		"STATUS_ALIVE":       1,
		"STATUS_LEAVING":     2,
		"STATUS_LEFT":        3,
		"STATUS_FAILED":      4,
	}
)

Enum value maps for Status.
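
Maps of this shape are typically used to convert between the numeric enum value and its name, for example when reading the "status" field of a members response. The sketch below copies the two maps locally so that it runs on its own; parseStatus is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Local copies of the generated maps shown above, so the sketch is standalone.
var statusName = map[int32]string{
	0: "STATUS_UNSPECIFIED",
	1: "STATUS_ALIVE",
	2: "STATUS_LEAVING",
	3: "STATUS_LEFT",
	4: "STATUS_FAILED",
}

var statusValue = map[string]int32{
	"STATUS_UNSPECIFIED": 0,
	"STATUS_ALIVE":       1,
	"STATUS_LEAVING":     2,
	"STATUS_LEFT":        3,
	"STATUS_FAILED":      4,
}

// parseStatus (hypothetical helper) looks up the numeric value of a status
// name and reports whether the name is known.
func parseStatus(name string) (int32, bool) {
	v, ok := statusValue[name]
	return v, ok
}

func main() {
	if v, ok := parseStatus("STATUS_ALIVE"); ok {
		fmt.Println(v, statusName[v])
	}
	if _, ok := parseStatus("STATUS_BOGUS"); !ok {
		fmt.Println("unknown status name")
	}
}
```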

var DefaultConfig = Config{}

DefaultConfig is the default configuration.

var File_federation_proto protoreflect.FileDescriptor

Functions

func New

func New(config config.Config) (server.Plugin, error)

func RegisterFederationServer

func RegisterFederationServer(s grpc.ServiceRegistrar, srv FederationServer)

func RegisterMembershipHandler

func RegisterMembershipHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error

RegisterMembershipHandler registers the http handlers for service Membership to "mux". The handlers forward requests to the grpc endpoint over "conn".

func RegisterMembershipHandlerClient

func RegisterMembershipHandlerClient(ctx context.Context, mux *runtime.ServeMux, client MembershipClient) error

RegisterMembershipHandlerClient registers the http handlers for service Membership to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "MembershipClient". Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "MembershipClient" doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in "MembershipClient" to call the correct interceptors.

func RegisterMembershipHandlerFromEndpoint

func RegisterMembershipHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error)

RegisterMembershipHandlerFromEndpoint is same as RegisterMembershipHandler but automatically dials to "endpoint" and closes the connection when "ctx" gets done.

func RegisterMembershipHandlerServer

func RegisterMembershipHandlerServer(ctx context.Context, mux *runtime.ServeMux, server MembershipServer) error

RegisterMembershipHandlerServer registers the http handlers for service Membership to "mux". UnaryRPC: calls MembershipServer directly. StreamingRPC: currently unsupported, pending https://github.com/grpc/grpc-go/issues/906.

func RegisterMembershipServer

func RegisterMembershipServer(s grpc.ServiceRegistrar, srv MembershipServer)

Types

type Ack

type Ack struct {
	EventId uint64 `protobuf:"varint,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	// contains filtered or unexported fields
}

func (*Ack) Descriptor deprecated

func (*Ack) Descriptor() ([]byte, []int)

Deprecated: Use Ack.ProtoReflect.Descriptor instead.

func (*Ack) GetEventId

func (x *Ack) GetEventId() uint64

func (*Ack) ProtoMessage

func (*Ack) ProtoMessage()

func (*Ack) ProtoReflect

func (x *Ack) ProtoReflect() protoreflect.Message

func (*Ack) Reset

func (x *Ack) Reset()

func (*Ack) String

func (x *Ack) String() string

type ClientHello

type ClientHello struct {
	SessionId string `protobuf:"bytes,1,opt,name=session_id,json=sessionId,proto3" json:"session_id,omitempty"`
	// contains filtered or unexported fields
}

ClientHello is the request message in handshake process.

func (*ClientHello) Descriptor deprecated

func (*ClientHello) Descriptor() ([]byte, []int)

Deprecated: Use ClientHello.ProtoReflect.Descriptor instead.

func (*ClientHello) GetSessionId

func (x *ClientHello) GetSessionId() string

func (*ClientHello) ProtoMessage

func (*ClientHello) ProtoMessage()

func (*ClientHello) ProtoReflect

func (x *ClientHello) ProtoReflect() protoreflect.Message

func (*ClientHello) Reset

func (x *ClientHello) Reset()

func (*ClientHello) String

func (x *ClientHello) String() string

type Config

type Config struct {
	// NodeName is the unique identifier for the node in the federation. Defaults to hostname.
	NodeName string `yaml:"node_name"`
	// FedAddr is the gRPC server listening address for the federation internal communication.
	// Defaults to :8901.
	// If the port is missing, the default federation port (8901) will be used.
	FedAddr string `yaml:"fed_addr"`
	// AdvertiseFedAddr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
	// Defaults to "FedAddr" or the private IP address of the node if the IP in "FedAddr" is 0.0.0.0.
	// However, in some cases, there may be a routable address that cannot be bound.
	// If the port is missing, the default federation port (8901) will be used.
	AdvertiseFedAddr string `yaml:"advertise_fed_addr"`
	// GossipAddr is the address that the gossip will listen on. It is used for both UDP and TCP gossip. Defaults to :8902.
	GossipAddr string `yaml:"gossip_addr"`
	// AdvertiseGossipAddr is used to change the gossip server address that we advertise to other nodes in the cluster.
	// Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
	// If the port is missing, the default gossip port (8902) will be used.
	AdvertiseGossipAddr string `yaml:"advertise_gossip_addr"`
	// RetryJoin is the address of other nodes to join upon starting up.
	// If port is missing, the default gossip port (8902) will be used.
	RetryJoin []string `yaml:"retry_join"`
	// RetryInterval is the time to wait between join attempts. Defaults to 5s.
	RetryInterval time.Duration `yaml:"retry_interval"`
	// RetryTimeout is the timeout to wait before joining all nodes in RetryJoin successfully.
	// If timeout expires, the server will exit with error. Defaults to 1m.
	RetryTimeout time.Duration `yaml:"retry_timeout"`
	// SnapshotPath is passed to "SnapshotPath" in the serf configuration.
	// When Serf is started with a snapshot,
	// it will attempt to join all the previously known nodes until one
	// succeeds and will also avoid replaying old user events.
	SnapshotPath string `yaml:"snapshot_path"`
	// RejoinAfterLeave is passed to "RejoinAfterLeave" in the serf configuration.
	// It controls our interaction with the snapshot file.
	// When set to false (default), a leave causes a Serf to not rejoin
	// the cluster until an explicit join is received. If this is set to
	// true, we ignore the leave, and rejoin the cluster on start.
	RejoinAfterLeave bool `yaml:"rejoin_after_leave"`
}

Config is the configuration for the federation plugin.

func (*Config) UnmarshalYAML

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

func (*Config) Validate

func (c *Config) Validate() (err error)

Validate validates the configuration, and return an error if it is invalid.

type Event

type Event struct {
	Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	// Types that are assignable to Event:
	//	*Event_Subscribe
	//	*Event_Message
	//	*Event_Unsubscribe
	Event isEvent_Event `protobuf_oneof:"Event"`
	// contains filtered or unexported fields
}

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetEvent

func (m *Event) GetEvent() isEvent_Event

func (*Event) GetId

func (x *Event) GetId() uint64

func (*Event) GetMessage

func (x *Event) GetMessage() *Message

func (*Event) GetSubscribe

func (x *Event) GetSubscribe() *Subscribe

func (*Event) GetUnsubscribe

func (x *Event) GetUnsubscribe() *Unsubscribe

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

type Event_Message

type Event_Message struct {
	Message *Message `protobuf:"bytes,3,opt,name=message,proto3,oneof"`
}

type Event_Subscribe

type Event_Subscribe struct {
	Subscribe *Subscribe `protobuf:"bytes,2,opt,name=Subscribe,proto3,oneof"`
}

type Event_Unsubscribe

type Event_Unsubscribe struct {
	Unsubscribe *Unsubscribe `protobuf:"bytes,4,opt,name=unsubscribe,proto3,oneof"`
}

type Federation

type Federation struct {
	// contains filtered or unexported fields
}

func (*Federation) EventStream

func (f *Federation) EventStream(stream Federation_EventStreamServer) (err error)

func (*Federation) ForceLeave

func (f *Federation) ForceLeave(ctx context.Context, req *ForceLeaveRequest) (*empty.Empty, error)

ForceLeave forces a member of a Serf cluster to enter the "left" state. Note that if the member is still actually alive, it will eventually rejoin the cluster. The true purpose of this method is to force remove "failed" nodes. See https://www.serf.io/docs/commands/force-leave.html for details.

func (*Federation) Hello

func (f *Federation) Hello(ctx context.Context, req *ClientHello) (resp *ServerHello, err error)

Hello is the handler for the handshake process before opening the event stream.

func (*Federation) HookWrapper

func (f *Federation) HookWrapper() server.HookWrapper

func (*Federation) Join

func (f *Federation) Join(ctx context.Context, req *JoinRequest) (resp *empty.Empty, err error)

Join tells the local node to join an existing cluster. See https://www.serf.io/docs/commands/join.html for details.

func (*Federation) Leave

func (f *Federation) Leave(ctx context.Context, req *empty.Empty) (resp *empty.Empty, err error)

Leave triggers a graceful leave for the local node. This is used to ensure other nodes see the node as "left" instead of "failed". Note that a node that has left cannot rejoin the cluster unless it is restarted.

func (*Federation) ListMembers

func (f *Federation) ListMembers(ctx context.Context, req *empty.Empty) (resp *ListMembersResponse, err error)

ListMembers lists all known members in the Serf cluster.

func (*Federation) Load

func (f *Federation) Load(service server.Server) error

func (*Federation) Name

func (f *Federation) Name() string

func (*Federation) OnMsgArrivedWrapper

func (f *Federation) OnMsgArrivedWrapper(pre server.OnMsgArrived) server.OnMsgArrived

func (*Federation) OnSessionTerminatedWrapper

func (f *Federation) OnSessionTerminatedWrapper(pre server.OnSessionTerminated) server.OnSessionTerminated

func (*Federation) OnSubscribedWrapper

func (f *Federation) OnSubscribedWrapper(pre server.OnSubscribed) server.OnSubscribed

func (*Federation) OnUnsubscribedWrapper

func (f *Federation) OnUnsubscribedWrapper(pre server.OnUnsubscribed) server.OnUnsubscribed

func (*Federation) OnWillPublishWrapper

func (f *Federation) OnWillPublishWrapper(pre server.OnWillPublish) server.OnWillPublish

func (*Federation) Unload

func (f *Federation) Unload() error

type FederationClient

type FederationClient interface {
	Hello(ctx context.Context, in *ClientHello, opts ...grpc.CallOption) (*ServerHello, error)
	EventStream(ctx context.Context, opts ...grpc.CallOption) (Federation_EventStreamClient, error)
}

FederationClient is the client API for Federation service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

func NewFederationClient

func NewFederationClient(cc grpc.ClientConnInterface) FederationClient

type FederationServer

type FederationServer interface {
	Hello(context.Context, *ClientHello) (*ServerHello, error)
	EventStream(Federation_EventStreamServer) error
	// contains filtered or unexported methods
}

FederationServer is the server API for Federation service. All implementations must embed UnimplementedFederationServer for forward compatibility

type Federation_EventStreamClient

type Federation_EventStreamClient interface {
	Send(*Event) error
	Recv() (*Ack, error)
	grpc.ClientStream
}

type Federation_EventStreamServer

type Federation_EventStreamServer interface {
	Send(*Ack) error
	Recv() (*Event, error)
	grpc.ServerStream
}
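
A server-side handler for such a bidirectional stream usually loops over Recv, handles each event, and replies with an acknowledgement. The sketch below shows that loop against cut-down stand-in types so it can run without gRPC; the event, ack, and eventStream names are hypothetical, and the real interface additionally embeds grpc.ServerStream.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// event and ack are stand-ins for the generated Event and Ack messages.
type event struct{ ID uint64 }
type ack struct{ EventID uint64 }

// eventStream keeps only the Send/Recv pair of the server-side stream.
type eventStream interface {
	Send(*ack) error
	Recv() (*event, error)
}

// serve reads events until the client closes the stream (io.EOF), and
// acknowledges each event only after handle has returned successfully.
func serve(stream eventStream, handle func(*event) error) error {
	for {
		ev, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := handle(ev); err != nil {
			return err
		}
		if err := stream.Send(&ack{EventID: ev.ID}); err != nil {
			return err
		}
	}
}

// fakeStream is an in-memory stream used only to exercise serve.
type fakeStream struct {
	in   []*event
	acks []uint64
}

func (f *fakeStream) Recv() (*event, error) {
	if len(f.in) == 0 {
		return nil, io.EOF
	}
	ev := f.in[0]
	f.in = f.in[1:]
	return ev, nil
}

func (f *fakeStream) Send(a *ack) error {
	f.acks = append(f.acks, a.EventID)
	return nil
}

func main() {
	fs := &fakeStream{in: []*event{{ID: 1}, {ID: 2}}}
	err := serve(fs, func(ev *event) error {
		fmt.Println("handled event", ev.ID)
		return nil
	})
	fmt.Println("acks:", fs.acks, "err:", err)
}
```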

type ForceLeaveRequest

type ForceLeaveRequest struct {
	NodeName string `protobuf:"bytes,1,opt,name=node_name,json=nodeName,proto3" json:"node_name,omitempty"`
	// contains filtered or unexported fields
}

func (*ForceLeaveRequest) Descriptor deprecated

func (*ForceLeaveRequest) Descriptor() ([]byte, []int)

Deprecated: Use ForceLeaveRequest.ProtoReflect.Descriptor instead.

func (*ForceLeaveRequest) GetNodeName

func (x *ForceLeaveRequest) GetNodeName() string

func (*ForceLeaveRequest) ProtoMessage

func (*ForceLeaveRequest) ProtoMessage()

func (*ForceLeaveRequest) ProtoReflect

func (x *ForceLeaveRequest) ProtoReflect() protoreflect.Message

func (*ForceLeaveRequest) Reset

func (x *ForceLeaveRequest) Reset()

func (*ForceLeaveRequest) String

func (x *ForceLeaveRequest) String() string

type JoinRequest

type JoinRequest struct {
	Hosts []string `protobuf:"bytes,1,rep,name=hosts,proto3" json:"hosts,omitempty"`
	// contains filtered or unexported fields
}

func (*JoinRequest) Descriptor deprecated

func (*JoinRequest) Descriptor() ([]byte, []int)

Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead.

func (*JoinRequest) GetHosts

func (x *JoinRequest) GetHosts() []string

func (*JoinRequest) ProtoMessage

func (*JoinRequest) ProtoMessage()

func (*JoinRequest) ProtoReflect

func (x *JoinRequest) ProtoReflect() protoreflect.Message

func (*JoinRequest) Reset

func (x *JoinRequest) Reset()

func (*JoinRequest) String

func (x *JoinRequest) String() string

type ListMembersResponse

type ListMembersResponse struct {
	Members []*Member `protobuf:"bytes,1,rep,name=members,proto3" json:"members,omitempty"`
	// contains filtered or unexported fields
}

func (*ListMembersResponse) Descriptor deprecated

func (*ListMembersResponse) Descriptor() ([]byte, []int)

Deprecated: Use ListMembersResponse.ProtoReflect.Descriptor instead.

func (*ListMembersResponse) GetMembers

func (x *ListMembersResponse) GetMembers() []*Member

func (*ListMembersResponse) ProtoMessage

func (*ListMembersResponse) ProtoMessage()

func (*ListMembersResponse) ProtoReflect

func (x *ListMembersResponse) ProtoReflect() protoreflect.Message

func (*ListMembersResponse) Reset

func (x *ListMembersResponse) Reset()

func (*ListMembersResponse) String

func (x *ListMembersResponse) String() string

type Member

type Member struct {
	Name   string            `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Addr   string            `protobuf:"bytes,2,opt,name=addr,proto3" json:"addr,omitempty"`
	Tags   map[string]string `` /* 149-byte string literal not displayed */
	Status Status            `protobuf:"varint,4,opt,name=status,proto3,enum=gmqtt.federation.api.Status" json:"status,omitempty"`
	// contains filtered or unexported fields
}

func (*Member) Descriptor deprecated

func (*Member) Descriptor() ([]byte, []int)

Deprecated: Use Member.ProtoReflect.Descriptor instead.

func (*Member) GetAddr

func (x *Member) GetAddr() string

func (*Member) GetName

func (x *Member) GetName() string

func (*Member) GetStatus

func (x *Member) GetStatus() Status

func (*Member) GetTags

func (x *Member) GetTags() map[string]string

func (*Member) ProtoMessage

func (*Member) ProtoMessage()

func (*Member) ProtoReflect

func (x *Member) ProtoReflect() protoreflect.Message

func (*Member) Reset

func (x *Member) Reset()

func (*Member) String

func (x *Member) String() string

type MembershipClient

type MembershipClient interface {
	// Join tells the local node to join an existing cluster.
	// See https://www.serf.io/docs/commands/join.html for details.
	Join(ctx context.Context, in *JoinRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Leave triggers a graceful leave for the local node.
	// This is used to ensure other nodes see the node as "left" instead of "failed".
	// Note that a node that has left cannot rejoin the cluster unless it is restarted.
	Leave(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error)
	// ForceLeave forces a member of a Serf cluster to enter the "left" state.
	// Note that if the member is still actually alive, it will eventually rejoin the cluster.
	// The true purpose of this method is to force remove "failed" nodes.
	// See https://www.serf.io/docs/commands/force-leave.html for details.
	ForceLeave(ctx context.Context, in *ForceLeaveRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// ListMembers lists all known members in the Serf cluster.
	ListMembers(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ListMembersResponse, error)
}

MembershipClient is the client API for Membership service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.

func NewMembershipClient

func NewMembershipClient(cc grpc.ClientConnInterface) MembershipClient

type MembershipServer

type MembershipServer interface {
	// Join tells the local node to join an existing cluster.
	// See https://www.serf.io/docs/commands/join.html for details.
	Join(context.Context, *JoinRequest) (*empty.Empty, error)
	// Leave triggers a graceful leave for the local node.
	// This is used to ensure other nodes see the node as "left" instead of "failed".
	// Note that a node that has left cannot rejoin the cluster unless it is restarted.
	Leave(context.Context, *empty.Empty) (*empty.Empty, error)
	// ForceLeave forces a member of a Serf cluster to enter the "left" state.
	// Note that if the member is still actually alive, it will eventually rejoin the cluster.
	// The true purpose of this method is to force remove "failed" nodes.
	// See https://www.serf.io/docs/commands/force-leave.html for details.
	ForceLeave(context.Context, *ForceLeaveRequest) (*empty.Empty, error)
	// ListMembers lists all known members in the Serf cluster.
	ListMembers(context.Context, *empty.Empty) (*ListMembersResponse, error)
	// contains filtered or unexported methods
}

MembershipServer is the server API for Membership service. All implementations must embed UnimplementedMembershipServer for forward compatibility

type Message

type Message struct {
	TopicName string `protobuf:"bytes,1,opt,name=topic_name,json=topicName,proto3" json:"topic_name,omitempty"`
	Payload   string `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
	Qos       uint32 `protobuf:"varint,3,opt,name=qos,proto3" json:"qos,omitempty"`
	Retained  bool   `protobuf:"varint,4,opt,name=retained,proto3" json:"retained,omitempty"`
	// The following fields are used by v5 clients.
	ContentType     string          `protobuf:"bytes,5,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
	CorrelationData string          `protobuf:"bytes,6,opt,name=correlation_data,json=correlationData,proto3" json:"correlation_data,omitempty"`
	MessageExpiry   uint32          `protobuf:"varint,7,opt,name=message_expiry,json=messageExpiry,proto3" json:"message_expiry,omitempty"`
	PayloadFormat   uint32          `protobuf:"varint,8,opt,name=payload_format,json=payloadFormat,proto3" json:"payload_format,omitempty"`
	ResponseTopic   string          `protobuf:"bytes,9,opt,name=response_topic,json=responseTopic,proto3" json:"response_topic,omitempty"`
	UserProperties  []*UserProperty `protobuf:"bytes,10,rep,name=user_properties,json=userProperties,proto3" json:"user_properties,omitempty"`
	// contains filtered or unexported fields
}

func (*Message) Descriptor deprecated

func (*Message) Descriptor() ([]byte, []int)

Deprecated: Use Message.ProtoReflect.Descriptor instead.

func (*Message) GetContentType

func (x *Message) GetContentType() string

func (*Message) GetCorrelationData

func (x *Message) GetCorrelationData() string

func (*Message) GetMessageExpiry

func (x *Message) GetMessageExpiry() uint32

func (*Message) GetPayload

func (x *Message) GetPayload() string

func (*Message) GetPayloadFormat

func (x *Message) GetPayloadFormat() uint32

func (*Message) GetQos

func (x *Message) GetQos() uint32

func (*Message) GetResponseTopic

func (x *Message) GetResponseTopic() string

func (*Message) GetRetained

func (x *Message) GetRetained() bool

func (*Message) GetTopicName

func (x *Message) GetTopicName() string

func (*Message) GetUserProperties

func (x *Message) GetUserProperties() []*UserProperty

func (*Message) ProtoMessage

func (*Message) ProtoMessage()

func (*Message) ProtoReflect

func (x *Message) ProtoReflect() protoreflect.Message

func (*Message) Reset

func (x *Message) Reset()

func (*Message) String

func (x *Message) String() string

type MockFederationClient

type MockFederationClient struct {
	// contains filtered or unexported fields
}

MockFederationClient is a mock of FederationClient interface

func NewMockFederationClient

func NewMockFederationClient(ctrl *gomock.Controller) *MockFederationClient

NewMockFederationClient creates a new mock instance

func (*MockFederationClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockFederationClient) EventStream

EventStream mocks base method

func (*MockFederationClient) Hello

func (m *MockFederationClient) Hello(arg0 context.Context, arg1 *ClientHello, arg2 ...grpc.CallOption) (*ServerHello, error)

Hello mocks base method

type MockFederationClientMockRecorder

type MockFederationClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockFederationClientMockRecorder is the mock recorder for MockFederationClient

func (*MockFederationClientMockRecorder) EventStream

func (mr *MockFederationClientMockRecorder) EventStream(arg0 interface{}, arg1 ...interface{}) *gomock.Call

EventStream indicates an expected call of EventStream

func (*MockFederationClientMockRecorder) Hello

func (mr *MockFederationClientMockRecorder) Hello(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call

Hello indicates an expected call of Hello

type MockFederation_EventStreamClient

type MockFederation_EventStreamClient struct {
	// contains filtered or unexported fields
}

MockFederation_EventStreamClient is a mock of Federation_EventStreamClient interface

func NewMockFederation_EventStreamClient

func NewMockFederation_EventStreamClient(ctrl *gomock.Controller) *MockFederation_EventStreamClient

NewMockFederation_EventStreamClient creates a new mock instance

func (*MockFederation_EventStreamClient) CloseSend

func (m *MockFederation_EventStreamClient) CloseSend() error

CloseSend mocks base method

func (*MockFederation_EventStreamClient) Context

Context mocks base method

func (*MockFederation_EventStreamClient) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockFederation_EventStreamClient) Header

Header mocks base method

func (*MockFederation_EventStreamClient) Recv

Recv mocks base method

func (*MockFederation_EventStreamClient) RecvMsg

func (m *MockFederation_EventStreamClient) RecvMsg(arg0 interface{}) error

RecvMsg mocks base method

func (*MockFederation_EventStreamClient) Send

Send mocks base method

func (*MockFederation_EventStreamClient) SendMsg

func (m *MockFederation_EventStreamClient) SendMsg(arg0 interface{}) error

SendMsg mocks base method

func (*MockFederation_EventStreamClient) Trailer

Trailer mocks base method

type MockFederation_EventStreamClientMockRecorder

type MockFederation_EventStreamClientMockRecorder struct {
	// contains filtered or unexported fields
}

MockFederation_EventStreamClientMockRecorder is the mock recorder for MockFederation_EventStreamClient

func (*MockFederation_EventStreamClientMockRecorder) CloseSend

CloseSend indicates an expected call of CloseSend

func (*MockFederation_EventStreamClientMockRecorder) Context

Context indicates an expected call of Context

func (*MockFederation_EventStreamClientMockRecorder) Header

Header indicates an expected call of Header

func (*MockFederation_EventStreamClientMockRecorder) Recv

Recv indicates an expected call of Recv

func (*MockFederation_EventStreamClientMockRecorder) RecvMsg

func (mr *MockFederation_EventStreamClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call

RecvMsg indicates an expected call of RecvMsg

func (*MockFederation_EventStreamClientMockRecorder) Send

func (mr *MockFederation_EventStreamClientMockRecorder) Send(arg0 interface{}) *gomock.Call

Send indicates an expected call of Send

func (*MockFederation_EventStreamClientMockRecorder) SendMsg

func (mr *MockFederation_EventStreamClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call

SendMsg indicates an expected call of SendMsg

func (*MockFederation_EventStreamClientMockRecorder) Trailer

Trailer indicates an expected call of Trailer

type MockiSerf

type MockiSerf struct {
	// contains filtered or unexported fields
}

MockiSerf is a mock of iSerf interface

func NewMockiSerf

func NewMockiSerf(ctrl *gomock.Controller) *MockiSerf

NewMockiSerf creates a new mock instance

func (*MockiSerf) EXPECT

func (m *MockiSerf) EXPECT() *MockiSerfMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockiSerf) Join

func (m *MockiSerf) Join(existing []string, ignoreOld bool) (int, error)

Join mocks base method

func (*MockiSerf) Leave

func (m *MockiSerf) Leave() error

Leave mocks base method

func (*MockiSerf) Members

func (m *MockiSerf) Members() []serf.Member

Members mocks base method

func (*MockiSerf) RemoveFailedNode

func (m *MockiSerf) RemoveFailedNode(node string) error

RemoveFailedNode mocks base method

func (*MockiSerf) Shutdown

func (m *MockiSerf) Shutdown() error

Shutdown mocks base method

type MockiSerfMockRecorder

type MockiSerfMockRecorder struct {
	// contains filtered or unexported fields
}

MockiSerfMockRecorder is the mock recorder for MockiSerf

func (*MockiSerfMockRecorder) Join

func (mr *MockiSerfMockRecorder) Join(existing, ignoreOld interface{}) *gomock.Call

Join indicates an expected call of Join

func (*MockiSerfMockRecorder) Leave

func (mr *MockiSerfMockRecorder) Leave() *gomock.Call

Leave indicates an expected call of Leave

func (*MockiSerfMockRecorder) Members

func (mr *MockiSerfMockRecorder) Members() *gomock.Call

Members indicates an expected call of Members

func (*MockiSerfMockRecorder) RemoveFailedNode

func (mr *MockiSerfMockRecorder) RemoveFailedNode(node interface{}) *gomock.Call

RemoveFailedNode indicates an expected call of RemoveFailedNode

func (*MockiSerfMockRecorder) Shutdown

func (mr *MockiSerfMockRecorder) Shutdown() *gomock.Call

Shutdown indicates an expected call of Shutdown

type MockisEvent_Event

type MockisEvent_Event struct {
	// contains filtered or unexported fields
}

MockisEvent_Event is a mock of isEvent_Event interface

func NewMockisEvent_Event

func NewMockisEvent_Event(ctrl *gomock.Controller) *MockisEvent_Event

NewMockisEvent_Event creates a new mock instance

func (*MockisEvent_Event) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

type MockisEvent_EventMockRecorder

type MockisEvent_EventMockRecorder struct {
	// contains filtered or unexported fields
}

MockisEvent_EventMockRecorder is the mock recorder for MockisEvent_Event

type Mockqueue

type Mockqueue struct {
	// contains filtered or unexported fields
}

Mockqueue is a mock of queue interface

func NewMockqueue

func NewMockqueue(ctrl *gomock.Controller) *Mockqueue

NewMockqueue creates a new mock instance

func (*Mockqueue) EXPECT

func (m *Mockqueue) EXPECT() *MockqueueMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

type MockqueueMockRecorder

type MockqueueMockRecorder struct {
	// contains filtered or unexported fields
}

MockqueueMockRecorder is the mock recorder for Mockqueue

type ServerHello

type ServerHello struct {
	CleanStart  bool   `protobuf:"varint,1,opt,name=clean_start,json=cleanStart,proto3" json:"clean_start,omitempty"`
	NextEventId uint64 `protobuf:"varint,2,opt,name=next_event_id,json=nextEventId,proto3" json:"next_event_id,omitempty"`
	// contains filtered or unexported fields
}

ServerHello is the response message in the handshake process.

func (*ServerHello) Descriptor deprecated

func (*ServerHello) Descriptor() ([]byte, []int)

Deprecated: Use ServerHello.ProtoReflect.Descriptor instead.

func (*ServerHello) GetCleanStart

func (x *ServerHello) GetCleanStart() bool

func (*ServerHello) GetNextEventId

func (x *ServerHello) GetNextEventId() uint64
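Getters generated by protoc-gen-go can be called on a nil receiver and return the field's zero value in that case, which is why callers often prefer them to direct field access. The sketch below reproduces that shape on a local stand-in type rather than the package's own ServerHello, so it runs without any dependencies; the stand-in omits the unexported protobuf state of the real struct.

```go
package main

import "fmt"

// ServerHello is a local stand-in with the same exported fields as the
// generated struct. It is not the package's own type.
type ServerHello struct {
	CleanStart  bool
	NextEventId uint64
}

// GetCleanStart mirrors the nil-safe shape of a generated getter.
func (x *ServerHello) GetCleanStart() bool {
	if x != nil {
		return x.CleanStart
	}
	return false
}

// GetNextEventId returns 0 when the receiver is nil.
func (x *ServerHello) GetNextEventId() uint64 {
	if x != nil {
		return x.NextEventId
	}
	return 0
}

func main() {
	var missing *ServerHello // for example, a response that was never populated
	fmt.Println(missing.GetCleanStart(), missing.GetNextEventId()) // false 0

	resp := &ServerHello{CleanStart: true, NextEventId: 42}
	fmt.Println(resp.GetCleanStart(), resp.GetNextEventId()) // true 42
}
```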

func (*ServerHello) ProtoMessage

func (*ServerHello) ProtoMessage()

func (*ServerHello) ProtoReflect

func (x *ServerHello) ProtoReflect() protoreflect.Message

func (*ServerHello) Reset

func (x *ServerHello) Reset()

func (*ServerHello) String

func (x *ServerHello) String() string

type Status

type Status int32
const (
	Status_STATUS_UNSPECIFIED Status = 0
	Status_STATUS_ALIVE       Status = 1
	Status_STATUS_LEAVING     Status = 2
	Status_STATUS_LEFT        Status = 3
	Status_STATUS_FAILED      Status = 4
)
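As a rough illustration of how these values might be consumed, the sketch below filters a member list down to the nodes reported as alive. The Status type and constants are redeclared locally so the example is self-contained; the member struct and the aliveNames helper are hypothetical and not part of this package.

```go
package main

import "fmt"

// Status and its constants are local copies of the values listed above.
type Status int32

const (
	Status_STATUS_UNSPECIFIED Status = 0
	Status_STATUS_ALIVE       Status = 1
	Status_STATUS_LEAVING     Status = 2
	Status_STATUS_LEFT        Status = 3
	Status_STATUS_FAILED      Status = 4
)

// member is a hypothetical stand-in for a federation member record.
type member struct {
	Name   string
	Status Status
}

// aliveNames is a hypothetical helper that keeps only the members whose
// status is Status_STATUS_ALIVE, preserving input order.
func aliveNames(ms []member) []string {
	var out []string
	for _, m := range ms {
		if m.Status == Status_STATUS_ALIVE {
			out = append(out, m.Name)
		}
	}
	return out
}

func main() {
	members := []member{
		{Name: "node1", Status: Status_STATUS_ALIVE},
		{Name: "node2", Status: Status_STATUS_FAILED},
		{Name: "node3", Status: Status_STATUS_ALIVE},
	}
	fmt.Println(aliveNames(members)) // [node1 node3]
}
```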

func (Status) Descriptor

func (Status) Descriptor() protoreflect.EnumDescriptor

func (Status) Enum

func (x Status) Enum() *Status

func (Status) EnumDescriptor deprecated

func (Status) EnumDescriptor() ([]byte, []int)

Deprecated: Use Status.Descriptor instead.

func (Status) Number

func (x Status) Number() protoreflect.EnumNumber

func (Status) String

func (x Status) String() string

func (Status) Type

func (Status) Type() protoreflect.EnumType

type Subscribe

type Subscribe struct {
	ShareName   string `protobuf:"bytes,1,opt,name=share_name,json=shareName,proto3" json:"share_name,omitempty"`
	TopicFilter string `protobuf:"bytes,2,opt,name=topic_filter,json=topicFilter,proto3" json:"topic_filter,omitempty"`
	// contains filtered or unexported fields
}

Subscribe represents a subscription for a node. It is used to route messages among nodes, so only share_name and topic_filter are required.
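To make the routing idea concrete, here is a minimal sketch of how a node could decide which peers need a copy of a published message, given each peer's topic filters. It is not the package's implementation: Subscribe is a local stand-in, matchTopic and nodesFor are hypothetical helpers, and the sketch ignores shared-subscription semantics (ShareName) and the special handling of topics that start with "$".

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Subscribe is a local stand-in that mirrors the two exported fields of the
// generated type.
type Subscribe struct {
	ShareName   string
	TopicFilter string
}

// matchTopic is a hypothetical helper that reports whether an MQTT topic name
// matches a topic filter, honoring the "+" and "#" wildcards.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// Multi-level wildcard: matches the parent and everything below it.
			return true
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

// nodesFor returns, in sorted order, the nodes with at least one
// subscription whose filter matches the topic.
func nodesFor(subs map[string][]Subscribe, topic string) []string {
	var out []string
	for node, list := range subs {
		for _, s := range list {
			if matchTopic(s.TopicFilter, topic) {
				out = append(out, node)
				break
			}
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	subs := map[string][]Subscribe{
		"node1": {{TopicFilter: "sensors/+/temp"}},
		"node2": {{TopicFilter: "alerts/#"}},
		"node3": {{TopicFilter: "sensors/#"}},
	}
	fmt.Println(nodesFor(subs, "sensors/kitchen/temp")) // [node1 node3]
}
```

Keeping only the filter (and share name) per node means peers exchange far less state than full client sessions, which fits the limitation that session information stays on the local node.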

func (*Subscribe) Descriptor deprecated

func (*Subscribe) Descriptor() ([]byte, []int)

Deprecated: Use Subscribe.ProtoReflect.Descriptor instead.

func (*Subscribe) GetShareName

func (x *Subscribe) GetShareName() string

func (*Subscribe) GetTopicFilter

func (x *Subscribe) GetTopicFilter() string

func (*Subscribe) ProtoMessage

func (*Subscribe) ProtoMessage()

func (*Subscribe) ProtoReflect

func (x *Subscribe) ProtoReflect() protoreflect.Message

func (*Subscribe) Reset

func (x *Subscribe) Reset()

func (*Subscribe) String

func (x *Subscribe) String() string

type UnimplementedFederationServer

type UnimplementedFederationServer struct {
}

UnimplementedFederationServer must be embedded to have forward compatible implementations.
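The forward-compatibility mechanism relies on Go struct embedding: an implementation that embeds the "unimplemented" struct inherits default methods, so it keeps compiling when the service interface gains a new method. The sketch below shows the pattern with simplified stand-in types. The method signatures here are deliberately reduced and are not the generated ones, and the stand-in returns a plain error where generated gRPC code returns an Unimplemented status.

```go
package main

import (
	"errors"
	"fmt"
)

// FederationServer is a simplified stand-in for the generated service
// interface; the real methods take gRPC-specific arguments.
type FederationServer interface {
	Hello(name string) (string, error)
	EventStream() error
}

var errUnimplemented = errors.New("method not implemented")

// UnimplementedFederationServer is a stand-in that provides a default for
// every interface method.
type UnimplementedFederationServer struct{}

func (UnimplementedFederationServer) Hello(string) (string, error) {
	return "", errUnimplemented
}

func (UnimplementedFederationServer) EventStream() error {
	return errUnimplemented
}

// myServer overrides only Hello; EventStream is promoted from the embedded
// struct.
type myServer struct {
	UnimplementedFederationServer
}

func (myServer) Hello(name string) (string, error) {
	return "hello " + name, nil
}

// Compile-time check that myServer satisfies the interface.
var _ FederationServer = myServer{}

func main() {
	var s FederationServer = myServer{}
	greeting, _ := s.Hello("node1")
	fmt.Println(greeting)        // hello node1
	fmt.Println(s.EventStream()) // method not implemented
}
```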

func (UnimplementedFederationServer) EventStream

func (UnimplementedFederationServer) Hello

type UnimplementedMembershipServer

type UnimplementedMembershipServer struct {
}

UnimplementedMembershipServer must be embedded to have forward compatible implementations.

func (UnimplementedMembershipServer) ForceLeave

func (UnimplementedMembershipServer) Join

func (UnimplementedMembershipServer) Leave

func (UnimplementedMembershipServer) ListMembers

type UnsafeFederationServer

type UnsafeFederationServer interface {
	// contains filtered or unexported methods
}

UnsafeFederationServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to FederationServer will result in compilation errors.

type UnsafeMembershipServer

type UnsafeMembershipServer interface {
	// contains filtered or unexported methods
}

UnsafeMembershipServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to MembershipServer will result in compilation errors.

type Unsubscribe

type Unsubscribe struct {
	TopicName string `protobuf:"bytes,1,opt,name=topic_name,json=topicName,proto3" json:"topic_name,omitempty"`
	// contains filtered or unexported fields
}

func (*Unsubscribe) Descriptor deprecated

func (*Unsubscribe) Descriptor() ([]byte, []int)

Deprecated: Use Unsubscribe.ProtoReflect.Descriptor instead.

func (*Unsubscribe) GetTopicName

func (x *Unsubscribe) GetTopicName() string

func (*Unsubscribe) ProtoMessage

func (*Unsubscribe) ProtoMessage()

func (*Unsubscribe) ProtoReflect

func (x *Unsubscribe) ProtoReflect() protoreflect.Message

func (*Unsubscribe) Reset

func (x *Unsubscribe) Reset()

func (*Unsubscribe) String

func (x *Unsubscribe) String() string

type UserProperty

type UserProperty struct {
	K []byte `protobuf:"bytes,1,opt,name=K,proto3" json:"K,omitempty"`
	V []byte `protobuf:"bytes,2,opt,name=V,proto3" json:"V,omitempty"`
	// contains filtered or unexported fields
}

func (*UserProperty) Descriptor deprecated

func (*UserProperty) Descriptor() ([]byte, []int)

Deprecated: Use UserProperty.ProtoReflect.Descriptor instead.

func (*UserProperty) GetK

func (x *UserProperty) GetK() []byte

func (*UserProperty) GetV

func (x *UserProperty) GetV() []byte

func (*UserProperty) ProtoMessage

func (*UserProperty) ProtoMessage()

func (*UserProperty) ProtoReflect

func (x *UserProperty) ProtoReflect() protoreflect.Message

func (*UserProperty) Reset

func (x *UserProperty) Reset()

func (*UserProperty) String

func (x *UserProperty) String() string
