Documentation ¶
Overview ¶
Package server implements the synapse admin RPC service. The AdminServiceHandler that an embedder mounts on its own http.ServeMux is a thin adapter from the ConnectRPC interface to the user's existing es.EventStore, es.SnapshotStore, and es.CheckpointStore.
Typical wiring:
stores := server.Stores{
	Events:      eventStore,
	Snapshots:   snapshotStore,
	Checkpoints: checkpointStore,
}
adminSrv := server.New(stores)
mux := http.NewServeMux()
path, handler := adminv1connect.NewAdminServiceHandler(adminSrv)
mux.Handle(path, handler)
// ListenAndServe always returns a non-nil error; log it instead of dropping it.
log.Fatal(http.ListenAndServe(":9090", mux))
See ADR-0033.
Index ¶
- type Server
- func (s *Server) GetCheckpoint(ctx context.Context, req *connect.Request[adminv1.GetCheckpointRequest]) (*connect.Response[adminv1.GetCheckpointResponse], error)
- func (s *Server) GetSnapshot(ctx context.Context, req *connect.Request[adminv1.GetSnapshotRequest]) (*connect.Response[adminv1.GetSnapshotResponse], error)
- func (s *Server) ListCheckpoints(ctx context.Context, _ *connect.Request[adminv1.ListCheckpointsRequest], ...) error
- func (s *Server) ListDeadLetterProjections(ctx context.Context, ...) error
- func (s *Server) ListDeadLetters(ctx context.Context, req *connect.Request[adminv1.ListDeadLettersRequest], ...) error
- func (s *Server) ListStreams(ctx context.Context, _ *connect.Request[adminv1.ListStreamsRequest], ...) error
- func (s *Server) RemoveDeadLetter(ctx context.Context, req *connect.Request[adminv1.RemoveDeadLetterRequest]) (*connect.Response[adminv1.RemoveDeadLetterResponse], error)
- func (s *Server) ResetCheckpoint(ctx context.Context, req *connect.Request[adminv1.ResetCheckpointRequest]) (*connect.Response[adminv1.ResetCheckpointResponse], error)
- func (s *Server) RestoreEvents(ctx context.Context, ...) (*connect.Response[adminv1.RestoreEventsResponse], error)
- func (s *Server) RestoreSnapshots(ctx context.Context, ...) (*connect.Response[adminv1.RestoreSnapshotsResponse], error)
- func (s *Server) ShredSubject(ctx context.Context, req *connect.Request[adminv1.ShredSubjectRequest]) (*connect.Response[adminv1.ShredSubjectResponse], error)
- func (s *Server) StreamEvents(ctx context.Context, req *connect.Request[adminv1.StreamEventsRequest], ...) error
- func (s *Server) StreamsBySubject(ctx context.Context, req *connect.Request[adminv1.StreamsBySubjectRequest], ...) error
- type Stores
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Server ¶
type Server struct {
	// contains filtered or unexported fields
}
Server implements adminv1connect.AdminServiceHandler. It carries no goroutines and no state beyond the stores it wraps; it is safe for concurrent use to the extent the underlying stores are.
func (*Server) GetCheckpoint ¶
func (s *Server) GetCheckpoint(
	ctx context.Context,
	req *connect.Request[adminv1.GetCheckpointRequest],
) (*connect.Response[adminv1.GetCheckpointResponse], error)
GetCheckpoint returns the saved position for a named projection. The response sets found=false when no checkpoint has been saved for that name.
func (*Server) GetSnapshot ¶
func (s *Server) GetSnapshot(
	ctx context.Context,
	req *connect.Request[adminv1.GetSnapshotRequest],
) (*connect.Response[adminv1.GetSnapshotResponse], error)
GetSnapshot returns the latest snapshot for a stream. The response sets found=false when no snapshot exists.
func (*Server) ListCheckpoints ¶
func (s *Server) ListCheckpoints(
	ctx context.Context,
	_ *connect.Request[adminv1.ListCheckpointsRequest],
	stream *connect.ServerStream[adminv1.ListCheckpointsResponse],
) error
ListCheckpoints yields every (name, position) pair the checkpoint store has saved. Backends that cannot enumerate names yield es.ErrUnsupported from Names; the server maps that to CodeUnimplemented. Each name's position is loaded individually so the stream carries fully-populated Checkpoint messages; this is N+1 against the checkpoint store but the count is typically small.
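The enumerate-then-load flow described above can be sketched with stand-in types. Everything here is an assumption made for illustration: the context-free `checkpointStore` interface, the `errUnsupported` and `errUnimplemented` sentinels (standing in for es.ErrUnsupported and a Connect error with CodeUnimplemented), and the choice to skip a name whose checkpoint disappears between Names and Load. The real server's behavior in that last case is not documented here.

```go
package main

import (
	"errors"
	"sort"
)

// errUnsupported stands in for es.ErrUnsupported (assumption for this sketch).
var errUnsupported = errors.New("unsupported")

// errUnimplemented stands in for a Connect error carrying CodeUnimplemented.
var errUnimplemented = errors.New("unimplemented")

// checkpointStore is a hypothetical, context-free subset of es.CheckpointStore.
type checkpointStore interface {
	Names() ([]string, error)
	Load(name string) (position uint64, found bool, err error)
}

// checkpoint mirrors the (name, position) pair carried on the stream.
type checkpoint struct {
	Name     string
	Position uint64
}

// listCheckpoints calls Names once, then Load once per name (the N+1 pattern),
// handing each fully populated checkpoint to send.
func listCheckpoints(store checkpointStore, send func(checkpoint) error) error {
	names, err := store.Names()
	if errors.Is(err, errUnsupported) {
		return errUnimplemented // backend cannot enumerate names
	}
	if err != nil {
		return err
	}
	for _, name := range names {
		pos, found, err := store.Load(name)
		if err != nil {
			return err
		}
		if !found {
			continue // assumption: skip names that vanished after Names
		}
		if err := send(checkpoint{Name: name, Position: pos}); err != nil {
			return err
		}
	}
	return nil
}

// memStore is an in-memory checkpointStore used only to exercise the sketch.
type memStore map[string]uint64

func (m memStore) Names() ([]string, error) {
	names := make([]string, 0, len(m))
	for name := range m {
		names = append(names, name)
	}
	sort.Strings(names)
	return names, nil
}

func (m memStore) Load(name string) (uint64, bool, error) {
	pos, ok := m[name]
	return pos, ok, nil
}

// opaqueStore models a backend that cannot enumerate checkpoint names.
type opaqueStore struct{ memStore }

func (opaqueStore) Names() ([]string, error) { return nil, errUnsupported }
```

Passing a `memStore` sends one checkpoint per saved name, while an `opaqueStore` short-circuits with the unimplemented sentinel before any Load is attempted.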
func (*Server) ListDeadLetterProjections ¶ added in v0.5.0
func (s *Server) ListDeadLetterProjections(
	ctx context.Context,
	_ *connect.Request[adminv1.ListDeadLetterProjectionsRequest],
	stream *connect.ServerStream[adminv1.ListDeadLetterProjectionsResponse],
) error
ListDeadLetterProjections enumerates projection names with dead-letter entries (ADR-0041). Returns CodeUnimplemented when no DeadLetterStore is configured.
func (*Server) ListDeadLetters ¶ added in v0.5.0
func (s *Server) ListDeadLetters(
	ctx context.Context,
	req *connect.Request[adminv1.ListDeadLettersRequest],
	stream *connect.ServerStream[adminv1.ListDeadLettersResponse],
) error
ListDeadLetters yields dead-letter entries for a projection in OccurredAt order. Returns CodeUnimplemented when no DeadLetterStore is configured; CodeInvalidArgument on empty projection.
func (*Server) ListStreams ¶
func (s *Server) ListStreams(
	ctx context.Context,
	_ *connect.Request[adminv1.ListStreamsRequest],
	stream *connect.ServerStream[adminv1.ListStreamsResponse],
) error
ListStreams yields every stream id the event store knows about. Backends that cannot enumerate streams yield es.ErrUnsupported from Streams; the server maps that to CodeUnimplemented.
func (*Server) RemoveDeadLetter ¶ added in v0.5.0
func (s *Server) RemoveDeadLetter(
	ctx context.Context,
	req *connect.Request[adminv1.RemoveDeadLetterRequest],
) (*connect.Response[adminv1.RemoveDeadLetterResponse], error)
RemoveDeadLetter deletes a DLQ entry by id. Idempotent.
func (*Server) ResetCheckpoint ¶
func (s *Server) ResetCheckpoint(
	ctx context.Context,
	req *connect.Request[adminv1.ResetCheckpointRequest],
) (*connect.Response[adminv1.ResetCheckpointResponse], error)
ResetCheckpoint clears the saved position for a named projection. A Load runs first so the response can report whether a checkpoint existed; this is one extra round trip against the checkpoint store per Reset, which is acceptable for an admin operation. The Reset itself is idempotent — see es.CheckpointStore.
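The load-then-reset sequence can be illustrated with a minimal sketch. The `checkpoints` map type and its context-free `Load` and `Reset` methods are hypothetical stand-ins for es.CheckpointStore, not its actual interface.

```go
package main

// checkpoints is a hypothetical in-memory stand-in for es.CheckpointStore.
type checkpoints map[string]uint64

// Load reports the saved position and whether one exists.
func (c checkpoints) Load(name string) (uint64, bool) {
	pos, ok := c[name]
	return pos, ok
}

// Reset clears the position; clearing a missing name is a no-op,
// which keeps repeated resets idempotent.
func (c checkpoints) Reset(name string) { delete(c, name) }

// resetCheckpoint loads first so it can report whether a checkpoint existed,
// then resets unconditionally.
func resetCheckpoint(store checkpoints, name string) (existed bool) {
	_, existed = store.Load(name)
	store.Reset(name)
	return existed
}
```

In this sketch the first call for a saved name reports that a checkpoint existed and a second call reports that it did not, with the store left in the same cleared state both times.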
func (*Server) RestoreEvents ¶ added in v0.4.0
func (s *Server) RestoreEvents(
	ctx context.Context,
	stream *connect.ClientStream[adminv1.RestoreEventsRequest],
) (*connect.Response[adminv1.RestoreEventsResponse], error)
RestoreEvents streams envelopes from the client and Appends them to the destination event store, batching consecutive events for the same stream into a single Append. The destination must be empty for the affected streams: version collisions surface as CodeFailedPrecondition and stop the restore mid-stream (ADR-0040).
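The batching rule, where consecutive events for the same stream share one Append, can be sketched as a pure function. The `envelope` and `batch` types are hypothetical, and the sketch groups an in-memory slice, whereas the real server consumes a client stream and would presumably flush each batch as the stream id changes.

```go
package main

// envelope is a hypothetical stand-in for a restored event envelope.
type envelope struct {
	StreamID string
	Version  uint64
}

// batch is the group of envelopes that would go into a single Append.
type batch struct {
	StreamID string
	Events   []envelope
}

// batchConsecutive groups runs of adjacent envelopes that share a stream id.
// A stream that reappears later starts a new batch; nothing is reordered.
func batchConsecutive(in []envelope) []batch {
	var out []batch
	for _, e := range in {
		if n := len(out); n > 0 && out[n-1].StreamID == e.StreamID {
			out[n-1].Events = append(out[n-1].Events, e)
			continue
		}
		out = append(out, batch{StreamID: e.StreamID, Events: []envelope{e}})
	}
	return out
}
```

Given envelopes for streams a, a, b, a in that order, the sketch produces three batches (a with two events, b with one, a with one), so input order is preserved while the number of Append calls drops.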
func (*Server) RestoreSnapshots ¶ added in v0.4.0
func (s *Server) RestoreSnapshots(
	ctx context.Context,
	stream *connect.ClientStream[adminv1.RestoreSnapshotsRequest],
) (*connect.Response[adminv1.RestoreSnapshotsResponse], error)
RestoreSnapshots streams snapshots from the client and Saves each to the destination snapshot store. Snapshots are overwrite-on-conflict, so re-runs are idempotent up to each snapshot's own version (ADR-0040).
func (*Server) ShredSubject ¶
func (s *Server) ShredSubject(
	ctx context.Context,
	req *connect.Request[adminv1.ShredSubjectRequest],
) (*connect.Response[adminv1.ShredSubjectResponse], error)
ShredSubject destroys the data-encryption key for the named subject via the wired crypto.KeyStore. Returns CodeUnimplemented when no KeyStore is configured; CodeInvalidArgument on an empty subject. The underlying KeyStore.Shred is idempotent.
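The gating described above (optional store, input validation, then delegation) can be sketched as follows. The `keyStore` interface is a hypothetical, context-free subset of crypto.KeyStore, the two sentinel errors stand in for Connect errors with CodeUnimplemented and CodeInvalidArgument, and the order in which the two checks run is an assumption of the sketch.

```go
package main

import "errors"

// Stand-ins for Connect errors carrying CodeUnimplemented and CodeInvalidArgument.
var (
	errUnimplemented   = errors.New("unimplemented: no KeyStore configured")
	errInvalidArgument = errors.New("invalid argument: empty subject")
)

// keyStore is a hypothetical, context-free subset of crypto.KeyStore.
type keyStore interface {
	Shred(subject string) error
}

// shredSubject gates on configuration, validates the input, then delegates.
// The order of the two checks is an assumption of this sketch.
func shredSubject(keys keyStore, subject string) error {
	if keys == nil {
		return errUnimplemented
	}
	if subject == "" {
		return errInvalidArgument
	}
	return keys.Shred(subject)
}

// memKeys is an in-memory keyStore; shredding a missing key is a no-op,
// which is what makes repeated calls idempotent here.
type memKeys map[string][]byte

func (m memKeys) Shred(subject string) error {
	delete(m, subject)
	return nil
}
```

Calling `shredSubject` twice for the same subject succeeds both times in this sketch, mirroring the idempotency that the underlying KeyStore.Shred is documented to provide.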
func (*Server) StreamEvents ¶
func (s *Server) StreamEvents(
	ctx context.Context,
	req *connect.Request[adminv1.StreamEventsRequest],
	stream *connect.ServerStream[adminv1.StreamEventsResponse],
) error
StreamEvents yields events from a single stream in version order. from_version is exclusive; limit caps the number returned (0 means no cap).
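The from_version and limit semantics can be made concrete with a small sketch over an in-memory slice. The `event` type and the `selectEvents` helper are hypothetical; the real RPC reads from the event store and writes to a server stream.

```go
package main

// event is a hypothetical stand-in for a stored event envelope.
type event struct {
	Version uint64
}

// selectEvents returns events with Version > fromVersion (exclusive bound),
// stopping after limit events when limit > 0. A limit of 0 means no cap.
// The input is expected to be in version order already.
func selectEvents(stream []event, fromVersion uint64, limit int) []event {
	var out []event
	for _, e := range stream {
		if e.Version <= fromVersion {
			continue // at or below the exclusive lower bound
		}
		if limit > 0 && len(out) == limit {
			break
		}
		out = append(out, e)
	}
	return out
}
```

For a stream holding versions 1 through 5, a from_version of 2 with a limit of 2 selects versions 3 and 4, while a from_version of 0 with a limit of 0 selects all five.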
func (*Server) StreamsBySubject ¶
func (s *Server) StreamsBySubject(
	ctx context.Context,
	req *connect.Request[adminv1.StreamsBySubjectRequest],
	stream *connect.ServerStream[adminv1.StreamsBySubjectResponse],
) error
StreamsBySubject yields every stream id containing at least one event tagged with the given Subject. Backends that do not maintain a subject index yield es.ErrUnsupported; the server maps that to CodeUnimplemented. An empty subject yields zero results.
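The result shape can be sketched with a linear scan. This is only an illustration of the semantics: the `taggedEvent` type is hypothetical, and a real backend would answer from its subject index rather than scanning events. Returning ids in first-seen order is also an assumption, since the ordering is not stated here.

```go
package main

// taggedEvent is a hypothetical stand-in for an event tagged with a subject.
type taggedEvent struct {
	StreamID string
	Subject  string
}

// streamsBySubject returns each stream id that has at least one event tagged
// with subject, in first-seen order. An empty subject matches nothing.
func streamsBySubject(events []taggedEvent, subject string) []string {
	if subject == "" {
		return nil
	}
	seen := make(map[string]bool)
	var ids []string
	for _, e := range events {
		if e.Subject != subject || seen[e.StreamID] {
			continue
		}
		seen[e.StreamID] = true
		ids = append(ids, e.StreamID)
	}
	return ids
}
```

Each stream id appears once no matter how many of its events carry the subject, and untagged events never match because the empty subject is rejected up front.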
type Stores ¶
type Stores struct {
	Events      es.EventStore
	Snapshots   es.SnapshotStore
	Checkpoints es.CheckpointStore

	// Keys, when non-nil, enables the ShredSubject RPC. Wire your
	// keystore implementation here; the admin server delegates Shred
	// directly to it. See ADR-0036.
	Keys crypto.KeyStore

	// DeadLetters, when non-nil, enables the ListDeadLetterProjections,
	// ListDeadLetters, and RemoveDeadLetter RPCs (ADR-0041). The same
	// store is typically wired into a projection.Runner via
	// projection.WithDeadLetter so failed events land here.
	DeadLetters es.DeadLetterStore
}
Stores is the set of synapse stores the admin server operates on. Events, Snapshots, and Checkpoints are required; Keys and DeadLetters are optional. Nil values for the required fields cause the corresponding RPCs to fail with CodeInternal. Nil Keys or DeadLetters make their RPCs return CodeUnimplemented; the rest of the surface is unaffected.