Documentation ¶
Overview ¶
Package disaggregated implements disaggregated prefill/decode serving.
Stability: alpha
Types ¶
type DecodeClient ¶
type DecodeClient interface {
	Decode(ctx context.Context, req *disaggpb.DecodeRequest) (TokenReceiver, error)
}
DecodeClient abstracts the decode RPC.
type DecodeWorkerServer ¶
type DecodeWorkerServer struct {
	disaggpb.UnimplementedDecodeWorkerServer
	// contains filtered or unexported fields
}
DecodeWorkerServer implements the DecodeWorker gRPC service. It receives KV blocks from a prefill worker, populates the KV cache, runs autoregressive decode steps, and streams generated tokens back.
func NewDecodeWorkerServer ¶
func NewDecodeWorkerServer(model ForwardModel) *DecodeWorkerServer
NewDecodeWorkerServer creates a new DecodeWorkerServer.
func (*DecodeWorkerServer) Decode ¶
func (s *DecodeWorkerServer) Decode(req *disaggpb.DecodeRequest, stream grpc.ServerStreamingServer[disaggpb.TokenStream]) error
Decode implements the Decode RPC of the DecodeWorker gRPC service. It receives a DecodeRequest containing KV blocks and token IDs, runs autoregressive decode steps, and streams TokenStream messages back to the caller.
type ForwardModel ¶
type ForwardModel interface {
	Forward(ctx context.Context, inputs ...*tensor.TensorNumeric[float32]) (*tensor.TensorNumeric[float32], error)
}
ForwardModel abstracts the single-token forward pass for decode. The model receives an input token tensor and returns logits.
type Gateway ¶
type Gateway struct {
	// contains filtered or unexported fields
}
Gateway routes incoming HTTP requests to disaggregated prefill and decode workers. It maintains connection pools for both worker types, routes each request to the least-loaded prefill worker, streams KV blocks to a decode worker, and multiplexes the decode token stream as an HTTP SSE response.
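A minimal sketch of least-loaded routing, to make the selection step concrete. The worker type, its inFlight and healthy fields, and the use of in-flight request count as the load metric are all assumptions made for illustration; the Gateway's real fields are unexported and its load metric is not documented here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker is a hypothetical stand-in for one pooled prefill worker.
type worker struct {
	addr     string
	inFlight atomic.Int64 // requests currently being served (assumed metric)
	healthy  atomic.Bool  // last health-check result
}

// newWorker builds a worker with the given load and health state.
func newWorker(addr string, load int64, healthy bool) *worker {
	w := &worker{addr: addr}
	w.inFlight.Store(load)
	w.healthy.Store(healthy)
	return w
}

// pickLeastLoaded returns the healthy worker with the fewest in-flight
// requests, or nil if no worker is healthy. Ties go to the earliest worker.
func pickLeastLoaded(pool []*worker) *worker {
	var best *worker
	for _, w := range pool {
		if !w.healthy.Load() {
			continue
		}
		if best == nil || w.inFlight.Load() < best.inFlight.Load() {
			best = w
		}
	}
	return best
}

func main() {
	pool := []*worker{
		newWorker("prefill-0:9000", 3, true),
		newWorker("prefill-1:9000", 1, true),
		newWorker("prefill-2:9000", 0, false), // idle but unhealthy, so skipped
	}
	if w := pickLeastLoaded(pool); w != nil {
		w.inFlight.Add(1) // account for the request about to be sent
		fmt.Println("routing to", w.addr)
	}
}
```

The counter would be decremented again once the request completes. Reading the counters without a lock means the choice can be slightly stale under contention, which is usually acceptable for load balancing.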
func NewGateway ¶
func NewGateway(cfg GatewayConfig) (*Gateway, error)
NewGateway creates a new Gateway and dials all worker addresses. It starts background health check goroutines for each worker.
func NewTestGateway ¶
func NewTestGateway(prefillClients []PrefillClient, decodeClients []DecodeClient) *Gateway
NewTestGateway creates a Gateway with mock clients (no gRPC connections). This is intended for benchmarks and integration tests that operate outside the disaggregated package.
type GatewayConfig ¶
type GatewayConfig struct {
	PrefillAddrs []string
	DecodeAddrs  []string
	// HealthCheckInterval is the base interval for health checks.
	// Defaults to 1s.
	HealthCheckInterval time.Duration
	// MaxBackoff is the maximum backoff duration for health checks.
	// Defaults to 30s.
	MaxBackoff time.Duration
	// DialOptions are extra gRPC dial options for worker connections.
	DialOptions []grpc.DialOption
}
GatewayConfig holds configuration for the Gateway.
type KVBlockReceiver ¶
type KVBlockReceiver interface {
	Recv() (*disaggpb.KVBlockStream, error)
}
KVBlockReceiver reads KVBlockStream messages from a prefill stream.
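A consumer of such a receiver typically loops on Recv until the stream signals completion. The sketch below uses a local kvBlock type as a hypothetical stand-in for disaggpb.KVBlockStream: only the Done flag is described in this documentation, while the Layer and Data fields are invented for illustration. It also assumes the Done message is a pure sentinel that carries no KV data.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// kvBlock is a hypothetical stand-in for disaggpb.KVBlockStream.
// Layer and Data are invented; only Done is mentioned in the package docs.
type kvBlock struct {
	Layer int
	Data  []byte
	Done  bool
}

// kvReceiver has the same shape as KVBlockReceiver, over the stand-in type.
type kvReceiver interface {
	Recv() (*kvBlock, error)
}

// sliceReceiver replays a fixed list of messages, then returns io.EOF,
// which is how gRPC client streams report a clean end of stream.
type sliceReceiver struct {
	msgs []*kvBlock
}

func (r *sliceReceiver) Recv() (*kvBlock, error) {
	if len(r.msgs) == 0 {
		return nil, io.EOF
	}
	m := r.msgs[0]
	r.msgs = r.msgs[1:]
	return m, nil
}

// collect reads blocks until a message with Done set arrives. A stream that
// ends before Done is treated as truncated and reported as an error.
func collect(r kvReceiver) ([]*kvBlock, error) {
	var blocks []*kvBlock
	for {
		m, err := r.Recv()
		if errors.Is(err, io.EOF) {
			return blocks, errors.New("stream ended before Done message")
		}
		if err != nil {
			return blocks, err
		}
		if m.Done {
			return blocks, nil
		}
		blocks = append(blocks, m)
	}
}

func main() {
	r := &sliceReceiver{msgs: []*kvBlock{
		{Layer: 0, Data: []byte{0x00, 0x3c}},
		{Layer: 1, Data: []byte{0x00, 0xc0}},
		{Done: true},
	}}
	blocks, err := collect(r)
	fmt.Println(len(blocks), err)
}
```

The same fake-receiver pattern works for writing a mock PrefillClient in tests.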
type PrefillClient ¶
type PrefillClient interface {
	Prefill(ctx context.Context, req *disaggpb.PreFillRequest) (KVBlockReceiver, error)
}
PrefillClient abstracts the prefill RPC. The production implementation wraps disaggpb.PrefillWorkerClient; tests can supply a mock.
type PrefillWorkerServer ¶
type PrefillWorkerServer struct {
	disaggpb.UnimplementedPrefillWorkerServer
	// contains filtered or unexported fields
}
PrefillWorkerServer implements the disaggpb.PrefillWorkerServer gRPC service. It delegates the forward pass to a Prefiller and streams the resulting KV blocks as FP16 bytes.
func NewPrefillWorkerServer ¶
func NewPrefillWorkerServer(p Prefiller) *PrefillWorkerServer
NewPrefillWorkerServer creates a new PrefillWorkerServer.
func (*PrefillWorkerServer) Prefill ¶
func (s *PrefillWorkerServer) Prefill(req *disaggpb.PreFillRequest, stream grpc.ServerStreamingServer[disaggpb.KVBlockStream]) error
Prefill runs the prefill forward pass on the prompt tokens from req, serialises each layer's KV cache as FP16 bytes, and streams the serialised blocks back as KVBlockStream messages. A final message with Done=true signals completion.
func (*PrefillWorkerServer) Register ¶
func (s *PrefillWorkerServer) Register(reg grpc.ServiceRegistrar)
Register registers the server with a gRPC service registrar.
type Prefiller ¶
type Prefiller interface {
	// Prefill runs the prompt through the model graph. It returns the
	// number of layers and a function to retrieve the KV data for each
	// layer. The returned float32 slices are the raw key/value cache
	// contents after the prefill pass.
	Prefill(ctx context.Context, tokenIDs []int32) (numLayers int, getKV func(layer int) (k, v []float32, err error), err error)
}
Prefiller runs the prefill forward pass on prompt tokens and returns per-layer KV cache data as float32 slices. Implementations wrap the real inference session; tests can supply a stub.
type TokenReceiver ¶
type TokenReceiver interface {
	Recv() (*disaggpb.TokenStream, error)
}
TokenReceiver reads TokenStream messages from a decode stream.