datasetworker

package
v0.5.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2024 License: Apache-2.0, MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrDagDisabled = errors.New("dag generation is disabled for this preparation")
View Source
var ErrDagNotReady = errors.New("dag is not ready to be generated")

Functions

This section is empty.

Types

type Config added in v0.3.0

type Config struct {
	Concurrency    int
	ExitOnComplete bool
	EnableScan     bool
	EnablePack     bool
	EnableDag      bool
	ExitOnError    bool
	MinInterval    time.Duration
	MaxInterval    time.Duration
}

type DagGenerator added in v0.4.0

type DagGenerator struct {
	// contains filtered or unexported fields
}

func NewDagGenerator added in v0.4.0

func NewDagGenerator(ctx context.Context, db *gorm.DB, attachmentID model.SourceAttachmentID, root cid.Cid, noInline bool) *DagGenerator

func (*DagGenerator) Close added in v0.4.0

func (d *DagGenerator) Close() error

func (*DagGenerator) Read added in v0.4.0

func (d *DagGenerator) Read(p []byte) (int, error)

Read implements the io.Reader interface for the DagGenerator. It generates a CAR (Content Addressable Archive) representation of directories from a database, which can be read in chunks using the provided byte slice.

Read operation involves several key steps:

  1. It checks if the context has been canceled or if an error has occurred.
  2. If there's an existing buffer, it reads from it.
  3. If reading reaches the end of the current buffer, or if no buffer has been initialized, the method fetches the next directory from the database and processes it.
  4. The directory data is then converted to CAR format, and the resulting bytes are buffered.
  5. Finally, the buffered data is read into the provided slice.

Parameters:

  • p: A byte slice that will be filled with the generated CAR data.

Returns:

  • The number of bytes read.
  • An error if there's an issue during the operation. If the end of the data is reached, it returns io.EOF.

type StateMonitor added in v0.5.1

type StateMonitor struct {
	// contains filtered or unexported fields
}

func NewStateMonitor added in v0.5.1

func NewStateMonitor(db *gorm.DB) *StateMonitor

func (*StateMonitor) AddJob added in v0.5.1

func (s *StateMonitor) AddJob(jobID model.JobID, cancel context.CancelFunc)

func (*StateMonitor) Done added in v0.5.9

func (s *StateMonitor) Done() <-chan struct{}

func (*StateMonitor) RemoveJob added in v0.5.1

func (s *StateMonitor) RemoveJob(jobID model.JobID)

func (*StateMonitor) Start added in v0.5.1

func (s *StateMonitor) Start(ctx context.Context)

type Thread added in v0.3.0

type Thread struct {
	// contains filtered or unexported fields
}

func (*Thread) ExportDag added in v0.3.0

func (w *Thread) ExportDag(ctx context.Context, job model.Job) error

ExportDag exports a Directed Acyclic Graph (DAG) for a given source. The function takes a source, iterates through the related directories (as rows from the database), and constructs the DAG in the form of a CAR (Content Addressable Archive) file. This CAR file represents the block structure of the data.

The function:

  • Initializes necessary components like writers and calculators
  • Iterates through the directories linked with the source and fetches blocks
  • Writes the blocks into a CAR file
  • Closes the CAR file and renames it appropriately
  • Saves the CAR meta-information into the database

Parameters:

  • ctx context.Context: The context to control cancellations and timeouts.
  • source model.Source: The source for which the DAG needs to be generated.

The function performs several database and file system operations, each of which might result in an error. Errors are wrapped with context information and returned.

Returns:

  • error: Standard error interface, returns nil if no error occurred during execution.

func (*Thread) Name added in v0.3.0

func (w *Thread) Name() string

func (*Thread) Start added in v0.3.0

func (w *Thread) Start(ctx context.Context, exitErr chan<- error) error

Start initializes and starts the execution of a worker thread. This function:

  1. Creates a cancellable context derived from the input context.
  2. Registers the worker with a health check service, providing a state function for reporting its status.
  3. Launches separate goroutines to report health status, clean up old health check records, execute the worker's task, and handle cleanup.
  4. Returns channels that are closed when the health reporting, health check cleanup, worker execution, and worker cleanup are complete.

Parameters:

  • ctx : The parent context for this thread, used to propagate cancellations.

Returns:

  • []service.Done : A slice of channels that are closed when respective components of the worker complete their execution.
  • service.Fail : A channel that receives an error if the worker encounters a failure during its execution.
  • error : An error is returned if the worker fails to register with the health check service. Otherwise, it returns nil.

type Worker added in v0.3.0

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker added in v0.3.0

func NewWorker(db *gorm.DB, config Config) *Worker

func (Worker) Name added in v0.5.0

func (w Worker) Name() string

func (Worker) Run added in v0.3.0

func (w Worker) Run(ctx context.Context) error

Run initializes and starts a set of worker threads based on the Concurrency specified in the configuration. This function:

  1. Creates an array of worker threads, each having a unique identifier.
  2. Initializes each thread with a shared set of dependencies (e.g., database, logger) and individual configuration.
  3. Invokes the StartServers function to run all the threads, passing the initialized threads and a logger.

Parameters:

  • ctx : The context under which all the worker threads are run, used to propagate cancellations.

Returns:

  • error : An error is returned if the StartServers function encounters an issue while starting the threads. Otherwise, it returns nil.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL