ipc

package
v0.302.0
This package is not in the latest version of its module.

Published: May 9, 2026 | License: MIT | Imports: 13 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrPrimaryExists is returned when a primary instance already holds the lock.
	ErrPrimaryExists = errors.New("ipc: primary instance already running")

	// ErrNotPrimary is returned when an operation requires being the primary instance.
	ErrNotPrimary = errors.New("ipc: this instance is not the primary")

	// ErrMethodNotFound is returned when a requested RPC method has no handler.
	ErrMethodNotFound = errors.New("ipc: method not found")

	// ErrTimeout is returned when an RPC call times out.
	ErrTimeout = errors.New("ipc: call timeout")

	// ErrConnectionFailed is returned when a connection attempt fails.
	ErrConnectionFailed = errors.New("ipc: connection failed")
)
var DefaultOptions = Options{
	DialTimeout: 5 * time.Second,
	CallTimeout: 10 * time.Second,
}

DefaultOptions holds the default IPC options.

Functions

func PortsForPath

func PortsForPath(absPath string) (pub, rpc int)

PortsForPath returns the deterministic PUB and RPC ports for a given absolute path. The ports are derived from the 32-bit FNV-1a hash of the path:

base_port = 40000 + (fnv32a(abs_path) % 20000)
PUB port  = base_port
RPC port  = base_port + 1
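
The formula above can be reproduced with the standard library's hash/fnv package. This is a minimal sketch of the stated derivation, not the package's source; the lowercase portsForPath name and the example path are illustrative.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// portsForPath mirrors the documented formula:
// base = 40000 + (fnv32a(path) % 20000), PUB = base, RPC = base + 1.
func portsForPath(absPath string) (pub, rpc int) {
	h := fnv.New32a()
	h.Write([]byte(absPath)) // Write on a hash.Hash never returns an error
	base := 40000 + int(h.Sum32()%20000)
	return base, base + 1
}

func main() {
	pub, rpc := portsForPath("/home/user/project")
	fmt.Println(pub, rpc)
}
```

With this formula the PUB port falls in 40000..59999 and the RPC port in 40001..60000. Since there are only 20000 possible bases, two different paths can map to the same or to adjacent ports, so a bind failure is possible even when the lock was acquired.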

func ReleaseLock

func ReleaseLock(lockFile *os.File)

ReleaseLock releases the flock and removes the lock file.

Types

type Bus

type Bus struct {
	// PubAddr is the address the PUB socket is bound to (e.g. "tcp://127.0.0.1:40000").
	PubAddr string
	// RPCAddr is the address the ROUTER socket is bound to (e.g. "tcp://127.0.0.1:40001").
	RPCAddr string
	// contains filtered or unexported fields
}

Bus is the server-side ZMQ transport backbone. Only the primary instance creates a Bus. It binds a PUB socket for event broadcasting and a ROUTER socket for JSON-RPC requests.

func NewBus

func NewBus(instanceID string) *Bus

NewBus creates a Bus for the given instanceID.

func (*Bus) Publish

func (b *Bus) Publish(topic string, payload any) error

Publish sends an Envelope on the PUB socket. The ZMQ message frame layout is: [topic_bytes + 0x00 + json_envelope_bytes] so subscribers can filter by topic prefix.

func (*Bus) RegisterMethod

func (b *Bus) RegisterMethod(method string, handler HandlerFunc)

RegisterMethod registers a JSON-RPC handler for the given method name.

func (*Bus) Shutdown

func (b *Bus) Shutdown() error

Shutdown closes both sockets gracefully.

func (*Bus) Start

func (b *Bus) Start(ctx context.Context, pubPort, rpcPort int) error

Start binds the PUB and ROUTER sockets on the given ports and starts a background goroutine to handle incoming ROUTER messages.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a primary's Bus using SUB and DEALER sockets. A single persistent DEALER socket is reused per endpoint to avoid the overhead of creating/destroying zmq4 goroutines on every RPC call.

func NewClient

func NewClient(ctx context.Context) (*Client, error)

NewClient creates a new Client for subscribing to events and making RPC calls.

func (*Client) Call

func (c *Client) Call(ctx context.Context, routerEndpoint, method string, params any) (json.RawMessage, error)

Call sends a JSON-RPC request to a ROUTER endpoint and waits for the response. A persistent DEALER socket is reused per endpoint to avoid the overhead of creating/closing zmq4 goroutines on every call.

func (*Client) Close

func (c *Client) Close() error

Close closes any resources held by the client, including cached dealer sockets.

func (*Client) SubscribeTo

func (c *Client) SubscribeTo(pubEndpoint string, topics ...string) (<-chan Envelope, error)

SubscribeTo connects to a PUB endpoint and subscribes to the given topics. It returns a channel that receives Envelope messages until the client's context is cancelled or the socket is closed.

type Envelope

type Envelope struct {
	// InstanceID identifies the sending Pando instance.
	InstanceID string `json:"instanceId"`
	// ProjectID identifies the project associated with this event.
	ProjectID string `json:"projectId"`
	// SessionID identifies the session, if applicable.
	SessionID string `json:"sessionId,omitempty"`
	// Topic is the event topic used for subscriber filtering.
	Topic string `json:"topic"`
	// Timestamp records when this envelope was created.
	Timestamp time.Time `json:"timestamp"`
	// Payload is the raw JSON body of the event.
	Payload json.RawMessage `json:"payload"`
}

Envelope is the standard wrapper for all ZMQ messages published over the PUB socket. Every message carries identity, routing, and timing metadata alongside the payload.

type HandlerFunc

type HandlerFunc func(ctx context.Context, method string, params json.RawMessage) (json.RawMessage, error)

HandlerFunc is a JSON-RPC method handler.

type LockInfo

type LockInfo struct {
	InstanceID string    `json:"instance_id"`
	PID        int       `json:"pid"`
	PubPort    int       `json:"pub_port"`
	RPCPort    int       `json:"rpc_port"`
	StartedAt  time.Time `json:"started_at"`
}

LockInfo is stored inside the lock file so other instances can find this primary's ports.
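
A non-primary instance could turn the stored ports into endpoints roughly as follows. This is a sketch under two assumptions that the documentation does not confirm: that the lock file body is the plain JSON encoding of LockInfo, and that the primary listens on 127.0.0.1 (as in the Bus address examples). The endpoints helper is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// LockInfo is a local copy of the documented struct.
type LockInfo struct {
	InstanceID string    `json:"instance_id"`
	PID        int       `json:"pid"`
	PubPort    int       `json:"pub_port"`
	RPCPort    int       `json:"rpc_port"`
	StartedAt  time.Time `json:"started_at"`
}

// endpoints decodes lock file contents and builds the PUB and RPC addresses.
// The loopback host is an assumption, not something LockInfo records.
func endpoints(data []byte) (pub, rpc string, err error) {
	var info LockInfo
	if err := json.Unmarshal(data, &info); err != nil {
		return "", "", fmt.Errorf("decode lock info: %w", err)
	}
	pub = fmt.Sprintf("tcp://127.0.0.1:%d", info.PubPort)
	rpc = fmt.Sprintf("tcp://127.0.0.1:%d", info.RPCPort)
	return pub, rpc, nil
}

func main() {
	data := []byte(`{"instance_id":"inst-1","pid":4242,"pub_port":40123,"rpc_port":40124,"started_at":"2026-05-09T00:00:00Z"}`)
	pub, rpc, err := endpoints(data)
	fmt.Println(pub, rpc, err) // prints "tcp://127.0.0.1:40123 tcp://127.0.0.1:40124 <nil>"
}
```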

func AcquireLock

func AcquireLock(workdir, instanceID string, pubPort, rpcPort int) (isPrimary bool, info *LockInfo, lockFile *os.File, err error)

AcquireLock tries to acquire an exclusive flock on <workdir>/.pando/ipc.lock.

Returns isPrimary=true if the lock was acquired, false if another instance already holds it. If not primary, info contains the connection details of the running primary. The caller must call ReleaseLock when done if isPrimary is true.

type Options

type Options struct {
	// DialTimeout is the max time to wait when connecting. Default: 5s.
	DialTimeout time.Duration
	// CallTimeout is the max time to wait for an RPC response. Default: 10s.
	CallTimeout time.Duration
}

Options configures the IPC package behaviour.

Directories

Package bridge connects in-process pubsub events to the ZMQ Bus, enabling real-time event broadcasting to all connected Pando instances and observers.
Package dbproxy provides a db.Querier implementation that transparently routes write operations to the primary Pando instance via ZMQ JSON-RPC, while serving reads from the local (possibly read-only) SQLite database.
