backfill

package
v0.0.0-...-1eb5c16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: MIT Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// StateEnqueued is the state of a backfill job when it is first created
	StateEnqueued = "enqueued"
	// StateInProgress is the state of a backfill job when it is being processed
	StateInProgress = "in_progress"
	// StateComplete is the state of a backfill job when it has been processed
	StateComplete = "complete"
)
View Source
var ErrJobComplete = errors.New("job is complete")

ErrJobComplete is returned when trying to buffer an op for a job that is complete

View Source
var ErrJobNotFound = errors.New("job not found")

ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist

Functions

This section is empty.

Types

type BackfillOptions

type BackfillOptions struct {
	ParallelBackfills     int
	ParallelRecordCreates int
	NSIDFilter            string
	SyncRequestsPerSecond int
	CheckoutPath          string
}

func DefaultBackfillOptions

func DefaultBackfillOptions() *BackfillOptions

type Backfiller

type Backfiller struct {
	Name               string
	HandleCreateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleUpdateRecord func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error
	HandleDeleteRecord func(ctx context.Context, repo string, path string) error
	Store              Store

	// Number of backfills to process in parallel
	ParallelBackfills int
	// Number of records to process in parallel for each backfill
	ParallelRecordCreates int
	// Prefix match for records to backfill i.e. app.bsky.feed.app/
	// If empty, all records will be backfilled
	NSIDFilter   string
	CheckoutPath string
	// contains filtered or unexported fields
}

Backfiller is a struct which handles backfilling a repo

func NewBackfiller

func NewBackfiller(
	name string,
	store Store,
	handleCreate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleUpdate func(ctx context.Context, repo string, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error,
	handleDelete func(ctx context.Context, repo string, path string) error,
	opts *BackfillOptions,
) *Backfiller

NewBackfiller creates a new Backfiller

func (*Backfiller) BackfillRepo

func (b *Backfiller) BackfillRepo(ctx context.Context, job Job)

BackfillRepo backfills a repo

func (*Backfiller) FlushBuffer

func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int

FlushBuffer processes buffered operations for a job

func (*Backfiller) Start

func (b *Backfiller) Start()

Start starts the backfill processor routine

func (*Backfiller) Stop

func (b *Backfiller) Stop()

Stop stops the backfill processor

type GormDBJob

type GormDBJob struct {
	gorm.Model
	Repo  string `gorm:"unique;index"`
	State string `gorm:"index"`
}

type Gormjob

type Gormjob struct {
	// contains filtered or unexported fields
}

func (*Gormjob) ClearBufferedOps

func (j *Gormjob) ClearBufferedOps(ctx context.Context) error

func (*Gormjob) FlushBufferedOps

func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error

func (*Gormjob) Repo

func (j *Gormjob) Repo() string

func (*Gormjob) SetState

func (j *Gormjob) SetState(ctx context.Context, state string) error

func (*Gormjob) State

func (j *Gormjob) State() string

type Gormstore

type Gormstore struct {
	// contains filtered or unexported fields
}

Gormstore is a gorm-backed implementation of the Backfill Store interface

func NewGormstore

func NewGormstore(db *gorm.DB) *Gormstore

func (*Gormstore) BufferOp

func (s *Gormstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)

func (*Gormstore) EnqueueJob

func (s *Gormstore) EnqueueJob(repo string) error

func (*Gormstore) GetJob

func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error)

func (*Gormstore) GetNextEnqueuedJob

func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)

func (*Gormstore) LoadJobs

func (s *Gormstore) LoadJobs(ctx context.Context) error

type Job

type Job interface {
	Repo() string
	State() string
	SetState(ctx context.Context, state string) error

	// FlushBufferedOps calls the given callback for each buffered operation
	// Once done it clears the buffer and marks the job as "complete"
	// Allowing the Job interface to abstract away the details of how buffered
	// operations are stored and/or locked
	FlushBufferedOps(ctx context.Context, cb func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error

	ClearBufferedOps(ctx context.Context) error
}

Job is an interface for a backfill job

type Memjob

type Memjob struct {
	// contains filtered or unexported fields
}

func (*Memjob) ClearBufferedOps

func (j *Memjob) ClearBufferedOps(ctx context.Context) error

func (*Memjob) FlushBufferedOps

func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) error) error

func (*Memjob) Repo

func (j *Memjob) Repo() string

func (*Memjob) SetState

func (j *Memjob) SetState(ctx context.Context, state string) error

func (*Memjob) State

func (j *Memjob) State() string

type Memstore

type Memstore struct {
	// contains filtered or unexported fields
}

Memstore is a simple in-memory implementation of the Backfill Store interface

func NewMemstore

func NewMemstore() *Memstore

func (*Memstore) BufferOp

func (s *Memstore) BufferOp(ctx context.Context, repo, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)

func (*Memstore) EnqueueJob

func (s *Memstore) EnqueueJob(repo string) error

func (*Memstore) GetJob

func (s *Memstore) GetJob(ctx context.Context, repo string) (Job, error)

func (*Memstore) GetNextEnqueuedJob

func (s *Memstore) GetNextEnqueuedJob(ctx context.Context) (Job, error)

type Store

type Store interface {
	// BufferOp buffers an operation for a job and returns true if the operation was buffered
	// If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
	BufferOp(ctx context.Context, repo string, kind, path string, rec *typegen.CBORMarshaler, cid *cid.Cid) (bool, error)
	GetJob(ctx context.Context, repo string) (Job, error)
	GetNextEnqueuedJob(ctx context.Context) (Job, error)
}

Store is an interface for a backfill store which holds Jobs

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL