Documentation ¶
Overview ¶
Package client provides the Go SDK for talking to a lockd cluster over HTTP. It mirrors CLI behaviour while exposing a typed API that is easy to embed in workers, controllers, and administrative tools.
Copyright (C) 2025 Michel Blomgren <https://pkt.systems>
Quick start ¶
Construct a client with either client.New (single endpoint) or client.NewWithEndpoints (ordered failover endpoints). Supported endpoint schemes are https:// for mTLS, http:// for plaintext trusted networks, and unix:///path/to/socket for Unix-domain sockets.
ctx := context.Background()
cli, err := client.New("https://lockd.example.com")
if err != nil {
	log.Fatal(err)
}
defer cli.Close()
req := api.AcquireRequest{
	Namespace:  "workflows",
	Key:        "orders",
	Owner:      "worker-1",
	TTLSeconds: 30,
	BlockSecs:  client.BlockWaitForever,
}
lease, err := cli.Acquire(ctx, req)
if err != nil {
	log.Fatal(err)
}
defer lease.Close()
var checkpoint map[string]any
if err := lease.Load(ctx, &checkpoint); err != nil && !errors.Is(err, client.ErrStateNotFound) {
	log.Fatal(err)
}
if checkpoint == nil {
	// No state stored yet: start from an empty document, since writing to a nil map panics.
	checkpoint = map[string]any{}
}
checkpoint["progress"] = "running"
if err := lease.Save(ctx, checkpoint); err != nil {
	log.Fatal(err)
}
Acquire mints a transaction id and fencing token. Lease-bound mutations such as Update, Remove, UpdateMetadata, Release, and attachment changes are issued with the correct transaction and fencing headers automatically when using a LeaseSession.
When most operations use one namespace, use client.WithDefaultNamespace so per-call Namespace values can be omitted intentionally. Namespaces that start with a dot are reserved for lockd internals and rejected for user workloads.
Create-only acquire ¶
Set api.AcquireRequest.IfNotExists=true to request create-only semantics. When state already exists for the key, Acquire fails with API error code "already_exists". The SDK supports both check styles: client.IsAlreadyExists(err) and errors.Is(err, client.ErrAlreadyExists). You only need one check style; for SDK-returned acquire errors they are equivalent.
_, err = cli.Acquire(ctx, api.AcquireRequest{
	Namespace:   "workflows",
	Key:         "orders",
	Owner:       "initializer",
	TTLSeconds:  30,
	BlockSecs:   client.BlockNoWait,
	IfNotExists: true,
})
if err != nil {
	if errors.Is(err, client.ErrAlreadyExists) {
		return
	}
	log.Fatal(err)
}
Acquire for update ¶
AcquireForUpdate wraps the common acquire, load, mutate, save, release flow. It keeps the lease alive while the callback runs and always attempts release. The supplied api.AcquireRequest is forwarded to Acquire; setting IfNotExists=true applies create-only semantics to the initial handshake.
err := cli.AcquireForUpdate(ctx, api.AcquireRequest{
	Namespace:  "workflows",
	Key:        "orders",
	Owner:      "worker-1",
	TTLSeconds: 45,
}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
	var state map[string]any
	if err := af.Load(ctx, &state); err != nil && !errors.Is(err, client.ErrStateNotFound) {
		return err
	}
	if state == nil {
		// No state stored yet: writing to a nil map would panic.
		state = map[string]any{}
	}
	state["progress"] = "done"
	return af.Save(ctx, state)
})
if err != nil {
	log.Fatal(err)
}
Attachments ¶
Leases can stage binary attachments alongside JSON state. Staged writes and staged deletes are committed on Release and discarded on Rollback.
file, err := os.Open("invoice.pdf")
if err != nil {
	log.Fatal(err)
}
defer file.Close()
if _, err := lease.Attach(ctx, client.AttachRequest{
	Name:        "invoice.pdf",
	ContentType: "application/pdf",
	Body:        file,
}); err != nil {
	log.Fatal(err)
}
// Release commits the staged attachment.
if err := lease.Release(ctx); err != nil {
	log.Fatal(err)
}
Perimeter defence interoperability ¶
Servers can throttle requests with HTTP 429 while the perimeter defence is active. Responses can include Retry-After and X-Lockd-QRF-State headers. The SDK consumes those hints in retry loops and surfaces parsed values on APIError for custom handling.
Queue API ¶
Queue helpers implement at-least-once delivery. Enqueue accepts any io.Reader and streams payload bytes directly to lockd.
reader := strings.NewReader("{\"op\":\"ship\"}")
qres, err := cli.Enqueue(ctx, "orders", reader, client.EnqueueOptions{
	ContentType: "application/json",
	Delay:       2 * time.Second,
	Visibility:  30 * time.Second,
})
if err != nil {
	log.Fatal(err)
}
log.Printf("queued message %s", qres.MessageID)
Dequeue returns a QueueMessage. DequeueWithState returns a QueueMessage with a QueueStateHandle attached, available through QueueMessage.StateHandle. QueueMessage implements io.ReadCloser and supports Ack, Nack, Defer, Extend, WritePayloadTo, and DecodePayloadJSON. QueueStateHandle mirrors the lease-state helpers with Get, GetBytes, Load, Update, UpdateBytes, MutateLocal, Save, UpdateMetadata, and Remove.
msg, err := cli.Dequeue(ctx, "orders", client.DequeueOptions{
	Namespace: "workflows",
	Owner:     "worker-1",
})
if err != nil {
	log.Fatal(err)
}
defer msg.Close()
buf, err := io.ReadAll(msg)
if err != nil {
	log.Fatal(err)
}
log.Printf("attempt %d payload=%s", msg.Attempts(), string(buf))
if err := msg.Ack(ctx); err != nil {
	log.Fatal(err)
}
If a handler returns without explicit Ack/Nack/Defer, Close performs an automatic Nack so another worker can continue. Tune that with DequeueOptions.OnCloseDelay or QueueMessage.SetOnCloseDelay. DequeueOptions.BlockSeconds controls waiting: BlockNoWait for immediate return, positive values for bounded wait, and zero for wait-forever.
Subscribe and SubscribeWithState keep one streaming request open and invoke a user handler per delivery.
StartConsumer worker runner ¶
StartConsumer starts one or more managed consumer loops and blocks until they terminate. It is intended for long-running worker processes.
Each ConsumerConfig runs in its own goroutine. Setting WithState to true selects SubscribeWithState; false selects Subscribe. Options.Owner can be left empty, in which case StartConsumer generates a unique owner token based on consumer name, host, process id, and sequence.
The MessageHandler receives ConsumerMessage containing:
- Client: the active SDK client to reuse inside handlers
- Logger: the configured client logger; always non-nil
- Queue and Name(): queue identity and resolved consumer name
- Message: the leased QueueMessage
- State: QueueStateHandle for stateful consumers, nil for stateless
Restart behaviour is controlled by ConsumerRestartPolicy. Defaults are three immediate retries, then exponential backoff starting at 250ms with multiplier 2.0 and max delay 5 minutes. Jitter defaults to zero and can be enabled.
ErrorHandler receives ConsumerError before each restart. Returning nil keeps the loop running. Returning an error stops StartConsumer and returns that error. OnStart and OnStop lifecycle hooks can be used for observability. Panics in MessageHandler, ErrorHandler, OnStart, and OnStop are recovered and routed through the same failure path.
Cancel the StartConsumer context to stop all loops. Expected context cancellation returns nil.
handler := func(ctx context.Context, cm client.ConsumerMessage) error {
	defer cm.Message.Close()
	if cm.State != nil {
		var state map[string]any
		if err := cm.State.Load(ctx, &state); err != nil && !errors.Is(err, client.ErrStateNotFound) {
			return err
		}
		if state == nil {
			// No state stored yet: writing to a nil map would panic.
			state = map[string]any{}
		}
		state["last_message_id"] = cm.Message.MessageID()
		if err := cm.State.Save(ctx, state); err != nil {
			return err
		}
	}
	return cm.Message.Ack(ctx)
}
err := cli.StartConsumer(ctx,
	client.ConsumerConfig{
		Name:  "orders-fastlane",
		Queue: "orders",
		Options: client.SubscribeOptions{
			Namespace: "workflows",
			Prefetch:  16,
		},
		MessageHandler: handler,
	},
	client.ConsumerConfig{
		Queue:     "orders-stateful",
		WithState: true,
		Options: client.SubscribeOptions{
			Namespace: "workflows",
		},
		RestartPolicy: client.ConsumerRestartPolicy{
			MaxDelay: time.Minute,
		},
		MessageHandler: handler,
	},
)
if err != nil {
	log.Fatal(err)
}
Direct helpers Client.QueueAck, Client.QueueNack, and Client.QueueExtend are also available when metadata is managed outside QueueMessage helpers.
State removal ¶
Lease holders can delete state explicitly with LeaseSession.Remove or Client.Remove. CAS guards such as If-ETag and If-Version are supported by the API for concurrency-safe deletes.
Metadata attributes and query visibility ¶
Metadata holds lease internals plus user attributes. The SDK provides LeaseSession.UpdateMetadata and Client.UpdateMetadata, and metadata-aware state writes with WithMetadata, WithQueryHidden, and WithQueryVisible.
lease, err := cli.Acquire(ctx, api.AcquireRequest{Key: "orders", TTLSeconds: 30})
if err != nil {
	log.Fatal(err)
}
if _, err := lease.UpdateMetadata(ctx, client.MetadataOptions{QueryHidden: client.Bool(true)}); err != nil {
	log.Fatal(err)
}
Multi-endpoint failover ¶
NewWithEndpoints accepts multiple endpoints. The SDK rotates through them on transport errors while preserving bounded retry semantics.
cli, err := client.NewWithEndpoints([]string{
	"https://lockd-primary.example.com",
	"https://lockd-backup.example.com",
}, client.WithDisableMTLS(false))
if err != nil {
	log.Fatal(err)
}
Correlation IDs and logging ¶
Use client.WithCorrelationID or client.GenerateCorrelationID to tie requests, queue deliveries, and retries together. QueueMessage carries correlation data and queue follow-up helpers forward it automatically.
The client logger is configured with client.WithLogger. Nil logger input is normalized to pslog.NoopLogger so SDK logging calls remain safe.
In-process testing ¶
client/inprocess starts a lockd server in-process and returns a ready client. It is useful for tests and local development.
ctx := context.Background()
inproc, err := inprocess.New(ctx, lockd.Config{Store: "mem://"})
if err != nil {
	t.Fatal(err)
}
defer inproc.Close(ctx)
lease, err := inproc.Acquire(ctx, api.AcquireRequest{Owner: "test", TTLSeconds: 10})
if err != nil {
	t.Fatal(err)
}
_ = lease
Authentication and mTLS ¶
mTLS is enabled by default for https endpoints. Configure bundle PEM data with client.WithBundlePEM or file paths with client.WithBundlePath. Bundle paths expand shell-style home and environment variables by default; use client.WithBundlePathDisableExpansion to treat a path literally.
To connect over plaintext http endpoints, use client.WithDisableMTLS(true) or provide an http URL. For mTLS, client certificates must include ClientAuth EKU and chain to a CA trusted by the server.
Index ¶
- Constants
- Variables
- func Bool(v bool) *bool
- func CorrelationIDFromContext(ctx context.Context) string
- func CorrelationIDFromResponse(resp *http.Response) string
- func GenerateCorrelationID() string
- func Int64(v int64) *int64
- func IsAlreadyExists(err error) bool
- func NormalizeCorrelationID(id string) (string, bool)
- func ParseEndpoints(raw string, disableMTLS bool) ([]string, error)
- func WithCorrelationHTTPClient(cli *http.Client, id string) *http.Client
- func WithCorrelationID(ctx context.Context, id string) context.Context
- func WithCorrelationTransport(base http.RoundTripper, id string) http.RoundTripper
- type APIError
- type AcquireConfig
- type AcquireForUpdateContext
- func (a *AcquireForUpdateContext) KeepAlive(ctx context.Context, ttl time.Duration) (*api.KeepAliveResponse, error)
- func (a *AcquireForUpdateContext) Load(ctx context.Context, v any) error
- func (a *AcquireForUpdateContext) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)
- func (a *AcquireForUpdateContext) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
- func (a *AcquireForUpdateContext) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
- func (a *AcquireForUpdateContext) Remove(ctx context.Context) (*api.RemoveResponse, error)
- func (a *AcquireForUpdateContext) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
- func (a *AcquireForUpdateContext) Save(ctx context.Context, v any, opts ...UpdateOption) error
- func (a *AcquireForUpdateContext) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)
- func (a *AcquireForUpdateContext) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)
- func (a *AcquireForUpdateContext) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
- func (a *AcquireForUpdateContext) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
- type AcquireForUpdateHandler
- type AcquireOption
- type AttachRequest
- type AttachResult
- type Attachment
- type AttachmentInfo
- type AttachmentList
- type AttachmentSelector
- type AttachmentStore
- func (s *AttachmentStore) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
- func (s *AttachmentStore) Delete(ctx context.Context, selector AttachmentSelector) (*DeleteAttachmentResult, error)
- func (s *AttachmentStore) DeleteAll(ctx context.Context) (*DeleteAllAttachmentsResult, error)
- func (s *AttachmentStore) List(ctx context.Context) (*AttachmentList, error)
- func (s *AttachmentStore) Retrieve(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
- func (s *AttachmentStore) RetrieveAll(ctx context.Context) ([]*Attachment, error)
- type Client
- func (c *Client) Acquire(ctx context.Context, req api.AcquireRequest, opts ...AcquireOption) (*LeaseSession, error)
- func (c *Client) AcquireForUpdate(ctx context.Context, req api.AcquireRequest, handler AcquireForUpdateHandler, ...) error
- func (c *Client) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
- func (c *Client) ClearLeaseID(leaseID string)
- func (c *Client) Close() error
- func (c *Client) DeleteAllAttachments(ctx context.Context, req DeleteAllAttachmentsRequest) (*DeleteAllAttachmentsResult, error)
- func (c *Client) DeleteAttachment(ctx context.Context, req DeleteAttachmentRequest) (*DeleteAttachmentResult, error)
- func (c *Client) Dequeue(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)
- func (c *Client) DequeueBatch(ctx context.Context, queue string, opts DequeueOptions) ([]*QueueMessage, error)
- func (c *Client) DequeueWithState(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)
- func (c *Client) Describe(ctx context.Context, key string) (*api.DescribeResponse, error)
- func (c *Client) Enqueue(ctx context.Context, queue string, payload io.Reader, opts EnqueueOptions) (*api.EnqueueResponse, error)
- func (c *Client) EnqueueBytes(ctx context.Context, queue string, payload []byte, opts EnqueueOptions) (*api.EnqueueResponse, error)
- func (c *Client) FlushIndex(ctx context.Context, namespace string, optFns ...FlushOption) (*api.IndexFlushResponse, error)
- func (c *Client) Get(ctx context.Context, key string, optFns ...GetOption) (*GetResponse, error)
- func (c *Client) GetAttachment(ctx context.Context, req GetAttachmentRequest) (*Attachment, error)
- func (c *Client) GetNamespaceConfig(ctx context.Context, namespace string) (NamespaceConfigResult, error)
- func (c *Client) KeepAlive(ctx context.Context, req api.KeepAliveRequest) (*api.KeepAliveResponse, error)
- func (c *Client) ListAttachments(ctx context.Context, req ListAttachmentsRequest) (*AttachmentList, error)
- func (c *Client) Load(ctx context.Context, key string, v any, optFns ...LoadOption) error
- func (c *Client) Mutate(ctx context.Context, req MutateRequest) (*UpdateResult, error)
- func (c *Client) MutateLocal(ctx context.Context, req MutateLocalRequest) (*UpdateResult, error)
- func (c *Client) Namespace() string
- func (c *Client) Query(ctx context.Context, optFns ...QueryOption) (*QueryResponse, error)
- func (c *Client) QueueAck(ctx context.Context, req api.AckRequest) (*api.AckResponse, error)
- func (c *Client) QueueExtend(ctx context.Context, req api.ExtendRequest) (*api.ExtendResponse, error)
- func (c *Client) QueueNack(ctx context.Context, req api.NackRequest) (*api.NackResponse, error)
- func (c *Client) QueueStats(ctx context.Context, queue string, opts QueueStatsOptions) (*api.QueueStatsResponse, error)
- func (c *Client) RegisterLeaseToken(leaseID string, token int64)
- func (c *Client) Release(ctx context.Context, req api.ReleaseRequest) (*api.ReleaseResponse, error)
- func (c *Client) Remove(ctx context.Context, key, leaseID string, opts RemoveOptions) (*api.RemoveResponse, error)
- func (c *Client) Save(ctx context.Context, sess *LeaseSession, v any) error
- func (c *Client) StartConsumer(ctx context.Context, consumers ...ConsumerConfig) error
- func (c *Client) Subscribe(ctx context.Context, queue string, opts SubscribeOptions, ...) error
- func (c *Client) SubscribeWithState(ctx context.Context, queue string, opts SubscribeOptions, ...) error
- func (c *Client) TxnCommit(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
- func (c *Client) TxnPrepare(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
- func (c *Client) TxnReplay(ctx context.Context, txnID string) (*api.TxnReplayResponse, error)
- func (c *Client) TxnRollback(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
- func (c *Client) Update(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
- func (c *Client) UpdateBytes(ctx context.Context, key, leaseID string, body []byte, opts UpdateOptions) (*UpdateResult, error)
- func (c *Client) UpdateMetadata(ctx context.Context, key, leaseID string, opts UpdateOptions) (*MetadataResult, error)
- func (c *Client) UpdateNamespaceConfig(ctx context.Context, req api.NamespaceConfigRequest, ...) (NamespaceConfigResult, error)
- func (c *Client) UpdateStream(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
- func (c *Client) UseLeaseID(leaseID string)
- func (c *Client) UseNamespace(ns string) error
- func (c *Client) WatchQueue(ctx context.Context, queue string, opts WatchQueueOptions, ...) error
- type ConsumerConfig
- type ConsumerError
- type ConsumerErrorHandler
- type ConsumerLifecycleEvent
- type ConsumerMessage
- type ConsumerMessageHandler
- type ConsumerRestartPolicy
- type DeleteAllAttachmentsRequest
- type DeleteAllAttachmentsResult
- type DeleteAttachmentRequest
- type DeleteAttachmentResult
- type DequeueOptions
- type DequeueResult
- type Document
- func (d *Document) Bytes() ([]byte, error)
- func (d *Document) LoadFrom(r io.Reader) error
- func (d *Document) LoadInto(target any) error
- func (d *Document) Mutate(exprs ...string) error
- func (d *Document) MutateWithTime(now time.Time, exprs ...string) error
- func (d *Document) Reader() (io.Reader, error)
- func (d *Document) Write(p []byte) (int, error)
- type EnqueueOptions
- type FlushIndexOptions
- type FlushOption
- type GetAttachmentRequest
- type GetOption
- type GetOptions
- type GetResponse
- func (gr *GetResponse) Attachments() *AttachmentStore
- func (gr *GetResponse) Bytes() ([]byte, error)
- func (gr *GetResponse) Close() error
- func (gr *GetResponse) Document() (*Document, error)
- func (gr *GetResponse) ListAttachments(ctx context.Context) (*AttachmentList, error)
- func (gr *GetResponse) Reader() io.ReadCloser
- func (gr *GetResponse) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
- type LeaseSession
- func (s *LeaseSession) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
- func (s *LeaseSession) Attachments() *AttachmentStore
- func (s *LeaseSession) Close() error
- func (s *LeaseSession) CurrentFencingToken() int64
- func (s *LeaseSession) DeleteAllAttachments(ctx context.Context) (*DeleteAllAttachmentsResult, error)
- func (s *LeaseSession) DeleteAttachment(ctx context.Context, selector AttachmentSelector) (*DeleteAttachmentResult, error)
- func (s *LeaseSession) Get(ctx context.Context) (*StateSnapshot, error)
- func (s *LeaseSession) GetBytes(ctx context.Context) ([]byte, error)
- func (s *LeaseSession) KeepAlive(ctx context.Context, ttl time.Duration) (*api.KeepAliveResponse, error)
- func (s *LeaseSession) ListAttachments(ctx context.Context) (*AttachmentList, error)
- func (s *LeaseSession) Load(ctx context.Context, v any) error
- func (s *LeaseSession) Mutate(ctx context.Context, mutations []string, options ...UpdateOption) (*UpdateResult, error)
- func (s *LeaseSession) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
- func (s *LeaseSession) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
- func (s *LeaseSession) Release(ctx context.Context) error
- func (s *LeaseSession) ReleaseWithOptions(ctx context.Context, opts ReleaseOptions) error
- func (s *LeaseSession) Remove(ctx context.Context) (*api.RemoveResponse, error)
- func (s *LeaseSession) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
- func (s *LeaseSession) RetrieveAllAttachments(ctx context.Context) ([]*Attachment, error)
- func (s *LeaseSession) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
- func (s *LeaseSession) Save(ctx context.Context, v any, opts ...UpdateOption) error
- func (s *LeaseSession) Update(ctx context.Context, body io.Reader, options ...UpdateOption) (*UpdateResult, error)
- func (s *LeaseSession) UpdateBytes(ctx context.Context, body []byte, options ...UpdateOption) (*UpdateResult, error)
- func (s *LeaseSession) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
- func (s *LeaseSession) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
- type ListAttachmentsRequest
- type LoadOption
- type LoadOptions
- type MessageHandler
- type MessageHandlerWithState
- type MetadataOptions
- type MetadataResult
- type MutateLocalOptions
- type MutateLocalRequest
- type MutateRequest
- type NamespaceConfigOptions
- type NamespaceConfigResult
- type Option
- func WithBundlePEM(pemBytes []byte) Option
- func WithBundlePath(path string) Option
- func WithBundlePathDisableExpansion() Option
- func WithCloseTimeout(d time.Duration) Option
- func WithDefaultNamespace(ns string) Option
- func WithDisableMTLS(disable bool) Option
- func WithDrainAwareShutdown(enabled bool) Option
- func WithEndpointShuffle(enabled bool) Option
- func WithFailureRetries(n int) Option
- func WithForUpdateTimeout(d time.Duration) Option
- func WithHTTPClient(cli *http.Client) Option
- func WithHTTPTimeout(d time.Duration) Option
- func WithHTTPTrace() Option
- func WithKeepAliveTimeout(d time.Duration) Option
- func WithLogger(logger pslog.Base) Option
- type QueryOption
- func WithQuery(expr string) QueryOption
- func WithQueryBlock() QueryOption
- func WithQueryCursor(cursor string) QueryOption
- func WithQueryEngine(engine string) QueryOption
- func WithQueryEngineAuto() QueryOption
- func WithQueryEngineIndex() QueryOption
- func WithQueryEngineScan() QueryOption
- func WithQueryFields(fields map[string]any) QueryOption
- func WithQueryLimit(limit int) QueryOption
- func WithQueryNamespace(ns string) QueryOption
- func WithQueryRefresh(mode string) QueryOption
- func WithQueryRefreshImmediate() QueryOption
- func WithQueryRefreshWaitFor() QueryOption
- func WithQueryRequest(req *api.QueryRequest) QueryOption
- func WithQueryReturn(mode QueryReturn) QueryOption
- func WithQueryReturnDocuments() QueryOption
- func WithQueryReturnKeys() QueryOption
- func WithQuerySelector(sel api.Selector) QueryOption
- type QueryOptions
- type QueryResponse
- type QueryReturn
- type QueryRow
- type QueueMessage
- func (m *QueueMessage) Ack(ctx context.Context) error
- func (m *QueueMessage) Attempts() int
- func (m *QueueMessage) Close() error
- func (m *QueueMessage) ClosePayload() error
- func (m *QueueMessage) ContentType() string
- func (m *QueueMessage) CorrelationID() string
- func (m *QueueMessage) Cursor() string
- func (m *QueueMessage) DecodePayloadJSON(v any) error
- func (m *QueueMessage) Defer(ctx context.Context, delay time.Duration) error
- func (m *QueueMessage) Extend(ctx context.Context, extendBy time.Duration) error
- func (m *QueueMessage) FailureAttempts() int
- func (m *QueueMessage) FencingToken() int64
- func (m *QueueMessage) LeaseExpiresAt() int64
- func (m *QueueMessage) LeaseID() string
- func (m *QueueMessage) MaxAttempts() int
- func (m *QueueMessage) MessageID() string
- func (m *QueueMessage) MetaETag() string
- func (m *QueueMessage) Nack(ctx context.Context, delay time.Duration, lastErr any) error
- func (m *QueueMessage) Namespace() string
- func (m *QueueMessage) NotVisibleUntil() time.Time
- func (m *QueueMessage) PayloadReader() (io.ReadCloser, error)
- func (m *QueueMessage) PayloadSize() int64
- func (m *QueueMessage) Queue() string
- func (m *QueueMessage) Read(p []byte) (int, error)
- func (m *QueueMessage) SetOnCloseDelay(d time.Duration)
- func (m *QueueMessage) StateHandle() *QueueStateHandle
- func (m *QueueMessage) TxnID() string
- func (m *QueueMessage) VisibilityTimeout() time.Duration
- func (m *QueueMessage) WritePayloadTo(w io.Writer) (int64, error)
- type QueueMessageHandle
- func (h *QueueMessageHandle) Ack(ctx context.Context) error
- func (h *QueueMessageHandle) Attempts() int
- func (h *QueueMessageHandle) ClosePayload() error
- func (h *QueueMessageHandle) ContentType() string
- func (h *QueueMessageHandle) CorrelationID() string
- func (h *QueueMessageHandle) Cursor() string
- func (h *QueueMessageHandle) DecodePayloadJSON(v any) error
- func (h *QueueMessageHandle) Defer(ctx context.Context, delay time.Duration) error
- func (h *QueueMessageHandle) Extend(ctx context.Context, extendBy time.Duration) error
- func (h *QueueMessageHandle) FailureAttempts() int
- func (h *QueueMessageHandle) FencingToken() int64
- func (h *QueueMessageHandle) LeaseExpiresAt() int64
- func (h *QueueMessageHandle) LeaseID() string
- func (h *QueueMessageHandle) MaxAttempts() int
- func (h *QueueMessageHandle) MessageID() string
- func (h *QueueMessageHandle) MetaETag() string
- func (h *QueueMessageHandle) Nack(ctx context.Context, delay time.Duration, lastErr any) error
- func (h *QueueMessageHandle) Namespace() string
- func (h *QueueMessageHandle) NotVisibleUntil() time.Time
- func (h *QueueMessageHandle) PayloadReader() (io.ReadCloser, error)
- func (h *QueueMessageHandle) PayloadSize() int64
- func (h *QueueMessageHandle) Queue() string
- func (h *QueueMessageHandle) StateHandle() *QueueStateHandle
- func (h *QueueMessageHandle) VisibilityTimeout() time.Duration
- func (h *QueueMessageHandle) WritePayloadTo(w io.Writer) (int64, error)
- type QueueStateHandle
- func (s *QueueStateHandle) CorrelationID() string
- func (s *QueueStateHandle) ETag() string
- func (s *QueueStateHandle) FencingToken() int64
- func (s *QueueStateHandle) Get(ctx context.Context) (*StateSnapshot, error)
- func (s *QueueStateHandle) GetBytes(ctx context.Context) ([]byte, error)
- func (s *QueueStateHandle) LeaseExpiresAt() int64
- func (s *QueueStateHandle) LeaseID() string
- func (s *QueueStateHandle) Load(ctx context.Context, v any) error
- func (s *QueueStateHandle) MessageID() string
- func (s *QueueStateHandle) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)
- func (s *QueueStateHandle) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
- func (s *QueueStateHandle) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
- func (s *QueueStateHandle) Queue() string
- func (s *QueueStateHandle) Remove(ctx context.Context) (*api.RemoveResponse, error)
- func (s *QueueStateHandle) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
- func (s *QueueStateHandle) Save(ctx context.Context, v any, opts ...UpdateOption) error
- func (s *QueueStateHandle) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)
- func (s *QueueStateHandle) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)
- func (s *QueueStateHandle) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
- func (s *QueueStateHandle) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
- type QueueStatsOptions
- type QueueWatchEvent
- type QueueWatchHandler
- type ReleaseOptions
- type RemoveOptions
- type StateSnapshot
- type SubscribeOptions
- type UpdateOption
- type UpdateOptions
- type UpdateResult
- type WatchQueueOptions
Constants ¶
const (
	DefaultHTTPTimeout           = 15 * time.Second
	DefaultCloseTimeout          = 5 * time.Second
	DefaultKeepAliveTimeout      = 5 * time.Second
	DefaultMaxIdleConns          = 256
	DefaultMaxIdleConnsPerHost   = 128
	DefaultForUpdateTimeout      = 15 * time.Minute
	DefaultAcquireBaseDelay      = time.Second
	DefaultAcquireMaxDelay       = 5 * time.Second
	DefaultAcquireMultiplier     = 1.2
	DefaultAcquireJitter         = 100 * time.Millisecond
	DefaultFailureRetries        = 5
	DefaultAcquireFailureRetries = DefaultFailureRetries
	// DefaultConsumerImmediateRetries controls how many consecutive consumer
	// failures are retried without delay before backoff starts.
	DefaultConsumerImmediateRetries = 3
	// DefaultConsumerBaseDelay is the first delayed retry duration after immediate retries.
	DefaultConsumerBaseDelay = 250 * time.Millisecond
	// DefaultConsumerMaxDelay caps consumer restart backoff growth.
	DefaultConsumerMaxDelay = 5 * time.Minute
	// DefaultConsumerMultiplier is the exponential growth factor for restart delay.
	DefaultConsumerMultiplier = 2.0
	// DefaultConsumerJitter randomizes restart delay by +/- this duration.
	DefaultConsumerJitter = 0 * time.Second
)
Default client tuning knobs exposed for callers that want to mirror lockd's defaults.
const (
	BlockWaitForever int64 = 0
	BlockNoWait      int64 = api.BlockNoWait
)
BlockWaitForever causes Acquire to wait indefinitely for a lease. BlockNoWait skips waiting entirely.
const MaxCorrelationIDLength = 128
MaxCorrelationIDLength bounds the length of client-supplied correlation identifiers.
Variables ¶
var ErrAlreadyExists = errors.New("lockd: key already exists")
ErrAlreadyExists is returned when create-only acquire semantics are requested for an existing key.
var ErrMissingFencingToken = errors.New("lockd: fencing token required")
ErrMissingFencingToken is returned when an operation needs a fencing token but none was found.
Functions ¶
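func CorrelationIDFromContext ¶
func CorrelationIDFromContext(ctx context.Context) string
CorrelationIDFromContext extracts the correlation identifier carried by ctx, if present.
func CorrelationIDFromResponse ¶
func CorrelationIDFromResponse(resp *http.Response) string
CorrelationIDFromResponse reads the X-Correlation-Id header from resp.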
func GenerateCorrelationID ¶
func GenerateCorrelationID() string
GenerateCorrelationID creates a new random correlation identifier.
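func IsAlreadyExists ¶ added in v0.6.0
func IsAlreadyExists(err error) bool
IsAlreadyExists reports whether err indicates create-only acquire semantics failed because the key already exists.
func NormalizeCorrelationID ¶
func NormalizeCorrelationID(id string) (string, bool)
NormalizeCorrelationID trims and validates an identifier.
func ParseEndpoints ¶
func ParseEndpoints(raw string, disableMTLS bool) ([]string, error)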
ParseEndpoints splits a comma-separated server list and normalizes each endpoint, applying default schemes based on whether mTLS is disabled.
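The behaviour described for ParseEndpoints can be approximated with a short standalone sketch. splitEndpoints is a hypothetical function written for illustration; details such as skipping empty entries and the error returned for an empty list are assumptions, not documented behaviour.

```go
package main

import (
	"errors"
	"strings"
)

// splitEndpoints is a hypothetical approximation of endpoint parsing: it
// splits on commas, trims whitespace, drops empty entries, and prefixes
// entries that lack a scheme with https:// (or http:// when mTLS is disabled).
func splitEndpoints(raw string, disableMTLS bool) ([]string, error) {
	scheme := "https://"
	if disableMTLS {
		scheme = "http://"
	}
	var out []string
	for _, part := range strings.Split(raw, ",") {
		ep := strings.TrimSpace(part)
		if ep == "" {
			continue
		}
		// Entries such as unix:///path or https://host keep their scheme.
		if !strings.Contains(ep, "://") {
			ep = scheme + ep
		}
		out = append(out, ep)
	}
	if len(out) == 0 {
		return nil, errors.New("no endpoints provided")
	}
	return out, nil
}
```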
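func WithCorrelationHTTPClient ¶
func WithCorrelationHTTPClient(cli *http.Client, id string) *http.Client
WithCorrelationHTTPClient returns a shallow copy of cli (or a new client when cli is nil) with a transport that ensures X-Correlation-Id is set on all requests. The original client is left untouched.
func WithCorrelationID ¶
func WithCorrelationID(ctx context.Context, id string) context.Context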
WithCorrelationID annotates ctx with a correlation identifier to be sent with subsequent requests.
func WithCorrelationTransport ¶
func WithCorrelationTransport(base http.RoundTripper, id string) http.RoundTripper
WithCorrelationTransport wraps base with a RoundTripper that overwrites the X-Correlation-Id header on every request. Invalid identifiers are ignored.
Types ¶
type APIError ¶
type APIError struct {
	// Status is the HTTP status code returned by the server.
	Status int
	// Response is the decoded lockd error envelope, when available.
	Response api.ErrorResponse
	// Body contains the raw response body bytes for additional diagnostics.
	Body []byte
	// RetryAfter is the parsed retry delay hint from headers, when provided.
	RetryAfter time.Duration
	// QRFState carries queue-resilience-fallback diagnostics surfaced by the server.
	QRFState string
}
APIError describes an error response from lockd.
func (*APIError) Is ¶ added in v0.6.0
Is maps structured API errors to sentinel errors for errors.Is checks.
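The mechanism behind such a mapping is the optional Is method that errors.Is consults while unwrapping. The sketch below uses a simplified stand-in type; apiError, its Code field, the local sentinel, and the 409 status are assumptions made for illustration, while the "already_exists" code comes from the create-only acquire description.

```go
package main

import (
	"errors"
	"fmt"
)

// errAlreadyExists is a local stand-in for a sentinel such as client.ErrAlreadyExists.
var errAlreadyExists = errors.New("already exists")

// apiError is a simplified stand-in for client.APIError.
type apiError struct {
	Status int    // HTTP status; 409 below is an assumed value
	Code   string // structured error code from the server envelope
}

func (e *apiError) Error() string {
	return fmt.Sprintf("status %d: %s", e.Status, e.Code)
}

// Is lets errors.Is match the structured error against the sentinel.
func (e *apiError) Is(target error) bool {
	return target == errAlreadyExists && e.Code == "already_exists"
}

// matchesAlreadyExists wraps an apiError and checks it with errors.Is.
func matchesAlreadyExists(code string) bool {
	err := fmt.Errorf("acquire: %w", &apiError{Status: 409, Code: code})
	return errors.Is(err, errAlreadyExists)
}

func main() {
	fmt.Println(matchesAlreadyExists("already_exists"))
	fmt.Println(matchesAlreadyExists("other_code"))
}
```

Because errors.Is walks the wrap chain, the check keeps working when callers add context with %w.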
func (*APIError) RetryAfterDuration ¶
RetryAfterDuration returns the recommended back-off hinted by the server.
type AcquireConfig ¶
type AcquireConfig struct {
// BaseDelay is the starting backoff delay after retryable failures.
BaseDelay time.Duration
// MaxDelay caps exponential backoff growth.
MaxDelay time.Duration
// Multiplier is the exponential growth factor applied between retries.
Multiplier float64
// Jitter randomizes capped delays by +/- Jitter to reduce thundering herds.
Jitter time.Duration
// FailureRetries controls retries for non-conflict transient failures; <0 means unbounded.
FailureRetries int
// contains filtered or unexported fields
}
AcquireConfig controls the client-side retry and backoff behaviour for Acquire and AcquireForUpdate.
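To make the interaction of these fields concrete, here is a minimal sketch of a capped exponential backoff with jitter applied at the cap. It is an interpretation of the field comments, not the SDK's implementation; backoffConfig, delayFor, and the demo helpers are hypothetical names.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffConfig mirrors the exported AcquireConfig fields for illustration.
type backoffConfig struct {
	BaseDelay  time.Duration
	MaxDelay   time.Duration
	Multiplier float64
	Jitter     time.Duration
}

// delayFor returns the sleep before retry number attempt (0-based).
// Growth is exponential until MaxDelay; jitter is applied only at the cap.
func delayFor(cfg backoffConfig, attempt int, rng *rand.Rand) time.Duration {
	d := float64(cfg.BaseDelay)
	limit := float64(cfg.MaxDelay)
	for i := 0; i < attempt && d < limit; i++ {
		d *= cfg.Multiplier
	}
	if d < limit {
		return time.Duration(d)
	}
	delay := cfg.MaxDelay
	if cfg.Jitter > 0 && rng != nil {
		// Uniform offset in [-Jitter, +Jitter].
		delay += time.Duration(rng.Int63n(int64(2*cfg.Jitter)+1)) - cfg.Jitter
	}
	if delay < 0 {
		delay = 0
	}
	return delay
}

// demoDelayMillis evaluates a jitter-free schedule: 100ms base, x2, 1s cap.
func demoDelayMillis(attempt int) int64 {
	cfg := backoffConfig{BaseDelay: 100 * time.Millisecond, MaxDelay: time.Second, Multiplier: 2}
	return delayFor(cfg, attempt, nil).Milliseconds()
}

// demoJitteredMillis evaluates a capped delay with +/- 50ms jitter.
func demoJitteredMillis(seed int64) int64 {
	cfg := backoffConfig{BaseDelay: 100 * time.Millisecond, MaxDelay: time.Second, Multiplier: 2, Jitter: 50 * time.Millisecond}
	return delayFor(cfg, 10, rand.New(rand.NewSource(seed))).Milliseconds()
}

func main() {
	for attempt := 0; attempt <= 5; attempt++ {
		fmt.Println(attempt, demoDelayMillis(attempt))
	}
}
```

Applying jitter only once the cap is reached keeps early retries predictable while spreading out long-waiting clients.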
type AcquireForUpdateContext ¶
type AcquireForUpdateContext struct {
// Session is the active lease context used for updates/removals/metadata changes.
Session *LeaseSession
// State is the pre-handler snapshot fetched after acquire and before user logic runs.
State *StateSnapshot
}
AcquireForUpdateContext exposes the active lease session and the snapshot that was read before the handler executed.
func (*AcquireForUpdateContext) KeepAlive ¶
func (a *AcquireForUpdateContext) KeepAlive(ctx context.Context, ttl time.Duration) (*api.KeepAliveResponse, error)
KeepAlive extends the lease TTL.
func (*AcquireForUpdateContext) Load ¶
func (a *AcquireForUpdateContext) Load(ctx context.Context, v any) error
Load unmarshals the latest state into v.
func (*AcquireForUpdateContext) Mutate ¶ added in v0.7.0
func (a *AcquireForUpdateContext) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)
Mutate applies server-side LQL mutations while preserving the lease.
func (*AcquireForUpdateContext) MutateLocal ¶ added in v0.8.0
func (a *AcquireForUpdateContext) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
MutateLocal applies client-local streaming LQL mutations while preserving the lease.
func (*AcquireForUpdateContext) MutateWithOptions ¶ added in v0.7.0
func (a *AcquireForUpdateContext) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
MutateWithOptions applies server-side LQL mutations with explicit conditional overrides.
func (*AcquireForUpdateContext) Remove ¶
func (a *AcquireForUpdateContext) Remove(ctx context.Context) (*api.RemoveResponse, error)
Remove deletes the current state while the handler holds the lease.
func (*AcquireForUpdateContext) RemoveWithOptions ¶
func (a *AcquireForUpdateContext) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
RemoveWithOptions deletes the state while allowing conditional overrides.
func (*AcquireForUpdateContext) Save ¶
func (a *AcquireForUpdateContext) Save(ctx context.Context, v any, opts ...UpdateOption) error
Save marshals and updates the state with the supplied value.
func (*AcquireForUpdateContext) Update ¶
func (a *AcquireForUpdateContext) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)
Update streams a new JSON document as the key's state while preserving the lease.
func (*AcquireForUpdateContext) UpdateBytes ¶
func (a *AcquireForUpdateContext) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)
UpdateBytes is a convenience wrapper over Update that accepts a byte slice.
func (*AcquireForUpdateContext) UpdateMetadata ¶ added in v0.1.0
func (a *AcquireForUpdateContext) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
UpdateMetadata mutates metadata for the active key.
func (*AcquireForUpdateContext) UpdateWithOptions ¶
func (a *AcquireForUpdateContext) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
UpdateWithOptions allows callers to override conditional headers for the update.
type AcquireForUpdateHandler ¶
type AcquireForUpdateHandler func(context.Context, *AcquireForUpdateContext) error
AcquireForUpdateHandler is invoked while a lease is held. The provided context is canceled if the client loses the lease or keepalive fails.
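The cancellation contract can be illustrated without the SDK: a background renewal loop cancels the handler's context as soon as a renewal fails. In this sketch runWithLease and the keepAlive callback are hypothetical, and the renewal interval is chosen only to keep the demo short.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runWithLease runs handler with a context that is canceled as soon as
// keepAlive reports an error. keepAlive is a hypothetical renewal callback.
func runWithLease(parent context.Context, interval time.Duration, keepAlive func() error, handler func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	done := make(chan struct{})
	defer close(done) // stops the renewal goroutine when the handler returns

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := keepAlive(); err != nil {
					cancel() // lease lost: tell the handler to stop
					return
				}
			}
		}
	}()
	return handler(ctx)
}

// demoLeaseLost simulates a renewal that fails on its second call.
func demoLeaseLost() error {
	calls := 0
	keepAlive := func() error {
		calls++
		if calls >= 2 {
			return errors.New("lease expired")
		}
		return nil
	}
	return runWithLease(context.Background(), 5*time.Millisecond, keepAlive, func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err() // stop work promptly once the lease is gone
		case <-time.After(2 * time.Second):
			return nil
		}
	})
}

func main() {
	fmt.Println(demoLeaseLost())
}
```

Handlers that perform long work should check ctx between steps so that writes are not attempted after the lease has been lost.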
type AcquireOption ¶
type AcquireOption func(*AcquireConfig)
AcquireOption customises Acquire behaviour.
func WithAcquireBackoff ¶
func WithAcquireBackoff(base, max time.Duration, multiplier float64) AcquireOption
WithAcquireBackoff sets the base delay, maximum delay, and multiplier used for exponential backoff between acquire retries.
func WithAcquireFailureRetries ¶
func WithAcquireFailureRetries(n int) AcquireOption
WithAcquireFailureRetries overrides how many times the client retries after failures other than lease conflicts. A value <0 allows infinite retries.
func WithAcquireJitter ¶
func WithAcquireJitter(jitter time.Duration) AcquireOption
WithAcquireJitter adjusts the jitter window applied when the retry delay reaches the cap. A zero duration disables jitter. When positive, the final sleep duration is randomly offset by +/- jitter once the cap is hit.
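AcquireOption follows the functional-options pattern: each option is a function that mutates a config struct before the call proceeds. The sketch below reproduces the pattern with local types; the default values are illustrative assumptions, not the SDK's defaults.

```go
package main

import (
	"fmt"
	"time"
)

// acquireConfig and acquireOption are local stand-ins for the SDK types.
type acquireConfig struct {
	BaseDelay      time.Duration
	MaxDelay       time.Duration
	Multiplier     float64
	Jitter         time.Duration
	FailureRetries int
}

type acquireOption func(*acquireConfig)

func withBackoff(base, maxDelay time.Duration, multiplier float64) acquireOption {
	return func(c *acquireConfig) {
		c.BaseDelay, c.MaxDelay, c.Multiplier = base, maxDelay, multiplier
	}
}

func withFailureRetries(n int) acquireOption {
	return func(c *acquireConfig) { c.FailureRetries = n }
}

func withJitter(jitter time.Duration) acquireOption {
	return func(c *acquireConfig) { c.Jitter = jitter }
}

// buildConfig starts from illustrative defaults and applies options in order,
// so later options override earlier ones.
func buildConfig(opts ...acquireOption) acquireConfig {
	cfg := acquireConfig{BaseDelay: time.Second, MaxDelay: 10 * time.Second, Multiplier: 2, FailureRetries: 5}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

// demoConfigSummary renders the resolved config for inspection.
func demoConfigSummary() string {
	cfg := buildConfig(withBackoff(200*time.Millisecond, 5*time.Second, 1.5), withFailureRetries(-1))
	return fmt.Sprintf("%v %v %v %v %d", cfg.BaseDelay, cfg.MaxDelay, cfg.Multiplier, cfg.Jitter, cfg.FailureRetries)
}

func main() {
	fmt.Println(demoConfigSummary())
}
```

With the real SDK the equivalent call site would pass options such as WithAcquireBackoff and WithAcquireFailureRetries as trailing arguments to Acquire or AcquireForUpdate.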
type AttachRequest ¶ added in v0.1.0
type AttachRequest struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// TxnID associates the operation with a transaction coordinator record.
TxnID string
// FencingToken is the monotonic token used to fence stale writers.
FencingToken *int64
// Name is the human-readable identifier for the referenced object.
Name string
// Body provides the request or response payload stream/content.
Body io.Reader
// ContentType is the media type associated with the payload.
ContentType string
// MaxBytes optionally enforces an upper bound for attachment payload size in bytes.
MaxBytes *int64
// PreventOverwrite rejects the request when an attachment with the same selector already exists.
PreventOverwrite bool
}
AttachRequest captures parameters for staging an attachment.
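The FencingToken field relies on the general fencing technique: the storage side remembers the highest token it has seen and rejects writes that carry an older one. The sketch below shows that idea in isolation; fencedStore is hypothetical and says nothing about how the lockd server performs the check.

```go
package main

import (
	"errors"
	"fmt"
)

var errStaleToken = errors.New("stale fencing token")

// fencedStore accepts a write only when its token is not older than the
// highest token seen so far.
type fencedStore struct {
	highest int64
	value   string
}

func (s *fencedStore) Write(token int64, value string) error {
	if token < s.highest {
		return errStaleToken
	}
	s.highest = token
	s.value = value
	return nil
}

// demoFencing replays a classic race: writer A pauses, writer B takes over
// with a newer token, and A's late write is rejected.
func demoFencing() (string, bool) {
	store := &fencedStore{}
	_ = store.Write(1, "from-A") // A holds token 1
	_ = store.Write(2, "from-B") // B acquired later and holds token 2
	err := store.Write(1, "late-A")
	return store.value, errors.Is(err, errStaleToken)
}

func main() {
	fmt.Println(demoFencing())
}
```

Monotonic tokens make the outcome independent of wall-clock timing: a paused writer cannot overwrite newer state even if it believes its lease is still valid.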
type AttachResult ¶ added in v0.1.0
type AttachResult struct {
// Attachment contains metadata for the staged or retrieved attachment.
Attachment AttachmentInfo
// Noop is true when attach detected identical existing content and skipped a write.
Noop bool
// Version is the lockd monotonic version for the target object.
Version int64
}
AttachResult reports the staged attachment metadata.
type Attachment ¶ added in v0.1.0
type Attachment struct {
AttachmentInfo
// contains filtered or unexported fields
}
Attachment exposes a streaming attachment payload.
func (*Attachment) Close ¶ added in v0.1.0
func (a *Attachment) Close() error
Close releases the underlying reader.
type AttachmentInfo ¶ added in v0.1.0
type AttachmentInfo struct {
// ID is the unique identifier for the referenced object.
ID string
// Name is the human-readable identifier for the referenced object.
Name string
// Size is the payload size in bytes.
Size int64
// PlaintextSHA256 is the SHA-256 checksum of the uploaded plaintext payload.
PlaintextSHA256 string
// ContentType is the media type associated with the payload.
ContentType string
// CreatedAtUnix is the creation timestamp as Unix seconds.
CreatedAtUnix int64
// UpdatedAtUnix is the last update timestamp as Unix seconds.
UpdatedAtUnix int64
}
AttachmentInfo describes attachment metadata.
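A client can compute a comparable checksum and size locally while streaming a payload, for example to verify a retrieved attachment. The sketch below assumes PlaintextSHA256 is a lowercase hex string, which is not confirmed here; plaintextSHA256 and demoDigest are hypothetical helper names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// plaintextSHA256 streams r through SHA-256 and returns the hex digest and
// the number of bytes read, without holding the payload in memory.
func plaintextSHA256(r io.Reader) (string, int64, error) {
	h := sha256.New()
	n, err := io.Copy(h, r)
	if err != nil {
		return "", 0, err
	}
	return hex.EncodeToString(h.Sum(nil)), n, nil
}

// demoDigest hashes an in-memory string for demonstration purposes.
func demoDigest(s string) string {
	sum, _, err := plaintextSHA256(strings.NewReader(s))
	if err != nil {
		return ""
	}
	return sum
}

func main() {
	fmt.Println(demoDigest("abc"))
}
```

To hash while uploading, the same hash.Hash can be combined with io.TeeReader so the payload is read only once.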
type AttachmentList ¶ added in v0.1.0
type AttachmentList struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// Attachments enumerates attachments associated with the target key.
Attachments []AttachmentInfo
}
AttachmentList collects attachment metadata for a key.
type AttachmentSelector ¶ added in v0.1.0
type AttachmentSelector struct {
// ID is the unique identifier for the referenced object.
ID string
// Name is the human-readable identifier for the referenced object.
Name string
}
AttachmentSelector identifies an attachment by id or name.
type AttachmentStore ¶ added in v0.1.0
type AttachmentStore struct {
// contains filtered or unexported fields
}
AttachmentStore exposes attachment operations scoped to a key.
func (*AttachmentStore) Attach ¶ added in v0.1.0
func (s *AttachmentStore) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
Attach stages an attachment payload using the configured lease context.
func (*AttachmentStore) Delete ¶ added in v0.1.0
func (s *AttachmentStore) Delete(ctx context.Context, selector AttachmentSelector) (*DeleteAttachmentResult, error)
Delete removes a single attachment.
func (*AttachmentStore) DeleteAll ¶ added in v0.1.0
func (s *AttachmentStore) DeleteAll(ctx context.Context) (*DeleteAllAttachmentsResult, error)
DeleteAll removes all attachments for the key.
func (*AttachmentStore) List ¶ added in v0.1.0
func (s *AttachmentStore) List(ctx context.Context) (*AttachmentList, error)
List returns attachment metadata for the key.
func (*AttachmentStore) Retrieve ¶ added in v0.1.0
func (s *AttachmentStore) Retrieve(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
Retrieve streams a single attachment payload.
func (*AttachmentStore) RetrieveAll ¶ added in v0.1.0
func (s *AttachmentStore) RetrieveAll(ctx context.Context) ([]*Attachment, error)
RetrieveAll loads all attachments and returns a slice of Attachment readers.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a convenience wrapper around the lockd HTTP API.
func New ¶
New creates a new client targeting baseURL (e.g. https://localhost:9341). Unix-domain sockets are supported via base URLs such as unix:///var/run/lockd.sock; ensure the server is running with mTLS disabled or supply a compatible client bundle. Example:
cli, err := client.New("unix:///tmp/lockd.sock")
if err != nil {
log.Fatal(err)
}
lease, err := cli.Acquire(ctx, api.AcquireRequest{Key: "demo", Owner: "worker", TTLSeconds: 20})
if err != nil {
log.Fatal(err)
}
defer cli.Release(ctx, api.ReleaseRequest{Key: "demo", LeaseID: lease.LeaseID})
func NewWithEndpoints ¶
NewWithEndpoints constructs a client from a slice of server endpoints.
func (*Client) Acquire ¶
func (c *Client) Acquire(ctx context.Context, req api.AcquireRequest, opts ...AcquireOption) (*LeaseSession, error)
Acquire acquires a lease, retrying conflicts and transient errors.
Example:
lease, err := cli.Acquire(ctx, api.AcquireRequest{
Key: "orders",
Owner: "worker-1",
TTLSeconds: 30,
})
if err != nil {
return err
}
defer lease.Close()
if err := lease.Save(ctx, map[string]any{"progress": "done"}); err != nil {
return err
}
Example ¶
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
lockd "pkt.systems/lockd"
"pkt.systems/lockd/api"
"pkt.systems/lockd/client"
"pkt.systems/lql"
)
func main() {
ctx := context.Background()
dir, err := os.MkdirTemp("", "lockd-example-")
if err != nil {
fmt.Println("error:", err)
return
}
defer os.RemoveAll(dir)
socketPath := filepath.Join(dir, "lockd.sock")
cfg := lockd.Config{
Store: "mem://",
ListenProto: "unix",
Listen: socketPath,
DisableMTLS: true,
DisableStorageEncryption: true,
}
handle, err := lockd.StartServer(ctx, cfg)
if err != nil {
fmt.Println("error:", err)
return
}
stop := handle.Stop
defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
if err != nil {
fmt.Println("error:", err)
return
}
lease, err := cli.Acquire(ctx, api.AcquireRequest{
Namespace: "payments",
Key: "batch-2025-11",
Owner: "worker-1",
TTLSeconds: 15,
})
if err != nil {
fmt.Println("error:", err)
return
}
defer lease.Release(ctx)
state := map[string]any{}
if err := lql.Mutate(state,
"/batch/id=batch-2025-11",
"/batch/status=pending",
); err != nil {
fmt.Println("error:", err)
return
}
if err := lease.Save(ctx, state); err != nil {
fmt.Println("error:", err)
return
}
fmt.Println("lease namespace:", lease.Namespace)
fmt.Println("lease key:", lease.Key)
}
Output:
lease namespace: payments
lease key: batch-2025-11
func (*Client) AcquireForUpdate ¶
func (c *Client) AcquireForUpdate(ctx context.Context, req api.AcquireRequest, handler AcquireForUpdateHandler, opts ...AcquireOption) error
AcquireForUpdate acquires a lease, runs handler while the lease is active, keeps the lease alive, and releases it on return. The helper retries the acquire/get handshake according to opts, surfaces the current state via AcquireForUpdateContext, and ensures Release is invoked even when the handler returns an error.
Example:
err := cli.AcquireForUpdate(ctx, api.AcquireRequest{
Key: "orders",
Owner: "worker-1",
TTLSeconds: 45,
}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
var doc map[string]any
if err := af.Load(ctx, &doc); err != nil && !errors.Is(err, client.ErrStateNotFound) {
return err
}
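if doc == nil {
// No state stored yet: allocate the map so the assignment below cannot panic.
doc = map[string]any{}
}
doc["checkpoint"] = "processing"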
return af.Save(ctx, doc)
})
if err != nil {
log.Fatal(err)
}
Example ¶
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
lockd "pkt.systems/lockd"
"pkt.systems/lockd/api"
"pkt.systems/lockd/client"
"pkt.systems/lql"
)
func main() {
ctx := context.Background()
dir, err := os.MkdirTemp("", "lockd-example-")
if err != nil {
fmt.Println("error:", err)
return
}
defer os.RemoveAll(dir)
socketPath := filepath.Join(dir, "lockd.sock")
cfg := lockd.Config{
Store: "mem://",
ListenProto: "unix",
Listen: socketPath,
DisableMTLS: true,
DisableStorageEncryption: true,
}
handle, err := lockd.StartServer(ctx, cfg)
if err != nil {
fmt.Println("error:", err)
return
}
stop := handle.Stop
defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
if err != nil {
fmt.Println("error:", err)
return
}
err = cli.AcquireForUpdate(ctx, api.AcquireRequest{
Key: "ledger-checkpoint",
Owner: "worker-1",
TTLSeconds: 10,
}, func(ctx context.Context, af *client.AcquireForUpdateContext) error {
state := map[string]any{}
if err := lql.Mutate(state,
"/cursor/batch=42",
"time:/cursor/updated_at=2025-11-10T12:00:00Z",
); err != nil {
return err
}
return af.Save(ctx, state)
})
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println("state updated")
}
Output: state updated
func (*Client) Attach ¶ added in v0.1.0
func (c *Client) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
Attach stages an attachment payload for the key.
func (*Client) ClearLeaseID ¶ added in v0.1.0
ClearLeaseID removes the sticky lease when it matches leaseID.
func (*Client) DeleteAllAttachments ¶ added in v0.1.0
func (c *Client) DeleteAllAttachments(ctx context.Context, req DeleteAllAttachmentsRequest) (*DeleteAllAttachmentsResult, error)
DeleteAllAttachments removes all attachments for a key.
func (*Client) DeleteAttachment ¶ added in v0.1.0
func (c *Client) DeleteAttachment(ctx context.Context, req DeleteAttachmentRequest) (*DeleteAttachmentResult, error)
DeleteAttachment removes a single attachment.
func (*Client) Dequeue ¶
func (c *Client) Dequeue(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)
Dequeue pops a single message from the queue using /v1/queue/dequeue.
func (*Client) DequeueBatch ¶
func (c *Client) DequeueBatch(ctx context.Context, queue string, opts DequeueOptions) ([]*QueueMessage, error)
DequeueBatch retrieves up to opts.PageSize messages in a single dequeue request. The caller is responsible for acking or nacking every returned message.
func (*Client) DequeueWithState ¶
func (c *Client) DequeueWithState(ctx context.Context, queue string, opts DequeueOptions) (*QueueMessage, error)
DequeueWithState pops a queue message and returns its state payload in one call.
func (*Client) Enqueue ¶
func (c *Client) Enqueue(ctx context.Context, queue string, payload io.Reader, opts EnqueueOptions) (*api.EnqueueResponse, error)
Enqueue pushes a payload into the specified queue using /v1/queue/enqueue.
func (*Client) EnqueueBytes ¶
func (c *Client) EnqueueBytes(ctx context.Context, queue string, payload []byte, opts EnqueueOptions) (*api.EnqueueResponse, error)
EnqueueBytes is a convenience helper that enqueues an in-memory payload.
func (*Client) FlushIndex ¶ added in v0.1.0
func (c *Client) FlushIndex(ctx context.Context, namespace string, optFns ...FlushOption) (*api.IndexFlushResponse, error)
FlushIndex forces the namespace index writer to flush pending documents.
func (*Client) Get ¶
Get fetches the JSON state for key and returns a streaming response.
Example (Public) ¶
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
lockd "pkt.systems/lockd"
"pkt.systems/lockd/api"
"pkt.systems/lockd/client"
"pkt.systems/lql"
)
func main() {
ctx := context.Background()
dir, err := os.MkdirTemp("", "lockd-example-")
if err != nil {
fmt.Println("error:", err)
return
}
defer os.RemoveAll(dir)
socketPath := filepath.Join(dir, "lockd.sock")
cfg := lockd.Config{
Store: "mem://",
ListenProto: "unix",
Listen: socketPath,
DisableMTLS: true,
DisableStorageEncryption: true,
}
handle, err := lockd.StartServer(ctx, cfg)
if err != nil {
fmt.Println("error:", err)
return
}
stop := handle.Stop
defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
if err != nil {
fmt.Println("error:", err)
return
}
_ = cli.UseNamespace("reports")
lease, err := cli.Acquire(ctx, api.AcquireRequest{
Key: "payouts-2025-11",
Owner: "writer-1",
TTLSeconds: 10,
})
if err != nil {
fmt.Println("error:", err)
return
}
doc := make(map[string]any)
if err := lql.Mutate(doc,
"/report/status=published",
"time:/report/released_at=2025-11-01T09:00:00Z",
"/report/summary/total=1200.50",
); err != nil {
fmt.Println("error:", err)
return
}
if err := lease.Save(ctx, doc); err != nil {
fmt.Println("error:", err)
return
}
lease.Release(ctx)
resp, err := cli.Get(ctx, "payouts-2025-11")
if err != nil {
fmt.Println("error:", err)
return
}
defer resp.Close()
var snapshot map[string]any
if err := json.NewDecoder(resp.Reader()).Decode(&snapshot); err != nil {
fmt.Println("error:", err)
return
}
report := snapshot["report"].(map[string]any)
fmt.Println("status:", report["status"])
}
Output: status: published
func (*Client) GetAttachment ¶ added in v0.1.0
func (c *Client) GetAttachment(ctx context.Context, req GetAttachmentRequest) (*Attachment, error)
GetAttachment retrieves a single attachment payload.
func (*Client) GetNamespaceConfig ¶ added in v0.1.0
func (c *Client) GetNamespaceConfig(ctx context.Context, namespace string) (NamespaceConfigResult, error)
GetNamespaceConfig returns the namespace configuration document and its ETag.
func (*Client) KeepAlive ¶
func (c *Client) KeepAlive(ctx context.Context, req api.KeepAliveRequest) (*api.KeepAliveResponse, error)
KeepAlive extends a lease.
func (*Client) ListAttachments ¶ added in v0.1.0
func (c *Client) ListAttachments(ctx context.Context, req ListAttachmentsRequest) (*AttachmentList, error)
ListAttachments lists attachments for a key.
func (*Client) Load ¶
Load unmarshals the current state for key into v. When no state exists, v is left untouched.
func (*Client) Mutate ¶ added in v0.7.0
func (c *Client) Mutate(ctx context.Context, req MutateRequest) (*UpdateResult, error)
Mutate applies LQL mutation expressions server-side to the key under lease protection.
func (*Client) MutateLocal ¶ added in v0.8.0
func (c *Client) MutateLocal(ctx context.Context, req MutateLocalRequest) (*UpdateResult, error)
MutateLocal applies LQL mutations client-side without buffering the full JSON state in memory. This path supports file-backed LQL mutators.
func (*Client) Namespace ¶ added in v0.1.0
Namespace returns the default namespace currently configured on the client.
func (*Client) Query ¶ added in v0.1.0
func (c *Client) Query(ctx context.Context, optFns ...QueryOption) (*QueryResponse, error)
Query executes a selector search within a namespace. Usage is option-driven:
resp, err := cli.Query(ctx,
client.WithQueryNamespace("orders"),
client.WithQuery(`eq{field=/status,value=open}`),
client.WithQueryLimit(20),
client.WithQueryReturnDocuments(),
)
if err != nil { /* handle */ }
resp.ForEach(func(row client.QueryRow) error {
// row.Document is populated in return=documents mode
return nil
})
Callers can combine helpers (namespace, selector, engine hints, cursor, return mode, etc.) without assembling api.QueryRequest manually.
Example ¶
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"time"
lockd "pkt.systems/lockd"
"pkt.systems/lockd/api"
"pkt.systems/lockd/client"
"pkt.systems/lql"
)
func main() {
ctx := context.Background()
dir, err := os.MkdirTemp("", "lockd-example-")
if err != nil {
fmt.Println("error:", err)
return
}
defer os.RemoveAll(dir)
socketPath := filepath.Join(dir, "lockd.sock")
cfg := lockd.Config{
Store: "mem://",
ListenProto: "unix",
Listen: socketPath,
DisableMTLS: true,
DisableStorageEncryption: true,
}
handle, err := lockd.StartServer(ctx, cfg)
if err != nil {
fmt.Println("error:", err)
return
}
stop := handle.Stop
defer stop(context.Background(), lockd.WithDrainLeases(0), lockd.WithShutdownTimeout(500*time.Millisecond))
cli, err := client.New("unix://"+socketPath, client.WithDisableMTLS(true))
if err != nil {
fmt.Println("error:", err)
return
}
docs := []struct {
OrderNo string
Status string
}{
{OrderNo: "A-1001", Status: "open"},
{OrderNo: "A-1002", Status: "closed"},
{OrderNo: "A-1003", Status: "open"},
}
for _, doc := range docs {
lease, err := cli.Acquire(ctx, api.AcquireRequest{Namespace: "orders", Owner: "ingest", TTLSeconds: 5})
if err != nil {
fmt.Println("error:", err)
return
}
state := make(map[string]any)
if err := lql.Mutate(state,
"/order_no="+doc.OrderNo,
"/status="+doc.Status,
); err != nil {
fmt.Println("error:", err)
return
}
if err := lease.Save(ctx, state); err != nil {
fmt.Println("error:", err)
return
}
lease.Release(ctx)
}
resp, err := cli.Query(ctx,
client.WithQueryNamespace("orders"),
client.WithQuery(`eq{field=/status,value=open}`),
client.WithQueryReturnDocuments(),
)
if err != nil {
fmt.Println("error:", err)
return
}
var openOrders []string
resp.ForEach(func(row client.QueryRow) error {
docReader, err := row.DocumentReader()
if err != nil {
return err
}
defer docReader.Close()
var doc map[string]any
if err := json.NewDecoder(docReader).Decode(&doc); err != nil {
return err
}
if order, ok := doc["order_no"].(string); ok {
openOrders = append(openOrders, order)
}
return nil
})
sort.Strings(openOrders)
for _, order := range openOrders {
fmt.Println("open order:", order)
}
}
Output:
open order: A-1001
open order: A-1003
func (*Client) QueueAck ¶
func (c *Client) QueueAck(ctx context.Context, req api.AckRequest) (*api.AckResponse, error)
QueueAck acknowledges a queue message via the /v1/queue/ack API.
func (*Client) QueueExtend ¶
func (c *Client) QueueExtend(ctx context.Context, req api.ExtendRequest) (*api.ExtendResponse, error)
QueueExtend extends a queue message's visibility window using /v1/queue/extend.
func (*Client) QueueNack ¶
func (c *Client) QueueNack(ctx context.Context, req api.NackRequest) (*api.NackResponse, error)
QueueNack requeues a message with an optional visibility delay via /v1/queue/nack.
func (*Client) QueueStats ¶ added in v0.7.0
func (c *Client) QueueStats(ctx context.Context, queue string, opts QueueStatsOptions) (*api.QueueStatsResponse, error)
QueueStats returns runtime and head-snapshot statistics for a queue without side effects.
func (*Client) RegisterLeaseToken ¶
RegisterLeaseToken stores a lease -> fencing token mapping for subsequent requests. This is useful when the token is obtained out-of-band (for example via environment variables between CLI invocations).
func (*Client) Release ¶
func (c *Client) Release(ctx context.Context, req api.ReleaseRequest) (*api.ReleaseResponse, error)
Release drops a lease.
func (*Client) Remove ¶ added in v0.1.0
func (c *Client) Remove(ctx context.Context, key, leaseID string, opts RemoveOptions) (*api.RemoveResponse, error)
Remove deletes the JSON state for key while ensuring the lease and conditional headers (when provided) are honoured.
func (*Client) StartConsumer ¶ added in v0.4.0
func (c *Client) StartConsumer(ctx context.Context, consumers ...ConsumerConfig) error
StartConsumer starts one or more long-running queue consumers and blocks until they terminate. Each ConsumerConfig runs in its own goroutine and restarts on failure according to RestartPolicy. Panics from message handlers, lifecycle hooks, and error handlers are recovered and treated as consume-loop failures. Cancel ctx to stop all consumers; context cancellation returns nil.
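The supervision model described above (one goroutine per consumer, restart on failure, panics treated as failures, nil on cancellation) can be sketched as follows. All names are hypothetical, restart delays are omitted, and a fixed failure limit stands in for RestartPolicy.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type consumeLoop func(context.Context) error

// safeRun converts a panic inside loop into an ordinary error.
func safeRun(ctx context.Context, loop consumeLoop) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("consumer panic: %v", r)
		}
	}()
	return loop(ctx)
}

// runConsumers runs every loop in its own goroutine and restarts a failed
// loop until it stops cleanly, ctx is canceled, or maxFailures is reached.
func runConsumers(ctx context.Context, maxFailures int, loops ...consumeLoop) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(loops))
	for _, loop := range loops {
		wg.Add(1)
		go func(loop consumeLoop) {
			defer wg.Done()
			for failures := 0; ctx.Err() == nil; {
				err := safeRun(ctx, loop)
				if err == nil {
					return // clean stop
				}
				failures++
				if failures >= maxFailures {
					errs <- err
					return
				}
			}
		}(loop)
	}
	wg.Wait()
	close(errs)
	if ctx.Err() != nil {
		return nil // cancellation is not reported as an error
	}
	if err, ok := <-errs; ok {
		return err
	}
	return nil
}

// demoSupervise runs one loop that panics once and one that stops cleanly.
func demoSupervise() (int, error) {
	attempts := 0
	flaky := func(context.Context) error {
		attempts++
		if attempts == 1 {
			panic("handler bug")
		}
		return nil
	}
	steady := func(context.Context) error { return nil }
	err := runConsumers(context.Background(), 3, flaky, steady)
	return attempts, err
}

// demoGiveUp runs a loop that always fails until the failure limit is hit.
func demoGiveUp() error {
	always := func(context.Context) error { return errors.New("subscribe failed") }
	return runConsumers(context.Background(), 2, always)
}

func main() {
	fmt.Println(demoSupervise())
	fmt.Println(demoGiveUp())
}
```

Recovering panics inside the per-consumer goroutine keeps one misbehaving handler from taking down the other consumers in the same process.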
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, queue string, opts SubscribeOptions, handler MessageHandler) error
Subscribe streams queue messages continuously and invokes handler for each delivery. While the handler runs, the client renews the in-flight queue lease implicitly via QueueExtend to reduce timeout risk for long-running handlers.
func (*Client) SubscribeWithState ¶
func (c *Client) SubscribeWithState(ctx context.Context, queue string, opts SubscribeOptions, handler MessageHandlerWithState) error
SubscribeWithState streams queue messages with workflow state and invokes handler for each delivery. While the handler runs, the client renews both the message lease and the associated state lease implicitly via QueueExtend.
func (*Client) TxnCommit ¶ added in v0.1.0
func (c *Client) TxnCommit(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
TxnCommit records a commit decision and applies it to participants.
func (*Client) TxnPrepare ¶ added in v0.1.0
func (c *Client) TxnPrepare(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
TxnPrepare records a pending decision for the txn, merging participants/expiry.
func (*Client) TxnReplay ¶ added in v0.1.0
TxnReplay replays the recorded decision for txnID, or rolls the transaction back when it is still pending and has expired.
func (*Client) TxnRollback ¶ added in v0.1.0
func (c *Client) TxnRollback(ctx context.Context, req api.TxnDecisionRequest) (*api.TxnDecisionResponse, error)
TxnRollback records a rollback decision and applies it to participants.
func (*Client) Update ¶
func (c *Client) Update(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
Update uploads new JSON state from the provided reader.
func (*Client) UpdateBytes ¶
func (c *Client) UpdateBytes(ctx context.Context, key, leaseID string, body []byte, opts UpdateOptions) (*UpdateResult, error)
UpdateBytes uploads new JSON state from the provided byte slice.
func (*Client) UpdateMetadata ¶ added in v0.1.0
func (c *Client) UpdateMetadata(ctx context.Context, key, leaseID string, opts UpdateOptions) (*MetadataResult, error)
UpdateMetadata mutates lock metadata without modifying the JSON state.
func (*Client) UpdateNamespaceConfig ¶ added in v0.1.0
func (c *Client) UpdateNamespaceConfig(ctx context.Context, req api.NamespaceConfigRequest, opts NamespaceConfigOptions) (NamespaceConfigResult, error)
UpdateNamespaceConfig mutates namespace-level settings and returns the updated configuration.
func (*Client) UpdateStream ¶ added in v0.7.0
func (c *Client) UpdateStream(ctx context.Context, key, leaseID string, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
UpdateStream uploads JSON state from a non-replayable reader without buffering.
This variant is intended for streaming callers (for example MCP write streams) and performs a single transport attempt. If a retry or failover is required, callers must begin a new stream and replay bytes from their source of truth.
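Why a retry needs a fresh stream follows from how io.Reader works: once an attempt has consumed the reader, a second attempt on the same value sees no data. The sketch below demonstrates this with a hypothetical sendOnce helper that stands in for a single transport attempt.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// sendOnce consumes body exactly once, like a single transport attempt.
// When fail is true it reports an error after the body has been read.
func sendOnce(body io.Reader, fail bool) (int64, error) {
	n, err := io.Copy(io.Discard, body)
	if err != nil {
		return n, err
	}
	if fail {
		return n, errors.New("connection reset after body was sent")
	}
	return n, nil
}

// demoReplay returns the bytes sent by a failed first attempt, by a retry
// that reuses the drained reader, and by a retry that reopens the source.
func demoReplay() (first, sameReader, freshReader int64) {
	source := []byte(`{"status":"ok"}`)
	stream := io.Reader(bytes.NewReader(source))

	first, _ = sendOnce(stream, true)       // attempt fails after reading
	sameReader, _ = sendOnce(stream, false) // reader is already drained
	freshReader, _ = sendOnce(bytes.NewReader(source), false)
	return first, sameReader, freshReader
}

func main() {
	fmt.Println(demoReplay())
}
```

Callers that need automatic retries should prefer an API that can buffer or rewind the body, and reserve the streaming variant for sources they can reopen themselves.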
func (*Client) UseLeaseID ¶ added in v0.1.0
UseLeaseID configures the client to reuse the provided lease for subsequent requests.
func (*Client) UseNamespace ¶ added in v0.1.0
UseNamespace updates the default namespace used when callers omit one.
func (*Client) WatchQueue ¶ added in v0.7.0
func (c *Client) WatchQueue(ctx context.Context, queue string, opts WatchQueueOptions, handler QueueWatchHandler) error
WatchQueue streams non-consuming queue visibility changes over SSE.
type ConsumerConfig ¶ added in v0.4.0
type ConsumerConfig struct {
// Name labels this consumer in logs and lifecycle/error callbacks.
// Empty defaults to Queue.
Name string
// Queue is the queue name to subscribe to.
Queue string
// Namespace scopes this consumer when Options.Namespace is empty.
// Empty falls back to the client's default namespace.
Namespace string
// Options configures subscription behavior (namespace, owner, prefetch, etc.).
// When Options.Owner is empty, StartConsumer generates a unique owner value.
Options SubscribeOptions
// WithState switches between Subscribe (false) and SubscribeWithState (true).
WithState bool
// MessageHandler processes each delivered message.
MessageHandler ConsumerMessageHandler
// ErrorHandler observes subscribe failures before restart.
// When nil, StartConsumer logs and continues.
ErrorHandler ConsumerErrorHandler
// OnStart runs when a consumer subscribe attempt starts.
OnStart func(context.Context, ConsumerLifecycleEvent)
// OnStop runs when a consumer subscribe attempt stops (success, context
// cancellation, or error).
OnStop func(context.Context, ConsumerLifecycleEvent)
// RestartPolicy controls retry/backoff behavior after failures.
RestartPolicy ConsumerRestartPolicy
}
ConsumerConfig describes one queue consumer managed by StartConsumer.
type ConsumerError ¶ added in v0.4.0
type ConsumerError struct {
// Name is the logical consumer name from ConsumerConfig.
Name string
// Queue is the queue whose consume loop failed.
Queue string
// WithState indicates whether the failed loop used SubscribeWithState.
WithState bool
// Attempt is the current consecutive failure count for this consumer.
Attempt int
// RestartIn is the delay before the next subscribe attempt.
RestartIn time.Duration
// Err is the underlying failure returned by Subscribe/SubscribeWithState.
Err error
}
ConsumerError describes a recoverable consumer loop failure before restart.
type ConsumerErrorHandler ¶ added in v0.4.0
type ConsumerErrorHandler func(context.Context, ConsumerError) error
ConsumerErrorHandler is invoked when a consume loop fails and is about to be restarted. Returning nil continues restart handling. Returning a non-nil error stops StartConsumer and returns that error (wrapped with queue context).
type ConsumerLifecycleEvent ¶ added in v0.4.0
type ConsumerLifecycleEvent struct {
// Name is the logical consumer name from ConsumerConfig.
Name string
// Queue is the queue this lifecycle event belongs to.
Queue string
// WithState indicates whether this consumer uses SubscribeWithState.
WithState bool
// Attempt is the 1-based subscribe attempt sequence for this consumer.
Attempt int
// Err is the terminal error for the attempt. It is nil for OnStart and for
// clean attempt completion.
Err error
}
ConsumerLifecycleEvent describes lifecycle transitions for one consumer.
type ConsumerMessage ¶ added in v0.4.0
type ConsumerMessage struct {
// Client is the active SDK client used by StartConsumer, allowing handlers to
// perform additional lockd operations (enqueue, acquire, queries, etc.)
// without constructing a second client.
Client *Client
// Logger is the client logger configured via WithLogger.
// It is always non-nil (defaults to pslog.NoopLogger()).
Logger pslog.Base
// Queue is the subscribed queue name for this delivery.
Queue string
// WithState indicates whether this delivery came from a stateful subscription.
WithState bool
// Message is the leased queue delivery payload/metadata wrapper.
Message *QueueMessage
// State is the workflow state lease handle for stateful subscriptions.
// It is nil when WithState is false.
State *QueueStateHandle
// contains filtered or unexported fields
}
ConsumerMessage bundles the runtime context provided to ConsumerMessageHandler. The same handler can be reused across multiple ConsumerConfig entries and inspect Queue/WithState to branch behavior.
func (ConsumerMessage) Name ¶ added in v0.4.0
func (m ConsumerMessage) Name() string
Name returns the logical consumer name resolved from ConsumerConfig.Name, defaulting to Queue when no explicit name was configured.
type ConsumerMessageHandler ¶ added in v0.4.0
type ConsumerMessageHandler func(context.Context, ConsumerMessage) error
ConsumerMessageHandler handles one queue delivery produced by StartConsumer.
type ConsumerRestartPolicy ¶ added in v0.4.0
type ConsumerRestartPolicy struct {
// ImmediateRetries is the number of consecutive failures retried with zero
// delay before exponential backoff starts.
ImmediateRetries int
// BaseDelay is the first delayed retry duration once immediate retries are exhausted.
BaseDelay time.Duration
// MaxDelay caps the restart delay.
MaxDelay time.Duration
// Multiplier controls exponential growth between delayed retries.
Multiplier float64
// Jitter randomizes delay by +/- Jitter to reduce synchronized retries.
Jitter time.Duration
// MaxFailures optionally stops the consumer after N consecutive failures.
// Zero or negative means retry forever.
MaxFailures int
}
ConsumerRestartPolicy configures restart behavior for failed consume loops.
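One plausible reading of these fields is sketched below. The schedule is an assumption made for illustration (zero delay for the first ImmediateRetries failures, then BaseDelay multiplied by Multiplier per further failure, capped at MaxDelay); the SDK's actual computation may differ. The struct is redeclared locally, restartDelay and examplePolicy are hypothetical helpers, and Jitter is left out so the output stays deterministic.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// ConsumerRestartPolicy is a local stand-in mirroring the documented fields.
type ConsumerRestartPolicy struct {
	ImmediateRetries int
	BaseDelay        time.Duration
	MaxDelay         time.Duration
	Multiplier       float64
	Jitter           time.Duration
	MaxFailures      int
}

// restartDelay is a hypothetical helper: given the 1-based consecutive
// failure count, it returns the delay before the next attempt and whether
// the consumer should stop. Jitter is intentionally ignored here.
func restartDelay(p ConsumerRestartPolicy, failures int) (time.Duration, bool) {
	if p.MaxFailures > 0 && failures >= p.MaxFailures {
		return 0, true
	}
	if failures <= p.ImmediateRetries {
		return 0, false
	}
	mult := p.Multiplier
	if mult < 1 {
		mult = 1 // assumption: treat an unset multiplier as constant delay
	}
	exp := float64(failures - p.ImmediateRetries - 1)
	d := float64(p.BaseDelay) * math.Pow(mult, exp)
	if p.MaxDelay > 0 && d > float64(p.MaxDelay) {
		d = float64(p.MaxDelay)
	}
	return time.Duration(d), false
}

// examplePolicy returns illustrative values, not SDK defaults.
func examplePolicy() ConsumerRestartPolicy {
	return ConsumerRestartPolicy{
		ImmediateRetries: 2,
		BaseDelay:        time.Second,
		MaxDelay:         5 * time.Second,
		Multiplier:       2,
	}
}

func main() {
	p := examplePolicy()
	for failures := 1; failures <= 6; failures++ {
		d, stop := restartDelay(p, failures)
		fmt.Printf("failure %d: delay=%v stop=%v\n", failures, d, stop)
	}
}
```

With these example values the delays run 0s, 0s, 1s, 2s, 4s, and then stay at the 5s cap.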
type DeleteAllAttachmentsRequest ¶ added in v0.1.0
type DeleteAllAttachmentsRequest struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// TxnID associates the operation with a transaction coordinator record.
TxnID string
// FencingToken is the monotonic token used to fence stale writers.
FencingToken *int64
}
DeleteAllAttachmentsRequest removes all attachments for a key.
type DeleteAllAttachmentsResult ¶ added in v0.1.0
type DeleteAllAttachmentsResult struct {
// Deleted reports how many attachments were removed.
Deleted int
// Version is the lockd monotonic version for the target object.
Version int64
}
DeleteAllAttachmentsResult reports delete status for all attachments.
type DeleteAttachmentRequest ¶ added in v0.1.0
type DeleteAttachmentRequest struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// TxnID associates the operation with a transaction coordinator record.
TxnID string
// FencingToken is the monotonic token used to fence stale writers.
FencingToken *int64
// Selector identifies which attachment to delete (by ID, Name, or both).
Selector AttachmentSelector
}
DeleteAttachmentRequest removes a single attachment.
type DeleteAttachmentResult ¶ added in v0.1.0
type DeleteAttachmentResult struct {
// Deleted reports whether the selected attachment was removed.
Deleted bool
// Version is the lockd monotonic version for the target object.
Version int64
}
DeleteAttachmentResult reports delete status for a single attachment.
type DequeueOptions ¶
type DequeueOptions struct {
// Namespace scopes the queue operation. Empty uses the client's default namespace.
Namespace string
// Owner identifies the worker/consumer acquiring the queue lease.
Owner string
// TxnID binds queue state operations to an existing transaction when required.
TxnID string
// Visibility controls the message lease timeout returned by dequeue.
Visibility time.Duration
// BlockSeconds controls long-poll behavior: BlockNoWait (-1) for immediate return,
// 0 to wait indefinitely, and >0 to wait up to that many seconds.
BlockSeconds int64
// PageSize caps batched dequeue result size (for APIs that support multi-message responses).
PageSize int
// StartAfter resumes dequeue scanning from a server-issued cursor.
StartAfter string
// OnCloseDelay applies a delay before auto-nack when QueueMessage.Close is called without ack.
OnCloseDelay time.Duration
}
DequeueOptions guides dequeue behaviour.
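Because BlockSeconds treats 0 as "wait indefinitely", converting a time.Duration by plain truncation can turn a short wait into an unbounded one. The sketch below shows one way to guard against that. blockSecondsFor is a hypothetical helper, and blockNoWait is a local constant that mirrors the documented -1 value rather than the SDK's own identifier.

```go
package main

import (
	"fmt"
	"time"
)

// blockNoWait mirrors the documented BlockNoWait value (-1); it is declared
// locally so the sketch stays self-contained.
const blockNoWait int64 = -1

// blockSecondsFor maps a desired wait to a BlockSeconds value. Non-positive
// waits become "no wait", and sub-second waits round up to 1 so they never
// truncate to 0, which would mean "wait indefinitely".
func blockSecondsFor(wait time.Duration) int64 {
	if wait <= 0 {
		return blockNoWait
	}
	return int64((wait + time.Second - 1) / time.Second)
}

func main() {
	waits := []time.Duration{0, 250 * time.Millisecond, 1500 * time.Millisecond, 30 * time.Second}
	for _, w := range waits {
		fmt.Printf("%v -> BlockSeconds %d\n", w, blockSecondsFor(w))
	}
}
```

Callers that really want an unbounded wait would set 0 explicitly rather than derive it from a duration.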
type DequeueResult ¶
type DequeueResult struct {
// Message is the primary dequeued message handle for single-message consumption paths.
Message *QueueMessageHandle
// Messages contains all dequeued message handles when batch dequeue is requested.
Messages []*QueueMessageHandle
// NextCursor is the server cursor for continuing dequeue scans.
NextCursor string
}
DequeueResult captures the outcome of a dequeue request.
type Document ¶ added in v0.1.0
type Document struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// Version is the lockd monotonic version for the target object.
Version string
// ETag is the entity tag used for optimistic concurrency and cache validation.
ETag string
// Metadata carries metadata values returned by the server for this object.
Metadata map[string]string
// Body holds the mutable JSON document content.
Body map[string]any
}
Document models a lockd state document with helper methods for JSON-pointer mutations and streaming interop.
func NewDocument ¶ added in v0.1.0
NewDocument initialises a mutable document for namespace/key.
func (*Document) Bytes ¶ added in v0.1.0
Bytes returns the compact JSON representation of the document body.
func (*Document) LoadFrom ¶ added in v0.1.0
LoadFrom replaces the document contents with data streamed from r.
func (*Document) MutateWithTime ¶ added in v0.1.0
MutateWithTime applies LQL mutations using the supplied timestamp.
type EnqueueOptions ¶
type EnqueueOptions struct {
// Namespace scopes the queue operation. Empty uses the client's default namespace.
Namespace string
// Delay postpones first visibility after enqueue.
Delay time.Duration
// Visibility controls how long a dequeued message stays hidden before it can be redelivered.
Visibility time.Duration
// TTL sets message retention. Zero uses server defaults.
TTL time.Duration
// MaxAttempts limits failed attempts before dead-letter handling.
MaxAttempts int
// Attributes stores arbitrary JSON-serializable metadata on the message.
Attributes map[string]any
// ContentType is sent as the payload media type. Empty defaults to application/octet-stream.
ContentType string
}
EnqueueOptions controls enqueue behaviour.
type FlushIndexOptions ¶ added in v0.1.0
type FlushIndexOptions struct {
// Mode accepts "async" (default) or "wait" for synchronous completion.
Mode string
}
FlushIndexOptions customises index flush behaviour.
type FlushOption ¶ added in v0.1.0
type FlushOption func(*FlushIndexOptions)
FlushOption customises FlushIndex behaviour.
func WithFlushModeAsync ¶ added in v0.1.0
func WithFlushModeAsync() FlushOption
WithFlushModeAsync schedules FlushIndex asynchronously and returns immediately.
func WithFlushModeWait ¶ added in v0.1.0
func WithFlushModeWait() FlushOption
WithFlushModeWait forces FlushIndex to block until indexing catches up.
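FlushOption follows the functional-options pattern: each option is a function that mutates a FlushIndexOptions value. The sketch below shows how such options are typically folded into a final configuration. The types are redeclared locally, the option bodies and the resolveFlushOptions helper are assumptions for illustration, and the "async" default is taken from the Mode field comment above.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types.
type FlushIndexOptions struct {
	Mode string
}

type FlushOption func(*FlushIndexOptions)

// withFlushModeAsync and withFlushModeWait are sketch equivalents of the
// documented constructors; their bodies are assumed, not copied from the SDK.
func withFlushModeAsync() FlushOption {
	return func(o *FlushIndexOptions) { o.Mode = "async" }
}

func withFlushModeWait() FlushOption {
	return func(o *FlushIndexOptions) { o.Mode = "wait" }
}

// resolveFlushOptions is a hypothetical helper that starts from the
// documented default and applies options in order, so later options win.
func resolveFlushOptions(opts ...FlushOption) FlushIndexOptions {
	resolved := FlushIndexOptions{Mode: "async"}
	for _, opt := range opts {
		if opt != nil {
			opt(&resolved)
		}
	}
	return resolved
}

func main() {
	fmt.Println(resolveFlushOptions().Mode)
	fmt.Println(resolveFlushOptions(withFlushModeWait()).Mode)
	fmt.Println(resolveFlushOptions(withFlushModeWait(), withFlushModeAsync()).Mode)
}
```

The same folding applies to the other option types in this package (GetOption, LoadOption, QueryOption), although their precedence rules are not spelled out here.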
type GetAttachmentRequest ¶ added in v0.1.0
type GetAttachmentRequest struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// TxnID associates the operation with a transaction coordinator record.
TxnID string
// FencingToken is the monotonic token used to fence stale writers.
FencingToken *int64
// Public enables read-only attachment retrieval without lease credentials when public reads are allowed.
Public bool
// Selector identifies which attachment to retrieve (by ID, Name, or both).
Selector AttachmentSelector
}
GetAttachmentRequest retrieves a single attachment payload.
type GetOption ¶ added in v0.1.0
type GetOption func(*GetOptions)
GetOption applies custom behaviour to Client.Get.
func WithGetLeaseID ¶ added in v0.1.0
WithGetLeaseID sets lease id used for lease-bound reads. Leave unset for public/read-only snapshots when allowed.
func WithGetNamespace ¶ added in v0.1.0
WithGetNamespace overrides namespace for Get. Empty values are normalized by server/client defaults.
func WithGetPublicDisabled ¶ added in v0.1.0
WithGetPublicDisabled disables public-read fallback and requires lease-backed semantics.
type GetOptions ¶ added in v0.1.0
type GetOptions struct {
// Namespace scopes the read. Empty uses the client's default namespace.
Namespace string
// LeaseID enforces lease-bound reads when provided.
LeaseID string
// DisablePublic forces authenticated/lease-backed reads instead of public fast-path reads.
DisablePublic bool
}
GetOptions tweaks the behaviour of Client.Get / Client.Load.
type GetResponse ¶ added in v0.1.0
type GetResponse struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// ETag is the entity tag used for optimistic concurrency and cache validation.
ETag string
// Version is the server version header for the returned key state.
Version string
// HasState reports whether the requested key currently has committed state.
HasState bool
// contains filtered or unexported fields
}
GetResponse encapsulates the payload and headers returned by Client.Get.
func (*GetResponse) Attachments ¶ added in v0.1.0
func (gr *GetResponse) Attachments() *AttachmentStore
Attachments returns an attachment store scoped to the get response.
func (*GetResponse) Bytes ¶ added in v0.1.0
func (gr *GetResponse) Bytes() ([]byte, error)
Bytes loads the state blob into memory and closes the underlying reader.
func (*GetResponse) Close ¶ added in v0.1.0
func (gr *GetResponse) Close() error
Close releases the underlying HTTP body when streaming isn't required.
func (*GetResponse) Document ¶ added in v0.1.0
func (gr *GetResponse) Document() (*Document, error)
Document hydrates the response body into a Document and consumes the reader.
func (*GetResponse) ListAttachments ¶ added in v0.1.0
func (gr *GetResponse) ListAttachments(ctx context.Context) (*AttachmentList, error)
ListAttachments lists attachment metadata for the get response.
func (*GetResponse) Reader ¶ added in v0.1.0
func (gr *GetResponse) Reader() io.ReadCloser
Reader exposes the underlying body stream. Call Close when finished.
func (*GetResponse) RetrieveAttachment ¶ added in v0.1.0
func (gr *GetResponse) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
RetrieveAttachment streams a single attachment payload for the get response.
type LeaseSession ¶
type LeaseSession struct {
api.AcquireResponse
// contains filtered or unexported fields
}
LeaseSession models an active lease returned by Acquire.
func (*LeaseSession) Attach ¶ added in v0.1.0
func (s *LeaseSession) Attach(ctx context.Context, req AttachRequest) (*AttachResult, error)
Attach stages an attachment on the active lease.
func (*LeaseSession) Attachments ¶ added in v0.1.0
func (s *LeaseSession) Attachments() *AttachmentStore
Attachments returns an attachment store scoped to the lease session.
func (*LeaseSession) Close ¶
func (s *LeaseSession) Close() error
Close is equivalent to Release using a background context.
func (*LeaseSession) CurrentFencingToken ¶ added in v0.7.0
func (s *LeaseSession) CurrentFencingToken() int64
CurrentFencingToken returns the current fencing token associated with the lease.
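For readers new to fencing tokens, the idea is that a resource remembers the highest token it has accepted and rejects writes that carry a lower one. The sketch below is a conceptual illustration only: fencedStore and errStaleToken are hypothetical, and nothing here describes how lockd itself validates tokens.

```go
package main

import (
	"errors"
	"fmt"
)

// errStaleToken is a hypothetical error for writes that carry an old token.
var errStaleToken = errors.New("stale fencing token")

// fencedStore is a toy downstream resource that remembers the highest
// fencing token it has accepted.
type fencedStore struct {
	highest int64
	value   string
}

// write accepts the value only when the token is at least as new as the
// highest token seen so far.
func (s *fencedStore) write(token int64, value string) error {
	if token < s.highest {
		return errStaleToken
	}
	s.highest = token
	s.value = value
	return nil
}

func main() {
	store := &fencedStore{}
	fmt.Println(store.write(1, "old holder"))  // <nil>
	fmt.Println(store.write(2, "new holder"))  // <nil>
	fmt.Println(store.write(1, "late write"))  // stale fencing token
	fmt.Println(store.value)                   // new holder
}
```

A worker could pass the value returned by CurrentFencingToken to any external system that implements a check of this kind.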
func (*LeaseSession) DeleteAllAttachments ¶ added in v0.1.0
func (s *LeaseSession) DeleteAllAttachments(ctx context.Context) (*DeleteAllAttachmentsResult, error)
DeleteAllAttachments removes all attachments for the lease key.
func (*LeaseSession) DeleteAttachment ¶ added in v0.1.0
func (s *LeaseSession) DeleteAttachment(ctx context.Context, selector AttachmentSelector) (*DeleteAttachmentResult, error)
DeleteAttachment removes a single attachment for the lease key.
func (*LeaseSession) Get ¶
func (s *LeaseSession) Get(ctx context.Context) (*StateSnapshot, error)
Get refreshes the current state snapshot using the active lease.
func (*LeaseSession) GetBytes ¶
func (s *LeaseSession) GetBytes(ctx context.Context) ([]byte, error)
GetBytes returns the current state blob as a byte slice.
func (*LeaseSession) KeepAlive ¶
func (s *LeaseSession) KeepAlive(ctx context.Context, ttl time.Duration) (*api.KeepAliveResponse, error)
KeepAlive extends the lease TTL without altering the stored state.
func (*LeaseSession) ListAttachments ¶ added in v0.1.0
func (s *LeaseSession) ListAttachments(ctx context.Context) (*AttachmentList, error)
ListAttachments lists attachment metadata for the lease key.
func (*LeaseSession) Load ¶
func (s *LeaseSession) Load(ctx context.Context, v any) error
Load reads the current state into v. When no state exists, v is untouched.
func (*LeaseSession) Mutate ¶ added in v0.7.0
func (s *LeaseSession) Mutate(ctx context.Context, mutations []string, options ...UpdateOption) (*UpdateResult, error)
Mutate applies server-side LQL mutations to the session's key while preserving the lease.
func (*LeaseSession) MutateLocal ¶ added in v0.8.0
func (s *LeaseSession) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
MutateLocal applies client-local streaming LQL mutations to the session's key while preserving the lease.
func (*LeaseSession) MutateWithOptions ¶ added in v0.7.0
func (s *LeaseSession) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
MutateWithOptions allows callers to override conditional metadata for server-side mutation.
func (*LeaseSession) Release ¶
func (s *LeaseSession) Release(ctx context.Context) error
Release relinquishes the lease early; it is safe to call multiple times. It commits staged changes by default. Use ReleaseWithOptions to request a rollback.
func (*LeaseSession) ReleaseWithOptions ¶ added in v0.1.0
func (s *LeaseSession) ReleaseWithOptions(ctx context.Context, opts ReleaseOptions) error
ReleaseWithOptions relinquishes the lease and allows callers to rollback staged changes.
func (*LeaseSession) Remove ¶
func (s *LeaseSession) Remove(ctx context.Context) (*api.RemoveResponse, error)
Remove deletes the current state while holding the lease, enforcing the cached version and etag when present.
func (*LeaseSession) RemoveWithOptions ¶
func (s *LeaseSession) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
RemoveWithOptions allows callers to override conditional metadata for delete.
func (*LeaseSession) RetrieveAllAttachments ¶ added in v0.1.0
func (s *LeaseSession) RetrieveAllAttachments(ctx context.Context) ([]*Attachment, error)
RetrieveAllAttachments streams all attachments for the lease key.
func (*LeaseSession) RetrieveAttachment ¶ added in v0.1.0
func (s *LeaseSession) RetrieveAttachment(ctx context.Context, selector AttachmentSelector) (*Attachment, error)
RetrieveAttachment streams a single attachment payload for the lease key.
func (*LeaseSession) Save ¶
func (s *LeaseSession) Save(ctx context.Context, v any, opts ...UpdateOption) error
Save serialises v as JSON and updates the state.
func (*LeaseSession) Update ¶
func (s *LeaseSession) Update(ctx context.Context, body io.Reader, options ...UpdateOption) (*UpdateResult, error)
Update streams new JSON state for the session's key while preserving the lease.
func (*LeaseSession) UpdateBytes ¶
func (s *LeaseSession) UpdateBytes(ctx context.Context, body []byte, options ...UpdateOption) (*UpdateResult, error)
UpdateBytes is a convenience wrapper around Update that accepts an in-memory payload.
func (*LeaseSession) UpdateMetadata ¶ added in v0.1.0
func (s *LeaseSession) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
UpdateMetadata toggles metadata flags without mutating the JSON state.
func (*LeaseSession) UpdateWithOptions ¶
func (s *LeaseSession) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
UpdateWithOptions allows callers to override conditional metadata.
type ListAttachmentsRequest ¶ added in v0.1.0
type ListAttachmentsRequest struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// TxnID associates the operation with a transaction coordinator record.
TxnID string
// FencingToken is the monotonic token used to fence stale writers.
FencingToken *int64
// Public enables read-only attachment listing without lease credentials when public reads are allowed.
Public bool
}
ListAttachmentsRequest lists attachments for a key.
type LoadOption ¶ added in v0.1.0
type LoadOption func(*LoadOptions)
LoadOption customises Client.Load.
func WithLoadLeaseID ¶ added in v0.1.0
func WithLoadLeaseID(id string) LoadOption
WithLoadLeaseID sets lease id for Load when lease-bound reads are required.
func WithLoadNamespace ¶ added in v0.1.0
func WithLoadNamespace(ns string) LoadOption
WithLoadNamespace overrides namespace for Load.
func WithLoadPublicDisabled ¶ added in v0.1.0
func WithLoadPublicDisabled(disable bool) LoadOption
WithLoadPublicDisabled disables public-read fallback for Load.
type LoadOptions ¶ added in v0.1.0
type LoadOptions struct {
// GetOptions carries namespace/lease/public-read behavior for Load.
GetOptions
}
LoadOptions mirrors GetOptions for Client.Load.
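Since LoadOptions embeds GetOptions, Go promotes the embedded fields, so they can be read and written directly on a LoadOptions value. A small sketch with locally redeclared types (newLoadOptions is a hypothetical helper, and the namespace and lease id are made-up values):

```go
package main

import "fmt"

// Local stand-ins mirroring the documented structs.
type GetOptions struct {
	Namespace     string
	LeaseID       string
	DisablePublic bool
}

type LoadOptions struct {
	GetOptions
}

// newLoadOptions is a hypothetical helper that fills the embedded struct.
func newLoadOptions(namespace, leaseID string) LoadOptions {
	return LoadOptions{GetOptions: GetOptions{Namespace: namespace, LeaseID: leaseID}}
}

func main() {
	opts := newLoadOptions("workflows", "lease-123")

	// Promoted fields are reachable without naming the embedded struct.
	opts.DisablePublic = true
	fmt.Println(opts.Namespace, opts.LeaseID, opts.DisablePublic)

	// The explicit path refers to the same storage.
	fmt.Println(opts.GetOptions.Namespace)
}
```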
type MessageHandler ¶
type MessageHandler func(context.Context, *QueueMessage) error
MessageHandler is invoked for each message delivered via Subscribe.
type MessageHandlerWithState ¶
type MessageHandlerWithState func(context.Context, *QueueMessage, *QueueStateHandle) error
MessageHandlerWithState is invoked for stateful subscriptions and receives the associated workflow state handle.
type MetadataOptions ¶ added in v0.1.0
type MetadataOptions struct {
// QueryHidden marks a key hidden/visible from query results when non-nil.
QueryHidden *bool
// TxnID binds metadata-only mutations to a transaction.
TxnID string
}
MetadataOptions captures metadata mutations attached to updates.
type MetadataResult ¶ added in v0.1.0
type MetadataResult struct {
// Version is the new metadata version after mutation.
Version int64
// Metadata is the server's effective metadata document after mutation.
Metadata api.MetadataAttributes
}
MetadataResult captures metadata-only mutation outcomes.
type MutateLocalOptions ¶ added in v0.8.0
type MutateLocalOptions struct {
// Update controls namespace, CAS guards, fencing, and txn headers.
Update UpdateOptions
// DisableFetchedCAS skips defaulting IfETag/IfVersion from the streamed Get.
DisableFetchedCAS bool
// FileValueBaseDir resolves relative file:/textfile:/base64file: paths.
FileValueBaseDir string
// FileValueResolver overrides file opening for tests and custom callers.
FileValueResolver lql.MutationFileValueResolver
}
MutateLocalOptions customizes session-local streaming mutation helpers.
type MutateLocalRequest ¶ added in v0.8.0
type MutateLocalRequest struct {
// Key identifies the state object to mutate.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// Mutations contains one or more LQL mutation expressions in execution order.
Mutations []string
// Options controls namespace, CAS guards, fencing, and txn headers.
Options UpdateOptions
// DisableFetchedCAS skips defaulting IfETag/IfVersion from the streamed Get.
DisableFetchedCAS bool
// FileValueBaseDir resolves relative file:/textfile:/base64file: paths.
FileValueBaseDir string
// FileValueResolver overrides file opening for tests and custom callers.
FileValueResolver lql.MutationFileValueResolver
}
MutateLocalRequest describes a client-local streaming LQL mutation flow.
The client loads the lease-visible JSON state, applies mutations locally via lql.MutateStream, and streams the mutated JSON back through UpdateStream. This is the path that supports file:/textfile:/base64file: mutators.
type MutateRequest ¶ added in v0.7.0
type MutateRequest struct {
// Key identifies the state object to mutate.
Key string
// LeaseID identifies the active lease required for protected mutations.
LeaseID string
// Mutations contains one or more LQL mutation expressions in execution order.
Mutations []string
// Options controls namespace, CAS guards, fencing, and txn headers.
Options UpdateOptions
}
MutateRequest captures a server-side LQL mutation operation for one key.
type NamespaceConfigOptions ¶ added in v0.1.0
type NamespaceConfigOptions struct {
// IfMatch enforces optimistic concurrency against the current namespace-config ETag.
IfMatch string
}
NamespaceConfigOptions controls concurrency for namespace configuration mutations.
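IfMatch expresses optimistic concurrency: a write succeeds only if the ETag the caller last saw is still current. The sketch below models that check with an in-memory store. configStore, its methods, the "rev-N" ETag format, and errPreconditionFailed are all hypothetical; the real server's ETag format and error type are not described here.

```go
package main

import (
	"errors"
	"fmt"
)

// errPreconditionFailed is a hypothetical error for an ETag mismatch.
var errPreconditionFailed = errors.New("etag mismatch")

// configStore is a toy stand-in for a server-side config document.
type configStore struct {
	revision int
	value    string
}

// etag derives a made-up ETag from the revision counter.
func (s *configStore) etag() string {
	return fmt.Sprintf("rev-%d", s.revision)
}

// put applies the write only when ifMatch is empty (unconditional) or equals
// the current ETag, and returns the new ETag on success.
func (s *configStore) put(value, ifMatch string) (string, error) {
	if ifMatch != "" && ifMatch != s.etag() {
		return "", errPreconditionFailed
	}
	s.revision++
	s.value = value
	return s.etag(), nil
}

func main() {
	store := &configStore{}
	seen := store.etag()

	next, err := store.put("engine=index", seen)
	fmt.Println(next, err)

	// A second writer still holding the old ETag is rejected.
	_, err = store.put("engine=scan", seen)
	fmt.Println(err)
	fmt.Println(store.value)
}
```

On a mismatch, a caller would typically re-read the config, reapply its change, and retry with the fresh ETag.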
type NamespaceConfigResult ¶ added in v0.1.0
type NamespaceConfigResult struct {
// Config is the effective namespace configuration document returned by the server.
Config *api.NamespaceConfigResponse
// ETag is the namespace configuration ETag for optimistic concurrency control.
ETag string
}
NamespaceConfigResult captures a namespace config document and its ETag.
type Option ¶
type Option func(*Client)
Option customises client construction.
func WithBundlePEM ¶ added in v0.7.0
WithBundlePEM configures an in-memory mTLS client bundle PEM (CA cert + client cert + key). This option overrides any previously configured bundle path.
func WithBundlePath ¶ added in v0.4.0
WithBundlePath configures an mTLS client bundle PEM (CA cert + client cert + key). By default, "$VARS" and leading "~/" are expanded before loading. Use WithBundlePathDisableExpansion() to treat the path literally.
func WithBundlePathDisableExpansion ¶ added in v0.4.0
func WithBundlePathDisableExpansion() Option
WithBundlePathDisableExpansion disables env/tilde expansion for WithBundlePath.
func WithCloseTimeout ¶
WithCloseTimeout overrides timeout used when Close() auto-releases tracked leases.
func WithDefaultNamespace ¶ added in v0.1.0
WithDefaultNamespace overrides the namespace applied when request payloads/options omit Namespace.
func WithDisableMTLS ¶ added in v0.1.0
WithDisableMTLS controls whether the client expects mutual TLS, which also affects scheme inference. When the option is false (the default), base URLs without a scheme assume HTTPS. When it is true, bare endpoints default to HTTP and TLS client certificates are not loaded automatically.
func WithDrainAwareShutdown ¶ added in v0.1.0
WithDrainAwareShutdown toggles automatic lease release when server responses include Shutdown-Imminent drain signaling.
func WithEndpointShuffle ¶
WithEndpointShuffle toggles random shuffling of endpoints before each request. When disabled, endpoints are tried in the order provided.
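With shuffling disabled, ordered failover amounts to trying each endpoint in turn and returning on the first success. The sketch below illustrates that loop in isolation. firstHealthy and downExcept are hypothetical helpers, the endpoint URLs are placeholders, and the SDK's real retry and failover logic is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// firstHealthy tries endpoints in the order given and returns the first one
// for which call succeeds, or the last error when all of them fail.
func firstHealthy(endpoints []string, call func(endpoint string) error) (string, error) {
	var lastErr error
	for _, ep := range endpoints {
		if err := call(ep); err != nil {
			lastErr = err
			continue
		}
		return ep, nil
	}
	if lastErr == nil {
		lastErr = errors.New("no endpoints configured")
	}
	return "", lastErr
}

// downExcept simulates a cluster where only one endpoint responds.
func downExcept(healthy string) func(string) error {
	return func(endpoint string) error {
		if endpoint == healthy {
			return nil
		}
		return fmt.Errorf("%s: connection refused", endpoint)
	}
}

func main() {
	endpoints := []string{"https://a.example.com", "https://b.example.com"}
	ep, err := firstHealthy(endpoints, downExcept("https://b.example.com"))
	fmt.Println(ep, err)
}
```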
func WithFailureRetries ¶
WithFailureRetries overrides how many times non-acquire operations retry on failure. A value <0 allows infinite retries.
func WithForUpdateTimeout ¶
WithForUpdateTimeout overrides timeout bound for AcquireForUpdate orchestration. Handler execution still follows the caller's context deadline/cancellation.
func WithHTTPClient ¶
WithHTTPClient supplies a custom HTTP client/transport stack. Use this when you need custom TLS roots, proxies, tracing round-trippers, or connection pooling behavior not covered by SDK defaults.
func WithHTTPTimeout ¶
WithHTTPTimeout overrides per-request timeout used by SDK-issued HTTP calls. This timeout does not apply to acquire/dequeue wait-forever calls that intentionally hold long-poll requests open.
func WithHTTPTrace ¶ added in v0.1.0
func WithHTTPTrace() Option
WithHTTPTrace enables net/http/httptrace diagnostics on SDK requests. Traces are emitted through the configured client logger.
func WithKeepAliveTimeout ¶
WithKeepAliveTimeout overrides timeout used for lease keepalive requests.
func WithLogger ¶
WithLogger supplies a logger for client diagnostics. Passing nil falls back to pslog.NoopLogger().
type QueryOption ¶ added in v0.1.0
type QueryOption func(*QueryOptions)
QueryOption customizes query execution.
func WithQuery ¶ added in v0.1.0
func WithQuery(expr string) QueryOption
WithQuery parses an LQL expression and sets the selector for the request. Parse errors are surfaced when Query executes.
func WithQueryBlock ¶ added in v0.1.0
func WithQueryBlock() QueryOption
WithQueryBlock waits for the queryable view to observe in-flight documents.
func WithQueryCursor ¶ added in v0.1.0
func WithQueryCursor(cursor string) QueryOption
WithQueryCursor resumes a previous query page using server-provided cursor token.
func WithQueryEngine ¶ added in v0.1.0
func WithQueryEngine(engine string) QueryOption
WithQueryEngine forces query execution engine ("auto", "index", or "scan").
func WithQueryEngineAuto ¶ added in v0.1.0
func WithQueryEngineAuto() QueryOption
WithQueryEngineAuto explicitly selects the auto engine.
func WithQueryEngineIndex ¶ added in v0.1.0
func WithQueryEngineIndex() QueryOption
WithQueryEngineIndex selects the secondary index engine.
func WithQueryEngineScan ¶ added in v0.1.0
func WithQueryEngineScan() QueryOption
WithQueryEngineScan selects the scan engine.
func WithQueryFields ¶ added in v0.1.0
func WithQueryFields(fields map[string]any) QueryOption
WithQueryFields sets field projection map passed to query execution.
func WithQueryLimit ¶ added in v0.1.0
func WithQueryLimit(limit int) QueryOption
WithQueryLimit caps number of rows returned by the server.
func WithQueryNamespace ¶ added in v0.1.0
func WithQueryNamespace(ns string) QueryOption
WithQueryNamespace overrides namespace used for query execution.
func WithQueryRefresh ¶ added in v0.1.0
func WithQueryRefresh(mode string) QueryOption
WithQueryRefresh selects refresh policy (for example "wait_for" to block until indexed visibility).
func WithQueryRefreshImmediate ¶ added in v0.1.0
func WithQueryRefreshImmediate() QueryOption
WithQueryRefreshImmediate clears the refresh hint (default behaviour).
func WithQueryRefreshWaitFor ¶ added in v0.1.0
func WithQueryRefreshWaitFor() QueryOption
WithQueryRefreshWaitFor waits until documents are visible in the selected engine.
func WithQueryRequest ¶ added in v0.1.0
func WithQueryRequest(req *api.QueryRequest) QueryOption
WithQueryRequest copies a full QueryRequest into the option set. Subsequent WithQuery* options can override individual fields.
func WithQueryReturn ¶ added in v0.1.0
func WithQueryReturn(mode QueryReturn) QueryOption
WithQueryReturn selects payload mode for /v1/query ("keys" or "documents").
func WithQueryReturnDocuments ¶ added in v0.1.0
func WithQueryReturnDocuments() QueryOption
WithQueryReturnDocuments streams documents as NDJSON rows.
func WithQueryReturnKeys ¶ added in v0.1.0
func WithQueryReturnKeys() QueryOption
WithQueryReturnKeys forces the default keys-only response mode.
func WithQuerySelector ¶ added in v0.1.0
func WithQuerySelector(sel api.Selector) QueryOption
WithQuerySelector installs an already-parsed selector (useful when callers construct selector ASTs directly).
type QueryOptions ¶ added in v0.1.0
type QueryOptions struct {
// Engine forces execution strategy ("auto", "index", "scan").
Engine string
// Refresh controls visibility semantics (for example "wait_for").
Refresh string
// Return controls payload shape (keys or streamed documents).
Return QueryReturn
// contains filtered or unexported fields
}
QueryOptions controls /v1/query execution hints.
type QueryResponse ¶ added in v0.1.0
type QueryResponse struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Cursor is the pagination cursor returned by the query engine.
Cursor string
// IndexSeq is the index sequence observed by the query execution.
IndexSeq uint64
// Metadata carries metadata values returned by the server for this object.
Metadata map[string]string
// contains filtered or unexported fields
}
QueryResponse describes the result set returned by Client.Query.
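Cursor-based pagination usually takes the form of a loop that feeds each response's cursor into the next request. The sketch below shows that loop against a fake data source. It assumes an empty cursor marks the final page, which this documentation does not state explicitly; page, fetchFunc, collectKeys, and fakeFetch are hypothetical names, and a real loop would call Client.Query with WithQueryCursor instead.

```go
package main

import "fmt"

// page is a stand-in for one query response: its keys plus the next cursor.
type page struct {
	Keys   []string
	Cursor string
}

// fetchFunc abstracts "run the query starting at cursor".
type fetchFunc func(cursor string) (page, error)

// collectKeys follows cursors until a page returns an empty one.
func collectKeys(fetch fetchFunc) ([]string, error) {
	var all []string
	cursor := ""
	for {
		p, err := fetch(cursor)
		if err != nil {
			return nil, err
		}
		all = append(all, p.Keys...)
		if p.Cursor == "" {
			return all, nil
		}
		cursor = p.Cursor
	}
}

// fakeFetch serves two fixed pages so the sketch runs without a server.
func fakeFetch(cursor string) (page, error) {
	switch cursor {
	case "":
		return page{Keys: []string{"orders/1", "orders/2"}, Cursor: "c1"}, nil
	case "c1":
		return page{Keys: []string{"orders/3"}}, nil
	}
	return page{}, fmt.Errorf("unknown cursor %q", cursor)
}

func main() {
	keys, err := collectKeys(fakeFetch)
	fmt.Println(keys, err)
}
```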
func (*QueryResponse) Close ¶ added in v0.1.0
func (qr *QueryResponse) Close() error
Close releases the underlying reader when the response streams documents.
func (*QueryResponse) ForEach ¶ added in v0.1.0
func (qr *QueryResponse) ForEach(fn func(QueryRow) error) error
ForEach invokes fn for every entry in the response. For document streams the handler receives fully populated QueryRow values.
func (*QueryResponse) Keys ¶ added in v0.1.0
func (qr *QueryResponse) Keys() []string
Keys returns a defensive copy of the key slice. When the response streams documents, Keys drains the stream, collects the keys, and closes the reader.
func (*QueryResponse) Mode ¶ added in v0.1.0
func (qr *QueryResponse) Mode() QueryReturn
Mode reports whether the query streamed keys or documents.
type QueryReturn ¶ added in v0.1.0
type QueryReturn string
QueryReturn describes the payload mode exposed by /v1/query.
const (
// QueryReturnKeys streams the default keys-only JSON object.
QueryReturnKeys QueryReturn = QueryReturn(api.QueryReturnKeys)
// QueryReturnDocuments streams newline-delimited documents.
QueryReturnDocuments QueryReturn = QueryReturn(api.QueryReturnDocuments)
)
type QueryRow ¶ added in v0.1.0
type QueryRow struct {
// Namespace scopes the request or response to a lockd namespace.
Namespace string
// Key identifies the lock/state key within the namespace.
Key string
// Version is the key version associated with this query row.
Version int64
// contains filtered or unexported fields
}
QueryRow represents a single row returned from /v1/query.
func (QueryRow) DocumentInto ¶ added in v0.1.0
DocumentInto unmarshals the document payload into target when present.
func (QueryRow) DocumentReader ¶ added in v0.1.0
func (row QueryRow) DocumentReader() (io.ReadCloser, error)
DocumentReader returns a streaming reader for the row's document. Callers must Close the returned reader when finished.
func (QueryRow) HasDocument ¶ added in v0.1.0
HasDocument reports whether the row was populated with a document payload.
type QueueMessage ¶
type QueueMessage struct {
// contains filtered or unexported fields
}
QueueMessage wraps a QueueMessageHandle and provides io.ReadCloser semantics with automatic nack-on-close when the caller forgets to ack explicitly.
func (*QueueMessage) Ack ¶
func (m *QueueMessage) Ack(ctx context.Context) error
Ack removes the message from the queue.
func (*QueueMessage) Attempts ¶
func (m *QueueMessage) Attempts() int
Attempts returns the recorded delivery attempts.
func (*QueueMessage) Close ¶
func (m *QueueMessage) Close() error
Close releases underlying resources and auto-nacks the message when it has not been acked or nacked explicitly.
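The auto-nack behaviour supports a common handler shape: defer Close immediately, then Ack only on success, so every failure path nacks without extra code. The sketch below demonstrates the pattern with a fake message type. fakeMessage, handle, and the helper functions are hypothetical and only imitate the documented Ack/Close semantics; the real methods also take a context and talk to the server.

```go
package main

import (
	"errors"
	"fmt"
)

// errWork is a hypothetical processing failure.
var errWork = errors.New("work failed")

// fakeMessage records how the message was settled.
type fakeMessage struct {
	settled string
}

// Ack marks the message as acknowledged unless it is already settled.
func (m *fakeMessage) Ack() error {
	if m.settled == "" {
		m.settled = "acked"
	}
	return nil
}

// Close nacks the message only when nothing settled it earlier.
func (m *fakeMessage) Close() error {
	if m.settled == "" {
		m.settled = "nacked"
	}
	return nil
}

// handle runs work and acks on success; the deferred Close covers every
// early return by nacking an unsettled message.
func handle(m *fakeMessage, work func() error) error {
	defer m.Close()
	if err := work(); err != nil {
		return err
	}
	return m.Ack()
}

func succeed() error { return nil }
func fail() error    { return errWork }

// outcome reports how a fresh message ends up after handle runs work.
func outcome(work func() error) string {
	m := &fakeMessage{}
	_ = handle(m, work)
	return m.settled
}

func main() {
	fmt.Println(outcome(succeed)) // acked
	fmt.Println(outcome(fail))    // nacked
}
```

SetOnCloseDelay or DequeueOptions.OnCloseDelay can then control how soon a message nacked this way becomes visible again.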
func (*QueueMessage) ClosePayload ¶
func (m *QueueMessage) ClosePayload() error
ClosePayload releases the underlying payload stream without altering the lease state.
func (*QueueMessage) ContentType ¶
func (m *QueueMessage) ContentType() string
ContentType returns the payload content type.
func (*QueueMessage) CorrelationID ¶
func (m *QueueMessage) CorrelationID() string
CorrelationID returns the lifecycle correlation identifier for the message.
func (*QueueMessage) Cursor ¶
func (m *QueueMessage) Cursor() string
Cursor returns the next-cursor associated with the dequeue response.
func (*QueueMessage) DecodePayloadJSON ¶
func (m *QueueMessage) DecodePayloadJSON(v any) error
DecodePayloadJSON decodes the payload JSON into v and closes the payload afterwards.
func (*QueueMessage) Defer ¶ added in v0.5.0
Defer releases the lease and requeues intentionally without consuming failure budget.
func (*QueueMessage) FailureAttempts ¶ added in v0.5.0
func (m *QueueMessage) FailureAttempts() int
FailureAttempts returns the recorded failed attempts.
func (*QueueMessage) FencingToken ¶
func (m *QueueMessage) FencingToken() int64
FencingToken returns the monotonic fencing token issued with the lease.
func (*QueueMessage) LeaseExpiresAt ¶
func (m *QueueMessage) LeaseExpiresAt() int64
LeaseExpiresAt returns the Unix timestamp at which the lease currently expires.
func (*QueueMessage) LeaseID ¶
func (m *QueueMessage) LeaseID() string
LeaseID exposes the underlying lease identifier.
func (*QueueMessage) MaxAttempts ¶
func (m *QueueMessage) MaxAttempts() int
MaxAttempts returns the configured maximum failed attempts.
func (*QueueMessage) MessageID ¶
func (m *QueueMessage) MessageID() string
MessageID returns the message identifier.
func (*QueueMessage) MetaETag ¶
func (m *QueueMessage) MetaETag() string
MetaETag returns the metadata ETag currently associated with the message.
func (*QueueMessage) Namespace ¶ added in v0.1.0
func (m *QueueMessage) Namespace() string
Namespace returns the namespace associated with the message.
func (*QueueMessage) NotVisibleUntil ¶
func (m *QueueMessage) NotVisibleUntil() time.Time
NotVisibleUntil reports when the message will become visible again.
func (*QueueMessage) PayloadReader ¶
func (m *QueueMessage) PayloadReader() (io.ReadCloser, error)
PayloadReader returns an independent ReadCloser view over the payload.
func (*QueueMessage) PayloadSize ¶
func (m *QueueMessage) PayloadSize() int64
PayloadSize reports the payload size in bytes when known.
func (*QueueMessage) Read ¶
func (m *QueueMessage) Read(p []byte) (int, error)
Read streams bytes from the message payload.
func (*QueueMessage) SetOnCloseDelay ¶
func (m *QueueMessage) SetOnCloseDelay(d time.Duration)
SetOnCloseDelay adjusts the delay used when Close() auto-nacks the message.
func (*QueueMessage) StateHandle ¶
func (m *QueueMessage) StateHandle() *QueueStateHandle
StateHandle exposes the workflow state handle when using DequeueWithState.
func (*QueueMessage) TxnID ¶ added in v0.1.0
func (m *QueueMessage) TxnID() string
TxnID returns the transaction id associated with the message lease, if any.
func (*QueueMessage) VisibilityTimeout ¶
func (m *QueueMessage) VisibilityTimeout() time.Duration
VisibilityTimeout returns the current visibility timeout.
func (*QueueMessage) WritePayloadTo ¶
func (m *QueueMessage) WritePayloadTo(w io.Writer) (int64, error)
WritePayloadTo streams the payload into w and closes the payload afterwards.
type QueueMessageHandle ¶
type QueueMessageHandle struct {
// contains filtered or unexported fields
}
QueueMessageHandle models a leased queue message and provides helpers to ack/nack/extend.
func (*QueueMessageHandle) Ack ¶
func (h *QueueMessageHandle) Ack(ctx context.Context) error
Ack removes the message (and state, when present) from the queue.
func (*QueueMessageHandle) Attempts ¶
func (h *QueueMessageHandle) Attempts() int
Attempts returns the delivery attempts recorded for the message.
func (*QueueMessageHandle) ClosePayload ¶
func (h *QueueMessageHandle) ClosePayload() error
ClosePayload releases any remaining payload stream resources without reading.
func (*QueueMessageHandle) ContentType ¶
func (h *QueueMessageHandle) ContentType() string
ContentType returns the payload content type.
func (*QueueMessageHandle) CorrelationID ¶
func (h *QueueMessageHandle) CorrelationID() string
CorrelationID returns the correlation identifier associated with the message lifecycle.
func (*QueueMessageHandle) Cursor ¶
func (h *QueueMessageHandle) Cursor() string
Cursor returns the next-cursor value associated with the dequeue call.
func (*QueueMessageHandle) DecodePayloadJSON ¶
func (h *QueueMessageHandle) DecodePayloadJSON(v any) error
DecodePayloadJSON decodes the payload as JSON into v.
func (*QueueMessageHandle) Defer ¶ added in v0.5.0
Defer releases the message with an optional delay without consuming failure budget.
func (*QueueMessageHandle) FailureAttempts ¶ added in v0.5.0
func (h *QueueMessageHandle) FailureAttempts() int
FailureAttempts returns the recorded failed attempts for the message.
func (*QueueMessageHandle) FencingToken ¶
func (h *QueueMessageHandle) FencingToken() int64
FencingToken exposes the current message fencing token.
func (*QueueMessageHandle) LeaseExpiresAt ¶
func (h *QueueMessageHandle) LeaseExpiresAt() int64
LeaseExpiresAt returns the Unix timestamp when the message lease expires.
func (*QueueMessageHandle) LeaseID ¶
func (h *QueueMessageHandle) LeaseID() string
LeaseID returns the active message lease identifier.
func (*QueueMessageHandle) MaxAttempts ¶
func (h *QueueMessageHandle) MaxAttempts() int
MaxAttempts returns the configured maximum failed attempts for the message.
func (*QueueMessageHandle) MessageID ¶
func (h *QueueMessageHandle) MessageID() string
MessageID returns the message identifier.
func (*QueueMessageHandle) MetaETag ¶
func (h *QueueMessageHandle) MetaETag() string
MetaETag returns the metadata ETag associated with the message.
func (*QueueMessageHandle) Nack ¶
Nack releases the message with an optional delay and error payload.
func (*QueueMessageHandle) Namespace ¶ added in v0.1.0
func (h *QueueMessageHandle) Namespace() string
Namespace returns the namespace associated with the message.
func (*QueueMessageHandle) NotVisibleUntil ¶
func (h *QueueMessageHandle) NotVisibleUntil() time.Time
NotVisibleUntil reports when the message becomes visible again.
func (*QueueMessageHandle) PayloadReader ¶
func (h *QueueMessageHandle) PayloadReader() (io.ReadCloser, error)
PayloadReader returns a streaming reader for the message payload. Callers must close the returned reader when finished. The payload can only be consumed once; subsequent calls return an error.
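The consume-once rule can be pictured as a holder that hands out its stream a single time. The sketch below is an assumption about how such a guard might look, not the SDK's internals: oncePayload, newPayload, readOnce, and errConsumed are hypothetical names, and the actual error returned by the SDK is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
)

// errConsumed is a hypothetical sentinel used only in this sketch.
var errConsumed = errors.New("payload already consumed")

// oncePayload hands out its underlying reader exactly once.
type oncePayload struct {
	mu  sync.Mutex
	src io.ReadCloser
}

func newPayload(s string) *oncePayload {
	return &oncePayload{src: io.NopCloser(strings.NewReader(s))}
}

// Reader returns the stream on the first call and errConsumed afterwards.
func (p *oncePayload) Reader() (io.ReadCloser, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.src == nil {
		return nil, errConsumed
	}
	rc := p.src
	p.src = nil
	return rc, nil
}

// readOnce obtains the reader, drains it, and closes it.
func readOnce(p *oncePayload) (string, error) {
	rc, err := p.Reader()
	if err != nil {
		return "", err
	}
	defer rc.Close()
	b, err := io.ReadAll(rc)
	return string(b), err
}

func main() {
	p := newPayload("hello")
	first, err := readOnce(p)
	fmt.Println(first, err)
	_, err = readOnce(p)
	fmt.Println(err)
}
```

Callers that need the payload more than once should buffer it on the first read rather than requesting a second reader.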
func (*QueueMessageHandle) PayloadSize ¶
func (h *QueueMessageHandle) PayloadSize() int64
PayloadSize returns the declared payload size in bytes.
func (*QueueMessageHandle) Queue ¶
func (h *QueueMessageHandle) Queue() string
Queue returns the queue name.
func (*QueueMessageHandle) StateHandle ¶
func (h *QueueMessageHandle) StateHandle() *QueueStateHandle
StateHandle exposes the state lease handle when using DequeueWithState.
func (*QueueMessageHandle) VisibilityTimeout ¶
func (h *QueueMessageHandle) VisibilityTimeout() time.Duration
VisibilityTimeout returns the current visibility timeout.
func (*QueueMessageHandle) WritePayloadTo ¶
func (h *QueueMessageHandle) WritePayloadTo(w io.Writer) (int64, error)
WritePayloadTo streams the payload into w, closing the payload afterwards.
type QueueStateHandle ¶
type QueueStateHandle struct {
// contains filtered or unexported fields
}
QueueStateHandle exposes the workflow state lease associated with a stateful dequeue.
func (*QueueStateHandle) CorrelationID ¶
func (s *QueueStateHandle) CorrelationID() string
CorrelationID returns the message correlation identifier associated with the state lease.
func (*QueueStateHandle) ETag ¶
func (s *QueueStateHandle) ETag() string
ETag returns the workflow state ETag.
func (*QueueStateHandle) FencingToken ¶
func (s *QueueStateHandle) FencingToken() int64
FencingToken returns the workflow state fencing token.
func (*QueueStateHandle) Get ¶ added in v0.4.0
func (s *QueueStateHandle) Get(ctx context.Context) (*StateSnapshot, error)
Get reads the current workflow state snapshot for the message's state lease.
func (*QueueStateHandle) GetBytes ¶ added in v0.4.0
func (s *QueueStateHandle) GetBytes(ctx context.Context) ([]byte, error)
GetBytes returns the current workflow state payload as bytes.
func (*QueueStateHandle) LeaseExpiresAt ¶
func (s *QueueStateHandle) LeaseExpiresAt() int64
LeaseExpiresAt returns the Unix timestamp when the state lease expires.
func (*QueueStateHandle) LeaseID ¶
func (s *QueueStateHandle) LeaseID() string
LeaseID returns the workflow state lease identifier.
func (*QueueStateHandle) Load ¶ added in v0.4.0
func (s *QueueStateHandle) Load(ctx context.Context, v any) error
Load unmarshals the current workflow state into v.
func (*QueueStateHandle) MessageID ¶
func (s *QueueStateHandle) MessageID() string
MessageID returns the associated message ID.
func (*QueueStateHandle) Mutate ¶ added in v0.7.0
func (s *QueueStateHandle) Mutate(ctx context.Context, mutations []string, opts ...UpdateOption) (*UpdateResult, error)
Mutate applies server-side LQL mutations while preserving the state lease.
func (*QueueStateHandle) MutateLocal ¶ added in v0.8.0
func (s *QueueStateHandle) MutateLocal(ctx context.Context, mutations []string, options MutateLocalOptions) (*UpdateResult, error)
MutateLocal applies client-local streaming LQL mutations while preserving the state lease.
func (*QueueStateHandle) MutateWithOptions ¶ added in v0.7.0
func (s *QueueStateHandle) MutateWithOptions(ctx context.Context, mutations []string, opts UpdateOptions) (*UpdateResult, error)
MutateWithOptions applies server-side LQL mutations with explicit conditional/header overrides.
func (*QueueStateHandle) Queue ¶
func (s *QueueStateHandle) Queue() string
Queue returns the queue name.
func (*QueueStateHandle) Remove ¶ added in v0.4.0
func (s *QueueStateHandle) Remove(ctx context.Context) (*api.RemoveResponse, error)
Remove deletes workflow state while preserving queue lease lifecycle semantics.
func (*QueueStateHandle) RemoveWithOptions ¶ added in v0.4.0
func (s *QueueStateHandle) RemoveWithOptions(ctx context.Context, opts RemoveOptions) (*api.RemoveResponse, error)
RemoveWithOptions deletes workflow state with optional conditional overrides.
func (*QueueStateHandle) Save ¶ added in v0.4.0
func (s *QueueStateHandle) Save(ctx context.Context, v any, opts ...UpdateOption) error
Save marshals v as JSON and updates workflow state through the state lease.
func (*QueueStateHandle) Update ¶ added in v0.4.0
func (s *QueueStateHandle) Update(ctx context.Context, body io.Reader, opts ...UpdateOption) (*UpdateResult, error)
Update streams new workflow state JSON while preserving the state lease.
func (*QueueStateHandle) UpdateBytes ¶ added in v0.4.0
func (s *QueueStateHandle) UpdateBytes(ctx context.Context, body []byte, opts ...UpdateOption) (*UpdateResult, error)
UpdateBytes is a convenience wrapper around Update for in-memory payloads.
func (*QueueStateHandle) UpdateMetadata ¶ added in v0.4.0
func (s *QueueStateHandle) UpdateMetadata(ctx context.Context, meta MetadataOptions) (*MetadataResult, error)
UpdateMetadata mutates metadata for the workflow state key.
func (*QueueStateHandle) UpdateWithOptions ¶ added in v0.4.0
func (s *QueueStateHandle) UpdateWithOptions(ctx context.Context, body io.Reader, opts UpdateOptions) (*UpdateResult, error)
UpdateWithOptions updates workflow state while allowing conditional/header overrides.
type QueueStatsOptions ¶ added in v0.7.0
type QueueStatsOptions struct {
// Namespace scopes the queue operation. Empty uses the client's default namespace.
Namespace string
}
QueueStatsOptions configures queue stats reads.
type QueueWatchEvent ¶ added in v0.7.0
type QueueWatchEvent struct {
Namespace string
Queue string
Available bool
HeadMessageID string
ChangedAt time.Time
CorrelationID string
}
QueueWatchEvent describes queue visibility changes emitted by WatchQueue.
type QueueWatchHandler ¶ added in v0.7.0
type QueueWatchHandler func(context.Context, QueueWatchEvent) error
QueueWatchHandler is invoked for each WatchQueue event.
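A handler of this shape can be kept cheap by turning events into a non-blocking wake-up signal. The sketch below redeclares QueueWatchEvent and QueueWatchHandler locally (mirroring the definitions above) so it runs without the SDK; wakeOnAvailable and pendingWakeups are hypothetical helpers, and reading Available as "the queue has visible messages" is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Local mirrors of the documented types, redeclared so the sketch is self-contained.
type QueueWatchEvent struct {
	Namespace     string
	Queue         string
	Available     bool
	HeadMessageID string
	ChangedAt     time.Time
	CorrelationID string
}

type QueueWatchHandler func(context.Context, QueueWatchEvent) error

// wakeOnAvailable returns a handler that signals wake when an event reports
// availability. The send never blocks: a pending wake-up is enough.
func wakeOnAvailable(wake chan<- string) QueueWatchHandler {
	return func(ctx context.Context, ev QueueWatchEvent) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		if !ev.Available {
			return nil
		}
		select {
		case wake <- ev.Queue:
		default: // a wake-up is already pending; drop the duplicate
		}
		return nil
	}
}

// pendingWakeups feeds events through the handler and reports how many
// wake-ups are buffered afterwards (at most one).
func pendingWakeups(events ...QueueWatchEvent) int {
	wake := make(chan string, 1)
	handler := wakeOnAvailable(wake)
	for _, ev := range events {
		_ = handler(context.Background(), ev)
	}
	return len(wake)
}

func main() {
	n := pendingWakeups(
		QueueWatchEvent{Queue: "orders", Available: true, ChangedAt: time.Now()},
		QueueWatchEvent{Queue: "orders", Available: true, ChangedAt: time.Now()},
	)
	fmt.Println("pending wake-ups:", n)
}
```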
type ReleaseOptions ¶ added in v0.1.0
type ReleaseOptions struct {
// Rollback discards staged state/attachments instead of committing them.
Rollback bool
}
ReleaseOptions controls commit vs rollback semantics during lease release.
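One common way to choose between commit and rollback is to derive Rollback from the outcome of the work done under the lease. The sketch below redeclares ReleaseOptions locally so it is self-contained; releaseOptionsFor and errWorkFailed are hypothetical names, and the policy itself is an illustration rather than something the SDK prescribes.

```go
package main

import (
	"errors"
	"fmt"
)

// ReleaseOptions is a local mirror of the documented struct.
type ReleaseOptions struct {
	// Rollback discards staged state/attachments instead of committing them.
	Rollback bool
}

// errWorkFailed is a hypothetical error used to exercise the rollback path.
var errWorkFailed = errors.New("work failed")

// releaseOptionsFor requests a rollback when the work failed and a commit otherwise.
func releaseOptionsFor(workErr error) ReleaseOptions {
	return ReleaseOptions{Rollback: workErr != nil}
}

func main() {
	fmt.Println("after success, rollback =", releaseOptionsFor(nil).Rollback)
	fmt.Println("after failure, rollback =", releaseOptionsFor(errWorkFailed).Rollback)
}
```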
type RemoveOptions ¶ added in v0.1.0
type RemoveOptions struct {
// IfETag sets a conditional ETag guard (maps to If-Match semantics).
IfETag string
// IfVersion sets a conditional version guard (X-If-Version).
IfVersion *int64
// FencingToken provides explicit fencing when not already registered on the client.
FencingToken *int64
// Namespace scopes the delete. Empty uses the client's default namespace.
Namespace string
// TxnID binds the delete to a transaction coordinator record.
TxnID string
}
RemoveOptions controls conditional delete semantics.
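Because IfVersion and FencingToken are pointers, a guard on version 0 can be told apart from no guard at all. The sketch below assumes that a nil pointer means "no guard", which is the usual reading of an optional pointer field but is not stated above. RemoveOptions is redeclared locally; ptr and guardedRemove are hypothetical helpers.

```go
package main

import "fmt"

// RemoveOptions is a local mirror of the documented struct.
type RemoveOptions struct {
	IfETag       string
	IfVersion    *int64
	FencingToken *int64
	Namespace    string
	TxnID        string
}

// ptr returns a pointer to a copy of v, which is handy for optional fields.
func ptr[T any](v T) *T { return &v }

// guardedRemove builds options that require both the ETag and the version to match.
// FencingToken stays nil (assumed to mean "use whatever the client has registered").
func guardedRemove(etag string, version int64) RemoveOptions {
	return RemoveOptions{IfETag: etag, IfVersion: ptr(version)}
}

func main() {
	opts := guardedRemove("etag-123", 0)
	fmt.Println(opts.IfETag, *opts.IfVersion, opts.FencingToken == nil)
}
```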
type StateSnapshot ¶
type StateSnapshot struct {
// Reader streams the state payload. It may be nil when HasState is false.
Reader io.ReadCloser
// ETag is the backend entity tag for CAS-protected writes/deletes.
ETag string
// Version is lockd's monotonic version counter for the key.
Version int64
// HasState reports whether the key currently has a committed JSON document.
HasState bool
}
StateSnapshot represents the current JSON state and metadata for a lease.
func (*StateSnapshot) Bytes ¶
func (s *StateSnapshot) Bytes() ([]byte, error)
Bytes returns the raw JSON payload.
func (*StateSnapshot) Close ¶
func (s *StateSnapshot) Close() error
Close releases the underlying reader if present.
func (*StateSnapshot) Decode ¶
func (s *StateSnapshot) Decode(v any) error
Decode unmarshals the JSON payload into v.
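Since Reader may be nil when HasState is false, callers typically check HasState before decoding. The sketch below redeclares StateSnapshot's fields locally and uses encoding/json directly; loadIfPresent, snapshotFrom, and checkpoint are hypothetical names, and the real Decode method may behave differently on an empty snapshot.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// StateSnapshot is a local mirror of the documented struct fields.
type StateSnapshot struct {
	Reader   io.ReadCloser
	ETag     string
	Version  int64
	HasState bool
}

// checkpoint is a hypothetical state document.
type checkpoint struct {
	Progress string `json:"progress"`
}

// loadIfPresent decodes the snapshot into v when state exists and reports
// whether anything was decoded. The reader is closed once it has been used.
func loadIfPresent(snap *StateSnapshot, v any) (bool, error) {
	if snap == nil || !snap.HasState || snap.Reader == nil {
		return false, nil
	}
	defer snap.Reader.Close()
	if err := json.NewDecoder(snap.Reader).Decode(v); err != nil {
		return false, err
	}
	return true, nil
}

// snapshotFrom builds an in-memory snapshot for the example.
func snapshotFrom(raw string) *StateSnapshot {
	return &StateSnapshot{
		Reader:   io.NopCloser(strings.NewReader(raw)),
		Version:  1,
		HasState: true,
	}
}

func main() {
	var cp checkpoint
	found, err := loadIfPresent(snapshotFrom(`{"progress":"running"}`), &cp)
	fmt.Println(found, err, cp.Progress)
}
```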
type SubscribeOptions ¶
type SubscribeOptions struct {
// Namespace scopes the queue operation. Empty uses the client's default namespace.
Namespace string
// Owner identifies the worker/consumer processing streamed deliveries.
// For direct Subscribe calls, this is required. StartConsumer auto-fills a
// generated unique owner when left empty.
Owner string
// Visibility controls per-message lease timeout for streamed deliveries.
Visibility time.Duration
// BlockSeconds controls server-side wait behavior between deliveries.
BlockSeconds int64
// Prefetch controls how many messages the server may pipeline before handler ack/nack.
Prefetch int
// StartAfter resumes a previous subscription stream from a cursor.
StartAfter string
// OnCloseDelay applies a delay before auto-nack when handlers close without ack.
OnCloseDelay time.Duration
}
SubscribeOptions configures continuous streaming consumption via Subscribe.
type UpdateOption ¶ added in v0.1.0
type UpdateOption interface {
// contains filtered or unexported methods
}
UpdateOption customizes update behaviour.
func WithMetadata ¶ added in v0.1.0
func WithMetadata(meta MetadataOptions) UpdateOption
WithMetadata attaches metadata mutations to the same request as the state update.
func WithQueryHidden ¶ added in v0.1.0
func WithQueryHidden() UpdateOption
WithQueryHidden marks the key hidden from /v1/query after the update commits.
func WithQueryVisible ¶ added in v0.1.0
func WithQueryVisible() UpdateOption
WithQueryVisible clears the query-hidden metadata flag after update commit.
func WithTxnID ¶ added in v0.1.0
func WithTxnID(txnID string) UpdateOption
WithTxnID binds an update/metadata mutation to a transaction coordinator id. The value is copied to UpdateOptions.TxnID and MetadataOptions.TxnID.
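UpdateOption is an interface with unexported methods, which is a common way to implement functional options in Go. How the SDK implements it internally is not shown here, so the following is only a sketch of the general pattern: settings, option, optionFunc, resolve, and the lowercase with* constructors are all hypothetical names.

```go
package main

import "fmt"

// settings collects what the options configure (hypothetical fields).
type settings struct {
	txnID       string
	queryHidden *bool // nil means "leave the flag untouched"
}

// option has an unexported method, so only this package can implement it.
type option interface {
	apply(*settings)
}

// optionFunc adapts a plain function to the option interface.
type optionFunc func(*settings)

func (f optionFunc) apply(s *settings) { f(s) }

func withTxnID(id string) option {
	return optionFunc(func(s *settings) { s.txnID = id })
}

func withQueryHidden() option {
	return optionFunc(func(s *settings) { hidden := true; s.queryHidden = &hidden })
}

func withQueryVisible() option {
	return optionFunc(func(s *settings) { hidden := false; s.queryHidden = &hidden })
}

// resolve applies options in order, so later options override earlier ones.
func resolve(opts ...option) settings {
	var s settings
	for _, o := range opts {
		if o != nil {
			o.apply(&s)
		}
	}
	return s
}

func main() {
	s := resolve(withTxnID("txn-1"), withQueryHidden())
	fmt.Println(s.txnID, *s.queryHidden)
}
```

In this sketch the last option wins when two options touch the same field; whether the SDK resolves conflicts the same way is not documented above.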
type UpdateOptions ¶ added in v0.1.0
type UpdateOptions struct {
// IfETag sets a conditional ETag guard (maps to If-Match semantics).
IfETag string
// IfVersion sets a conditional version guard (X-If-Version).
IfVersion *int64
// ExpectedSHA256 enforces the submitted JSON payload SHA-256 (pre-compaction).
ExpectedSHA256 string
// ExpectedBytes enforces the submitted JSON payload byte length (pre-compaction).
ExpectedBytes *int64
// FencingToken provides explicit fencing when not already registered on the client.
FencingToken *int64
// Namespace scopes the mutation. Empty uses the client's default namespace.
Namespace string
// Metadata applies metadata mutations alongside the state update.
Metadata MetadataOptions
// TxnID binds the mutation to a transaction coordinator record.
TxnID string
}
UpdateOptions controls conditional update semantics.
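ExpectedSHA256 and ExpectedBytes can be derived from the payload before it is sent. The sketch below mirrors only those two fields of UpdateOptions and assumes the digest is expected as lowercase hex, which the documentation above does not specify; withIntegrityGuards is a hypothetical helper.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// UpdateOptions mirrors only the two integrity fields of the documented struct.
type UpdateOptions struct {
	ExpectedSHA256 string
	ExpectedBytes  *int64
}

// withIntegrityGuards computes the SHA-256 digest and byte length of the payload
// exactly as submitted. Hex encoding of the digest is an assumption.
func withIntegrityGuards(payload []byte) UpdateOptions {
	sum := sha256.Sum256(payload)
	size := int64(len(payload))
	return UpdateOptions{
		ExpectedSHA256: hex.EncodeToString(sum[:]),
		ExpectedBytes:  &size,
	}
}

func main() {
	opts := withIntegrityGuards([]byte(`{"progress":"running"}`))
	fmt.Println(opts.ExpectedSHA256, *opts.ExpectedBytes)
}
```

Both values describe the bytes as submitted (pre-compaction), so they must be computed over the exact payload passed to the update call.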
type UpdateResult ¶ added in v0.1.0
type UpdateResult struct {
// NewVersion is the updated monotonic version after a successful mutation.
NewVersion int64 `json:"new_version"`
// NewStateETag is the updated state entity tag after a successful mutation.
NewStateETag string `json:"new_state_etag"`
// BytesWritten is the number of state bytes accepted by the update.
BytesWritten int64 `json:"bytes"`
// Metadata carries metadata values returned by the server for this object.
Metadata api.MetadataAttributes `json:"metadata,omitempty"`
}
UpdateResult captures the response from Update.
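The struct tags above determine how a JSON body maps onto UpdateResult; note that BytesWritten is carried under the key "bytes". The sketch below redeclares the struct locally and leaves out Metadata because api.MetadataAttributes is not shown in this section. parseUpdateResult and the sample body are illustrative, and whether the server's response body has exactly this shape is an assumption drawn from the tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UpdateResult is a local mirror of the documented struct, without Metadata.
type UpdateResult struct {
	NewVersion   int64  `json:"new_version"`
	NewStateETag string `json:"new_state_etag"`
	BytesWritten int64  `json:"bytes"`
}

// parseUpdateResult decodes a JSON document using the documented field tags.
func parseUpdateResult(raw string) (UpdateResult, error) {
	var res UpdateResult
	err := json.Unmarshal([]byte(raw), &res)
	return res, err
}

func main() {
	res, err := parseUpdateResult(`{"new_version":3,"new_state_etag":"abc123","bytes":17}`)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	fmt.Println(res.NewVersion, res.NewStateETag, res.BytesWritten)
}
```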
type WatchQueueOptions ¶ added in v0.7.0
type WatchQueueOptions struct {
// Namespace scopes the queue operation. Empty uses the client's default namespace.
Namespace string
}
WatchQueueOptions configures non-consuming queue visibility watches.