peerstream

package
v1.16.109 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: MPL-2.0 Imports: 47 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ACLResolver

// ACLResolver is the interface type of the Config.ACLResolver field. It
// declares a single method.
type ACLResolver interface {
	// ResolveTokenAndDefaultMeta returns a resolver.Result or an error.
	// NOTE(review): the string argument is presumably the ACL token — confirm
	// against the implementation.
	ResolveTokenAndDefaultMeta(string, *acl.EnterpriseMeta, *acl.AuthorizerContext) (resolver.Result, error)
}

type Backend

// Backend is the interface type of the Config.Backend field. Its methods
// cover subscription, leader state and address, and peering and catalog
// requests.
type Backend interface {
	// Subscribe has the same signature as Subscriber.Subscribe.
	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)

	// IsLeader indicates whether the consul server is in a leader state or not.
	IsLeader() bool

	// SetLeaderAddress is called on a raft.LeaderObservation in a goroutine
	// in the consul server; see trackLeaderChanges()
	SetLeaderAddress(string)

	// GetLeaderAddress provides the best hint for the current address of the
	// leader. There is no guarantee that this is the actual address of the
	// leader.
	GetLeaderAddress() string

	// NOTE(review): the methods below carry no documentation in this view.
	// Each takes a single ID or request value and reports failure through its
	// error result; confirm exact semantics against the implementation.
	ValidateProposedPeeringSecret(id string) (bool, error)
	PeeringSecretsWrite(req *pbpeering.SecretsWriteRequest) error
	PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
	PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
	CatalogRegister(req *structs.RegisterRequest) error
	CatalogDeregister(req *structs.DeregisterRequest) error
	PeeringWrite(req *pbpeering.PeeringWriteRequest) error
}

type BidirectionalStream

// BidirectionalStream is the interface type of HandleStreamRequest.Stream.
// It both sends and receives *pbpeerstream.ReplicationMessage values.
type BidirectionalStream interface {
	// Send takes a replication message and returns an error.
	Send(*pbpeerstream.ReplicationMessage) error
	// Recv returns a replication message or an error.
	Recv() (*pbpeerstream.ReplicationMessage, error)
	// Context returns a context.Context.
	// NOTE(review): presumably the context of the underlying stream — confirm.
	Context() context.Context
}

type Config

type Config struct {
	Backend     Backend
	GetStore    func() StateStore
	Logger      hclog.Logger
	ForwardRPC  func(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error)
	ACLResolver ACLResolver
	// Datacenter of the Consul server this gRPC server is hosted on
	Datacenter     string
	ConnectEnabled bool
	// contains filtered or unexported fields
}

type HandleStreamRequest

// HandleStreamRequest is the parameter type of Server.HandleStream and
// Server.DrainStream.
type HandleStreamRequest struct {
	// LocalID is the UUID for the peering in the local Consul datacenter.
	LocalID string

	// RemoteID is the UUID for the peering from the perspective of the peer.
	RemoteID string

	// PeerName is the name of the peering.
	PeerName string

	// Partition is the local partition associated with the peer.
	Partition string

	// Stream is the open stream to the peer cluster.
	Stream BidirectionalStream
}

func (HandleStreamRequest) IsAcceptor

func (r HandleStreamRequest) IsAcceptor() bool

type MaterializedViewStore

// MaterializedViewStore declares Get and Notify, both of which take a
// context and a submatview.Request.
type MaterializedViewStore interface {
	// Get returns a submatview.Result for req, or an error.
	Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
	// Notify takes a send-only channel of cache.UpdateEvent and returns an error.
	// NOTE(review): presumably updates for req are delivered on ch and tagged
	// with cID — confirm against the implementation.
	Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
}

type MockACLResolver

type MockACLResolver struct {
	// Embedding mock.Mock promotes its methods to this type.
	mock.Mock
}

MockACLResolver is an autogenerated mock type for the ACLResolver type

func NewMockACLResolver

func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver

NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.

func (*MockACLResolver) ResolveTokenAndDefaultMeta

func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.EnterpriseMeta, _a2 *acl.AuthorizerContext) (resolver.Result, error)

ResolveTokenAndDefaultMeta provides a mock function with given fields: _a0, _a1, _a2

type MockClient

type MockClient struct {
	ErrCh             chan error
	ReplicationStream *MockStream
	// contains filtered or unexported fields
}

func NewMockClient

func NewMockClient(ctx context.Context) *MockClient

func (*MockClient) Close

func (c *MockClient) Close()

func (*MockClient) DrainStream

func (c *MockClient) DrainStream(t *testing.T)

DrainStream reads messages from the stream until both the exported service list and trust bundle messages have been read. We do this because their ordering is nondeterministic.

func (*MockClient) Recv

func (*MockClient) RecvWithTimeout

func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeerstream.ReplicationMessage, error)

func (*MockClient) Send

type MockStream

type MockStream struct {
	// contains filtered or unexported fields
}

MockStream mocks peering.PeeringService_StreamResourcesServer

func (*MockStream) Context

func (s *MockStream) Context() context.Context

Context implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) Recv

Recv implements pbpeerstream.PeeringService_StreamResourcesServer

func (*MockStream) RecvMsg

func (s *MockStream) RecvMsg(m interface{}) error

RecvMsg implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) Send

Send implements pbpeerstream.PeeringService_StreamResourcesServer

func (*MockStream) SendHeader

func (s *MockStream) SendHeader(metadata.MD) error

SendHeader implements grpc.ServerStream

func (*MockStream) SendMsg

func (s *MockStream) SendMsg(m interface{}) error

SendMsg implements grpc.ServerStream and grpc.ClientStream

func (*MockStream) SetHeader

func (s *MockStream) SetHeader(metadata.MD) error

SetHeader implements grpc.ServerStream

func (*MockStream) SetTrailer

func (s *MockStream) SetTrailer(metadata.MD)

SetTrailer implements grpc.ServerStream

type MutableStatus

type MutableStatus struct {
	Status
	// contains filtered or unexported fields
}

func (*MutableStatus) Done

func (s *MutableStatus) Done() <-chan struct{}

func (*MutableStatus) GetExportedServicesCount

func (s *MutableStatus) GetExportedServicesCount() int

func (*MutableStatus) GetImportedServicesCount

func (s *MutableStatus) GetImportedServicesCount() int

func (*MutableStatus) GetStatus

func (s *MutableStatus) GetStatus() Status

func (*MutableStatus) IsConnected

func (s *MutableStatus) IsConnected() bool

func (*MutableStatus) SetExportedServices

func (s *MutableStatus) SetExportedServices(serviceNames []structs.ServiceName)

func (*MutableStatus) SetImportedServices

func (s *MutableStatus) SetImportedServices(serviceNames []structs.ServiceName)

func (*MutableStatus) TrackAck

func (s *MutableStatus) TrackAck()

func (*MutableStatus) TrackConnected

func (s *MutableStatus) TrackConnected()

func (*MutableStatus) TrackDisconnectedDueToError

func (s *MutableStatus) TrackDisconnectedDueToError(error string)

TrackDisconnectedDueToError tracks when the stream was disconnected due to an error. For example the heartbeat timed out, or we couldn't send into the stream.

func (*MutableStatus) TrackDisconnectedGracefully

func (s *MutableStatus) TrackDisconnectedGracefully()

TrackDisconnectedGracefully tracks when the stream was disconnected in a way we expected. For example, we got a terminated message, or we terminated the stream ourselves.

func (*MutableStatus) TrackNack

func (s *MutableStatus) TrackNack(msg string)

func (*MutableStatus) TrackRecvError

func (s *MutableStatus) TrackRecvError(error string)

func (*MutableStatus) TrackRecvHeartbeat

func (s *MutableStatus) TrackRecvHeartbeat()

TrackRecvHeartbeat tracks receiving a heartbeat from our peer.

func (*MutableStatus) TrackRecvResourceSuccess

func (s *MutableStatus) TrackRecvResourceSuccess()

TrackRecvResourceSuccess tracks receiving a replicated resource.

func (*MutableStatus) TrackSendError

func (s *MutableStatus) TrackSendError(error string)

func (*MutableStatus) TrackSendSuccess

func (s *MutableStatus) TrackSendSuccess()

type Server

// Server is the type returned by NewServer.
type Server struct {
	// Config is embedded, so its exported fields (Backend, Logger, and so on)
	// are accessible directly on Server.
	Config

	// Tracker points to this package's Tracker type.
	// NOTE(review): presumably the source of the data returned by
	// Server.StreamStatus and Server.ConnectedStreams — confirm.
	Tracker *Tracker
}

func NewServer

func NewServer(cfg Config) *Server

func (*Server) ConnectedStreams

func (s *Server) ConnectedStreams() map[string]chan struct{}

ConnectedStreams returns a map of connected stream IDs to the corresponding channel for tearing them down.

func (*Server) DrainStream

func (s *Server) DrainStream(req HandleStreamRequest)

DrainStream attempts to gracefully drain the stream when the connection is going to be torn down. Tearing down the connection too quickly can lead to our peer receiving a context cancellation error before the stream termination message. Handling the termination message is important to set the expectation that the peering will not be reestablished unless recreated.

func (*Server) ExchangeSecret

ExchangeSecret exchanges the one-time secret embedded in a peering token for a long-lived secret for use with the peering stream handler. This secret exchange prevents peering tokens from being reused.

Note that if the peering secret exchange fails, a peering token may need to be re-generated, since the one-time initiation secret may have been invalidated.

func (*Server) HandleStream

func (s *Server) HandleStream(streamReq HandleStreamRequest) error

func (*Server) Register

func (s *Server) Register(grpcServer *grpc.Server)

func (*Server) StreamResources

StreamResources handles incoming streaming connections.

func (*Server) StreamStatus

func (s *Server) StreamStatus(peerID string) (resp Status, found bool)

type StateStore

type StateStore interface {
	// NOTE(review): every method except AbandonCh takes a memdb.WatchSet as
	// its first argument, and most return a uint64 as their first result;
	// that value is presumably the state index of the result — confirm.
	PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
	PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
	PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
	PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
	PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
	// PeeringSecretsRead is the only WatchSet-taking method here that does
	// not return a leading uint64.
	PeeringSecretsRead(ws memdb.WatchSet, peerID string) (*pbpeering.PeeringSecrets, error)
	ExportedServicesForPeer(ws memdb.WatchSet, peerID, dc string) (uint64, *structs.ExportedServiceList, error)
	ServiceDump(ws memdb.WatchSet, kind structs.ServiceKind, useKind bool, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
	CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
	NodeServiceList(ws memdb.WatchSet, nodeNameOrID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeServiceList, error)
	CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error)
	TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
	ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error)
	ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *acl.EnterpriseMeta) (uint64, structs.ConfigEntry, error)
	// AbandonCh returns a receive-only channel.
	// NOTE(review): presumably closed when the store is abandoned — confirm.
	AbandonCh() <-chan struct{}
}

StateStore provides a read-only interface for querying Peering data.

type Status

type Status struct {
	// Connected is true when there is an open stream for the peer.
	Connected bool

	// NeverConnected is true for peerings that have never connected, false otherwise.
	NeverConnected bool

	// DisconnectErrorMessage tracks the error that caused the stream to disconnect non-gracefully.
	// If the stream is connected or it disconnected gracefully it will be empty.
	DisconnectErrorMessage string

	// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
	// NOTE(review): the field is a *time.Time, so "zero" may mean nil rather
	// than the zero time.Time value — confirm.
	DisconnectTime *time.Time

	// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
	LastAck *time.Time

	// LastNack tracks the time we received the last NACK for a resource replicated TO the peer.
	LastNack *time.Time

	// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
	LastNackMessage string

	// LastSendError tracks the time of the last error sending into the stream.
	LastSendError *time.Time

	// LastSendErrorMessage tracks the last error message when sending into the stream.
	LastSendErrorMessage string

	// LastSendSuccess tracks the time we last successfully sent a resource TO the peer.
	LastSendSuccess *time.Time

	// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
	LastRecvHeartbeat *time.Time

	// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
	LastRecvResourceSuccess *time.Time

	// LastRecvError tracks either:
	// - The time we failed to store a resource replicated FROM the peer.
	// - The time of the last error when receiving from the stream.
	LastRecvError *time.Time

	// LastRecvErrorMessage tracks the last error message when receiving from the stream.
	LastRecvErrorMessage string

	// TODO(peering): consider keeping track of imported and exported services thru raft
	// ImportedServices keeps track of which service names are imported for the peer
	ImportedServices []string
	// ExportedServices keeps track of which service names a peer asks to export
	ExportedServices []string
}

Status contains information about the replication stream to a peer cluster. TODO(peering): There's a lot of fields here...

func (*Status) GetExportedServicesCount

func (s *Status) GetExportedServicesCount() uint64

func (*Status) GetImportedServicesCount

func (s *Status) GetImportedServicesCount() uint64

type Subscriber

// Subscriber declares the same Subscribe method that Backend includes.
type Subscriber interface {
	// Subscribe returns a *stream.Subscription for req, or an error.
	Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error)
}

type SubscriptionBackend

// SubscriptionBackend embeds Subscriber and declares no additional methods.
type SubscriptionBackend interface {
	Subscriber
}

type Tracker

type Tracker struct {
	// contains filtered or unexported fields
}

Tracker contains a map of (PeerID -> MutableStatus). As streams are opened and closed we track details about their status.

func NewTracker

func NewTracker(heartbeatTimeout time.Duration) *Tracker

func (*Tracker) Connected

func (t *Tracker) Connected(id string) (*MutableStatus, error)

Connected registers a stream for a given peer, and marks it as connected. It also enforces that there is only one active stream for a peer.

func (*Tracker) ConnectedStreams

func (t *Tracker) ConnectedStreams() map[string]chan struct{}

func (*Tracker) DeleteStatus

func (t *Tracker) DeleteStatus(id string)

func (*Tracker) DisconnectedDueToError

func (t *Tracker) DisconnectedDueToError(id string, error string)

DisconnectedDueToError marks the peer id's stream status as disconnected due to an error.

func (*Tracker) DisconnectedGracefully

func (t *Tracker) DisconnectedGracefully(id string)

DisconnectedGracefully marks the peer id's stream status as disconnected gracefully.

func (*Tracker) IsHealthy

func (t *Tracker) IsHealthy(s Status) bool

IsHealthy calculates the health of a peering status. We define a peering as unhealthy if its status has been in the following states for longer than the configured incomingHeartbeatTimeout.

  • If it is disconnected
  • If the last received Nack is newer than last received Ack
  • If the last received error is newer than last received success

If none of these conditions apply, we call the peering healthy.

func (*Tracker) Register

func (t *Tracker) Register(id string) (*MutableStatus, error)

Register a stream for a given peer but do not mark it as connected.

func (*Tracker) StreamStatus

func (t *Tracker) StreamStatus(id string) (resp Status, found bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL