Documentation
¶
Index ¶
- type ActiveConnection
- type ConnectionManager
- func (cm *ConnectionManager) CloseAll()
- func (cm *ConnectionManager) ConnectTo(ctx context.Context, address string, namespaces []string) (*pb.ConnectResponse, error)
- func (cm *ConnectionManager) Disconnect(ctx context.Context, connectionID string, reason string, failed bool) error
- func (cm *ConnectionManager) GetActiveConnection(connectionID string) (*ActiveConnection, bool)
- func (cm *ConnectionManager) GetClient(address string) (pb.CollectiveDispatcherClient, bool)
- func (cm *ConnectionManager) HandleConnect(ctx context.Context, req *pb.ConnectRequest) (*pb.ConnectResponse, error)
- func (cm *ConnectionManager) ListActiveConnections() []*ActiveConnection
- func (cm *ConnectionManager) ListConnections() []*pb.Connection
- func (cm *ConnectionManager) RecordRequest(ctx context.Context, connectionID string, bytesSent, bytesReceived int64)
- func (cm *ConnectionManager) RecoverFromRestart(ctx context.Context) error
- type Dispatcher
- func (d *Dispatcher) Connect(ctx context.Context, req *pb.ConnectRequest) (*pb.ConnectResponse, error)
- func (d *Dispatcher) ConnectTo(ctx context.Context, address string, namespaces []string) (*pb.ConnectResponse, error)
- func (d *Dispatcher) Dispatch(ctx context.Context, req *pb.DispatchRequest) (*pb.DispatchResponse, error)
- func (d *Dispatcher) GetConnectionManager() *ConnectionManager
- func (d *Dispatcher) RegisterService(namespace, serviceName, methodName string, handler ServiceHandler)
- func (d *Dispatcher) Serve(ctx context.Context, req *pb.ServeRequest) (*pb.ServeResponse, error)
- func (d *Dispatcher) SetRegistryValidator(validator RegistryValidator)
- func (d *Dispatcher) Shutdown()
- type RegistryValidator
- type ServiceHandler
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActiveConnection ¶
type ActiveConnection struct {
ConnectionID string // Links to persisted Connection record
Client pb.CollectiveDispatcherClient // gRPC client for making calls
GrpcConn *grpc.ClientConn // Underlying gRPC connection
LastActivity time.Time // For timeout/health checks
IsInitiator bool // True if we initiated this connection
// Cached connection info for when persistence is unavailable
Connection *pb.Connection
}
ActiveConnection represents the runtime state of an active connection. This is kept in memory and contains resources that cannot be persisted (gRPC clients).
type ConnectionManager ¶
type ConnectionManager struct {
// contains filtered or unexported fields
}
ConnectionManager manages connections between collectors. It maintains both persisted connection records (in system/connections collection) and active in-memory connection state (gRPC clients, etc.).
func NewConnectionManager ¶
func NewConnectionManager(collectorID, address string, namespaces []string, sessionID string, coll *collection.Collection) *ConnectionManager
NewConnectionManager creates a new connection manager. The sessionID should be unique per collector restart (e.g., UUID or timestamp-based).
func (*ConnectionManager) CloseAll ¶
func (cm *ConnectionManager) CloseAll()
CloseAll closes all active connections.
func (*ConnectionManager) ConnectTo ¶
func (cm *ConnectionManager) ConnectTo(ctx context.Context, address string, namespaces []string) (*pb.ConnectResponse, error)
ConnectTo initiates a connection to another collector.
func (*ConnectionManager) Disconnect ¶
func (cm *ConnectionManager) Disconnect(ctx context.Context, connectionID string, reason string, failed bool) error
Disconnect closes a connection and updates the persisted record.
func (*ConnectionManager) GetActiveConnection ¶
func (cm *ConnectionManager) GetActiveConnection(connectionID string) (*ActiveConnection, bool)
GetActiveConnection returns an active connection by ID.
func (*ConnectionManager) GetClient ¶
func (cm *ConnectionManager) GetClient(address string) (pb.CollectiveDispatcherClient, bool)
GetClient returns a client for the given address.
func (*ConnectionManager) HandleConnect ¶
func (cm *ConnectionManager) HandleConnect(ctx context.Context, req *pb.ConnectRequest) (*pb.ConnectResponse, error)
HandleConnect processes an incoming connection request from another collector.
func (*ConnectionManager) ListActiveConnections ¶
func (cm *ConnectionManager) ListActiveConnections() []*ActiveConnection
ListActiveConnections returns all active connections.
func (*ConnectionManager) ListConnections ¶
func (cm *ConnectionManager) ListConnections() []*pb.Connection
ListConnections returns Connection protos for all active connections. If persistence is enabled, this loads the persisted data. Otherwise, it uses the cached Connection from in-memory state.
func (*ConnectionManager) RecordRequest ¶
func (cm *ConnectionManager) RecordRequest(ctx context.Context, connectionID string, bytesSent, bytesReceived int64)
RecordRequest increments the request count and updates activity for a connection.
func (*ConnectionManager) RecoverFromRestart ¶
func (cm *ConnectionManager) RecoverFromRestart(ctx context.Context) error
RecoverFromRestart marks connections from previous sessions as STALE. This should be called on startup before accepting new connections.
type Dispatcher ¶
type Dispatcher struct {
pb.UnimplementedCollectiveDispatcherServer
// contains filtered or unexported fields
}
Dispatcher implements the CollectiveDispatcher service
func NewDispatcher ¶
func NewDispatcher(collectorID, address string, namespaces []string) *Dispatcher
NewDispatcher creates a new dispatcher instance
func NewDispatcherWithRegistry ¶
func NewDispatcherWithRegistry(collectorID, address string, namespaces []string, validator RegistryValidator, coll *collection.Collection) *Dispatcher
NewDispatcherWithRegistry creates a new dispatcher instance with registry validation
func (*Dispatcher) Connect ¶
func (d *Dispatcher) Connect(ctx context.Context, req *pb.ConnectRequest) (*pb.ConnectResponse, error)
Connect handles incoming connection requests
func (*Dispatcher) ConnectTo ¶
func (d *Dispatcher) ConnectTo(ctx context.Context, address string, namespaces []string) (*pb.ConnectResponse, error)
ConnectTo initiates a connection to another collector
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(ctx context.Context, req *pb.DispatchRequest) (*pb.DispatchResponse, error)
Dispatch routes a request to the appropriate collector
func (*Dispatcher) GetConnectionManager ¶
func (d *Dispatcher) GetConnectionManager() *ConnectionManager
GetConnectionManager returns the connection manager
func (*Dispatcher) RegisterService ¶
func (d *Dispatcher) RegisterService(namespace, serviceName, methodName string, handler ServiceHandler)
RegisterService registers a service handler for a namespace and method
func (*Dispatcher) Serve ¶
func (d *Dispatcher) Serve(ctx context.Context, req *pb.ServeRequest) (*pb.ServeResponse, error)
Serve handles service method invocations from other collectors
func (*Dispatcher) SetRegistryValidator ¶
func (d *Dispatcher) SetRegistryValidator(validator RegistryValidator)
SetRegistryValidator sets the registry validator for this dispatcher
type RegistryValidator ¶
type RegistryValidator interface {
ValidateServiceMethod(ctx context.Context, namespace, serviceName, methodName string) error
}
RegistryValidator is an interface for validating services against a registry
type ServiceHandler ¶
ServiceHandler is a function that handles a service method invocation