state

package
v0.1.0
Published: Nov 12, 2022 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Package state implements the representation of system state.

Constants

const (
	// Messages logged in tasks are guaranteed to use the time formatted
	// per RFC3339 plus the following strings as a prefix, so these may
	// be handled programmatically and parsed or stripped for presentation.
	LogInfo  = "INFO"
	LogError = "ERROR"
)
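
The prefixes above make task log entries machine-readable. The following is a minimal sketch of splitting an entry into its parts. It assumes the layout "<RFC3339 time> <kind> <message>" with single-space separators, which this page does not spell out. parseLogLine is a hypothetical helper, and the constants are redeclared locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Local copies of the documented constants.
const (
	LogInfo  = "INFO"
	LogError = "ERROR"
)

// parseLogLine is a hypothetical helper. It assumes the entry layout
// "<RFC3339 time> <kind> <message>" with single-space separators.
func parseLogLine(line string) (when time.Time, kind, msg string, err error) {
	parts := strings.SplitN(line, " ", 3)
	if len(parts) != 3 {
		return time.Time{}, "", "", fmt.Errorf("malformed log line: %q", line)
	}
	when, err = time.Parse(time.RFC3339, parts[0])
	if err != nil {
		return time.Time{}, "", "", fmt.Errorf("bad timestamp: %w", err)
	}
	if parts[1] != LogInfo && parts[1] != LogError {
		return time.Time{}, "", "", fmt.Errorf("unknown kind %q", parts[1])
	}
	return when, parts[1], parts[2], nil
}

func main() {
	when, kind, msg, err := parseLogLine("2022-11-12T10:00:00Z ERROR cannot run hook")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(when.Year(), kind, msg)
}
```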

Variables

var (
	DefaultRepeatAfter = time.Hour * 24
	DefaultExpireAfter = time.Hour * 24 * 28
)
var ErrNoState = errors.New("no state entry for key")

ErrNoState represents the case of no state entry for a given key.

Functions

func CopyState

func CopyState(srcStatePath, dstStatePath string, dataEntries []string) error

CopyState takes a state from srcStatePath and copies all dataEntries to dstStatePath. Note that srcStatePath should never point to a state that is in use.

func MockTime

func MockTime(now time.Time) (restore func())

func TagTimingsWithChange

func TagTimingsWithChange(t *timings.Timings, change *Change)

TagTimingsWithChange sets the "change-id" tag on the Timings object.

func TimingsForTask

func TimingsForTask(task *Task) *timings.Timings

TimingsForTask creates a new Timings tree for the given task. The returned Timings tree has the "task-id", "change-id" and "task-kind" tags set automatically from the respective task.

Types

type Backend

type Backend interface {
	Checkpoint(data []byte) error
	EnsureBefore(d time.Duration)
}

A Backend is used by State to checkpoint on every unlock operation and to mediate requests to ensure the state sooner or request restarts.
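
For tests it can be handy to have a Backend that keeps everything in memory. The sketch below is one possible shape, not something this package provides: memBackend and its fields are hypothetical, and the interface is copied locally so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Backend is a local copy of the documented interface.
type Backend interface {
	Checkpoint(data []byte) error
	EnsureBefore(d time.Duration)
}

// memBackend is a hypothetical in-memory Backend. It keeps the most
// recent checkpoint and the shortest EnsureBefore request seen so far.
type memBackend struct {
	mu          sync.Mutex
	last        []byte
	checkpoints int
	ensureIn    time.Duration
	ensureSet   bool
}

// Checkpoint stores a private copy of data, since the caller may reuse it.
func (b *memBackend) Checkpoint(data []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.last = append([]byte(nil), data...)
	b.checkpoints++
	return nil
}

// EnsureBefore remembers the shortest requested duration.
func (b *memBackend) EnsureBefore(d time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.ensureSet || d < b.ensureIn {
		b.ensureIn, b.ensureSet = d, true
	}
}

// Compile-time check that memBackend satisfies the interface.
var _ Backend = (*memBackend)(nil)

func main() {
	b := &memBackend{}
	b.EnsureBefore(time.Minute)
	b.EnsureBefore(time.Second)
	if err := b.Checkpoint([]byte(`{"data":{}}`)); err != nil {
		fmt.Println("checkpoint failed:", err)
		return
	}
	fmt.Println(string(b.last), b.checkpoints, b.ensureIn)
}
```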

type Change

type Change struct {
	// contains filtered or unexported fields
}

Change represents a tracked modification to the system state.

The Change provides both the justification for individual tasks to be performed and the grouping of them.

As an example, if an administrator requests an interface connection, multiple hooks might be individually run to accomplish the task. The Change summary would reflect the request for an interface connection, while the individual Task values would track the running of the hooks themselves.

func (*Change) Abort

func (c *Change) Abort()

Abort flags the change for cancellation, whether in progress or not. Cancellation will proceed at the next ensure pass.

func (*Change) AbortLanes

func (c *Change) AbortLanes(lanes []int)

AbortLanes aborts all tasks in the provided lanes and any tasks waiting on them, except for tasks that are also in a healthy lane (not aborted, and not waiting on aborted).

func (*Change) AbortUnreadyLanes

func (c *Change) AbortUnreadyLanes()

AbortUnreadyLanes aborts the tasks from lanes that aren't fully ready, where a ready lane is one in which all tasks are ready.

func (*Change) AddAll

func (c *Change) AddAll(ts *TaskSet)

AddAll registers all tasks in the set as required for the state change to be accomplished.

func (*Change) AddTask

func (c *Change) AddTask(t *Task)

AddTask registers a task as required for the state change to be accomplished.

func (*Change) CheckTaskDependencies

func (c *Change) CheckTaskDependencies() error

CheckTaskDependencies checks the tasks in the change for cyclic dependencies and returns an error in such case.
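
A cycle check of this kind can be sketched as a depth-first search over the "waits for" graph. The example below is illustrative only and is not the package's implementation: hasCycle is a hypothetical helper that works on plain task IDs rather than on Task values.

```go
package main

import "fmt"

// hasCycle reports whether the "waits for" graph contains a cycle.
// waits maps a task ID to the IDs of the tasks it waits for.
func hasCycle(waits map[string][]string) bool {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int)
	var visit func(id string) bool
	visit = func(id string) bool {
		switch state[id] {
		case inProgress:
			return true // back edge: id is already on the current path
		case done:
			return false
		}
		state[id] = inProgress
		for _, dep := range waits[id] {
			if visit(dep) {
				return true
			}
		}
		state[id] = done
		return false
	}
	for id := range waits {
		if visit(id) {
			return true
		}
	}
	return false
}

func main() {
	ok := map[string][]string{"3": {"2"}, "2": {"1"}}
	bad := map[string][]string{"1": {"3"}, "3": {"2"}, "2": {"1"}}
	fmt.Println(hasCycle(ok), hasCycle(bad))
}
```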

func (*Change) Err

func (c *Change) Err() error

Err returns an error value based on errors that were logged for tasks registered in this change, or nil if the change is not in ErrorStatus.

func (*Change) Get

func (c *Change) Get(key string, value interface{}) error

Get unmarshals the stored value associated with the provided key into the value parameter.

func (*Change) Has

func (c *Change) Has(key string) bool

Has returns whether the provided key has an associated value.

func (*Change) ID

func (c *Change) ID() string

ID returns the individual random key for the change.

func (*Change) IsClean

func (c *Change) IsClean() bool

IsClean returns whether all tasks in the change have been cleaned. See SetClean.

func (*Change) IsReady

func (c *Change) IsReady() bool

IsReady returns whether the change is considered ready.

The result is similar to calling Ready on the status returned by the Status method, but this function is more efficient as it doesn't need to recompute the aggregated state of tasks on every call.

As an exception, IsReady returns false for a Change without any tasks that never had its status explicitly set and was never unmarshalled out of the persistent state, despite its initial status being Hold. This is how the system represents changes right after they are created.

func (*Change) Kind

func (c *Change) Kind() string

Kind returns the nature of the change for managers to know how to handle it.

func (*Change) LaneTasks

func (c *Change) LaneTasks(lanes ...int) []*Task

LaneTasks returns all tasks from the given lanes that the state change depends on.

func (*Change) MarshalJSON

func (c *Change) MarshalJSON() ([]byte, error)

MarshalJSON makes Change a json.Marshaler.

func (*Change) Ready

func (c *Change) Ready() <-chan struct{}

Ready returns a channel that is closed the first time the change becomes ready.
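
The "closed the first time" behaviour is a common Go idiom. The sketch below shows one way to get it with sync.Once. The readiness type and its methods are hypothetical stand-ins and say nothing about how Change is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// readiness is a hypothetical stand-in for the part of a change that
// tracks whether it has become ready.
type readiness struct {
	once  sync.Once
	ready chan struct{}
}

func newReadiness() *readiness {
	return &readiness{ready: make(chan struct{})}
}

// Ready returns a channel that is closed once markReady has been called.
func (r *readiness) Ready() <-chan struct{} { return r.ready }

// markReady closes the channel on the first call; later calls are no-ops.
func (r *readiness) markReady() {
	r.once.Do(func() { close(r.ready) })
}

// isClosed reports, without blocking, whether ch has been closed.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	r := newReadiness()
	fmt.Println(isClosed(r.Ready()))
	r.markReady()
	r.markReady() // safe: the channel is closed only once
	fmt.Println(isClosed(r.Ready()))
}
```

Because a closed channel never blocks, any number of goroutines can wait on the same channel, and late subscribers return immediately.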

func (*Change) ReadyTime

func (c *Change) ReadyTime() time.Time

ReadyTime returns the time when the change became ready.

func (*Change) Set

func (c *Change) Set(key string, value interface{})

Set associates value with key for future consulting by managers. The provided value must properly marshal and unmarshal with encoding/json.
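
The JSON requirement means a value only survives if it round-trips through encoding/json. The sketch below illustrates that with a hypothetical store type. Unlike the documented Set, its set method returns an error, purely to make the failure case visible in the example.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// store is a hypothetical stand-in for the key/value data of a change.
type store map[string]json.RawMessage

var errNoEntry = errors.New("no entry for key")

// set marshals value with encoding/json and keeps the bytes under key.
func (s store) set(key string, value interface{}) error {
	data, err := json.Marshal(value)
	if err != nil {
		return fmt.Errorf("cannot marshal value for %q: %w", key, err)
	}
	s[key] = data
	return nil
}

// get unmarshals the bytes stored under key into value (a pointer).
func (s store) get(key string, value interface{}) error {
	data, ok := s[key]
	if !ok {
		return errNoEntry
	}
	return json.Unmarshal(data, value)
}

// payload is an illustrative value type with exported, tagged fields.
type payload struct {
	Name     string `json:"name"`
	Revision int    `json:"revision"`
}

func main() {
	s := store{}
	if err := s.set("setup", payload{Name: "core", Revision: 7}); err != nil {
		fmt.Println("set failed:", err)
		return
	}
	var out payload
	if err := s.get("setup", &out); err != nil {
		fmt.Println("get failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
	// Channels have no JSON representation, so this value is rejected.
	fmt.Println(s.set("bad", make(chan int)) != nil)
}
```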

func (*Change) SetStatus

func (c *Change) SetStatus(s Status)

SetStatus sets the change status, overriding the default behavior (see Status method).

func (*Change) SpawnTime

func (c *Change) SpawnTime() time.Time

SpawnTime returns the time when the change was created.

func (*Change) State

func (c *Change) State() *State

State returns the system State.

func (*Change) Status

func (c *Change) Status() Status

Status returns the current status of the change. If the status was not explicitly set the result is derived from the status of the individual tasks related to the change, according to the following decision sequence:

  • With at least one task in DoStatus, return DoStatus
  • With at least one task in ErrorStatus, return ErrorStatus
  • Otherwise, return DoneStatus
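
The decision sequence above can be written down directly. The sketch below covers only the three rules listed here and assumes a change with at least one task whose status was not explicitly set. deriveStatus is a hypothetical helper, and the Status values are local copies of the documented constants.

```go
package main

import "fmt"

// Status and the three values below are local copies of the documented ones.
type Status int

const (
	DoStatus    Status = 2
	DoneStatus  Status = 4
	ErrorStatus Status = 9
)

// deriveStatus applies the listed rules in order: any task in DoStatus
// wins, then any task in ErrorStatus, otherwise the result is DoneStatus.
func deriveStatus(tasks []Status) Status {
	hasError := false
	for _, s := range tasks {
		switch s {
		case DoStatus:
			return DoStatus
		case ErrorStatus:
			hasError = true
		}
	}
	if hasError {
		return ErrorStatus
	}
	return DoneStatus
}

func main() {
	fmt.Println(deriveStatus([]Status{DoneStatus, ErrorStatus, DoStatus}) == DoStatus)
	fmt.Println(deriveStatus([]Status{DoneStatus, ErrorStatus}) == ErrorStatus)
	fmt.Println(deriveStatus([]Status{DoneStatus, DoneStatus}) == DoneStatus)
}
```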

func (*Change) Summary

func (c *Change) Summary() string

Summary returns a summary describing what the change is about.

func (*Change) Tasks

func (c *Change) Tasks() []*Task

Tasks returns all the tasks this state change depends on.

func (*Change) UnmarshalJSON

func (c *Change) UnmarshalJSON(data []byte) error

UnmarshalJSON makes Change a json.Unmarshaler.

type HandlerFunc

type HandlerFunc func(task *Task, tomb *tomb.Tomb) error

HandlerFunc is the type of function for the handlers.

type Hold

type Hold struct {
	Reason string
}

Hold is returned from a handler to signal that the task cannot proceed at the moment, perhaps because some manual action from the user is required at this point or because of errors.

func (*Hold) Error

func (r *Hold) Error() string

type NoStateError

type NoStateError struct {
	// Key is the key for which no state could be found.
	Key string
}

NoStateError represents the case where no state could be found for a given key.

func (*NoStateError) Error

func (e *NoStateError) Error() string

func (*NoStateError) Is

func (e *NoStateError) Is(err error) bool

Is returns true if the error is of type *NoStateError or equal to ErrNoState. NoStateError's key isn't compared between errors.
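
With an Is method like this, callers can test for either form using errors.Is. The sketch below mirrors the documented declarations locally. The Error text and the lookup and isNoState helpers are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented declarations.
var ErrNoState = errors.New("no state entry for key")

type NoStateError struct {
	Key string
}

// Error returns an illustrative message; the real text is not shown on this page.
func (e *NoStateError) Error() string {
	return fmt.Sprintf("no state entry for key %q", e.Key)
}

// Is matches any *NoStateError and the ErrNoState sentinel; Key is ignored.
func (e *NoStateError) Is(err error) bool {
	_, ok := err.(*NoStateError)
	return ok || err == ErrNoState
}

// lookup is a hypothetical function that wraps a *NoStateError.
func lookup(data map[string]string, key string) (string, error) {
	v, ok := data[key]
	if !ok {
		return "", fmt.Errorf("lookup failed: %w", &NoStateError{Key: key})
	}
	return v, nil
}

// isNoState is a hypothetical convenience wrapper around errors.Is.
func isNoState(err error) bool {
	return errors.Is(err, ErrNoState)
}

func main() {
	_, err := lookup(map[string]string{}, "snaps")
	fmt.Println(isNoState(err))
	// A NoStateError with a different key still matches.
	fmt.Println(errors.Is(err, &NoStateError{Key: "other"}))
}
```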

type Retry

type Retry struct {
	After  time.Duration
	Reason string
}

Retry is returned from a handler to signal that it is OK to rerun the task at a later point. It is also to be used when a task goroutine is asked to stop through its tomb. After can be used to indicate how long to postpone the retry; 0 (the default) means at the next ensure pass, and is what should be used if stopped through its tomb. Reason is an optional explanation of the conflict.
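
As an illustration of the calling convention, the sketch below has a handler-like function return a Retry and a runner-like function pick it up with errors.As. The Retry struct is copied locally. Its Error text, the simplified handler signature (no Task or tomb) and rescheduleDelay are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Retry mirrors the documented struct; the Error text is an assumption.
type Retry struct {
	After  time.Duration
	Reason string
}

func (r *Retry) Error() string { return "task should be retried" }

// handler is a hypothetical, simplified handler: it asks for a retry
// in 30 seconds while the resource it needs is busy.
func handler(busy bool) error {
	if busy {
		return &Retry{After: 30 * time.Second, Reason: "resource is in use"}
	}
	return nil
}

// rescheduleDelay is a hypothetical helper showing how a runner might
// interpret a handler result.
func rescheduleDelay(err error) (retry bool, after time.Duration) {
	var r *Retry
	if errors.As(err, &r) {
		return true, r.After
	}
	return false, 0
}

func main() {
	retry, after := rescheduleDelay(handler(true))
	fmt.Println(retry, after)
	retry, after = rescheduleDelay(handler(false))
	fmt.Println(retry, after)
}
```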

func (*Retry) Error

func (r *Retry) Error() string

type State

type State struct {
	// contains filtered or unexported fields
}

State represents an evolving system state that persists across restarts.

The State is concurrency-safe, and all reads and writes to it must be performed with the state locked. It's a runtime error (panic) to perform operations without holding the lock.

The state is persisted on every unlock operation via the Backend it was initialized with.

func New

func New(backend Backend) *State

New returns a new empty state.

func ReadState

func ReadState(backend Backend, r io.Reader) (*State, error)

ReadState returns the state deserialized from r.

func (*State) AllWarnings

func (s *State) AllWarnings() []*Warning

AllWarnings returns all the warnings in the system, whether they're due to be shown or not. They'll be sorted by lastAdded.

func (*State) Cache

func (s *State) Cache(key, value interface{})

Cache associates value with key for future consulting by managers. The cached value is not persisted.

func (*State) Cached

func (s *State) Cached(key interface{}) interface{}

Cached returns the cached value associated with the provided key. It returns nil if there is no entry for key.

func (*State) Change

func (s *State) Change(id string) *Change

Change returns the change for the given ID.

func (*State) Changes

func (s *State) Changes() []*Change

Changes returns all changes currently known to the state.

func (*State) EnsureBefore

func (s *State) EnsureBefore(d time.Duration)

EnsureBefore asks for an ensure pass to happen sooner, within duration d from now.

func (*State) Get

func (s *State) Get(key string, value interface{}) error

Get unmarshals the stored value associated with the provided key into the value parameter. It returns ErrNoState if there is no entry for key.

func (*State) GetMaybeTimings

func (s *State) GetMaybeTimings(timings interface{}) error

GetMaybeTimings implements timings.GetSaver.

func (*State) Has

func (s *State) Has(key string) bool

Has returns whether the provided key has an associated value.

func (*State) Lock

func (s *State) Lock()

Lock acquires the state lock.

func (*State) MarshalJSON

func (s *State) MarshalJSON() ([]byte, error)

MarshalJSON makes State a json.Marshaler.

func (*State) Modified

func (s *State) Modified() bool

Modified returns whether the state was modified since the last checkpoint.

func (*State) NewChange

func (s *State) NewChange(kind, summary string) *Change

NewChange adds a new change to the state.

func (*State) NewLane

func (s *State) NewLane() int

NewLane creates a new lane in the state.

func (*State) NewTask

func (s *State) NewTask(kind, summary string) *Task

NewTask creates a new task. It usually will be registered with a Change using AddTask or through a TaskSet.

func (*State) OkayWarnings

func (s *State) OkayWarnings(t time.Time) int

OkayWarnings marks warnings that were showable at the given time as shown.

func (*State) PendingWarnings

func (s *State) PendingWarnings() ([]*Warning, time.Time)

PendingWarnings returns the list of warnings to show the user, sorted by lastAdded, and a timestamp that can be used to refer to these warnings.

Warnings to show to the user are those that have not been shown before, or that have been shown earlier than repeatAfter ago.
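
The rule above amounts to a simple time comparison. The sketch below expresses it with a hypothetical warning type that carries only the fields the rule needs. The day helper exists just to build example timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultRepeatAfter mirrors the documented default of 24 hours.
var DefaultRepeatAfter = time.Hour * 24

// warning is a hypothetical stand-in with only the fields the rule needs.
type warning struct {
	lastShown   time.Time // zero if the warning was never shown
	repeatAfter time.Duration
}

// showable reports whether the warning was never shown, or was last
// shown more than repeatAfter before now.
func (w warning) showable(now time.Time) bool {
	if w.lastShown.IsZero() {
		return true
	}
	return w.lastShown.Add(w.repeatAfter).Before(now)
}

// day returns midnight UTC on the given day of November 2022.
func day(n int) time.Time {
	return time.Date(2022, time.November, n, 0, 0, 0, 0, time.UTC)
}

func main() {
	never := warning{repeatAfter: DefaultRepeatAfter}
	shown := warning{lastShown: day(1), repeatAfter: DefaultRepeatAfter}
	fmt.Println(never.showable(day(1)))
	fmt.Println(shown.showable(day(1)))
	fmt.Println(shown.showable(day(3)))
}
```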

func (*State) Prune

func (s *State) Prune(startOfOperation time.Time, pruneWait, abortWait time.Duration, maxReadyChanges int)

Prune does several cleanup tasks to the in-memory state:

  • it removes changes that have been ready for longer than pruneWait, and aborts tasks that were spawned more than abortWait ago, unless prevented by predicates registered with RegisterPendingChangeByAttr.

  • it removes tasks not linked to any change after pruneWait. When there are more changes than the limit set via maxReadyChanges, changes in the ready state will also be removed even if they have been ready for less than pruneWait.

  • it removes expired warnings.

func (*State) RegisterPendingChangeByAttr

func (s *State) RegisterPendingChangeByAttr(attr string, f func(*Change) bool)

RegisterPendingChangeByAttr registers a predicate that Prune invokes on changes that have the specified attribute set, to check whether they must not be aborted yet even though they meet the time criteria.

func (*State) SaveTimings

func (s *State) SaveTimings(timings interface{})

SaveTimings implements timings.GetSaver.

func (*State) Set

func (s *State) Set(key string, value interface{})

Set associates value with key for future consulting by managers. The provided value must properly marshal and unmarshal with encoding/json.

func (*State) Task

func (s *State) Task(id string) *Task

Task returns the task for the given ID if the task has been linked to a change.

func (*State) TaskCount

func (s *State) TaskCount() int

TaskCount returns the number of tasks that currently exist in the state, whether linked to a change or not.

func (*State) Tasks

func (s *State) Tasks() []*Task

Tasks returns all tasks currently known to the state and linked to changes.

func (*State) Unlock

func (s *State) Unlock()

Unlock releases the state lock and checkpoints the state. It does not return until the state is correctly checkpointed. After too many unsuccessful checkpoint attempts, it panics.

func (*State) UnmarshalJSON

func (s *State) UnmarshalJSON(data []byte) error

UnmarshalJSON makes State a json.Unmarshaler.

func (*State) UnshowAllWarnings

func (s *State) UnshowAllWarnings()

UnshowAllWarnings clears the lastShown timestamp from all the warnings. For use in debugging.

func (*State) Warnf

func (s *State) Warnf(template string, args ...interface{})

Warnf records a warning: if it's the first Warning with this message it'll be added (with its firstAdded and lastAdded set to the current time), otherwise the existing one will have its lastAdded updated.

func (*State) WarningsSummary

func (s *State) WarningsSummary() (int, time.Time)

WarningsSummary returns the number of warnings that are ready to be shown to the user, and the timestamp of the most recently added warning (useful for silencing the warning alerts, and OKing the returned warnings).

type Status

type Status int

Status is used for status values for changes and tasks.

const (
	// DefaultStatus is the standard computed status for a change or task.
	// For tasks it's always mapped to DoStatus, and for changes it's mapped
	// to an aggregation of their tasks' statuses. See Change.Status for details.
	DefaultStatus Status = 0

	// HoldStatus means the task should not run for the moment, perhaps as a
	// consequence of an error on another task or because an external action
	// is needed.
	HoldStatus Status = 1

	// DoStatus means the change or task is ready to start.
	DoStatus Status = 2

	// DoingStatus means the change or task is running or an attempt was made to run it.
	DoingStatus Status = 3

	// DoneStatus means the change or task was accomplished successfully.
	DoneStatus Status = 4

	// AbortStatus means the task should stop doing its activities and then undo.
	AbortStatus Status = 5

	// UndoStatus means the change or task should be undone, probably due to an error elsewhere.
	UndoStatus Status = 6

	// UndoingStatus means the change or task is being undone or an attempt was made to undo it.
	UndoingStatus Status = 7

	// UndoneStatus means a task was first done and then undone after an error elsewhere.
	// Changes go directly into the error status instead of being marked as undone.
	UndoneStatus Status = 8

	// ErrorStatus means the change or task has errored out while running or being undone.
	ErrorStatus Status = 9
)

Admitted status values for changes and tasks.

func (Status) Ready

func (s Status) Ready() bool

Ready returns whether a task or change with this status needs further work or has completed its attempt to perform the current goal.

func (Status) String

func (s Status) String() string

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task represents an individual operation to be performed for accomplishing one or more state changes.

See Change for more details.

func (*Task) At

func (t *Task) At(when time.Time)

At schedules the task, if it's not ready, to happen no earlier than when. If when is the zero time, any previous special scheduling is suppressed.

func (*Task) AtTime

func (t *Task) AtTime() time.Time

AtTime returns the time at which the task is scheduled to run. A zero time means no special schedule, i.e. run as soon as prerequisites are met.

func (*Task) Change

func (t *Task) Change() *Change

Change returns the change the task is registered with.

func (*Task) Clear

func (t *Task) Clear(key string)

Clear disassociates the value from key.

func (*Task) DoingTime

func (t *Task) DoingTime() time.Duration

func (*Task) Errorf

func (t *Task) Errorf(format string, args ...interface{})

Errorf logs error information about the progress of the task.

func (*Task) Get

func (t *Task) Get(key string, value interface{}) error

Get unmarshals the stored value associated with the provided key into the value parameter.

func (*Task) HaltTasks

func (t *Task) HaltTasks() []*Task

HaltTasks returns the list of tasks registered to wait for t.

func (*Task) Has

func (t *Task) Has(key string) bool

Has returns whether the provided key has an associated value.

func (*Task) ID

func (t *Task) ID() string

ID returns the individual random key for this task.

func (*Task) IsClean

func (t *Task) IsClean() bool

IsClean returns whether the task has been cleaned. See SetClean.

func (*Task) JoinLane

func (t *Task) JoinLane(lane int)

JoinLane registers the task in the provided lane. Tasks in different lanes abort independently on errors. See Change.AbortLanes for details.

func (*Task) Kind

func (t *Task) Kind() string

Kind returns the nature of this task for managers to know how to handle it.

func (*Task) Lanes

func (t *Task) Lanes() []int

Lanes returns the lanes the task is in.

func (*Task) Log

func (t *Task) Log() []string

Log returns the most recent messages logged into the task.

Only the most recent entries logged are returned, potentially with different behavior for different task statuses. How many entries are returned is an implementation detail and may change over time.

Messages are prefixed with one of the known message kinds. See details about LogInfo and LogError.

The returned slice should not be read from without the state lock held, and should not be written to.

func (*Task) Logf

func (t *Task) Logf(format string, args ...interface{})

Logf logs information about the progress of the task.

func (*Task) MarshalJSON

func (t *Task) MarshalJSON() ([]byte, error)

MarshalJSON makes Task a json.Marshaler.

func (*Task) NumHaltTasks

func (t *Task) NumHaltTasks() int

NumHaltTasks returns the number of tasks registered to wait for t.

func (*Task) Progress

func (t *Task) Progress() (label string, done, total int)

Progress returns the current progress for the task. If progress is not explicitly set, it returns (0, 1) if the status is DoStatus and (1, 1) otherwise.

func (*Task) ReadyTime

func (t *Task) ReadyTime() time.Time

ReadyTime returns the time when the task became ready.

func (*Task) Set

func (t *Task) Set(key string, value interface{})

Set associates value with key for future consulting by managers. The provided value must properly marshal and unmarshal with encoding/json.

func (*Task) SetClean

func (t *Task) SetClean()

SetClean flags the task as clean after any left over data was removed.

Cleaning a task must only be done after the change is ready.

func (*Task) SetProgress

func (t *Task) SetProgress(label string, done, total int)

SetProgress sets the task progress to done out of total steps.

func (*Task) SetStatus

func (t *Task) SetStatus(new Status)

SetStatus sets the task status, overriding the default behavior (see Status method).

func (*Task) SpawnTime

func (t *Task) SpawnTime() time.Time

SpawnTime returns the time when the task was created.

func (*Task) State

func (t *Task) State() *State

State returns the system State.

func (*Task) Status

func (t *Task) Status() Status

Status returns the current task status.

func (*Task) Summary

func (t *Task) Summary() string

Summary returns a summary describing what the task is about.

func (*Task) UndoingTime

func (t *Task) UndoingTime() time.Duration

func (*Task) UnmarshalJSON

func (t *Task) UnmarshalJSON(data []byte) error

UnmarshalJSON makes Task a json.Unmarshaler.

func (*Task) WaitAll

func (t *Task) WaitAll(ts *TaskSet)

WaitAll registers all the tasks in the set as a requirement for t to make progress.

func (*Task) WaitFor

func (t *Task) WaitFor(another *Task)

WaitFor registers another task as a requirement for t to make progress.

func (*Task) WaitTasks

func (t *Task) WaitTasks() []*Task

WaitTasks returns the list of tasks registered for t to wait for.

type TaskDependencyCycleError

type TaskDependencyCycleError struct {
	IDs []string
	// contains filtered or unexported fields
}

func (*TaskDependencyCycleError) Error

func (e *TaskDependencyCycleError) Error() string

func (*TaskDependencyCycleError) Is

func (e *TaskDependencyCycleError) Is(err error) bool

type TaskRunner

type TaskRunner struct {
	// contains filtered or unexported fields
}

TaskRunner controls the running of goroutines to execute known task kinds.

func NewTaskRunner

func NewTaskRunner(s *State) *TaskRunner

NewTaskRunner creates a new TaskRunner.

func (*TaskRunner) AddBlocked

func (r *TaskRunner) AddBlocked(pred func(t *Task, running []*Task) bool)

AddBlocked adds a predicate function to decide whether to block a task from running based on the current running tasks. It can be used to control task serialisation. All added predicates are considered in turn until one returns true, or none.
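
The evaluation order described above can be sketched as a loop that stops at the first predicate returning true. The task type, isBlocked and serialiseKind below are hypothetical stand-ins and do not use the real Task or TaskRunner.

```go
package main

import "fmt"

// task is a hypothetical stand-in carrying only a kind.
type task struct{ kind string }

// blockedFunc has the same shape as the documented predicate, using the stand-in type.
type blockedFunc func(t *task, running []*task) bool

// isBlocked consults the predicates in order and stops at the first
// one that returns true. With no match the task is not blocked.
func isBlocked(preds []blockedFunc, t *task, running []*task) bool {
	for _, pred := range preds {
		if pred(t, running) {
			return true
		}
	}
	return false
}

// serialiseKind returns a predicate that blocks a task of the given
// kind while another task of the same kind is running.
func serialiseKind(kind string) blockedFunc {
	return func(t *task, running []*task) bool {
		if t.kind != kind {
			return false
		}
		for _, r := range running {
			if r.kind == kind {
				return true
			}
		}
		return false
	}
}

func main() {
	preds := []blockedFunc{serialiseKind("download"), serialiseKind("mount")}
	running := []*task{{kind: "download"}}
	fmt.Println(isBlocked(preds, &task{kind: "download"}, running))
	fmt.Println(isBlocked(preds, &task{kind: "mount"}, running))
}
```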

func (*TaskRunner) AddCleanup

func (r *TaskRunner) AddCleanup(kind string, cleanup HandlerFunc)

AddCleanup registers a function to be called after the change completes, for cleaning up data left behind by tasks of the specified kind. The provided function will be called no matter what the final status of the task is. This mechanism enables keeping data around for a potential undo until there's no more chance of the task being undone.

The cleanup function is run concurrently with other cleanup functions, despite any wait ordering between the tasks. If it returns an error, it will be retried later.

The handler for tasks of the provided kind must have been registered before AddCleanup is called for it.

func (*TaskRunner) AddHandler

func (r *TaskRunner) AddHandler(kind string, do, undo HandlerFunc)

AddHandler registers the functions to concurrently call for doing and undoing tasks of the given kind. The undo handler may be nil.

func (*TaskRunner) AddOptionalHandler

func (r *TaskRunner) AddOptionalHandler(match func(t *Task) bool, do, undo HandlerFunc)

AddOptionalHandler registers functions for doing and undoing tasks that match the given predicate if no explicit handler was registered for the task kind.

func (*TaskRunner) Ensure

func (r *TaskRunner) Ensure() error

Ensure starts new goroutines for all known tasks with no pending dependencies. Note that Ensure will lock the state.

func (*TaskRunner) KnownTaskKinds

func (r *TaskRunner) KnownTaskKinds() []string

KnownTaskKinds returns all task kinds handled by this runner.

func (*TaskRunner) OnTaskError

func (r *TaskRunner) OnTaskError(f func(err error))

OnTaskError sets an error callback executed when any task errors out.

func (*TaskRunner) SetBlocked

func (r *TaskRunner) SetBlocked(pred func(t *Task, running []*Task) bool)

SetBlocked sets a predicate function to decide whether to block a task from running based on the current running tasks. It can be used to control task serialisation.

func (*TaskRunner) Stop

func (r *TaskRunner) Stop()

Stop kills all concurrent activities and returns after that's done.

func (*TaskRunner) StopKinds

func (r *TaskRunner) StopKinds(kind ...string)

StopKinds kills all concurrent tasks of the given kinds and returns after that's done.

func (*TaskRunner) Wait

func (r *TaskRunner) Wait()

Wait waits for all concurrent activities and returns after that's done.

type TaskSet

type TaskSet struct {
	// contains filtered or unexported fields
}

A TaskSet holds a set of tasks.

func NewTaskSet

func NewTaskSet(tasks ...*Task) *TaskSet

NewTaskSet returns a new TaskSet comprising the given tasks.

func (*TaskSet) AddAll

func (ts *TaskSet) AddAll(anotherTs *TaskSet)

AddAll adds all the tasks in the argument set to the target set ts.

func (*TaskSet) AddAllWithEdges

func (ts *TaskSet) AddAllWithEdges(anotherTs *TaskSet) error

AddAllWithEdges adds all the tasks in the argument set to the target set ts and also adds all TaskSetEdges. Duplicated TaskSetEdges are an error.

func (*TaskSet) AddTask

func (ts *TaskSet) AddTask(task *Task)

AddTask adds the task to the task set.

func (TaskSet) Edge

func (ts TaskSet) Edge(e TaskSetEdge) (*Task, error)

Edge returns the task marked with the given edge name or an error.

func (*TaskSet) JoinLane

func (ts *TaskSet) JoinLane(lane int)

JoinLane adds all the tasks in the current taskset to the given lane.

func (*TaskSet) MarkEdge

func (ts *TaskSet) MarkEdge(task *Task, edge TaskSetEdge)

MarkEdge marks the given task as a specific edge. Any pre-existing edge mark will be overridden.

func (TaskSet) MaybeEdge

func (ts TaskSet) MaybeEdge(e TaskSetEdge) *Task

MaybeEdge returns the task marked with the given edge name or nil if no such task exists.

func (TaskSet) Tasks

func (ts TaskSet) Tasks() []*Task

Tasks returns the tasks in the task set.

func (*TaskSet) WaitAll

func (ts *TaskSet) WaitAll(anotherTs *TaskSet)

WaitAll registers all the tasks in the argument set as requirements for the target set ts to make progress.

func (TaskSet) WaitFor

func (ts TaskSet) WaitFor(another *Task)

WaitFor registers a task as a requirement for the tasks in the set to make progress.

type TaskSetEdge

type TaskSetEdge string

TaskSetEdge designates tasks inside a TaskSet for outside reference.

This is useful to give tasks inside TaskSets a special meaning. It is used to mark e.g. the last task used for downloading a snap.

type Warning

type Warning struct {
	// contains filtered or unexported fields
}

func (*Warning) ExpiredBefore

func (w *Warning) ExpiredBefore(now time.Time) bool

func (*Warning) MarshalJSON

func (w *Warning) MarshalJSON() ([]byte, error)

func (*Warning) ShowAfter

func (w *Warning) ShowAfter(t time.Time) bool

func (*Warning) String

func (w *Warning) String() string

func (*Warning) UnmarshalJSON

func (w *Warning) UnmarshalJSON(data []byte) error
