kubejob

package module
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2024 License: MIT Imports: 41 Imported by: 2

README

kubejob

PkgGoDev Go codecov

A library for managing Kubernetes Job in Go

Features

  • Creates a Kubernetes Job
  • Run and wait for Kubernetes Job
  • Capture logs of Kubernetes Job
  • Can use context.Context to run Kubernetes Job
  • Delayed execution of Kubernetes Job ( also support Sidecar pattern )
  • Can control execution order ( and timing ) of command for multiple containers at Job
  • Copy any files or directory between local file system and container in Job
  • Can insert any process before the process of the init container
  • Automatically clean up Kubernetes Job

Installation

$ go get github.com/goccy/kubejob

Synopsis

Simply create and run and wait Kubernetes Job


import (
  "github.com/goccy/kubejob"
  batchv1 "k8s.io/api/batch/v1"
  corev1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/rest"
)

var cfg *rest.Config // = rest.InClusterConfig() assign *rest.Config value to cfg

job, err := kubejob.NewJobBuilder(cfg, "default").BuildWithJob(&batchv1.Job{
  ObjectMeta: metav1.ObjectMeta{
    GenerateName: "kubejob-",
  },
  Spec: batchv1.JobSpec{
    Template: corev1.PodTemplateSpec{
      Spec: corev1.PodSpec{
        Containers: []corev1.Container{
          {
            Name:    "test",
            Image:   "golang:1.21.0-bookworm",
            Command: []string{"echo", "hello"},
          },
        },
      },
    },
  },
})
if err != nil {
  panic(err)
}

// Run KubernetesJob and output log to stdout.
// If Job doesn't exit status by `0` , `Run()` returns error.
if err := job.Run(context.Background()); err != nil {
	panic(err)
}

Manage execution timing

If you don't want to execute the Job immediately after the Pod is Running state, you can delay the execution timing. Also, this feature can be used if the Job has multiple containers and you want to control their execution order.

ctx := context.Background()
if err := job.RunWithExecutionHandler(ctx, func(executors []*kubejob.JobExecutor) error {
  // callback when the Pod is in Running status.
  // `executors` corresponds to the list of `Containers` specified in `JobSpec`
  for _, exec := range executors { 
    // `Exec()` executes the command
    out, err := exec.Exec()
    if err != nil {
      panic(err)
    }
    fmt.Println(string(out), err)
  }
  return nil
})

Manage execution with Sidecar

job, err := kubejob.NewJobBuilder(cfg, "default").BuildWithJob(&batchv1.Job{
  ObjectMeta: metav1.ObjectMeta{
    GenerateName: "kubejob-",
  },
  Spec: batchv1.JobSpec{
    Template: corev1.PodTemplateSpec{
      Spec: corev1.PodSpec{
        Containers: []corev1.Container{
          {
            Name:    "main",
            Image:   "golang:1.21.0-bookworm",
            Command: []string{"echo", "hello"},
          },
          {
            Name:    "sidecar",
            Image:   "nginx:latest",
            Command: []string{"nginx"},
          },
        },
      },
    },
  },
})
if err != nil {
  panic(err)
}
if err := job.RunWithExecutionHandler(context.Background(), func(executors []*kubejob.JobExecutor) error {
  for _, exec := range executors {
    if exec.Container.Name == "sidecar" {
      // `ExecAsync()` executes the command and doesn't wait finished.
      exec.ExecAsync()
    } else {
      out, err := exec.Exec()
      if err != nil {
        panic(err)
      }
      fmt.Pritln(string(out))
    }
  }
  return nil
})

Execution with kubejob-agent

Normally, copying and executing commands using JobExecutor is performed using the shell or tar command in the container image and using the Kubernetes API ( pods/exec ). However, if you need to copy large files or run a large number of commands and don't want to overload the Kubernetes API, or if you don't have a shell or tar command in your container image, you can use the kubejob-agent method.

kubejob-agent is a CLI tool that can be installed by the following methods.

$ go install github.com/goccy/kubejob/cmd/kubejob-agent

Include this tool in your container image, create kubejob.AgentConfig with the path where kubejob-agent is located, and give it to kubejob.Job as follows:

agentConfig, err := kubejob.NewAgentConfig(map[string]string{
  "container-name": filepath.Join("/", "bin", "kubejob-agent"),
})
if err != nil {
  return err
}
job.UseAgent(agentConfig)

This will switch from communication using the Kubernetes API to gRPC-based communication with kubejob-agent. Communication with kubejob-agent is performed using JWT issued using the RSA Key issued each time Kubernetes Job is started, so requests cannot be sent directly to the container from other processes.

Requirements

Role

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: kubejob
rules:
  - apiGroups:
      - batch
    resources:
      - jobs
    verbs:
      - create
      - delete
  - apiGroups:
      - ""
    resources:
      - pods
    verbs:
      - get
      - list
      - watch
      - delete
  - apiGroups:
      - ""
    resources:
      - pods/log
    verbs:
      - get
      - watch
  - apiGroups:
      - ""
    resources:
      - pods/exec
    verbs:
      - create # required when using kubejob.JobExecutor without kubejob-agent

Tools

cmd/kubejob

Installation
go install github.com/goccy/kubejob/cmd/kubejob
Usage:
  kubejob [OPTIONS]

Application Options:
  -n, --namespace= specify namespace (default: default)
  -f, --file=      specify yaml or json file for written job definition
  -i, --image=     specify container image

Help Options:
  -h, --help       Show this help message
Example
$ kubejob --image golang:1.21.0-bookworm -- go version
go version
go version go1.21.0 linux/amd64

Documentation

Index

Constants

View Source
const (
	SelectorLabel        = "kubejob.io/id"
	DefaultJobName       = "kubejob-"
	DefaultContainerName = "kubejob"
)

Variables

View Source
var (
	ExecRetryCount = 8
)

Functions

This section is empty.

Types

type AgentClient added in v0.3.0

type AgentClient struct {
	// contains filtered or unexported fields
}

func NewAgentClient added in v0.3.0

func NewAgentClient(agentServerPod *corev1.Pod, listenPort uint16, workingDir, signedToken string) (*AgentClient, error)

func (*AgentClient) CopyFrom added in v0.3.0

func (c *AgentClient) CopyFrom(ctx context.Context, srcPath, dstPath string) error

func (*AgentClient) CopyTo added in v0.3.0

func (c *AgentClient) CopyTo(ctx context.Context, srcPath, dstPath string) error

func (*AgentClient) Exec added in v0.3.0

func (c *AgentClient) Exec(ctx context.Context, command []string, env []corev1.EnvVar) (*AgentExecResult, error)

func (*AgentClient) Stop added in v0.3.0

func (c *AgentClient) Stop(ctx context.Context) error

type AgentConfig added in v0.3.0

type AgentConfig struct {
	// contains filtered or unexported fields
}

func NewAgentConfig added in v0.3.0

func NewAgentConfig(containerNameToInstalledPathMap map[string]string) (*AgentConfig, error)

func (*AgentConfig) Enabled added in v0.3.1

func (c *AgentConfig) Enabled(containerName string) bool

func (*AgentConfig) InstalledPath added in v0.3.1

func (c *AgentConfig) InstalledPath(containerName string) string

func (*AgentConfig) IssueJWT added in v0.3.0

func (c *AgentConfig) IssueJWT() ([]byte, error)

func (*AgentConfig) NewAllocatedPort added in v0.3.0

func (c *AgentConfig) NewAllocatedPort() (uint16, error)

func (*AgentConfig) PublicKeyEnv added in v0.3.0

func (c *AgentConfig) PublicKeyEnv() corev1.EnvVar

func (*AgentConfig) SetAllocationStartPort added in v0.3.0

func (c *AgentConfig) SetAllocationStartPort(port uint16)

func (*AgentConfig) SetExcludePorts added in v0.3.0

func (c *AgentConfig) SetExcludePorts(ports ...uint16)

func (*AgentConfig) SetTimeout added in v0.5.0

func (c *AgentConfig) SetTimeout(timeout time.Duration)

func (*AgentConfig) Timeout added in v0.5.0

func (c *AgentConfig) Timeout() string

type AgentExecResult added in v0.3.0

type AgentExecResult struct {
	Output         string
	Success        bool
	ExitCode       int32
	ErrorMessage   string
	ElapsedTimeSec int64
}

type AgentServer added in v0.3.0

type AgentServer struct {
	*agent.UnimplementedAgentServer
	// contains filtered or unexported fields
}

func NewAgentServer added in v0.3.0

func NewAgentServer(port uint16) *AgentServer

func (*AgentServer) CopyFrom added in v0.3.0

func (*AgentServer) CopyTo added in v0.3.0

func (s *AgentServer) CopyTo(stream agent.Agent_CopyToServer) error

func (*AgentServer) Exec added in v0.3.0

func (*AgentServer) Finish added in v0.3.0

func (*AgentServer) Run added in v0.3.0

func (s *AgentServer) Run(ctx context.Context) error

type CleanupError added in v0.2.9

type CleanupError struct {
	JobName string
	Errs    []error
}

func (*CleanupError) Error added in v0.2.9

func (e *CleanupError) Error() string

type CommandError added in v0.2.9

type CommandError struct {
	Message   string
	ReaderErr error
	WriterErr error
}

func (*CommandError) Error added in v0.2.9

func (e *CommandError) Error() string

func (*CommandError) IsExitError added in v0.2.9

func (e *CommandError) IsExitError() bool

type ContainerLog

type ContainerLog struct {
	Pod        *corev1.Pod
	Container  corev1.Container
	Log        string
	IsFinished bool
}

type ContainerLogger

type ContainerLogger func(*ContainerLog)

type CopyError added in v0.2.9

type CopyError struct {
	SrcPath string
	DstPath string
	Err     error
}

func (*CopyError) Error added in v0.2.9

func (e *CopyError) Error() string

type ErrorType added in v0.3.3

type ErrorType int
const (
	FailedJobType ErrorType = iota
	CleanupErrorType
	LogStreamErrorType
	JobCreationErrorType
	JobWatchErrorType
	ValidationErrorType
	PreInitErrorType
	CommandErrorType
	JobStopContainerErrorType
	PendingPhaseTimeoutErrorType
	CopyErrorType
	JobUnexpectedErrorType
)

type FailedJob

type FailedJob struct {
	Pod    *corev1.Pod
	Reason error
}

func (*FailedJob) Error

func (j *FailedJob) Error() string

func (*FailedJob) FailedContainerNames

func (j *FailedJob) FailedContainerNames() []string

func (*FailedJob) FailedContainers

func (j *FailedJob) FailedContainers() []corev1.Container

type Job

type Job struct {
	*batchv1.Job
	// contains filtered or unexported fields
}

func (*Job) DisableCommandLog

func (j *Job) DisableCommandLog()

func (*Job) DisableContainerLog

func (j *Job) DisableContainerLog()

func (*Job) DisableInitCommandLog

func (j *Job) DisableInitCommandLog()

func (*Job) DisableInitContainerLog

func (j *Job) DisableInitContainerLog()

func (*Job) PreInit added in v0.2.9

func (j *Job) PreInit(c corev1.Container, cb func(ctx context.Context, exec *JobExecutor) error)

PreInit can define the process you want to execute before the process of init container specified at the time of starting Job. It is mainly intended to be used when you use `(*JobExecutor).CopyToFile` to copy an arbitrary file to pod before init processing.

func (*Job) Run

func (j *Job) Run(ctx context.Context) (e error)

func (*Job) RunWithExecutionHandler

func (j *Job) RunWithExecutionHandler(ctx context.Context, handler JobExecutionHandler, finalizer *JobFinalizer) error

RunWithExecutionHandler allows control timing the container commands are executed.

finalizer: specify the container to call after executing JobExecutionHandler. If a sidecar is explicitly specified, it can be stopped after the execution handler has finished, but this will not work if an injected container is present. If SetDeletePropagationPolicy is set, kubejob itself deletes the Pod by the forced termination process, so the Job can be terminated even if an injected container exists. However, if kubejob itself does not delete Pods, the forced termination process cannot be executed either. Therefore, by specifying the finalizer container, you can explicitly terminate the injected container.

func (*Job) SetContainerLogger

func (j *Job) SetContainerLogger(logger ContainerLogger)

func (*Job) SetDeletePropagationPolicy added in v0.4.0

func (j *Job) SetDeletePropagationPolicy(propagation metav1.DeletionPropagation)

func (*Job) SetInitContainerExecutionHandler added in v0.2.9

func (j *Job) SetInitContainerExecutionHandler(handler JobInitContainerExecutionHandler) error

func (*Job) SetLogLevel added in v0.2.9

func (j *Job) SetLogLevel(level LogLevel)

func (*Job) SetLogger

func (j *Job) SetLogger(logger Logger)

func (*Job) SetPendingPhaseTimeout added in v0.2.11

func (j *Job) SetPendingPhaseTimeout(timeout time.Duration)

SetPendingPhaseTimeout set the timeout when the process in the init container is finished but it does not switch to the Running phase ( PodInitializing state for a long time ).

func (*Job) SetWatchTimeoutSecond added in v0.5.0

func (j *Job) SetWatchTimeoutSecond(watchTimeoutSecond int64)

func (*Job) UseAgent added in v0.3.0

func (j *Job) UseAgent(agentCfg *AgentConfig)

UseAgent when you use RunWithExecutionHandler, kubejob replace specified commands with a wait loop command to control when the command is executed. If the kubejob-agent is present in the container, you can specify its path and the port to listen to so that it will wait using the kubejob-agent.

type JobBuilder

type JobBuilder struct {
	// contains filtered or unexported fields
}

func NewJobBuilder

func NewJobBuilder(config *rest.Config, namespace string) *JobBuilder

func (*JobBuilder) Build

func (b *JobBuilder) Build() (*Job, error)

func (*JobBuilder) BuildWithJob

func (b *JobBuilder) BuildWithJob(jobSpec *batchv1.Job) (*Job, error)

func (*JobBuilder) BuildWithReader

func (b *JobBuilder) BuildWithReader(r io.Reader) (*Job, error)

func (*JobBuilder) SetCommand

func (b *JobBuilder) SetCommand(cmd []string) *JobBuilder

func (*JobBuilder) SetImage

func (b *JobBuilder) SetImage(image string) *JobBuilder

type JobCreationError added in v0.2.9

type JobCreationError struct {
	JobName         string
	JobGenerateName string
	Err             error
}

func (*JobCreationError) Error added in v0.2.9

func (e *JobCreationError) Error() string

type JobExecutionHandler

type JobExecutionHandler func(context.Context, []*JobExecutor) error

type JobExecutor

type JobExecutor struct {
	Container    corev1.Container
	ContainerIdx int
	Pod          *corev1.Pod
	// contains filtered or unexported fields
}

func (*JobExecutor) CopyFromPod added in v0.2.9

func (e *JobExecutor) CopyFromPod(ctx context.Context, srcPath, dstPath string) error

CopyFromPod copy directory or files from specified path on Pod.

func (*JobExecutor) CopyToPod added in v0.2.9

func (e *JobExecutor) CopyToPod(ctx context.Context, srcPath, dstPath string) error

CopyToPod copy directory or files to specified path on Pod.

func (*JobExecutor) EnabledAgent added in v0.3.1

func (e *JobExecutor) EnabledAgent() bool

func (*JobExecutor) Exec

func (e *JobExecutor) Exec(ctx context.Context) ([]byte, error)

func (*JobExecutor) ExecAsync

func (e *JobExecutor) ExecAsync(ctx context.Context) error

func (*JobExecutor) ExecOnly added in v0.2.2

func (e *JobExecutor) ExecOnly(ctx context.Context) ([]byte, error)

func (*JobExecutor) ExecPrepareCommand added in v0.2.5

func (e *JobExecutor) ExecPrepareCommand(ctx context.Context, cmd []string) ([]byte, error)

func (*JobExecutor) IsRunning added in v0.2.4

func (e *JobExecutor) IsRunning() bool

func (*JobExecutor) Stop

func (e *JobExecutor) Stop() error

func (*JobExecutor) TerminationLog added in v0.2.14

func (e *JobExecutor) TerminationLog(log string) error

type JobFinalizer added in v0.5.0

type JobFinalizer struct {
	Container corev1.Container
	Handler   JobFinalizerHandler
}

type JobFinalizerHandler added in v0.5.0

type JobFinalizerHandler func(context.Context, *JobExecutor) error

type JobInitContainerExecutionHandler added in v0.2.9

type JobInitContainerExecutionHandler func(context.Context, *JobExecutor) error

type JobMultiError added in v0.3.3

type JobMultiError struct {
	Errs []error
}

func (*JobMultiError) Error added in v0.3.3

func (e *JobMultiError) Error() string

func (*JobMultiError) Has added in v0.3.3

func (e *JobMultiError) Has(typ ErrorType) bool

type JobStopContainerError added in v0.2.9

type JobStopContainerError struct {
	Reason error
}

func (*JobStopContainerError) Error added in v0.2.9

func (e *JobStopContainerError) Error() string

type JobUnexpectedError added in v0.3.3

type JobUnexpectedError struct {
	Pod *corev1.Pod
}

func (*JobUnexpectedError) Error added in v0.3.3

func (e *JobUnexpectedError) Error() string

type JobWatchError added in v0.2.9

type JobWatchError struct {
	JobName string
	Err     error
}

func (*JobWatchError) Error added in v0.2.9

func (e *JobWatchError) Error() string

type LogLevel added in v0.2.9

type LogLevel int
const (
	LogLevelNone LogLevel = iota
	LogLevelError
	LogLevelWarn
	LogLevelInfo
	LogLevelDebug
)

func (LogLevel) String added in v0.2.9

func (l LogLevel) String() string

type LogStreamError added in v0.2.9

type LogStreamError struct {
	JobName       string
	Pod           *corev1.Pod
	Container     corev1.Container
	InitContainer bool
	Err           error
}

func (*LogStreamError) Error added in v0.2.9

func (e *LogStreamError) Error() string

type Logger

type Logger func(string)

type PendingPhaseTimeoutError added in v0.2.11

type PendingPhaseTimeoutError struct {
	Timeout   time.Duration
	StartedAt time.Time
}

func (*PendingPhaseTimeoutError) Error added in v0.2.11

func (e *PendingPhaseTimeoutError) Error() string

type PreInitError added in v0.2.9

type PreInitError struct {
	Err error
}

func (*PreInitError) Error added in v0.2.9

func (e *PreInitError) Error() string

type ValidationError added in v0.2.9

type ValidationError struct {
	Required string
	Err      error
}

func (*ValidationError) Error added in v0.2.9

func (e *ValidationError) Error() string

Directories

Path Synopsis
cmd
tools module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL