less

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2025 License: MIT Imports: 4 Imported by: 1

README

Work less, do more

This library provides storage agnostic leader election.

Usage

Create the candidate using New and pass your storage, that implements Storage interface. For additional configuration look at Option and WithXxx functions.

Now the candidate is already launched and (after a bit) will have the candidate status. To get it, just call IsLeader.

Here is a simple demo:

func main() {
	ctx := context.Background()

	var storage Storage
	// Fill with any shared storage implementation
    // You can also use our adapters from `adapter` package

	candidate := New(ctx, storage)

	workFn := func() {
		if !candidate.IsLeader() {
			return
		}
		// do work
		fmt.Println("notification sent")
	}

	for range time.Tick(1 * time.Minute) {
		workFn()
	}

	// Now if we run this code on 2 and more nodes, we can be sure that
	// only one of them will actively run the job
}

If you have multiple workers and you don't want them all to depend on single candidate (i.e. if leader then all workers will run, if not then no worker will run) you can create candidate for each of them with different keys using WithKey option:

func main() {
    // ...

	candidate1 := New(ctx, storage, WithKey("work1"))
	work1Fn := func() {
		if !candidate1.IsLeader() {
			return
		}
		// do work
		fmt.Println("notification sent")
	}

	candidate2 := New(ctx, storage, WithKey("work2"))
	work2Fn := func() {
		if !candidate2.IsLeader() {
			return
		}
		// do work
		fmt.Println("notification sent")
	}

    // etc
    // ...
}

This way each of the workers will has it's own indepentend leader election process.

You can integrate this easely with your asynchronous workers. E.g. you can pass it to go-co-op/gocron job scheduler, with their WithDistributedElector option (you need to decorate the Candidate by yourself to match the interface).

Documentation

Overview

Package less provides storage agnostic leader election.

Example
ctx := context.Background()

var storage Storage
// Fill with any shared storage implementation
// You can also use our adapters from `adapter` package

candidate := New(ctx, storage, WithKey("notify-users"))

workFn := func() {
	if !candidate.IsLeader() {
		return
	}
	// do work
	fmt.Println("notification sent")
}

for range time.Tick(1 * time.Minute) {
	workFn()
}

// Now if we run this code on 2 and more nodes, we can be sure that
// only one of them will actively run the job

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Candidate

type Candidate struct {
	// contains filtered or unexported fields
}

Candidate constantly tries to acquire the leadership. Once acquire, it tries to renew it's leader record to not lose it.

func New

func New(ctx context.Context, storage Storage, opts ...Option) *Candidate

New creates and launches the Candidate with default settings. Settings can be changed with [Option]s.

func (*Candidate) IsLeader

func (c *Candidate) IsLeader() bool

IsLeader returns whether this candidate is currently a leader or not.

type Logger added in v0.2.3

type Logger interface {
	Debug(msg string, kvargs ...any)
	Info(msg string, kvargs ...any)
	Warn(msg string, kvargs ...any)
	Error(msg string, kvargs ...any)
}

Logger accepts even amount of kvargs: logger.Info("something", "key1", "val1", "key2", "val2", "key3", "val3")

type Option

type Option func(c *Candidate)

func WithErrsToFallback added in v0.2.3

func WithErrsToFallback(count int) Option

WithErrsToFallback sets an amount of errors in a row that leader will need to get from Storage before falling back to following stage.

Default: 3

func WithFollowRate

func WithFollowRate(rate time.Duration) Option

WithFollowRate sets the interval at which candidate will call [Storage.SetNX] to set his id and acquire the leadership.

Default: 2 seconds

func WithHoldRate

func WithHoldRate(rate time.Duration) Option

WithHoldRate sets the interval at which leader will call [Storage.Renew] to prolong his leadership.

Default: 2 seconds

func WithID

func WithID(id string) Option

WithID sets the current candidate's ID, wich will be stored in the leader record when he acquires the leadership. If you use this, make sure that all candidates will have diffent IDs (unless you know what you are doing).

Default: new V4 UUID

func WithKey

func WithKey(key string) Option

WithKey sets the key name in which the leader record will be stored. Can be used when you want split the leadership for different asynchronous job (e.g. "job1", "job2", etc).

Default: "default"

func WithLogger added in v0.2.3

func WithLogger(logger Logger) Option

WithLogger sets logger for the candidate. In most cases leader election process recommended to remain silent. Can be used for debug purposes.

Default: noop logger

func WithTTL

func WithTTL(ttl time.Duration) Option

WithTTL sets the time-to-live time for both [Storage.SetNX] (when candidate acquires the leadership) and [Storage.Renew] (when leader holds his leadership) calls.

Default: 10 seconds

type Storage

type Storage interface {
	// Renew sets the new deadline for the record with provided key.
	// If such record is not created yet, it does nothing and returns nil.
	Renew(ctx context.Context, key string, deadline time.Time) error

	// Get gets the record value with provided key. If record is not created
	// or expired, empty string will be returned without error.
	Get(ctx context.Context, key string) (string, error)

	// SetNX creates/sets record on provided key only if it's not created
	// yet or if it's expired. Returns true if record was successfully
	// created/set, otherwise returns false.
	SetNX(ctx context.Context, key, val string, deadline time.Time) (bool, error)
}

Storage is a generic way to access the data storage shared between the candidates.

Directories

Path Synopsis
adapter

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL