mesh

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 19, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

File store NATS request handlers.

Exposes pkg/filestore.Store operations over NATS request/reply so that TypeScript plugins (and any other NATS client) can access the file store without a direct JetStream Object Store connection.

Subjects:

files.put    — store a file (base64 data)
files.get    — retrieve a file (returns base64 data)
files.head   — file metadata only
files.delete — remove a file
files.list   — list files (optional prefix filter)

Package mesh provides NATS-based service mesh for jb-mesh nodes.

Each node in the mesh registers its tools as NATS micro services. Tool calls are routed via NATS subjects, with automatic load balancing when multiple nodes offer the same tool.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CancelSubject

func CancelSubject(callID string) string

CancelSubject returns the NATS subject a caller publishes to in order to signal cancellation of a specific in-flight call. The serving node subscribes to the same subject for the call's lifetime.

func Connect

func Connect(cfg Config, extraOpts ...nats.Option) (*nats.Conn, error)

Connect opens a NATS connection using generic mesh config plus any extra nats.go options needed by the caller.

func NodeStreamSubject

func NodeStreamSubject(nodeName, toolName, method string) string

NodeStreamSubject returns the node-targeted streaming subject used for directing streaming calls to a specific node rather than load-balancing.

func ParseKeyValuePairs

func ParseKeyValuePairs(pairs []string) (map[string][]string, error)

ParseKeyValuePairs parses repeated key=value CLI inputs into a multi-value map.

func ResolveFileParams

func ResolveFileParams(params map[string]interface{}, store *filestore.Store, nc ...*nats.Conn) (func(), error)

ResolveFileParams checks params for file store keys and downloads them to temp files. A param is resolved if: 1. Its name is in the known file param set 2. Its value is a string that looks like a file store key (contains "/" but doesn't start with "/" or "~") 3. It doesn't look like an absolute path or URL

If the local store doesn't have the file, falls back to a NATS files.get request (which reaches the seed node's store). This handles the case where leaf nodes don't have JetStream replication from the seed.

Returns a cleanup function that removes temp files.

func StreamSubject

func StreamSubject(toolName, method string) string

StreamSubject returns the NATS subject for a streaming tool method. Streaming subjects are separate from the single-reply ones so old callers using Mesh.Call continue to work even after a method is upgraded to streaming.

Types

type CallRequest

type CallRequest struct {
	Params map[string]interface{} `json:"params"`
	Node   string                 `json:"node,omitempty"` // target specific node (empty = any)
	Corr   string                 `json:"corr,omitempty"`

	// CallID, if set, identifies this call for cancellation routing. When the
	// caller's context fires, it publishes an empty message to
	// `cancel.<CallID>` and the serving node subscribes to that subject for the
	// duration of the call. Empty for non-cancellable calls.
	CallID string `json:"call_id,omitempty"`
}

CallRequest is the JSON payload for a tool method call

type CallResult

type CallResult struct {
	OK     bool        `json:"ok"`
	Result interface{} `json:"result,omitempty"`
	Error  string      `json:"error,omitempty"`
	Node   string      `json:"node"`
}

CallResult is the JSON response from a tool method call

type Config

type Config struct {
	NATSUrl   string              // NATS server URL (default: nats://localhost:4222)
	NodeName  string              // Human-readable node name
	Token     string              // Optional auth token
	WebSocket NATSWebSocketConfig // Optional WebSocket client settings for ws:// or wss:// URLs
}

Config holds mesh connection settings

type ConfigRequest

type ConfigRequest struct {
	ToolName string                 `json:"tool_name"`
	Action   string                 `json:"action"` // "get" or "set"
	Values   map[string]interface{} `json:"values,omitempty"`
}

ConfigRequest is sent over NATS to get or set tool config on a node

type ConfigResult

type ConfigResult struct {
	OK     bool                   `json:"ok"`
	Config map[string]interface{} `json:"config,omitempty"`
	Error  string                 `json:"error,omitempty"`
	Node   string                 `json:"node"`
}

ConfigResult is the response from a config request

type FileDeleteRequest

type FileDeleteRequest struct {
	Key string `json:"key"`
}

FileDeleteRequest is sent to files.delete.

type FileDeleteResult

type FileDeleteResult struct {
	OK    bool   `json:"ok"`
	Error string `json:"error,omitempty"`
}

FileDeleteResult is the response from files.delete.

type FileGetRequest

type FileGetRequest struct {
	Key string `json:"key"`
}

FileGetRequest is sent to files.get.

type FileGetResult

type FileGetResult struct {
	OK          bool   `json:"ok"`
	Key         string `json:"key,omitempty"`
	Data        string `json:"data,omitempty"` // base64-encoded file content
	ContentType string `json:"content_type,omitempty"`
	Size        int64  `json:"size,omitempty"`
	ETag        string `json:"etag,omitempty"`
	Error       string `json:"error,omitempty"`
}

FileGetResult is the response from files.get.

type FileHeadRequest

type FileHeadRequest struct {
	Key string `json:"key"`
}

FileHeadRequest is sent to files.head.

type FileHeadResult

type FileHeadResult struct {
	OK          bool   `json:"ok"`
	Key         string `json:"key,omitempty"`
	ContentType string `json:"content_type,omitempty"`
	Size        int64  `json:"size,omitempty"`
	ETag        string `json:"etag,omitempty"`
	Created     string `json:"created,omitempty"` // RFC3339
	Error       string `json:"error,omitempty"`
}

FileHeadResult is the response from files.head.

type FileListItem

type FileListItem struct {
	Key         string `json:"key"`
	Size        int64  `json:"size"`
	ContentType string `json:"content_type"`
	ETag        string `json:"etag"`
	Created     string `json:"created"` // RFC3339
}

FileListItem describes a single file in a list response.

type FileListRequest

type FileListRequest struct {
	Prefix string `json:"prefix"`
}

FileListRequest is sent to files.list.

type FileListResult

type FileListResult struct {
	OK    bool           `json:"ok"`
	Files []FileListItem `json:"files,omitempty"`
	Error string         `json:"error,omitempty"`
}

FileListResult is the response from files.list.

type FilePutRequest

type FilePutRequest struct {
	Key         string `json:"key"`
	Data        string `json:"data"` // base64-encoded file content
	ContentType string `json:"content_type"`
}

FilePutRequest is sent to files.put.

type FilePutResult

type FilePutResult struct {
	OK          bool   `json:"ok"`
	Key         string `json:"key,omitempty"`
	Size        int64  `json:"size,omitempty"`
	ContentType string `json:"content_type,omitempty"`
	ETag        string `json:"etag,omitempty"`
	Error       string `json:"error,omitempty"`
}

FilePutResult is the response from files.put.

type InstallRequest

type InstallRequest struct {
	Source string `json:"source"` // Git URL or path
}

InstallRequest is sent over NATS to request a remote tool install

type InstallResult

type InstallResult struct {
	OK       bool   `json:"ok"`
	ToolName string `json:"tool_name,omitempty"`
	Version  string `json:"version,omitempty"`
	Error    string `json:"error,omitempty"`
	Node     string `json:"node"`
}

InstallResult is the response from a remote install request

type Mesh

type Mesh struct {
	// contains filtered or unexported fields
}

Mesh manages the NATS connection and service registrations for a node

func New

func New(cfg Config) (*Mesh, error)

New creates a new mesh instance and connects to NATS

func (*Mesh) Call

func (m *Mesh) Call(toolName, method string, params map[string]interface{}, timeout time.Duration, targetNode ...string) (*CallResult, error)

Call invokes a tool method anywhere in the mesh via NATS request/reply. If targetNode is non-empty, the call is routed directly to that node instead of being load-balanced across all nodes offering the tool.

func (*Mesh) CallWithContext

func (m *Mesh) CallWithContext(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (*CallResult, error)

CallWithContext is like Call but propagates ctx cancellation to the serving node via a NATS publish to `cancel.<call_id>`. The serving node subscribes to that subject for the call's duration and forwards the signal to its jumpboot tool process (where a cooperatively-cancellable method can stop early).

A call_id is generated automatically and threaded into the request payload. If ctx fires before the response arrives, CallWithContext returns ctx.Err() after best-effort publishing the cancel signal — it does not wait for the server to acknowledge cancellation. Callers needing a hard deadline should derive ctx with context.WithTimeout.

Part of Phase 1 of the streaming+cancellation design — see the streaming and cancellation design.

func (*Mesh) Close

func (m *Mesh) Close()

Close disconnects from NATS and stops all services

func (*Mesh) Conn

func (m *Mesh) Conn() *nats.Conn

Conn returns the underlying NATS connection. Use sparingly — prefer higher-level Mesh methods.

func (*Mesh) Connected

func (m *Mesh) Connected() bool

Connected returns true if connected to NATS

func (*Mesh) GetToolSchema

func (m *Mesh) GetToolSchema(nodeName, toolName string, timeout time.Duration) (*SchemaResult, error)

GetToolSchema requests the schema for a specific tool from a specific node Returns the schema as a map (keyed by method name, value is params schema) or an error if the node doesn't have the tool or schema fetch fails

func (*Mesh) JetStream

func (m *Mesh) JetStream() (jetstream.JetStream, error)

JetStream returns a JetStream context for this connection. Returns an error if the NATS server does not have JetStream enabled.

func (*Mesh) ListServices

func (m *Mesh) ListServices() ([]micro.Info, error)

ListServices discovers all services in the mesh using NATS micro discovery

func (*Mesh) NodeName

func (m *Mesh) NodeName() string

NodeName returns this node's name

func (*Mesh) RegisterTool

func (m *Mesh) RegisterTool(toolName, version, description string, methods []string, handler ToolHandler, methodSchemas ...map[string]MethodSchema) error

RegisterTool registers a tool's methods as NATS micro service endpoints. Subject pattern: tools.<toolName>.<methodName> methodSchemas is optional: map of method name -> input schema (will be included in endpoint metadata).

func (*Mesh) RequestConfig

func (m *Mesh) RequestConfig(nodeName, toolName, action string, values map[string]interface{}, timeout time.Duration) (*ConfigResult, error)

RequestConfig sends a config get/set request to a specific node

func (*Mesh) RequestInstall

func (m *Mesh) RequestInstall(nodeName, source string, timeout time.Duration) (*InstallResult, error)

RequestInstall sends an install request to a specific node and waits for the result

func (*Mesh) RequestReleaseInspect

func (m *Mesh) RequestReleaseInspect(nodeName, toolName string, timeout time.Duration) (*ReleaseInspectResult, error)

RequestReleaseInspect asks a node to describe the deployed checkout state for a tool.

func (*Mesh) RequestUninstall

func (m *Mesh) RequestUninstall(nodeName, toolName string, removeEnv bool, timeout time.Duration) (*UninstallResult, error)

RequestUninstall sends an uninstall request to a specific node

func (*Mesh) RequestUpdate

func (m *Mesh) RequestUpdate(nodeName, toolName string, clean bool, timeout time.Duration) (*UpdateResult, error)

RequestUpdate sends an update request to a specific node

func (*Mesh) SetFileStore

func (m *Mesh) SetFileStore(store *filestore.Store)

SetFileStore sets the file store used for resolving file params in tool calls.

func (*Mesh) Stream

func (m *Mesh) Stream(ctx context.Context, toolName, method string, params map[string]interface{}, targetNode ...string) (<-chan StreamFrame, error)

Stream invokes a streaming tool method and returns a channel that yields StreamFrame values as they arrive from the serving node. The channel is closed after the terminal frame (Done=true) is delivered, or after ctx fires / an error occurs.

The call is routed to the dedicated `tools.<tool>.<method>.stream` subject (or its node-targeted variant when targetNode is set), which the serving node only registers for methods declared with `stream: true` in their manifest. Calls to non-streaming methods on the stream subject get no responder.

On ctx.Done, Stream publishes to `cancel.<call_id>` so the serving node can propagate the signal to its tool process (the same cancel mechanism as CallWithContext). The channel is then closed without waiting for further frames.

the streaming RPC support.

func (*Mesh) SubscribeConfig

func (m *Mesh) SubscribeConfig(handler func(toolName, action string, values map[string]interface{}) (map[string]interface{}, error)) error

SubscribeConfig listens for config requests on node.<nodeName>.config

func (*Mesh) SubscribeFileHandlers

func (m *Mesh) SubscribeFileHandlers(store *filestore.Store) error

SubscribeFileHandlers registers all file store request handlers on the NATS connection. The handlers are not node-scoped — any node with a file store can serve requests. Since all nodes share the same NATS Object Store bucket, it doesn't matter which node handles the request.

func (*Mesh) SubscribeInstall

func (m *Mesh) SubscribeInstall(handler func(source string) (string, string, error)) error

SubscribeInstall listens for install requests on node.<nodeName>.install The handler is called with the source URL and should return (toolName, version, error)

func (*Mesh) SubscribeReleaseInspect

func (m *Mesh) SubscribeReleaseInspect(handler func(toolName string) (*ReleaseInspectInfo, error)) error

SubscribeReleaseInspect listens for release inspection requests on node.<nodeName>.release.inspect

func (*Mesh) SubscribeToolSchema

func (m *Mesh) SubscribeToolSchema(handler func(toolName string) (map[string]interface{}, error)) error

SubscribeToolSchema registers a handler for schema requests on node.<nodeName>.tools.*.schema The handler receives the tool name and should return the schema or an error

func (*Mesh) SubscribeUninstall

func (m *Mesh) SubscribeUninstall(handler func(toolName string, removeEnv bool) error) error

SubscribeUninstall listens for uninstall requests on node.<nodeName>.uninstall

func (*Mesh) SubscribeUpdate

func (m *Mesh) SubscribeUpdate(handler func(toolName string, clean bool) (oldVer, newVer string, err error)) error

SubscribeUpdate listens for update requests on node.<nodeName>.update

func (*Mesh) UnregisterTool

func (m *Mesh) UnregisterTool(toolName string) bool

UnregisterTool removes a tool's service registration from the mesh. Returns true if the tool was found and removed, false if not found.

type MethodSchema

type MethodSchema struct {
	Properties  map[string]interface{} `json:"properties,omitempty"`
	Required    []string               `json:"required,omitempty"`
	Description string                 `json:"description,omitempty"`
	Type        string                 `json:"type,omitempty"`

	// Stream is true when the method serves on the `.stream` subject. Callers
	// can use this flag to choose Mesh.Stream vs Mesh.CallWithContext without
	// an out-of-band registry.
	// Omitted from JSON when false to keep wire compatibility with older
	// non-streaming-aware consumers.
	Stream bool `json:"stream,omitempty"`
}

MethodSchema holds JSON Schema for a single method's input parameters, plus per-method metadata not strictly part of JSON Schema.

type NATSWebSocketConfig

type NATSWebSocketConfig struct {
	ProxyPath   string
	BearerToken string
	Headers     http.Header
	Query       url.Values
}

NATSWebSocketConfig holds optional client settings for NATS-over-WebSocket connections that are mounted behind an HTTP proxy path.

type ReleaseInspectInfo

type ReleaseInspectInfo struct {
	ToolName        string                   `json:"tool_name"`
	ToolPath        string                   `json:"tool_path"`
	ManifestVersion string                   `json:"manifest_version"`
	Repo            *ReleaseInspectRepoState `json:"repo,omitempty"`
}

type ReleaseInspectRepoState

type ReleaseInspectRepoState struct {
	Root           string `json:"root"`
	ResolvedPath   string `json:"resolved_path"`
	Branch         string `json:"branch"`
	Commit         string `json:"commit"`
	Upstream       string `json:"upstream,omitempty"`
	UpstreamCommit string `json:"upstream_commit,omitempty"`
	Ahead          int    `json:"ahead,omitempty"`
	Behind         int    `json:"behind,omitempty"`
	StatusShort    string `json:"status_short"`
	Dirty          bool   `json:"dirty"`
}

type ReleaseInspectRequest

type ReleaseInspectRequest struct {
	ToolName string `json:"tool_name"`
}

type ReleaseInspectResult

type ReleaseInspectResult struct {
	OK    bool                `json:"ok"`
	Error string              `json:"error,omitempty"`
	Node  string              `json:"node"`
	Info  *ReleaseInspectInfo `json:"info,omitempty"`
}

type SchemaInfo

type SchemaInfo struct {
	MethodName string                 `json:"method"`
	Params     map[string]interface{} `json:"params,omitempty"` // JSON Schema for params
}

SchemaInfo holds method signature information for display This is extracted from the __jb_schema__ response

type SchemaRequest

type SchemaRequest struct {
	ToolName string `json:"tool_name"`
}

type SchemaResult

type SchemaResult struct {
	OK     bool                   `json:"ok"`
	Schema map[string]interface{} `json:"schema,omitempty"`
	Error  string                 `json:"error,omitempty"`
}

SchemaResult is the response containing the __jb_schema__ or error The schema is a JSON object with method signatures

type StreamFrame

type StreamFrame struct {
	Chunk  interface{} `json:"chunk,omitempty"`
	Result interface{} `json:"result,omitempty"`
	Done   bool        `json:"done"`
	Error  string      `json:"error,omitempty"`
	Node   string      `json:"node,omitempty"`
}

StreamFrame is one frame received from a streaming tool call. Partial frames carry Chunk; the terminal frame carries Result (or Error) and has Done=true. The channel returned by Mesh.Stream is closed after the terminal frame is delivered.

the streaming RPC support.

type ToolEndpoint

type ToolEndpoint struct {
	Tool    string `json:"tool"`
	Method  string `json:"method"`
	Node    string `json:"node"`
	Subject string `json:"subject"`
}

ToolEndpoint describes a tool method registered in the mesh

type ToolHandler

type ToolHandler func(req CallRequest, method string, params map[string]interface{}) (interface{}, error)

ToolHandler is a function that handles a tool method call

type UninstallRequest

type UninstallRequest struct {
	ToolName  string `json:"tool_name"`
	RemoveEnv bool   `json:"remove_env"` // Also remove the jumpboot venv
}

UninstallRequest is sent over NATS to request a remote tool uninstall

type UninstallResult

type UninstallResult struct {
	OK    bool   `json:"ok"`
	Error string `json:"error,omitempty"`
	Node  string `json:"node"`
}

UninstallResult is the response from a remote uninstall request

type UpdateRequest

type UpdateRequest struct {
	ToolName string `json:"tool_name"`
	Clean    bool   `json:"clean"` // Force clean rebuild of environment
}

UpdateRequest is sent over NATS to request a remote tool update

type UpdateResult

type UpdateResult struct {
	OK         bool   `json:"ok"`
	ToolName   string `json:"tool_name,omitempty"`
	OldVersion string `json:"old_version,omitempty"`
	NewVersion string `json:"new_version,omitempty"`
	Error      string `json:"error,omitempty"`
	Node       string `json:"node"`
}

UpdateResult is the response from a remote update request

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL