procman

package module
v0.1.0
Published: Mar 28, 2025 License: MIT Imports: 13 Imported by: 0

README

Go Process (Routine) Manager

This is not a replacement for proper handling of control channels, contexts, and cancellation. The goal of this package is to provide a controller for handling multiple long-running goroutines. The best way to describe it is as a lightweight supervisor.

Examples

Take a look at the examples file

Future improvements

WIP

  • Extensive improvements on how things start and stop and general internals of things.
  • Provide helpers to create several common worker types.
  • More usage examples.

Documentation

Overview

Package procman is a "process" controller that provides helper methods for managing long-running goroutines, as well as wrappers that convert plain functions into manageable processes.

Constants

const (
	ProcessStateReady int32 = iota
	ProcessStateStarting
	ProcessStateStarted
	ProcessStateStopping
	ProcessStateStopped
	ProcessStateAborted
)
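
The states are plain int32 values, so they print as numbers in logs and status maps. A minimal sketch of a naming helper follows. It assumes the constants above are redeclared locally so the snippet compiles on its own; `stateName` is a hypothetical helper and not part of procman.

```go
package main

import "fmt"

// Local mirror of the procman state constants listed above, redeclared here
// only so the sketch compiles without the package. In real code, refer to
// procman.ProcessStateReady and friends instead.
const (
	ProcessStateReady int32 = iota
	ProcessStateStarting
	ProcessStateStarted
	ProcessStateStopping
	ProcessStateStopped
	ProcessStateAborted
)

// stateName is a hypothetical helper (not part of procman) that turns a
// state value into a label suitable for logs or health endpoints.
func stateName(state int32) string {
	switch state {
	case ProcessStateReady:
		return "ready"
	case ProcessStateStarting:
		return "starting"
	case ProcessStateStarted:
		return "started"
	case ProcessStateStopping:
		return "stopping"
	case ProcessStateStopped:
		return "stopped"
	case ProcessStateAborted:
		return "aborted"
	default:
		return fmt.Sprintf("unknown(%d)", state)
	}
}

func main() {
	fmt.Println(stateName(ProcessStateStarted))
	fmt.Println(stateName(42))
}
```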

Variables

This section is empty.

Functions

func Rosebud

func Rosebud(logFn func(stack []string, err error))

Rosebud makes sure the last words of a great being, such as your process, will be properly recorded for mankind to dwell upon and conjecture on their meaning. Call this as the first line of your main(). You can pass a log function, which receives either the stack trace of the original panic or an error if Rosebud failed to monitor your process for some reason. If nil is passed, the default logger is used and a panic occurs on error. You MUST read this before using it: https://github.com/mitchellh/panicwrap/blob/master/panicwrap.go. Last but not least, go see the movie if you didn't get the reference.

Types

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager handles your processes.

Example
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	procman "github.com/vredens/go-procman"
)

func main() {
	var pman = procman.NewManager()

	var w1 = &MyWorker{Name: "one"}
	pman.AddProcess("one", w1)

	var w2 = &MyWorker{Name: "two"}
	pman.AddProcess("two", w2)

	//
	// Periodical Jobs
	//
	var job = func(ctx context.Context) error {
		// your batch job here
		for i := 0; i < 1000; i++ {
			// do your work here
			select {
			case <-ctx.Done():
				// our execution was aborted during an iteration
				// time for savepoint, cleanup, etc
				return nil
			default:
				continue
			}
		}
		return nil
	}
	pman.AddProcess("periodical-1", procman.NewPeriodicalJob(time.Second, job))
	pman.AddProcess("periodical-2", procman.NewPeriodicalJob(time.Second, job, procman.PeriodicalOptions{}))

	//
	// Workers
	//
	var worker = func(ctx context.Context) error {
		for {
			// do your work here
			select {
			case <-ctx.Done():
				// this could as easily be shutting down an http server or message queue consumer.
				return nil
			default:
				continue
			}
		}
	}
	pman.AddProcess("worker-1", procman.NewWorker(worker))
	pman.AddProcess("worker-2", procman.NewWorker(worker, procman.WorkerOptions{}))

	// This simulates Stop being called from outside the manager.
	// The manager also stops when it receives SIGTERM, SIGINT or SIGUSR1.
	time.AfterFunc(10*time.Second, func() {
		pman.Stop()
	})

	pman.Start()
}

// MyWorker is a sample worker which implements the Start and Stop methods required by the ProcessManager for all registered Processes.
type MyWorker struct {
	ctrl  chan struct{}
	state int32
	Name  string
}

func (w *MyWorker) Start() error {
	w.ctrl = make(chan struct{})

	if !atomic.CompareAndSwapInt32(&w.state, procman.ProcessStateReady, procman.ProcessStateStarted) {
		return fmt.Errorf("worker is not in initialized state, can not start")
	}

	var ticker = time.NewTicker(time.Second)
	for {
		select {
		case <-w.ctrl:
			fmt.Println("terminating")
			ticker.Stop()
			return nil
		case <-ticker.C:
			fmt.Printf("%s ticked\n", w.Name)
		}
	}
}

func (w *MyWorker) Stop() error {
	if !atomic.CompareAndSwapInt32(&w.state, procman.ProcessStateStarted, procman.ProcessStateStopping) {
		return fmt.Errorf("can not stop worker unless it is in a started state")
	}
	close(w.ctrl)
	return nil
}

func NewCustomManager

func NewCustomManager(params Parameters) *Manager

NewCustomManager instance.

func NewManager

func NewManager() *Manager

NewManager instance using default parameters.

func (*Manager) AddProcess

func (manager *Manager) AddProcess(name string, process Process)

AddProcess stores a process in the list of processes controlled by the ProcessManager.

func (*Manager) Destroy

func (manager *Manager) Destroy() map[string]int32

Destroy removes all processes and closes all channels.

func (*Manager) IsStarted

func (manager *Manager) IsStarted() bool

IsStarted returns true if the ProcessManager has already started.

func (*Manager) Start

func (manager *Manager) Start() error

Start blocks until it receives a signal on its control channel or a SIGTERM, SIGINT or SIGUSR1, and should be the last call in your main.

func (*Manager) StatusCheck

func (manager *Manager) StatusCheck() (bool, map[string]int32)

StatusCheck returns a tuple: the first value is a bool indicating whether all processes are OK, and the second is a map with the individual state of each process.

func (*Manager) Stop

func (manager *Manager) Stop()

Stop will signal the ProcessManager to stop.

type Parameters

type Parameters struct {
	// Logger for the different stages the manager and each process go through.
	// Defaults to slog.Default().
	Logger *slog.Logger
}

Parameters for the ProcessManager initializer.

type PeriodicalOptions

type PeriodicalOptions struct {
	// Idle time between executions of your job function; has no effect if period is 0 or Once is set to true.
	Idle time.Duration
	// Once, if true, will run the job only once and then stop.
	Once bool
	// ShutdownTimeout defines the max time to wait for a job to finish after Stop() is called. Defaults to 60 seconds. Must be at least 1 second.
	ShutdownTimeout time.Duration
	// Dbgf function is called for debugging purposes when certain periodical job controller's events happen.
	Dbgf func(fmt string, args ...interface{})
}

PeriodicalOptions for tuning periodical job execution.

type Process

type Process interface {
	// Start should block while the process is running.
	Start() error
	// Stop should signal the Start method to return. Stop does not need to wait for Start to return before returning itself.
	Stop() error
}

Process is the basic interface for any asynchronous process launched and stopped by the process manager.

func NewPeriodicalJob

func NewPeriodicalJob(period time.Duration, job func(ctx context.Context) error, options ...PeriodicalOptions) Process

NewPeriodicalJob creates a periodical runner of a "job" function, which is executed one at a time and no more than once per period. A cancelable context is provided to the job; if the context is canceled, the job should stop execution as soon as possible. The job is allowed to shut down without error; on error, the periodical controller stops immediately. The job is executed after each period elapses unless it is already running (only one runs at a time). Period can be 0 to set up continuous execution.

func NewWorker

func NewWorker(main func(ctx context.Context) error, opts ...WorkerOptions) Process

NewWorker creates a wrapper around a worker function, which is expected to return only after ctx.Done() is closed or an error occurs. If multiple WorkerOptions are passed, they overwrite each other unless a zero value is present. Remember to wrap the provided context in your own cancelable contexts.

type WorkerOptions

type WorkerOptions struct {
	// ShutdownTimeout sets the timeout to wait for Start() to finish after Stop() is called.
	ShutdownTimeout time.Duration
}

WorkerOptions for defining some worker options.

Directories

Path Synopsis
examples
