client

package
v0.9.0
Published: Mar 25, 2026 License: MIT Imports: 39 Imported by: 0

Documentation

Overview

Package client provides the Go SDK for talking to a lockd cluster over HTTP. It mirrors CLI behaviour while exposing a typed API that is easy to embed in workers, controllers, and administrative tools.

Copyright (C) 2025 Michel Blomgren <https://pkt.systems>

Quick start

Construct a client with either client.New (single endpoint) or client.NewWithEndpoints (ordered failover endpoints). Supported endpoint schemes are https:// for mTLS, http:// for plaintext trusted networks, and unix:///path/to/socket for Unix-domain sockets.

ctx := context.Background()
cli, err := client.New("https://lockd.example.com")
if err != nil {
    log.Fatal(err)
}
defer cli.Close()

req := api.AcquireRequest{
    Namespace:  "workflows",
    Key:        "orders",
    Owner:      "worker-1",
    TTLSeconds: 30,
    BlockSecs:  client.BlockWaitForever,
}
lease, err := cli.Acquire(ctx, req)
if err != nil {
    log.Fatal(err)
}
defer lease.Close()

var checkpoint map[string]any
if err := lease.Load(ctx, &checkpoint); err != nil && !errors.Is(err, client.ErrStateNotFound) {
    log.Fatal(err)
}
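if checkpoint == nil {
    // No state stored yet (ErrStateNotFound): writing to a nil map would panic.
    checkpoint = map[string]any{}
}
checkpoint["progress"] = "running"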
if err := lease.Save(ctx, checkpoint); err != nil {
    log.Fatal(err)
}

Acquire mints a transaction id and fencing token. Lease-bound mutations such as Update, Remove, UpdateMetadata, Release, and attachment changes are issued with the correct transaction and fencing headers automatically when using a LeaseSession.

When most operations use one namespace, set it once with client.WithDefaultNamespace so the Namespace field can be omitted from individual calls. Namespaces that start with a dot are reserved for lockd internals and are rejected for user workloads.

Create-only acquire

Set api.AcquireRequest.IfNotExists=true to request create-only semantics. When state already exists for the key, Acquire fails with API error code "already_exists". The SDK supports both check styles: client.IsAlreadyExists(err) and errors.Is(err, client.ErrAlreadyExists). You only need one check style; for SDK-returned acquire errors they are equivalent.

_, err = cli.Acquire(ctx, api.AcquireRequest{
    Namespace:   "workflows",
    Key:         "orders",
    Owner:       "initializer",
    TTLSeconds:  30,
    BlockSecs:   client.BlockNoWait,
    IfNotExists: true,
})
if err != nil {
    if errors.Is(err, client.ErrAlreadyExists) {
        return
    }
    log.Fatal(err)
}

Acquire for update

AcquireForUpdate wraps the common acquire, load, mutate, save, release flow. It keeps the lease alive while the callback runs and always attempts release. The supplied api.AcquireRequest is forwarded to Acquire; setting IfNotExists=true applies create-only semantics to the initial handshake.

err := cli.AcquireForUpdate(ctx, api.AcquireRequest{
    Namespace:  "workflows",
    Key:        "orders",
    Owner:      "worker-1",
    TTLSeconds: 45,
}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
    var state map[string]any
    if err := af.Load(ctx, &state); err != nil && !errors.Is(err, client.ErrStateNotFound) {
        return err
    }
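    if state == nil {
        // No state stored yet (ErrStateNotFound): writing to a nil map would panic.
        state = map[string]any{}
    }
    state["progress"] = "done"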
    return af.Save(ctx, state)
})
if err != nil {
    log.Fatal(err)
}

Attachments

Leases can stage binary attachments alongside JSON state. Staged writes and staged deletes are committed on Release and discarded on Rollback.

file, err := os.Open("invoice.pdf")
if err != nil {
    log.Fatal(err)
}
defer file.Close()
if _, err := lease.Attach(ctx, client.AttachRequest{
    Name:        "invoice.pdf",
    ContentType: "application/pdf",
    Body:        file,
}); err != nil {
    log.Fatal(err)
}
if err := lease.Release(ctx); err != nil {
    log.Fatal(err)
}

Perimeter defence interoperability

Servers can throttle requests with HTTP 429 while the perimeter defence is active. Responses can include Retry-After and X-Lockd-QRF-State headers. The SDK consumes those hints in retry loops and surfaces parsed values on APIError for custom handling.

Queue API

Queue helpers implement at-least-once delivery. Enqueue accepts any io.Reader and streams payload bytes directly to lockd.

reader := strings.NewReader("{\"op\":\"ship\"}")
qres, err := cli.Enqueue(ctx, "orders", reader, client.EnqueueOptions{
    ContentType: "application/json",
    Delay:       2 * time.Second,
    Visibility:  30 * time.Second,
})
if err != nil {
    log.Fatal(err)
}
log.Printf("queued message %s", qres.MessageID)

Dequeue returns a QueueMessage. DequeueWithState returns a QueueMessage that also carries a QueueStateHandle. QueueMessage implements io.ReadCloser and supports Ack, Nack, Defer, Extend, WritePayloadTo, and DecodePayloadJSON. QueueStateHandle mirrors the lease-state helpers with Get, GetBytes, Load, Update, UpdateBytes, MutateLocal, Save, UpdateMetadata, and Remove.

msg, err := cli.Dequeue(ctx, "orders", client.DequeueOptions{
    Namespace: "workflows",
    Owner:     "worker-1",
})
if err != nil {
    log.Fatal(err)
}
defer msg.Close()

buf, err := io.ReadAll(msg)
if err != nil {
    log.Fatal(err)
}
log.Printf("attempt %d payload=%s", msg.Attempts(), string(buf))
if err := msg.Ack(ctx); err != nil {
    log.Fatal(err)
}

If a handler returns without explicit Ack/Nack/Defer, Close performs an automatic Nack so another worker can continue. Tune that with DequeueOptions.OnCloseDelay or QueueMessage.SetOnCloseDelay. DequeueOptions.BlockSeconds controls waiting: BlockNoWait for immediate return, positive values for bounded wait, and zero for wait-forever.
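
The close-time Nack can be pictured with a small stand-alone model. This is a sketch of the pattern only: the message type and its methods are hypothetical stand-ins, not QueueMessage, and they take no context or delay arguments.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a leased queue delivery.
type message struct {
	settled bool   // true once Ack or Nack has run
	outcome string // "ack", "nack", or "" while unsettled
}

// Ack settles the message as processed; later calls are ignored.
func (m *message) Ack() {
	if !m.settled {
		m.settled, m.outcome = true, "ack"
	}
}

// Nack settles the message as not processed; later calls are ignored.
func (m *message) Nack() {
	if !m.settled {
		m.settled, m.outcome = true, "nack"
	}
}

// Close hands the message back unless the handler already settled it.
func (m *message) Close() { m.Nack() }

// handle mimics a worker that may return early without settling.
func handle(m *message, succeed bool) {
	defer m.Close()
	if succeed {
		m.Ack()
	}
}

func main() {
	ok, failed := &message{}, &message{}
	handle(ok, true)
	handle(failed, false)
	fmt.Println(ok.outcome, failed.outcome)
}
```

Deferring Close at the top of a handler therefore covers every early return, while an explicit Ack on the success path still wins.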

Subscribe and SubscribeWithState keep one streaming request open and invoke a user handler per delivery.

StartConsumer worker runner

StartConsumer starts one or more managed consumer loops and blocks until they terminate. It is intended for long-running worker processes.

Each ConsumerConfig runs in its own goroutine. Setting WithState to true selects SubscribeWithState; false selects Subscribe. Options.Owner can be left empty, in which case StartConsumer generates a unique owner token from the consumer name, host, process id, and a sequence number.

The MessageHandler receives ConsumerMessage containing:

  • Client: the active SDK client to reuse inside handlers
  • Logger: the configured client logger; always non-nil
  • Queue and Name(): queue identity and resolved consumer name
  • Message: the leased QueueMessage
  • State: QueueStateHandle for stateful consumers, nil for stateless

Restart behaviour is controlled by ConsumerRestartPolicy. Defaults are three immediate retries, then exponential backoff starting at 250ms with multiplier 2.0 and max delay 5 minutes. Jitter defaults to zero and can be enabled.

ErrorHandler receives ConsumerError before each restart. Returning nil keeps the loop running. Returning an error stops StartConsumer and returns that error. OnStart and OnStop lifecycle hooks can be used for observability. Panics in MessageHandler, ErrorHandler, OnStart, and OnStop are recovered and routed through the same failure path.

Cancel the StartConsumer context to stop all loops. Expected context cancellation returns nil.

handler := func(ctx context.Context, cm client.ConsumerMessage) error {
    defer cm.Message.Close()

    if cm.State != nil {
        var state map[string]any
        if err := cm.State.Load(ctx, &state); err != nil && !errors.Is(err, client.ErrStateNotFound) {
            return err
        }
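        if state == nil {
            // No state stored yet (ErrStateNotFound): writing to a nil map would panic.
            state = map[string]any{}
        }
        state["last_message_id"] = cm.Message.MessageID()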
        if err := cm.State.Save(ctx, state); err != nil {
            return err
        }
    }

    return cm.Message.Ack(ctx)
}

err := cli.StartConsumer(ctx,
    client.ConsumerConfig{
        Name:  "orders-fastlane",
        Queue: "orders",
        Options: client.SubscribeOptions{
            Namespace: "workflows",
            Prefetch:  16,
        },
        MessageHandler: handler,
    },
    client.ConsumerConfig{
        Queue:     "orders-stateful",
        WithState: true,
        Options: client.SubscribeOptions{
            Namespace: "workflows",
        },
        RestartPolicy: client.ConsumerRestartPolicy{
            MaxDelay: time.Minute,
        },
        MessageHandler: handler,
    },
)
if err != nil {
    log.Fatal(err)
}

Direct helpers Client.QueueAck, Client.QueueNack, and Client.QueueExtend are also available when metadata is managed outside QueueMessage helpers.

State removal

Lease holders can delete state explicitly with LeaseSession.Remove or Client.Remove. CAS guards such as If-ETag and If-Version are supported by the API for concurrency-safe deletes.

Metadata attributes and query visibility

Metadata holds lease internals plus user attributes. The SDK provides LeaseSession.UpdateMetadata and Client.UpdateMetadata, and metadata-aware state writes with WithMetadata, WithQueryHidden, and WithQueryVisible.

lease, err := cli.Acquire(ctx, api.AcquireRequest{Key: "orders", TTLSeconds: 30})
if err != nil {
    log.Fatal(err)
}
if _, err := lease.UpdateMetadata(ctx, client.MetadataOptions{QueryHidden: client.Bool(true)}); err != nil {
    log.Fatal(err)
}

Multi-endpoint failover

NewWithEndpoints accepts multiple endpoints. The SDK rotates through them on transport errors while preserving bounded retry semantics.

cli, err := client.NewWithEndpoints([]string{
    "https://lockd-primary.example.com",
    "https://lockd-backup.example.com",
}, client.WithDisableMTLS(false))
if err != nil {
    log.Fatal(err)
}

Correlation IDs and logging

Use client.WithCorrelationID or client.GenerateCorrelationID to tie requests, queue deliveries, and retries together. QueueMessage carries correlation data and queue follow-up helpers forward it automatically.

The client logger is configured with client.WithLogger. Nil logger input is normalized to pslog.NoopLogger so SDK logging calls remain safe.

In-process testing

client/inprocess starts a lockd server in-process and returns a ready client. It is useful for tests and local development.

ctx := context.Background()
inproc, err := inprocess.New(ctx, lockd.Config{Store: "mem://"})
if err != nil {
    t.Fatal(err)
}
defer inproc.Close(ctx)

lease, err := inproc.Acquire(ctx, api.AcquireRequest{Owner: "test", TTLSeconds: 10})
if err != nil {
    t.Fatal(err)
}
_ = lease

Authentication and mTLS

mTLS is enabled by default for https endpoints. Configure bundle PEM data with client.WithBundlePEM or file paths with client.WithBundlePath. Bundle paths expand shell-style home and environment variables by default; use client.WithBundlePathDisableExpansion to treat a path literally.

To connect over plaintext http endpoints, use client.WithDisableMTLS(true) or provide an http URL. For mTLS, client certificates must include ClientAuth EKU and chain to a CA trusted by the server.

Constants

const (
	DefaultHTTPTimeout           = 15 * time.Second
	DefaultCloseTimeout          = 5 * time.Second
	DefaultKeepAliveTimeout      = 5 * time.Second
	DefaultMaxIdleConns          = 256
	DefaultMaxIdleConnsPerHost   = 128
	DefaultForUpdateTimeout      = 15 * time.Minute
	DefaultAcquireBaseDelay      = time.Second
	DefaultAcquireMaxDelay       = 5 * time.Second
	DefaultAcquireMultiplier     = 1.2
	DefaultAcquireJitter         = 100 * time.Millisecond
	DefaultFailureRetries        = 5
	DefaultAcquireFailureRetries = DefaultFailureRetries
	// DefaultConsumerImmediateRetries controls how many consecutive consumer
	// failures are retried without delay before backoff starts.
	DefaultConsumerImmediateRetries = 3
	// DefaultConsumerBaseDelay is the first delayed retry duration after immediate retries.
	DefaultConsumerBaseDelay = 250 * time.Millisecond
	// DefaultConsumerMaxDelay caps consumer restart backoff growth.
	DefaultConsumerMaxDelay = 5 * time.Minute
	// DefaultConsumerMultiplier is the exponential growth factor for restart delay.
	DefaultConsumerMultiplier = 2.0
	// DefaultConsumerJitter randomizes restart delay by +/- this duration.
	DefaultConsumerJitter = 0 * time.Second
)

Default client tuning knobs exposed for callers that want to mirror lockd's defaults.

const (
	BlockWaitForever int64 = 0
	BlockNoWait      int64 = api.BlockNoWait
)

BlockWaitForever causes Acquire to wait indefinitely for a lease. BlockNoWait skips waiting entirely.

const MaxCorrelationIDLength = 128

MaxCorrelationIDLength bounds the length of client-supplied correlation identifiers.

Variables

var ErrAlreadyExists = errors.New("lockd: key already exists")

ErrAlreadyExists is returned when create-only acquire semantics are requested for an existing key.

var ErrMissingFencingToken = errors.New("lockd: fencing token required")

ErrMissingFencingToken is returned when an operation needs a fencing token but none was found.

Functions

func Bool added in v0.1.0

func Bool(v bool) *bool

Bool returns a pointer to v.

func CorrelationIDFromContext

func CorrelationIDFromContext(ctx context.Context) string

CorrelationIDFromContext extracts the correlation identifier carried by ctx, if present.

func CorrelationIDFromResponse

func CorrelationIDFromResponse(resp *http.Response) string

CorrelationIDFromResponse reads the X-Correlation-Id header from resp.

func GenerateCorrelationID

func GenerateCorrelationID() string

GenerateCorrelationID creates a new random correlation identifier.

func Int64 added in v0.7.0

func Int64(v int64) *int64

Int64 returns a pointer to v.

func IsAlreadyExists added in v0.6.0

func IsAlreadyExists(err error) bool

IsAlreadyExists reports whether err indicates create-only acquire semantics failed because the key already exists.

func NormalizeCorrelationID

func NormalizeCorrelationID(id string) (string, bool)

NormalizeCorrelationID trims and validates an identifier.

func ParseEndpoints

func ParseEndpoints(raw string, disableMTLS bool) ([]string, error)

ParseEndpoints splits a comma-separated server list and normalizes each endpoint, applying default schemes based on whether mTLS is disabled.
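
The splitting and defaulting step might look like the sketch below. It is not the package's implementation: parseEndpoints is a hypothetical helper, and the choice of https:// when mTLS is enabled and http:// when it is disabled is an assumption inferred from the supported schemes listed in the overview.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseEndpoints splits a comma-separated list, drops empty entries, and
// prefixes entries that carry no scheme. The default scheme mapping is an
// assumption of this sketch.
func parseEndpoints(raw string, disableMTLS bool) ([]string, error) {
	scheme := "https://"
	if disableMTLS {
		scheme = "http://"
	}
	var out []string
	for _, part := range strings.Split(raw, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate trailing commas and blank entries
		}
		if !strings.Contains(part, "://") {
			part = scheme + part
		}
		out = append(out, part)
	}
	if len(out) == 0 {
		return nil, errors.New("no endpoints provided")
	}
	return out, nil
}

func main() {
	fmt.Println(parseEndpoints("lockd-a:9341, unix:///tmp/lockd.sock", false))
}
```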

func WithCorrelationHTTPClient

func WithCorrelationHTTPClient(cli *http.Client, id string) *http.Client

WithCorrelationHTTPClient returns a shallow copy of cli (or a new client when cli is nil) with a transport that ensures X-Correlation-Id is set on all requests. The original client is left untouched.

func WithCorrelationID

func WithCorrelationID(ctx context.Context, id string) context.Context

WithCorrelationID annotates ctx with a correlation identifier to be sent with subsequent requests.

func WithCorrelationTransport

func WithCorrelationTransport(base http.RoundTripper, id string) http.RoundTripper

WithCorrelationTransport wraps base with a RoundTripper that overwrites the X-Correlation-Id header on every request. Invalid identifiers are ignored.
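
A header-stamping RoundTripper of this kind takes only a few lines of standard-library code. The sketch below is an illustration rather than the package's implementation: withCorrelation, roundTripFunc, and headerSeen are hypothetical names, and "invalid" is simplified to mean an empty identifier.

```go
package main

import (
	"fmt"
	"net/http"
)

const correlationHeader = "X-Correlation-Id"

// roundTripFunc adapts a plain function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// withCorrelation wraps base so every request carries id, replacing any
// value already present. An empty id leaves requests untouched.
func withCorrelation(base http.RoundTripper, id string) http.RoundTripper {
	if base == nil {
		base = http.DefaultTransport
	}
	if id == "" {
		return base
	}
	return roundTripFunc(func(r *http.Request) (*http.Response, error) {
		clone := r.Clone(r.Context()) // a RoundTripper must not modify the caller's request
		clone.Header.Set(correlationHeader, id)
		return base.RoundTrip(clone)
	})
}

// headerSeen sends one request through a fake base transport and returns the
// correlation header that the base transport observed.
func headerSeen(id, existing string) (string, error) {
	var seen string
	base := roundTripFunc(func(r *http.Request) (*http.Response, error) {
		seen = r.Header.Get(correlationHeader)
		return &http.Response{StatusCode: http.StatusOK, Body: http.NoBody, Request: r}, nil
	})
	req, err := http.NewRequest(http.MethodGet, "http://lockd.invalid/", nil)
	if err != nil {
		return "", err
	}
	if existing != "" {
		req.Header.Set(correlationHeader, existing)
	}
	resp, err := withCorrelation(base, id).RoundTrip(req)
	if err != nil {
		return "", err
	}
	resp.Body.Close()
	return seen, nil
}

func main() {
	fmt.Println(headerSeen("req-42", "caller-supplied"))
}
```

Cloning the request before setting the header keeps the wrapper within the http.RoundTripper contract, which forbids modifying the caller's request.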

Types

type APIError

type APIError struct {
	// Status is the HTTP status code returned by the server.
	Status int
	// Response is the decoded lockd error envelope, when available.
	Response api.ErrorResponse
	// Body contains the raw response body bytes for additional diagnostics.
	Body []byte
	// RetryAfter is the parsed retry delay hint from headers, when provided.
	RetryAfter time.Duration
	// QRFState carries queue-resilience-fallback diagnostics surfaced by the server.
	QRFState string
}

APIError describes an error response from lockd.

func (*APIError) Error

func (e *APIError) Error() string

func (*APIError) Is added in v0.6.0

func (e *APIError) Is(target error) bool

Is maps structured API errors to sentinel errors for errors.Is checks.

func (*APIError) RetryAfterDuration

func (e *APIError) RetryAfterDuration() time.Duration

RetryAfterDuration returns the recommended back-off hinted by the server.

type AcquireConfig

type AcquireConfig struct {
	// BaseDelay is the starting backoff delay after retryable failures.
	BaseDelay time.Duration
	// MaxDelay caps exponential backoff growth.
	MaxDelay time.Duration
	// Multiplier is the exponential growth factor applied between retries.
	Multiplier float64
	// Jitter randomizes capped delays by +/- Jitter to reduce thundering herds.
	Jitter time.Duration
	// FailureRetries controls retries for non-conflict transient failures; <0 means unbounded.
	FailureRetries int
	// contains filtered or unexported fields
}

AcquireConfig controls the client-side retry and backoff behaviour for Acquire and AcquireForUpdate.

type AcquireForUpdateContext

type AcquireForUpdateContext struct {
	// Session is the active lease context used for updates/removals/metadata changes.
	Session *LeaseSession
	// State is the pre-handler snapshot fetched after acquire and before user logic runs.
	State *StateSnapshot
}

AcquireForUpdateContext exposes the active lease session and the snapshot that was read before the handler executed.

func (*AcquireForUpdateContext) KeepAlive

KeepAlive extends the lease TTL.

func (*AcquireForUpdateContext) Load

Load unmarshals the latest state into v.

func (*AcquireForUpdateContext) Mutate added in v0.7.0

func (a *AcquireForUpdateContext) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)

Mutate applies server-side LQL mutations while preserving the lease.

func (*AcquireForUpdateContext) MutateLocal added in v0.8.0

func (a *AcquireForUpdateContext) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)

MutateLocal applies client-local streaming LQL mutations while preserving the lease.

func (*AcquireForUpdateContext) MutateWithOptions added in v0.7.0

func (a *AcquireForUpdateContext) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)

MutateWithOptions applies server-side LQL mutations with explicit conditional overrides.

func (*AcquireForUpdateContext) Remove

Remove deletes the current state while the handler holds the lease.

func (*AcquireForUpdateContext) RemoveWithOptions

func (a *AcquireForUpdateContext) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)

RemoveWithOptions deletes the state while allowing conditional overrides.

func (*AcquireForUpdateContext) Save

func (a *AcquireForUpdateContext) Save(ctx context.Context, v any, opts ...UpdateOption) error

Save marshals and updates the state with the supplied value.

func (*AcquireForUpdateContext) Update

func (a *AcquireForUpdateContext) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)

Update streams a new JSON document via Update, preserving the lease.

func (*AcquireForUpdateContext) UpdateBytes

func (a *AcquireForUpdateContext) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)

UpdateBytes is a convenience wrapper over Update that accepts a byte slice.

func (*AcquireForUpdateContext) UpdateMetadata added in v0.1.0

UpdateMetadata mutates metadata for the active key.

func (*AcquireForUpdateContext) UpdateWithOptions

func (a *AcquireForUpdateContext) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)

UpdateWithOptions allows callers to override conditional headers for the update.

type AcquireForUpdateHandler

type AcquireForUpdateHandler func(context.Context, *AcquireForUpdateContext) error

AcquireForUpdateHandler is invoked while a lease is held. The provided context is canceled if the client loses the lease or keepalive fails.

type AcquireOption

type AcquireOption func(*AcquireConfig)

AcquireOption customises Acquire behaviour.

func WithAcquireBackoff

func WithAcquireBackoff(base, max time.Duration, multiplier float64) AcquireOption

WithAcquireBackoff adjusts backoff parameters.

func WithAcquireFailureRetries

func WithAcquireFailureRetries(n int) AcquireOption

WithAcquireFailureRetries overrides how many times the client retries after failures other than lease conflicts. A value <0 allows infinite retries.

func WithAcquireJitter

func WithAcquireJitter(jitter time.Duration) AcquireOption

WithAcquireJitter adjusts the jitter window applied when the retry delay reaches the cap. A zero duration disables jitter. When positive, the final sleep duration is randomly offset by +/- jitter once the cap is hit.

type AttachRequest added in v0.1.0

type AttachRequest struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// TxnID associates the operation with a transaction coordinator record.
	TxnID string
	// FencingToken is the monotonic token used to fence stale writers.
	FencingToken *int64
	// Name is the human-readable identifier for the referenced object.
	Name string
	// Body provides the request or response payload stream/content.
	Body io.Reader
	// ContentType is the media type associated with the payload.
	ContentType string
	// MaxBytes optionally enforces an upper bound for attachment payload size in bytes.
	MaxBytes *int64
	// PreventOverwrite rejects the request when an attachment with the same selector already exists.
	PreventOverwrite bool
}

AttachRequest captures parameters for staging an attachment.

type AttachResult added in v0.1.0

type AttachResult struct {
	// Attachment contains metadata for the staged or retrieved attachment.
	Attachment AttachmentInfo
	// Noop is true when attach detected identical existing content and skipped a write.
	Noop bool
	// Version is the lockd monotonic version for the target object.
	Version int64
}

AttachResult reports the staged attachment metadata.

type Attachment added in v0.1.0

type Attachment struct {
	AttachmentInfo
	// contains filtered or unexported fields
}

Attachment exposes a streaming attachment payload.

func (*Attachment) Close added in v0.1.0

func (a *Attachment) Close() error

Close releases the underlying reader.

func (*Attachment) Delete added in v0.1.0

func (a *Attachment) Delete(ctx context.Context) error

Delete removes the attachment from the state key when a lease is available.

func (*Attachment) Read added in v0.1.0

func (a *Attachment) Read(p []byte) (int, error)

Read implements io.Reader.

type AttachmentInfo added in v0.1.0

type AttachmentInfo struct {
	// ID is the unique identifier for the referenced object.
	ID string
	// Name is the human-readable identifier for the referenced object.
	Name string
	// Size is the payload size in bytes.
	Size int64
	// PlaintextSHA256 is the SHA-256 checksum of the uploaded plaintext payload.
	PlaintextSHA256 string
	// ContentType is the media type associated with the payload.
	ContentType string
	// CreatedAtUnix is the creation timestamp as Unix seconds.
	CreatedAtUnix int64
	// UpdatedAtUnix is the last update timestamp as Unix seconds.
	UpdatedAtUnix int64
}

AttachmentInfo describes attachment metadata.

type AttachmentList added in v0.1.0

type AttachmentList struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// Attachments enumerates attachments associated with the target key.
	Attachments []AttachmentInfo
}

AttachmentList collects attachment metadata for a key.

type AttachmentSelector added in v0.1.0

type AttachmentSelector struct {
	// ID is the unique identifier for the referenced object.
	ID string
	// Name is the human-readable identifier for the referenced object.
	Name string
}

AttachmentSelector identifies an attachment by id or name.

type AttachmentStore added in v0.1.0

type AttachmentStore struct {
	// contains filtered or unexported fields
}

AttachmentStore exposes attachment operations scoped to a key.

func (*AttachmentStore) Attach added in v0.1.0

Attach stages an attachment payload using the configured lease context.

func (*AttachmentStore) Delete added in v0.1.0

Delete removes a single attachment.

func (*AttachmentStore) DeleteAll added in v0.1.0

DeleteAll removes all attachments for the key.

func (*AttachmentStore) List added in v0.1.0

List returns attachment metadata for the key.

func (*AttachmentStore) Retrieve added in v0.1.0

func (s *AttachmentStore) Retrieve(ctx context.Context, selector AttachmentSelector) (*Attachment, error)

Retrieve streams a single attachment payload.

func (*AttachmentStore) RetrieveAll added in v0.1.0

func (s *AttachmentStore) RetrieveAll(ctx context.Context) ([]*Attachment, error)

RetrieveAll loads all attachments and returns a slice of Attachment readers.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a convenience wrapper around the lockd HTTP API.

func New

func New(baseURL string, opts ...Option) (*Client, error)

New creates a new client targeting baseURL (e.g. https://localhost:9341). Unix-domain sockets are supported via base URLs such as unix:///var/run/lockd.sock; ensure the server is running with mTLS disabled or supply a compatible client bundle. Example:

cli, err := client.New("unix:///tmp/lockd.sock")
if err != nil {
    log.Fatal(err)
}
lease, err := cli.Acquire(ctx, api.AcquireRequest{Key: "demo", Owner: "worker", TTLSeconds: 20})
if err != nil {
    log.Fatal(err)
}
defer cli.Release(ctx, api.ReleaseRequest{Key: "demo", LeaseID: lease.LeaseID})

func NewWithEndpoints

func NewWithEndpoints(endpoints []string, opts ...Option) (*Client, error)

NewWithEndpoints constructs a client from a slice of server endpoints.

func (*Client) Acquire

func (c *Client) Acquire(ctx context.Context, req api.AcquireRequest, opts ...AcquireOption) (*LeaseSession, error)

Acquire acquires a lease, retrying conflicts and transient errors.

Example:

lease, err := cli.Acquire(ctx, api.AcquireRequest{
    Key:        "orders",
    Owner:      "worker-1",
    TTLSeconds: 30,
})
if err != nil {
    return err
}
defer lease.Close()
if err := lease.Save(ctx, map[string]any{"progress": "done"}); err != nil {
    return err
}
Example
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	lockd "pkt.systems/lockd"
	"pkt.systems/lockd/api"
	"pkt.systems/lockd/client"
	"pkt.systems/lql"
)

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "lockd-example-")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.RemoveAll(dir)
	socketPath := filepath.Join(dir, "lockd.sock")
	cfg := lockd.Config{
		Store:                    "mem://",
		ListenProto:              "unix",
		Listen:                   socketPath,
		DisableMTLS:              true,
		DisableStorageEncryption: true,
	}
	handle, err := lockd.StartServer(ctx, cfg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	stop := handle.Stop
	defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
	cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	lease, err := cli.Acquire(ctx, api.AcquireRequest{
		Namespace:  "payments",
		Key:        "batch-2025-11",
		Owner:      "worker-1",
		TTLSeconds: 15,
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer lease.Release(ctx)
	state := map[string]any{}
	if err := lql.Mutate(state,
		"/batch/id=batch-2025-11",
		"/batch/status=pending",
	); err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := lease.Save(ctx, state); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("lease namespace:", lease.Namespace)
	fmt.Println("lease key:", lease.Key)
}
Output:
lease namespace: payments
lease key: batch-2025-11

func (*Client) AcquireForUpdate

func (c *Client) AcquireForUpdate(ctx context.Context, req api.AcquireRequest, handler AcquireForUpdateHandler, opts ...AcquireOption) error

AcquireForUpdate acquires a lease, runs handler while the lease is active, keeps the lease alive, and releases it on return. The helper retries the acquire/get handshake according to opts, surfaces the current state via AcquireForUpdateContext, and ensures Release is invoked even when the handler returns an error.

Example:

err := cli.AcquireForUpdate(ctx, api.AcquireRequest{
    Key:        "orders",
    Owner:      "worker-1",
    TTLSeconds: 45,
}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
    var doc map[string]any
    if err := af.Load(ctx, &doc); err != nil && !errors.Is(err, client.ErrStateNotFound) {
        return err
    }
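    if doc == nil {
        // No state stored yet (ErrStateNotFound): writing to a nil map would panic.
        doc = map[string]any{}
    }
    doc["checkpoint"] = "processing"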
    return af.Save(ctx, doc)
})
if err != nil {
    log.Fatal(err)
}
Example
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"

	lockd "pkt.systems/lockd"
	"pkt.systems/lockd/api"
	"pkt.systems/lockd/client"
	"pkt.systems/lql"
)

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "lockd-example-")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.RemoveAll(dir)
	socketPath := filepath.Join(dir, "lockd.sock")
	cfg := lockd.Config{
		Store:                    "mem://",
		ListenProto:              "unix",
		Listen:                   socketPath,
		DisableMTLS:              true,
		DisableStorageEncryption: true,
	}
	handle, err := lockd.StartServer(ctx, cfg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	stop := handle.Stop
	defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
	cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	err = cli.AcquireForUpdate(ctx, api.AcquireRequest{
		Key:        "ledger-checkpoint",
		Owner:      "worker-1",
		TTLSeconds: 10,
	}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
		state := map[string]any{}
		if err := lql.Mutate(state,
			"/cursor/batch=42",
			"time:/cursor/updated_at=2025-11-10T12:00:00Z",
		); err != nil {
			return err
		}
		return af.Save(ctx, state)
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("state updated")
}
Output:
state updated

func (*Client) Attach added in v0.1.0

func (c *Client) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)

Attach stages an attachment payload for the key.

func (*Client) ClearLeaseID added in v0.1.0

func (c *Client) ClearLeaseID(leaseID string)

ClearLeaseID removes the sticky lease when it matches leaseID.

func (*Client) Close

func (c *Client) Close() error

Close releases any idle HTTP connections held by the client.

func (*Client) DeleteAllAttachments added in v0.1.0

func (c *Client) DeleteAllAttachments(ctx context.Context, req DeleteAllAttachmentsRequest) (*DeleteAllAttachmentsResult, error)

DeleteAllAttachments removes all attachments for a key.

func (*Client) DeleteAttachment added in v0.1.0

func (c *Client) DeleteAttachment(ctx context.Context, req DeleteAttachmentRequest) (*DeleteAttachmentResult, error)

DeleteAttachment removes a single attachment.

func (*Client) Dequeue

func (c *Client) Dequeue(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)

Dequeue pops a single message from the queue using /v1/queue/dequeue.

func (*Client) DequeueBatch

func (c *Client) DequeueBatch(ctx context.Context, queue string, opts DequeueOptions) ([]*QueueMessage, error)

DequeueBatch retrieves up to opts.PageSize messages in a single dequeue request. The caller is responsible for acking or nacking every returned message.

func (*Client) DequeueWithState

func (c *Client) DequeueWithState(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)

DequeueWithState pops a queue message and returns its state payload in one call.

func (*Client) Describe

func (c *Client) Describe(ctx context.Context, key string) (*api.DescribeResponse, error)

Describe fetches key metadata without state.

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, queue string, payload io.Reader, opts EnqueueOptions) (*api.EnqueueResponse, error)

Enqueue pushes a payload into the specified queue using /v1/queue/enqueue.

func (*Client) EnqueueBytes

func (c *Client) EnqueueBytes(ctx context.Context, queue string, payload []byte, opts EnqueueOptions) (*api.EnqueueResponse, error)

EnqueueBytes is a convenience helper that enqueues an in-memory payload.

func (*Client) FlushIndex added in v0.1.0

func (c *Client) FlushIndex(ctx context.Context, namespace string, optFns ...FlushOption) (*api.IndexFlushResponse, error)

FlushIndex forces the namespace index writer to flush pending documents.

func (*Client) Get

func (c *Client) Get(ctx context.Context, key string, optFns ...GetOption) (*GetResponse, error)

Get fetches the JSON state for key and returns a streaming response.

Example (Public)
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"time"

	lockd "pkt.systems/lockd"
	"pkt.systems/lockd/api"
	"pkt.systems/lockd/client"
	"pkt.systems/lql"
)

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "lockd-example-")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.RemoveAll(dir)
	socketPath := filepath.Join(dir, "lockd.sock")
	cfg := lockd.Config{
		Store:                    "mem://",
		ListenProto:              "unix",
		Listen:                   socketPath,
		DisableMTLS:              true,
		DisableStorageEncryption: true,
	}
	handle, err := lockd.StartServer(ctx, cfg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	stop := handle.Stop
	defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
	cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	_ = cli.UseNamespace("reports")
	lease, err := cli.Acquire(ctx, api.AcquireRequest{
		Key:        "payouts-2025-11",
		Owner:      "writer-1",
		TTLSeconds: 10,
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	doc := make(map[string]any)
	if err := lql.Mutate(doc,
		"/report/status=published",
		"time:/report/released_at=2025-11-01T09:00:00Z",
		"/report/summary/total=1200.50",
	); err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := lease.Save(ctx, doc); err != nil {
		fmt.Println("error:", err)
		return
	}
	if err := lease.Release(ctx); err != nil {
		fmt.Println("error:", err)
		return
	}
	resp, err := cli.Get(ctx, "payouts-2025-11")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer resp.Close()
	var snapshot map[string]any
	if err := json.NewDecoder(resp.Reader()).Decode(&snapshot); err != nil {
		fmt.Println("error:", err)
		return
	}
	report := snapshot["report"].(map[string]any)
	fmt.Println("status:", report["status"])
}
Output:
status: published

func (*Client) GetAttachment added in v0.1.0

func (c *Client) GetAttachment(ctx context.Context, req GetAttachmentRequest) (*Attachment, error)

GetAttachment retrieves a single attachment payload.

func (*Client) GetNamespaceConfig added in v0.1.0

func (c *Client) GetNamespaceConfig(ctx context.Context, namespace string) (NamespaceConfigResult, error)

GetNamespaceConfig returns the namespace configuration document and its ETag.

func (*Client) KeepAlive

func (c *Client) KeepAlive(ctx context.Context, req api.KeepAliveRequest) (*api.KeepAliveResponse, error)

KeepAlive extends a lease.

func (*Client) ListAttachments added in v0.1.0

func (c *Client) ListAttachments(ctx context.Context, req ListAttachmentsRequest) (*AttachmentList, error)

ListAttachments lists attachments for a key.

func (*Client) Load

func (c *Client) Load(ctx context.Context, key string, v any, optFns ...LoadOption) error

Load unmarshals the current state for key into v. When no state exists, v is left untouched.

func (*Client) Mutate added in v0.7.0

func (c *Client) Mutate(ctx context.Context, req MutateRequest) (*UpdateResult, error)

Mutate applies LQL mutation expressions server-side to the key under lease protection.

func (*Client) MutateLocal added in v0.8.0

func (c *Client) MutateLocal(ctx context.Context, req MutateLocalRequest) (*UpdateResult, error)

MutateLocal applies LQL mutations client-side without buffering the full JSON state in memory. This path supports file-backed LQL mutators.

func (*Client) Namespace added in v0.1.0

func (c *Client) Namespace() string

Namespace returns the default namespace currently configured on the client.

func (*Client) Query added in v0.1.0

func (c *Client) Query(ctx context.Context, optFns ...QueryOption) (*QueryResponse, error)

Query executes a selector search within a namespace. Usage is option-driven:

resp, err := cli.Query(ctx,
    client.WithQueryNamespace("orders"),
    client.WithQuery(`eq{field=/status,value=open}`),
    client.WithQueryLimit(20),
    client.WithQueryReturnDocuments(),
)
if err != nil { /* handle */ }
resp.ForEach(func(row client.QueryRow) error {
    // row.Document is populated in return=documents mode
    return nil
})

Callers can combine helpers (namespace, selector, engine hints, cursor, return mode, etc.) without assembling api.QueryRequest manually.

Example
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"time"

	lockd "pkt.systems/lockd"
	"pkt.systems/lockd/api"
	"pkt.systems/lockd/client"
	"pkt.systems/lql"
)

func main() {
	ctx := context.Background()
	dir, err := os.MkdirTemp("", "lockd-example-")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer os.RemoveAll(dir)
	socketPath := filepath.Join(dir, "lockd.sock")
	cfg := lockd.Config{
		Store:                    "mem://",
		ListenProto:              "unix",
		Listen:                   socketPath,
		DisableMTLS:              true,
		DisableStorageEncryption: true,
	}
	handle, err := lockd.StartServer(ctx, cfg)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	stop := handle.Stop
	defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
	cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	docs := []struct {
		OrderNo string
		Status  string
	}{
		{OrderNo: "A-1001", Status: "open"},
		{OrderNo: "A-1002", Status: "closed"},
		{OrderNo: "A-1003", Status: "open"},
	}
	for _, doc := range docs {
		lease, err := cli.Acquire(ctx, api.AcquireRequest{Namespace: "orders", Owner: "ingest", TTLSeconds: 5})
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		state := make(map[string]any)
		if err := lql.Mutate(state,
			"/order_no="+doc.OrderNo,
			"/status="+doc.Status,
		); err != nil {
			fmt.Println("error:", err)
			return
		}
		if err := lease.Save(ctx, state); err != nil {
			fmt.Println("error:", err)
			return
		}
		if err := lease.Release(ctx); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	resp, err := cli.Query(ctx,
		client.WithQueryNamespace("orders"),
		client.WithQuery(`eq{field=/status,value=open}`),
		client.WithQueryReturnDocuments(),
	)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	var openOrders []string
	resp.ForEach(func(row client.QueryRow) error {
		docReader, err := row.DocumentReader()
		if err != nil {
			return err
		}
		defer docReader.Close()
		var doc map[string]any
		if err := json.NewDecoder(docReader).Decode(&doc); err != nil {
			return err
		}
		if order, ok := doc["order_no"].(string); ok {
			openOrders = append(openOrders, order)
		}
		return nil
	})
	sort.Strings(openOrders)
	for _, order := range openOrders {
		fmt.Println("open order:", order)
	}
}
Output:
open order: A-1001
open order: A-1003

func (*Client) QueueAck

func (c *Client) QueueAck(ctx context.Context, req api.AckRequest) (*api.AckResponse, error)

QueueAck acknowledges a queue message via the /v1/queue/ack API.

func (*Client) QueueExtend

func (c *Client) QueueExtend(ctx context.Context, req api.ExtendRequest) (*api.ExtendResponse, error)

QueueExtend extends a queue message's visibility window using /v1/queue/extend.

func (*Client) QueueNack

func (c *Client) QueueNack(ctx context.Context, req api.NackRequest) (*api.NackResponse, error)

QueueNack requeues a message with an optional visibility delay via /v1/queue/nack.

func (*Client) QueueStats added in v0.7.0

func (c *Client) QueueStats(ctx context.Context, queue string, opts QueueStatsOptions) (*api.QueueStatsResponse, error)

QueueStats returns runtime and head-snapshot statistics for a queue. The call has no side effects on the queue.

func (*Client) RegisterLeaseToken

func (c *Client) RegisterLeaseToken(leaseID string, token int64)

RegisterLeaseToken stores a lease -> fencing token mapping for subsequent requests. This is useful when the token is obtained out-of-band (for example via environment variables between CLI invocations).
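
As a rough illustration of the out-of-band case, the sketch below parses a fencing token from an environment variable into the int64 that RegisterLeaseToken takes. The variable names EXAMPLE_LEASE_ID and EXAMPLE_FENCING_TOKEN and the helper parseFencingToken are assumptions made for this example; they are not defined by the package. The call to the real client is left as a comment.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// parseFencingToken converts the textual form of a fencing token (for example
// the value of an environment variable) into an int64.
// Hypothetical helper; it is not part of the client package.
func parseFencingToken(raw string) (int64, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0, fmt.Errorf("fencing token is empty")
	}
	token, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid fencing token %q: %w", raw, err)
	}
	return token, nil
}

func main() {
	// Both variable names are assumptions chosen for this sketch.
	leaseID := os.Getenv("EXAMPLE_LEASE_ID")
	token, err := parseFencingToken(os.Getenv("EXAMPLE_FENCING_TOKEN"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// With a real client the mapping would be registered here:
	//   cli.RegisterLeaseToken(leaseID, token)
	fmt.Printf("lease %q uses fencing token %d\n", leaseID, token)
}
```

Validating the token before registering it keeps a malformed value from being sent as a fencing header on later requests.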

func (*Client) Release

func (c *Client) Release(ctx context.Context, req api.ReleaseRequest) (*api.ReleaseResponse, error)

Release drops a lease.

func (*Client) Remove added in v0.1.0

func (c *Client) Remove(ctx context.Context, key, leaseID string, opts RemoveOptions) (*api.RemoveResponse, error)

Remove deletes the JSON state for key while ensuring the lease and conditional headers (when provided) are honoured.

func (*Client) Save

func (c *Client) Save(ctx context.Context, sess *LeaseSession, v any) error

Save delegates to sess.Save.

func (*Client) StartConsumer added in v0.4.0

func (c *Client) StartConsumer(ctx context.Context, consumers ...ConsumerConfig) error

StartConsumer starts one or more long-running queue consumers and blocks until they terminate. Each ConsumerConfig runs in its own goroutine and restarts on failure according to RestartPolicy. Panics from message handlers, lifecycle hooks, and error handlers are recovered and treated as consume-loop failures. Cancel ctx to stop all consumers; context cancellation returns nil.
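
The panic handling described above can be pictured with a small standard-library sketch. It shows the general recover-to-error technique only. The handler type and safeCall are hypothetical names, and the SDK's actual recovery code may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical stand-in for a message handler callback.
type handler func() error

// safeCall runs h and converts a panic into an ordinary error, so the caller
// can treat it like any other failure of the consume loop.
func safeCall(h handler) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panicked: %v", r)
		}
	}()
	return h()
}

func main() {
	fmt.Println(safeCall(func() error { return nil }))
	fmt.Println(safeCall(func() error { return errors.New("boom") }))
	fmt.Println(safeCall(func() error { panic("unexpected") }))
}
```

Because the panic surfaces as an error value, a restart policy can count it as a failure like any other.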

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, queue string, opts SubscribeOptions, handler MessageHandler) error

Subscribe streams queue messages continuously and invokes handler for each delivery. While the handler runs, the client renews the in-flight queue lease implicitly via QueueExtend to reduce timeout risk for long-running handlers.

func (*Client) SubscribeWithState

func (c *Client) SubscribeWithState(ctx context.Context, queue string, opts SubscribeOptions, handler MessageHandlerWithState) error

SubscribeWithState streams queue messages with workflow state and invokes handler for each delivery. While the handler runs, the client renews both the message lease and the associated state lease implicitly via QueueExtend.

func (*Client) TxnCommit added in v0.1.0

TxnCommit records a commit decision and applies it to participants.

func (*Client) TxnPrepare added in v0.1.0

TxnPrepare records a pending decision for the txn, merging participants/expiry.

func (*Client) TxnReplay added in v0.1.0

func (c *Client) TxnReplay(ctx context.Context, txnID string) (*api.TxnReplayResponse, error)

TxnReplay replays the recorded decision for txnID, or rolls the transaction back when it is still pending and has expired.

func (*Client) TxnRollback added in v0.1.0

TxnRollback records a rollback decision and applies it to participants.

func (*Client) Update

func (c *Client) Update(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)

Update uploads new JSON state from the provided reader.

func (*Client) UpdateBytes

func (c *Client) UpdateBytes(ctx context.Context, key, leaseID string, body []byte, opts UpdateOptions) (*UpdateResult, error)

UpdateBytes uploads new JSON state from the provided byte slice.

func (*Client) UpdateMetadata added in v0.1.0

func (c *Client) UpdateMetadata(ctx context.Context, key, leaseID string, opts UpdateOptions) (*MetadataResult, error)

UpdateMetadata mutates lock metadata without modifying the JSON state.

func (*Client) UpdateNamespaceConfig added in v0.1.0

UpdateNamespaceConfig mutates namespace-level settings and returns the updated configuration.

func (*Client) UpdateStream added in v0.7.0

func (c *Client) UpdateStream(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)

UpdateStream uploads JSON state from a non-replayable reader without buffering.

This variant is intended for streaming callers (for example MCP write streams) and performs a single transport attempt. If a retry or failover is needed, callers must begin a new stream and replay the bytes from their source of truth.
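
One way a caller might handle the single-attempt rule is to reopen the source before each attempt. The sketch below shows that pattern with standard-library types only. uploadFunc stands in for a call such as UpdateStream, and uploadWithReplay, flakySink, and jsonSource are hypothetical names introduced for the example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// uploadFunc is a hypothetical stand-in for a single-attempt streaming upload.
type uploadFunc func(body io.Reader) error

// uploadWithReplay asks open for a fresh reader before every attempt, so no
// attempt ever sees a half-consumed stream.
func uploadWithReplay(open func() (io.Reader, error), upload uploadFunc, attempts int) error {
	if attempts < 1 {
		attempts = 1
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		body, err := open()
		if err != nil {
			return err
		}
		if lastErr = upload(body); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("upload failed after %d attempts: %w", attempts, lastErr)
}

// flakySink simulates a transport that fails a fixed number of times after
// consuming the stream, then succeeds and records what it received.
type flakySink struct {
	failures int
	received string
}

func (s *flakySink) upload(body io.Reader) error {
	data, err := io.ReadAll(body)
	if err != nil {
		return err
	}
	if s.failures > 0 {
		s.failures--
		return errors.New("simulated transport failure")
	}
	s.received = string(data)
	return nil
}

// jsonSource plays the role of the caller's source of truth.
func jsonSource() (io.Reader, error) {
	return strings.NewReader(`{"status":"ok"}`), nil
}

func main() {
	sink := &flakySink{failures: 1}
	err := uploadWithReplay(jsonSource, sink.upload, 3)
	fmt.Println(err, sink.received)
}
```

The important detail is that open is called inside the loop: a reader that has already been drained by a failed attempt is never reused.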

func (*Client) UseLeaseID added in v0.1.0

func (c *Client) UseLeaseID(leaseID string)

UseLeaseID configures the client to reuse the provided lease for subsequent requests.

func (*Client) UseNamespace added in v0.1.0

func (c *Client) UseNamespace(ns string) error

UseNamespace updates the default namespace used when callers omit one.

func (*Client) WatchQueue added in v0.7.0

func (c *Client) WatchQueue(ctx context.Context, queue string, opts WatchQueueOptions, handler QueueWatchHandler) error

WatchQueue streams queue visibility changes over server-sent events (SSE) without consuming messages.

type ConsumerConfig added in v0.4.0

type ConsumerConfig struct {
	// Name labels this consumer in logs and lifecycle/error callbacks.
	// Empty defaults to Queue.
	Name string
	// Queue is the queue name to subscribe to.
	Queue string
	// Namespace scopes this consumer when Options.Namespace is empty.
	// Empty falls back to the client's default namespace.
	Namespace string
	// Options configures subscription behavior (namespace, owner, prefetch, etc.).
	// When Options.Owner is empty, StartConsumer generates a unique owner value.
	Options SubscribeOptions
	// WithState switches between Subscribe (false) and SubscribeWithState (true).
	WithState bool
	// MessageHandler processes each delivered message.
	MessageHandler ConsumerMessageHandler
	// ErrorHandler observes subscribe failures before restart.
	// When nil, StartConsumer logs and continues.
	ErrorHandler ConsumerErrorHandler
	// OnStart runs when a consumer subscribe attempt starts.
	OnStart func(context.Context, ConsumerLifecycleEvent)
	// OnStop runs when a consumer subscribe attempt stops (success, context
	// cancellation, or error).
	OnStop func(context.Context, ConsumerLifecycleEvent)
	// RestartPolicy controls retry/backoff behavior after failures.
	RestartPolicy ConsumerRestartPolicy
}

ConsumerConfig describes one queue consumer managed by StartConsumer.

type ConsumerError added in v0.4.0

type ConsumerError struct {
	// Name is the logical consumer name from ConsumerConfig.
	Name string
	// Queue is the queue whose consume loop failed.
	Queue string
	// WithState indicates whether the failed loop used SubscribeWithState.
	WithState bool
	// Attempt is the current consecutive failure count for this consumer.
	Attempt int
	// RestartIn is the delay before the next subscribe attempt.
	RestartIn time.Duration
	// Err is the underlying failure returned by Subscribe/SubscribeWithState.
	Err error
}

ConsumerError describes a recoverable consumer loop failure before restart.

type ConsumerErrorHandler added in v0.4.0

type ConsumerErrorHandler func(context.Context, ConsumerError) error

ConsumerErrorHandler is invoked when a consume loop fails and is about to be restarted. Returning nil continues restart handling. Returning a non-nil error stops StartConsumer and returns that error (wrapped with queue context).

type ConsumerLifecycleEvent added in v0.4.0

type ConsumerLifecycleEvent struct {
	// Name is the logical consumer name from ConsumerConfig.
	Name string
	// Queue is the queue this lifecycle event belongs to.
	Queue string
	// WithState indicates whether this consumer uses SubscribeWithState.
	WithState bool
	// Attempt is the 1-based subscribe attempt sequence for this consumer.
	Attempt int
	// Err is the terminal error for the attempt. It is nil for OnStart and for
	// clean attempt completion.
	Err error
}

ConsumerLifecycleEvent describes lifecycle transitions for one consumer.

type ConsumerMessage added in v0.4.0

type ConsumerMessage struct {
	// Client is the active SDK client used by StartConsumer, allowing handlers to
	// perform additional lockd operations (enqueue, acquire, queries, etc.)
	// without constructing a second client.
	Client *Client
	// Logger is the client logger configured via WithLogger.
	// It is always non-nil (defaults to pslog.NoopLogger()).
	Logger pslog.Base
	// Queue is the subscribed queue name for this delivery.
	Queue string
	// WithState indicates whether this delivery came from a stateful subscription.
	WithState bool
	// Message is the leased queue delivery payload/metadata wrapper.
	Message *QueueMessage
	// State is the workflow state lease handle for stateful subscriptions.
	// It is nil when WithState is false.
	State *QueueStateHandle
	// contains filtered or unexported fields
}

ConsumerMessage bundles the runtime context provided to ConsumerMessageHandler. The same handler can be reused across multiple ConsumerConfig entries and inspect Queue/WithState to branch behavior.

func (ConsumerMessage) Name added in v0.4.0

func (m ConsumerMessage) Name() string

Name returns the logical consumer name resolved from ConsumerConfig.Name, defaulting to Queue when no explicit name was configured.

type ConsumerMessageHandler added in v0.4.0

type ConsumerMessageHandler func(context.Context, ConsumerMessage) error

ConsumerMessageHandler handles one queue delivery produced by StartConsumer.

type ConsumerRestartPolicy added in v0.4.0

type ConsumerRestartPolicy struct {
	// ImmediateRetries is the number of consecutive failures retried with zero
	// delay before exponential backoff starts.
	ImmediateRetries int
	// BaseDelay is the first delayed retry duration once immediate retries are exhausted.
	BaseDelay time.Duration
	// MaxDelay caps the restart delay.
	MaxDelay time.Duration
	// Multiplier controls exponential growth between delayed retries.
	Multiplier float64
	// Jitter randomizes delay by +/- Jitter to reduce synchronized retries.
	Jitter time.Duration
	// MaxFailures optionally stops the consumer after N consecutive failures.
	// Zero or negative means retry forever.
	MaxFailures int
}

ConsumerRestartPolicy configures restart behavior for failed consume loops.
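
To make the interaction between ImmediateRetries, BaseDelay, Multiplier, and MaxDelay concrete, here is a minimal sketch of one plausible delay schedule. restartPolicy is a local stand-in for the documented fields (Jitter is left out so the result is deterministic), and restartDelay is a hypothetical helper. The SDK's real computation may differ in detail.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// restartPolicy mirrors the documented fields of ConsumerRestartPolicy that
// shape the delay. Local stand-in, not the SDK type.
type restartPolicy struct {
	ImmediateRetries int
	BaseDelay        time.Duration
	MaxDelay         time.Duration
	Multiplier       float64
}

// restartDelay returns the wait before the next attempt after the given
// number of consecutive failures, under the assumptions of this sketch.
func restartDelay(p restartPolicy, failures int) time.Duration {
	if failures <= p.ImmediateRetries {
		return 0 // still inside the zero-delay retry budget
	}
	// Number of delayed retries already taken before this one.
	step := failures - p.ImmediateRetries - 1
	d := float64(p.BaseDelay) * math.Pow(p.Multiplier, float64(step))
	if p.MaxDelay > 0 && d > float64(p.MaxDelay) {
		return p.MaxDelay
	}
	return time.Duration(d)
}

// examplePolicy is an illustrative configuration, not the SDK default.
var examplePolicy = restartPolicy{
	ImmediateRetries: 2,
	BaseDelay:        100 * time.Millisecond,
	MaxDelay:         time.Second,
	Multiplier:       2,
}

func main() {
	for failures := 1; failures <= 8; failures++ {
		fmt.Printf("failure %d -> wait %v\n", failures, restartDelay(examplePolicy, failures))
	}
}
```

With this configuration the first two failures retry at once, the next delays double from 100ms, and every delay from the seventh failure onward is capped at one second.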

type DeleteAllAttachmentsRequest added in v0.1.0

type DeleteAllAttachmentsRequest struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// TxnID associates the operation with a transaction coordinator record.
	TxnID string
	// FencingToken is the monotonic token used to fence stale writers.
	FencingToken *int64
}

DeleteAllAttachmentsRequest removes all attachments for a key.

type DeleteAllAttachmentsResult added in v0.1.0

type DeleteAllAttachmentsResult struct {
	// Deleted reports delete results for the requested attachment operation.
	Deleted int
	// Version is the lockd monotonic version for the target object.
	Version int64
}

DeleteAllAttachmentsResult reports delete status for all attachments.

type DeleteAttachmentRequest added in v0.1.0

type DeleteAttachmentRequest struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// TxnID associates the operation with a transaction coordinator record.
	TxnID string
	// FencingToken is the monotonic token used to fence stale writers.
	FencingToken *int64
	// Selector identifies which attachment to delete (by ID, Name, or both).
	Selector AttachmentSelector
}

DeleteAttachmentRequest removes a single attachment.

type DeleteAttachmentResult added in v0.1.0

type DeleteAttachmentResult struct {
	// Deleted reports delete results for the requested attachment operation.
	Deleted bool
	// Version is the lockd monotonic version for the target object.
	Version int64
}

DeleteAttachmentResult reports delete status for a single attachment.

type DequeueOptions

type DequeueOptions struct {
	// Namespace scopes the queue operation. Empty uses the client's default namespace.
	Namespace string
	// Owner identifies the worker/consumer acquiring the queue lease.
	Owner string
	// TxnID binds queue state operations to an existing transaction when required.
	TxnID string
	// Visibility controls the message lease timeout returned by dequeue.
	Visibility time.Duration
	// BlockSeconds controls long-poll behavior: BlockNoWait (-1) for immediate return,
	// 0 to wait indefinitely, and >0 to wait up to that many seconds.
	BlockSeconds int64
	// PageSize caps batched dequeue result size (for APIs that support multi-message responses).
	PageSize int
	// StartAfter resumes dequeue scanning from a server-issued cursor.
	StartAfter string
	// OnCloseDelay applies a delay before auto-nack when QueueMessage.Close is called without ack.
	OnCloseDelay time.Duration
}

DequeueOptions guides dequeue behaviour.
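
The three BlockSeconds cases documented on the struct can be summarised in a small helper. This is a sketch under stated assumptions: blockNoWait mirrors the documented value of BlockNoWait (-1), waitBudget is a hypothetical name, and treating every negative value like BlockNoWait is a choice made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// blockNoWait mirrors the documented value of client.BlockNoWait.
const blockNoWait int64 = -1

// waitBudget translates a BlockSeconds value into a wait duration and a flag
// that reports whether the wait is unbounded.
// Assumption: any negative value is treated like blockNoWait.
func waitBudget(blockSeconds int64) (wait time.Duration, unbounded bool) {
	switch {
	case blockSeconds > 0:
		return time.Duration(blockSeconds) * time.Second, false
	case blockSeconds == 0:
		return 0, true // wait indefinitely
	default:
		return 0, false // return immediately
	}
}

func main() {
	for _, v := range []int64{blockNoWait, 0, 30} {
		wait, unbounded := waitBudget(v)
		fmt.Printf("BlockSeconds=%d wait=%v unbounded=%v\n", v, wait, unbounded)
	}
}
```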

type DequeueResult

type DequeueResult struct {
	// Message is the primary dequeued message handle for single-message consumption paths.
	Message *QueueMessageHandle
	// Messages contains all dequeued message handles when batch dequeue is requested.
	Messages []*QueueMessageHandle
	// NextCursor is the server cursor for continuing dequeue scans.
	NextCursor string
}

DequeueResult captures the outcome of a dequeue request.

type Document added in v0.1.0

type Document struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// Version is the lockd monotonic version for the target object.
	Version string
	// ETag is the entity tag used for optimistic concurrency and cache validation.
	ETag string
	// Metadata carries metadata values returned by the server for this object.
	Metadata map[string]string

	// Body holds the mutable JSON document content.
	Body map[string]any
}

Document models a lockd state document with helper methods for JSON-pointer mutations and streaming interop.

func NewDocument added in v0.1.0

func NewDocument(namespace, key string) *Document

NewDocument initialises a mutable document for namespace/key.

func (*Document) Bytes added in v0.1.0

func (d *Document) Bytes() ([]byte, error)

Bytes returns the compact JSON representation of the document body.

func (*Document) LoadFrom added in v0.1.0

func (d *Document) LoadFrom(r io.Reader) error

LoadFrom replaces the document contents with data streamed from r.

func (*Document) LoadInto added in v0.1.0

func (d *Document) LoadInto(target any) error

LoadInto decodes the document body into target.

func (*Document) Mutate added in v0.1.0

func (d *Document) Mutate(exprs ...string) error

Mutate applies LQL mutations using the current time.

func (*Document) MutateWithTime added in v0.1.0

func (d *Document) MutateWithTime(now time.Time, exprs ...string) error

MutateWithTime applies LQL mutations using the supplied timestamp.

func (*Document) Reader added in v0.1.0

func (d *Document) Reader() (io.Reader, error)

Reader returns a fresh reader over the document JSON.

func (*Document) Write added in v0.1.0

func (d *Document) Write(p []byte) (int, error)

Write implements io.Writer so documents can be used with io.Copy.
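
The io.Writer pattern can be sketched with a stand-in type. jsonDoc below is hypothetical and only imitates the shape of Document (a Write method plus a LoadInto-style decoder). It is meant to show why implementing io.Writer makes io.Copy usable, not how Document is implemented.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// jsonDoc is a hypothetical stand-in for client.Document: it buffers the
// bytes written to it and decodes them on demand.
type jsonDoc struct {
	buf bytes.Buffer
}

// Write satisfies io.Writer, which is all io.Copy needs from a destination.
func (d *jsonDoc) Write(p []byte) (int, error) { return d.buf.Write(p) }

// LoadInto decodes the buffered JSON into target.
func (d *jsonDoc) LoadInto(target any) error {
	return json.Unmarshal(d.buf.Bytes(), target)
}

// copyAndDecode streams src into a jsonDoc and returns the decoded object.
func copyAndDecode(src io.Reader) (map[string]any, error) {
	var d jsonDoc
	if _, err := io.Copy(&d, src); err != nil {
		return nil, err
	}
	var out map[string]any
	if err := d.LoadInto(&out); err != nil {
		return nil, err
	}
	return out, nil
}

// decodeString is a convenience wrapper used by main.
func decodeString(s string) (map[string]any, error) {
	return copyAndDecode(strings.NewReader(s))
}

func main() {
	out, err := decodeString(`{"status":"open"}`)
	fmt.Println(out["status"], err)
}
```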

type EnqueueOptions

type EnqueueOptions struct {
	// Namespace scopes the queue operation. Empty uses the client's default namespace.
	Namespace string
	// Delay postpones first visibility after enqueue.
	Delay time.Duration
	// Visibility controls how long a dequeued message stays hidden before it can be redelivered.
	Visibility time.Duration
	// TTL sets message retention. Zero uses server defaults.
	TTL time.Duration
	// MaxAttempts limits failed attempts before dead-letter handling.
	MaxAttempts int
	// Attributes stores arbitrary JSON-serializable metadata on the message.
	Attributes map[string]any
	// ContentType is sent as the payload media type. Empty defaults to application/octet-stream.
	ContentType string
}

EnqueueOptions controls enqueue behaviour.

type FlushIndexOptions added in v0.1.0

type FlushIndexOptions struct {
	// Mode accepts "async" (default) or "wait" for synchronous completion.
	Mode string
}

FlushIndexOptions customises index flush behaviour.

type FlushOption added in v0.1.0

type FlushOption func(*FlushIndexOptions)

FlushOption customises FlushIndex behaviour.

func WithFlushModeAsync added in v0.1.0

func WithFlushModeAsync() FlushOption

WithFlushModeAsync schedules FlushIndex asynchronously and returns immediately.

func WithFlushModeWait added in v0.1.0

func WithFlushModeWait() FlushOption

WithFlushModeWait forces FlushIndex to block until indexing catches up.
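
The functional-option shape used here can be illustrated with local copies of the documented types. The lowercase names below are stand-ins written for this sketch. Starting from "async" follows the documented default, while "a later option overrides an earlier one" is an assumption about how the options are applied.

```go
package main

import "fmt"

// flushIndexOptions and flushOption mirror the documented shapes of
// FlushIndexOptions and FlushOption. Local stand-ins, not the SDK types.
type flushIndexOptions struct {
	Mode string
}

type flushOption func(*flushIndexOptions)

func withFlushModeAsync() flushOption {
	return func(o *flushIndexOptions) { o.Mode = "async" }
}

func withFlushModeWait() flushOption {
	return func(o *flushIndexOptions) { o.Mode = "wait" }
}

// resolveFlushOptions starts from the documented default ("async") and
// applies each option in order, so the last one wins in this sketch.
func resolveFlushOptions(optFns ...flushOption) flushIndexOptions {
	opts := flushIndexOptions{Mode: "async"}
	for _, fn := range optFns {
		if fn != nil {
			fn(&opts)
		}
	}
	return opts
}

func main() {
	fmt.Println(resolveFlushOptions().Mode)
	fmt.Println(resolveFlushOptions(withFlushModeWait()).Mode)
}
```

With the real package the equivalent call would pass WithFlushModeWait() as a trailing argument to FlushIndex.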

type GetAttachmentRequest added in v0.1.0

type GetAttachmentRequest struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// TxnID associates the operation with a transaction coordinator record.
	TxnID string
	// FencingToken is the monotonic token used to fence stale writers.
	FencingToken *int64
	// Public enables read-only attachment retrieval without lease credentials when public reads are allowed.
	Public bool
	// Selector identifies which attachment to retrieve (by ID, Name, or both).
	Selector AttachmentSelector
}

GetAttachmentRequest retrieves a single attachment payload.

type GetOption added in v0.1.0

type GetOption func(*GetOptions)

GetOption applies custom behaviour to Client.Get.

func WithGetLeaseID added in v0.1.0

func WithGetLeaseID(id string) GetOption

WithGetLeaseID sets the lease ID used for lease-bound reads. Leave it unset for public, read-only snapshots when those are allowed.

func WithGetNamespace added in v0.1.0

func WithGetNamespace(ns string) GetOption

WithGetNamespace overrides the namespace for Get. Empty values are normalized to the client or server default.

func WithGetPublicDisabled added in v0.1.0

func WithGetPublicDisabled(disable bool) GetOption

WithGetPublicDisabled disables public-read fallback and requires lease-backed semantics.

type GetOptions added in v0.1.0

type GetOptions struct {
	// Namespace scopes the read. Empty uses the client's default namespace.
	Namespace string
	// LeaseID enforces lease-bound reads when provided.
	LeaseID string
	// DisablePublic forces authenticated/lease-backed reads instead of public fast-path reads.
	DisablePublic bool
}

GetOptions tweaks the behaviour of Client.Get / Client.Load.

type GetResponse added in v0.1.0

type GetResponse struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// ETag is the entity tag used for optimistic concurrency and cache validation.
	ETag string
	// Version is the server version header for the returned key state.
	Version string
	// HasState reports whether the requested key currently has committed state.
	HasState bool
	// contains filtered or unexported fields
}

GetResponse encapsulates the payload and headers returned by Client.Get.

func (*GetResponse) Attachments added in v0.1.0

func (gr *GetResponse) Attachments() *AttachmentStore

Attachments returns an attachment store scoped to the get response.

func (*GetResponse) Bytes added in v0.1.0

func (gr *GetResponse) Bytes() ([]byte, error)

Bytes loads the state blob into memory and closes the underlying reader.

func (*GetResponse) Close added in v0.1.0

func (gr *GetResponse) Close() error

Close releases the underlying HTTP body when streaming isn't required.

func (*GetResponse) Document added in v0.1.0

func (gr *GetResponse) Document() (*Document, error)

Document hydrates the response body into a Document and consumes the reader.

func (*GetResponse) ListAttachments added in v0.1.0

func (gr *GetResponse) ListAttachments(ctx context.Context) (*AttachmentList, error)

ListAttachments lists attachment metadata for the get response.

func (*GetResponse) Reader added in v0.1.0

func (gr *GetResponse) Reader() io.ReadCloser

Reader exposes the underlying body stream. Call Close when finished.

func (*GetResponse) RetrieveAttachment added in v0.1.0

func (gr *GetResponse) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)

RetrieveAttachment streams a single attachment payload for the get response.

type LeaseSession

type LeaseSession struct {
	api.AcquireResponse
	// contains filtered or unexported fields
}

LeaseSession models an active lease returned by Acquire.

func (*LeaseSession) Attach added in v0.1.0

func (s *LeaseSession) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)

Attach stages an attachment on the active lease.

func (*LeaseSession) Attachments added in v0.1.0

func (s *LeaseSession) Attachments() *AttachmentStore

Attachments returns an attachment store scoped to the lease session.

func (*LeaseSession) Close

func (s *LeaseSession) Close() error

Close is equivalent to Release using a background context.

func (*LeaseSession) CurrentFencingToken added in v0.7.0

func (s *LeaseSession) CurrentFencingToken() int64

CurrentFencingToken returns the current fencing token associated with the lease.
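
For readers new to fencing tokens, the sketch below shows the general idea of fencing stale writers: a resource remembers the highest token it has seen and rejects writes that carry a lower one. fencedStore and errStaleToken are hypothetical names. The code illustrates the technique in general and does not describe how lockd enforces it on the server.

```go
package main

import (
	"errors"
	"fmt"
)

// errStaleToken is returned when a write carries an outdated fencing token.
var errStaleToken = errors.New("stale fencing token")

// fencedStore is a hypothetical resource that accepts writes only from the
// newest lease holder it has seen so far.
type fencedStore struct {
	highest int64
	value   string
}

// write applies value when token is at least as new as the highest token
// observed, and rejects it otherwise.
func (s *fencedStore) write(token int64, value string) error {
	if token < s.highest {
		return errStaleToken
	}
	s.highest = token
	s.value = value
	return nil
}

func main() {
	var store fencedStore
	fmt.Println(store.write(1, "from first holder"))
	fmt.Println(store.write(2, "from second holder"))
	// The first holder wakes up late and tries again with its old token.
	fmt.Println(store.write(1, "late write"))
	fmt.Println(store.value)
}
```

A downstream system that receives the value of CurrentFencingToken alongside each write could apply a check of this kind.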

func (*LeaseSession) DeleteAllAttachments added in v0.1.0

func (s *LeaseSession) DeleteAllAttachments(ctx context.Context) (*DeleteAllAttachmentsResult, error)

DeleteAllAttachments removes all attachments for the lease key.

func (*LeaseSession) DeleteAttachment added in v0.1.0

func (s *LeaseSession) DeleteAttachment(ctx context.Context, selector AttachmentSelector) (*DeleteAttachmentResult, error)

DeleteAttachment removes a single attachment for the lease key.

func (*LeaseSession) Get

Get refreshes the current state snapshot using the active lease.

func (*LeaseSession) GetBytes

func (s *LeaseSession) GetBytes(ctx context.Context) ([]byte, error)

GetBytes returns the current state blob as a byte slice.

func (*LeaseSession) KeepAlive

func (s *LeaseSession) KeepAlive(ctx context.Context, ttl time.Duration) (*api.KeepAliveResponse, error)

KeepAlive extends the lease TTL without altering the stored state.

func (*LeaseSession) ListAttachments added in v0.1.0

func (s *LeaseSession) ListAttachments(ctx context.Context) (*AttachmentList, error)

ListAttachments lists attachment metadata for the lease key.

func (*LeaseSession) Load

func (s *LeaseSession) Load(ctx context.Context, v any) error

Load reads the current state into v. When no state exists, v is untouched.

func (*LeaseSession) Mutate added in v0.7.0

func (s *LeaseSession) Mutate(ctx context.Context, mutations []string, options ...UpdateOption) (*UpdateResult, error)

Mutate applies server-side LQL mutations to the session's key while preserving the lease.

func (*LeaseSession) MutateLocal added in v0.8.0

func (s *LeaseSession) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)

MutateLocal applies client-local streaming LQL mutations to the session's key while preserving the lease.

func (*LeaseSession) MutateWithOptions added in v0.7.0

func (s *LeaseSession) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)

MutateWithOptions allows callers to override conditional metadata for server-side mutation.

func (*LeaseSession) Release

func (s *LeaseSession) Release(ctx context.Context) error

Release relinquishes the lease early; it is safe to call multiple times. It commits staged changes by default. Use ReleaseWithOptions to request a rollback.

func (*LeaseSession) ReleaseWithOptions added in v0.1.0

func (s *LeaseSession) ReleaseWithOptions(ctx context.Context, opts ReleaseOptions) error

ReleaseWithOptions relinquishes the lease and allows callers to rollback staged changes.

func (*LeaseSession) Remove

func (s *LeaseSession) Remove(ctx context.Context) (*api.RemoveResponse, error)

Remove deletes the current state while holding the lease, enforcing the cached version and etag when present.

func (*LeaseSession) RemoveWithOptions

func (s *LeaseSession) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)

RemoveWithOptions allows callers to override conditional metadata for delete.

func (*LeaseSession) RetrieveAllAttachments added in v0.1.0

func (s *LeaseSession) RetrieveAllAttachments(ctx context.Context) ([]*Attachment, error)

RetrieveAllAttachments streams all attachments for the lease key.

func (*LeaseSession) RetrieveAttachment added in v0.1.0

func (s *LeaseSession) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)

RetrieveAttachment streams a single attachment payload for the lease key.

func (*LeaseSession) Save

func (s *LeaseSession) Save(ctx context.Context, v any, opts ...UpdateOption) error

Save serialises v as JSON and updates the state.

func (*LeaseSession) Update

func (s *LeaseSession) Update(ctx context.Context, body io.Reader, options ...UpdateOption) (*UpdateResult, error)

Update streams new JSON state for the session's key while preserving the lease.

func (*LeaseSession) UpdateBytes

func (s *LeaseSession) UpdateBytes(ctx context.Context, body []byte, options ...UpdateOption) (*UpdateResult, error)

UpdateBytes is a convenience wrapper around Update that accepts an in-memory payload.

func (*LeaseSession) UpdateMetadata added in v0.1.0

func (s *LeaseSession) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)

UpdateMetadata toggles metadata flags without mutating the JSON state.

func (*LeaseSession) UpdateWithOptions

func (s *LeaseSession) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)

UpdateWithOptions allows callers to override conditional metadata.

type ListAttachmentsRequest added in v0.1.0

type ListAttachmentsRequest struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// TxnID associates the operation with a transaction coordinator record.
	TxnID string
	// FencingToken is the monotonic token used to fence stale writers.
	FencingToken *int64
	// Public enables read-only attachment listing without lease credentials when public reads are allowed.
	Public bool
}

ListAttachmentsRequest describes a request to list the attachments for a key.

type LoadOption added in v0.1.0

type LoadOption func(*LoadOptions)

LoadOption customises Client.Load.

func WithLoadLeaseID added in v0.1.0

func WithLoadLeaseID(id string) LoadOption

WithLoadLeaseID sets the lease ID for Load when lease-bound reads are required.

func WithLoadNamespace added in v0.1.0

func WithLoadNamespace(ns string) LoadOption

WithLoadNamespace overrides the namespace for Load.

func WithLoadPublicDisabled added in v0.1.0

func WithLoadPublicDisabled(disable bool) LoadOption

WithLoadPublicDisabled disables public-read fallback for Load.

type LoadOptions added in v0.1.0

type LoadOptions struct {
	// GetOptions carries namespace/lease/public-read behavior for Load.
	GetOptions
}

LoadOptions mirrors GetOptions for Client.Load.

type MessageHandler

type MessageHandler func(context.Context, *QueueMessage) error

MessageHandler is invoked for each message delivered via Subscribe.

type MessageHandlerWithState

type MessageHandlerWithState func(context.Context, *QueueMessage, *QueueStateHandle) error

MessageHandlerWithState is invoked for stateful subscriptions and receives the associated workflow state handle.

type MetadataOptions added in v0.1.0

type MetadataOptions struct {
	// QueryHidden marks a key hidden/visible from query results when non-nil.
	QueryHidden *bool
	// TxnID binds metadata-only mutations to a transaction.
	TxnID string
}

MetadataOptions captures metadata mutations attached to updates.

type MetadataResult added in v0.1.0

type MetadataResult struct {
	// Version is the new metadata version after mutation.
	Version int64
	// Metadata is the server's effective metadata document after mutation.
	Metadata api.MetadataAttributes
}

MetadataResult captures metadata-only mutation outcomes.

type MutateLocalOptions added in v0.8.0

type MutateLocalOptions struct {
	// Update controls namespace, CAS guards, fencing, and txn headers.
	Update UpdateOptions
	// DisableFetchedCAS skips defaulting IfETag/IfVersion from the streamed Get.
	DisableFetchedCAS bool
	// FileValueBaseDir resolves relative file:/textfile:/base64file: paths.
	FileValueBaseDir string
	// FileValueResolver overrides file opening for tests and custom callers.
	FileValueResolver lql.MutationFileValueResolver
}

MutateLocalOptions customizes session-local streaming mutation helpers.

type MutateLocalRequest added in v0.8.0

type MutateLocalRequest struct {
	// Key identifies the state object to mutate.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// Mutations contains one or more LQL mutation expressions in execution order.
	Mutations []string
	// Options controls namespace, CAS guards, fencing, and txn headers.
	Options UpdateOptions
	// DisableFetchedCAS skips defaulting IfETag/IfVersion from the streamed Get.
	DisableFetchedCAS bool
	// FileValueBaseDir resolves relative file:/textfile:/base64file: paths.
	FileValueBaseDir string
	// FileValueResolver overrides file opening for tests and custom callers.
	FileValueResolver lql.MutationFileValueResolver
}

MutateLocalRequest describes a client-local streaming LQL mutation flow.

The client loads the lease-visible JSON state, applies mutations locally via lql.MutateStream, and streams the mutated JSON back through UpdateStream. This is the path that supports file:/textfile:/base64file: mutators.
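
The load, mutate, stream-back shape can be sketched with the standard library alone. This is not how lql.MutateStream works internally: the sketch decodes the whole document before mutating it, and the mutateStream and applyDone names are hypothetical. It only illustrates feeding a mutated document to an upload through an io.Pipe instead of buffering the encoded result first.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// mutateStream decodes one JSON object from src, applies mutate, and returns a
// reader that streams the re-encoded document.
func mutateStream(src io.Reader, mutate func(map[string]any)) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		var doc map[string]any
		if err := json.NewDecoder(src).Decode(&doc); err != nil {
			pw.CloseWithError(err)
			return
		}
		if doc == nil {
			doc = map[string]any{} // a JSON null decodes to a nil map
		}
		mutate(doc)
		// Encode writes into the pipe; a nil error closes it with io.EOF.
		pw.CloseWithError(json.NewEncoder(pw).Encode(doc))
	}()
	return pr
}

// applyDone marks the document as done and returns the streamed result.
func applyDone(in string) (string, error) {
	out, err := io.ReadAll(mutateStream(strings.NewReader(in), func(doc map[string]any) {
		doc["progress"] = "done"
	}))
	return strings.TrimSpace(string(out)), err
}

func main() {
	out, err := applyDone(`{"step":3}`)
	fmt.Println(out, err)
}
```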

type MutateRequest added in v0.7.0

type MutateRequest struct {
	// Key identifies the state object to mutate.
	Key string
	// LeaseID identifies the active lease required for protected mutations.
	LeaseID string
	// Mutations contains one or more LQL mutation expressions in execution order.
	Mutations []string
	// Options controls namespace, CAS guards, fencing, and txn headers.
	Options UpdateOptions
}

MutateRequest captures a server-side LQL mutation operation for one key.

type NamespaceConfigOptions added in v0.1.0

type NamespaceConfigOptions struct {
	// IfMatch enforces optimistic concurrency against the current namespace-config ETag.
	IfMatch string
}

NamespaceConfigOptions controls concurrency for namespace configuration mutations.

type NamespaceConfigResult added in v0.1.0

type NamespaceConfigResult struct {
	// Config is the effective namespace configuration document returned by the server.
	Config *api.NamespaceConfigResponse
	// ETag is the namespace configuration ETag for optimistic concurrency control.
	ETag string
}

NamespaceConfigResult captures a namespace config document and its ETag.

type Option

type Option func(*Client)

Option customises client construction.
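
Option follows the functional options pattern. The sketch below reproduces the pattern with hypothetical local types (settings, option, withNamespace, withFailureRetries) so it runs on its own; the default values are invented for the example and are not the SDK's defaults.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the fields a client would hold.
type settings struct {
	namespace      string
	failureRetries int
}

// option mirrors the shape of an Option: a function that mutates the target
// during construction.
type option func(*settings)

func withNamespace(ns string) option {
	return func(s *settings) { s.namespace = ns }
}

func withFailureRetries(n int) option {
	return func(s *settings) { s.failureRetries = n }
}

// newSettings applies defaults first, then each option in order, so later
// options win over earlier ones.
func newSettings(opts ...option) settings {
	s := settings{namespace: "default", failureRetries: 3}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := newSettings(withNamespace("workflows"), withFailureRetries(5), withNamespace("orders"))
	fmt.Println(s.namespace, s.failureRetries)
}
```

Applying options in argument order is what makes "later overrides earlier" behaviour possible, which matches how WithBundlePEM is documented to override a previously configured bundle path.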

func WithBundlePEM added in v0.7.0

func WithBundlePEM(pemBytes []byte) Option

WithBundlePEM configures an in-memory mTLS client bundle PEM (CA cert + client cert + key). This option overrides any previously configured bundle path.

func WithBundlePath added in v0.4.0

func WithBundlePath(path string) Option

WithBundlePath configures an mTLS client bundle PEM (CA cert + client cert + key). By default, "$VARS" and leading "~/" are expanded before loading. Use WithBundlePathDisableExpansion() to treat the path literally.
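
A rough sketch of what "$VARS" and leading "~/" expansion can look like, using os.Expand from the standard library. The expandPath function, the order of the two steps, and the LOCKD_HOME variable name are assumptions of this example; the SDK's actual expansion rules may differ. In real use, lookup would be os.Getenv and home would come from os.UserHomeDir.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// expandPath expands $VAR and ${VAR} references via lookup, then replaces a
// leading "~/" with the supplied home directory.
func expandPath(p, home string, lookup func(string) string) string {
	p = os.Expand(p, lookup)
	if strings.HasPrefix(p, "~/") {
		p = strings.TrimSuffix(home, "/") + p[1:]
	}
	return p
}

func main() {
	env := map[string]string{"LOCKD_HOME": "/etc/lockd"} // hypothetical variable
	lookup := func(k string) string { return env[k] }
	fmt.Println(expandPath("$LOCKD_HOME/client.pem", "/home/alice", lookup))
	fmt.Println(expandPath("~/certs/client.pem", "/home/alice", lookup))
}
```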

func WithBundlePathDisableExpansion added in v0.4.0

func WithBundlePathDisableExpansion() Option

WithBundlePathDisableExpansion disables env/tilde expansion for WithBundlePath.

func WithCloseTimeout

func WithCloseTimeout(d time.Duration) Option

WithCloseTimeout overrides the timeout used when Close() auto-releases tracked leases.

func WithDefaultNamespace added in v0.1.0

func WithDefaultNamespace(ns string) Option

WithDefaultNamespace overrides the namespace applied when request payloads/options omit Namespace.

func WithDisableMTLS added in v0.1.0

func WithDisableMTLS(disable bool) Option

WithDisableMTLS toggles mutual TLS expectations for scheme inference. When disable is false (the default), base URLs without a scheme are assumed to be HTTPS. When disable is true, bare endpoints default to HTTP and TLS client certificates are not loaded automatically.
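
The scheme inference rule can be illustrated with a small helper. The inferScheme function is a hypothetical reconstruction of the documented rule, not the SDK's code: endpoints that already carry a scheme (including unix://) pass through unchanged, and bare endpoints get https:// or http:// depending on the flag.

```go
package main

import (
	"fmt"
	"strings"
)

// inferScheme prepends a scheme to bare endpoints and leaves explicit ones alone.
func inferScheme(endpoint string, disableMTLS bool) string {
	if strings.Contains(endpoint, "://") {
		return endpoint
	}
	if disableMTLS {
		return "http://" + endpoint
	}
	return "https://" + endpoint
}

func main() {
	fmt.Println(inferScheme("lockd.example.com", false))
	fmt.Println(inferScheme("lockd.example.com", true))
	fmt.Println(inferScheme("unix:///path/to/socket", true))
}
```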

func WithDrainAwareShutdown added in v0.1.0

func WithDrainAwareShutdown(enabled bool) Option

WithDrainAwareShutdown toggles automatic lease release when server responses include Shutdown-Imminent drain signaling.

func WithEndpointShuffle

func WithEndpointShuffle(enabled bool) Option

WithEndpointShuffle toggles random shuffling of endpoints before each request. When disabled, endpoints are tried in the order provided.

func WithFailureRetries

func WithFailureRetries(n int) Option

WithFailureRetries overrides how many times non-acquire operations retry on failure. A negative value allows unlimited retries.
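
A sketch of a bounded retry loop with a negative value meaning "no limit". Whether n counts total attempts or retries after the first attempt is not stated in the documentation; this example assumes the latter. The withRetries and failFirst names are hypothetical, and the sketch omits backoff, which a production client would normally add.

```go
package main

import (
	"errors"
	"fmt"
)

// withRetries runs op once and then retries up to retries more times on error.
// A negative retries value keeps retrying until op succeeds.
// It returns the number of attempts made and the last error.
func withRetries(retries int, op func(attempt int) error) (int, error) {
	for attempt := 1; ; attempt++ {
		err := op(attempt)
		if err == nil || (retries >= 0 && attempt > retries) {
			return attempt, err
		}
	}
}

// failFirst returns an operation that fails for the first n attempts.
func failFirst(n int) func(int) error {
	return func(attempt int) error {
		if attempt <= n {
			return errors.New("transient failure")
		}
		return nil
	}
}

func main() {
	attempts, err := withRetries(2, failFirst(5))
	fmt.Println(attempts, err != nil)

	attempts, err = withRetries(-1, failFirst(5))
	fmt.Println(attempts, err != nil)
}
```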

func WithForUpdateTimeout

func WithForUpdateTimeout(d time.Duration) Option

WithForUpdateTimeout overrides the timeout bound for AcquireForUpdate orchestration. Handler execution still follows the caller's context deadline/cancellation.

func WithHTTPClient

func WithHTTPClient(cli *http.Client) Option

WithHTTPClient supplies a custom HTTP client/transport stack. Use this when you need custom TLS roots, proxies, tracing round-trippers, or connection pooling behavior not covered by SDK defaults.

func WithHTTPTimeout

func WithHTTPTimeout(d time.Duration) Option

WithHTTPTimeout overrides the per-request timeout used by SDK-issued HTTP calls. This timeout does not apply to acquire/dequeue wait-forever calls that intentionally hold long-poll requests open.

func WithHTTPTrace added in v0.1.0

func WithHTTPTrace() Option

WithHTTPTrace enables net/http/httptrace diagnostics on SDK requests. Traces are emitted through the configured client logger.

func WithKeepAliveTimeout

func WithKeepAliveTimeout(d time.Duration) Option

WithKeepAliveTimeout overrides the timeout used for lease keepalive requests.

func WithLogger

func WithLogger(logger pslog.Base) Option

WithLogger supplies a logger for client diagnostics. Passing nil falls back to pslog.NoopLogger().

type QueryOption added in v0.1.0

type QueryOption func(*QueryOptions)

QueryOption customizes query execution.

func WithQuery added in v0.1.0

func WithQuery(expr string) QueryOption

WithQuery parses an LQL expression and sets the selector for the request. Parse errors are surfaced when Query executes.

func WithQueryBlock added in v0.1.0

func WithQueryBlock() QueryOption

WithQueryBlock waits for the queryable view to observe in-flight documents.

func WithQueryCursor added in v0.1.0

func WithQueryCursor(cursor string) QueryOption

WithQueryCursor resumes a previous query page using the server-provided cursor token.
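
Cursor-based paging usually takes the form of a loop that feeds each returned cursor into the next request. The sketch below uses a hypothetical page type and an in-memory fakeFetch in place of a real query call, and it assumes that an empty cursor marks the last page, which the documentation does not state.

```go
package main

import "fmt"

// page is a hypothetical result: some keys plus the cursor for the next page.
type page struct {
	keys []string
	next string
}

// collectAll follows cursors until a page reports no further cursor.
func collectAll(fetch func(cursor string) (page, error)) ([]string, error) {
	var all []string
	cursor := ""
	for {
		p, err := fetch(cursor)
		if err != nil {
			return all, err
		}
		all = append(all, p.keys...)
		if p.next == "" {
			return all, nil
		}
		cursor = p.next
	}
}

// fakeFetch serves three fixed pages keyed by cursor.
func fakeFetch(cursor string) (page, error) {
	pages := map[string]page{
		"":   {keys: []string{"orders/1", "orders/2"}, next: "c1"},
		"c1": {keys: []string{"orders/3"}, next: "c2"},
		"c2": {keys: []string{"orders/4"}},
	}
	p, ok := pages[cursor]
	if !ok {
		return page{}, fmt.Errorf("unknown cursor %q", cursor)
	}
	return p, nil
}

func main() {
	keys, err := collectAll(fakeFetch)
	fmt.Println(keys, err)
}
```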

func WithQueryEngine added in v0.1.0

func WithQueryEngine(engine string) QueryOption

WithQueryEngine forces the query execution engine ("auto", "index", or "scan").

func WithQueryEngineAuto added in v0.1.0

func WithQueryEngineAuto() QueryOption

WithQueryEngineAuto explicitly selects the auto engine.

func WithQueryEngineIndex added in v0.1.0

func WithQueryEngineIndex() QueryOption

WithQueryEngineIndex selects the secondary index engine.

func WithQueryEngineScan added in v0.1.0

func WithQueryEngineScan() QueryOption

WithQueryEngineScan selects the scan engine.

func WithQueryFields added in v0.1.0

func WithQueryFields(fields map[string]any) QueryOption

WithQueryFields sets the field projection map passed to query execution.

func WithQueryLimit added in v0.1.0

func WithQueryLimit(limit int) QueryOption

WithQueryLimit caps the number of rows returned by the server.

func WithQueryNamespace added in v0.1.0

func WithQueryNamespace(ns string) QueryOption

WithQueryNamespace overrides the namespace used for query execution.

func WithQueryRefresh added in v0.1.0

func WithQueryRefresh(mode string) QueryOption

WithQueryRefresh selects the refresh policy (for example "wait_for" to block until indexed visibility).

func WithQueryRefreshImmediate added in v0.1.0

func WithQueryRefreshImmediate() QueryOption

WithQueryRefreshImmediate clears the refresh hint (default behaviour).

func WithQueryRefreshWaitFor added in v0.1.0

func WithQueryRefreshWaitFor() QueryOption

WithQueryRefreshWaitFor waits until documents are visible in the selected engine.

func WithQueryRequest added in v0.1.0

func WithQueryRequest(req *api.QueryRequest) QueryOption

WithQueryRequest copies a full QueryRequest into the option set. Subsequent WithQuery* options can override individual fields.

func WithQueryReturn added in v0.1.0

func WithQueryReturn(mode QueryReturn) QueryOption

WithQueryReturn selects the payload mode for /v1/query ("keys" or "documents").

func WithQueryReturnDocuments added in v0.1.0

func WithQueryReturnDocuments() QueryOption

WithQueryReturnDocuments streams documents as NDJSON rows.
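
NDJSON carries one JSON value per line, so a stream can be consumed row by row without loading it all into memory. The sketch below decodes such a stream with encoding/json. The forEachRow and keysOf helpers and the "key"/"doc" field names are hypothetical; the actual wire format of lockd rows is not described here.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// forEachRow decodes consecutive JSON objects from r and passes each to fn.
// It stops at the first decode or handler error.
func forEachRow(r io.Reader, fn func(map[string]any) error) error {
	dec := json.NewDecoder(r)
	for {
		var row map[string]any
		err := dec.Decode(&row)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := fn(row); err != nil {
			return err
		}
	}
}

// keysOf collects the "key" field of every row in an NDJSON string.
func keysOf(ndjson string) ([]string, error) {
	var keys []string
	err := forEachRow(strings.NewReader(ndjson), func(row map[string]any) error {
		key, _ := row["key"].(string)
		keys = append(keys, key)
		return nil
	})
	return keys, err
}

func main() {
	stream := `{"key":"orders/1","doc":{"total":10}}` + "\n" +
		`{"key":"orders/2","doc":{"total":25}}` + "\n"
	keys, err := keysOf(stream)
	fmt.Println(keys, err)
}
```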

func WithQueryReturnKeys added in v0.1.0

func WithQueryReturnKeys() QueryOption

WithQueryReturnKeys forces the default keys-only response mode.

func WithQuerySelector added in v0.1.0

func WithQuerySelector(sel api.Selector) QueryOption

WithQuerySelector installs an already-parsed selector (useful when callers construct selector ASTs directly).

type QueryOptions added in v0.1.0

type QueryOptions struct {
	// Engine forces execution strategy ("auto", "index", "scan").
	Engine string
	// Refresh controls visibility semantics (for example "wait_for").
	Refresh string
	// Return controls payload shape (keys or streamed documents).
	Return QueryReturn
	// contains filtered or unexported fields
}

QueryOptions controls /v1/query execution hints.

type QueryResponse added in v0.1.0

type QueryResponse struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Cursor is the pagination cursor returned by the query engine.
	Cursor string
	// IndexSeq is the index sequence observed by the query execution.
	IndexSeq uint64
	// Metadata carries metadata values returned by the server for this object.
	Metadata map[string]string
	// contains filtered or unexported fields
}

QueryResponse describes the result set returned by Client.Query.

func (*QueryResponse) Close added in v0.1.0

func (qr *QueryResponse) Close() error

Close releases the underlying reader when the response streams documents.

func (*QueryResponse) ForEach added in v0.1.0

func (qr *QueryResponse) ForEach(fn func(QueryRow) error) error

ForEach invokes fn for every entry in the response. For document streams the handler receives fully populated QueryRow values.

func (*QueryResponse) Keys added in v0.1.0

func (qr *QueryResponse) Keys() []string

Keys returns a defensive copy of the key slice. When the response streams documents, Keys drains the stream, collects the keys, and closes the reader.

func (*QueryResponse) Mode added in v0.1.0

func (qr *QueryResponse) Mode() QueryReturn

Mode reports whether the query streamed keys or documents.

type QueryReturn added in v0.1.0

type QueryReturn string

QueryReturn describes the payload mode exposed by /v1/query.

const (
	// QueryReturnKeys streams the default keys-only JSON object.
	QueryReturnKeys QueryReturn = QueryReturn(api.QueryReturnKeys)
	// QueryReturnDocuments streams newline-delimited documents.
	QueryReturnDocuments QueryReturn = QueryReturn(api.QueryReturnDocuments)
)

type QueryRow added in v0.1.0

type QueryRow struct {
	// Namespace scopes the request or response to a lockd namespace.
	Namespace string
	// Key identifies the lock/state key within the namespace.
	Key string
	// Version is the key version associated with this query row.
	Version int64
	// contains filtered or unexported fields
}

QueryRow represents a single row returned from /v1/query.

func (QueryRow) Document added in v0.1.0

func (row QueryRow) Document() (*Document, error)

Document loads the payload into a client.Document.

func (QueryRow) DocumentInto added in v0.1.0

func (row QueryRow) DocumentInto(target any) error

DocumentInto unmarshals the document payload into target when present.

func (QueryRow) DocumentReader added in v0.1.0

func (row QueryRow) DocumentReader() (io.ReadCloser, error)

DocumentReader returns a streaming reader for the row's document. Callers must Close the returned reader when finished.

func (QueryRow) HasDocument added in v0.1.0

func (row QueryRow) HasDocument() bool

HasDocument reports whether the row was populated with a document payload.

type QueueMessage

type QueueMessage struct {
	// contains filtered or unexported fields
}

QueueMessage wraps a QueueMessageHandle and provides io.ReadCloser semantics with automatic nack-on-close when the caller forgets to ack explicitly.

func (*QueueMessage) Ack

func (m *QueueMessage) Ack(ctx context.Context) error

Ack removes the message from the queue.

func (*QueueMessage) Attempts

func (m *QueueMessage) Attempts() int

Attempts returns the recorded delivery attempts.

func (*QueueMessage) Close

func (m *QueueMessage) Close() error

Close releases underlying resources and auto-nacks the message when it has not been acked or nacked explicitly.
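
The auto-nack-on-close contract pairs naturally with defer: closing on every return path guarantees that a message is settled exactly once. The sketch below models this with a toy message type; message, process, and outcomeFor are hypothetical and not the SDK's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a toy model of a leased queue message.
type message struct {
	outcome string // "" until settled, then "ack" or "nack"
}

// settle records the first outcome and ignores later ones.
func (m *message) settle(outcome string) {
	if m.outcome == "" {
		m.outcome = outcome
	}
}

func (m *message) ack() { m.settle("ack") }

// close auto-nacks, which is a no-op when the message was already settled.
func (m *message) close() { m.settle("nack") }

// process defers close so every return path, including errors, settles the message.
func process(m *message, handle func() error) error {
	defer m.close()
	if err := handle(); err != nil {
		return err
	}
	m.ack()
	return nil
}

// outcomeFor reports how a message ends up when its handler succeeds or fails.
func outcomeFor(handlerFails bool) string {
	m := &message{}
	_ = process(m, func() error {
		if handlerFails {
			return errors.New("handler failed")
		}
		return nil
	})
	return m.outcome
}

func main() {
	fmt.Println(outcomeFor(false), outcomeFor(true))
}
```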

func (*QueueMessage) ClosePayload

func (m *QueueMessage) ClosePayload() error

ClosePayload releases the underlying payload stream without altering the lease state.

func (*QueueMessage) ContentType

func (m *QueueMessage) ContentType() string

ContentType returns the payload content type.

func (*QueueMessage) CorrelationID

func (m *QueueMessage) CorrelationID() string

CorrelationID returns the lifecycle correlation identifier for the message.

func (*QueueMessage) Cursor

func (m *QueueMessage) Cursor() string

Cursor returns the next-cursor associated with the dequeue response.

func (*QueueMessage) DecodePayloadJSON

func (m *QueueMessage) DecodePayloadJSON(v any) error

DecodePayloadJSON decodes the payload JSON into v and closes the payload afterwards.

func (*QueueMessage) Defer added in v0.5.0

func (m *QueueMessage) Defer(ctx context.Context, delay time.Duration) error

Defer releases the lease and intentionally requeues the message without consuming the failure budget.

func (*QueueMessage) Extend

func (m *QueueMessage) Extend(ctx context.Context, extendBy time.Duration) error

Extend pushes the lease and visibility timeout forward.

func (*QueueMessage) FailureAttempts added in v0.5.0

func (m *QueueMessage) FailureAttempts() int

FailureAttempts returns the recorded failed attempts.

func (*QueueMessage) FencingToken

func (m *QueueMessage) FencingToken() int64

FencingToken returns the monotonic fencing token issued with the lease.
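
Fencing tokens protect a downstream resource from stale lease holders: the resource remembers the highest token it has seen and rejects anything lower. The sketch below shows the general technique with a hypothetical fencedStore type; it does not describe how the lockd server enforces fencing internally.

```go
package main

import "fmt"

// fencedStore is a toy resource that accepts writes only from the newest
// lease holder it has heard from.
type fencedStore struct {
	highest int64
	value   string
}

// write rejects any token lower than the highest one seen so far.
func (s *fencedStore) write(token int64, value string) bool {
	if token < s.highest {
		return false
	}
	s.highest = token
	s.value = value
	return true
}

func main() {
	store := &fencedStore{}
	fmt.Println(store.write(1, "from old holder"))
	fmt.Println(store.write(2, "from new holder"))
	// The old holder wakes up after its lease expired and tries again.
	fmt.Println(store.write(1, "stale write"))
	fmt.Println(store.value)
}
```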

func (*QueueMessage) LeaseExpiresAt

func (m *QueueMessage) LeaseExpiresAt() int64

LeaseExpiresAt returns when the lease currently expires.

func (*QueueMessage) LeaseID

func (m *QueueMessage) LeaseID() string

LeaseID exposes the underlying lease identifier.

func (*QueueMessage) MaxAttempts

func (m *QueueMessage) MaxAttempts() int

MaxAttempts returns the configured maximum failed attempts.

func (*QueueMessage) MessageID

func (m *QueueMessage) MessageID() string

MessageID returns the message identifier.

func (*QueueMessage) MetaETag

func (m *QueueMessage) MetaETag() string

MetaETag returns the metadata ETag currently associated with the message.

func (*QueueMessage) Nack

func (m *QueueMessage) Nack(ctx context.Context, delay time.Duration, lastErr any) error

Nack releases the lease and requeues the message.

func (*QueueMessage) Namespace added in v0.1.0

func (m *QueueMessage) Namespace() string

Namespace returns the namespace associated with the message.

func (*QueueMessage) NotVisibleUntil

func (m *QueueMessage) NotVisibleUntil() time.Time

NotVisibleUntil reports when the message will become visible again.

func (*QueueMessage) PayloadReader

func (m *QueueMessage) PayloadReader() (io.ReadCloser, error)

PayloadReader returns an independent ReadCloser view over the payload.

func (*QueueMessage) PayloadSize

func (m *QueueMessage) PayloadSize() int64

PayloadSize reports the payload size in bytes when known.

func (*QueueMessage) Queue

func (m *QueueMessage) Queue() string

Queue returns the queue name.

func (*QueueMessage) Read

func (m *QueueMessage) Read(p []byte) (int, error)

Read streams bytes from the message payload.

func (*QueueMessage) SetOnCloseDelay

func (m *QueueMessage) SetOnCloseDelay(d time.Duration)

SetOnCloseDelay adjusts the delay used when Close() auto-nacks the message.

func (*QueueMessage) StateHandle

func (m *QueueMessage) StateHandle() *QueueStateHandle

StateHandle exposes the workflow state handle when using DequeueWithState.

func (*QueueMessage) TxnID added in v0.1.0

func (m *QueueMessage) TxnID() string

TxnID returns the transaction id associated with the message lease, if any.

func (*QueueMessage) VisibilityTimeout

func (m *QueueMessage) VisibilityTimeout() time.Duration

VisibilityTimeout returns the current visibility timeout.

func (*QueueMessage) WritePayloadTo

func (m *QueueMessage) WritePayloadTo(w io.Writer) (int64, error)

WritePayloadTo streams the payload into w and closes the payload afterwards.

type QueueMessageHandle

type QueueMessageHandle struct {
	// contains filtered or unexported fields
}

QueueMessageHandle models a leased queue message and provides helpers to ack/nack/extend.

func (*QueueMessageHandle) Ack

Ack removes the message (and state, when present) from the queue.

func (*QueueMessageHandle) Attempts

func (h *QueueMessageHandle) Attempts() int

Attempts returns the delivery attempts recorded for the message.

func (*QueueMessageHandle) ClosePayload

func (h *QueueMessageHandle) ClosePayload() error

ClosePayload releases any remaining payload stream resources without reading.

func (*QueueMessageHandle) ContentType

func (h *QueueMessageHandle) ContentType() string

ContentType returns the payload content type.

func (*QueueMessageHandle) CorrelationID

func (h *QueueMessageHandle) CorrelationID() string

CorrelationID returns the correlation identifier associated with the message lifecycle.

func (*QueueMessageHandle) Cursor

func (h *QueueMessageHandle) Cursor() string

Cursor returns the next-cursor value associated with the dequeue call.

func (*QueueMessageHandle) DecodePayloadJSON

func (h *QueueMessageHandle) DecodePayloadJSON(v any) error

DecodePayloadJSON decodes the payload as JSON into v.

func (*QueueMessageHandle) Defer added in v0.5.0

func (h *QueueMessageHandle) Defer(ctx context.Context, delay time.Duration) error

Defer releases the message with an optional delay without consuming failure budget.

func (*QueueMessageHandle) Extend

func (h *QueueMessageHandle) Extend(ctx context.Context, extendBy time.Duration) error

Extend pushes the visibility timeout and lease forward.

func (*QueueMessageHandle) FailureAttempts added in v0.5.0

func (h *QueueMessageHandle) FailureAttempts() int

FailureAttempts returns the recorded failed attempts for the message.

func (*QueueMessageHandle) FencingToken

func (h *QueueMessageHandle) FencingToken() int64

FencingToken exposes the current message fencing token.

func (*QueueMessageHandle) LeaseExpiresAt

func (h *QueueMessageHandle) LeaseExpiresAt() int64

LeaseExpiresAt returns the unix timestamp when the message lease expires.

func (*QueueMessageHandle) LeaseID

func (h *QueueMessageHandle) LeaseID() string

LeaseID returns the active message lease identifier.

func (*QueueMessageHandle) MaxAttempts

func (h *QueueMessageHandle) MaxAttempts() int

MaxAttempts returns the configured maximum failed attempts for the message.

func (*QueueMessageHandle) MessageID

func (h *QueueMessageHandle) MessageID() string

MessageID returns the message identifier.

func (*QueueMessageHandle) MetaETag

func (h *QueueMessageHandle) MetaETag() string

MetaETag returns the metadata ETag associated with the message.

func (*QueueMessageHandle) Nack

func (h *QueueMessageHandle) Nack(ctx context.Context, delay time.Duration, lastErr any) error

Nack releases the message with an optional delay and error payload.

func (*QueueMessageHandle) Namespace added in v0.1.0

func (h *QueueMessageHandle) Namespace() string

Namespace returns the namespace associated with the message.

func (*QueueMessageHandle) NotVisibleUntil

func (h *QueueMessageHandle) NotVisibleUntil() time.Time

NotVisibleUntil reports when the message becomes visible again.

func (*QueueMessageHandle) PayloadReader

func (h *QueueMessageHandle) PayloadReader() (io.ReadCloser, error)

PayloadReader returns a streaming reader for the message payload. Callers must close the returned reader when finished. The payload can only be consumed once; subsequent calls return an error.
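
The consume-once rule can be modelled with a small guard around the underlying stream. The oncePayload type, errConsumed, and readTwice are hypothetical names for this sketch, and the guard is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errConsumed = errors.New("payload already consumed")

// oncePayload hands out its stream a single time.
type oncePayload struct {
	src      io.Reader
	consumed bool
}

// reader returns the payload stream on the first call and an error afterwards.
func (p *oncePayload) reader() (io.ReadCloser, error) {
	if p.consumed {
		return nil, errConsumed
	}
	p.consumed = true
	return io.NopCloser(p.src), nil
}

// readTwice reads the payload once, then reports whether a second request
// for the reader was rejected.
func readTwice(body string) (string, bool) {
	p := &oncePayload{src: strings.NewReader(body)}
	rc, err := p.reader()
	if err != nil {
		return "", false
	}
	defer rc.Close()
	data, _ := io.ReadAll(rc)
	_, err = p.reader()
	return string(data), errors.Is(err, errConsumed)
}

func main() {
	first, rejected := readTwice("hello")
	fmt.Println(first, rejected)
}
```

Callers that need the payload more than once can copy it into memory on the first read.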

func (*QueueMessageHandle) PayloadSize

func (h *QueueMessageHandle) PayloadSize() int64

PayloadSize returns the declared payload size in bytes.

func (*QueueMessageHandle) Queue

func (h *QueueMessageHandle) Queue() string

Queue returns the queue name.

func (*QueueMessageHandle) StateHandle

func (h *QueueMessageHandle) StateHandle() *QueueStateHandle

StateHandle exposes the state lease handle when using DequeueWithState.

func (*QueueMessageHandle) VisibilityTimeout

func (h *QueueMessageHandle) VisibilityTimeout() time.Duration

VisibilityTimeout returns the current visibility timeout.

func (*QueueMessageHandle) WritePayloadTo

func (h *QueueMessageHandle) WritePayloadTo(w io.Writer) (int64, error)

WritePayloadTo streams the payload into w, closing the payload afterwards.

type QueueStateHandle

type QueueStateHandle struct {
	// contains filtered or unexported fields
}

QueueStateHandle exposes the workflow state lease associated with a stateful dequeue.

func (*QueueStateHandle) CorrelationID

func (s *QueueStateHandle) CorrelationID() string

CorrelationID returns the message correlation identifier associated with the state lease.

func (*QueueStateHandle) ETag

func (s *QueueStateHandle) ETag() string

ETag returns the workflow state ETag.

func (*QueueStateHandle) FencingToken

func (s *QueueStateHandle) FencingToken() int64

FencingToken returns the workflow state fencing token.

func (*QueueStateHandle) Get added in v0.4.0

Get reads the current workflow state snapshot for the message's state lease.

func (*QueueStateHandle) GetBytes added in v0.4.0

func (s *QueueStateHandle) GetBytes(ctx context.Context) ([]byte, error)

GetBytes returns the current workflow state payload as bytes.

func (*QueueStateHandle) LeaseExpiresAt

func (s *QueueStateHandle) LeaseExpiresAt() int64

LeaseExpiresAt returns the unix timestamp when the state lease expires.

func (*QueueStateHandle) LeaseID

func (s *QueueStateHandle) LeaseID() string

LeaseID returns the workflow state lease identifier.

func (*QueueStateHandle) Load added in v0.4.0

func (s *QueueStateHandle) Load(ctx context.Context, v any) error

Load unmarshals the current workflow state into v.

func (*QueueStateHandle) MessageID

func (s *QueueStateHandle) MessageID() string

MessageID returns the associated message ID.

func (*QueueStateHandle) Mutate added in v0.7.0

func (s *QueueStateHandle) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)

Mutate applies server-side LQL mutations while preserving the state lease.

func (*QueueStateHandle) MutateLocal added in v0.8.0

func (s *QueueStateHandle) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)

MutateLocal applies client-local streaming LQL mutations while preserving the state lease.

func (*QueueStateHandle) MutateWithOptions added in v0.7.0

func (s *QueueStateHandle) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)

MutateWithOptions applies server-side LQL mutations with explicit conditional/header overrides.

func (*QueueStateHandle) Queue

func (s *QueueStateHandle) Queue() string

Queue returns the queue name.

func (*QueueStateHandle) Remove added in v0.4.0

Remove deletes workflow state while preserving queue lease lifecycle semantics.

func (*QueueStateHandle) RemoveWithOptions added in v0.4.0

func (s *QueueStateHandle) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)

RemoveWithOptions deletes workflow state with optional conditional overrides.

func (*QueueStateHandle) Save added in v0.4.0

func (s *QueueStateHandle) Save(ctx context.Context, v any, opts ...UpdateOption) error

Save marshals v as JSON and updates workflow state through the state lease.

func (*QueueStateHandle) Update added in v0.4.0

func (s *QueueStateHandle) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)

Update streams new workflow state JSON while preserving the state lease.

func (*QueueStateHandle) UpdateBytes added in v0.4.0

func (s *QueueStateHandle) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)

UpdateBytes is a convenience wrapper around Update for in-memory payloads.

func (*QueueStateHandle) UpdateMetadata added in v0.4.0

func (s *QueueStateHandle) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)

UpdateMetadata mutates metadata for the workflow state key.

func (*QueueStateHandle) UpdateWithOptions added in v0.4.0

func (s *QueueStateHandle) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)

UpdateWithOptions updates workflow state while allowing conditional/header overrides.

type QueueStatsOptions added in v0.7.0

type QueueStatsOptions struct {
	// Namespace scopes the queue operation. Empty uses the client's default namespace.
	Namespace string
}

QueueStatsOptions configures queue stats reads.

type QueueWatchEvent added in v0.7.0

type QueueWatchEvent struct {
	Namespace     string
	Queue         string
	Available     bool
	HeadMessageID string
	ChangedAt     time.Time
	CorrelationID string
}

QueueWatchEvent describes queue visibility changes emitted by WatchQueue.

type QueueWatchHandler added in v0.7.0

type QueueWatchHandler func(context.Context, QueueWatchEvent) error

QueueWatchHandler is invoked for each WatchQueue event.

type ReleaseOptions added in v0.1.0

type ReleaseOptions struct {
	// Rollback discards staged state/attachments instead of committing them.
	Rollback bool
}

ReleaseOptions controls commit vs rollback semantics during lease release.

type RemoveOptions added in v0.1.0

type RemoveOptions struct {
	// IfETag sets a conditional ETag guard (maps to If-Match semantics).
	IfETag string
	// IfVersion sets a conditional version guard (X-If-Version).
	IfVersion *int64
	// FencingToken provides explicit fencing when not already registered on the client.
	FencingToken *int64
	// Namespace scopes the delete. Empty uses the client's default namespace.
	Namespace string
	// TxnID binds the delete to a transaction coordinator record.
	TxnID string
}

RemoveOptions controls conditional delete semantics.
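
Conditional deletes are a compare-and-swap check: the operation proceeds only when every supplied guard matches the stored record. The sketch below models this with hypothetical record and guards types, and it assumes that an empty ETag or a nil version pointer means "no guard", which the pointer-typed IfVersion field suggests but the documentation does not spell out.

```go
package main

import "fmt"

// record is a toy stored document with its concurrency metadata.
type record struct {
	etag    string
	version int64
	exists  bool
}

// guards mirrors the shape of the conditional fields. Zero values mean the
// guard is not set (an assumption of this sketch).
type guards struct {
	ifETag    string
	ifVersion *int64
}

// remove deletes the record only when every supplied guard matches.
func (r *record) remove(g guards) bool {
	if !r.exists {
		return false
	}
	if g.ifETag != "" && g.ifETag != r.etag {
		return false
	}
	if g.ifVersion != nil && *g.ifVersion != r.version {
		return false
	}
	r.exists = false
	return true
}

func main() {
	stale, current := int64(6), int64(7)
	r := &record{etag: "abc", version: 7, exists: true}
	fmt.Println(r.remove(guards{ifVersion: &stale}))
	fmt.Println(r.remove(guards{ifETag: "abc", ifVersion: &current}))
}
```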

type StateSnapshot

type StateSnapshot struct {
	// Reader streams the state payload. It may be nil when HasState is false.
	Reader io.ReadCloser
	// ETag is the backend entity tag for CAS-protected writes/deletes.
	ETag string
	// Version is lockd's monotonic version counter for the key.
	Version int64
	// HasState reports whether the key currently has a committed JSON document.
	HasState bool
}

StateSnapshot represents the current JSON state and metadata for a lease.

func (*StateSnapshot) Bytes

func (s *StateSnapshot) Bytes() ([]byte, error)

Bytes returns the raw JSON payload.

func (*StateSnapshot) Close

func (s *StateSnapshot) Close() error

Close releases the underlying reader if present.

func (*StateSnapshot) Decode

func (s *StateSnapshot) Decode(v any) error

Decode unmarshals the JSON payload into v.
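
Because Reader may be nil when HasState is false, callers that work with the fields directly need to check both before reading. The sketch below mirrors StateSnapshot locally and uses only its documented fields. The decodeInto and newSnapshot helpers are hypothetical and are not a description of how the SDK implements Bytes, Decode, or Close.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// StateSnapshot is a local mirror of the documented struct.
type StateSnapshot struct {
	Reader   io.ReadCloser
	ETag     string
	Version  int64
	HasState bool
}

// newSnapshot is a hypothetical constructor used only to build test input.
func newSnapshot(payload string, version int64) *StateSnapshot {
	return &StateSnapshot{
		Reader:   io.NopCloser(strings.NewReader(payload)),
		Version:  version,
		HasState: true,
	}
}

// decodeInto is a hypothetical helper. It reports false without error when
// there is no committed state, and closes the reader after decoding.
func decodeInto(s *StateSnapshot, v any) (bool, error) {
	if s == nil || !s.HasState || s.Reader == nil {
		return false, nil
	}
	defer s.Reader.Close()
	if err := json.NewDecoder(s.Reader).Decode(v); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	var checkpoint map[string]any
	found, err := decodeInto(newSnapshot(`{"progress":"running"}`, 3), &checkpoint)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(found, checkpoint["progress"])
}
```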

type SubscribeOptions

type SubscribeOptions struct {
	// Namespace scopes the queue operation. Empty uses the client's default namespace.
	Namespace string
	// Owner identifies the worker/consumer processing streamed deliveries.
	// For direct Subscribe calls, this is required. StartConsumer auto-fills a
	// generated unique owner when left empty.
	Owner string
	// Visibility controls per-message lease timeout for streamed deliveries.
	Visibility time.Duration
	// BlockSeconds controls server-side wait behavior between deliveries.
	BlockSeconds int64
	// Prefetch controls how many messages the server may pipeline before handler ack/nack.
	Prefetch int
	// StartAfter resumes a previous subscription stream from a cursor.
	StartAfter string
	// OnCloseDelay applies a delay before auto-nack when handlers close without ack.
	OnCloseDelay time.Duration
}

SubscribeOptions configures continuous streaming consumption via Subscribe.

type UpdateOption added in v0.1.0

type UpdateOption interface {
	// contains filtered or unexported methods
}

UpdateOption customizes update behaviour.

func WithMetadata added in v0.1.0

func WithMetadata(meta MetadataOptions) UpdateOption

WithMetadata attaches metadata mutations to the same request as the state update.

func WithQueryHidden added in v0.1.0

func WithQueryHidden() UpdateOption

WithQueryHidden marks the key hidden from /v1/query after the update commits.

func WithQueryVisible added in v0.1.0

func WithQueryVisible() UpdateOption

WithQueryVisible clears the query-hidden metadata flag after the update commits.

func WithTxnID added in v0.1.0

func WithTxnID(txnID string) UpdateOption

WithTxnID binds an update/metadata mutation to a transaction coordinator id. The value is copied to UpdateOptions.TxnID and MetadataOptions.TxnID.
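
UpdateOption is an interface with only unexported methods, which is the usual shape of Go's functional options pattern. The sketch below shows how such an option could copy a transaction id into both option structs, as the WithTxnID description states. The method name applyUpdate, the updateOptionFunc adapter, and the resolve helper are all hypothetical, and the structs are trimmed local mirrors.

```go
package main

import "fmt"

// MetadataOptions is a trimmed local mirror; only TxnID is shown.
type MetadataOptions struct {
	TxnID string
}

// UpdateOptions is a trimmed local mirror of the documented struct.
type UpdateOptions struct {
	Metadata MetadataOptions
	TxnID    string
}

// UpdateOption mirrors the documented interface shape. The unexported
// method name is an assumption; the real one is not shown in the docs.
type UpdateOption interface {
	applyUpdate(*UpdateOptions)
}

// updateOptionFunc adapts a plain function to the UpdateOption interface.
type updateOptionFunc func(*UpdateOptions)

func (f updateOptionFunc) applyUpdate(o *UpdateOptions) { f(o) }

// WithTxnID copies the id to both UpdateOptions.TxnID and
// UpdateOptions.Metadata.TxnID, following the documented behaviour.
func WithTxnID(txnID string) UpdateOption {
	return updateOptionFunc(func(o *UpdateOptions) {
		o.TxnID = txnID
		o.Metadata.TxnID = txnID
	})
}

// resolve is a hypothetical helper that folds options into one struct.
func resolve(opts ...UpdateOption) UpdateOptions {
	var out UpdateOptions
	for _, opt := range opts {
		if opt != nil {
			opt.applyUpdate(&out)
		}
	}
	return out
}

func main() {
	resolved := resolve(WithTxnID("txn-42"))
	fmt.Println(resolved.TxnID, resolved.Metadata.TxnID)
}
```

Keeping the interface method unexported means only the package itself can define new options, so callers compose behaviour solely through the exported With... constructors.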

type UpdateOptions added in v0.1.0

type UpdateOptions struct {
	// IfETag sets a conditional ETag guard (maps to If-Match semantics).
	IfETag string
	// IfVersion sets a conditional version guard (X-If-Version).
	IfVersion *int64
	// ExpectedSHA256 enforces the submitted JSON payload SHA-256 (pre-compaction).
	ExpectedSHA256 string
	// ExpectedBytes enforces the submitted JSON payload byte length (pre-compaction).
	ExpectedBytes *int64
	// FencingToken provides explicit fencing when not already registered on the client.
	FencingToken *int64
	// Namespace scopes the mutation. Empty uses the client's default namespace.
	Namespace string
	// Metadata applies metadata mutations alongside the state update.
	Metadata MetadataOptions
	// TxnID binds the mutation to a transaction coordinator record.
	TxnID string
}

UpdateOptions controls conditional update semantics.
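
ExpectedSHA256 and ExpectedBytes let the caller assert what payload it believes it is sending. A minimal sketch of computing both from the submitted bytes follows. The integrityGuards helper is hypothetical, the struct is a trimmed local mirror, and lowercase hex encoding of the digest is an assumption, since this page does not state the expected encoding.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// UpdateOptions is a trimmed local mirror with only the integrity fields.
type UpdateOptions struct {
	ExpectedSHA256 string
	ExpectedBytes  *int64
}

// integrityGuards is a hypothetical helper. It hashes the payload exactly
// as submitted, matching the "pre-compaction" wording in the field comments.
// Assumption: the digest is expressed as lowercase hex.
func integrityGuards(payload []byte) UpdateOptions {
	sum := sha256.Sum256(payload)
	size := int64(len(payload))
	return UpdateOptions{
		ExpectedSHA256: hex.EncodeToString(sum[:]),
		ExpectedBytes:  &size,
	}
}

func main() {
	payload := []byte(`{"progress":"running"}`)
	opts := integrityGuards(payload)
	fmt.Println(opts.ExpectedSHA256, *opts.ExpectedBytes)
}
```

The guards must be computed over the bytes that are actually sent; re-marshalling the document after computing them could change whitespace or key order and make the check fail.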

type UpdateResult added in v0.1.0

type UpdateResult struct {
	// NewVersion is the updated monotonic version after a successful mutation.
	NewVersion int64 `json:"new_version"`
	// NewStateETag is the updated state entity tag after a successful mutation.
	NewStateETag string `json:"new_state_etag"`
	// BytesWritten is the number of state bytes accepted by the update.
	BytesWritten int64 `json:"bytes"`
	// Metadata carries metadata values returned by the server for this object.
	Metadata api.MetadataAttributes `json:"metadata,omitempty"`
}

UpdateResult captures the response from Update.
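
The struct tags show how the response body maps onto the fields. The sketch below decodes an illustrative body into a local mirror of UpdateResult. The sample JSON values and the parseUpdateResult helper are made up for the example, and Metadata is held as json.RawMessage here because the layout of api.MetadataAttributes is not shown in this section.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UpdateResult is a local mirror of the documented struct. Metadata uses
// json.RawMessage as a stand-in for api.MetadataAttributes.
type UpdateResult struct {
	NewVersion   int64           `json:"new_version"`
	NewStateETag string          `json:"new_state_etag"`
	BytesWritten int64           `json:"bytes"`
	Metadata     json.RawMessage `json:"metadata,omitempty"`
}

// parseUpdateResult is a hypothetical helper that decodes a response body.
func parseUpdateResult(body string) (UpdateResult, error) {
	var res UpdateResult
	err := json.Unmarshal([]byte(body), &res)
	return res, err
}

func main() {
	// Illustrative body only; real values come from the server.
	body := `{"new_version":4,"new_state_etag":"abc123","bytes":22}`
	res, err := parseUpdateResult(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(res.NewVersion, res.NewStateETag, res.BytesWritten)
}
```

NewVersion and NewStateETag are the values a caller would feed into IfVersion or IfETag on a later conditional update or remove.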

type WatchQueueOptions added in v0.7.0

type WatchQueueOptions struct {
	// Namespace scopes the queue operation. Empty uses the client's default namespace.
	Namespace string
}

WatchQueueOptions configures non-consuming queue visibility watches.
