model

package
v0.3.23
Published: Feb 28, 2023 License: Apache-2.0 Imports: 27 Imported by: 3

Documentation


Constants

const (
	DownloadFilenameStdout    = "stdout"
	DownloadFilenameStderr    = "stderr"
	DownloadFilenameExitCode  = "exitCode"
	DownloadVolumesFolderName = "combined_results"
	DownloadShardsFolderName  = "per_shard"
	DownloadCIDsFolderName    = "raw"
	DownloadFolderPerm        = 0755
	DownloadFilePerm          = 0644
	DefaultIPFSTimeout        = 5 * time.Minute
)
const (
	TracerAttributeNameNodeID = "nodeid"
	TracerAttributeNameJobID  = "jobid"
)
const MaxNumberOfObjectsToSerialize = 1000

Arbitrarily choosing 1000 jobs to serialize - this is a pretty high limit.

const MaxSerializedStringInput = int(10 * datasize.MB)
const ShortIDLength = 8

Variables

var (
	IncludeAny  []IncludedTag = []IncludedTag{}
	ExcludeNone []ExcludedTag = []ExcludedTag{}
)

Set of annotations that will not do any filtering of jobs.

Functions

func ConfirmMaxSliceSize

func ConfirmMaxSliceSize[T any](t T, maxSize int) error

func EngineNames

func EngineNames() []string

func FromLabelSelectorRequirements

func FromLabelSelectorRequirements(requirements ...LabelSelectorRequirement) ([]labels.Requirement, error)

func IsValidEngine

func IsValidEngine(e Engine) bool

func IsValidPublisher

func IsValidPublisher(publisherType Publisher) bool

func IsValidStorageSourceType

func IsValidStorageSourceType(sourceType StorageSourceType) bool

func IsValidVerifier

func IsValidVerifier(verifierType Verifier) bool

func JSONMarshalIndentWithMax

func JSONMarshalIndentWithMax[T any](t T, indentSpaces int) ([]byte, error)

func JSONMarshalWithMax

func JSONMarshalWithMax[T any](t T) ([]byte, error)

func JSONUnmarshalWithMax

func JSONUnmarshalWithMax[T any](b []byte, t *T) error
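
A minimal sketch of how these size-guarded helpers might be used together, assuming the module path github.com/filecoin-project/bacalhau/pkg/model (not verified for this version) and illustrative values throughout:

package main

import (
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	job := model.NewJob()
	job.Metadata.ClientID = "example-client" // hypothetical client ID

	// Marshal with the size-guarded helper rather than encoding/json directly.
	data, err := model.JSONMarshalWithMax(job)
	if err != nil {
		panic(err)
	}

	// Round-trip back into a Job; the helpers enforce the package's
	// serialization limits (see MaxSerializedStringInput above).
	var decoded model.Job
	if err := model.JSONUnmarshalWithMax(data, &decoded); err != nil {
		panic(err)
	}
	fmt.Println(decoded.Metadata.ClientID)
}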

func PublisherNames

func PublisherNames() []string

func Reinterpret

func Reinterpret[T any](node datamodel.Node, schema *Schema) (*T, error)

Reinterpret re-parses the datamodel.Node as an object of the defined type.

func StorageSourceNames

func StorageSourceNames() []string

func UnmarshalIPLD

func UnmarshalIPLD[T any](b []byte, decoder codec.Decoder, schema *Schema) (*T, error)

UnmarshalIPLD parses the given bytes as a Go object using the passed decoder. Returns an error if the object cannot be parsed.
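
As a sketch only: parsing a small UCAN-style task document with UnmarshalIPLD, using the JSON decoder from go-ipld-prime together with the BacalhauTaskSchema defined below. The module paths, the lowercase field names, and whether this exact document validates against the schema are all assumptions:

package main

import (
	"fmt"

	ipldjson "github.com/ipld/go-ipld-prime/codec/json"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	// Hypothetical task document; "with" and "do" mirror the Task fields.
	doc := []byte(`{"with": "ipfs://QmTVmC7JBD2ES2qGPqBNVWnX1KeEPNrPGb7rJ8cpFgtefe", "do": "noop"}`)

	task, err := model.UnmarshalIPLD[model.Task](doc, ipldjson.Decode, model.BacalhauTaskSchema)
	if err != nil {
		panic(err)
	}
	fmt.Println(task.With, task.Do)
}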

func VerifierNames

func VerifierNames() []string

func YAMLMarshalWithMax

func YAMLMarshalWithMax[T any](t T) ([]byte, error)

func YAMLUnmarshalWithMax

func YAMLUnmarshalWithMax[T any](b []byte, t *T) error

Types

type APIVersion

type APIVersion int
const (
	V1alpha1 APIVersion
	V1beta1
)

func APIVersionLatest

func APIVersionLatest() APIVersion

func ParseAPIVersion

func ParseAPIVersion(str string) (APIVersion, error)

func (APIVersion) String

func (i APIVersion) String() string

type BacalhauConfig

type BacalhauConfig struct {
	Publisher   Publisher
	Verifier    Verifier
	Timeout     time.Duration
	Resources   ResourceSpec
	Annotations []string
	Dnt         bool
}

type BuildVersionInfo

type BuildVersionInfo struct {
	Major      string    `json:"major,omitempty" example:"0"`
	Minor      string    `json:"minor,omitempty" example:"3"`
	GitVersion string    `json:"gitversion" example:"v0.3.12"`
	GitCommit  string    `json:"gitcommit" example:"d612b63108f2b5ce1ab2b9e02444eb1dac1d922d"`
	BuildDate  time.Time `json:"builddate" example:"2022-11-16T14:03:31Z"`
	GOOS       string    `json:"goos" example:"linux"`
	GOARCH     string    `json:"goarch" example:"amd64"`
}

BuildVersionInfo is the version of a Bacalhau binary (either client or server)

type ComputeNodeInfo

type ComputeNodeInfo struct {
	ExecutionEngines   []Engine          `json:"ExecutionEngines"`
	MaxCapacity        ResourceUsageData `json:"MaxCapacity"`
	AvailableCapacity  ResourceUsageData `json:"AvailableCapacity"`
	MaxJobRequirements ResourceUsageData `json:"MaxJobRequirements"`
	RunningExecutions  int               `json:"RunningExecutions"`
	EnqueuedExecutions int               `json:"EnqueuedExecutions"`
}

type ComputeNodeInfoProvider

type ComputeNodeInfoProvider interface {
	GetComputeInfo(ctx context.Context) ComputeNodeInfo
}

type Deal

type Deal struct {
	// The maximum number of concurrent compute node bids that will be
	// accepted by the requester node on behalf of the client.
	Concurrency int `json:"Concurrency,omitempty"`
	// The number of nodes that must agree on a verification result
	// this is used by the different verifiers - for example the
	// deterministic verifier requires the winning group size
	// to be at least this size
	Confidence int `json:"Confidence,omitempty"`
	// The minimum number of bids that must be received before the Requester
	// node will randomly accept concurrency-many of them. This allows the
	// Requester node to get some level of guarantee that the execution of the
	// jobs will be spread evenly across the network (assuming that this value
	// is some large proportion of the size of the network).
	MinBids int `json:"MinBids,omitempty"`
}

The deal the client has made with the bacalhau network. This is updateable by the client who submitted the job
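
For illustration, a fragment building a Deal that asks for three concurrent executions with a verification quorum of two (all values arbitrary; assumes the model package is imported from github.com/filecoin-project/bacalhau/pkg/model):

deal := model.Deal{
	Concurrency: 3, // accept up to three concurrent compute node bids
	Confidence:  2, // at least two nodes must agree on the verification result
	MinBids:     5, // wait for five bids before randomly accepting three of them
}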

type DebugInfo

type DebugInfo struct {
	Component string      `json:"component"`
	Info      interface{} `json:"info"`
}

type DebugInfoProvider

type DebugInfoProvider interface {
	GetDebugInfo() (DebugInfo, error)
}

type DockerInputs

type DockerInputs struct {
	Entrypoint []string
	Workdir    string
	Mounts     IPLDMap[string, Resource]
	Outputs    IPLDMap[string, datamodel.Node]
	Env        IPLDMap[string, string]
}

func (DockerInputs) UnmarshalInto

func (docker DockerInputs) UnmarshalInto(with string, spec *Spec) error

type DownloaderSettings

type DownloaderSettings struct {
	Timeout        time.Duration
	OutputDir      string
	IPFSSwarmAddrs string
}

type Engine

type Engine int
const (
	EngineNoop Engine
	EngineDocker
	EngineWasm
	EngineLanguage   // wraps python_wasm
	EnginePythonWasm // wraps docker

)

func EngineTypes

func EngineTypes() []Engine

func ParseEngine

func ParseEngine(str string) (Engine, error)

func (Engine) MarshalText

func (e Engine) MarshalText() ([]byte, error)

func (Engine) String

func (i Engine) String() string

func (*Engine) UnmarshalText

func (e *Engine) UnmarshalText(text []byte) (err error)
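
A small sketch of the round trip between an Engine value and its textual form (module path assumed as in the other examples; the exact string produced by String is not hard-coded here, so the round trip should hold either way):

package main

import (
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	// ParseEngine is expected to accept the string that String() produces.
	name := model.EngineDocker.String()
	engine, err := model.ParseEngine(name)
	if err != nil {
		panic(err)
	}
	fmt.Println(name, engine == model.EngineDocker, model.IsValidEngine(engine))
}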

type ExcludedTag

type ExcludedTag string

We use these types to make it harder to accidentally mix up passing the wrong annotations to the wrong argument, e.g. avoid Excluded = []string{"included"}
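
A fragment showing the intent: annotation filters are built from the distinct IncludedTag and ExcludedTag types, with IncludeAny and ExcludeNone (defined above) as the no-filtering defaults. The tag values and variable names are illustrative:

// Only jobs annotated "training", excluding anything tagged "experimental".
included := []model.IncludedTag{"training"}
excluded := []model.ExcludedTag{"experimental"}

// No filtering at all.
included, excluded = model.IncludeAny, model.ExcludeNone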

type ExecutionID

type ExecutionID struct {
	JobID       string `json:"JobID,omitempty"`
	ShardIndex  int    `json:"ShardIndex,omitempty"`
	NodeID      string `json:"NodeID,omitempty"`
	ExecutionID string `json:"ExecutionID,omitempty"`
}

ExecutionID is a globally unique identifier for an execution

func (ExecutionID) ShardID

func (e ExecutionID) ShardID() ShardID

ShardID returns the shard ID for this execution id

func (ExecutionID) String

func (e ExecutionID) String() string

String returns a string representation of the execution id
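
A fragment constructing an ExecutionID and deriving its shard identifier. The JobID and NodeID reuse example values from elsewhere on this page; the ExecutionID value is made up:

exec := model.ExecutionID{
	JobID:       "9304c616-291f-41ad-b862-54e133c0149e",
	ShardIndex:  0,
	NodeID:      "QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF",
	ExecutionID: "e-0001",
}
shard := exec.ShardID() // ShardID carrying the JobID and shard index
fmt.Println(exec.String(), shard.String())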

type ExecutionState

type ExecutionState struct {
	// JobID the job id
	JobID string `json:"JobID"`
	// what shard is this we are running
	ShardIndex int `json:"ShardIndex"`
	// which node is running this execution
	NodeID string `json:"NodeId"`
	// Compute node reference for this shard execution
	ComputeReference string `json:"ComputeReference"`
	// State is the current state of the execution
	State ExecutionStateType `json:"State"`
	// an arbitrary status message
	Status string `json:"Status,omitempty"`
	// the proposed results for this execution
	// this will be resolved by the verifier somehow
	VerificationProposal []byte             `json:"VerificationProposal,omitempty"`
	VerificationResult   VerificationResult `json:"VerificationResult,omitempty"`
	PublishedResult      StorageSpec        `json:"PublishedResults,omitempty"`

	// RunOutput of the job
	RunOutput *RunCommandResult `json:"RunOutput,omitempty"`
	// Version is the version of the job state. It is incremented every time the job state is updated.
	Version int `json:"Version"`
	// CreateTime is the time when the job was created.
	CreateTime time.Time `json:"CreateTime"`
	// UpdateTime is the time when the job state was last updated.
	UpdateTime time.Time `json:"UpdateTime"`
}

func (ExecutionState) HasAcceptedAskForBid

func (e ExecutionState) HasAcceptedAskForBid() bool

HasAcceptedAskForBid returns true if the execution has been accepted by the node. We rely on the value of the ExecutionID to determine whether the askForBid has been accepted.

func (ExecutionState) ID

func (e ExecutionState) ID() ExecutionID

ID returns the ID for this execution

func (ExecutionState) ShardID

func (e ExecutionState) ShardID() ShardID

ShardID returns the shard ID for this execution

func (ExecutionState) String

func (e ExecutionState) String() string

String returns a string representation of the execution

type ExecutionStateType

type ExecutionStateType int

ExecutionStateType The state of an execution. An execution represents a single attempt to execute a shard on a node. A compute node can have multiple executions for the same shard due to retries, but there can only be a single active execution per node at any given time.

const (
	ExecutionStateNew ExecutionStateType = iota
	// ExecutionStateAskForBid A node has been selected to execute a shard, and is being asked to bid on the shard.
	ExecutionStateAskForBid
	// ExecutionStateAskForBidAccepted compute node has accepted the ask for bid.
	ExecutionStateAskForBidAccepted
	// ExecutionStateAskForBidRejected compute node has rejected the ask for bid.
	ExecutionStateAskForBidRejected
	// ExecutionStateBidAccepted requester has accepted the bid, and the execution is expected to be running on the compute node.
	ExecutionStateBidAccepted // aka running
	// ExecutionStateBidRejected requester has rejected the bid.
	ExecutionStateBidRejected
	// ExecutionStateResultProposed The execution is done, and is waiting for verification.
	ExecutionStateResultProposed
	// ExecutionStateResultAccepted The execution result has been accepted by the requester, and publishing of the result is in progress.
	ExecutionStateResultAccepted // aka publishing
	// ExecutionStateResultRejected The execution result has been rejected by the requester.
	ExecutionStateResultRejected
	// ExecutionStateCompleted The execution has been completed, and the result has been published.
	ExecutionStateCompleted
	// ExecutionStateFailed The execution has failed.
	ExecutionStateFailed
	// ExecutionStateCanceled The execution has been canceled by the user
	ExecutionStateCanceled
)

func ExecutionStateTypes

func ExecutionStateTypes() []ExecutionStateType

func (ExecutionStateType) IsActive

func (s ExecutionStateType) IsActive() bool

IsActive returns true if the execution is running or has completed

func (ExecutionStateType) IsDiscarded

func (s ExecutionStateType) IsDiscarded() bool

IsDiscarded returns true if the execution has been discarded due to a failure, rejection or cancellation

func (ExecutionStateType) IsTerminal

func (s ExecutionStateType) IsTerminal() bool

IsTerminal returns true if the execution is in a terminal state where no further state changes are possible
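
As a quick way to see how the lifecycle helpers classify each state, a sketch that walks every ExecutionStateType (module path assumed as in the other examples):

package main

import (
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	for _, state := range model.ExecutionStateTypes() {
		fmt.Printf("%-32s active=%-5v terminal=%-5v discarded=%v\n",
			state.String(), state.IsActive(), state.IsTerminal(), state.IsDiscarded())
	}
}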

func (ExecutionStateType) MarshalText

func (s ExecutionStateType) MarshalText() ([]byte, error)

func (ExecutionStateType) String

func (i ExecutionStateType) String() string

func (*ExecutionStateType) UnmarshalText

func (s *ExecutionStateType) UnmarshalText(text []byte) (err error)

type HTTPResource

type HTTPResource string

type IPFSResource

type IPFSResource string

type IPLDMap

type IPLDMap[K comparable, V any] struct {
	Keys   []K
	Values map[K]V
}

IPLD Maps are parsed by the ipld library into structures of this type rather than just plain Go maps.

type IncludedTag

type IncludedTag string

We use these types to make it harder to accidentally mix up passing the wrong annotations to the wrong argument, e.g. avoid Excluded = []string{"included"}

type Job

type Job struct {
	APIVersion string `json:"APIVersion" example:"V1beta1"`

	Metadata Metadata `json:"Metadata,omitempty"`

	// The specification of this job.
	Spec Spec `json:"Spec,omitempty"`
}

Job contains data about a job request in the bacalhau network.

func NewJob

func NewJob() *Job

TODO: There's probably a better way we want to globally version APIs

func NewJobWithSaneProductionDefaults

func NewJobWithSaneProductionDefaults() (*Job, error)
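
A sketch of creating a job from the production defaults and filling in the engine-specific part of the Spec (the image and entrypoint are placeholders; module path assumed as above):

package main

import (
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	job, err := model.NewJobWithSaneProductionDefaults()
	if err != nil {
		panic(err)
	}

	// The defaults choose verifier, publisher, deal, etc.; the caller still
	// describes what to run.
	job.Spec.Engine = model.EngineDocker
	job.Spec.Docker = model.JobSpecDocker{
		Image:      "ubuntu:22.04",
		Entrypoint: []string{"echo", "hello bacalhau"},
	}

	fmt.Println(job.APIVersion, job.Spec.Engine)
}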

type JobCancelPayload

type JobCancelPayload struct {
	// the id of the client that is submitting the job
	ClientID string `json:"ClientID,omitempty" validate:"required"`

	// the job id of the job to be canceled
	JobID string `json:"JobID,omitempty" validate:"required"`

	// The reason that the job is being canceled
	Reason string `json:"Reason,omitempty"`
}

type JobCreatePayload

type JobCreatePayload struct {
	// the id of the client that is submitting the job
	ClientID string `json:"ClientID,omitempty" validate:"required"`

	APIVersion string `json:"APIVersion,omitempty" example:"V1beta1" validate:"required"`

	// The specification of this job.
	Spec *Spec `json:"Spec,omitempty" validate:"required"`
}

type JobEvent

type JobEvent struct {
	// APIVersion of the Job
	APIVersion string `json:"APIVersion,omitempty" example:"V1beta1"`

	JobID string `json:"JobID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"`
	// what shard is this event for
	ShardIndex int `json:"ShardIndex,omitempty"`
	// compute execution identifier
	ExecutionID string `json:"ExecutionID,omitempty" example:"9304c616-291f-41ad-b862-54e133c0149e"`
	// optional clientID if this is an externally triggered event (like create job)
	ClientID string `json:"ClientID,omitempty" example:"ac13188e93c97a9c2e7cf8e86c7313156a73436036f30da1ececc2ce79f9ea51"`
	// the node that emitted this event
	SourceNodeID string `json:"SourceNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"`
	// the node that this event is for
	// e.g. "AcceptJobBid" was emitted by Requester but it targeting compute node
	TargetNodeID string       `json:"TargetNodeID,omitempty" example:"QmdZQ7ZbhnvWY1J12XYKGHApJ6aufKyLNSvf8jZBrBaAVL"`
	EventName    JobEventType `json:"EventName,omitempty"`
	// this is only defined in "create" events
	Spec Spec `json:"Spec,omitempty"`
	// this is only defined in "create" events
	JobExecutionPlan JobExecutionPlan `json:"JobExecutionPlan,omitempty"`
	// this is only defined in "update_deal" events
	Deal                 Deal               `json:"Deal,omitempty"`
	Status               string             `json:"Status,omitempty" example:"Got results proposal of length: 0"`
	VerificationProposal []byte             `json:"VerificationProposal,omitempty"`
	VerificationResult   VerificationResult `json:"VerificationResult,omitempty"`
	PublishedResult      StorageSpec        `json:"PublishedResult,omitempty"`

	EventTime       time.Time `json:"EventTime,omitempty" example:"2022-11-17T13:32:55.756658941Z"`
	SenderPublicKey PublicKey `json:"SenderPublicKey,omitempty"`

	// RunOutput of the job
	RunOutput *RunCommandResult `json:"RunOutput,omitempty"`
}

we emit these to other nodes so they update their state locally and can emit events locally

type JobEventType

type JobEventType int
const (

	// Job has been created by client and is communicating with requestor node
	JobEventInitialSubmission JobEventType

	// Job has been created on the requestor node
	JobEventCreated

	// the concurrency or other mutable properties of the job were
	// changed by the client
	JobEventDealUpdated

	// a compute node bid on a job
	JobEventBid

	// a requester node accepted or rejected a job bid
	JobEventBidAccepted
	JobEventBidRejected

	// a compute node canceled a job bid
	JobEventBidCancelled

	// a compute node progressed with running a job
	// this is called periodically for running jobs
	// to give the client confidence the job is still running
	// this is like a heartbeat for running jobs
	JobEventRunning

	// a compute node had an error running a job
	JobEventComputeError

	// a compute node completed running a job
	JobEventResultsProposed

	// a Requester node accepted the results from a node for a job
	JobEventResultsAccepted

	// a Requester node rejected the results from a node for a job
	JobEventResultsRejected

	// once the results have been accepted or rejected
	// the compute node will publish them and issue this event
	JobEventResultsPublished

	// a requester node declared an error running a job
	JobEventError

	// a user canceled a job
	JobEventCanceled

	// the requester node gives a compute node permission
	// to forget about the job and free any resources it might
	// currently be reserving - this can happen if a compute node
	// bids when a job has completed - if the compute node does
	// not hear back it will be stuck in reserving the resources for the job
	JobEventInvalidRequest
)

func JobEventTypes

func JobEventTypes() []JobEventType

func ParseJobEventType

func ParseJobEventType(str string) (JobEventType, error)

func (JobEventType) IsIgnorable

func (je JobEventType) IsIgnorable() bool

IsIgnorable returns true if given event type signals that a node can safely ignore the rest of the job's lifecycle. This is the case for events caused by a node's bid being rejected.

func (JobEventType) IsTerminal

func (je JobEventType) IsTerminal() bool

IsTerminal returns true if the given event type signals the end of the lifecycle of a job. After this, all nodes can safely ignore the job.

func (JobEventType) MarshalText

func (je JobEventType) MarshalText() ([]byte, error)

func (JobEventType) String

func (i JobEventType) String() string

func (*JobEventType) UnmarshalText

func (je *JobEventType) UnmarshalText(text []byte) (err error)

type JobExecutionPlan

type JobExecutionPlan struct {
	// how many shards are there in total for this job
	// we are expecting this number x concurrency total
	// ShardState objects for this job
	TotalShards int `json:"ShardsTotal,omitempty"`
}

type JobHistory

type JobHistory struct {
	Type             JobHistoryType     `json:"Type"`
	JobID            string             `json:"JobID"`
	ShardIndex       int                `json:"ShardIndex,omitempty"`
	NodeID           string             `json:"NodeID,omitempty"`
	ComputeReference string             `json:"ComputeReference,omitempty"`
	PreviousState    string             `json:"PreviousState"`
	NewState         string             `json:"NewState"`
	NewStateType     ExecutionStateType `json:"NewStateType,omitempty"` // only present for execution level events
	NewVersion       int                `json:"NewVersion"`
	Comment          string             `json:"Comment,omitempty"`
	Time             time.Time          `json:"Time"`
}

JobHistory represents a single event in the history of a job. An event can be at the job level, shard level, or execution (node) level.

type JobHistoryType

type JobHistoryType int
const (
	JobHistoryTypeJobLevel JobHistoryType
	JobHistoryTypeShardLevel
	JobHistoryTypeExecutionLevel
)

func (JobHistoryType) MarshalText

func (s JobHistoryType) MarshalText() ([]byte, error)

func (JobHistoryType) String

func (i JobHistoryType) String() string

func (*JobHistoryType) UnmarshalText

func (s *JobHistoryType) UnmarshalText(text []byte) (err error)

type JobLocalEventType

type JobLocalEventType int
const (

	// compute node
	// this means "we have selected this job"
	// used to avoid calling external selection hooks
	// where capacity manager says we can't quite run
	// the job yet but we will want to bid when there
	// is space
	JobLocalEventSelected JobLocalEventType
	// compute node
	// this means "we have bid" on a job where "we"
	// is the compute node
	JobLocalEventBid
	// requester node
	// used to avoid race conditions with the requester
	// node knowing which bids it's already responded to
	JobLocalEventBidAccepted
	JobLocalEventBidRejected

	// requester node
	// flag a job as having already had its verification done
	JobLocalEventVerified
)

func JobLocalEventTypes

func JobLocalEventTypes() []JobLocalEventType

func ParseJobLocalEventType

func ParseJobLocalEventType(str string) (JobLocalEventType, error)

func (JobLocalEventType) MarshalText

func (jle JobLocalEventType) MarshalText() ([]byte, error)

func (JobLocalEventType) String

func (i JobLocalEventType) String() string

func (*JobLocalEventType) UnmarshalText

func (jle *JobLocalEventType) UnmarshalText(text []byte) (err error)

type JobRequester

type JobRequester struct {
	// The ID of the requester node that owns this job.
	RequesterNodeID string `json:"RequesterNodeID,omitempty" example:"QmXaXu9N5GNetatsvwnTfQqNtSeKAD6uCmarbh3LMRYAcF"`

	// The public key of the Requester node that created this job
	// This can be used to encrypt messages back to the creator
	RequesterPublicKey PublicKey `json:"RequesterPublicKey,omitempty"`
}

type JobSelectionDataLocality

type JobSelectionDataLocality int64

Job selection policy configuration

const (
	Local    JobSelectionDataLocality = 0
	Anywhere JobSelectionDataLocality = 1
)

type JobSelectionPolicy

type JobSelectionPolicy struct {
	// this describes if we should run a job based on
	// where the data is located - i.e. if the data is "local"
	// or if the data is "anywhere"
	Locality JobSelectionDataLocality `json:"locality"`
	// should we reject jobs that don't specify any data
	// the default is "accept"
	RejectStatelessJobs bool `json:"reject_stateless_jobs"`
	// should we accept jobs that specify networking
	// the default is "reject"
	AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
	// external hooks that decide if we should take on the job or not
	// if either of these are given they will override the data locality settings
	ProbeHTTP string `json:"probe_http,omitempty"`
	ProbeExec string `json:"probe_exec,omitempty"`
}

describe the rules for how a compute node selects an incoming job

func NewDefaultJobSelectionPolicy

func NewDefaultJobSelectionPolicy() JobSelectionPolicy

generate a default empty job selection policy
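
A fragment showing a compute node policy that only bids on jobs whose data is already local, starting from the defaults (field values illustrative; the probe URL is hypothetical):

policy := model.NewDefaultJobSelectionPolicy()
policy.Locality = model.Local      // only bid when the input data is local
policy.RejectStatelessJobs = true  // refuse jobs that specify no input data
policy.AcceptNetworkedJobs = false // refuse jobs that ask for networking
policy.ProbeHTTP = "https://example.com/should-we-bid" // optional external hook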

type JobShard

type JobShard struct {
	Job   *Job `json:"Job,omitempty"`
	Index int  `json:"Index,omitempty"`
}

JobShard contains data about a job shard in the bacalhau network.

func (JobShard) ID

func (shard JobShard) ID() string

func (JobShard) ShardID

func (shard JobShard) ShardID() ShardID

func (JobShard) String

func (shard JobShard) String() string

type JobShardingConfig

type JobShardingConfig struct {
	// divide the inputs up into the smallest possible unit
	// for example /* would mean "all top level files or folders"
	// this being an empty string means "no sharding"
	GlobPattern string `json:"GlobPattern,omitempty"`
	// how many "items" are to be processed in each shard
	// we first apply the glob pattern which will result in a flat list of items
	// this number decides how to group that flat list into actual shards run by compute nodes
	BatchSize int `json:"BatchSize,omitempty"`
	// when using multiple input volumes
	// what path do we treat as the common mount path to apply the glob pattern to
	BasePath string `json:"GlobPatternBasePath,omitempty"`
}

describe how we chunk a job up into shards

type JobSpecDocker

type JobSpecDocker struct {
	// this should be pullable by docker
	Image string `json:"Image,omitempty"`
	// optionally override the default entrypoint
	Entrypoint []string `json:"Entrypoint,omitempty"`
	// a map of env to run the container with
	EnvironmentVariables []string `json:"EnvironmentVariables,omitempty"`
	// working directory inside the container
	WorkingDirectory string `json:"WorkingDirectory,omitempty"`
}

for VM style executors

type JobSpecLanguage

type JobSpecLanguage struct {
	Language        string `json:"Language,omitempty"`        // e.g. python
	LanguageVersion string `json:"LanguageVersion,omitempty"` // e.g. 3.8
	// must this job be run in a deterministic context?
	Deterministic bool `json:"DeterministicExecution,omitempty"`
	// context is a tar file stored in ipfs, containing e.g. source code and requirements
	Context StorageSpec `json:"JobContext,omitempty"`
	// optional program specified on commandline, like python -c "print(1+1)"
	Command string `json:"Command,omitempty"`
	// optional program path relative to the context dir. one of Command or ProgramPath must be specified
	ProgramPath string `json:"ProgramPath,omitempty"`
	// optional requirements.txt (or equivalent) path relative to the context dir
	RequirementsPath string `json:"RequirementsPath,omitempty"`
}

for language style executors (can target docker or wasm)

type JobSpecWasm

type JobSpecWasm struct {
	// The module that contains the WASM code to start running.
	EntryModule StorageSpec `json:"EntryModule,omitempty"`

	// The name of the function in the EntryModule to call to run the job. For
	// WASI jobs, this will always be `_start`, but jobs can choose to call
	// other WASM functions instead. The EntryPoint must be a zero-parameter
	// zero-result function.
	EntryPoint string `json:"EntryPoint,omitempty"`

	// The arguments supplied to the program (i.e. as ARGV).
	Parameters []string `json:"Parameters,omitempty"`

	// The variables available in the environment of the running program.
	EnvironmentVariables map[string]string `json:"EnvironmentVariables,omitempty"`

	// TODO #880: Other WASM modules whose exports will be available as imports
	// to the EntryModule.
	ImportModules []StorageSpec `json:"ImportModules,omitempty"`
}

Describes a raw WASM job

type JobState

type JobState struct {
	// JobID is the unique identifier for the job
	JobID string `json:"JobID"`
	// Shards is a map of shard index to shard state.
	// The number of shards are fixed at the time of job creation.
	Shards map[int]ShardState `json:"Shards"`
	// State is the current state of the job
	State JobStateType `json:"State"`
	// Version is the version of the job state. It is incremented every time the job state is updated.
	Version int `json:"Version"`
	// CreateTime is the time when the job was created.
	CreateTime time.Time `json:"CreateTime"`
	// UpdateTime is the time when the job state was last updated.
	UpdateTime time.Time `json:"UpdateTime"`
	// TimeoutAt is the time when the job will be timed out if it is not completed.
	TimeoutAt time.Time `json:"TimeoutAt,omitempty"`
}

JobState The state of a job across the whole network that represents an aggregate view across the shards and nodes.

type JobStateType

type JobStateType int

JobStateType The state of a job across the whole network that represents an aggregate view across the shards and nodes.

const (
	JobStateNew JobStateType = iota // must be first

	JobStateInProgress

	// Job is canceled by the user.
	JobStateCancelled

	// All shards have failed
	JobStateError

	// Some shards have failed, while others have completed successfully
	JobStatePartialError

	// All shards have completed successfully
	JobStateCompleted
)

these are the states a job can be in against a single node

func (JobStateType) IsTerminal

func (s JobStateType) IsTerminal() bool

IsTerminal returns true if the given job type signals the end of the lifecycle of that job and that no change in the state can be expected.

func (JobStateType) MarshalText

func (s JobStateType) MarshalText() ([]byte, error)

func (JobStateType) String

func (i JobStateType) String() string

func (*JobStateType) UnmarshalText

func (s *JobStateType) UnmarshalText(text []byte) (err error)

type JobType

type JobType interface {
	UnmarshalInto(with string, spec *Spec) error
}

type JobWithInfo

type JobWithInfo struct {
	// Job info
	Job Job `json:"Job"`
	// The current state of the job
	State JobState `json:"State"`
	// History of changes to the job state. Not always populated in the job description
	History []JobHistory `json:"History,omitempty"`
}

JobWithInfo is the job request + the result of attempting to run it on the network

type KeyInt

type KeyInt int

type KeyString

type KeyString string

type LabelSelectorRequirement

type LabelSelectorRequirement struct {
	// key is the label key that the selector applies to.
	Key string `json:"Key"`
	// operator represents a key's relationship to a set of values.
	// Valid operators are In, NotIn, Exists and DoesNotExist.
	Operator selection.Operator `json:"Operator"`
	// values is an array of string values. If the operator is In or NotIn,
	// the values array must be non-empty. If the operator is Exists or DoesNotExist,
	// the values array must be empty. This array is replaced during a strategic
	Values []string `json:"Values,omitempty"`
}

LabelSelectorRequirement A selector that contains values, a key, and an operator that relates the key and values. These are based on the labels library from the Kubernetes package. We use labels.Requirement to represent label selector requirements in command line arguments, since the library supports multiple parsing formats, and also when matching selectors to labels, since that is what the library expects; however, labels.Requirements are not serializable, so we need to convert them to LabelSelectorRequirements.

func ToLabelSelectorRequirements

func ToLabelSelectorRequirements(requirements ...labels.Requirement) []LabelSelectorRequirement
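
A sketch of converting between the serializable LabelSelectorRequirement and the Kubernetes labels.Requirement it wraps, then matching against a node's labels. The selector values and label names are made up; the k8s.io/apimachinery packages are the same ones the signatures above already reference:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	selector := model.LabelSelectorRequirement{
		Key:      "zone",
		Operator: selection.In,
		Values:   []string{"us-east-1", "us-east-2"},
	}

	reqs, err := model.FromLabelSelectorRequirements(selector)
	if err != nil {
		panic(err)
	}

	nodeLabels := labels.Set{"zone": "us-east-1", "gpu": "true"}
	for _, req := range reqs {
		fmt.Println(req.String(), req.Matches(nodeLabels))
	}

	// And back again, e.g. before serializing to JSON.
	roundTripped := model.ToLabelSelectorRequirements(reqs...)
	fmt.Println(len(roundTripped))
}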

type MappedProvider

type MappedProvider[Key ProviderKey, Value Providable] struct {
	// contains filtered or unexported fields
}

A MappedProvider is a Provider that stores the providables in a simple map, and caches permanently the results of checking installation status

func NewMappedProvider

func NewMappedProvider[Key ProviderKey, Value Providable](providables map[Key]Value) *MappedProvider[Key, Value]

func (*MappedProvider[Key, Value]) Add

func (provider *MappedProvider[Key, Value]) Add(key Key, value Value)

func (*MappedProvider[Key, Value]) Get

func (provider *MappedProvider[Key, Value]) Get(ctx context.Context, key Key) (v Value, err error)

Get implements Provider

func (*MappedProvider[Key, Value]) Has

func (provider *MappedProvider[Key, Value]) Has(ctx context.Context, key Key) bool

Has implements Provider
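
A sketch of wiring a MappedProvider with a toy key and providable type, just to show how the generic constraints fit together; the key and value types here are invented for the example:

package main

import (
	"context"
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

// toolKey satisfies ProviderKey: it is comparable and implements fmt.Stringer.
type toolKey string

func (k toolKey) String() string { return string(k) }

// tool satisfies Providable.
type tool struct{ installed bool }

func (t tool) IsInstalled(context.Context) (bool, error) { return t.installed, nil }

func main() {
	provider := model.NewMappedProvider(map[toolKey]tool{
		"docker": {installed: true},
		"wasm":   {installed: false},
	})

	ctx := context.Background()
	// Has/Get consult IsInstalled before providing a value.
	fmt.Println(provider.Has(ctx, "docker"))
	fmt.Println(provider.Has(ctx, "wasm"))
}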

type Metadata

type Metadata struct {
	// The unique global ID of this job in the bacalhau network.
	ID string `json:"ID,omitempty" example:"92d5d4ee-3765-4f78-8353-623f5f26df08"`

	// Time the job was submitted to the bacalhau network.
	CreatedAt time.Time `json:"CreatedAt,omitempty" example:"2022-11-17T13:29:01.871140291Z"`

	// The ID of the client that created this job.
	ClientID string `json:"ClientID,omitempty" example:"ac13188e93c97a9c2e7cf8e86c7313156a73436036f30da1ececc2ce79f9ea51"`

	Requester JobRequester `json:"Requester,omitempty"`
}

type Millicores

type Millicores int

A Millicore represents a thousandth of a CPU core, which is a unit of measure used by Kubernetes. See also https://github.com/BTBurke/k8sresource.

const (
	Millicore Millicores = 1
	Core      Millicores = 1000
)

func (Millicores) String

func (m Millicores) String() string

String returns a string representation of this Millicore, which is either an integer if this Millicore represents a whole number of cores or the number of Millicores suffixed with "m".
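
A fragment showing the two rendering modes described above (expected output noted in comments, following the doc comment rather than verified behavior):

fmt.Println(model.Core.String())             // expected "1": one whole core
fmt.Println(model.Millicores(2500).String()) // expected "2500m": fractional cores keep the "m" suffix
fmt.Println((4 * model.Core).String())       // expected "4"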

type Network

type Network int
const (
	// NetworkNone specifies that the job does not require networking.
	NetworkNone Network = iota

	// NetworkFull specifies that the job requires unfiltered raw IP networking.
	NetworkFull

	// NetworkHTTP specifies that the job requires HTTP networking to certain domains.
	//
	// The model is: the job specifier submits a job with the domain(s) it will
	// need to communicate with, the compute provider uses this to make some
	// decision about the risk of the job and bids accordingly, and then at run
	// time the traffic is limited to only the domain(s) specified.
	//
	// As a command, something like:
	//
	//  bacalhau docker run --network=http --domain=crates.io --domain=github.com -v Qmy1234myd4t4:/code rust/compile
	//
	// The “risk” for the compute provider is that the job does something that
	// violates its terms, the terms of its hosting provider or ISP, or even the
	// law in its jurisdiction (e.g. accessing and spreading illegal content,
	// performing cyberattacks). So the same sort of risk as operating a Tor
	// exit node.
	//
	// The risk for the job specifier is that we are operating in an environment
	// they are paying for, so there is an incentive to hijack that environment
	// (e.g. via a compromised package download that runs a crypto miner on
	// install, and uses up all the paid-for job time). Having the traffic
	// enforced to only domains specified makes those sorts of attacks much
	// trickier and less valuable.
	//
	// The compute provider might well enforce its limits by other means, but
	// having the domains specified up front allows it to skip bidding on jobs
	// it knows will fail in its executor. So this is hopefully a better UX for
	// job specifiers who can have their job picked up only by someone who will
	// run it successfully.
	NetworkHTTP
)

func ParseNetwork

func ParseNetwork(s string) (Network, error)

func (Network) MarshalText

func (n Network) MarshalText() ([]byte, error)

func (Network) String

func (i Network) String() string

func (*Network) UnmarshalText

func (n *Network) UnmarshalText(text []byte) (err error)

type NetworkConfig

type NetworkConfig struct {
	Type    Network  `json:"Type"`
	Domains []string `json:"Domains,omitempty"`
}

func (NetworkConfig) Disabled

func (n NetworkConfig) Disabled() bool

Disabled returns whether network connections should be completely disabled according to this config.

func (NetworkConfig) DomainSet

func (n NetworkConfig) DomainSet() []string

DomainSet returns the "unique set" of domains from the network config. Domains listed multiple times and any subdomain that is also matched by a wildcard is removed.

This is something of an implementation detail – it matches the behavior expected by our Docker HTTP gateway, which complains and/or fails to start if these requirements are not met.

func (NetworkConfig) IsValid

func (n NetworkConfig) IsValid() (err error)

IsValid returns an error if any of the fields do not pass validation, or nil otherwise.
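
A fragment building an HTTP-restricted network config and using the helpers above (the domains are examples):

netCfg := model.NetworkConfig{
	Type:    model.NetworkHTTP,
	Domains: []string{"crates.io", "static.crates.io", "github.com"},
}
if err := netCfg.IsValid(); err != nil {
	panic(err)
}
fmt.Println(netCfg.Disabled())  // false: NetworkHTTP still allows (restricted) networking
fmt.Println(netCfg.DomainSet()) // duplicates and wildcard-covered subdomains removed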

type NodeInfo

type NodeInfo struct {
	BacalhauVersion BuildVersionInfo  `json:"BacalhauVersion"`
	PeerInfo        peer.AddrInfo     `json:"PeerInfo"`
	NodeType        NodeType          `json:"NodeType"`
	Labels          map[string]string `json:"Labels"`
	ComputeNodeInfo ComputeNodeInfo   `json:"ComputeNodeInfo"`
}

func (NodeInfo) IsComputeNode

func (n NodeInfo) IsComputeNode() bool

IsComputeNode returns true if the node is a compute node

type NodeInfoProvider

type NodeInfoProvider interface {
	GetNodeInfo(ctx context.Context) NodeInfo
}

type NodeType

type NodeType int
const (
	NodeTypeCompute NodeType
)

func (NodeType) String

func (i NodeType) String() string

type NoopProvider

type NoopProvider[Key ProviderKey, Value Providable] struct {
	// contains filtered or unexported fields
}

A NoopProvider is a provider that always returns a singleton providable regardless of requested type

func (*NoopProvider[Key, Value]) Get

func (p *NoopProvider[Key, Value]) Get(context.Context, Key) (Value, error)

Get implements Provider

func (*NoopProvider[Key, Value]) Has

func (p *NoopProvider[Key, Value]) Has(context.Context, Key) bool

Has implements Provider

type NoopTask

type NoopTask struct{}

func (NoopTask) UnmarshalInto

func (n NoopTask) UnmarshalInto(with string, spec *Spec) error

type Providable

type Providable interface {
	IsInstalled(context.Context) (bool, error)
}

A Providable is something that a Provider can check for installation status

type Provider

type Provider[Key ProviderKey, Value Providable] interface {
	Get(context.Context, Key) (Value, error)
	Has(context.Context, Key) bool
}

A Provider is an object which is configured to provide certain objects and will check their installation status before providing them

func NewNoopProvider

func NewNoopProvider[Key ProviderKey, Value Providable](noopProvidable Value) Provider[Key, Value]

type ProviderKey

type ProviderKey interface {
	fmt.Stringer
	comparable
}

A ProviderKey will usually be some lookup value implemented as an enum member

type PublicKey

type PublicKey []byte

func (PublicKey) MarshalText

func (pk PublicKey) MarshalText() ([]byte, error)

func (*PublicKey) UnmarshalText

func (pk *PublicKey) UnmarshalText(text []byte) error

type PublishedResult

type PublishedResult struct {
	NodeID     string      `json:"NodeID,omitempty"`
	ShardIndex int         `json:"ShardIndex,omitempty"`
	Data       StorageSpec `json:"Data,omitempty"`
}

PublishedResult is a wrapper for a StorageSpec that has been published by a compute provider - it keeps info about the host, job and shard that led to the given storage spec being published

type Publisher

type Publisher int
const (
	PublisherNoop Publisher
	PublisherIpfs
	PublisherFilecoin
	PublisherEstuary
)

func ParsePublisher

func ParsePublisher(str string) (Publisher, error)

func PublisherTypes

func PublisherTypes() []Publisher

func (Publisher) MarshalText

func (p Publisher) MarshalText() ([]byte, error)

func (Publisher) String

func (i Publisher) String() string

func (*Publisher) UnmarshalText

func (p *Publisher) UnmarshalText(text []byte) (err error)

type Resource

type Resource struct {
	IPFS *IPFSResource
	HTTP *HTTPResource
}

type ResourceSpec

type ResourceSpec struct {
	Cpu    Millicores //nolint:stylecheck // name required by IPLD
	Disk   datasize.ByteSize
	Memory datasize.ByteSize
	Gpu    int
}

type ResourceUsageConfig

type ResourceUsageConfig struct {
	// https://github.com/BTBurke/k8sresource string
	CPU string `json:"CPU,omitempty"`
	// github.com/c2h5oh/datasize string
	Memory string `json:"Memory,omitempty"`

	Disk string `json:"Disk,omitempty"`
	GPU  string `json:"GPU"` // unsigned integer string

}

type ResourceUsageData

type ResourceUsageData struct {
	// cpu units
	CPU float64 `json:"CPU,omitempty" example:"9.600000000000001"`
	// bytes
	Memory uint64 `json:"Memory,omitempty" example:"27487790694"`
	// bytes
	Disk uint64 `json:"Disk,omitempty" example:"212663867801"`
	GPU  uint64 `json:"GPU,omitempty" example:"1"` //nolint:lll // Support whole GPUs only, like https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/
}

these are the numeric values in bytes for ResourceUsageConfig

func (ResourceUsageData) Add

func (ResourceUsageData) Intersect

func (ResourceUsageData) IsZero

func (r ResourceUsageData) IsZero() bool

func (ResourceUsageData) LessThan

func (r ResourceUsageData) LessThan(other ResourceUsageData) bool

func (ResourceUsageData) LessThanEq

func (r ResourceUsageData) LessThanEq(other ResourceUsageData) bool

func (ResourceUsageData) Max

func (ResourceUsageData) Multi

func (ResourceUsageData) String

func (r ResourceUsageData) String() string

return string representation of ResourceUsageData

func (ResourceUsageData) Sub
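
A fragment comparing a job's resource request against a node's available capacity with LessThanEq (numbers are arbitrary; units follow the field comments above - CPU in cores, Memory and Disk in bytes):

want := model.ResourceUsageData{CPU: 2, Memory: 4 << 30, Disk: 10 << 30, GPU: 1}
have := model.ResourceUsageData{CPU: 8, Memory: 32 << 30, Disk: 500 << 30, GPU: 1}

if want.LessThanEq(have) {
	fmt.Println("node has enough capacity for", want.String())
}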

type ResourceUsageProfile

type ResourceUsageProfile struct {
	// how many resources does the job want to consume
	Job ResourceUsageData `json:"Job,omitempty"`
	// how many resources is the system currently using
	SystemUsing ResourceUsageData `json:"SystemUsing,omitempty"`
	// what is the total amount of resources available to the system
	SystemTotal ResourceUsageData `json:"SystemTotal,omitempty"`
}

type RunCommandResult

type RunCommandResult struct {
	// stdout of the run. Yaml provided for `describe` output
	STDOUT string `json:"stdout"`

	// bool describing if stdout was truncated
	StdoutTruncated bool `json:"stdouttruncated"`

	// stderr of the run.
	STDERR string `json:"stderr"`

	// bool describing if stderr was truncated
	StderrTruncated bool `json:"stderrtruncated"`

	// exit code of the run.
	ExitCode int `json:"exitCode"`

	// Runner error
	ErrorMsg string `json:"runnerError"`
}

func NewRunCommandResult

func NewRunCommandResult() *RunCommandResult

type Schema

type Schema schema.TypeSystem
var (
	// The UCAN Task schema is the standardized Invocation IPLD schema, defined
	// by https://github.com/ucan-wg/invocation.
	UCANTaskSchema *Schema = load(ucanTaskSchemaPath)

	// The Bacalhau schema includes the Bacalhau specific extensions to the UCAN
	// Task IPLD spec, i.e. input structures for specific job types.
	BacalhauTaskSchema *Schema = load(bacalhauTaskSchemaPath)
)

func (*Schema) GetSchemaType

func (s *Schema) GetSchemaType(obj interface{}) schema.Type

GetSchemaType returns the IPLD type from the schema for the passed Go object. If the type is not in the schema, it returns nil.

func (*Schema) GetSchemaTypeName

func (s *Schema) GetSchemaTypeName(obj interface{}) string

GetSchemaTypeName returns the name of the corresponding IPLD type in the schema for the passed Go object. If the type cannot be represented in the schema, it returns an empty string. Note that it may return a non-empty string even if the type is not actually present in the schema.

type ShardID

type ShardID struct {
	JobID string `json:"JobID,omitempty"`
	Index int    `json:"Index,omitempty"`
}

ShardID represents a unique identifier for a shard across all jobs.

func (ShardID) ID

func (shard ShardID) ID() string

func (ShardID) String

func (shard ShardID) String() string

type ShardState

type ShardState struct {
	// JobID is the unique identifier for the job
	JobID string `json:"JobID"`
	// ShardIndex is the index of the shard in the job
	ShardIndex int `json:"ShardIndex"`
	// Executions is a list of executions of the shard across the nodes.
	// A new execution is created when a node is selected to execute the shard, and a node can have multiple executions for the same
	// shard due to retries, but there can only be a single active execution per node at any given time.
	Executions []ExecutionState `json:"Executions"`
	// State is the current state of the shard
	State ShardStateType `json:"State"`
	// Version is the version of the shard state. It is incremented every time the shard state is updated.
	Version int `json:"Version"`
	// CreateTime is the time when the shard was created, which is the same as the job creation time.
	CreateTime time.Time `json:"CreateTime"`
	// UpdateTime is the time when the shard state was last updated.
	UpdateTime time.Time `json:"UpdateTime"`
}

ShardState represents the state of a shard in a job that represents an aggregate view across the nodes that are executing the shard.

func (ShardState) ID

func (s ShardState) ID() ShardID

ID returns the shard ID for this execution

type ShardStateType

type ShardStateType int

ShardStateType represents the state of a shard in a job that represents an aggregate view across the nodes that are executing the shard.

const (
	ShardStateNew ShardStateType = iota
	ShardStateInProgress
	// The job/shard is canceled by the user.
	ShardStateCancelled
	// The shard has failed due to an error.
	ShardStateError
	// The shard has been completed successfully.
	ShardStateCompleted
)

func (ShardStateType) IsTerminal

func (s ShardStateType) IsTerminal() bool

IsTerminal returns true if the given shard state type signals the end of the execution of the shard.

func (ShardStateType) MarshalText

func (s ShardStateType) MarshalText() ([]byte, error)

func (ShardStateType) String

func (i ShardStateType) String() string

func (*ShardStateType) UnmarshalText

func (s *ShardStateType) UnmarshalText(text []byte) (err error)

type SimulatorConfig

type SimulatorConfig struct {
	Compute   SimulatorConfigCompute
	Requester SimulatorConfigRequester
}

type SimulatorConfigCompute

type SimulatorConfigCompute struct {
	IsBadActor bool
}

type SimulatorConfigRequester

type SimulatorConfigRequester struct {
	IsBadActor bool
}

type Spec

type Spec struct {
	// e.g. docker or language
	Engine Engine `json:"Engine,omitempty"`

	Verifier Verifier `json:"Verifier,omitempty"`

	// there can be multiple publishers for the job
	Publisher Publisher `json:"Publisher,omitempty"`

	// executor specific data
	Docker   JobSpecDocker   `json:"Docker,omitempty"`
	Language JobSpecLanguage `json:"Language,omitempty"`
	Wasm     JobSpecWasm     `json:"Wasm,omitempty"`

	// the compute (cpu, ram) resources this job requires
	Resources ResourceUsageConfig `json:"Resources,omitempty"`

	// The type of networking access that the job needs
	Network NetworkConfig `json:"Network,omitempty"`

	// How long a job can run in seconds before it is killed.
	// This includes the time required to run, verify and publish results
	Timeout float64 `json:"Timeout,omitempty"`

	// the data volumes we will read in the job
	// for example "read this ipfs cid"
	// TODO: #667 Replace with "Inputs", "Outputs" (note the caps) for yaml/json when we update the n.js file
	Inputs []StorageSpec `json:"inputs,omitempty"`

	// Input volumes that will not be sharded
	// for example to upload code into a base image
	// every shard will get the full range of context volumes
	Contexts []StorageSpec `json:"Contexts,omitempty"`

	// the data volumes we will write in the job
	// for example "write the results to ipfs"
	Outputs []StorageSpec `json:"outputs,omitempty"`

	// Annotations on the job - could be user or machine assigned
	Annotations []string `json:"Annotations,omitempty"`

	// NodeSelectors is a selector which must be true for the compute node to run this job.
	NodeSelectors []LabelSelectorRequirement `json:"NodeSelectors,omitempty"`
	// the sharding config for this job
	// describes how the job might be split up into parallel shards
	Sharding JobShardingConfig `json:"Sharding,omitempty"`

	// Do not track specified by the client
	DoNotTrack bool `json:"DoNotTrack,omitempty"`

	// how will this job be executed by nodes on the network
	ExecutionPlan JobExecutionPlan `json:"ExecutionPlan,omitempty"`

	// The deal the client has made, such as which job bids they have accepted.
	Deal Deal `json:"Deal,omitempty"`
}

Spec is a complete specification of a job that can be run on some execution provider.

func (*Spec) AllStorageSpecs

func (s *Spec) AllStorageSpecs() []*StorageSpec

Return pointers to all the storage specs in the spec.

func (*Spec) GetTimeout

func (s *Spec) GetTimeout() time.Duration

Return timeout duration
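
Pulling several of the types above together, a sketch of a complete Docker Spec (the image, CID, and values are placeholders; the CID reuses the example from StorageSpec below; module path assumed as in the other examples):

package main

import (
	"fmt"

	"github.com/filecoin-project/bacalhau/pkg/model"
)

func main() {
	spec := model.Spec{
		Engine:    model.EngineDocker,
		Verifier:  model.VerifierNoop,
		Publisher: model.PublisherIpfs,
		Docker: model.JobSpecDocker{
			Image:      "ubuntu:22.04",
			Entrypoint: []string{"cat", "/inputs/data.txt"},
		},
		Inputs: []model.StorageSpec{{
			StorageSource: model.StorageSourceIPFS,
			CID:           "QmTVmC7JBD2ES2qGPqBNVWnX1KeEPNrPGb7rJ8cpFgtefe",
			Path:          "/inputs",
		}},
		Outputs: []model.StorageSpec{{
			StorageSource: model.StorageSourceIPFS,
			Name:          "outputs",
			Path:          "/outputs",
		}},
		Timeout: 300, // seconds, per the field comment
		Deal:    model.Deal{Concurrency: 1},
	}

	fmt.Println(spec.GetTimeout()) // expected 5m0s
}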

type StorageSourceType

type StorageSourceType int

StorageSourceType is somewhere we can get data from, e.g. IPFS or S3 are storage sources. There can be multiple drivers for the same source, e.g. ipfs fuse vs ipfs api copy.

const (
	StorageSourceIPFS StorageSourceType
	StorageSourceURLDownload
	StorageSourceFilecoinUnsealed
	StorageSourceFilecoin
	StorageSourceEstuary
	StorageSourceInline
	StorageSourceLocalDirectory
)

func ParseStorageSourceType

func ParseStorageSourceType(str string) (StorageSourceType, error)

func StorageSourceTypes

func StorageSourceTypes() []StorageSourceType

func (StorageSourceType) MarshalText

func (ss StorageSourceType) MarshalText() ([]byte, error)

func (StorageSourceType) String

func (i StorageSourceType) String() string

func (*StorageSourceType) UnmarshalText

func (ss *StorageSourceType) UnmarshalText(text []byte) (err error)

type StorageSpec

type StorageSpec struct {
	// StorageSource is the abstract source of the data. E.g. a storage source
	// might be a URL download, but doesn't specify how the execution engine
	// does the download or what it will do with the downloaded data.
	StorageSource StorageSourceType `json:"StorageSource,omitempty"`

	// Name of the spec's data, for reference.
	Name string `` //nolint:lll
	/* 132-byte string literal not displayed */

	// The unique ID of the data, where it makes sense (for example, in an
	// IPFS storage spec this will be the data's CID).
	// NOTE: The below is capitalized to match IPFS & IPLD (even though it's out of golang fmt)
	CID string `json:"CID,omitempty" example:"QmTVmC7JBD2ES2qGPqBNVWnX1KeEPNrPGb7rJ8cpFgtefe"`

	// Source URL of the data
	URL string `json:"URL,omitempty"`

	// The path of the host data if we are using local directory paths
	SourcePath string `json:"SourcePath,omitempty"`

	// The path that the spec's data should be mounted on, where it makes
	// sense (for example, in a Docker storage spec this will be a filesystem
	// path).
	// TODO: #668 Replace with "Path" (note the caps) for yaml/json when we update the n.js file
	Path string `json:"path,omitempty"`

	// Additional properties specific to each driver
	Metadata map[string]string `json:"Metadata,omitempty"`
}

StorageSpec represents some data on a storage engine. Storage engines are specific to particular execution engines, as different execution engines will mount data in different ways.

type Task

type Task struct {
	With   string
	Do     TaskType
	Inputs datamodel.Node
	Meta   IPLDMap[string, datamodel.Node]
}

func (*Task) ToSpec

func (task *Task) ToSpec() (*Spec, error)

type TaskType

type TaskType string
const (
	TaskTypeDocker TaskType = "docker/run"
	TaskTypeWasm   TaskType = "wasm32-wasi/run"
	TaskTypeNoop   TaskType = "noop"
)

type TestFatalErrorHandlerContents

type TestFatalErrorHandlerContents struct {
	Message string
	Code    int
}

type VerificationResult

type VerificationResult struct {
	Complete bool `json:"Complete,omitempty"`
	Result   bool `json:"Result,omitempty"`
}

we need to use a struct for the result because: a) otherwise we don't know if VerificationResult==false means "I've not verified yet" or "verification failed" b) we might want to add further fields to the result later

type Verifier

type Verifier int
const (
	VerifierNoop Verifier
	VerifierDeterministic
)

func ParseVerifier

func ParseVerifier(str string) (Verifier, error)

func VerifierTypes

func VerifierTypes() []Verifier

func (Verifier) MarshalText

func (v Verifier) MarshalText() ([]byte, error)

func (Verifier) String

func (i Verifier) String() string

func (*Verifier) UnmarshalText

func (v *Verifier) UnmarshalText(text []byte) (err error)

type WasmInputs

type WasmInputs struct {
	Entrypoint string
	Parameters []string
	Modules    []Resource
	Mounts     IPLDMap[string, Resource] // Resource
	Outputs    IPLDMap[string, datamodel.Node]
	Env        IPLDMap[string, string]
}

func (*WasmInputs) UnmarshalInto

func (wasm *WasmInputs) UnmarshalInto(with string, spec *Spec) error

UnmarshalInto implements taskUnmarshal
