pm

package module
v1.0.0
Published: Dec 27, 2022 License: MIT Imports: 28 Imported by: 0

README

Process Manager

SYNOPSIS

The pm package can be used in two ways: as a command-line tool or as an SDK.

Command Line Usage

On the command line, simply pass a procman config file. pm starts each process in the config one by one, in the order of their dependencies.

go install github.com/telnet2/pm/pm@latest
pm -conf pm.yaml "some command"

If a command argument is given, pm starts the processes defined in pm.yaml, executes the given command, and then stops all processes in the config.

SDK Usage

ProcMan is used as a library to manage process execution and lifecycle events. It can start, restart, and stop individual processes.

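import (
  "context"
  "log"

  "github.com/telnet2/pm"
)

ctx := context.TODO()

pman := pm.NewProcMan()
cfg, err := pm.LoadYamlFile(ctx, "pm.yaml")
if err != nil {
  log.Fatal(err)
}
if err := pman.AddConfig(cfg); err != nil {
  log.Fatal(err)
}

err = pman.Start(ctx)
if err != nil {
  log.Fatalf("fail to start: %v", err)
}

go func() {
  // in some other go-routine ...
  if err := pman.Restart(ctx, "<process-id>"); err != nil { // restart the process
    log.Printf("fail to restart: %v", err)
  }
  if err := pman.Stop("<process-id>"); err != nil { // stop the process
    log.Printf("fail to stop: %v", err)
  }
}()

pman.WaitDone()
pman.Shutdown(nil)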

DESCRIPTION

Project File

services:
  task1:
    command: "for x in {0..3}; do echo $x; sleep 1; done"
    ready_condition:
      stdout_message: "3"
  task2:
    command: echo "ls done"
    depends_on:
      - task1
  task3:
    command: env
    environment:
      - A=B
      - C=D
    env_file: ./config_test.env # relative to this config file or the conf dir found in the context
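
The dependency ordering implied by depends_on can be pictured as a topological sort. The following is a minimal sketch under that assumption; startOrder is a hypothetical helper written for illustration and is not taken from the pm source, which may order services differently.

```go
package main

import (
	"fmt"
	"sort"
)

// startOrder returns service ids so that every service appears after all of
// the services it depends on. It is an illustrative depth-first topological
// sort, not the algorithm pm itself uses.
func startOrder(deps map[string][]string) ([]string, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := make(map[string]int, len(deps))
	order := make([]string, 0, len(deps))

	var visit func(id string) error
	visit = func(id string) error {
		switch state[id] {
		case done:
			return nil
		case visiting:
			return fmt.Errorf("dependency cycle at %q", id)
		}
		ds, ok := deps[id]
		if !ok {
			return fmt.Errorf("unknown service %q", id)
		}
		state[id] = visiting
		for _, d := range ds {
			if err := visit(d); err != nil {
				return err
			}
		}
		state[id] = done
		order = append(order, id)
		return nil
	}

	// Sort the ids so the result is deterministic despite random map order.
	ids := make([]string, 0, len(deps))
	for id := range deps {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	for _, id := range ids {
		if err := visit(id); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	// Mirrors the project file above: task2 depends on task1.
	order, err := startOrder(map[string][]string{
		"task1": nil,
		"task2": {"task1"},
		"task3": nil,
	})
	fmt.Println(order, err) // prints: [task1 task2 task3] <nil>
}
```

A cycle such as two services depending on each other is reported as an error rather than looping forever.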

COMPONENTS

pubsub Package

A publisher-subscriber pattern for go-routines. The pubsub package delivers events and logs from a process to other parts of the program.

A Publisher is created to manage its Subscribers. A subscriber is simply a channel that receives messages from the publisher. Compared to using a plain channel, this package has the following advantages.

  • delivery to multiple listeners / subscribers
  • lazy joining to publishers
  • easy to use
  • publishing timeout
  • receiving message conditionally
  • all subscriber channels are closed when publisher is closed.

Instead of exchanging interface{} values, the package provides typed publishers such as StringPublisher and ProcEventPublisher.
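
The properties listed above (fan-out to several subscribers, a publish timeout, and subscriber channels closed when the publisher closes) can be sketched with a toy string publisher. This is only an illustration of the pattern; the type and method names below are hypothetical and do not reproduce the pubsub package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// publisher is a toy string publisher: each subscriber owns a buffered channel.
type publisher struct {
	mu      sync.Mutex
	timeout time.Duration
	buffer  int
	subs    map[chan string]struct{}
}

func newPublisher(timeout time.Duration, buffer int) *publisher {
	return &publisher{timeout: timeout, buffer: buffer, subs: map[chan string]struct{}{}}
}

// subscribe registers and returns a new buffered channel.
func (p *publisher) subscribe() chan string {
	p.mu.Lock()
	defer p.mu.Unlock()
	ch := make(chan string, p.buffer)
	p.subs[ch] = struct{}{}
	return ch
}

// publish delivers s to every subscriber and gives up on a subscriber whose
// channel stays full for longer than the timeout.
func (p *publisher) publish(s string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for ch := range p.subs {
		select {
		case ch <- s:
		case <-time.After(p.timeout):
			// Slow subscriber: this message is dropped for it.
		}
	}
}

// shutdown closes every subscriber channel, which ends their range loops.
func (p *publisher) shutdown() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for ch := range p.subs {
		close(ch)
		delete(p.subs, ch)
	}
}

// collect drains a subscriber channel until it is closed.
func collect(ch chan string) []string {
	var got []string
	for s := range ch {
		got = append(got, s)
	}
	return got
}

// demo publishes two messages to two subscribers and returns what each saw.
func demo() ([]string, []string) {
	pub := newPublisher(time.Second, 10)
	sub1, sub2 := pub.subscribe(), pub.subscribe()
	pub.publish("hello")
	pub.publish("world")
	pub.shutdown()
	return collect(sub1), collect(sub2)
}

func main() {
	a, b := demo()
	fmt.Println(a, b)
}
```

Holding the lock while publishing keeps the sketch short; a real publisher would more likely deliver to subscribers concurrently so that one slow subscriber does not delay the others.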

USAGE

// Create a publisher with a 1s publish timeout and a buffer size of 10 per subscriber.
pub := NewStringPublisher(time.Second, 10)

// In go-routine A
var sub1 chan string = pub.Subscribe()
for s := range sub1 {
    // process message s
}
// the for-loop exits when sub1 is closed

// In another go-routine B
sub2 := pub.Subscribe()
for s := range sub2 {
    // receives the same messages as sub1
}

cmd Package

The cmd package provides fine-grained control over a process's lifecycle and its outputs (stdout/stderr). Compared to exec.Command, it helps obtain the exit status correctly, kills the whole process group rather than only the executed process, streams stdout/stderr output to channels, and provides an event mechanism for monitoring process lifecycle events (start, running, stopped).

This cmd package was originally copied from https://github.com/go-cmd/cmd. It adds several customizations: a LogDir option to conveniently store outputs as log files, the pubsub pattern above for publishing stdout/stderr output, and command execution via sh -c or bash -c. Since it is tightly integrated with procman, we expect more customization to come to support future features.
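
To make two of these ideas concrete (running a command through sh -c while streaming stdout lines over a channel, and recovering the exit code afterwards), here is a minimal sketch built only on the standard library. runShell is a hypothetical helper, not part of the cmd package, and it leaves out process-group handling and stderr.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"os/exec"
)

// runShell runs command through `sh -c`, sends each stdout line to the
// returned lines channel, and reports the exit code once the process ends.
func runShell(command string) (<-chan string, <-chan int, error) {
	c := exec.Command("sh", "-c", command)
	stdout, err := c.StdoutPipe()
	if err != nil {
		return nil, nil, err
	}
	if err := c.Start(); err != nil {
		return nil, nil, err
	}
	lines := make(chan string, 16)
	exit := make(chan int, 1)
	go func() {
		sc := bufio.NewScanner(stdout)
		for sc.Scan() {
			lines <- sc.Text()
		}
		close(lines) // all output has been read

		// Wait must be called only after the pipe has been fully read.
		code := 0
		if err := c.Wait(); err != nil {
			var ee *exec.ExitError
			if errors.As(err, &ee) {
				code = ee.ExitCode()
			} else {
				code = -1
			}
		}
		exit <- code
	}()
	return lines, exit, nil
}

// demo runs a short loop that exits with status 3 and gathers its output.
func demo() ([]string, int, error) {
	lines, exit, err := runShell("for i in 0 1 2 3; do echo $i; done; exit 3")
	if err != nil {
		return nil, 0, err
	}
	var out []string
	for l := range lines {
		out = append(out, l)
	}
	return out, <-exit, nil
}

func main() {
	out, code, err := demo()
	fmt.Println(out, code, err)
}
```

The sketch assumes a POSIX sh is available on the PATH.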

USAGE

count := cmd.NewCmdOptions(
	cmd.Options{Streaming: true},
	"count",
	"for i in {0..3}; do echo $i; done",
)
count.Dir = runnable.Dir
count.Env = runnable.Env
count.LogDir = "/tmp/count_log_dir"

logCh := count.StdoutPub.Subscribe()

// start the count process asynchronously
_ = count.Start()

// print the stdout of count
for l := range logCh {
    fmt.Println(l)
}
// the count process is done

// print the exit code
fmt.Println("Exit:", count.Status().Exit)

Executor

ReadyChecker

A ReadyChecker decides when a started process counts as ready. Several implementations are provided, including log-message, HTTP, MySQL, MongoDB, shell, and TCP checkers.

How to implement a custom ready checker?
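
One way to approach this, assuming a custom checker only needs to satisfy the two-method ReadyChecker interface (Init and WaitForReady), is sketched here. Runnable and ReadyChecker are redeclared locally as stand-ins so the example compiles on its own; FileChecker is a hypothetical checker that treats a runnable as ready once a marker file exists. How pm wires in a custom checker is not shown, although Runnable does expose an exported Ready field of type ReadyChecker.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// Runnable and ReadyChecker are local stand-ins mirroring the shapes
// documented for package pm, so this sketch compiles without the module.
type Runnable struct{ Id string }

type ReadyChecker interface {
	Init(context.Context, *Runnable) error
	WaitForReady(context.Context, *Runnable) error
}

// FileChecker is a hypothetical checker: ready means the marker file exists.
type FileChecker struct {
	Path     string
	Interval time.Duration
}

// Init removes a stale marker before the runnable starts.
func (c *FileChecker) Init(_ context.Context, _ *Runnable) error {
	if err := os.Remove(c.Path); err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}

// WaitForReady polls until the marker file exists or ctx is done.
func (c *FileChecker) WaitForReady(ctx context.Context, r *Runnable) error {
	tick := time.NewTicker(c.Interval)
	defer tick.Stop()
	for {
		if _, err := os.Stat(c.Path); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("%s not ready: %w", r.Id, ctx.Err())
		case <-tick.C:
		}
	}
}

// demo simulates a process that creates the marker shortly after starting.
func demo() error {
	dir, err := os.MkdirTemp("", "ready")
	if err != nil {
		return err
	}
	defer os.RemoveAll(dir)

	fc := &FileChecker{Path: filepath.Join(dir, "ready"), Interval: 10 * time.Millisecond}
	var rc ReadyChecker = fc
	r := &Runnable{Id: "task1"}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	if err := rc.Init(ctx, r); err != nil {
		return err
	}
	go func() {
		time.Sleep(30 * time.Millisecond)
		_ = os.WriteFile(fc.Path, nil, 0o644)
	}()
	return rc.WaitForReady(ctx, r)
}

func main() {
	fmt.Println("ready error:", demo())
}
```

Bounding the wait with a context timeout keeps a checker from blocking forever when the process never becomes ready.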

TODO

  • Start / restart / stop shell commands.
  • Launch in the order of the dependencies.
    • All stops if a dependent service fails to launch
  • Support various ready checkers.
    • Log message
    • HTTP query
    • File tail
    • MySQL database / table check
  • Configuration to describe a set of commands to run.
  • Can run as a daemon.
  • Can be used as an SDK.
    • Monitoring logs using Pubsub pattern.
    • Monitoring command status using Pubsub pattern.

Documentation

Overview

Package pm provides a process manager based on github.com/telnet2/pm/cmd. The `pm` package manages the execution of multiple processes in dependency order.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CtxConfDir

func CtxConfDir(ctx context.Context) string

func CtxLogDir

func CtxLogDir(ctx context.Context) string

func ExpandEnvs

func ExpandEnvs(r *Runnable)

ExpandEnvs expands env vars used in this runnable.
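
As a rough picture of what expanding env vars in a command string can look like, the sketch below substitutes $NAME and ${NAME} from a list of NAME=VALUE entries using os.Expand. The expand helper is hypothetical; which variables ExpandEnvs actually consults (the runnable's own list, the env file, or the process environment) is not stated here, so treat the lookup source as an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// expand replaces $NAME and ${NAME} in s using entries of the form
// NAME=VALUE. Names without an entry expand to the empty string.
func expand(s string, env []string) string {
	vars := make(map[string]string, len(env))
	for _, kv := range env {
		if name, value, ok := strings.Cut(kv, "="); ok {
			vars[name] = value
		}
	}
	return os.Expand(s, func(name string) string { return vars[name] })
}

func main() {
	fmt.Println(expand("echo $A ${C}", []string{"A=B", "C=D"})) // prints: echo B D
}
```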

func GoFunc

func GoFunc(fun func())

GoFunc starts a given `fun` in a new go-routine and waits until the go-routine starts.
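
The "waits until the go-routine starts" behavior can be sketched with a channel that the new go-routine closes as its first action. goFunc below is an illustrative reimplementation under that assumption, not the package's source.

```go
package main

import "fmt"

// goFunc runs fun in a new go-routine and returns only after that
// go-routine has begun executing.
func goFunc(fun func()) {
	started := make(chan struct{})
	go func() {
		close(started) // signal that the go-routine is running
		fun()
	}()
	<-started
}

// demo shows that the function passed to goFunc does run.
func demo() string {
	done := make(chan string)
	goFunc(func() { done <- "ran" })
	return <-done
}

func main() {
	fmt.Println(demo()) // prints: ran
}
```

Note that goFunc waits only for the go-routine to start, not for fun to finish.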

func WithConfDir

func WithConfDir(ctx context.Context, confDir string) context.Context

func WithLogDir

func WithLogDir(ctx context.Context, logDir string) context.Context
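
These With.../Ctx... pairs follow the usual pattern of carrying a value in a context.Context. A minimal sketch of that pattern is shown here; the key type, the lower-case function names, and the choice to return an empty string when nothing is set are all assumptions made for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is a private key type so other packages cannot collide with it.
type ctxKey string

const logDirKey ctxKey = "logDir"

// withLogDir returns a child context that carries the log directory.
func withLogDir(ctx context.Context, dir string) context.Context {
	return context.WithValue(ctx, logDirKey, dir)
}

// ctxLogDir reads the log directory, or "" when none was set (an assumption).
func ctxLogDir(ctx context.Context) string {
	dir, _ := ctx.Value(logDirKey).(string)
	return dir
}

// demo returns the value seen without and with a log directory attached.
func demo() (string, string) {
	base := context.Background()
	return ctxLogDir(base), ctxLogDir(withLogDir(base, "/tmp/logs"))
}

func main() {
	unset, set := demo()
	fmt.Printf("%q %q\n", unset, set) // prints: "" "/tmp/logs"
}
```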

Types

type ConfigFile

type ConfigFile struct {
	Services map[string]*Runnable `yaml:"services"`
}

func LoadYaml

func LoadYaml(ctx context.Context, r io.Reader) (*ConfigFile, error)

LoadYaml loads config from io.Reader

func LoadYamlFile

func LoadYamlFile(ctx context.Context, f string) (*ConfigFile, error)

LoadYamlFile loads a config from a given yaml file.

func (*ConfigFile) String

func (cf *ConfigFile) String() string

func (*ConfigFile) WriteFile

func (cf *ConfigFile) WriteFile(yamlFile string) error

WriteFile writes *ConfigFile into a file

type DummyChecker

type DummyChecker struct {
}

DummyChecker does nothing.

func (*DummyChecker) Init

func (*DummyChecker) WaitForReady

func (*DummyChecker) WaitForReady(context.Context, *Runnable) error

type Duration

type Duration struct {
	time.Duration
}

func (*Duration) UnmarshalJSON

func (duration *Duration) UnmarshalJSON(b []byte) error

UnmarshalJSON helps to decode a string-type duration.
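
Decoding a string-type duration such as "1m30s" typically means unmarshalling the JSON string first and then calling time.ParseDuration. The sketch below redeclares a Duration wrapper locally to show that approach; it is an assumption about how such a method can be written, and the spec type and parseTimeoutSeconds helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Duration wraps time.Duration so it can be decoded from a JSON string.
type Duration struct{ time.Duration }

// UnmarshalJSON accepts values like "3s" or "1m30s".
func (d *Duration) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	v, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = v
	return nil
}

// spec is a hypothetical config fragment with a timeout field.
type spec struct {
	Timeout Duration `json:"timeout"`
}

// parseTimeoutSeconds decodes doc and returns the timeout in seconds.
func parseTimeoutSeconds(doc string) (float64, error) {
	var s spec
	if err := json.Unmarshal([]byte(doc), &s); err != nil {
		return 0, err
	}
	return s.Timeout.Seconds(), nil
}

func main() {
	fmt.Println(parseTimeoutSeconds(`{"timeout": "1m30s"}`)) // prints: 90 <nil>
}
```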

func (*Duration) UnmarshalYAML

func (duration *Duration) UnmarshalYAML(value *yaml.Node) error

UnmarshalYAML helps to decode a string-type duration.

type EventMap

type EventMap = map[string]struct{}

type HttpChecker

type HttpChecker struct {
	// contains filtered or unexported fields
}

func (*HttpChecker) Init

func (rc *HttpChecker) Init(ctx context.Context, _ *Runnable) error

Init does nothing.

func (*HttpChecker) WaitForReady

func (rc *HttpChecker) WaitForReady(ctx context.Context, _ *Runnable) error

WaitForReady makes an http call to the url and matches the response with the expectation.

type HttpReadyExpect

type HttpReadyExpect struct {
	Status    int               `yaml:"status,omitempty" json:"status,omitempty"`
	Headers   map[string]string `yaml:"headers,omitempty" json:"headers,omitempty"`
	BodyRegex string            `yaml:"body_regex,omitempty" json:"bodyRegex,omitempty"`
}

type HttpReadySpec

type HttpReadySpec struct {
	URL      string           `yaml:"url" json:"url,omitempty"  validate:"required" envexp:""`
	Interval time.Duration    `yaml:"interval,omitempty" json:"interval,omitempty" default:"3s"` // polling interval (default 3s)
	Expect   *HttpReadyExpect `yaml:"expect" json:"expect,omitempty"`
}

HttpReadySpec defines how to determine the readiness of the url. If `expect` field is not specified, 200 OK is considered to be ready.
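
The polling behavior this spec describes (query the URL at a fixed interval until the response matches) can be sketched as follows. waitHTTP is a hypothetical helper that checks only the status code; header and body-regex matching are left out, and the local test server exists purely to make the example self-contained.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// waitHTTP polls url every interval until it answers with wantStatus or
// ctx is done.
func waitHTTP(ctx context.Context, url string, interval time.Duration, wantStatus int) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()
	for {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return err
		}
		if resp, err := http.DefaultClient.Do(req); err == nil {
			resp.Body.Close()
			if resp.StatusCode == wantStatus {
				return nil
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-tick.C:
		}
	}
}

// demo starts a server that reports 503 twice before answering 200.
func demo() (int32, error) {
	var hits int32
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		if atomic.AddInt32(&hits, 1) < 3 {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err := waitHTTP(ctx, srv.URL, 10*time.Millisecond, http.StatusOK)
	return atomic.LoadInt32(&hits), err
}

func main() {
	hits, err := demo()
	fmt.Println("requests:", hits, "error:", err)
}
```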

type LogChecker

type LogChecker struct {
	// contains filtered or unexported fields
}

Implements ReadyChecker

func (*LogChecker) Init

func (rc *LogChecker) Init(ctx context.Context, r *Runnable) error

Init is called before the runnable starts.

func (*LogChecker) WaitForReady

func (rc *LogChecker) WaitForReady(ctx context.Context, _ *Runnable) error

WaitForReady waits until a given runnable is ready.
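
A log-based readiness check can be thought of as scanning a stream of output lines for a regular expression, as with the stdout_message setting in the project file. The following sketch assumes the lines arrive on a channel; waitForLog is a hypothetical helper and not the LogChecker implementation.

```go
package main

import (
	"context"
	"fmt"
	"regexp"
)

// waitForLog returns nil as soon as a line matches pattern. It fails if the
// stream closes first or ctx is done.
func waitForLog(ctx context.Context, lines <-chan string, pattern string) error {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case l, ok := <-lines:
			if !ok {
				return fmt.Errorf("log closed before %q matched", pattern)
			}
			if re.MatchString(l) {
				return nil
			}
		}
	}
}

// demo feeds the output of the task1 example (0..3) through waitForLog.
func demo(pattern string) error {
	lines := make(chan string, 4)
	for _, l := range []string{"0", "1", "2", "3"} {
		lines <- l
	}
	close(lines)
	return waitForLog(context.Background(), lines, pattern)
}

func main() {
	fmt.Println(demo("3"))     // prints: <nil>
	fmt.Println(demo("ready")) // prints an error: the pattern never appears
}
```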

type MongoDbChecker

type MongoDbChecker struct {
	// contains filtered or unexported fields
}

func (*MongoDbChecker) Init

func (rc *MongoDbChecker) Init(ctx context.Context, r *Runnable) error

Init does nothing.

func (*MongoDbChecker) WaitForReady

func (rc *MongoDbChecker) WaitForReady(ctx context.Context, r *Runnable) error

WaitForReady waits until the MongoDB instance at the configured URI is ready.

type MongoDbReadySpec

type MongoDbReadySpec struct {
	URI      string        `yaml:"uri" json:"uri" default:"mongo://mongo:mongo@localhost:27017/admin" envexp:""`
	Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" default:"3s"` // polling interval (default 3s)
}

MongoDbReadySpec defines how to determine the readiness of a MongoDB connection.

type MySqlChecker

type MySqlChecker struct {
	// contains filtered or unexported fields
}

func (*MySqlChecker) Init

func (rc *MySqlChecker) Init(ctx context.Context, r *Runnable) error

Init does nothing.

func (*MySqlChecker) WaitForReady

func (rc *MySqlChecker) WaitForReady(ctx context.Context, r *Runnable) error

WaitForReady waits until the MySQL connection described by the spec is ready.

type MySqlReadySpec

type MySqlReadySpec struct {
	Network  string        `yaml:"network,omitempty" json:"network,omitempty" default:"tcp"`
	Addr     string        `yaml:"addr" json:"addr" envexp:""` // Supports TCP only
	Database string        `yaml:"database" json:"database" envexp:""`
	User     string        `yaml:"user" json:"user" default:"root" envexp:""`
	Password string        `yaml:"password,omitempty" json:"password,omitempty" envexp:""`
	Table    string        `yaml:"table,omitempty" json:"table,omitempty" envexp:""`
	Interval time.Duration `yaml:"interval,omitempty" json:"interval,omitempty" default:"3s"` // polling interval (default 3s)
}

MySqlReadySpec defines how to determine the readiness of a mysql connection.

type ProcEvent

type ProcEvent struct {
	Id       string
	Source   *Runnable
	Event    string
	ExitCode int
}

type ProcEventPublisher

type ProcEventPublisher struct {
	// contains filtered or unexported fields
}

ProcEventPublisher is a basic pub/sub structure. It allows sending events and subscribing to them, and can be safely used from multiple goroutines.

func NewProcEventPublisher

func NewProcEventPublisher(publishTimeout time.Duration, buffer int) *ProcEventPublisher

NewProcEventPublisher creates a new pub/sub publisher to broadcast messages. The duration is used as the send timeout as to not block the publisher publishing messages to other clients if one client is slow or unresponsive. The buffer is used when creating new channels for subscribers.

func (*ProcEventPublisher) Close

func (p *ProcEventPublisher) Close()

Close closes the channels to all subscribers registered with the publisher.

func (*ProcEventPublisher) Evict

func (p *ProcEventPublisher) Evict(sub chan *ProcEvent)

Evict removes the specified subscriber from receiving any more messages.

func (*ProcEventPublisher) Len

func (p *ProcEventPublisher) Len() int

Len returns the number of subscribers for the publisher

func (*ProcEventPublisher) Publish

func (p *ProcEventPublisher) Publish(v *ProcEvent)

Publish sends the data in v to all subscribers currently registered with the publisher.

func (*ProcEventPublisher) Subscribe

func (p *ProcEventPublisher) Subscribe() chan *ProcEvent

Subscribe adds a new subscriber to the publisher returning the channel.

func (*ProcEventPublisher) SubscribeTopic

func (p *ProcEventPublisher) SubscribeTopic(topic _ProcEventTopicFunc) chan *ProcEvent

SubscribeTopic adds a new subscriber that filters messages sent by a topic.

func (*ProcEventPublisher) SubscribeTopicWithBuffer

func (p *ProcEventPublisher) SubscribeTopicWithBuffer(topic _ProcEventTopicFunc, buffer int) chan *ProcEvent

SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic. The returned channel has a buffer of the specified size.

type ProcMan

type ProcMan struct {
	RunGraph map[string]*Runnable // RunGraph is the map from the id of a runnable to its contents
	// contains filtered or unexported fields
}

ProcMan implements the process manager.

func NewProcMan

func NewProcMan() *ProcMan

func (*ProcMan) Add

func (pm *ProcMan) Add(run *Runnable) error

Add adds a runnable to the ProcMan. If the id isn't unique, returns an error.

func (*ProcMan) AddConfig

func (pm *ProcMan) AddConfig(configFile *ConfigFile) error

AddConfig adds services from a given config file or a struct.

func (*ProcMan) IsAllDone

func (pm *ProcMan) IsAllDone() bool

IsAllDone returns true if every runnable is done

func (*ProcMan) Restart

func (pm *ProcMan) Restart(ctx context.Context, id string) error

Restart restarts the service. Unlike Start(), it restarts only the runnable with the given id, not its dependents.

func (*ProcMan) Shutdown

func (pm *ProcMan) Shutdown(err error)

Shutdown closes the ProcMan and releases all the resources.

func (*ProcMan) Start

func (pm *ProcMan) Start(ctx context.Context) error

Start executes all the runnables in the order of their dependencies and waits for their execution.

func (*ProcMan) Stop

func (pm *ProcMan) Stop(id string) error

Stop stops the process with the given id.

func (*ProcMan) SubscribeEvent

func (pm *ProcMan) SubscribeEvent(id string, events map[string]struct{}) chan *ProcEvent

SubscribeEvent subscribes to the events of the id. If the id is '*', it listens to all events.

func (*ProcMan) SubscribeLog

func (pm *ProcMan) SubscribeLog(id string, stdout bool) chan string

SubscribeLog subscribes to the log publisher with an `id`. The service `id` must be already added to this ProcMan.

func (*ProcMan) UnsubscribeEvent

func (pm *ProcMan) UnsubscribeEvent(sub chan *ProcEvent)

UnsubscribeEvent removes the given subscriber channel from event delivery.

func (*ProcMan) WaitDone

func (pm *ProcMan) WaitDone()

WaitDone waits until all processes are finished.

type ReadyChecker

type ReadyChecker interface {
	Init(context.Context, *Runnable) error // Init is called before a given runnable starts.

	// WaitForReady is called after a given runnable starts.
	// It shouldn't return if the runnable is not ready.
	WaitForReady(context.Context, *Runnable) error
}

ReadyChecker is responsible for waiting until a runnable is ready. We need to break the dependency on `Runnable`. Otherwise, a ReadyChecker must be created in this package due to the circular dependency.

func NewReadyChecker

func NewReadyChecker(id string, spec *ReadySpec) ReadyChecker

NewReadyChecker is a factory returning a concrete ReadyChecker

type ReadySpec

type ReadySpec struct {
	StdoutLog string            `yaml:"stdout_message,omitempty" json:"stdoutMessage,omitempty"` // log is a regex to find from stdout/stderr logs
	Http      *HttpReadySpec    `yaml:"http,omitempty" json:"http,omitempty"`                    // http defines the ready condition via http protocol
	MySql     *MySqlReadySpec   `yaml:"mysql,omitempty" json:"mysql,omitempty"`                  // mysql defines the ready condition via a MySQL connection
	MongoDb   *MongoDbReadySpec `yaml:"mongo_db,omitempty" json:"mongoDb,omitempty"`             // mongo_db defines the ready condition via a MongoDB connection
	Shell     *ShellReadySpec   `yaml:"shell,omitempty" json:"shell,omitempty"`
	Tcp       *TcpReadySpec     `yaml:"tcp,omitempty" json:"tcp,omitempty"`
	Timeout   Duration          `yaml:"timeout,omitempty" json:"timeout,omitempty" default:"1m"` // timeout (default 1min)
}

ReadySpec provides a simple ready checker

type Runnable

type Runnable struct {
	Id             string     `yaml:"id" json:"id,omitempty"`                                // Id must be unique within the ProcMan.
	Command        string     `yaml:"command" json:"command,omitempty" envexp:""`            // Command to execute via bash
	CommandEx      string     `yaml:"command_ex,omitempty" json:"commandEx,omitempty"`       // Command without env expansion. CommandEx has priority.
	Env            []string   `yaml:"environment,omitempty" json:"environment,omitempty"`    // Env is a list of env vars in the form of NAME=VALUE
	EnvFile        string     `yaml:"env_file,omitempty" json:"envFile,omitempty" envexp:""` // EnvFile to load env vars
	WorkDir        string     `yaml:"work_dir,omitempty" json:"workDir,omitempty" envexp:""` // Working directory
	LogDir         string     `yaml:"log_dir,omitempty" json:"logDir,omitempty" envexp:""`   // If LogDir is set, then the ${LogDir}/${id}.stdout.log and ${LogDir}/${id}.stderr.log files are created and stored.
	DependsOn      []string   `yaml:"depends_on,omitempty" json:"dependsOn,omitempty"`       // DependsOn is a list of runnables to run before this
	ReadyCondition *ReadySpec `yaml:"ready_condition,omitempty" json:"readyCheck,omitempty"` // ReadyCondition is the condition to determine if the process is ready

	Cmd   *cmd.Cmd     `yaml:"-" json:"-"` // Cmd is go's process execution object
	Ready ReadyChecker `yaml:"-" json:"-"`

	ConfDir string `yaml:"-" json:"-"` // the origin of this file
	// contains filtered or unexported fields
}

Runnable is a configuration of an executable.

func (*Runnable) CloneCmd

func (r *Runnable) CloneCmd()

CloneCmd clones the underlying cmd.

func (*Runnable) GetCommand

func (r *Runnable) GetCommand() string

GetCommand returns the command to run, taken from either CommandEx or Command.

func (*Runnable) GetRuntimeEnvs

func (r *Runnable) GetRuntimeEnvs() []string

GetRuntimeEnvs returns env vars of this runnable.

func (*Runnable) IsDone

func (r *Runnable) IsDone() bool

IsDone returns the underlying cmd's IsDone()

type ShellChecker

type ShellChecker struct {
	// contains filtered or unexported fields
}

ShellChecker is a shell-script-based ready checker. A shell command is executed periodically until it returns exit code 0.

func (*ShellChecker) Init

func (sc *ShellChecker) Init(ctx context.Context, r *Runnable) error

Init is called before the runnable starts.

func (*ShellChecker) WaitForReady

func (rc *ShellChecker) WaitForReady(ctx context.Context, _ *Runnable) error

WaitForReady waits until a given runnable is ready.

type ShellReadySpec

type ShellReadySpec struct {
	Command  string   `yaml:"command,omitempty" json:"command,omitempty" envexp:""`      // shell command to run
	Interval Duration `yaml:"interval,omitempty" json:"interval,omitempty" default:"3s"` // polling interval (default 3s)
}

type TcpChecker

type TcpChecker struct {
	// contains filtered or unexported fields
}

func (*TcpChecker) Init

func (c *TcpChecker) Init(ctx context.Context, r *Runnable) error

Init is called before the runnable starts.

func (*TcpChecker) RetryCount

func (c *TcpChecker) RetryCount() int

func (*TcpChecker) WaitForReady

func (c *TcpChecker) WaitForReady(ctx context.Context, _ *Runnable) error

WaitForReady waits until the port is open.

type TcpReadySpec

type TcpReadySpec struct {
	Host     string   `yaml:"host,omitempty" json:"host,omitempty" default:"localhost"`
	Port     int      `yaml:"port,omitempty" json:"port,omitempty"`
	Interval Duration `yaml:"interval,omitempty" json:"interval,omitempty" default:"3s"` // polling interval (default 3s)
}

Directories

Path Synopsis
cmd
Package cmd runs external commands with concurrent access to output and status.
Package ps contains utility functions related to OS processes.
Package shutdown provides convenient interface for working with os.Signal.
