Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Scan

    Scan scans the given partition of the Reminders' keyspace.

    Returns a list of stale reminders which likely match crashed AddTask calls. The caller is expected to eventually execute the corresponding Cloud Tasks calls and delete these reminders; otherwise they will be rediscovered during the next scan.

    If the scan of the given partition cannot be completed and Level is less than 2, Scan splits the not-yet-scanned keyspace into several partitions for follow-up scans and returns them as well.

    Logs errors inside, but doesn't return them.

Types

type BatchProcessor

    type BatchProcessor struct {
    	Context   context.Context    // the context to use for processing
    	DB        db.DB              // DB to use to fetch reminders from
    	Submitter internal.Submitter // knows how to submit tasks
    
    	BatchSize         int // max size of a single reminder batch
    	ConcurrentBatches int // how many concurrent batches to process
    	// contains filtered or unexported fields
    }

    BatchProcessor handles reminders in batches.

func (*BatchProcessor) Enqueue

    func (p *BatchProcessor) Enqueue(ctx context.Context, r []*reminder.Reminder)

    Enqueue adds reminders to the to-be-processed queue.

    Must be called only between Start and Stop. Silently drops reminders if the context is canceled.

func (*BatchProcessor) Start

    func (p *BatchProcessor) Start() error

    Start launches background processor goroutines.

func (*BatchProcessor) Stop

    func (p *BatchProcessor) Stop() int

    Stop waits until all enqueued reminders are processed and then stops the processor.

    Returns the total number of successfully processed reminders.

type Distributed

    type Distributed struct {
    	// EnqueueSweepTask submits the task for execution somewhere in the fleet.
    	EnqueueSweepTask func(ctx context.Context, task *tqpb.SweepTask) error
    	// Submitter is used to submit Cloud Tasks requests.
    	Submitter internal.Submitter
    }

    Distributed implements distributed sweeping.

    Requires its EnqueueSweepTask callback to be configured so that enqueued tasks eventually result in an ExecSweepTask call (perhaps in a different process).

func (*Distributed) ExecSweepTask

    func (d *Distributed) ExecSweepTask(ctx context.Context, task *tqpb.SweepTask) error

    ExecSweepTask executes a previously enqueued sweep task.

    Note: we never want to retry a failed ExecSweepTask. These tasks fork. If we retry on transient errors that are not really transient, we may accidentally blow up with an exponential number of tasks. It is better to just wait for the next fresh sweep. For that reason the implementation is careful not to return errors marked with transient.Tag.

type ScanParams

    type ScanParams struct {
    	DB            db.DB                // DB to use to fetch reminders
    	Partition     *partition.Partition // the keyspace partition to scan
    	KeySpaceBytes int                  // length of the reminder keys (usually 16)
    
    	TasksPerScan        int // caps maximum number of reminders to process
    	SecondaryScanShards int // caps the number of follow-up scans
    
    	Level int // recursion level (0 == the root task)
    }

    ScanParams contains parameters for the Scan call.
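
To see how Level and SecondaryScanShards together bound the size of one sweep, here is a small arithmetic sketch. It assumes the cap applies per scan and that follow-ups are produced only while Level is less than 2, as described for Scan; `maxScans` and the shard count of 16 are illustrative and not taken from the package.

```go
package main

import "fmt"

// maxScans returns an upper bound on the number of Scan calls in one sweep,
// assuming every scan below maxLevel spawns at most shards follow-up scans
// and scans at maxLevel spawn none.
func maxScans(shards, maxLevel int) int {
	total, atLevel := 0, 1
	for level := 0; level <= maxLevel; level++ {
		total += atLevel
		atLevel *= shards
	}
	return total
}

func main() {
	// With follow-ups allowed only while Level < 2, levels 0, 1 and 2 exist.
	fmt.Println(maxScans(16, 2)) // 1 + 16 + 256 = 273
}
```

Under these assumptions a single sweep stays bounded, while retrying forked tasks would multiply this figure.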