Documentation ¶
Overview ¶
Package parallel provides tensor and pipeline parallelism for distributing inference across multiple GPUs. Tensor parallelism splits individual linear layers across devices and synchronizes with AllReduce; pipeline parallelism assigns sequential layer groups to different devices.

Stability: alpha
Index ¶
- func ColumnParallelLinear[T tensor.Numeric](ctx context.Context, engine compute.Engine[T], input *tensor.TensorNumeric[T], ...) (*tensor.TensorNumeric[T], error)
- func ConcatMicroBatches[T tensor.Numeric](microBatches []*tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)
- func RowParallelLinear[T tensor.Numeric](ctx context.Context, engine compute.Engine[T], input *tensor.TensorNumeric[T], ...) (*tensor.TensorNumeric[T], error)
- func SplitMicroBatches[T tensor.Numeric](batch *tensor.TensorNumeric[T], n int, engine compute.Engine[T]) ([]*tensor.TensorNumeric[T], error)
- type AllReducer
- type LayerFunc
- type PipelineExecutor
- type PipelineParallelConfig
- type PipelineScheduler
- type ScheduleStep
- type ShardedWeight
- type SplitMode
- type StageAssignment
- type SumAllReducer
- type TensorParallelConfig
- type TensorParallelLayer
- type TensorParallelWrapper
- func (w *TensorParallelWrapper[T]) AddColumnParallelLayer(weight *tensor.TensorNumeric[T]) error
- func (w *TensorParallelWrapper[T]) AddRowParallelLayer(weight *tensor.TensorNumeric[T]) error
- func (w *TensorParallelWrapper[T]) Config() TensorParallelConfig
- func (w *TensorParallelWrapper[T]) ForwardLayer(ctx context.Context, layerIdx int, rank int, input *tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)
- func (w *TensorParallelWrapper[T]) NumLayers() int
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ColumnParallelLinear ¶
func ColumnParallelLinear[T tensor.Numeric](
	ctx context.Context,
	engine compute.Engine[T],
	input *tensor.TensorNumeric[T],
	shard *ShardedWeight[T],
) (*tensor.TensorNumeric[T], error)
ColumnParallelLinear computes a column-parallel linear projection. The weight has already been split column-wise: shard shape [inFeatures, outFeatures/N]. Input shape: [batch, seqLen, inFeatures]. Output shape: [batch, seqLen, outFeatures/N] (partial; caller concatenates across ranks).
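The shape contract above can be sketched with plain `[][]float64` slices instead of the package's tensor types (all helper names here are illustrative, not part of this API): splitting the weight column-wise, computing each rank's partial matmul, and concatenating recovers the full projection.

```go
package main

import "fmt"

// matMul multiplies an [m][k] matrix by a [k][n] matrix.
func matMul(a, b [][]float64) [][]float64 {
	m, k, n := len(a), len(b), len(b[0])
	out := make([][]float64, m)
	for i := range out {
		out[i] = make([]float64, n)
		for j := 0; j < n; j++ {
			for p := 0; p < k; p++ {
				out[i][j] += a[i][p] * b[p][j]
			}
		}
	}
	return out
}

// splitColumns splits weight [in][out] into numShards slices of shape
// [in][out/numShards] along the output (column) dimension.
func splitColumns(w [][]float64, numShards int) [][][]float64 {
	out := len(w[0]) / numShards
	shards := make([][][]float64, numShards)
	for s := range shards {
		shard := make([][]float64, len(w))
		for i, row := range w {
			shard[i] = row[s*out : (s+1)*out]
		}
		shards[s] = shard
	}
	return shards
}

// concatColumns concatenates the ranks' partial outputs along columns.
func concatColumns(parts [][][]float64) [][]float64 {
	rows := len(parts[0])
	out := make([][]float64, rows)
	for i := 0; i < rows; i++ {
		for _, p := range parts {
			out[i] = append(out[i], p[i]...)
		}
	}
	return out
}

func main() {
	x := [][]float64{{1, 2}, {3, 4}}              // [batch=2, inFeatures=2]
	w := [][]float64{{1, 0, 2, 0}, {0, 1, 0, 2}}  // [in=2, out=4]

	// Each "rank" computes x @ shard; concatenating recovers x @ w.
	var parts [][][]float64
	for _, shard := range splitColumns(w, 2) {
		parts = append(parts, matMul(x, shard))
	}
	fmt.Println(concatColumns(parts)) // → [[1 2 2 4] [3 4 6 8]]
}
```

Note that no cross-rank reduction is needed here; each shard owns a disjoint slice of the output features, which is why column-parallel layers only require a concatenation (or a subsequent row-parallel layer) rather than an AllReduce.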
func ConcatMicroBatches ¶
func ConcatMicroBatches[T tensor.Numeric](
	microBatches []*tensor.TensorNumeric[T],
) (*tensor.TensorNumeric[T], error)
ConcatMicroBatches concatenates micro-batch outputs along dimension 0.
func RowParallelLinear ¶
func RowParallelLinear[T tensor.Numeric](
	ctx context.Context,
	engine compute.Engine[T],
	input *tensor.TensorNumeric[T],
	shard *ShardedWeight[T],
	reducer AllReducer[T],
) (*tensor.TensorNumeric[T], error)
RowParallelLinear computes a row-parallel linear projection followed by AllReduce to sum partial results across ranks. The weight has been split row-wise: shard shape [inFeatures/N, outFeatures]. The input must be the rank-local slice: shape [batch, seqLen, inFeatures/N]. Output shape after AllReduce: [batch, seqLen, outFeatures].
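The row-parallel case can likewise be sketched with plain slices (again, helper names are illustrative only): each rank multiplies its input slice by its row shard, producing a full-shaped but partial output, and the element-wise sum of the partials (the AllReduce step) equals the full matmul.

```go
package main

import "fmt"

// matMul multiplies an [m][k] matrix by a [k][n] matrix.
func matMul(a, b [][]float64) [][]float64 {
	m, k, n := len(a), len(b), len(b[0])
	out := make([][]float64, m)
	for i := range out {
		out[i] = make([]float64, n)
		for j := 0; j < n; j++ {
			for p := 0; p < k; p++ {
				out[i][j] += a[i][p] * b[p][j]
			}
		}
	}
	return out
}

// rowParallel computes x @ w with the weight split row-wise across
// numShards "ranks": each rank multiplies its rank-local input slice
// [batch][in/numShards] by its weight shard [in/numShards][out], then
// the partials are summed element-wise (the AllReduce step).
func rowParallel(x, w [][]float64, numShards int) [][]float64 {
	in := len(w) / numShards
	var sum [][]float64
	for s := 0; s < numShards; s++ {
		xs := make([][]float64, len(x))
		for i, row := range x {
			xs[i] = row[s*in : (s+1)*in]
		}
		partial := matMul(xs, w[s*in:(s+1)*in])
		if sum == nil {
			sum = partial
			continue
		}
		for i := range sum {
			for j := range sum[i] {
				sum[i][j] += partial[i][j]
			}
		}
	}
	return sum
}

func main() {
	x := [][]float64{{1, 2, 3, 4}}
	w := [][]float64{{1, 0}, {0, 1}, {1, 1}, {2, 0}}
	fmt.Println(rowParallel(x, w, 2)) // → [[12 5]], same as matMul(x, w)
}
```

This is why a column-parallel layer followed by a row-parallel layer is the standard pairing: the column-parallel output slices are exactly the rank-local input slices the row-parallel layer expects, and only one AllReduce per pair is needed.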
func SplitMicroBatches ¶
func SplitMicroBatches[T tensor.Numeric](
	batch *tensor.TensorNumeric[T],
	n int,
	engine compute.Engine[T],
) ([]*tensor.TensorNumeric[T], error)
SplitMicroBatches splits a batch tensor along dimension 0 into n micro-batches. The batch size (dimension 0) must be divisible by n.
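A minimal sketch of the split/concat round trip on plain slices, including the divisibility check described above (function names here are illustrative, not the package API):

```go
package main

import "fmt"

// splitMicroBatches splits rows along dimension 0 into n equal chunks.
// The row count must be divisible by n, mirroring the package's rule.
func splitMicroBatches(batch [][]float64, n int) ([][][]float64, error) {
	if len(batch)%n != 0 {
		return nil, fmt.Errorf("batch size %d not divisible by %d", len(batch), n)
	}
	size := len(batch) / n
	out := make([][][]float64, n)
	for i := range out {
		out[i] = batch[i*size : (i+1)*size]
	}
	return out, nil
}

// concatMicroBatches reverses the split along dimension 0.
func concatMicroBatches(mbs [][][]float64) [][]float64 {
	var out [][]float64
	for _, mb := range mbs {
		out = append(out, mb...)
	}
	return out
}

func main() {
	batch := [][]float64{{1}, {2}, {3}, {4}}
	mbs, _ := splitMicroBatches(batch, 2)
	fmt.Println(len(mbs), len(mbs[0])) // → 2 2 (two micro-batches of two rows)
	fmt.Println(concatMicroBatches(mbs))
}
```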
Types ¶
type AllReducer ¶
type AllReducer[T tensor.Numeric] interface {
	// AllReduceSum sums the tensor across all ranks and returns the result.
	// The returned tensor replaces the input on each rank.
	AllReduceSum(ctx context.Context, t *tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)
}
AllReducer performs an element-wise sum reduction across devices. Implementations may use NCCL for real multi-GPU execution, or a simple in-process sum for CPU-based testing.
type LayerFunc ¶
type LayerFunc[T tensor.Numeric] func(layerIdx int, input *tensor.TensorNumeric[T], engine compute.Engine[T]) (*tensor.TensorNumeric[T], error)
LayerFunc is a function that processes a single transformer layer. It receives the layer index, the input activation tensor, and the engine for the stage that owns the layer. It returns the output activation.
type PipelineExecutor ¶
PipelineExecutor runs micro-batches through the pipeline stages. Each stage has its own engine (typically mapped to a different GPU).
func NewPipelineExecutor ¶
func NewPipelineExecutor[T tensor.Numeric](
	scheduler *PipelineScheduler,
	engines []compute.Engine[T],
	layerFn LayerFunc[T],
) (*PipelineExecutor[T], error)
NewPipelineExecutor creates an executor with one engine per stage. The engines slice must have exactly NumStages elements.
func (*PipelineExecutor[T]) Execute ¶
func (e *PipelineExecutor[T]) Execute(ctx context.Context, microBatches []*tensor.TensorNumeric[T]) ([]*tensor.TensorNumeric[T], error)
Execute runs pipeline-parallel inference on the given micro-batches. microBatches must have exactly MicroBatchSize elements. Returns the output activation for each micro-batch after the final stage.
type PipelineParallelConfig ¶
type PipelineParallelConfig struct {
// NumStages is the number of pipeline stages (typically one per GPU).
NumStages int
// NumLayers is the total number of transformer layers in the model.
NumLayers int
// MicroBatchSize is the number of micro-batches to split the input into.
// More micro-batches reduce the bubble ratio but increase scheduling overhead.
MicroBatchSize int
}
PipelineParallelConfig configures pipeline parallelism across multiple GPUs.
func (PipelineParallelConfig) Validate ¶
func (c PipelineParallelConfig) Validate() error
Validate checks that the configuration is well-formed.
type PipelineScheduler ¶
type PipelineScheduler struct {
// contains filtered or unexported fields
}
PipelineScheduler computes the execution schedule for pipeline parallelism. It determines the order of (stage, micro-batch) pairs for both forward passes (inference) and backward passes (training, not yet implemented).
func NewPipelineScheduler ¶
func NewPipelineScheduler(cfg PipelineParallelConfig) (*PipelineScheduler, error)
NewPipelineScheduler creates a scheduler for the given configuration.
func (*PipelineScheduler) Assignment ¶
func (s *PipelineScheduler) Assignment() StageAssignment
Assignment returns the layer-to-stage assignment.
func (*PipelineScheduler) BubbleRatio ¶
func (s *PipelineScheduler) BubbleRatio() float64
BubbleRatio computes the fraction of idle time (bubbles) in the pipeline. For GPipe with S stages and M micro-batches:
bubble_ratio = (S - 1) / (S + M - 1)
A lower ratio means better GPU utilization. With 4 stages and 16 micro-batches, the bubble ratio is 3/19 ≈ 15.8%.
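The formula is a one-liner; this standalone sketch (not the package's method) reproduces the worked example above:

```go
package main

import "fmt"

// bubbleRatio computes the GPipe pipeline bubble fraction for
// s stages and m micro-batches: (s-1)/(s+m-1).
func bubbleRatio(s, m int) float64 {
	return float64(s-1) / float64(s+m-1)
}

func main() {
	fmt.Printf("%.3f\n", bubbleRatio(4, 16)) // 3/19, prints 0.158
}
```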
func (*PipelineScheduler) ForwardSchedule ¶
func (s *PipelineScheduler) ForwardSchedule() [][]ScheduleStep
ForwardSchedule returns the GPipe-style forward schedule. Each step pairs a stage with a micro-batch index. In GPipe, all micro-batches flow through stage 0 first, then stage 1, etc., but micro-batches can overlap across stages. The schedule is ordered by clock cycle.
For S stages and M micro-batches, the schedule has S+M-1 clock cycles. At each clock cycle c, all stages s where 0 <= c-s < M execute concurrently.
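The clock-cycle rule above translates directly into code. This is a self-contained sketch of a GPipe-style schedule builder, not the package's implementation (the `step` type here is a stand-in for ScheduleStep):

```go
package main

import "fmt"

// step pairs a stage with a micro-batch index (stand-in for ScheduleStep).
type step struct{ Stage, MicroBatch int }

// forwardSchedule builds a GPipe-style schedule for s stages and m
// micro-batches: at clock cycle c, stage st processes micro-batch c-st
// whenever 0 <= c-st < m. There are s+m-1 cycles in total.
func forwardSchedule(s, m int) [][]step {
	cycles := s + m - 1
	sched := make([][]step, cycles)
	for c := 0; c < cycles; c++ {
		for st := 0; st < s; st++ {
			mb := c - st
			if mb >= 0 && mb < m {
				sched[c] = append(sched[c], step{Stage: st, MicroBatch: mb})
			}
		}
	}
	return sched
}

func main() {
	for c, steps := range forwardSchedule(3, 4) {
		fmt.Println("cycle", c, steps)
	}
}
```

In the middle cycles all stages run concurrently; the ramp-up and ramp-down cycles at either end are exactly the bubbles counted by BubbleRatio.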
type ScheduleStep ¶
ScheduleStep represents one unit of work: process a micro-batch on a stage.
type ShardedWeight ¶
type ShardedWeight[T tensor.Numeric] struct {
	// Shard is the weight slice assigned to this rank.
	Shard *tensor.TensorNumeric[T]
	// Rank is the device index for this shard.
	Rank int
	// Mode indicates column or row split.
	Mode SplitMode
}
ShardedWeight holds one device's slice of a weight matrix along with metadata about the split.
func SplitLinearColumnWise ¶
func SplitLinearColumnWise[T tensor.Numeric](
	engine compute.Engine[T],
	weight *tensor.TensorNumeric[T],
	numShards int,
) ([]*ShardedWeight[T], error)
SplitLinearColumnWise splits a 2-D weight tensor [inFeatures, outFeatures] along the output dimension into numShards equal parts. Each shard has shape [inFeatures, outFeatures/numShards].
func SplitLinearRowWise ¶
func SplitLinearRowWise[T tensor.Numeric](
	engine compute.Engine[T],
	weight *tensor.TensorNumeric[T],
	numShards int,
) ([]*ShardedWeight[T], error)
SplitLinearRowWise splits a 2-D weight tensor [inFeatures, outFeatures] along the input dimension into numShards equal parts. Each shard has shape [inFeatures/numShards, outFeatures].
type SplitMode ¶
type SplitMode int
SplitMode describes how a weight matrix is partitioned.
const (
	// ColumnSplit splits the weight along the output (column) dimension.
	// Each shard computes a slice of the output; results are concatenated.
	ColumnSplit SplitMode = iota

	// RowSplit splits the weight along the input (row) dimension.
	// Each shard computes a partial sum; results are reduced via AllReduce.
	RowSplit
)
type StageAssignment ¶
type StageAssignment struct {
// StageForLayer maps layer index to stage index.
StageForLayer []int
// LayersPerStage stores the layer indices assigned to each stage.
LayersPerStage [][]int
}
StageAssignment maps transformer layers to pipeline stages.
func AssignLayers ¶
func AssignLayers(numLayers, numStages int) StageAssignment
AssignLayers distributes transformer layers across pipeline stages as evenly as possible. Remainder layers are distributed to the first stages.
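The even-split-with-remainder rule can be sketched as follows (a standalone illustration returning only the per-stage layer lists, not the package's StageAssignment):

```go
package main

import "fmt"

// assignLayers spreads numLayers across numStages as evenly as possible,
// giving the remainder layers to the earliest stages.
func assignLayers(numLayers, numStages int) [][]int {
	per, rem := numLayers/numStages, numLayers%numStages
	stages := make([][]int, numStages)
	layer := 0
	for s := 0; s < numStages; s++ {
		count := per
		if s < rem { // first `rem` stages get one extra layer
			count++
		}
		for i := 0; i < count; i++ {
			stages[s] = append(stages[s], layer)
			layer++
		}
	}
	return stages
}

func main() {
	fmt.Println(assignLayers(10, 3)) // → [[0 1 2 3] [4 5 6] [7 8 9]]
}
```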
type SumAllReducer ¶
SumAllReducer is a simple in-process AllReducer that sums partials from all ranks. It is intended for single-process CPU testing where multiple "ranks" are simulated with separate engine instances operating on partitioned tensors.
func NewSumAllReducer ¶
NewSumAllReducer creates an AllReducer that sums tensors in-process. numRanks is the number of partials expected per reduction.
func (*SumAllReducer[T]) AddPartial ¶
func (r *SumAllReducer[T]) AddPartial(t *tensor.TensorNumeric[T])
AddPartial registers a partial result from one rank. Once all partials are registered, AllReduceSum can be called.
func (*SumAllReducer[T]) AllReduceSum ¶
func (r *SumAllReducer[T]) AllReduceSum(ctx context.Context, t *tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)
AllReduceSum returns the element-wise sum of all registered partials. After the call, partials are cleared automatically.
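The AddPartial/AllReduceSum protocol can be sketched with a minimal in-process stand-in over flat `[]float64` partials (this mirrors the described behavior but is not the package's SumAllReducer):

```go
package main

import "fmt"

// sumAllReducer is a minimal in-process stand-in: each simulated rank
// registers its partial, and AllReduceSum returns their element-wise sum.
type sumAllReducer struct {
	numRanks int
	partials [][]float64
}

// AddPartial registers one rank's partial result.
func (r *sumAllReducer) AddPartial(t []float64) {
	r.partials = append(r.partials, t)
}

// AllReduceSum sums all registered partials element-wise and clears
// them for the next reduction round, as the package describes.
func (r *sumAllReducer) AllReduceSum() ([]float64, error) {
	if len(r.partials) != r.numRanks {
		return nil, fmt.Errorf("have %d partials, want %d", len(r.partials), r.numRanks)
	}
	out := make([]float64, len(r.partials[0]))
	for _, p := range r.partials {
		for i, v := range p {
			out[i] += v
		}
	}
	r.partials = nil
	return out, nil
}

func main() {
	r := &sumAllReducer{numRanks: 2}
	r.AddPartial([]float64{1, 2})
	r.AddPartial([]float64{3, 4})
	sum, _ := r.AllReduceSum()
	fmt.Println(sum) // → [4 6]
}
```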
func (*SumAllReducer[T]) Reset ¶
func (r *SumAllReducer[T]) Reset()
Reset clears accumulated partials for the next reduction round.
type TensorParallelConfig ¶
type TensorParallelConfig struct {
// NumGPUs is the number of devices to split layers across.
NumGPUs int
// DeviceIDs identifies each device. Length must equal NumGPUs.
DeviceIDs []int
}
TensorParallelConfig configures tensor parallelism across GPUs.
func (*TensorParallelConfig) Validate ¶
func (c *TensorParallelConfig) Validate() error
Validate checks that the configuration is consistent.
type TensorParallelLayer ¶
type TensorParallelLayer[T tensor.Numeric] struct {
	// Shards holds one weight shard per rank.
	Shards []*ShardedWeight[T]
	// Mode is the split strategy.
	Mode SplitMode
}
TensorParallelLayer represents a single transformer linear layer that has been split across multiple ranks for tensor-parallel execution.
type TensorParallelWrapper ¶
TensorParallelWrapper coordinates tensor-parallel execution of a transformer's linear layers across N simulated or real GPU ranks.
func NewTensorParallelWrapper ¶
func NewTensorParallelWrapper[T tensor.Numeric](
	config TensorParallelConfig,
	engines []compute.Engine[T],
	reducer AllReducer[T],
) (*TensorParallelWrapper[T], error)
NewTensorParallelWrapper creates a wrapper that distributes linear layers across engines (one per rank). The engines slice length must equal config.NumGPUs. The reducer is called after row-parallel layers to sum partial results.
func (*TensorParallelWrapper[T]) AddColumnParallelLayer ¶
func (w *TensorParallelWrapper[T]) AddColumnParallelLayer(
	weight *tensor.TensorNumeric[T],
) error
AddColumnParallelLayer splits a weight column-wise and registers the resulting shards as a new layer.
func (*TensorParallelWrapper[T]) AddRowParallelLayer ¶
func (w *TensorParallelWrapper[T]) AddRowParallelLayer(
	weight *tensor.TensorNumeric[T],
) error
AddRowParallelLayer splits a weight row-wise and registers the resulting shards as a new layer.
func (*TensorParallelWrapper[T]) Config ¶
func (w *TensorParallelWrapper[T]) Config() TensorParallelConfig
Config returns a copy of the tensor parallel configuration.
func (*TensorParallelWrapper[T]) ForwardLayer ¶
func (w *TensorParallelWrapper[T]) ForwardLayer(
	ctx context.Context,
	layerIdx int,
	rank int,
	input *tensor.TensorNumeric[T],
) (*tensor.TensorNumeric[T], error)
ForwardLayer executes one tensor-parallel layer on a single rank. For column-parallel: returns the rank's partial output (caller gathers). For row-parallel: returns the AllReduced full output.
func (*TensorParallelWrapper[T]) NumLayers ¶
func (w *TensorParallelWrapper[T]) NumLayers() int
NumLayers returns the number of registered parallel layers.