source

package
v2.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package source provides built-in partition source implementations.

Partition sources discover available partitions for assignment. The package includes:

  • Static: Fixed list of partitions supplied when the source is constructed.
  • NatsKV: Dynamic partition list backed by a NATS KeyValue bucket, supporting live updates via KV watches (implements PartitionSource, PartitionUpdater, and WatchablePartitionSource).

Custom sources can be implemented by satisfying the types.PartitionSource interface.

Types

type NatsKV

type NatsKV struct {
	// contains filtered or unexported fields
}

NatsKV implements a partition source backed by a NATS KeyValue bucket.

It watches a specific key in the KV bucket for updates to the partition list. This allows dynamic partition updates without restarting workers.

func NewNatsKV

func NewNatsKV(kv jetstream.KeyValue, key string, logger types.Logger) *NatsKV

NewNatsKV creates a new NATS KV-based partition source.

Parameters:

  • kv: The NATS KeyValue bucket to use
  • key: The key where partitions are stored (JSON encoded)
  • logger: Optional logger

func (*NatsKV) List

func (s *NatsKV) List(_ context.Context) ([]types.Partition, error)

List returns the current list of partitions.

func (*NatsKV) Start

func (s *NatsKV) Start(ctx context.Context) error

Start initializes the source and starts watching for updates.

func (*NatsKV) Stop

func (s *NatsKV) Stop(_ context.Context) error

Stop stops the watcher.

func (*NatsKV) Update

func (s *NatsKV) Update(ctx context.Context, partitions []types.Partition) error

Update updates the partition list in the KV bucket.

Note: NATS KV buckets have a default value size limit of 1MB (MaxMsgSize). To support large partition lists (e.g., 2000+ partitions), this method automatically compresses the data with gzip before storing it.

If the compressed size still exceeds the limit, the update will fail.
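
The compress-then-check behavior described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the package's actual code: the JSON field names, the helper names encodePartitions and decodePartitions, and the maxValueSize constant are all hypothetical.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
)

// Partition is a local stand-in for types.Partition (JSON tags assumed).
type Partition struct {
	Keys   []string `json:"keys"`
	Weight int64    `json:"weight"`
}

// maxValueSize mirrors the 1MB default limit mentioned above (hypothetical name).
const maxValueSize = 1 << 20

// encodePartitions marshals the list to JSON, gzips it, and rejects the
// result if the compressed payload is larger than limit bytes.
func encodePartitions(parts []Partition, limit int) ([]byte, error) {
	raw, err := json.Marshal(parts)
	if err != nil {
		return nil, fmt.Errorf("marshal partitions: %w", err)
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, fmt.Errorf("gzip write: %w", err)
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("gzip close: %w", err)
	}
	if buf.Len() > limit {
		return nil, fmt.Errorf("compressed size %d exceeds limit %d", buf.Len(), limit)
	}
	return buf.Bytes(), nil
}

// decodePartitions reverses encodePartitions.
func decodePartitions(data []byte) ([]Partition, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("gzip reader: %w", err)
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return nil, fmt.Errorf("gzip read: %w", err)
	}
	var parts []Partition
	if err := json.Unmarshal(raw, &parts); err != nil {
		return nil, fmt.Errorf("unmarshal partitions: %w", err)
	}
	return parts, nil
}

func main() {
	parts := make([]Partition, 2000)
	for i := range parts {
		parts[i] = Partition{Keys: []string{fmt.Sprintf("tool%04d", i), "chamber1"}, Weight: 100}
	}
	data, err := encodePartitions(parts, maxValueSize)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	decoded, err := decodePartitions(data)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("stored %d partitions in %d compressed bytes\n", len(decoded), len(data))
}
```

Checking the size after compression, rather than before, is what lets a large but repetitive partition list fit under the limit.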

func (*NatsKV) Watch

func (s *NatsKV) Watch(ctx context.Context) <-chan struct{}

Watch returns a channel that emits a signal when the partition list changes.

type Static

type Static struct {
	// contains filtered or unexported fields
}

Static implements a partition source with a fixed list of partitions.

func NewStatic

func NewStatic(partitions []types.Partition) *Static

NewStatic creates a new static partition source.

The source returns a fixed list of partitions that changes only when Update is called. Useful for testing and scenarios where partitions are known at startup.

Parameters:

  • partitions: Fixed list of partitions

Returns:

  • *Static: Initialized static source

Example:

partitions := []types.Partition{
    {Keys: []string{"tool001", "chamber1"}, Weight: 100},
    {Keys: []string{"tool001", "chamber2"}, Weight: 150},
}
src := source.NewStatic(partitions)
js, err := jetstream.New(conn)
if err != nil { /* handle */ }
mgr, err := parti.NewManager(&cfg, js, src, strategy.NewConsistentHash())
if err != nil { /* handle */ }

func (*Static) List

func (s *Static) List(_ context.Context) ([]types.Partition, error)

List returns the static list of partitions.

Returns:

  • []types.Partition: The fixed list of partitions
  • error: Always nil (never fails)

func (*Static) Start

func (s *Static) Start(_ context.Context) error

Start implements PartitionSource.Start. For the Static source, it validates the static partitions.
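
The documentation does not say which checks Start performs. Purely as an illustration of what validating a partition list could look like, the sketch below rejects partitions with no keys, negative weights, or duplicate key tuples. The function validatePartitions and every rule in it are assumptions, not the package's behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// Partition is a local stand-in for types.Partition (field types assumed).
type Partition struct {
	Keys   []string
	Weight int64
}

// validatePartitions is a hypothetical validator. The rules below are
// illustrative guesses, not the checks performed by Static.Start.
func validatePartitions(parts []Partition) error {
	seen := make(map[string]struct{}, len(parts))
	for i, p := range parts {
		if len(p.Keys) == 0 {
			return fmt.Errorf("partition %d: no keys", i)
		}
		if p.Weight < 0 {
			return fmt.Errorf("partition %d: negative weight %d", i, p.Weight)
		}
		// Join with a NUL byte so ["a", "bc"] and ["ab", "c"] stay distinct.
		id := strings.Join(p.Keys, "\x00")
		if _, dup := seen[id]; dup {
			return fmt.Errorf("partition %d: duplicate keys %v", i, p.Keys)
		}
		seen[id] = struct{}{}
	}
	return nil
}

func main() {
	parts := []Partition{
		{Keys: []string{"tool001", "chamber1"}, Weight: 100},
		{Keys: []string{"tool001", "chamber1"}, Weight: 150},
	}
	if err := validatePartitions(parts); err != nil {
		fmt.Println("invalid partitions:", err)
	}
}
```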

func (*Static) Stop

func (s *Static) Stop(_ context.Context) error

Stop implements PartitionSource.Stop. For the Static source, this is a no-op.

func (*Static) Update

func (s *Static) Update(_ context.Context, partitions []types.Partition) error

Update updates the partition list.

This allows the static source to simulate dynamic partition changes, which is useful for testing partition refresh scenarios.

Parameters:

  • ctx: Context for the operation (unused)
  • partitions: New list of partitions

Returns:

  • error: Always nil

Example:

src := source.NewStatic(initialPartitions)
// Later: add more partitions
src.Update(context.Background(), expandedPartitions)
