Package agent

v0.0.0-...-4f1dd1e

Published: May 18, 2020 | License: MIT | Module: github.com/edwardsp/lemur

Overview

Package agent implements a Parallel Data Mover to copy or migrate data between various storage systems. It supports multiple types of sources and destinations, including POSIX, S3, and HPSS.

Use cases include:

* Data movement for Lustre HSM
* Offsite replication for DR
* Lustre file-level replication
* Storage rebalancing within a single tier
* Migration between filesystems (e.g. GPFS -> Lustre)

Initially, the main focus is HSM.

Constants

const UnmountTimeout = 10

UnmountTimeout is the time, in seconds, that an unmount will be retried before failing with an error.

func CleanupMounts

func CleanupMounts(cfg *Config) error

CleanupMounts unmounts the Lustre client mounts configured by ConfigureMounts().

func ConfigureMounts

func ConfigureMounts(cfg *Config) error

ConfigureMounts configures a set of Lustre client mounts: one for the agent and one for each configured data mover.

func MarshalActionData

func MarshalActionData(fileID []byte, moverData interface{}) ([]byte, error)

MarshalActionData returns an initialized and marshalled ActionData struct. The moverData value is also marshalled before adding it to the ActionData.

func RegisterTransport

func RegisterTransport(name string, t Transport)

RegisterTransport registers the transport in the list of known transports

type Action

type Action struct {
	UUID string
	Hash []byte
	URL  string
	Data []byte
	// contains filtered or unexported fields
}

Action represents an HSM action

func (*Action) AsMessage

func (action *Action) AsMessage() *pb.ActionItem

AsMessage returns the protobuf version of an Action.

func (*Action) Fail

func (action *Action) Fail(rc int) error

Fail signals that the action has failed

func (*Action) Handle

func (action *Action) Handle() hsm.ActionHandle

Handle returns the raw hsm.ActionHandle (temporary function until queue transport is updated)

func (*Action) ID

func (action *Action) ID() ActionID

ID returns the action ID.

func (*Action) Prepare

func (action *Action) Prepare() error

Prepare ensures the action is ready to be sent. It completes any steps that may require accessing the filesystem.

func (*Action) String

func (action *Action) String() string

func (*Action) Update

func (action *Action) Update(status *pb.ActionStatus) (bool, error)

Update handles the Status messages from the data mover. The Status updates the current progress of the Action. If the Completed flag is true, the Action is completed and true is returned so that the transport can remove any related state. After an action is completed, any further status updates should be ignored.

If this function returns an error then the transport layer should notify the mover that this action has been terminated. In this case the Action will be completed immediately and no further updates are required.
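The contract above implies a small state machine on the transport side. The sketch below shows one way a transport might act on the (bool, error) result. The status, updater, fakeAction, and transport types are hypothetical stand-ins (status replaces *pb.ActionStatus), and the Bad field exists only to trigger the error path in this example.

```go
package main

import (
	"errors"
	"fmt"
)

// status is a hypothetical stand-in for *pb.ActionStatus.
type status struct {
	ID        uint64
	Completed bool
	Bad       bool // illustration only: forces Update to return an error
}

// updater is the slice of Action's behavior this sketch relies on.
type updater interface {
	Update(s *status) (bool, error)
}

// fakeAction is a stub that follows the documented Update contract.
type fakeAction struct{}

func (fakeAction) Update(s *status) (bool, error) {
	if s.Bad {
		return false, errors.New("malformed status")
	}
	return s.Completed, nil
}

// transport tracks in-flight actions by ID (hypothetical).
type transport struct {
	inflight map[uint64]updater
}

// handleStatus applies one status message and reports whether the mover
// should be notified that the action has been terminated.
func (t *transport) handleStatus(s *status) (terminate bool) {
	a, ok := t.inflight[s.ID]
	if !ok {
		return false // already completed or unknown: ignore the update
	}
	completed, err := a.Update(s)
	if err != nil {
		delete(t.inflight, s.ID) // the action is completed immediately
		return true
	}
	if completed {
		delete(t.inflight, s.ID) // drop related state
	}
	return false
}

func main() {
	tr := &transport{inflight: map[uint64]updater{1: fakeAction{}, 2: fakeAction{}}}
	fmt.Println(tr.handleStatus(&status{ID: 1}))                  // progress update
	fmt.Println(tr.handleStatus(&status{ID: 1, Completed: true})) // final update
	fmt.Println(tr.handleStatus(&status{ID: 1}))                  // late update
	fmt.Println(tr.handleStatus(&status{ID: 2, Bad: true}))       // error path
	fmt.Println(len(tr.inflight))
}
```

Removing the entry on completion is what makes late updates harmless here: the lookup fails and the message is dropped.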

type ActionData

type ActionData struct {
	FileID    []byte `json:"file_id"`
	MoverData []byte `json:"mover_data"`
}

ActionData is extra data passed to the Agent by the policy engine

type ActionID

type ActionID uint64

ActionID is a unique (per agent instance) ID for HSM actions

func NextActionID

func NextActionID() ActionID

NextActionID returns monotonically-increasing ActionIDs
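A monotonically increasing, goroutine-safe ID generator is commonly built on an atomic counter. The sketch below shows that technique; the counter variable and the use of sync/atomic are assumptions for illustration, not a description of how NextActionID is actually implemented.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ActionID mirrors the documented type.
type ActionID uint64

// actionIDCounter is a hypothetical package-level counter.
var actionIDCounter uint64

// nextActionID is a stand-in for NextActionID: each call atomically
// increments the counter and returns the new value.
func nextActionID() ActionID {
	return ActionID(atomic.AddUint64(&actionIDCounter, 1))
}

func main() {
	a := nextActionID()
	b := nextActionID()
	fmt.Println(a < b)
}
```

Because the counter lives in process memory, IDs produced this way are unique only within one agent instance, which matches the scope stated for ActionID.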

type ActionStats

type ActionStats struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ActionStats is a synchronized container for ArchiveStats instances

func NewActionStats

func NewActionStats() *ActionStats

NewActionStats initializes a new ActionStats container

func (*ActionStats) Archives

func (as *ActionStats) Archives() (v []int)

Archives returns a slice of archive numbers corresponding to instrumented backends

func (*ActionStats) CompleteAction

func (as *ActionStats) CompleteAction(a *Action, rc int)

CompleteAction updates various stats when an action is complete

func (*ActionStats) GetIndex

func (as *ActionStats) GetIndex(i int) *ArchiveStats

GetIndex returns the *ArchiveStats corresponding to the supplied archive number

func (*ActionStats) Start

func (as *ActionStats) Start(ctx context.Context)

Start creates a new goroutine for collecting archive stats

func (*ActionStats) StartAction

func (as *ActionStats) StartAction(a *Action)

StartAction increments stats counters when an action starts

type ArchiveStats

type ArchiveStats struct {
	// contains filtered or unexported fields
}

ArchiveStats is a per-archive container of statistics for that backend

func (*ArchiveStats) String

func (s *ArchiveStats) String() string

type Config

type Config struct {
	MountRoot          string             `hcl:"mount_root" json:"mount_root"`
	ClientDevice       *spec.ClientDevice `json:"client_device"`
	ClientMountOptions clientMountOptions `hcl:"client_mount_options" json:"client_mount_options"`

	Processes int `hcl:"handler_count" json:"handler_count"`

	InfluxDB *influxConfig `hcl:"influxdb" json:"influxdb"`

	EnabledPlugins []string `hcl:"enabled_plugins" json:"enabled_plugins"`
	PluginDir      string   `hcl:"plugin_dir" json:"plugin_dir"`

	Snapshots *snapshotConfig  `hcl:"snapshots" json:"snapshots"`
	Transport *transportConfig `hcl:"transport" json:"transport"`
}

Config represents HSM Agent configuration

func ConfigInitMust

func ConfigInitMust() *Config

ConfigInitMust returns a valid *Config or fails trying

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig initializes a new Config struct with default values

func LoadConfig

func LoadConfig(configPath string) (*Config, error)

LoadConfig reads a config at the supplied path

func NewConfig

func NewConfig() *Config

NewConfig initializes a new Config struct with zero values

func (*Config) AgentMountpoint

func (c *Config) AgentMountpoint() string

AgentMountpoint returns the calculated agent mountpoint under the agent mount root.

func (*Config) Merge

func (c *Config) Merge(other *Config) *Config

Merge combines the supplied configuration's values with this one's

func (*Config) Plugins

func (c *Config) Plugins() []*PluginConfig

Plugins returns a slice of *PluginConfig instances for enabled plugins

func (*Config) String

func (c *Config) String() string

type Endpoint

type Endpoint interface {
	Send(*Action)
}

Endpoint defines an interface for HSM backends

type Endpoints

type Endpoints struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Endpoints represents a collection of Endpoints and their handles

func NewEndpoints

func NewEndpoints() *Endpoints

NewEndpoints returns a new *Endpoints instance

func (*Endpoints) Add

func (all *Endpoints) Add(a uint32, e Endpoint) (*Handle, error)

Add registers a new Endpoint

func (*Endpoints) Get

func (all *Endpoints) Get(a uint32) (Endpoint, bool)

Get returns an Endpoint or nil, given a lookup id

func (*Endpoints) GetWithHandle

func (all *Endpoints) GetWithHandle(h *Handle) (Endpoint, bool)

GetWithHandle returns an Endpoint or nil, given a Handle

func (*Endpoints) NewHandle

func (all *Endpoints) NewHandle(a uint32) (*Handle, error)

NewHandle returns a new *Handle

func (*Endpoints) Remove

func (all *Endpoints) Remove(h *Handle) Endpoint

Remove removes the given handle and its associated Endpoint

func (*Endpoints) RemoveHandle

func (all *Endpoints) RemoveHandle(h *Handle)

RemoveHandle removes the given handle from the collection of handles

type Handle

type Handle uint64

Handle is an endpoint handle (unique id)

type HsmAgent

type HsmAgent struct {
	Endpoints *Endpoints
	// contains filtered or unexported fields
}

HsmAgent is an agent for a single filesystem and a collection of backends.

func New

func New(cfg *Config, client fsroot.Client, as hsm.ActionSource) (*HsmAgent, error)

New accepts a config and returns a *HsmAgent

func (*HsmAgent) Root

func (ct *HsmAgent) Root() fs.RootDir

Root returns a fs.RootDir representing the Lustre filesystem root

func (*HsmAgent) Start

func (ct *HsmAgent) Start(ctx context.Context) error

Start backgrounds the agent and starts backend data movers

func (*HsmAgent) StartWaitFor

func (ct *HsmAgent) StartWaitFor(n time.Duration) error

StartWaitFor waits for the agent to start up, with a timeout of n.

func (*HsmAgent) Stop

func (ct *HsmAgent) Stop()

Stop shuts down all backend data movers and kills the agent

type PluginConfig

type PluginConfig struct {
	Name             string
	BinPath          string
	AgentConnection  string
	ClientMount      string
	Args             []string
	RestartOnFailure bool
	// contains filtered or unexported fields
}

PluginConfig represents configuration for a single plugin

func NewPlugin

func NewPlugin(name, binPath, conn, mountRoot string, args ...string) *PluginConfig

NewPlugin returns a plugin configuration

func (*PluginConfig) NoRestart

func (p *PluginConfig) NoRestart() *PluginConfig

NoRestart optionally sets a plugin to not be restarted on failure

func (*PluginConfig) RestartDelay

func (p *PluginConfig) RestartDelay() time.Duration

RestartDelay returns a time.Duration to delay restarts based on the number of restarts and the last restart time.

func (*PluginConfig) String

func (p *PluginConfig) String() string

type PluginMonitor

type PluginMonitor struct {
	// contains filtered or unexported fields
}

PluginMonitor watches monitored plugins and restarts them as needed.

func NewMonitor

func NewMonitor() *PluginMonitor

NewMonitor creates a new plugin monitor

func (*PluginMonitor) Start

func (m *PluginMonitor) Start(ctx context.Context)

Start starts the plugin monitor

func (*PluginMonitor) StartPlugin

func (m *PluginMonitor) StartPlugin(cfg *PluginConfig) error

StartPlugin starts the plugin and monitors it

type Transport

type Transport interface {
	Init(*Config, *HsmAgent) error
	Shutdown()
}

Transport for backend plugins
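RegisterTransport and the Transport interface together suggest a name-keyed registry of pluggable transports. The sketch below shows that pattern with local stand-in types: Config and HsmAgent are empty placeholders, and registerTransport, the transports map, and nopTransport are hypothetical names, not the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// Placeholder types standing in for the package's Config and HsmAgent.
type Config struct{}
type HsmAgent struct{}

// Transport mirrors the documented interface.
type Transport interface {
	Init(*Config, *HsmAgent) error
	Shutdown()
}

var (
	mu         sync.Mutex
	transports = map[string]Transport{} // hypothetical registry
)

// registerTransport is a stand-in for RegisterTransport.
func registerTransport(name string, t Transport) {
	mu.Lock()
	defer mu.Unlock()
	transports[name] = t
}

// nopTransport is a trivial Transport that only records its state.
type nopTransport struct{ started bool }

func (n *nopTransport) Init(*Config, *HsmAgent) error { n.started = true; return nil }
func (n *nopTransport) Shutdown()                     { n.started = false }

func main() {
	registerTransport("nop", &nopTransport{})
	for name, t := range transports {
		if err := t.Init(&Config{}, &HsmAgent{}); err != nil {
			fmt.Println("init failed:", name, err)
			continue
		}
		fmt.Println("initialized", name)
		t.Shutdown()
	}
}
```

In this pattern a transport implementation would typically register itself from an init function, so that importing it is enough to make it available by name.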

Documentation was rendered with GOOS=linux and GOARCH=amd64.