konductor

module v0.3.0
Published: Jan 24, 2026 License: Apache-2.0

Konductor

Kubernetes operator for workflow coordination and job orchestration. Synchronize Kubernetes Jobs, coordinate multi-stage pipelines, and manage complex workflows in your cluster.

Why Konductor?

Kubernetes Jobs are powerful but lack built-in coordination, for example when you need to:

  • Wait for multiple Jobs to complete before starting the next stage
  • Prevent CronJobs from overlapping when they run longer than their schedule
  • Ensure only one Job runs database migrations across multiple replicas
  • Limit how many batch Jobs run concurrently to avoid overwhelming your cluster
  • Coordinate Pods with each other using the CLI or SDK

Konductor provides simple primitives to solve these problems natively in Kubernetes.

Native Kubernetes Integration

  • CRDs for declarative workflow definition
  • Works seamlessly with Jobs, CronJobs, and Pods
  • No external dependencies or services required

Simple and Lightweight

  • Single operator deployment
  • Minimal resource overhead
  • Easy-to-understand primitives

Flexible Usage

  • CLI for shell scripts and initContainers
  • SDK for application-level integration
  • kubectl for manual operations

Production Ready

  • Automatic cleanup and TTL expiration
  • Leader election for HA operator
  • Comprehensive observability

Features

  • Barrier - Synchronize multiple Jobs at coordination points
  • Gate - Wait for dependencies before starting Jobs
  • Lease - Singleton Job execution and leader election
  • Mutex - Mutual exclusion for critical sections
  • RWMutex - Read-write locks for concurrent reads
  • Once - One-time execution guarantee
  • WaitGroup - Dynamic worker coordination
  • Semaphore - Control concurrent Job execution
  • CLI - Command-line tool for workflow management
  • SDK - Go SDK for programmatic integration

Synchronization Primitives

Barrier

Synchronize multiple processes at a coordination point.

┌──────────┐     ┌──────────┐     ┌──────────┐
│  Pod A   │────▶│ Barrier  │◀────│  Pod B   │
│ (waiting)│     │ Expected:│     │ (waiting)│
└──────────┘     │    3     │     └──────────┘
                 │ Arrived: │
┌──────────┐     │    2     │
│  Pod C   │────▶│          │
│(arriving)│     └──────────┘
└──────────┘          │
                      ▼
              [All arrive → Open]

Use Cases:

  • Multi-stage ETL pipelines (wait for all extractors before transforming)
  • Coordinated batch job execution
  • Distributed testing (wait for all services before running tests)
  • MapReduce-style workflows
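The Barrier behaviour in the diagram fits in a few lines of code. Below is a minimal in-process sketch in Go. It assumes a barrier opens once `expected` distinct participants have arrived and then stays open. `Barrier`, `Arrive`, `Wait`, and `IsOpen` are hypothetical names chosen for illustration. They are not Konductor's SDK or CRD fields. The real primitive coordinates Pods through the Kubernetes API, not goroutines.

```go
package main

import (
	"fmt"
	"sync"
)

// Barrier is a hypothetical in-memory model of a barrier (not Konductor's API):
// it opens once `expected` distinct participants have arrived.
type Barrier struct {
	mu       sync.Mutex
	expected int
	arrived  map[string]bool // participant name -> has arrived
	opened   bool
	open     chan struct{} // closed exactly once, when the barrier opens
}

func NewBarrier(expected int) *Barrier {
	return &Barrier{
		expected: expected,
		arrived:  make(map[string]bool),
		open:     make(chan struct{}),
	}
}

// Arrive records a participant. Assumption: a repeated arrival by the same
// name counts only once.
func (b *Barrier) Arrive(name string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.arrived[name] = true
	if !b.opened && len(b.arrived) >= b.expected {
		b.opened = true
		close(b.open) // releases every current and future waiter
	}
}

// Wait blocks until the barrier is open.
func (b *Barrier) Wait() { <-b.open }

// IsOpen reports whether all expected participants have arrived.
func (b *Barrier) IsOpen() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.opened
}

// Arrived reports how many distinct participants have arrived so far.
func (b *Barrier) Arrived() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.arrived)
}

func main() {
	b := NewBarrier(3)
	var wg sync.WaitGroup
	for _, pod := range []string{"pod-a", "pod-b", "pod-c"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			b.Arrive(name)
			b.Wait() // no pod gets past this line until all three have arrived
			fmt.Println(name, "passed the barrier")
		}(pod)
	}
	wg.Wait()
	fmt.Println("arrived:", b.Arrived(), "open:", b.IsOpen())
}
```

Closing a channel releases every waiter at once. This matches the diagram, where all Pods proceed together when the last one arrives.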

Gate

Wait for multiple conditions before proceeding.

┌─────────────┐
│    Gate     │  Conditions:
│ (workflow)  │  ✓ Job "etl" = Complete
└─────────────┘  ✓ Job "validation" = Complete
       │         ✗ Barrier "workers" = Open
       │
       ▼
  [All met → Open]

Use Cases:

  • Job dependency management (Job B waits for Job A)
  • Complex workflow orchestration
  • Deployment gates (wait for validation before deploy)
  • Multi-job coordination
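A Gate amounts to an AND over its conditions. This hypothetical Go sketch evaluates the conditions from the diagram against a snapshot of observed states. The `Condition` fields mirror the `type`/`name`/`state` keys used in the Gate manifests in this README. The `observed` map and the `Waiting` barrier state are assumptions, standing in for what an operator would read from the cluster.

```go
package main

import "fmt"

// Condition mirrors the type/name/state keys of a Gate condition.
type Condition struct {
	Type  string // e.g. "Job" or "Barrier"
	Name  string
	State string // desired state, e.g. "Complete" or "Open"
}

// observed maps "Type/Name" to the current state of that object. In a cluster
// this would come from the API server; here it is a plain map (assumption).
type observed map[string]string

// Unmet returns the conditions whose desired state has not been reached yet.
func Unmet(conds []Condition, obs observed) []Condition {
	var pending []Condition
	for _, c := range conds {
		if obs[c.Type+"/"+c.Name] != c.State {
			pending = append(pending, c)
		}
	}
	return pending
}

// GateOpen reports whether every condition is satisfied.
func GateOpen(conds []Condition, obs observed) bool {
	return len(Unmet(conds, obs)) == 0
}

func main() {
	conds := []Condition{
		{Type: "Job", Name: "etl", State: "Complete"},
		{Type: "Job", Name: "validation", State: "Complete"},
		{Type: "Barrier", Name: "workers", State: "Open"},
	}
	obs := observed{
		"Job/etl":         "Complete",
		"Job/validation":  "Complete",
		"Barrier/workers": "Waiting", // hypothetical not-yet-open state
	}
	fmt.Println("open:", GateOpen(conds, obs))
	for _, c := range Unmet(conds, obs) {
		fmt.Printf("waiting on %s %q = %s\n", c.Type, c.Name, c.State)
	}
}
```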

Lease

Singleton execution and leader election with automatic expiration.

┌──────────┐     ┌─────────┐     ┌──────────┐
│  Pod A   │────▶│  Lease  │     │  Pod B   │
│ (leader) │     │ Holder: │     │(standby) │
│          │     │  Pod A  │     │          │
└──────────┘     │ TTL: 30s│     └──────────┘
                 └─────────┘
                      │
              [Expires → Available]

Use Cases:

  • Singleton CronJobs (prevent overlapping executions)
  • Database migration coordination
  • One-time initialization Jobs
  • Leader election for distributed Jobs

Semaphore

Control concurrent Job execution.

┌─────────────┐
│  Semaphore  │  Permits: 3
│(batch-jobs) │  In-Use: 2
└─────────────┘  Available: 1
      │
      ├─── [Permit 1] → Job A (running)
      ├─── [Permit 2] → Job B (running)
      └─── [Permit 3] → Available

Use Cases:

  • Limit concurrent batch Jobs (e.g., max 10 Jobs at once)
  • Resource-constrained Job execution
  • Throttle parallel Job processing
  • Control cluster load from Jobs
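The Permits / In-Use / Available view above corresponds to a counting semaphore with named holders. The following is a minimal Go model of that bookkeeping. The `Semaphore` type and its methods are hypothetical illustrations, not Konductor's API. Keying permits by holder name is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// Semaphore is a hypothetical in-memory model of a counting semaphore whose
// permits are tracked by holder name.
type Semaphore struct {
	mu      sync.Mutex
	permits int
	holders map[string]bool
}

func NewSemaphore(permits int) *Semaphore {
	return &Semaphore{permits: permits, holders: make(map[string]bool)}
}

// TryAcquire takes a permit for holder if one is free. A holder that already
// has a permit keeps it without consuming a second one.
func (s *Semaphore) TryAcquire(holder string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.holders[holder] {
		return true
	}
	if len(s.holders) >= s.permits {
		return false // all permits in use
	}
	s.holders[holder] = true
	return true
}

// Release returns holder's permit, if it has one.
func (s *Semaphore) Release(holder string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.holders, holder)
}

// Available reports how many permits are free.
func (s *Semaphore) Available() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.permits - len(s.holders)
}

func main() {
	s := NewSemaphore(3)
	s.TryAcquire("job-a")
	s.TryAcquire("job-b")
	fmt.Println("available:", s.Available()) // available: 1
}
```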

Mutex

Mutual exclusion for critical sections.

┌──────────┐     ┌─────────┐     ┌──────────┐
│  Pod A   │────▶│  Mutex  │     │  Pod B   │
│ (holder) │     │ Holder: │     │(waiting) │
│          │     │  Pod A  │     │          │
└──────────┘     │ Locked  │     └──────────┘
                 └─────────┘
                      │
              [Unlock → Available]

Use Cases:

  • Database migration coordination
  • Shared resource access serialization
  • Critical section protection
  • Simple mutual exclusion (simpler than Lease)

RWMutex

Read-write locks allowing multiple concurrent readers or a single exclusive writer.

┌──────────┐     ┌─────────┐     ┌──────────┐
│ Reader A │────▶│ RWMutex │◀────│ Reader B │
│ (reading)│     │ Readers:│     │(reading) │
└──────────┘     │    2    │     └──────────┘
                 │ Writer: │
┌──────────┐     │  None   │
│ Writer C │────▶│         │
│(waiting) │     └─────────┘
└──────────┘          │
              [Readers done → Writer]

Use Cases:

  • Cache coordination (multiple readers, single writer)
  • Configuration file access
  • Shared data structures
  • Read-heavy workloads
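The readers/writer rule can be stated as a small state machine: any number of readers, or exactly one writer, never both. The Go sketch below is a hypothetical single-goroutine model of that rule only. It has no blocking, fairness, or writer preference, and none of its names come from Konductor.

```go
package main

import "fmt"

// RWMutex is a hypothetical model of a read-write lock's state: any number of
// readers, or exactly one writer, never both. Not safe for concurrent use.
type RWMutex struct {
	readers map[string]bool
	writer  string
}

func NewRWMutex() *RWMutex { return &RWMutex{readers: make(map[string]bool)} }

// TryRLock admits a reader as long as no writer holds the lock.
func (m *RWMutex) TryRLock(holder string) bool {
	if m.writer != "" {
		return false
	}
	m.readers[holder] = true
	return true
}

// RUnlock removes a reader.
func (m *RWMutex) RUnlock(holder string) { delete(m.readers, holder) }

// TryLock admits a writer only when there are no readers and no writer.
func (m *RWMutex) TryLock(holder string) bool {
	if m.writer != "" || len(m.readers) > 0 {
		return false
	}
	m.writer = holder
	return true
}

// Unlock releases the write lock if holder owns it.
func (m *RWMutex) Unlock(holder string) {
	if m.writer == holder {
		m.writer = ""
	}
}

func main() {
	m := NewRWMutex()
	fmt.Println(m.TryRLock("reader-a"), m.TryRLock("reader-b")) // true true
	fmt.Println(m.TryLock("writer-c"))                          // false: readers active
	m.RUnlock("reader-a")
	m.RUnlock("reader-b")
	fmt.Println(m.TryLock("writer-c"))  // true: readers done
	fmt.Println(m.TryRLock("reader-a")) // false: writer active
}
```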

Installation

Helm
# Add the LogicIQ Helm repository
helm repo add logiciq https://logiciq.github.io/helm-charts
helm repo update

# Install konductor
helm install my-konductor logiciq/konductor

# Install with custom values
helm install my-konductor logiciq/konductor -f values.yaml

Quick Start

Using kubectl
# Create a barrier for 3 Jobs
kubectl apply -f - <<EOF
apiVersion: sync.konductor.io/v1
kind: Barrier
metadata:
  name: stage-1-complete
spec:
  expected: 3
EOF

# Check status
kubectl get barrier stage-1-complete

Using CLI
# Install CLI
go install github.com/LogicIQ/konductor/cli@latest

# Wait for barrier in Job
koncli barrier wait stage-1-complete --timeout 30m

Using SDK
import (
	"context"
	"log"

	konductor "github.com/LogicIQ/konductor/sdk/go"
)

ctx := context.Background()
client, err := konductor.New(nil)
if err != nil {
	log.Fatal(err)
}

// Wait for dependencies
client.WaitGate(ctx, "dependencies-ready")

// Signal completion
client.ArriveBarrier(ctx, "stage-complete")

Usage Examples

Multi-Stage ETL Pipeline
apiVersion: sync.konductor.io/v1
kind: Barrier
metadata:
  name: stage-1-complete
spec:
  expected: 3
---
apiVersion: batch/v1
kind: Job
metadata:
  name: stage-2-processor
spec:
  template:
    spec:
      restartPolicy: Never  # Job Pods must use Never or OnFailure
      initContainers:
      - name: wait-stage-1
        image: logiciq/koncli:latest
        command: ["koncli", "barrier", "wait", "stage-1-complete"]
      containers:
      - name: processor
        image: my-processor:latest

Job Dependency with Gate
apiVersion: sync.konductor.io/v1
kind: Gate
metadata:
  name: validation-gate
spec:
  conditions:
  - type: Job
    name: data-validation
    state: Complete
---
apiVersion: batch/v1
kind: Job
metadata:
  name: data-processing
spec:
  template:
    spec:
      restartPolicy: Never  # Job Pods must use Never or OnFailure
      initContainers:
      - name: wait-validation
        image: logiciq/koncli:latest
        command: ["koncli", "gate", "wait", "validation-gate"]
      containers:
      - name: processor
        image: my-processor:latest

Singleton CronJob
apiVersion: sync.konductor.io/v1
kind: Lease
metadata:
  name: daily-report
spec:
  ttl: 1h
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: daily-report
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never  # Job Pods must use Never or OnFailure
          containers:
          - name: report
            image: my-report:latest
            command:
            - /bin/sh
            - -c
            - |
              if koncli lease acquire daily-report --holder $HOSTNAME; then
                generate-report
                koncli lease release daily-report --holder $HOSTNAME
              fi

Critical Section with Mutex
apiVersion: sync.konductor.io/v1
kind: Mutex
metadata:
  name: db-migration
spec:
  ttl: 10m
---
apiVersion: batch/v1
kind: Job
metadata:
  name: migration
spec:
  template:
    spec:
      restartPolicy: Never  # Job Pods must use Never or OnFailure
      containers:
      - name: migrate
        image: my-app:latest
        command:
        - /bin/sh
        - -c
        - |
          if koncli mutex lock db-migration --holder $HOSTNAME --timeout 30s; then
            run-migrations
            koncli mutex unlock db-migration --holder $HOSTNAME
          fi

Documentation

Real-World Scenarios

Scenario 1: ETL Pipeline with Stages

Problem: 10 extract Jobs must complete before 5 transform Jobs can start.

Solution:

apiVersion: sync.konductor.io/v1
kind: Barrier
metadata:
  name: extract-complete
spec:
  expected: 10

Each extract Job arrives at the barrier when it finishes. The transform Jobs wait for the barrier to open before they start.
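The ordering this barrier enforces can be illustrated in-process. The sketch below is an analogy, not how Konductor works across Pods. A `sync.WaitGroup` stands in for the `extract-complete` barrier. Ten extract goroutines arrive by calling `Done`, and five transform goroutines call `Wait` before starting. `runPipeline` is a hypothetical helper that counts how many transforms started before every extract had finished.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline is a hypothetical helper: `extracts` goroutines must all finish
// before any of the `transforms` goroutines starts its work. It returns how
// many transforms observed an unfinished extract stage (0 means correct order).
func runPipeline(extracts, transforms int) int {
	var extractDone sync.WaitGroup // stands in for the extract-complete barrier
	var mu sync.Mutex
	finished, early := 0, 0

	extractDone.Add(extracts) // the barrier's "expected" count
	for i := 0; i < extracts; i++ {
		go func() {
			defer extractDone.Done() // "arrive" at the barrier after the work
			mu.Lock()
			finished++ // stand-in for the extract Job's real work
			mu.Unlock()
		}()
	}

	var transformDone sync.WaitGroup
	transformDone.Add(transforms)
	for i := 0; i < transforms; i++ {
		go func() {
			defer transformDone.Done()
			extractDone.Wait() // block until every extract has arrived
			mu.Lock()
			if finished < extracts {
				early++
			}
			mu.Unlock()
		}()
	}

	transformDone.Wait()
	return early
}

func main() {
	fmt.Println("transforms started early:", runPipeline(10, 5)) // transforms started early: 0
}
```

With the barrier in place, the count is always zero, whatever order the goroutines are scheduled in.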

Scenario 2: Prevent Overlapping CronJobs

Problem: The report CronJob runs every hour, but a single run can take 2 hours, so consecutive runs overlap.

Solution:

if koncli lease acquire daily-report --holder $HOSTNAME --timeout 0; then
  generate-report
  koncli lease release daily-report --holder $HOSTNAME
else
  echo "Previous job still running, skipping"
fi

Scenario 3: Job Dependencies

Problem: The processing Job needs both the validation Job and the cleanup Job to complete first.

Solution:

apiVersion: sync.konductor.io/v1
kind: Gate
metadata:
  name: processing-gate
spec:
  conditions:
  - type: Job
    name: validation-job
    state: Complete
  - type: Job
    name: cleanup-job
    state: Complete

Scenario 4: Limit Concurrent Batch Jobs

Problem: Running 100 batch Jobs at once would overwhelm cluster resources.

Solution:

apiVersion: sync.konductor.io/v1
kind: Semaphore
metadata:
  name: batch-limit
spec:
  permits: 10  # Max 10 concurrent Jobs

Each Job acquires a permit before starting its work and releases it on completion, so at most 10 Jobs do work at the same time.
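The same acquire-before-work, release-on-completion pattern can be sketched in-process with a buffered channel, a common Go idiom for a counting semaphore. This is an analogy, not Konductor's mechanism. `runBatch` is a hypothetical helper that starts 100 workers behind a 10-slot channel and reports the peak number running at once.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBatch starts `jobs` workers but lets at most `permits` run at once,
// returning the highest concurrency observed. The buffered channel plays the
// role of the batch-limit semaphore: sending acquires, receiving releases.
func runBatch(jobs, permits int) int {
	sem := make(chan struct{}, permits)
	var mu sync.Mutex
	running, peak := 0, 0

	var wg sync.WaitGroup
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a permit (blocks when all are taken)
			defer func() { <-sem }() // release the permit on completion

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // stand-in for the Job's real work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", runBatch(100, 10))
}
```

The peak can never exceed the permit count, because a worker decrements `running` before it gives its slot back.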

Architecture

┌─────────────────────────────────────────────┐
│           Kubernetes Cluster                │
│                                             │
│  ┌───────────────────────────────────────┐  │
│  │     Konductor Operator                │  │
│  │  (Watches CRDs, Manages State)        │  │
│  └───────────────────────────────────────┘  │
│         │         │         │         │     │
│         ▼         ▼         ▼         ▼     │
│  ┌─────────┐┌─────────┐┌────────┐┌────────┐ │
│  │Semaphore││ Barrier ││ Lease  ││  Gate  │ │
│  │   CRD   ││   CRD   ││  CRD   ││  CRD   │ │
│  └─────────┘└─────────┘└────────┘└────────┘ │
│         ▲         ▲         ▲         ▲     │
│         │         │         │         │     │
│  ┌──────┴─────────┴─────────┴─────────┴──┐  │
│  │                                       │  │
│  │  ┌────────┐  ┌────────┐  ┌─────────┐  │  │
│  │  │  Pods  │  │  CLI   │  │   SDK   │  │  │
│  │  └────────┘  └────────┘  └─────────┘  │  │
│  │                                       │  │
│  └───────────────────────────────────────┘  │
└─────────────────────────────────────────────┘

Contributing

Contributions welcome! Please read our contributing guidelines.

License

Apache 2.0

Directories

Path Synopsis
api
v1
Package v1 contains API Schema definitions for the sync v1 API group +kubebuilder:object:generate=true +groupName=sync.konductor.io
sdk
go
Package konductor provides a Go SDK for Konductor coordination primitives
go/client
Package client provides the core Konductor SDK client for interacting with coordination primitives.
go/examples
Package examples contains usage examples for the Konductor SDK.
