node

package
v0.3.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2023 License: Apache-2.0 Imports: 48 Imported by: 2

Documentation

Index

Constants

View Source
const DefaultNodeInfoPublisherInterval = 30 * time.Second
View Source
const JobEventsTopic = "bacalhau-job-events"
View Source
const NodeInfoTopic = "bacalhau-node-info"

Variables

View Source
var DefaultComputeConfig = ComputeConfigParams{
	PhysicalResourcesProvider: system.NewPhysicalCapacityProvider(),
	DefaultJobResourceLimits: model.ResourceUsageData{
		CPU:    0.1,
		Memory: 100 * 1024 * 1024,
	},
	ExecutorBufferBackoffDuration: 50 * time.Millisecond,

	JobNegotiationTimeout:      3 * time.Minute,
	MinJobExecutionTimeout:     500 * time.Millisecond,
	MaxJobExecutionTimeout:     60 * time.Minute,
	DefaultJobExecutionTimeout: 10 * time.Minute,

	LogRunningExecutionsInterval: 10 * time.Second,
}
View Source
var DefaultRequesterConfig = RequesterConfigParams{
	MinJobExecutionTimeout:     0 * time.Second,
	DefaultJobExecutionTimeout: 30 * time.Minute,

	HousekeepingBackgroundTaskInterval: 30 * time.Second,
	NodeRankRandomnessRange:            10,

	MinBacalhauVersion: model.BuildVersionInfo{
		Major: "0", Minor: "3", GitVersion: "v0.3.20",
	},
}

Functions

This section is empty.

Types

type Compute

type Compute struct {
	// Visible for testing
	LocalEndpoint  compute.Endpoint
	Capacity       capacity.Tracker
	ExecutionStore store.ExecutionStore
	Executors      executor.ExecutorProvider
	// contains filtered or unexported fields
}

func NewComputeNode

func NewComputeNode(
	ctx context.Context,
	cleanupManager *system.CleanupManager,
	host host.Host,
	apiServer *publicapi.APIServer,
	config ComputeConfig,
	simulatorNodeID string,
	simulatorRequestHandler *simulator.RequestHandler,
	storages storage.StorageProvider,
	executors executor.ExecutorProvider,
	verifiers verifier.VerifierProvider,
	publishers publisher.PublisherProvider) (*Compute, error)

func (*Compute) RegisterLocalComputeCallback

func (c *Compute) RegisterLocalComputeCallback(callback compute.Callback)

type ComputeConfig

type ComputeConfig struct {
	// Capacity config
	TotalResourceLimits          model.ResourceUsageData
	QueueResourceLimits          model.ResourceUsageData
	JobResourceLimits            model.ResourceUsageData
	DefaultJobResourceLimits     model.ResourceUsageData
	IgnorePhysicalResourceLimits bool

	// How long the buffer would backoff before polling the queue again for new jobs
	ExecutorBufferBackoffDuration time.Duration

	// JobNegotiationTimeout default timeout value to hold a bid for a job
	JobNegotiationTimeout time.Duration
	// MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with
	// lower timeout requirements will not be bid on.
	MinJobExecutionTimeout time.Duration
	// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
	// higher timeout requirements will not be bid on.
	MaxJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
	// no timeout requirement defined.
	DefaultJobExecutionTimeout time.Duration

	// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
	// check.
	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	SimulatorConfig model.SimulatorConfigCompute
}

func NewComputeConfigWith

func NewComputeConfigWith(params ComputeConfigParams) (config ComputeConfig)

func NewComputeConfigWithDefaults

func NewComputeConfigWithDefaults() ComputeConfig

type ComputeConfigParams

type ComputeConfigParams struct {
	// Capacity config
	TotalResourceLimits          model.ResourceUsageData
	QueueResourceLimits          model.ResourceUsageData
	JobResourceLimits            model.ResourceUsageData
	DefaultJobResourceLimits     model.ResourceUsageData
	PhysicalResourcesProvider    capacity.Provider
	IgnorePhysicalResourceLimits bool

	ExecutorBufferBackoffDuration time.Duration

	// Timeout config
	JobNegotiationTimeout      time.Duration
	MinJobExecutionTimeout     time.Duration
	MaxJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	JobExecutionTimeoutClientIDBypassList []string

	// Bid strategies config
	JobSelectionPolicy model.JobSelectionPolicy

	// logging running executions
	LogRunningExecutionsInterval time.Duration

	SimulatorConfig model.SimulatorConfigCompute
}

type ExecutorsFactory

type ExecutorsFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
}

type ExecutorsFactoryFunc

type ExecutorsFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)

func (ExecutorsFactoryFunc) Get

type Node

type Node struct {
	// Visible for testing
	APIServer      *publicapi.APIServer
	ComputeNode    *Compute
	RequesterNode  *Requester
	NodeInfoStore  routing.NodeInfoStore
	CleanupManager *system.CleanupManager
	IPFSClient     ipfs.Client
	Host           host.Host
}

func NewNode

func NewNode(
	ctx context.Context,
	config NodeConfig,
	injector NodeDependencyInjector) (*Node, error)

func NewStandardNode

func NewStandardNode(
	ctx context.Context,
	config NodeConfig) (*Node, error)

func (*Node) IsComputeNode

func (n *Node) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

func (*Node) IsRequesterNode

func (n *Node) IsRequesterNode() bool

IsRequesterNode returns true if the node is a requester node

func (*Node) Start

func (n *Node) Start(ctx context.Context) error

type NodeConfig

type NodeConfig struct {
	IPFSClient                ipfs.Client
	CleanupManager            *system.CleanupManager
	JobStore                  jobstore.Store
	Host                      host.Host
	FilecoinUnsealedPath      string
	EstuaryAPIKey             string
	HostAddress               string
	APIPort                   int
	ComputeConfig             ComputeConfig
	RequesterNodeConfig       RequesterConfig
	APIServerConfig           publicapi.APIServerConfig
	LotusConfig               *filecoinlotus.PublisherConfig
	SimulatorNodeID           string
	IsRequesterNode           bool
	IsComputeNode             bool
	Labels                    map[string]string
	NodeInfoPublisherInterval time.Duration
}

Node configuration

type NodeDependencyInjector

type NodeDependencyInjector struct {
	StorageProvidersFactory StorageProvidersFactory
	ExecutorsFactory        ExecutorsFactory
	VerifiersFactory        VerifiersFactory
	PublishersFactory       PublishersFactory
}

Lazy node dependency injector that generate instances of different components on demand and based on the configuration provided.

func NewStandardNodeDependencyInjector

func NewStandardNodeDependencyInjector() NodeDependencyInjector

type PublishersFactory

type PublishersFactory interface {
	Get(ctx context.Context,
		nodeConfig NodeConfig) (publisher.PublisherProvider, error)
}

type PublishersFactoryFunc

type PublishersFactoryFunc func(
	ctx context.Context,
	nodeConfig NodeConfig) (publisher.PublisherProvider, error)

func (PublishersFactoryFunc) Get

type Requester

type Requester struct {
	// Visible for testing
	Endpoint requester.Endpoint
	JobStore jobstore.Store
	// contains filtered or unexported fields
}

func NewRequesterNode

func NewRequesterNode(
	ctx context.Context,
	cleanupManager *system.CleanupManager,
	host host.Host,
	apiServer *publicapi.APIServer,
	config RequesterConfig,
	jobStore jobstore.Store,
	simulatorNodeID string,
	simulatorRequestHandler *simulator.RequestHandler,
	verifiers verifier.VerifierProvider,
	storageProviders storage.StorageProvider,
	gossipSub *libp2p_pubsub.PubSub,
	nodeInfoStore routing.NodeInfoStore,
) (*Requester, error)

func (*Requester) RegisterLocalComputeEndpoint

func (r *Requester) RegisterLocalComputeEndpoint(endpoint compute.Endpoint)

type RequesterConfig

type RequesterConfig struct {
	// MinJobExecutionTimeout requester will replace any job execution timeout that is less than this
	// value with DefaultJobExecutionTimeout.
	MinJobExecutionTimeout time.Duration
	// DefaultJobExecutionTimeout default value for running, verifying and publishing job results,
	// if the user didn't define one in the spec
	DefaultJobExecutionTimeout time.Duration

	// HousekeepingBackgroundTaskInterval background task interval that periodically checks for expired states
	HousekeepingBackgroundTaskInterval time.Duration
	// NodeRankRandomnessRange defines the range of randomness used to rank nodes
	NodeRankRandomnessRange int
	JobSelectionPolicy      model.JobSelectionPolicy
	SimulatorConfig         model.SimulatorConfigRequester

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo
}

func NewRequesterConfigWith

func NewRequesterConfigWith(params RequesterConfigParams) (config RequesterConfig)

func NewRequesterConfigWithDefaults

func NewRequesterConfigWithDefaults() RequesterConfig

type RequesterConfigParams

type RequesterConfigParams struct {
	// Timeout config
	MinJobExecutionTimeout     time.Duration
	DefaultJobExecutionTimeout time.Duration

	HousekeepingBackgroundTaskInterval time.Duration
	NodeRankRandomnessRange            int
	JobSelectionPolicy                 model.JobSelectionPolicy
	SimulatorConfig                    model.SimulatorConfigRequester

	// minimum version of compute nodes that the requester will accept and route jobs to
	MinBacalhauVersion model.BuildVersionInfo
}

type StandardExecutorsFactory

type StandardExecutorsFactory struct{}

func NewStandardExecutorsFactory

func NewStandardExecutorsFactory() *StandardExecutorsFactory

func (*StandardExecutorsFactory) Get

type StandardPublishersFactory

type StandardPublishersFactory struct{}

func NewStandardPublishersFactory

func NewStandardPublishersFactory() *StandardPublishersFactory

func (*StandardPublishersFactory) Get

type StandardStorageProvidersFactory

type StandardStorageProvidersFactory struct{}

Standard implementations used in prod and when testing prod behavior

func NewStandardStorageProvidersFactory

func NewStandardStorageProvidersFactory() *StandardStorageProvidersFactory

func (*StandardStorageProvidersFactory) Get

type StandardVerifiersFactory

type StandardVerifiersFactory struct{}

func NewStandardVerifiersFactory

func NewStandardVerifiersFactory() *StandardVerifiersFactory

func (*StandardVerifiersFactory) Get

type StorageProvidersFactory

type StorageProvidersFactory interface {
	Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}

Interfaces to inject dependencies into the stack

type StorageProvidersFactoryFunc

type StorageProvidersFactoryFunc func(
	ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)

Functions that implement the factories for easier creation of new implementations

func (StorageProvidersFactoryFunc) Get

type VerifiersFactory

type VerifiersFactory interface {
	Get(ctx context.Context,
		nodeConfig NodeConfig) (verifier.VerifierProvider, error)
}

type VerifiersFactoryFunc

type VerifiersFactoryFunc func(
	ctx context.Context,
	nodeConfig NodeConfig) (verifier.VerifierProvider, error)

func (VerifiersFactoryFunc) Get

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL