reaper

v0.0.0-...-b82721b
Published: Jan 19, 2026 License: MIT Imports: 3 Imported by: 0

README

reaper

reaper implements a deterministic, bounded rewrite loop over a priority structure.

It repeatedly removes a single element from a heap, allows controlled reinsertion, and enforces a strict upper bound on growth per step.

Motivation

Many systems require repeatedly rewriting or rescheduling items based on partial information. reaper addresses this need while enforcing three invariants:

  1. Only one element is processed at a time
  2. Reinsertion is explicitly bounded
  3. Callbacks run without holding heap locks

Core Concepts

Heap

A Heap is a priority structure protected by explicit locking. Reaper does not assume ownership of synchronization, which allows it to be integrated into existing concurrent systems.

Callback

A Callback inspects a popped element and may emit replacements.

Visit(front T, emit func(...T) error) (stop bool)

  • emit buffers elements for reinsertion. It returns an error if the buffer is full.
  • Emission is bounded by the configured degree.
  • Returning true stops the reaping process and restores front to the heap; buffered emissions are discarded.

Degree

The degree defines the maximum number of elements a callback may emit for reinsertion per visit.

  • degree > 0: bounded rewrite system (DAG)
  • degree == 0: pure consumer

Usage

r := reaper.New[int](2) // degree 2

cb := reaper.CallbackFunc[int](func(front int, emit func(...int) error) bool {
	if front == 1 {
		emit(4, 5)
	}

	return false
})

res := r.Reap(heap, cb)

Results

Reap returns one of:

  • Exhausted: the heap became empty
  • Stopped: the callback requested termination

Both are normal outcomes; neither indicates an error.

Guarantees

  • Reaper does not mutate the heap while a callback executes; during that time the heap changes only if another goroutine mutates it
  • No callback can cause unbounded growth in a single step
  • The heap is always in a valid state

Heap Adapter

The heapadapter subpackage provides adapters for using Go’s container/heap with reaper.

h := &MyHeap{} // implements container/heap.Interface of Item
mu := &sync.Mutex{}

heap := heapadapter.New[Item](h, mu)
r := reaper.New[Item](0) // degree 0: pure consumer; T cannot be inferred, so it is given explicitly
_ = r.Reap(heap, cb)

Documentation

Overview

Package reaper implements a deterministic, bounded rewrite loop over a priority structure.

A Reaper repeatedly removes a single element from a heap, allows a callback to inspect it and optionally emit replacement elements, and then reinserts those elements back into the heap. Each step is strictly bounded: a callback may emit at most a fixed number of elements ("degree") per visit.

The design enforces three core invariants:

  1. Only one element is popped from the heap at a time.
  2. Reinsertion is explicitly controlled and bounded.
  3. Callbacks are invoked without holding heap locks.

This makes Reaper suitable for schedulers, rewrite systems, planners, or any algorithm that requires controlled feedback into a priority queue.

Constants

This section is empty.

Variables

var ErrEmitBufferOverflow = errors.New("emit buffer overflow")

ErrEmitBufferOverflow is returned by the emit function when the number of emitted elements in a single Visit exceeds the configured degree.

Functions

This section is empty.

Types

type Callback

type Callback[T any] interface {
	Visit(front T, emit func(...T) error) (stop bool)
}

Callback is invoked for each element popped from the heap during a Reap operation.

The callback receives the popped element ("front") and an emit function. The emit function may be used to buffer elements for reinsertion into the heap after the callback returns.

Returning stop == true signals that reaping should terminate early. In that case, the popped element is reinserted into the heap unchanged, and no buffered emissions are committed.
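
As an illustration of the contract above, here is a sketch of a callback implemented as a struct type. The interface is redeclared locally so the example compiles on its own; splitter and its limit field are hypothetical, and stopping when emit fails is one possible policy rather than a requirement.

```go
package main

import "fmt"

// Callback mirrors the interface shown above so that this sketch
// compiles without importing the package.
type Callback[T any] interface {
	Visit(front T, emit func(...T) error) (stop bool)
}

// splitter is a hypothetical callback: it splits every value above
// limit into two parts and stops at the first negative value.
type splitter struct{ limit int }

func (s splitter) Visit(front int, emit func(...int) error) bool {
	if front < 0 {
		return true // request early termination
	}
	if front > s.limit {
		half := front / 2
		if err := emit(half, front-half); err != nil {
			// The buffer is full. Stopping is a choice of this sketch:
			// it makes the caller restore front instead of dropping it.
			return true
		}
	}
	return false
}

// Compile-time check that splitter satisfies the interface.
var _ Callback[int] = splitter{}

func main() {
	var got []int
	// A stand-in emit that collects elements without any bound.
	emit := func(xs ...int) error { got = append(got, xs...); return nil }

	stop := splitter{limit: 4}.Visit(9, emit)
	fmt.Println(stop, got)
}
```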

type CallbackFunc

type CallbackFunc[T any] func(front T, emit func(...T) error) (stop bool)

CallbackFunc allows a function to be used as a Callback.

func (CallbackFunc[T]) Visit

func (f CallbackFunc[T]) Visit(front T, emit func(...T) error) (stop bool)
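
The adapter follows the same idiom as http.HandlerFunc in the standard library. A minimal stand-alone sketch, with both types redeclared locally; the one-line method body is an assumption based on that idiom, not taken from the package source.

```go
package main

import "fmt"

// Local copies of the two declarations shown above.
type Callback[T any] interface {
	Visit(front T, emit func(...T) error) (stop bool)
}

type CallbackFunc[T any] func(front T, emit func(...T) error) (stop bool)

// Visit calls f itself, which lets a plain function satisfy Callback.
// This body is an assumption modelled on http.HandlerFunc.
func (f CallbackFunc[T]) Visit(front T, emit func(...T) error) bool {
	return f(front, emit)
}

func main() {
	// Converting a function literal to CallbackFunc yields a Callback.
	var cb Callback[string] = CallbackFunc[string](
		func(front string, _ func(...string) error) bool {
			return front == "stop"
		},
	)
	fmt.Println(cb.Visit("go", nil), cb.Visit("stop", nil))
}
```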

type Heap

type Heap[T any] interface {
	Push(elem T)
	Pop() T
	Empty() bool
	Lock()
	Unlock()
}

Heap is a thread-safe priority structure supporting push and pop operations.

Reaper assumes that all heap mutations are protected by the provided Lock and Unlock methods. Callbacks are always invoked without holding the heap lock.
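
One way to satisfy this interface is to combine container/heap with an embedded sync.Mutex. The sketch below is self-contained and redeclares the interface locally; ints and lockedInts are hypothetical names. It assumes that Push, Pop and Empty do not lock internally and that the caller brackets them with Lock and Unlock, which is one reading of the paragraph above.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// Heap mirrors the interface shown above.
type Heap[T any] interface {
	Push(elem T)
	Pop() T
	Empty() bool
	Lock()
	Unlock()
}

// ints is a min-heap of ints implementing container/heap.Interface.
type ints []int

func (s ints) Len() int           { return len(s) }
func (s ints) Less(i, j int) bool { return s[i] < s[j] }
func (s ints) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s *ints) Push(x any)        { *s = append(*s, x.(int)) }
func (s *ints) Pop() any {
	old := *s
	x := old[len(old)-1]
	*s = old[:len(old)-1]
	return x
}

// lockedInts is a hypothetical Heap[int]. The embedded sync.Mutex
// supplies Lock and Unlock; the other methods expect the caller to
// hold the lock (an assumption of this sketch).
type lockedInts struct {
	sync.Mutex
	data ints
}

func (h *lockedInts) Push(elem int) { heap.Push(&h.data, elem) }
func (h *lockedInts) Pop() int      { return heap.Pop(&h.data).(int) }
func (h *lockedInts) Empty() bool   { return len(h.data) == 0 }

// Compile-time check that *lockedInts satisfies Heap[int].
var _ Heap[int] = (*lockedInts)(nil)

func main() {
	h := &lockedInts{}

	h.Lock()
	for _, x := range []int{3, 1, 2} {
		h.Push(x)
	}
	first := h.Pop() // smallest element
	h.Unlock()

	fmt.Println(first, h.Empty())
}
```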

type Reaper

type Reaper[T any] struct {
	// contains filtered or unexported fields
}

Reaper manages the controlled feedback loop over a Heap.

Example (Basic)
package main

import (
	"cmp"
	"fmt"
	"sync"

	"github.com/codegrapple/reaper"
	"github.com/codegrapple/reaper/heapadapter"
	"github.com/codegrapple/reaper/sliceheap"
)

func main() {
	h := sliceheap.New[int](4, cmp.Less[int], 1, 2, 3)
	r := reaper.New[int](2)

	// Adapt h to reaper.Heap[int]
	heap := heapadapter.New[int](h, &sync.Mutex{})

	visit := func(x int, emit func(...int) error) bool {
		if x == 1 {
			emit(4, 5)
			return false
		}
		return true
	}

	result := r.Reap(heap, reaper.CallbackFunc[int](visit))

	fmt.Println(result)
	fmt.Println(h.Slice())

}
Output:

Stopped
[2 3 4 5]

Example (Consumer)
package main

import (
	"cmp"
	"fmt"
	"sync"

	"github.com/codegrapple/reaper"
	"github.com/codegrapple/reaper/heapadapter"
	"github.com/codegrapple/reaper/sliceheap"
)

func main() {
	h := sliceheap.New[int](4, cmp.Less[int], 1, 2, 3)
	r := reaper.New[int](0)

	heap := heapadapter.New[int](h, &sync.Mutex{})

	var visited []int
	visit := func(x int, emit func(...int) error) bool {
		visited = append(visited, x)
		return false
	}

	result := r.Reap(heap, reaper.CallbackFunc[int](visit))

	fmt.Println(result)
	fmt.Println(visited)
	fmt.Println(heap.Empty())

}
Output:

Exhausted
[1 2 3]
true

func New

func New[T any](degree int) *Reaper[T]

New creates a new Reaper with the specified degree.

Degree specifies the maximum number of elements that can be emitted per Visit call. A degree of zero disables emission entirely, turning the Reaper into a pure consumer.

func (*Reaper[T]) Reap

func (r *Reaper[T]) Reap(h Heap[T], cb Callback[T]) Result

Reap executes the rewrite loop over the provided heap using the given callback.

Reap repeatedly pops a single element from the heap and invokes the callback on it. If the callback emits elements, they are reinserted atomically after the callback returns. If the callback requests termination, the popped element is restored, the emitted elements are discarded, and Reap returns Stopped.

Reap returns Exhausted if the heap becomes empty.
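
The loop described above can be sketched as a stand-alone function. This is a reconstruction from the documentation, not the package's source: reap, errOverflow and sortedInts are hypothetical, the types are redeclared locally, and the toy heap keeps a sorted slice instead of a real heap.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

type Heap[T any] interface {
	Push(elem T)
	Pop() T
	Empty() bool
	Lock()
	Unlock()
}

type Result int

const (
	Exhausted Result = iota
	Stopped
)

var errOverflow = errors.New("emit buffer overflow")

// reap is a hypothetical reconstruction of the documented loop.
func reap[T any](h Heap[T], degree int, visit func(T, func(...T) error) bool) Result {
	for {
		// Step 1: pop exactly one element while holding the lock.
		h.Lock()
		if h.Empty() {
			h.Unlock()
			return Exhausted
		}
		front := h.Pop()
		h.Unlock()

		// Step 2: run the callback without the lock; emit only buffers.
		buf := make([]T, 0, degree)
		emit := func(elems ...T) error {
			if len(buf)+len(elems) > degree {
				return errOverflow
			}
			buf = append(buf, elems...)
			return nil
		}
		stop := visit(front, emit)

		// Step 3: commit under a single lock acquisition.
		h.Lock()
		if stop {
			h.Push(front) // restore front; buf is discarded
			h.Unlock()
			return Stopped
		}
		for _, e := range buf {
			h.Push(e)
		}
		h.Unlock()
	}
}

// sortedInts is a toy Heap[int] that keeps its slice sorted ascending.
type sortedInts struct {
	sync.Mutex
	data []int
}

func (s *sortedInts) Push(x int) {
	s.data = append(s.data, x)
	sort.Ints(s.data)
}

func (s *sortedInts) Pop() int {
	x := s.data[0]
	s.data = s.data[1:]
	return x
}

func (s *sortedInts) Empty() bool { return len(s.data) == 0 }

func main() {
	h := &sortedInts{data: []int{1, 2, 3}}
	res := reap[int](h, 2, func(x int, emit func(...int) error) bool {
		if x == 1 {
			emit(4, 5) // error ignored for brevity; two elements fit degree 2
			return false
		}
		return true
	})
	fmt.Println(res == Stopped, h.data)
}
```

Committing all buffered elements inside one Lock/Unlock pair is what makes the reinsertion appear atomic to other goroutines that respect the same lock.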

type Result

type Result int

Result indicates the outcome of a Reap operation.

const (
	// Exhausted indicates that the heap became empty and no further
	// elements were available for processing.
	Exhausted Result = iota

	// Stopped indicates that the callback requested early termination.
	// In this case, the last popped element was reinserted into the heap.
	Stopped
)

func (Result) String

func (i Result) String() string
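
The examples print Exhausted and Stopped, so String evidently returns the constant's name. A minimal sketch of such a method follows, with the type redeclared locally; the fallback format for out-of-range values is an assumption.

```go
package main

import "fmt"

type Result int

const (
	Exhausted Result = iota
	Stopped
)

// String returns the name of the constant. The "Result(n)" fallback
// for unknown values is an assumption of this sketch.
func (r Result) String() string {
	switch r {
	case Exhausted:
		return "Exhausted"
	case Stopped:
		return "Stopped"
	default:
		return fmt.Sprintf("Result(%d)", int(r))
	}
}

func main() {
	// fmt uses the String method automatically for values of type Result.
	fmt.Println(Exhausted, Stopped)
}
```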

Directories

Path         Synopsis
heapadapter  Package heapadapter provides adapters from container/heap to reaper.Heap.
sliceheap    Package sliceheap provides a minimal generic heap implementation backed by a slice and compatible with container/heap.
