parallel

package
v1.37.0
Published: Mar 30, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package parallel provides tensor and pipeline parallelism for distributing inference across multiple GPUs. Tensor parallelism splits individual linear layers across devices and synchronizes with AllReduce; pipeline parallelism assigns sequential layer groups to different devices.

Stability: alpha

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ColumnParallelLinear

func ColumnParallelLinear[T tensor.Numeric](
	ctx context.Context,
	engine compute.Engine[T],
	input *tensor.TensorNumeric[T],
	shard *ShardedWeight[T],
) (*tensor.TensorNumeric[T], error)

ColumnParallelLinear computes a column-parallel linear projection. The weight has already been split column-wise: shard shape [inFeatures, outFeatures/N]. Input shape: [batch, seqLen, inFeatures]. Output shape: [batch, seqLen, outFeatures/N] (partial; caller concatenates across ranks).
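The shape arithmetic above can be checked with a minimal plain-Go sketch that uses small float64 slices instead of the package's tensor types; matVec is a hypothetical helper, not part of this API. Splitting the weight column-wise and concatenating each rank's slice reproduces the full projection:

```go
package main

import "fmt"

// matVec computes y = x·W for a weight W with len(x) rows.
func matVec(x []float64, w [][]float64) []float64 {
	out := make([]float64, len(w[0]))
	for i, xi := range x {
		for j, wij := range w[i] {
			out[j] += xi * wij
		}
	}
	return out
}

func main() {
	x := []float64{1, 2}
	w := [][]float64{{1, 2, 3, 4}, {5, 6, 7, 8}} // [inFeatures=2, outFeatures=4]

	// Column split into N=2 shards of shape [2, 2] each.
	shardA := [][]float64{{1, 2}, {5, 6}}
	shardB := [][]float64{{3, 4}, {7, 8}}

	// Each rank computes its slice of the output; the caller
	// concatenates the partial outputs across ranks.
	got := append(matVec(x, shardA), matVec(x, shardB)...)
	fmt.Println(got, matVec(x, w)) // [11 14 17 20] [11 14 17 20]
}
```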

func ConcatMicroBatches

func ConcatMicroBatches[T tensor.Numeric](
	microBatches []*tensor.TensorNumeric[T],
) (*tensor.TensorNumeric[T], error)

ConcatMicroBatches concatenates micro-batch outputs along dimension 0.

func RowParallelLinear

func RowParallelLinear[T tensor.Numeric](
	ctx context.Context,
	engine compute.Engine[T],
	input *tensor.TensorNumeric[T],
	shard *ShardedWeight[T],
	reducer AllReducer[T],
) (*tensor.TensorNumeric[T], error)

RowParallelLinear computes a row-parallel linear projection followed by AllReduce to sum partial results across ranks. The weight has been split row-wise: shard shape [inFeatures/N, outFeatures]. The input must be the rank-local slice: shape [batch, seqLen, inFeatures/N]. Output shape after AllReduce: [batch, seqLen, outFeatures].
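The row-parallel identity can also be illustrated in plain Go, under the same assumptions as above (float64 slices, a hypothetical matVec helper): each rank multiplies its input slice by its weight rows, and an element-wise sum across ranks recovers the full projection.

```go
package main

import "fmt"

// matVec computes y = x·W for a weight W with len(x) rows.
func matVec(x []float64, w [][]float64) []float64 {
	out := make([]float64, len(w[0]))
	for i, xi := range x {
		for j, wij := range w[i] {
			out[j] += xi * wij
		}
	}
	return out
}

func main() {
	x := []float64{1, 2}
	w := [][]float64{{1, 2, 3, 4}, {5, 6, 7, 8}} // [inFeatures=2, outFeatures=4]

	// Row split: each rank holds a slice of the input and the matching
	// rows of the weight, producing a full-width partial output.
	partialA := matVec(x[:1], w[:1]) // rank 0
	partialB := matVec(x[1:], w[1:]) // rank 1

	// The AllReduce (element-wise sum) recovers the full projection.
	sum := make([]float64, len(partialA))
	for j := range sum {
		sum[j] = partialA[j] + partialB[j]
	}
	fmt.Println(sum, matVec(x, w)) // [11 14 17 20] [11 14 17 20]
}
```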

func SplitMicroBatches

func SplitMicroBatches[T tensor.Numeric](
	batch *tensor.TensorNumeric[T],
	n int,
	engine compute.Engine[T],
) ([]*tensor.TensorNumeric[T], error)

SplitMicroBatches splits a batch tensor along dimension 0 into n micro-batches. The batch size (dimension 0) must be divisible by n.
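The splitting rule (equal slices along dimension 0, with a divisibility check) can be sketched in plain Go over a [][]float32 "batch"; splitMicroBatches here is an illustrative stand-in, not the package function:

```go
package main

import "fmt"

// splitMicroBatches slices the rows (dimension 0) of batch into n
// equal micro-batches; the row count must be divisible by n.
func splitMicroBatches(batch [][]float32, n int) ([][][]float32, error) {
	if n <= 0 || len(batch)%n != 0 {
		return nil, fmt.Errorf("batch size %d not divisible by %d", len(batch), n)
	}
	size := len(batch) / n
	out := make([][][]float32, n)
	for i := range out {
		out[i] = batch[i*size : (i+1)*size]
	}
	return out, nil
}

func main() {
	batch := [][]float32{{1}, {2}, {3}, {4}}
	mbs, err := splitMicroBatches(batch, 2)
	fmt.Println(mbs, err) // [[[1] [2]] [[3] [4]]] <nil>
}
```

Concatenating the micro-batches back along dimension 0, as ConcatMicroBatches does, is the inverse round trip.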

Types

type AllReducer

type AllReducer[T tensor.Numeric] interface {
	// AllReduceSum sums the tensor across all ranks and returns the result.
	// The returned tensor replaces the input on each rank.
	AllReduceSum(ctx context.Context, t *tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)
}

AllReducer performs an element-wise sum reduction across devices. Implementations may use NCCL for real multi-GPU, or a simple in-process sum for CPU-based testing.

type LayerFunc

type LayerFunc[T tensor.Numeric] func(layerIdx int, input *tensor.TensorNumeric[T], engine compute.Engine[T]) (*tensor.TensorNumeric[T], error)

LayerFunc is a function that processes a single transformer layer. It receives the layer index, the input activation tensor, and the engine for the stage that owns the layer. It returns the output activation.

type PipelineExecutor

type PipelineExecutor[T tensor.Numeric] struct {
	// contains filtered or unexported fields
}

PipelineExecutor runs micro-batches through the pipeline stages. Each stage has its own engine (typically mapped to a different GPU).

func NewPipelineExecutor

func NewPipelineExecutor[T tensor.Numeric](
	scheduler *PipelineScheduler,
	engines []compute.Engine[T],
	layerFn LayerFunc[T],
) (*PipelineExecutor[T], error)

NewPipelineExecutor creates an executor with one engine per stage. The engines slice must have exactly NumStages elements.

func (*PipelineExecutor[T]) Execute

func (e *PipelineExecutor[T]) Execute(ctx context.Context, microBatches []*tensor.TensorNumeric[T]) ([]*tensor.TensorNumeric[T], error)

Execute runs pipeline-parallel inference on the given micro-batches. microBatches must have exactly MicroBatchSize elements. Returns the output activation for each micro-batch after the final stage.

type PipelineParallelConfig

type PipelineParallelConfig struct {
	// NumStages is the number of pipeline stages (typically one per GPU).
	NumStages int
	// NumLayers is the total number of transformer layers in the model.
	NumLayers int
	// MicroBatchSize is the number of micro-batches to split the input into.
	// More micro-batches reduce the bubble ratio but increase scheduling overhead.
	MicroBatchSize int
}

PipelineParallelConfig configures pipeline parallelism across multiple GPUs.

func (PipelineParallelConfig) Validate

func (c PipelineParallelConfig) Validate() error

Validate checks that the configuration is well-formed.

type PipelineScheduler

type PipelineScheduler struct {
	// contains filtered or unexported fields
}

PipelineScheduler computes the execution schedule for pipeline parallelism. It determines the order of (stage, micro-batch) pairs for both forward passes (inference) and backward passes (training, not yet implemented).

func NewPipelineScheduler

func NewPipelineScheduler(cfg PipelineParallelConfig) (*PipelineScheduler, error)

NewPipelineScheduler creates a scheduler for the given configuration.

func (*PipelineScheduler) Assignment

func (s *PipelineScheduler) Assignment() StageAssignment

Assignment returns the layer-to-stage assignment.

func (*PipelineScheduler) BubbleRatio

func (s *PipelineScheduler) BubbleRatio() float64

BubbleRatio computes the fraction of idle time (bubbles) in the pipeline. For GPipe with S stages and M micro-batches:

bubble_ratio = (S - 1) / (S + M - 1)

A lower ratio means better GPU utilization. With 4 stages and 16 micro-batches, the bubble ratio is 3/19 ≈ 15.8%.
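The formula is easy to evaluate directly; this standalone sketch (bubbleRatio is an illustrative helper, not the package method) shows how the bubble shrinks as micro-batches increase for a fixed stage count:

```go
package main

import "fmt"

// bubbleRatio returns the GPipe pipeline bubble fraction
// (S-1)/(S+M-1) for S stages and M micro-batches.
func bubbleRatio(stages, microBatches int) float64 {
	return float64(stages-1) / float64(stages+microBatches-1)
}

func main() {
	// With 4 stages, more micro-batches shrink the bubble:
	// M=4 → 3/7, M=8 → 3/11, M=16 → 3/19.
	for _, m := range []int{4, 8, 16} {
		fmt.Printf("S=4 M=%2d bubble=%.3f\n", m, bubbleRatio(4, m))
	}
}
```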

func (*PipelineScheduler) ForwardSchedule

func (s *PipelineScheduler) ForwardSchedule() [][]ScheduleStep

ForwardSchedule returns the GPipe-style forward schedule. Each step pairs a stage with a micro-batch index. In GPipe, all micro-batches flow through stage 0 first, then stage 1, etc., but micro-batches can overlap across stages. The schedule is ordered by clock cycle.

For S stages and M micro-batches, the schedule has S+M-1 clock cycles. At each clock cycle c, all stages s where 0 <= c-s < M execute concurrently.
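The clock-cycle rule above can be turned into a small standalone generator (step and forwardSchedule here are illustrative, mirroring but not identical to the package's ScheduleStep and ForwardSchedule):

```go
package main

import "fmt"

type step struct{ Stage, MicroBatch int }

// forwardSchedule builds a GPipe forward schedule: at clock cycle c,
// every stage s with 0 <= c-s < m runs micro-batch c-s.
func forwardSchedule(s, m int) [][]step {
	cycles := s + m - 1
	sched := make([][]step, cycles)
	for c := 0; c < cycles; c++ {
		for st := 0; st < s; st++ {
			if mb := c - st; mb >= 0 && mb < m {
				sched[c] = append(sched[c], step{st, mb})
			}
		}
	}
	return sched
}

func main() {
	// 2 stages, 3 micro-batches: 4 clock cycles, with cycles 1 and 2
	// running both stages concurrently on different micro-batches.
	for c, steps := range forwardSchedule(2, 3) {
		fmt.Println("cycle", c, steps)
	}
}
```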

type ScheduleStep

type ScheduleStep struct {
	Stage      int
	MicroBatch int
}

ScheduleStep represents one unit of work: process a micro-batch on a stage.

type ShardedWeight

type ShardedWeight[T tensor.Numeric] struct {
	// Shard is the weight slice assigned to this rank.
	Shard *tensor.TensorNumeric[T]
	// Rank is the device index for this shard.
	Rank int
	// Mode indicates column or row split.
	Mode SplitMode
}

ShardedWeight holds one device's slice of a weight matrix along with metadata about the split.

func SplitLinearColumnWise

func SplitLinearColumnWise[T tensor.Numeric](
	engine compute.Engine[T],
	weight *tensor.TensorNumeric[T],
	numShards int,
) ([]*ShardedWeight[T], error)

SplitLinearColumnWise splits a 2-D weight tensor [inFeatures, outFeatures] along the output dimension into numShards equal parts. Each shard has shape [inFeatures, outFeatures/numShards].

func SplitLinearRowWise

func SplitLinearRowWise[T tensor.Numeric](
	engine compute.Engine[T],
	weight *tensor.TensorNumeric[T],
	numShards int,
) ([]*ShardedWeight[T], error)

SplitLinearRowWise splits a 2-D weight tensor [inFeatures, outFeatures] along the input dimension into numShards equal parts. Each shard has shape [inFeatures/numShards, outFeatures].

type SplitMode

type SplitMode int

SplitMode describes how a weight matrix is partitioned.

const (
	// ColumnSplit splits the weight along the output (column) dimension.
	// Each shard computes a slice of the output; results are concatenated.
	ColumnSplit SplitMode = iota
	// RowSplit splits the weight along the input (row) dimension.
	// Each shard computes a partial sum; results are reduced via AllReduce.
	RowSplit
)

type StageAssignment

type StageAssignment struct {
	// StageForLayer maps layer index to stage index.
	StageForLayer []int
	// LayersPerStage stores the layer indices assigned to each stage.
	LayersPerStage [][]int
}

StageAssignment maps transformer layers to pipeline stages.

func AssignLayers

func AssignLayers(numLayers, numStages int) StageAssignment

AssignLayers distributes transformer layers across pipeline stages as evenly as possible. Remainder layers are distributed to the first stages.
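The even-with-remainder policy can be sketched as follows (assignLayers is an illustrative stand-in returning only the per-stage layer lists):

```go
package main

import "fmt"

// assignLayers distributes numLayers across numStages as evenly as
// possible, giving one extra layer to each of the earliest stages
// until the remainder is used up.
func assignLayers(numLayers, numStages int) [][]int {
	per, rem := numLayers/numStages, numLayers%numStages
	stages := make([][]int, numStages)
	layer := 0
	for s := 0; s < numStages; s++ {
		n := per
		if s < rem {
			n++
		}
		for i := 0; i < n; i++ {
			stages[s] = append(stages[s], layer)
			layer++
		}
	}
	return stages
}

func main() {
	// 10 layers over 3 stages: stage 0 absorbs the remainder layer.
	fmt.Println(assignLayers(10, 3)) // [[0 1 2 3] [4 5 6] [7 8 9]]
}
```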

type SumAllReducer

type SumAllReducer[T tensor.Numeric] struct {
	// contains filtered or unexported fields
}

SumAllReducer is a simple in-process AllReducer that sums partials from all ranks. It is intended for single-process CPU testing where multiple "ranks" are simulated with separate engine instances operating on partitioned tensors.

func NewSumAllReducer

func NewSumAllReducer[T tensor.Numeric](engine compute.Engine[T], numRanks int) *SumAllReducer[T]

NewSumAllReducer creates an AllReducer that sums tensors in-process. numRanks is the number of partials expected per reduction.

func (*SumAllReducer[T]) AddPartial

func (r *SumAllReducer[T]) AddPartial(t *tensor.TensorNumeric[T])

AddPartial registers a partial result from one rank. Once all partials are registered, AllReduceSum can be called.

func (*SumAllReducer[T]) AllReduceSum

func (r *SumAllReducer[T]) AllReduceSum(ctx context.Context, t *tensor.TensorNumeric[T]) (*tensor.TensorNumeric[T], error)

AllReduceSum returns the element-wise sum of all registered partials. After the call, partials are cleared automatically.

func (*SumAllReducer[T]) Reset

func (r *SumAllReducer[T]) Reset()

Reset clears accumulated partials for the next reduction round.

type TensorParallelConfig

type TensorParallelConfig struct {
	// NumGPUs is the number of devices to split layers across.
	NumGPUs int
	// DeviceIDs identifies each device. Length must equal NumGPUs.
	DeviceIDs []int
}

TensorParallelConfig configures tensor parallelism across GPUs.

func (*TensorParallelConfig) Validate

func (c *TensorParallelConfig) Validate() error

Validate checks that the configuration is consistent.

type TensorParallelLayer

type TensorParallelLayer[T tensor.Numeric] struct {
	// Shards holds one weight shard per rank.
	Shards []*ShardedWeight[T]
	// Mode is the split strategy.
	Mode SplitMode
}

TensorParallelLayer represents a single transformer linear layer that has been split across multiple ranks for tensor-parallel execution.

type TensorParallelWrapper

type TensorParallelWrapper[T tensor.Numeric] struct {
	// contains filtered or unexported fields
}

TensorParallelWrapper coordinates tensor-parallel execution of a transformer's linear layers across N simulated or real GPU ranks.

func NewTensorParallelWrapper

func NewTensorParallelWrapper[T tensor.Numeric](
	config TensorParallelConfig,
	engines []compute.Engine[T],
	reducer AllReducer[T],
) (*TensorParallelWrapper[T], error)

NewTensorParallelWrapper creates a wrapper that distributes linear layers across engines (one per rank). The engines slice length must equal config.NumGPUs. The reducer is called after row-parallel layers to sum partial results.

func (*TensorParallelWrapper[T]) AddColumnParallelLayer

func (w *TensorParallelWrapper[T]) AddColumnParallelLayer(
	weight *tensor.TensorNumeric[T],
) error

AddColumnParallelLayer splits a weight column-wise and registers the resulting shards as a new layer.

func (*TensorParallelWrapper[T]) AddRowParallelLayer

func (w *TensorParallelWrapper[T]) AddRowParallelLayer(
	weight *tensor.TensorNumeric[T],
) error

AddRowParallelLayer splits a weight row-wise and registers the resulting shards as a new layer.

func (*TensorParallelWrapper[T]) Config

func (w *TensorParallelWrapper[T]) Config() TensorParallelConfig

Config returns a copy of the tensor parallel configuration.

func (*TensorParallelWrapper[T]) ForwardLayer

func (w *TensorParallelWrapper[T]) ForwardLayer(
	ctx context.Context,
	layerIdx int,
	rank int,
	input *tensor.TensorNumeric[T],
) (*tensor.TensorNumeric[T], error)

ForwardLayer executes one tensor-parallel layer on a single rank. For column-parallel: returns the rank's partial output (caller gathers). For row-parallel: returns the AllReduced full output.

func (*TensorParallelWrapper[T]) NumLayers

func (w *TensorParallelWrapper[T]) NumLayers() int

NumLayers returns the number of registered parallel layers.
