disaggregated

package
v1.45.0
Warning

This package is not in the latest version of its module.

Published: Apr 11, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package disaggregated implements disaggregated prefill/decode serving.

Stability: alpha


Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DecodeClient

type DecodeClient interface {
	Decode(ctx context.Context, req *disaggpb.DecodeRequest) (TokenReceiver, error)
}

DecodeClient abstracts the decode RPC.

type DecodeWorkerServer

type DecodeWorkerServer struct {
	disaggpb.UnimplementedDecodeWorkerServer
	// contains filtered or unexported fields
}

DecodeWorkerServer implements the DecodeWorker gRPC service. It receives the KV blocks produced by a prefill worker (forwarded by the Gateway), populates the KV cache, runs autoregressive decode steps, and streams the generated tokens back.

func NewDecodeWorkerServer

func NewDecodeWorkerServer(model ForwardModel) *DecodeWorkerServer

NewDecodeWorkerServer creates a new DecodeWorkerServer.

func (*DecodeWorkerServer) Decode

Decode implements the DecodeWorker gRPC service. It receives a DecodeRequest containing KV blocks and token IDs, runs autoregressive decode steps, and streams TokenStream messages back to the caller.

type ForwardModel

type ForwardModel interface {
	Forward(ctx context.Context, inputs ...*tensor.TensorNumeric[float32]) (*tensor.TensorNumeric[float32], error)
}

ForwardModel abstracts the single-token forward pass for decode. The model receives an input token tensor and returns logits.

type Gateway

type Gateway struct {
	// contains filtered or unexported fields
}

Gateway routes incoming HTTP requests to disaggregated prefill and decode workers. It maintains connection pools for both worker types, routes each request to the least-loaded prefill worker, streams KV blocks to a decode worker, and multiplexes the decode token stream as an HTTP SSE response.
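"Least-loaded" routing is commonly implemented by counting in-flight requests per worker. The sketch below is an assumption about how such a pool could look, not the Gateway's unexported internals: worker, pool, acquire and newPool are hypothetical names. Selection and increment happen under one mutex so two concurrent requests cannot both pick the same idle worker; ties go to the earliest worker. It needs Go 1.19+ for atomic.Int64 and atomic.Bool.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// worker tracks in-flight requests and health for one backend (hypothetical).
type worker struct {
	addr     string
	inflight atomic.Int64
	healthy  atomic.Bool
}

type pool struct {
	mu      sync.Mutex
	workers []*worker
}

func newPool(addrs ...string) *pool {
	p := &pool{}
	for _, a := range addrs {
		w := &worker{addr: a}
		w.healthy.Store(true)
		p.workers = append(p.workers, w)
	}
	return p
}

// acquire picks the healthy worker with the fewest in-flight requests and
// increments its counter. The caller must invoke the returned release func.
func (p *pool) acquire() (*worker, func(), error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	var best *worker
	for _, w := range p.workers {
		if !w.healthy.Load() {
			continue
		}
		if best == nil || w.inflight.Load() < best.inflight.Load() {
			best = w
		}
	}
	if best == nil {
		return nil, nil, errors.New("no healthy workers")
	}
	best.inflight.Add(1)
	return best, func() { best.inflight.Add(-1) }, nil
}

func main() {
	p := newPool("prefill-0:9000", "prefill-1:9000")
	w1, rel1, _ := p.acquire()
	w2, rel2, _ := p.acquire()
	fmt.Println(w1.addr, w2.addr) // prefill-0:9000 prefill-1:9000
	rel1()
	rel2()
}
```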

func NewGateway

func NewGateway(cfg GatewayConfig) (*Gateway, error)

NewGateway creates a new Gateway and dials all worker addresses. It starts background health check goroutines for each worker.

func NewTestGateway

func NewTestGateway(prefillClients []PrefillClient, decodeClients []DecodeClient) *Gateway

NewTestGateway creates a Gateway with mock clients (no gRPC connections). This is intended for benchmarks and integration tests that operate outside the disaggregated package.

func (*Gateway) Close

func (g *Gateway) Close() error

Close shuts down health checks and closes all gRPC connections.

func (*Gateway) ServeHTTP

func (g *Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP handles an incoming chat/completion request by:

  1. Picking the least-loaded prefill worker
  2. Streaming KV blocks from prefill
  3. Sending KV blocks to a decode worker
  4. Streaming decode tokens back as SSE

type GatewayConfig

type GatewayConfig struct {
	PrefillAddrs []string
	DecodeAddrs  []string

	// HealthCheckInterval is the base interval for health checks.
	// Defaults to 1s.
	HealthCheckInterval time.Duration
	// MaxBackoff is the maximum backoff duration for health checks.
	// Defaults to 30s.
	MaxBackoff time.Duration

	// DialOptions are extra gRPC dial options for worker connections.
	DialOptions []grpc.DialOption
}

GatewayConfig holds configuration for the Gateway.

type KVBlockReceiver

type KVBlockReceiver interface {
	Recv() (*disaggpb.KVBlockStream, error)
}

KVBlockReceiver reads KVBlockStream messages from a prefill stream.
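Since the prefill stream ends with a message carrying Done=true, a reader can collect blocks until that marker and treat an earlier io.EOF as a truncated stream. In this sketch only the Done field comes from the package documentation; Layer, Key and Value are hypothetical field names on a stand-in KVBlockStream, and the final Done message is assumed to carry no KV data.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// KVBlockStream is a hypothetical stand-in for disaggpb.KVBlockStream.
type KVBlockStream struct {
	Layer int32
	Key   []byte
	Value []byte
	Done  bool
}

type KVBlockReceiver interface {
	Recv() (*KVBlockStream, error)
}

// readAllBlocks collects KV blocks until a message with Done=true arrives.
func readAllBlocks(rx KVBlockReceiver) ([]*KVBlockStream, error) {
	var blocks []*KVBlockStream
	for {
		msg, err := rx.Recv()
		if errors.Is(err, io.EOF) {
			return blocks, errors.New("prefill stream ended before Done")
		}
		if err != nil {
			return blocks, err
		}
		if msg.Done {
			return blocks, nil
		}
		blocks = append(blocks, msg)
	}
}

// fakeRx replays fixed messages, then reports io.EOF.
type fakeRx struct{ msgs []*KVBlockStream }

func (f *fakeRx) Recv() (*KVBlockStream, error) {
	if len(f.msgs) == 0 {
		return nil, io.EOF
	}
	m := f.msgs[0]
	f.msgs = f.msgs[1:]
	return m, nil
}

func main() {
	rx := &fakeRx{msgs: []*KVBlockStream{{Layer: 0}, {Layer: 1}, {Done: true}}}
	blocks, err := readAllBlocks(rx)
	fmt.Println(len(blocks), err) // 2 <nil>
}
```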

type PrefillClient

type PrefillClient interface {
	Prefill(ctx context.Context, req *disaggpb.PreFillRequest) (KVBlockReceiver, error)
}

PrefillClient abstracts the prefill RPC. The production implementation wraps disaggpb.PrefillWorkerClient; tests can supply a mock.

type PrefillWorkerServer

type PrefillWorkerServer struct {
	disaggpb.UnimplementedPrefillWorkerServer
	// contains filtered or unexported fields
}

PrefillWorkerServer implements the disaggpb.PrefillWorkerServer gRPC service. It delegates the forward pass to a Prefiller and streams the resulting KV blocks as FP16 bytes.

func NewPrefillWorkerServer

func NewPrefillWorkerServer(p Prefiller) *PrefillWorkerServer

NewPrefillWorkerServer creates a new PrefillWorkerServer.

func (*PrefillWorkerServer) Prefill

Prefill runs the prefill forward pass on the prompt tokens from req, serialises each layer's KV cache as FP16 bytes, and streams them back as KVBlockStream messages. A final message with Done=true signals completion.

func (*PrefillWorkerServer) Register

func (s *PrefillWorkerServer) Register(reg grpc.ServiceRegistrar)

Register registers the server with a gRPC service registrar.

type Prefiller

type Prefiller interface {
	// Prefill runs the prompt through the model graph. It returns the
	// number of layers and a function to retrieve the KV data for each
	// layer. The returned float32 slices are the raw key/value cache
	// contents after the prefill pass.
	Prefill(ctx context.Context, tokenIDs []int32) (numLayers int, getKV func(layer int) (k, v []float32, err error), err error)
}

Prefiller runs the prefill forward pass on prompt tokens and returns per-layer KV cache data as float32 slices. Implementations wrap the real inference session; tests can supply a stub.

type TokenReceiver

type TokenReceiver interface {
	Recv() (*disaggpb.TokenStream, error)
}

TokenReceiver reads TokenStream messages from a decode stream.

Directories

Path Synopsis
Package disaggpb defines the gRPC service contracts for disaggregated prefill/decode serving.
