Documentation ¶
Overview ¶
File store NATS request handlers.
Exposes pkg/filestore.Store operations over NATS request/reply so that TypeScript plugins (and any other NATS client) can access the file store without a direct JetStream Object Store connection.
Subjects:
- files.put: store a file (base64 data)
- files.get: retrieve a file (returns base64 data)
- files.head: file metadata only
- files.delete: remove a file
- files.list: list files (optional prefix filter)
Package mesh provides NATS-based service mesh for jb-mesh nodes.
Each node in the mesh registers its tools as NATS micro services. Tool calls are routed via NATS subjects, with automatic load balancing when multiple nodes offer the same tool.
Index ¶
- func CancelSubject(callID string) string
- func Connect(cfg Config, extraOpts ...nats.Option) (*nats.Conn, error)
- func NodeStreamSubject(nodeName, toolName, method string) string
- func ParseKeyValuePairs(pairs []string) (map[string][]string, error)
- func ResolveFileParams(params map[string]interface{}, store *filestore.Store, nc ...*nats.Conn) (func(), error)
- func StreamSubject(toolName, method string) string
- type CallRequest
- type CallResult
- type Config
- type ConfigRequest
- type ConfigResult
- type FileDeleteRequest
- type FileDeleteResult
- type FileGetRequest
- type FileGetResult
- type FileHeadRequest
- type FileHeadResult
- type FileListItem
- type FileListRequest
- type FileListResult
- type FilePutRequest
- type FilePutResult
- type InstallRequest
- type InstallResult
- type Mesh
- func (m *Mesh) Call(toolName, method string, params map[string]interface{}, timeout time.Duration, ...) (*CallResult, error)
- func (m *Mesh) CallWithContext(ctx context.Context, toolName, method string, params map[string]interface{}, ...) (*CallResult, error)
- func (m *Mesh) Close()
- func (m *Mesh) Conn() *nats.Conn
- func (m *Mesh) Connected() bool
- func (m *Mesh) GetToolSchema(nodeName, toolName string, timeout time.Duration) (*SchemaResult, error)
- func (m *Mesh) JetStream() (jetstream.JetStream, error)
- func (m *Mesh) ListServices() ([]micro.Info, error)
- func (m *Mesh) NodeName() string
- func (m *Mesh) RegisterTool(toolName, version, description string, methods []string, handler ToolHandler, ...) error
- func (m *Mesh) RequestConfig(nodeName, toolName, action string, values map[string]interface{}, ...) (*ConfigResult, error)
- func (m *Mesh) RequestInstall(nodeName, source string, timeout time.Duration) (*InstallResult, error)
- func (m *Mesh) RequestReleaseInspect(nodeName, toolName string, timeout time.Duration) (*ReleaseInspectResult, error)
- func (m *Mesh) RequestUninstall(nodeName, toolName string, removeEnv bool, timeout time.Duration) (*UninstallResult, error)
- func (m *Mesh) RequestUpdate(nodeName, toolName string, clean bool, timeout time.Duration) (*UpdateResult, error)
- func (m *Mesh) SetFileStore(store *filestore.Store)
- func (m *Mesh) Stream(ctx context.Context, toolName, method string, params map[string]interface{}, ...) (<-chan StreamFrame, error)
- func (m *Mesh) SubscribeConfig(...) error
- func (m *Mesh) SubscribeFileHandlers(store *filestore.Store) error
- func (m *Mesh) SubscribeInstall(handler func(source string) (string, string, error)) error
- func (m *Mesh) SubscribeReleaseInspect(handler func(toolName string) (*ReleaseInspectInfo, error)) error
- func (m *Mesh) SubscribeToolSchema(handler func(toolName string) (map[string]interface{}, error)) error
- func (m *Mesh) SubscribeUninstall(handler func(toolName string, removeEnv bool) error) error
- func (m *Mesh) SubscribeUpdate(handler func(toolName string, clean bool) (oldVer, newVer string, err error)) error
- func (m *Mesh) UnregisterTool(toolName string) bool
- type MethodSchema
- type NATSWebSocketConfig
- type ReleaseInspectInfo
- type ReleaseInspectRepoState
- type ReleaseInspectRequest
- type ReleaseInspectResult
- type SchemaInfo
- type SchemaRequest
- type SchemaResult
- type StreamFrame
- type ToolEndpoint
- type ToolHandler
- type UninstallRequest
- type UninstallResult
- type UpdateRequest
- type UpdateResult
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CancelSubject ¶
CancelSubject returns the NATS subject a caller publishes to in order to signal cancellation of a specific in-flight call. The serving node subscribes to the same subject for the call's lifetime.
func Connect ¶
Connect opens a NATS connection using generic mesh config plus any extra nats.go options needed by the caller.
func NodeStreamSubject ¶
NodeStreamSubject returns the node-targeted streaming subject used for directing streaming calls to a specific node rather than load-balancing.
func ParseKeyValuePairs ¶
ParseKeyValuePairs parses repeated key=value CLI inputs into a multi-value map.
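The multi-value behavior can be illustrated with a minimal sketch. The function `parsePairs` below is a hypothetical stand-in, not the package's implementation; in particular, rejecting inputs that lack "=" or have an empty key is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// parsePairs is a hypothetical stand-in for ParseKeyValuePairs. Rejecting
// inputs without "=" or with an empty key is an assumption of this sketch.
func parsePairs(pairs []string) (map[string][]string, error) {
	out := make(map[string][]string)
	for _, p := range pairs {
		key, value, found := strings.Cut(p, "=")
		if !found || key == "" {
			return nil, fmt.Errorf("invalid pair %q: want key=value", p)
		}
		// Repeated keys accumulate their values in input order.
		out[key] = append(out[key], value)
	}
	return out, nil
}

func main() {
	m, err := parsePairs([]string{"tag=a", "tag=b", "env=prod"})
	fmt.Println(m, err)
}
```

Splitting on the first "=" only means a value may itself contain "=", which is usually what CLI users expect.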
func ResolveFileParams ¶
func ResolveFileParams(params map[string]interface{}, store *filestore.Store, nc ...*nats.Conn) (func(), error)
ResolveFileParams checks params for file store keys and downloads them to temp files. A param is resolved if:
1. Its name is in the known file param set.
2. Its value is a string that looks like a file store key (contains "/" but doesn't start with "/" or "~").
3. It doesn't look like an absolute path or a URL.
If the local store doesn't have the file, ResolveFileParams falls back to a NATS files.get request (which reaches the seed node's store). This handles the case where leaf nodes don't have JetStream replication from the seed.
Returns a cleanup function that removes temp files.
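The value test described above (conditions 2 and 3) might look roughly like the sketch below. `looksLikeFileStoreKey` is a hypothetical helper, detecting URLs by the presence of "://" is an assumption, and the known file param set from condition 1 is not reproduced because this documentation does not list it.

```go
package main

import (
	"fmt"
	"strings"
)

// looksLikeFileStoreKey is a hypothetical helper mirroring the documented
// rule: the value is a string containing "/" that is not an absolute path,
// a home-relative path, or a URL. Detecting URLs via "://" is an assumption.
func looksLikeFileStoreKey(v interface{}) bool {
	s, ok := v.(string)
	if !ok || !strings.Contains(s, "/") {
		return false
	}
	if strings.HasPrefix(s, "/") || strings.HasPrefix(s, "~") {
		return false
	}
	return !strings.Contains(s, "://")
}

func main() {
	values := []interface{}{"uploads/cat.png", "/tmp/cat.png", "~/cat.png", "https://example.com/cat.png", 42}
	for _, v := range values {
		fmt.Printf("%v -> %v\n", v, looksLikeFileStoreKey(v))
	}
}
```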
func StreamSubject ¶
StreamSubject returns the NATS subject for a streaming tool method. Streaming subjects are separate from the single-reply ones so old callers using Mesh.Call continue to work even after a method is upgraded to streaming.
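The subject patterns quoted in this documentation (`tools.<toolName>.<methodName>`, `tools.<tool>.<method>.stream`, and `cancel.<call_id>`) can be sketched with local stand-ins. The helper names and the tool name "ocr" are hypothetical, and the package's real functions may differ in detail; the node-targeted pattern is not shown because its format is not documented here.

```go
package main

import "fmt"

// Local stand-ins that follow the subject patterns quoted in this
// documentation. They are illustrative, not the package's own functions.

func callSubject(tool, method string) string {
	return "tools." + tool + "." + method
}

func streamSubject(tool, method string) string {
	// Streaming uses a separate subject so single-reply callers keep working.
	return callSubject(tool, method) + ".stream"
}

func cancelSubject(callID string) string {
	return "cancel." + callID
}

func main() {
	fmt.Println(callSubject("ocr", "scan"))
	fmt.Println(streamSubject("ocr", "scan"))
	fmt.Println(cancelSubject("c-123"))
}
```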
Types ¶
type CallRequest ¶
type CallRequest struct {
Params map[string]interface{} `json:"params"`
Node string `json:"node,omitempty"` // target specific node (empty = any)
Corr string `json:"corr,omitempty"`
// CallID, if set, identifies this call for cancellation routing. When the
// caller's context fires, it publishes an empty message to
// `cancel.<CallID>` and the serving node subscribes to that subject for the
// duration of the call. Empty for non-cancellable calls.
CallID string `json:"call_id,omitempty"`
}
CallRequest is the JSON payload for a tool method call.
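To show what the struct tags produce on the wire, the sketch below marshals a local copy of CallRequest. The node name "gpu-1" and call ID "c-123" are made-up values; fields tagged `omitempty` vanish from the payload when empty.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// CallRequest is a local copy of the documented struct.
type CallRequest struct {
	Params map[string]interface{} `json:"params"`
	Node   string                 `json:"node,omitempty"`
	Corr   string                 `json:"corr,omitempty"`
	CallID string                 `json:"call_id,omitempty"`
}

// encode marshals a request to its JSON wire form.
func encode(req CallRequest) (string, error) {
	b, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	params := map[string]interface{}{"x": 1}
	for _, req := range []CallRequest{
		{Params: params},
		{Params: params, Node: "gpu-1", CallID: "c-123"},
	} {
		s, err := encode(req)
		fmt.Println(s, err)
	}
}
```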
type CallResult ¶
type CallResult struct {
OK bool `json:"ok"`
Result interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
}
CallResult is the JSON response from a tool method call.
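A caller handling raw replies will typically want to fold `ok: false` into a Go error. The sketch below does that for a local copy of CallResult; the helper `unwrap` and its error format are assumptions, not part of the package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// CallResult is a local copy of the documented struct.
type CallResult struct {
	OK     bool        `json:"ok"`
	Result interface{} `json:"result,omitempty"`
	Error  string      `json:"error,omitempty"`
	Node   string      `json:"node"`
}

// unwrap decodes a raw reply and converts ok=false into a Go error that
// names the node which produced it. It is a hypothetical helper.
func unwrap(raw []byte) (interface{}, error) {
	var res CallResult
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, fmt.Errorf("decode reply: %w", err)
	}
	if !res.OK {
		return nil, errors.New(res.Node + ": " + res.Error)
	}
	return res.Result, nil
}

func main() {
	fmt.Println(unwrap([]byte(`{"ok":true,"result":"done","node":"n1"}`)))
	fmt.Println(unwrap([]byte(`{"ok":false,"error":"boom","node":"n1"}`)))
}
```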
type Config ¶
type Config struct {
NATSUrl string // NATS server URL (default: nats://localhost:4222)
NodeName string // Human-readable node name
Token string // Optional auth token
WebSocket NATSWebSocketConfig // Optional WebSocket client settings for ws:// or wss:// URLs
}
Config holds mesh connection settings.
type ConfigRequest ¶
type ConfigRequest struct {
ToolName string `json:"tool_name"`
Action string `json:"action"` // "get" or "set"
Values map[string]interface{} `json:"values,omitempty"`
}
ConfigRequest is sent over NATS to get or set tool config on a node.
type ConfigResult ¶
type ConfigResult struct {
OK bool `json:"ok"`
Config map[string]interface{} `json:"config,omitempty"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
}
ConfigResult is the response from a config request.
type FileDeleteRequest ¶
type FileDeleteRequest struct {
Key string `json:"key"`
}
FileDeleteRequest is sent to files.delete.
type FileDeleteResult ¶
FileDeleteResult is the response from files.delete.
type FileGetRequest ¶
type FileGetRequest struct {
Key string `json:"key"`
}
FileGetRequest is sent to files.get.
type FileGetResult ¶
type FileGetResult struct {
OK bool `json:"ok"`
Key string `json:"key,omitempty"`
Data string `json:"data,omitempty"` // base64-encoded file content
ContentType string `json:"content_type,omitempty"`
Size int64 `json:"size,omitempty"`
ETag string `json:"etag,omitempty"`
Error string `json:"error,omitempty"`
}
FileGetResult is the response from files.get.
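Because file content travels as base64 text, a client has to decode `data` after checking `ok`. The sketch below does so for a local copy of FileGetResult. Using the standard base64 alphabet is an assumption (the documentation only says "base64"), and `fileBytes` is a hypothetical helper.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// FileGetResult is a local copy of the documented struct.
type FileGetResult struct {
	OK          bool   `json:"ok"`
	Key         string `json:"key,omitempty"`
	Data        string `json:"data,omitempty"`
	ContentType string `json:"content_type,omitempty"`
	Size        int64  `json:"size,omitempty"`
	ETag        string `json:"etag,omitempty"`
	Error       string `json:"error,omitempty"`
}

// fileBytes decodes a files.get reply into raw file content.
// Standard (padded) base64 is assumed.
func fileBytes(raw []byte) ([]byte, error) {
	var res FileGetResult
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, fmt.Errorf("decode reply: %w", err)
	}
	if !res.OK {
		return nil, errors.New(res.Error)
	}
	return base64.StdEncoding.DecodeString(res.Data)
}

func main() {
	b, err := fileBytes([]byte(`{"ok":true,"key":"notes/hello.txt","data":"aGVsbG8="}`))
	fmt.Println(string(b), err)
}
```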
type FileHeadRequest ¶
type FileHeadRequest struct {
Key string `json:"key"`
}
FileHeadRequest is sent to files.head.
type FileHeadResult ¶
type FileHeadResult struct {
OK bool `json:"ok"`
Key string `json:"key,omitempty"`
ContentType string `json:"content_type,omitempty"`
Size int64 `json:"size,omitempty"`
ETag string `json:"etag,omitempty"`
Created string `json:"created,omitempty"` // RFC3339
Error string `json:"error,omitempty"`
}
FileHeadResult is the response from files.head.
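The `created` field is annotated as RFC3339, so it can be parsed with the standard library. The helper name `createdAt` and the sample timestamp below are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// createdAt parses the Created field of a files.head reply or a
// files.list item, which the struct comments describe as RFC3339.
func createdAt(created string) (time.Time, error) {
	return time.Parse(time.RFC3339, created)
}

func main() {
	ts, err := createdAt("2024-05-01T12:30:00Z")
	if err != nil {
		fmt.Println("bad timestamp:", err)
		return
	}
	fmt.Println(ts.Year(), ts.Month(), ts.Day())
}
```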
type FileListItem ¶
type FileListItem struct {
Key string `json:"key"`
Size int64 `json:"size"`
ContentType string `json:"content_type"`
ETag string `json:"etag"`
Created string `json:"created"` // RFC3339
}
FileListItem describes a single file in a list response.
type FileListRequest ¶
type FileListRequest struct {
Prefix string `json:"prefix"`
}
FileListRequest is sent to files.list.
type FileListResult ¶
type FileListResult struct {
OK bool `json:"ok"`
Files []FileListItem `json:"files,omitempty"`
Error string `json:"error,omitempty"`
}
FileListResult is the response from files.list.
type FilePutRequest ¶
type FilePutRequest struct {
Key string `json:"key"`
Data string `json:"data"` // base64-encoded file content
ContentType string `json:"content_type"`
}
FilePutRequest is sent to files.put.
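A client that wants to store a file needs to base64-encode the content before sending it. The sketch below builds such a payload from a local copy of FilePutRequest; the standard base64 alphabet is assumed, and `newPutPayload` and the key "notes/hello.txt" are hypothetical.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// FilePutRequest is a local copy of the documented struct.
type FilePutRequest struct {
	Key         string `json:"key"`
	Data        string `json:"data"`
	ContentType string `json:"content_type"`
}

// newPutPayload builds the JSON body for a files.put request.
// Standard (padded) base64 is assumed for the data field.
func newPutPayload(key, contentType string, content []byte) ([]byte, error) {
	return json.Marshal(FilePutRequest{
		Key:         key,
		Data:        base64.StdEncoding.EncodeToString(content),
		ContentType: contentType,
	})
}

func main() {
	payload, err := newPutPayload("notes/hello.txt", "text/plain", []byte("hello"))
	fmt.Println(string(payload), err)
}
```

The resulting bytes would be the request body published to the files.put subject. Base64 inflates content by about a third, which matters when the NATS server enforces a maximum payload size.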
type FilePutResult ¶
type FilePutResult struct {
OK bool `json:"ok"`
Key string `json:"key,omitempty"`
Size int64 `json:"size,omitempty"`
ContentType string `json:"content_type,omitempty"`
ETag string `json:"etag,omitempty"`
Error string `json:"error,omitempty"`
}
FilePutResult is the response from files.put.
type InstallRequest ¶
type InstallRequest struct {
Source string `json:"source"` // Git URL or path
}
InstallRequest is sent over NATS to request a remote tool install.
type InstallResult ¶
type InstallResult struct {
OK bool `json:"ok"`
ToolName string `json:"tool_name,omitempty"`
Version string `json:"version,omitempty"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
}
InstallResult is the response from a remote install request.
type Mesh ¶
type Mesh struct {
// contains filtered or unexported fields
}
Mesh manages the NATS connection and service registrations for a node.
func (*Mesh) Call ¶
func (m *Mesh) Call(toolName, method string, params map[string]interface{}, timeout time.Duration, targetNode ...string) (*CallResult, error)
Call invokes a tool method anywhere in the mesh via NATS request/reply. If targetNode is non-empty, the call is routed directly to that node instead of being load-balanced across all nodes offering the tool.
func (*Mesh) CallWithContext ¶
func (m *Mesh) CallWithContext(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (*CallResult, error)
CallWithContext is like Call but propagates ctx cancellation to the serving node via a NATS publish to `cancel.<call_id>`. The serving node subscribes to that subject for the call's duration and forwards the signal to its jumpboot tool process (where a cooperatively-cancellable method can stop early).
A call_id is generated automatically and threaded into the request payload. If ctx fires before the response arrives, CallWithContext returns ctx.Err() after best-effort publishing the cancel signal — it does not wait for the server to acknowledge cancellation. Callers needing a hard deadline should derive ctx with context.WithTimeout.
Part of Phase 1 of the streaming and cancellation design.
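The advice to derive ctx with context.WithTimeout can be sketched without a NATS server by hiding the documented signature behind an interface. Everything named here apart from that signature is hypothetical: `contextCaller`, the fake `slowMesh` whose calls never finish, and the tool name "ocr".

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// CallResult mirrors the documented response type.
type CallResult struct {
	OK     bool
	Result interface{}
	Error  string
	Node   string
}

// contextCaller captures the documented CallWithContext signature so the
// sketch can run against a fake instead of a live mesh.
type contextCaller interface {
	CallWithContext(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (*CallResult, error)
}

// slowMesh is a hypothetical fake whose calls never complete. It returns
// ctx.Err() once ctx fires, as CallWithContext is documented to do.
type slowMesh struct{}

func (slowMesh) CallWithContext(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (*CallResult, error) {
	<-ctx.Done()
	return nil, ctx.Err()
}

// callWithDeadline gives the call a hard deadline via context.WithTimeout.
func callWithDeadline(m contextCaller, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	_, err := m.CallWithContext(ctx, "ocr", "scan", map[string]interface{}{"page": 1})
	return err
}

func main() {
	err := callWithDeadline(slowMesh{}, 20*time.Millisecond)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```

A real *Mesh would satisfy the same interface, so application code written against it can be tested with a fake like this one.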
func (*Mesh) Conn ¶
Conn returns the underlying NATS connection. Use sparingly — prefer higher-level Mesh methods.
func (*Mesh) GetToolSchema ¶
func (m *Mesh) GetToolSchema(nodeName, toolName string, timeout time.Duration) (*SchemaResult, error)
GetToolSchema requests the schema for a specific tool from a specific node. It returns the schema as a map (keyed by method name, with the params schema as the value), or an error if the node doesn't have the tool or the schema fetch fails.
func (*Mesh) JetStream ¶
JetStream returns a JetStream context for this connection. Returns an error if the NATS server does not have JetStream enabled.
func (*Mesh) ListServices ¶
ListServices discovers all services in the mesh using NATS micro discovery.
func (*Mesh) RegisterTool ¶
func (m *Mesh) RegisterTool(toolName, version, description string, methods []string, handler ToolHandler, methodSchemas ...map[string]MethodSchema) error
RegisterTool registers a tool's methods as NATS micro service endpoints. Subject pattern: tools.<toolName>.<methodName>. methodSchemas is optional: a map of method name to input schema, which is included in the endpoint metadata.
func (*Mesh) RequestConfig ¶
func (m *Mesh) RequestConfig(nodeName, toolName, action string, values map[string]interface{}, timeout time.Duration) (*ConfigResult, error)
RequestConfig sends a config get/set request to a specific node.
func (*Mesh) RequestInstall ¶
func (m *Mesh) RequestInstall(nodeName, source string, timeout time.Duration) (*InstallResult, error)
RequestInstall sends an install request to a specific node and waits for the result.
func (*Mesh) RequestReleaseInspect ¶
func (m *Mesh) RequestReleaseInspect(nodeName, toolName string, timeout time.Duration) (*ReleaseInspectResult, error)
RequestReleaseInspect asks a node to describe the deployed checkout state for a tool.
func (*Mesh) RequestUninstall ¶
func (m *Mesh) RequestUninstall(nodeName, toolName string, removeEnv bool, timeout time.Duration) (*UninstallResult, error)
RequestUninstall sends an uninstall request to a specific node.
func (*Mesh) RequestUpdate ¶
func (m *Mesh) RequestUpdate(nodeName, toolName string, clean bool, timeout time.Duration) (*UpdateResult, error)
RequestUpdate sends an update request to a specific node.
func (*Mesh) SetFileStore ¶
SetFileStore sets the file store used for resolving file params in tool calls.
func (*Mesh) Stream ¶
func (m *Mesh) Stream(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (<-chan StreamFrame, error)
Stream invokes a streaming tool method and returns a channel that yields StreamFrame values as they arrive from the serving node. The channel is closed after the terminal frame (Done=true) is delivered, or after ctx fires / an error occurs.
The call is routed to the dedicated `tools.<tool>.<method>.stream` subject (or its node-targeted variant when targetNode is set), which the serving node only registers for methods declared with `stream: true` in their manifest. Calls to non-streaming methods on the stream subject get no responder.
On ctx.Done, Stream publishes to `cancel.<call_id>` so the serving node can propagate the signal to its tool process (the same cancel mechanism as CallWithContext). The channel is then closed without waiting for further frames.
Part of the streaming RPC support.
func (*Mesh) SubscribeConfig ¶
func (m *Mesh) SubscribeConfig(handler func(toolName, action string, values map[string]interface{}) (map[string]interface{}, error)) error
SubscribeConfig listens for config requests on node.<nodeName>.config.
func (*Mesh) SubscribeFileHandlers ¶
SubscribeFileHandlers registers all file store request handlers on the NATS connection. The handlers are not node-scoped — any node with a file store can serve requests. Since all nodes share the same NATS Object Store bucket, it doesn't matter which node handles the request.
func (*Mesh) SubscribeInstall ¶
SubscribeInstall listens for install requests on node.<nodeName>.install. The handler is called with the source URL and should return (toolName, version, error).
func (*Mesh) SubscribeReleaseInspect ¶
func (m *Mesh) SubscribeReleaseInspect(handler func(toolName string) (*ReleaseInspectInfo, error)) error
SubscribeReleaseInspect listens for release inspection requests on node.<nodeName>.release.inspect.
func (*Mesh) SubscribeToolSchema ¶
func (m *Mesh) SubscribeToolSchema(handler func(toolName string) (map[string]interface{}, error)) error
SubscribeToolSchema registers a handler for schema requests on node.<nodeName>.tools.*.schema. The handler receives the tool name and should return the schema or an error.
func (*Mesh) SubscribeUninstall ¶
SubscribeUninstall listens for uninstall requests on node.<nodeName>.uninstall.
func (*Mesh) SubscribeUpdate ¶
func (m *Mesh) SubscribeUpdate(handler func(toolName string, clean bool) (oldVer, newVer string, err error)) error
SubscribeUpdate listens for update requests on node.<nodeName>.update.
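The Subscribe* methods all listen under a common `node.<nodeName>.` prefix. A small sketch of how those subjects compose is shown below. `nodeSubject` is a hypothetical helper that is not part of the documented API, and the concrete schema subject for a single tool is inferred from the `node.<nodeName>.tools.*.schema` wildcard rather than stated in this documentation.

```go
package main

import "fmt"

// nodeSubject builds a node-scoped subject such as node.<nodeName>.install.
// It is a hypothetical helper, not part of the documented API.
func nodeSubject(nodeName, suffix string) string {
	return "node." + nodeName + "." + suffix
}

func main() {
	for _, suffix := range []string{"config", "install", "release.inspect", "uninstall", "update"} {
		fmt.Println(nodeSubject("edge-1", suffix))
	}
	// Inferred from the tools.*.schema wildcard: one subject per tool.
	fmt.Println(nodeSubject("edge-1", "tools.ocr.schema"))
}
```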
func (*Mesh) UnregisterTool ¶
UnregisterTool removes a tool's service registration from the mesh. Returns true if the tool was found and removed, false if not found.
type MethodSchema ¶
type MethodSchema struct {
Properties map[string]interface{} `json:"properties,omitempty"`
Required []string `json:"required,omitempty"`
Description string `json:"description,omitempty"`
Type string `json:"type,omitempty"`
// Stream is true when the method serves on the `.stream` subject. Callers
// can use this flag to choose Mesh.Stream vs Mesh.CallWithContext without
// an out-of-band registry.
// Omitted from JSON when false to keep wire compatibility with older
// non-streaming-aware consumers.
Stream bool `json:"stream,omitempty"`
}
MethodSchema holds JSON Schema for a single method's input parameters, plus per-method metadata not strictly part of JSON Schema.
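As a rough illustration of how the Stream flag might be used, the sketch below picks a client method from a local copy of MethodSchema and shows that the flag is omitted from JSON when false. The helpers `transport` and `wire` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MethodSchema is a local copy of the documented struct.
type MethodSchema struct {
	Properties  map[string]interface{} `json:"properties,omitempty"`
	Required    []string               `json:"required,omitempty"`
	Description string                 `json:"description,omitempty"`
	Type        string                 `json:"type,omitempty"`
	Stream      bool                   `json:"stream,omitempty"`
}

// transport names the Mesh method a caller would pick for this schema.
func transport(s MethodSchema) string {
	if s.Stream {
		return "Mesh.Stream"
	}
	return "Mesh.CallWithContext"
}

// wire returns the JSON form of a schema, or an empty string on error.
func wire(s MethodSchema) string {
	b, err := json.Marshal(s)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	plain := MethodSchema{Type: "object"}
	streaming := MethodSchema{Type: "object", Stream: true}
	fmt.Println(wire(plain), transport(plain))
	fmt.Println(wire(streaming), transport(streaming))
}
```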
type NATSWebSocketConfig ¶
type NATSWebSocketConfig struct {
ProxyPath string
BearerToken string
Headers http.Header
Query url.Values
}
NATSWebSocketConfig holds optional client settings for NATS-over-WebSocket connections that are mounted behind an HTTP proxy path.
type ReleaseInspectInfo ¶
type ReleaseInspectInfo struct {
ToolName string `json:"tool_name"`
ToolPath string `json:"tool_path"`
ManifestVersion string `json:"manifest_version"`
Repo *ReleaseInspectRepoState `json:"repo,omitempty"`
}
type ReleaseInspectRepoState ¶
type ReleaseInspectRepoState struct {
Root string `json:"root"`
ResolvedPath string `json:"resolved_path"`
Branch string `json:"branch"`
Commit string `json:"commit"`
Upstream string `json:"upstream,omitempty"`
UpstreamCommit string `json:"upstream_commit,omitempty"`
Ahead int `json:"ahead,omitempty"`
Behind int `json:"behind,omitempty"`
StatusShort string `json:"status_short"`
Dirty bool `json:"dirty"`
}
type ReleaseInspectRequest ¶
type ReleaseInspectRequest struct {
ToolName string `json:"tool_name"`
}
type ReleaseInspectResult ¶
type ReleaseInspectResult struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
Info *ReleaseInspectInfo `json:"info,omitempty"`
}
type SchemaInfo ¶
type SchemaInfo struct {
MethodName string `json:"method"`
Params map[string]interface{} `json:"params,omitempty"` // JSON Schema for params
}
SchemaInfo holds method signature information for display. It is extracted from the __jb_schema__ response.
type SchemaRequest ¶
type SchemaRequest struct {
ToolName string `json:"tool_name"`
}
type SchemaResult ¶
type SchemaResult struct {
OK bool `json:"ok"`
Schema map[string]interface{} `json:"schema,omitempty"`
Error string `json:"error,omitempty"`
}
SchemaResult is the response containing the __jb_schema__ or an error. The schema is a JSON object with method signatures.
type StreamFrame ¶
type StreamFrame struct {
Chunk interface{} `json:"chunk,omitempty"`
Result interface{} `json:"result,omitempty"`
Done bool `json:"done"`
Error string `json:"error,omitempty"`
Node string `json:"node,omitempty"`
}
StreamFrame is one frame received from a streaming tool call. Partial frames carry Chunk; the terminal frame carries Result (or Error) and has Done=true. The channel returned by Mesh.Stream is closed after the terminal frame is delivered.
Part of the streaming RPC support.
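The frame protocol described above (partial frames carry Chunk, the terminal frame has Done=true with Result or Error) suggests a consumer loop along the following lines. This is a sketch over a local copy of StreamFrame with a hypothetical helper `drain`; treating a channel that closes without a terminal frame as an error is this sketch's choice.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamFrame is a local copy of the documented struct.
type StreamFrame struct {
	Chunk  interface{} `json:"chunk,omitempty"`
	Result interface{} `json:"result,omitempty"`
	Done   bool        `json:"done"`
	Error  string      `json:"error,omitempty"`
	Node   string      `json:"node,omitempty"`
}

// drain collects chunks until the terminal frame arrives or the channel
// closes. It is a hypothetical helper for a channel like the one returned
// by Mesh.Stream.
func drain(frames <-chan StreamFrame) (chunks []interface{}, result interface{}, err error) {
	for f := range frames {
		if f.Done {
			if f.Error != "" {
				return chunks, nil, errors.New(f.Error)
			}
			return chunks, f.Result, nil
		}
		chunks = append(chunks, f.Chunk)
	}
	// Closed without a terminal frame, e.g. ctx fired or an error occurred.
	return chunks, nil, errors.New("stream closed before terminal frame")
}

func main() {
	ch := make(chan StreamFrame, 3)
	ch <- StreamFrame{Chunk: "a"}
	ch <- StreamFrame{Chunk: "b"}
	ch <- StreamFrame{Result: "ab", Done: true}
	close(ch)
	fmt.Println(drain(ch))
}
```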
type ToolEndpoint ¶
type ToolEndpoint struct {
Tool string `json:"tool"`
Method string `json:"method"`
Node string `json:"node"`
Subject string `json:"subject"`
}
ToolEndpoint describes a tool method registered in the mesh.
type ToolHandler ¶
type ToolHandler func(req CallRequest, method string, params map[string]interface{}) (interface{}, error)
ToolHandler is a function that handles a tool method call.
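A handler matching this signature might dispatch on the method name, as in the sketch below. The types are local copies, and `mathHandler` with its "add" and "mul" methods is made up for illustration. The float64 assertions assume params were decoded by encoding/json, which turns JSON numbers into float64 when the target is interface{}.

```go
package main

import (
	"errors"
	"fmt"
)

// CallRequest and ToolHandler are local copies of the documented types.
type CallRequest struct {
	Params map[string]interface{} `json:"params"`
	Node   string                 `json:"node,omitempty"`
	Corr   string                 `json:"corr,omitempty"`
	CallID string                 `json:"call_id,omitempty"`
}

type ToolHandler func(req CallRequest, method string, params map[string]interface{}) (interface{}, error)

// mathHandler is a hypothetical handler serving two methods of one tool.
func mathHandler(req CallRequest, method string, params map[string]interface{}) (interface{}, error) {
	a, aok := params["a"].(float64)
	b, bok := params["b"].(float64)
	if !aok || !bok {
		return nil, errors.New("params a and b must be numbers")
	}
	switch method {
	case "add":
		return a + b, nil
	case "mul":
		return a * b, nil
	default:
		return nil, fmt.Errorf("unknown method %q", method)
	}
}

// Compile-time check that mathHandler has the ToolHandler signature.
var _ ToolHandler = mathHandler

func main() {
	params := map[string]interface{}{"a": 2.0, "b": 3.0}
	fmt.Println(mathHandler(CallRequest{Params: params}, "add", params))
	fmt.Println(mathHandler(CallRequest{Params: params}, "pow", params))
}
```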
type UninstallRequest ¶
type UninstallRequest struct {
ToolName string `json:"tool_name"`
RemoveEnv bool `json:"remove_env"` // Also remove the jumpboot venv
}
UninstallRequest is sent over NATS to request a remote tool uninstall.
type UninstallResult ¶
type UninstallResult struct {
OK bool `json:"ok"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
}
UninstallResult is the response from a remote uninstall request.
type UpdateRequest ¶
type UpdateRequest struct {
ToolName string `json:"tool_name"`
Clean bool `json:"clean"` // Force clean rebuild of environment
}
UpdateRequest is sent over NATS to request a remote tool update.
type UpdateResult ¶
type UpdateResult struct {
OK bool `json:"ok"`
ToolName string `json:"tool_name,omitempty"`
OldVersion string `json:"old_version,omitempty"`
NewVersion string `json:"new_version,omitempty"`
Error string `json:"error,omitempty"`
Node string `json:"node"`
}
UpdateResult is the response from a remote update request.