Documentation

Overview

Package eqmem implements an in-memory entroq that has fine-grained locking and can handle simultaneously stats/task listing and modifications to a large extent.

Example (Journal)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"entrogo.com/entroq"
	"entrogo.com/entroq/backend/eqmem"
)

func main() {
	journalDir, err := os.MkdirTemp("", "eqjournal-")
	if err != nil {
		log.Fatalf("Error opening temp dir for journal: %v", err)
	}
	defer os.RemoveAll(journalDir)

	ctx := context.Background()

	eq, err := entroq.New(ctx, eqmem.Opener(eqmem.WithJournal(journalDir)))
	if err != nil {
		log.Fatalf("Error opening client at dir %q: %v", journalDir, err)
	}

	inserted, _, err := eq.Modify(ctx,
		entroq.InsertingInto("/queue/of/tasks", entroq.WithValue([]byte("hey"))),
		entroq.InsertingInto("/queue/of/others", entroq.WithValue([]byte("other"))),
	)
	if err != nil {
		log.Fatalf("Error adding task: %v", err)
	}

	// Change the queue for the first insertion.
	if _, _, err := eq.Modify(ctx, inserted[0].AsChange(entroq.QueueTo("/queue/of/something"))); err != nil {
		log.Fatalf("Error modifying task: %v", err)
	}

	// Close and reopen, see that everything is still there.
	eq.Close()
	eq = nil

	if eq, err = entroq.New(ctx, eqmem.Opener(eqmem.WithJournal(journalDir))); err != nil {
		log.Fatalf("Error reopening client at dir %q: %v", journalDir, err)
	}

	empty, err := eq.QueuesEmpty(ctx, entroq.MatchExact("/queue/of/tasks"))
	if err != nil {
		log.Fatalf("Error checking for empty queues: %v", err)
	}
	fmt.Printf("Empty: %v\n", empty)

	ts1, err := eq.Tasks(ctx, "/queue/of/others")
	if err != nil {
		log.Fatalf("Error getting tasks for 'others': %v", err)
	}
	for _, t := range ts1 {
		fmt.Printf("%v: %q\n", t.Queue, t.Value)
	}

	ts2, err := eq.Tasks(ctx, "/queue/of/something")
	if err != nil {
		log.Fatalf("Error getting tasks for 'something': %v", err)
	}
	for _, t := range ts2 {
		fmt.Printf("%v: %q\n", t.Queue, t.Value)
	}

}
Output:

Empty: true
/queue/of/others: "other"
/queue/of/something: "hey"

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Opener

func Opener(opts ...Option) entroq.BackendOpener

Opener returns a constructor of the in-memory backend.

func TakeSnapshot

func TakeSnapshot(ctx context.Context, journalDir string, cleanup bool) error

TakeSnapshot brings the system up empty, loads a snapshot + journals, then outputs a new snapshot and exits. Cleans up old files after snapshotting if requested. Otherwise they are just moved out of the way.

Types

type EQMem

type EQMem struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, opts ...Option) (*EQMem, error)

New returns a new in-memory implementation, ready to be used.

func (*EQMem) Claim

func (m *EQMem) Claim(ctx context.Context, cq *entroq.ClaimQuery) (*entroq.Task, error)

Claim waits for a task to be available to claim.

func (*EQMem) Close

func (m *EQMem) Close() error

Close cleans up this implementation.

func (*EQMem) Modify

func (m *EQMem) Modify(ctx context.Context, mod *entroq.Modification) (inserted, changed []*entroq.Task, err error)

Modify attempts to do an atomic modification on the system, given a particular set of modification information (deletions, changes, insertions, dependencies).

func (*EQMem) QueueStats

func (m *EQMem) QueueStats(ctx context.Context, qq *entroq.QueuesQuery) (map[string]*entroq.QueueStat, error)

QueueStats returns statistics for each queue in the query.

func (*EQMem) Queues

func (m *EQMem) Queues(ctx context.Context, qq *entroq.QueuesQuery) (map[string]int, error)

Queues returns the list of queue and their sizes, based on query contents.

func (*EQMem) Tasks

func (m *EQMem) Tasks(ctx context.Context, tq *entroq.TasksQuery) ([]*entroq.Task, error)

Tasks lists tasks according to the given query. If specific IDs are given, it will block for brief periods to look up corresponding queues for them.

func (*EQMem) Time

func (m *EQMem) Time(_ context.Context) (time.Time, error)

Time returns the current time.

func (*EQMem) TryClaim

func (m *EQMem) TryClaim(ctx context.Context, cq *entroq.ClaimQuery) (*entroq.Task, error)

TryClaim attempts to claim a task from the given queue query. If no task is available, returns nil (not an error).

type Option

type Option func(*EQMem)

Option represents options for creationg of the in-memory implementation.

func WithJournal

func WithJournal(dir string) Option

WithJournal sets up a file-based journal system so that the in-memory implementation can be persisted.

func WithMaxJournalBytes

func WithMaxJournalBytes(max int64) Option

WithMaxJournalBytes sets a maximum on the number of bytes before rotation. Default is wal.DefaultMaxBytes.

func WithMaxJournalItems

func WithMaxJournalItems(max int) Option

WithMaxJournalItems sets a maximum on the number of entries in the journal before rotation. Default is wal.DefaultMaxIndices.