actions

package
v0.13.3 Latest
Warning

This package is not in the latest version of its module.

Published: May 9, 2026 | License: GPL-2.0 | Imports: 27 | Imported by: 0

Documentation

Overview

Package actions implements the Graywolf Actions subsystem: an APRS message-driven trigger surface that dispatches operator-defined commands and webhooks. Inbound messages prefixed `@@` and addressed to the trigger surface are diverted from the messages router by the classifier, parsed, OTP-verified (when required), and dispatched through a per-Action FIFO runner to one of the registered Executors. Every attempt is recorded in the action_invocations audit log and a reply is sent back to the originator over the matching transport.

Constants

const (
	DefaultMaxInvocationRows = 1000
	DefaultMaxInvocationAge  = 30 * 24 * time.Hour
	DefaultPruneInterval     = 24 * time.Hour
)
const ActionPrefix = "@@"

ActionPrefix is the on-air sentinel that diverts an inbound message from the inbox to the Actions runner.

const DefaultMaxReplyLines = 1

DefaultMaxReplyLines is the value used when an Action row pre-dates the column or the operator left it at zero.

const DefaultOTPSweepInterval = 5 * time.Minute

DefaultOTPSweepInterval is how often the Service-owned ticker invokes Sweep. The replay-ring TTL is ~150s; 5 minutes is a small constant multiple, keeping the residual entry count bounded without burning a goroutine on a tight loop.

const FreeformArgKey = "arg"

FreeformArgKey is the synthetic key used for the single freeform value. It is stable so that executors, the audit log, and webhook templates can refer to it as `arg` (env var GW_ARG, token {{arg}}). Not operator-settable.

const (
	// FreeformValueCeiling is the absolute server-side cap on a single
	// freeform payload, regardless of operator MaxLen. Matches the APRS
	// message body limit comfortably and prevents an over-permissive
	// schema from accepting payloads larger than the message subsystem
	// can stage.
	FreeformValueCeiling = 200
)
const MaxActionNameLen = 32

MaxActionNameLen mirrors the schema column width for actions.name.

const MaxReplyLen = 67

MaxReplyLen is the per-message APRS text cap (one frame).

const MaxReplyLinesCeiling = 5

MaxReplyLinesCeiling is the hard upper bound enforced by the validator on Action.MaxReplyLines. Operators choose 1..ceiling; the runtime clamps anything outside that range. The cap exists because each extra line is one extra RF frame plus its own ack/retry budget; >5 turns a single trigger into an airtime storm.
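
The clamping rule described above can be sketched as follows. This is a minimal illustration, not the package's code: `clampReplyLines` is a hypothetical helper, and treating negative values like zero is an assumption taken from the FormatReplies note that `maxLines <= 0` is treated as 1.

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	defaultMaxReplyLines = 1
	maxReplyLinesCeiling = 5
)

// clampReplyLines is a hypothetical helper (not part of the package API).
// Zero or negative falls back to the default; anything above the ceiling
// is pinned to the ceiling so one trigger never fans out past five frames.
func clampReplyLines(n int) int {
	if n <= 0 {
		return defaultMaxReplyLines
	}
	if n > maxReplyLinesCeiling {
		return maxReplyLinesCeiling
	}
	return n
}

func main() {
	for _, n := range []int{-3, 0, 1, 4, 5, 9} {
		fmt.Printf("%d -> %d\n", n, clampReplyLines(n))
	}
}
```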

const TestFireSenderCall = "(test-web)"

TestFireSenderCall is the synthetic SenderCall stamped on invocations originated from the REST test-fire endpoint. It bypasses any real callsign while still producing an audit row so operators can see the dry-run history.

Variables

var ErrOTPReplay = errors.New("actions: code already used")

ErrOTPReplay is returned by OTPVerifier.Verify when a code matches but has already been observed within the replay TTL. Callers should map this to StatusBadOTP and an audit detail of "replay".

var ErrParse = errors.New("actions: parse error")

Functions

func BadArgKey

func BadArgKey(err error) string

BadArgKey returns the offending key when err is a *BadArgError, otherwise the empty string. Lets callers compose a "bad arg: KEY" reply without importing the error type directly.
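
A helper of this shape is commonly written with `errors.As`. The sketch below assumes that approach; the documentation does not say whether the real BadArgKey unwraps wrapped errors, and the `Error()` text format here is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// BadArgError mirrors the exported struct shape shown in these docs.
type BadArgError struct {
	Key    string
	Reason string
}

// Error uses an illustrative message format (assumption).
func (e *BadArgError) Error() string { return "bad arg " + e.Key + ": " + e.Reason }

// badArgKey is a stand-in for BadArgKey: it returns the offending key when
// err is (or wraps) a *BadArgError, otherwise the empty string.
func badArgKey(err error) string {
	var bae *BadArgError
	if errors.As(err, &bae) {
		return bae.Key
	}
	return ""
}

func main() {
	wrapped := fmt.Errorf("sanitize: %w", &BadArgError{Key: "door", Reason: "too long"})
	fmt.Printf("%q\n", badArgKey(wrapped))
	fmt.Printf("%q\n", badArgKey(errors.New("unrelated failure")))
}
```

A caller can then build a reply such as "bad arg: " plus the returned key without referring to the error type itself.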

func FormatReplies

func FormatReplies(r Result, maxLines int) ([]string, bool)

FormatReplies produces up to maxLines on-air reply strings from a Result. Line 1 carries the status-word prefix ("ok: ..."); lines 2..N are plain output lines (no prefix). Each line is sanitized of control characters and capped at MaxReplyLen runes. Non-OK statuses always collapse to a single line — there is no point fanning a bad-arg or timeout failure across multiple frames.

maxLines <= 0 is treated as 1. Blank lines in the captured output are dropped. Returns the produced lines and a bool that is true when any data was dropped: line count exceeded maxLines, an individual line was clamped to MaxReplyLen, or the assembled status+detail was truncated.
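
The rules above can be approximated in a few lines. This is a simplified sketch under stated assumptions: it takes a plain status string and captured output instead of a Result, it drops control characters rather than replacing them, and it counts the status prefix against the 67-rune cap. The real FormatReplies may differ on each of these points.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

const maxReplyLen = 67 // MaxReplyLen from the constants above

// cleanLine drops control characters and caps the line at maxReplyLen runes.
// It reports whether the cap removed anything.
func cleanLine(s string) (string, bool) {
	runes := make([]rune, 0, len(s))
	for _, r := range s {
		if !unicode.IsControl(r) {
			runes = append(runes, r)
		}
	}
	if len(runes) > maxReplyLen {
		return string(runes[:maxReplyLen]), true
	}
	return string(runes), false
}

// formatReplies is a hypothetical, simplified stand-in for FormatReplies.
func formatReplies(status, output string, maxLines int) ([]string, bool) {
	if maxLines <= 0 || status != "ok" {
		maxLines = 1 // non-OK outcomes always collapse to a single frame
	}
	var lines []string
	for _, l := range strings.Split(output, "\n") {
		if strings.TrimSpace(l) != "" { // blank lines are dropped
			lines = append(lines, l)
		}
	}
	if len(lines) == 0 {
		return []string{status}, false
	}
	dropped := len(lines) > maxLines
	if dropped {
		lines = lines[:maxLines]
	}
	lines[0] = status + ": " + lines[0] // only line 1 carries the prefix
	for i, l := range lines {
		clean, clamped := cleanLine(l)
		lines[i] = clean
		dropped = dropped || clamped
	}
	return lines, dropped
}

func main() {
	lines, dropped := formatReplies("ok", "one\n\ntwo\nthree", 2)
	fmt.Printf("%q dropped=%v\n", lines, dropped)
}
```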

func FormatReply

func FormatReply(r Result) (string, bool)

FormatReply preserves the legacy single-line API. New code should call FormatReplies; this wrapper is kept so call sites that have not been updated keep working. The two production callers (the runner and the service test-fire audit) now use FormatReplies. The wrapper remains for anything else, such as the webapi test-fire, which still reports a single primary line in addition to the full slice.

func IsBadArgErr

func IsBadArgErr(err error) bool

func StartAuditPruner

func StartAuditPruner(ctx context.Context, p InvocationPruner, cfg AuditPrunerConfig) func()

StartAuditPruner runs prune at Interval (default 24h) until the returned stop func is called or ctx is cancelled.

func StartOTPSweeper

func StartOTPSweeper(ctx context.Context, v *OTPVerifier, interval time.Duration) func()

StartOTPSweeper runs Sweep at interval (or DefaultOTPSweepInterval when zero) until the returned stop func is called or ctx is cancelled. The stop func is idempotent.
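
The ticker-plus-stop-func pattern described here (and shared by StartAuditPruner) might look like the sketch below. `startSweeper` and `demo` are hypothetical names, the sweep callback stands in for `(*OTPVerifier).Sweep`, and having stop wait for the goroutine to exit is an assumption made so the example is deterministic.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const defaultSweepInterval = 5 * time.Minute // DefaultOTPSweepInterval above

// startSweeper calls sweep on every tick until stop is called or ctx ends.
func startSweeper(ctx context.Context, sweep func(), interval time.Duration) func() {
	if interval <= 0 {
		interval = defaultSweepInterval
	}
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				sweep()
			case <-done:
				return
			case <-ctx.Done():
				return
			}
		}
	}()
	var once sync.Once
	return func() {
		once.Do(func() { close(done) }) // sync.Once makes repeated stops safe
		<-exited
	}
}

// demo waits for the first sweep, then calls stop twice.
func demo() bool {
	first := make(chan struct{}, 1)
	stop := startSweeper(context.Background(), func() {
		select {
		case first <- struct{}{}:
		default:
		}
	}, time.Millisecond)
	<-first
	stop()
	stop() // second call is a no-op
	return true
}

func main() {
	fmt.Println("stopped cleanly:", demo())
}
```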

func ValidActionName

func ValidActionName(s string) bool

ValidActionName reports whether s is a legal action name. Mirrors the inbound parser's grammar so outbound macro creation rejects names the receiver would refuse. Charset: ASCII letters, digits, dot, dash, underscore. The character-class check accepts both cases; lookup is case-insensitive (configstore.Store.GetActionByName uppercases the query) and storage is uppercase (Action.BeforeSave).
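
A character-class check of this kind can be sketched as below. The charset comes from the description above; rejecting the empty string and enforcing MaxActionNameLen inside the same function are assumptions, since the documentation does not say where the length limit is applied.

```go
package main

import "fmt"

const maxActionNameLen = 32 // MaxActionNameLen above

// validActionName is a hypothetical stand-in for ValidActionName.
// It walks bytes, so any non-ASCII byte falls through to the default case.
func validActionName(s string) bool {
	if s == "" || len(s) > maxActionNameLen {
		return false // length bounds are an assumption of this sketch
	}
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9':
		case c == '.', c == '-', c == '_':
		default:
			return false
		}
	}
	return true
}

func main() {
	for _, name := range []string{"door.open_1-A", "bad name", "", "café"} {
		fmt.Printf("%q -> %v\n", name, validActionName(name))
	}
}
```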

Types

type ActionLookup

type ActionLookup interface {
	GetActionByName(ctx context.Context, name string) (*configstore.Action, error)
}

ActionLookup loads an Action by name. Wraps the configstore for testability.

type AddresseeSet

type AddresseeSet struct {
	// contains filtered or unexported fields
}

AddresseeSet is a lock-free read-side cache of operator-defined listener addressees. The classifier hits Contains on every inbound message packet; the REST handler swaps in a new snapshot via Replace. Mirrors messages.TacticalSet semantics so the hot path is allocation-free.

func NewAddresseeSet

func NewAddresseeSet() *AddresseeSet

NewAddresseeSet constructs an empty set. Seed via Replace.

func (*AddresseeSet) Contains

func (s *AddresseeSet) Contains(name string) bool

Contains reports whether name (case-insensitively) is a member of the current snapshot.

func (*AddresseeSet) Replace

func (s *AddresseeSet) Replace(items []string)

Replace atomically swaps in a new snapshot. items are normalized to uppercase/trimmed; empty strings are dropped. A nil items slice is treated as "empty" — readers never observe nil.
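
One common way to get a lock-free read side with atomic snapshot replacement is an `atomic.Pointer` to an immutable map. The sketch below assumes that design (it needs Go 1.19 or later); the actual AddresseeSet fields are unexported, so this is illustrative only. Note that `strings.ToUpper` may allocate for lowercase input, so this sketch does not reproduce the allocation-free hot path.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// addresseeSet is a hypothetical stand-in for AddresseeSet.
type addresseeSet struct {
	snap atomic.Pointer[map[string]struct{}]
}

// newAddresseeSet seeds an empty snapshot so readers never observe nil.
func newAddresseeSet() *addresseeSet {
	s := &addresseeSet{}
	s.replace(nil)
	return s
}

// replace builds a fresh map and publishes it with a single atomic store.
// The published map is never mutated afterwards.
func (s *addresseeSet) replace(items []string) {
	m := make(map[string]struct{}, len(items))
	for _, it := range items {
		it = strings.ToUpper(strings.TrimSpace(it))
		if it != "" {
			m[it] = struct{}{}
		}
	}
	s.snap.Store(&m)
}

// contains reads the current snapshot without taking a lock.
func (s *addresseeSet) contains(name string) bool {
	_, ok := (*s.snap.Load())[strings.ToUpper(name)]
	return ok
}

func main() {
	s := newAddresseeSet()
	s.replace([]string{" n0call-7 ", "", "wx"})
	fmt.Println(s.contains("n0call-7"), s.contains("WX"), s.contains("other"))
}
```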

type ArgMode

type ArgMode string

ArgMode controls how argv after the action verb is interpreted.

    kv:       "@@<otp>#name k1=v1 k2=v2"  (current behavior, default)
    freeform: "@@<otp>#name <one untokenized payload>"

const (
	ArgModeKV       ArgMode = "kv"
	ArgModeFreeform ArgMode = "freeform"
)

type ArgSpec

type ArgSpec struct {
	Key      string `json:"key"`
	Regex    string `json:"regex,omitempty"`
	MaxLen   int    `json:"max_len,omitempty"`
	Required bool   `json:"required,omitempty"`
}

ArgSpec is one entry in an Action's arg_schema, decoded from JSON.

func DecodeArgSchemaJSON

func DecodeArgSchemaJSON(s string) ([]ArgSpec, error)

DecodeArgSchemaJSON parses the JSON wire form of an action's arg_schema column into the runtime ArgSpec slice. Empty / "[]" strings return (nil, nil).
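
Decoding the column with `encoding/json` could look like this. The struct tags are copied from ArgSpec above; trimming whitespace before the empty check and the wrapped error text are assumptions of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ArgSpec mirrors the exported struct and JSON tags shown above.
type ArgSpec struct {
	Key      string `json:"key"`
	Regex    string `json:"regex,omitempty"`
	MaxLen   int    `json:"max_len,omitempty"`
	Required bool   `json:"required,omitempty"`
}

// decodeArgSchema is a hypothetical stand-in for DecodeArgSchemaJSON.
func decodeArgSchema(s string) ([]ArgSpec, error) {
	t := strings.TrimSpace(s)
	if t == "" || t == "[]" {
		return nil, nil // empty schema: no specs, no error
	}
	var specs []ArgSpec
	if err := json.Unmarshal([]byte(t), &specs); err != nil {
		return nil, fmt.Errorf("arg_schema: %w", err)
	}
	return specs, nil
}

func main() {
	specs, err := decodeArgSchema(`[{"key":"door","regex":"^[a-z]+$","max_len":8,"required":true}]`)
	fmt.Printf("%+v %v\n", specs, err)
}
```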

type AuditPrunerConfig

type AuditPrunerConfig struct {
	MaxRows  int
	MaxAge   time.Duration
	Interval time.Duration
}
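
ServiceConfig notes that zero values in this struct fall back to the package defaults. A minimal sketch of that fallback, using the constant values listed at the top of this page, follows; `withDefaults` is a hypothetical helper, and treating negative values like zero is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the constants documented above.
const (
	defaultMaxInvocationRows = 1000
	defaultMaxInvocationAge  = 30 * 24 * time.Hour
	defaultPruneInterval     = 24 * time.Hour
)

// AuditPrunerConfig mirrors the exported struct.
type AuditPrunerConfig struct {
	MaxRows  int
	MaxAge   time.Duration
	Interval time.Duration
}

// withDefaults fills every unset field with its package default.
func withDefaults(c AuditPrunerConfig) AuditPrunerConfig {
	if c.MaxRows <= 0 {
		c.MaxRows = defaultMaxInvocationRows
	}
	if c.MaxAge <= 0 {
		c.MaxAge = defaultMaxInvocationAge
	}
	if c.Interval <= 0 {
		c.Interval = defaultPruneInterval
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", withDefaults(AuditPrunerConfig{MaxRows: 50}))
}
```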

type AuditSink

type AuditSink interface {
	Insert(ctx context.Context, row *configstore.ActionInvocation) error
}

AuditSink writes invocation rows. Wraps the configstore repo for testability.

type BadArgError

type BadArgError struct {
	Key    string
	Reason string
}

BadArgError carries the first offending key for the reply formatter.

func (*BadArgError) Error

func (e *BadArgError) Error() string

type Classifier

type Classifier struct {
	// contains filtered or unexported fields
}

Classifier inspects inbound APRS messages and decides whether they are Actions traffic. Lives in the rxfanout hot path; never blocks on I/O directly — store + verifier calls are bounded synchronous reads, and the runner submission is non-blocking.

func NewClassifier

func NewClassifier(cfg ClassifierConfig) *Classifier

func (*Classifier) Classify

func (c *Classifier) Classify(ctx context.Context, pkt *aprs.DecodedAPRSPacket) bool

Classify inspects pkt. Returns true when the packet was consumed by the Actions subsystem and the messages router must skip it.

Consumption rule: the packet is an APRS message addressed to the trigger surface (station call / tactical alias / listener addressee) AND its body begins with ActionPrefix. Parse failures on a consumed packet still return true — they emit an "unknown" audit row + reply rather than leaking partial Actions noise into the inbox.

Third-party envelopes (APRS101, chapter 20) are unwrapped before classification, so an action gated from IS→RF (or vice versa) is claimed by Classify rather than by the messages router.

type ClassifierConfig

type ClassifierConfig struct {
	OurCall     func() string
	TacticalSet *messages.TacticalSet
	Listeners   *AddresseeSet
	ActionStore ActionLookup
	CredStore   CredentialLookup
	OTPVerifier *OTPVerifier
	Runner      Submitter
	// Preflight is the shared auto-ACK + dedup component owned by
	// messages.Service. When non-nil, every consumed @@ packet sends an
	// auto-ACK before any executor work and a duplicate (same from, msg
	// id, text hash) within the dedup window short-circuits without
	// re-submitting. Nil disables the hookup (test-only).
	Preflight *messages.Preflight
}

ClassifierConfig wires the classifier to the rest of the subsystem.

type CommandExecutor

type CommandExecutor struct{}

CommandExecutor runs a configured Action.CommandPath via os/exec — always argv-style, never `sh -c`. Stdout+stderr are merged and truncated to cmdOutputCap bytes for capture.

func NewCommandExecutor

func NewCommandExecutor() *CommandExecutor

func (CommandExecutor) Execute

func (CommandExecutor) Execute(ctx context.Context, req ExecRequest) Result

type CredentialLookup

type CredentialLookup interface {
	GetOTPCredential(ctx context.Context, id uint) (*configstore.OTPCredential, error)
}

CredentialLookup loads an OTP credential by ID.

type ExecRequest

type ExecRequest struct {
	Action     *configstore.Action
	Invocation Invocation
	Timeout    time.Duration
}

ExecRequest is the contract between the runner and an Executor. The runner builds it from a sanitized invocation + the matched Action.

type Executor

type Executor interface {
	// Execute runs the request and returns a Result. The implementation
	// must honor ctx for cancellation/timeout. It must not panic; any
	// internal failure becomes Result{Status: StatusError, ...}.
	Execute(ctx context.Context, req ExecRequest) Result
}

Executor runs one Action invocation. Implementations are stateless from the runner's perspective; per-call state lives in ExecRequest / Result. Adding a new Action type means writing a new Executor and registering it in the registry — no other package needs to change.

type ExecutorRegistry

type ExecutorRegistry struct {
	// contains filtered or unexported fields
}

ExecutorRegistry maps Action.Type strings to concrete Executors.

func NewExecutorRegistry

func NewExecutorRegistry() *ExecutorRegistry

func (*ExecutorRegistry) Lookup

func (r *ExecutorRegistry) Lookup(typeName string) (Executor, bool)

func (*ExecutorRegistry) Register

func (r *ExecutorRegistry) Register(typeName string, e Executor) error
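
To illustrate the "write an Executor, register it" flow, here is a self-contained sketch with trimmed stand-in types. `Result` and `ExecRequest` are reduced to one or two fields, `echoExecutor` is hypothetical, and the conditions under which Register returns an error (empty name, nil executor, duplicate type) are assumptions; the documentation only shows that it returns an error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Trimmed stand-ins for the package types (assumption: fields reduced).
type Result struct {
	Status        string
	OutputCapture string
}

type ExecRequest struct{ ActionName string }

type Executor interface {
	Execute(ctx context.Context, req ExecRequest) Result
}

// registry maps a type name to an Executor.
type registry struct {
	mu     sync.RWMutex
	byType map[string]Executor
}

func newRegistry() *registry { return &registry{byType: map[string]Executor{}} }

func (r *registry) register(typeName string, e Executor) error {
	if typeName == "" || e == nil {
		return errors.New("registry: empty type name or nil executor")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.byType[typeName]; dup {
		return fmt.Errorf("registry: %q already registered", typeName)
	}
	r.byType[typeName] = e
	return nil
}

func (r *registry) lookup(typeName string) (Executor, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	e, ok := r.byType[typeName]
	return e, ok
}

// echoExecutor is a hypothetical executor that never fails or panics.
type echoExecutor struct{}

func (echoExecutor) Execute(_ context.Context, req ExecRequest) Result {
	return Result{Status: "ok", OutputCapture: "ran " + req.ActionName}
}

func main() {
	reg := newRegistry()
	if err := reg.register("echo", echoExecutor{}); err != nil {
		fmt.Println("register failed:", err)
		return
	}
	if e, ok := reg.lookup("echo"); ok {
		fmt.Printf("%+v\n", e.Execute(context.Background(), ExecRequest{ActionName: "PING"}))
	}
}
```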

type Invocation

type Invocation struct {
	ID              uint64
	ActionID        uint
	ActionName      string
	SenderCall      string
	Source          Source
	OTPCredentialID uint // 0 if no credential was consulted
	OTPVerified     bool
	OTPCredName     string
	Args            []KeyValue
	StartedAt       time.Time
}

Invocation is the runtime envelope passed to the Executor. The runner constructs it from a ParsedInvocation plus the matched configstore.Action and runtime context.

type InvocationPruner

type InvocationPruner interface {
	PruneActionInvocations(ctx context.Context, maxRows int, maxAge time.Duration) (int, error)
}

InvocationPruner is the subset of the configstore repo the pruner loop needs. Wrapping it keeps the loop testable without spinning up a database.

type KeyValue

type KeyValue struct {
	Key   string
	Value string
}

func Sanitize

func Sanitize(schema []ArgSpec, raw []KeyValue) ([]KeyValue, error)

Sanitize validates raw key/value pairs against the schema and returns the canonical ordered slice handed to the executor. The returned slice preserves the schema order, not the wire order, so command argv is stable.

func SanitizeFreeform

func SanitizeFreeform(schema []ArgSpec, raw string, ceiling int) ([]KeyValue, error)

SanitizeFreeform validates a single untokenized payload against an Action's freeform arg schema. The schema must contain exactly one ArgSpec; the synthetic FreeformArgKey is imposed on the result so downstream executors can rely on a stable key.

Defense layers, in order:

  1. Schema shape — exactly one ArgSpec.
  2. Required check — empty value rejected when Required.
  3. Length — operator MaxLen, hard-capped by ceiling.
  4. Control-char floor — bytes outside printable ASCII + extended UTF-8 are rejected unconditionally. Specifically: any byte in 0x00..0x1F or 0x7F. Catches tabs, NUL, CR/LF, escape codes and other terminal/log-injection vectors regardless of operator regex permissiveness.
  5. Regex — operator pattern, defaulted to `.*` if empty so a blank pattern doesn't fall through to the kv default that requires the value to look like an identifier.

Order matters: the control-char floor runs BEFORE regex so a permissive `.*` cannot widen the floor.
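
The five layers can be sketched in order as shown below. Several details are assumptions of this sketch rather than documented behavior: length is measured in bytes, the operator regex is anchored to the whole value, an empty optional value yields no args, and plain errors are returned instead of *BadArgError.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

type ArgSpec struct {
	Key      string
	Regex    string
	MaxLen   int
	Required bool
}

type KeyValue struct{ Key, Value string }

const freeformArgKey = "arg" // FreeformArgKey above

// sanitizeFreeform is a hypothetical stand-in for SanitizeFreeform.
func sanitizeFreeform(schema []ArgSpec, raw string, ceiling int) ([]KeyValue, error) {
	// 1. Schema shape: exactly one ArgSpec.
	if len(schema) != 1 {
		return nil, errors.New("freeform schema must have exactly one entry")
	}
	spec := schema[0]
	// 2. Required check.
	if raw == "" {
		if spec.Required {
			return nil, errors.New("value required")
		}
		return nil, nil
	}
	// 3. Length: operator MaxLen, hard-capped by ceiling.
	limit := spec.MaxLen
	if limit <= 0 || limit > ceiling {
		limit = ceiling
	}
	if len(raw) > limit {
		return nil, errors.New("value too long")
	}
	// 4. Control-char floor, before the regex so `.*` cannot widen it.
	for i := 0; i < len(raw); i++ {
		if raw[i] < 0x20 || raw[i] == 0x7F {
			return nil, errors.New("control character in value")
		}
	}
	// 5. Regex, defaulting to `.*` when the operator left it blank.
	pattern := spec.Regex
	if pattern == "" {
		pattern = ".*"
	}
	re, err := regexp.Compile("^(?:" + pattern + ")$")
	if err != nil {
		return nil, fmt.Errorf("bad regex: %w", err)
	}
	if !re.MatchString(raw) {
		return nil, errors.New("value does not match pattern")
	}
	return []KeyValue{{Key: freeformArgKey, Value: raw}}, nil
}

func main() {
	kvs, err := sanitizeFreeform([]ArgSpec{{Key: "note", MaxLen: 40}}, "gate is open", 200)
	fmt.Println(kvs, err)
	_, err = sanitizeFreeform([]ArgSpec{{Key: "note"}}, "tab\there", 200)
	fmt.Println(err)
}
```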

func SanitizeFromMap

func SanitizeFromMap(schema []ArgSpec, args map[string]string) ([]KeyValue, error)

SanitizeFromMap is the map-keyed counterpart to Sanitize. The Actions REST test-fire endpoint hands us a wire map; the classifier hands us an ordered slice. Both paths need the same downstream schema check, so this helper converts and delegates to Sanitize.

The returned slice is in schema-declared order (not map iteration order) so the executor argv stays stable.
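
The ordering guarantee is the key point: iterating the schema rather than the map gives a deterministic result even though Go map iteration order is randomized. The sketch below shows only that part. It skips the regex and length checks that the real helper delegates to Sanitize, and rejecting keys that are not in the schema is an assumption.

```go
package main

import "fmt"

type ArgSpec struct {
	Key      string
	Required bool
}

type KeyValue struct{ Key, Value string }

// sanitizeFromMap is a hypothetical, reduced stand-in for SanitizeFromMap.
func sanitizeFromMap(schema []ArgSpec, args map[string]string) ([]KeyValue, error) {
	known := make(map[string]bool, len(schema))
	out := make([]KeyValue, 0, len(schema))
	// Walk the schema, not the map, so output order follows the schema.
	for _, spec := range schema {
		known[spec.Key] = true
		v, ok := args[spec.Key]
		if !ok {
			if spec.Required {
				return nil, fmt.Errorf("missing required arg %q", spec.Key)
			}
			continue
		}
		out = append(out, KeyValue{Key: spec.Key, Value: v})
	}
	// Assumption: keys absent from the schema are an error.
	for k := range args {
		if !known[k] {
			return nil, fmt.Errorf("unknown arg %q", k)
		}
	}
	return out, nil
}

func main() {
	schema := []ArgSpec{{Key: "zone", Required: true}, {Key: "state"}}
	out, err := sanitizeFromMap(schema, map[string]string{"state": "on", "zone": "3"})
	fmt.Println(out, err)
}
```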

type MessagesReplySender

type MessagesReplySender struct {
	// contains filtered or unexported fields
}

MessagesReplySender adapts messages.Service to the actions.ReplySender contract. It constructs a one-off outbound DM row through the same path operator-composed messages take, so replies inherit msg-id allocation, the operator outbound view, and the retry ladder. The inbound transport is reflected back to the originator: RF inbound uses RF-first-with-IS-fallback; IS inbound uses IS-only.

func NewMessagesReplySender

func NewMessagesReplySender(svc *messages.Service, ourCall func() string) *MessagesReplySender

NewMessagesReplySender constructs an adapter. ourCall must return the primary station callsign (with or without SSID); messages service uses it as the From address on the outbound row.

func (*MessagesReplySender) SendReply

func (a *MessagesReplySender) SendReply(ctx context.Context, _ uint32, source Source, toCall, text string) error

type OTPVerifier

type OTPVerifier struct {
	// contains filtered or unexported fields
}

OTPVerifier validates TOTP codes and rejects replays.

func NewOTPVerifier

func NewOTPVerifier(cfg OTPVerifierConfig) *OTPVerifier

func (*OTPVerifier) Sweep

func (v *OTPVerifier) Sweep()

Sweep purges expired ring entries. Safe to call from a background goroutine. Service owns the periodic ticker via StartOTPSweeper; NewOTPVerifier intentionally does not start one.

func (*OTPVerifier) Verify

func (v *OTPVerifier) Verify(credID uint, secretB32, code string) (bool, error)

Verify returns (true, nil) when code matches secret within the ±1-step window AND has not been used before within the replay TTL. It returns (false, nil) on any plain mismatch and (false, ErrOTPReplay) on replay.
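
For readers unfamiliar with TOTP, the check can be sketched with the standard library alone. This assumes RFC 6238 defaults (HMAC-SHA1, 30-second step, six digits), a 150-second replay TTL taken from the "~150s" note above, and a replay map keyed by credential ID plus code. None of these internals are confirmed by the documentation, and `verifier`, `totp`, and `fixedClock` are hypothetical names.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha1"
	"encoding/base32"
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
)

var errReplay = errors.New("code already used")

const (
	stepSeconds = 30                // assumption: RFC 6238 default step
	replayTTL   = 150 * time.Second // "~150s" per the docs above
)

type verifier struct {
	now  func() time.Time
	mu   sync.Mutex
	seen map[string]time.Time // "credID/code" -> expiry
}

func newVerifier(now func() time.Time) *verifier {
	return &verifier{now: now, seen: map[string]time.Time{}}
}

// totp computes the 6-digit HOTP/TOTP value for one counter (RFC 4226/6238).
func totp(key []byte, counter uint64) string {
	var msg [8]byte
	binary.BigEndian.PutUint64(msg[:], counter)
	mac := hmac.New(sha1.New, key)
	mac.Write(msg[:])
	sum := mac.Sum(nil)
	off := sum[len(sum)-1] & 0x0f
	bin := binary.BigEndian.Uint32(sum[off:off+4]) & 0x7fffffff
	return fmt.Sprintf("%06d", bin%1000000)
}

func (v *verifier) verify(credID uint, secretB32, code string) (bool, error) {
	enc := base32.StdEncoding.WithPadding(base32.NoPadding)
	key, err := enc.DecodeString(strings.ToUpper(strings.TrimRight(secretB32, "=")))
	if err != nil {
		return false, nil // undecodable secret treated as a mismatch (assumption)
	}
	now := v.now()
	counter := now.Unix() / stepSeconds
	matched := false
	for d := int64(-1); d <= 1; d++ { // the ±1-step window
		if c := counter + d; c >= 0 && hmac.Equal([]byte(totp(key, uint64(c))), []byte(code)) {
			matched = true
		}
	}
	if !matched {
		return false, nil
	}
	v.mu.Lock()
	defer v.mu.Unlock()
	k := fmt.Sprintf("%d/%s", credID, code)
	if exp, ok := v.seen[k]; ok && now.Before(exp) {
		return false, errReplay
	}
	v.seen[k] = now.Add(replayTTL)
	return true, nil
}

// sweep purges expired replay entries, as a periodic ticker would.
func (v *verifier) sweep() {
	v.mu.Lock()
	defer v.mu.Unlock()
	now := v.now()
	for k, exp := range v.seen {
		if !now.Before(exp) {
			delete(v.seen, k)
		}
	}
}

// fixedClock returns a clock pinned to one Unix second, for tests.
func fixedClock(sec int64) func() time.Time {
	return func() time.Time { return time.Unix(sec, 0) }
}

func main() {
	// Secret is the RFC 6238 test key "12345678901234567890" in base32.
	v := newVerifier(fixedClock(59))
	fmt.Println(v.verify(1, "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ", "287082"))
	fmt.Println(v.verify(1, "GEZDGNBVGY3TQOJQGEZDGNBVGY3TQOJQ", "287082"))
}
```

Injecting the clock through a `Now`-style function, as OTPVerifierConfig does, is what makes a check like this testable against fixed timestamps.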

type OTPVerifierConfig

type OTPVerifierConfig struct {
	Now func() time.Time
}

type ParsedInvocation

type ParsedInvocation struct {
	OTPDigits  string // empty if message had no OTP digits
	Action     string
	Args       []KeyValue
	RawArgTail string
}

ParsedInvocation is the output of parser.Parse. Args preserve key order as parsed off the wire so executors can present a stable argv.

Args contains raw, untrusted key=value tokens straight off the wire. Callers MUST run them through the runner's sanitizer (Phase C) before passing them to an Executor.

RawArgTail is the raw bytes after the action name and the first space, with no trimming or tokenization. Freeform consumers read it directly; kv consumers ignore it. May be populated even when Args is nil (kv tokenization failed but the action name parsed cleanly), so the classifier can still dispatch a freeform Action.

func Parse

func Parse(body string) (*ParsedInvocation, error)

Parse converts an APRS message body into a ParsedInvocation. Grammar: @@<otp>#<action> [k=v k=v ...] where <otp> is empty or exactly six ASCII digits.
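
A parser for this grammar can be sketched as follows. It is illustrative only: `parsed` is a stand-in for ParsedInvocation, a single sentinel error is used for every failure, and treating a token without `=` as a kv tokenization failure (Args left nil, RawArgTail kept) is an assumption based on the RawArgTail description above.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const prefix = "@@" // ActionPrefix above

var errParse = errors.New("parse error")

type KeyValue struct{ Key, Value string }

type parsed struct {
	OTPDigits  string
	Action     string
	Args       []KeyValue
	RawArgTail string
}

// isSixDigits reports whether s is exactly six ASCII digits.
func isSixDigits(s string) bool {
	if len(s) != 6 {
		return false
	}
	for i := 0; i < len(s); i++ {
		if s[i] < '0' || s[i] > '9' {
			return false
		}
	}
	return true
}

// parse is a hypothetical stand-in for Parse.
func parse(body string) (*parsed, error) {
	if !strings.HasPrefix(body, prefix) {
		return nil, errParse
	}
	rest := body[len(prefix):]
	hash := strings.IndexByte(rest, '#')
	if hash < 0 {
		return nil, errParse
	}
	otp := rest[:hash]
	if otp != "" && !isSixDigits(otp) {
		return nil, errParse
	}
	name, tail, _ := strings.Cut(rest[hash+1:], " ")
	if name == "" {
		return nil, errParse
	}
	p := &parsed{OTPDigits: otp, Action: name, RawArgTail: tail}
	for _, tok := range strings.Fields(tail) {
		k, v, ok := strings.Cut(tok, "=")
		if !ok || k == "" {
			p.Args = nil // kv tokenization failed; the tail is still available
			return p, nil
		}
		p.Args = append(p.Args, KeyValue{Key: k, Value: v})
	}
	return p, nil
}

func main() {
	p, err := parse("@@123456#door state=open n=2")
	fmt.Printf("%+v %v\n", p, err)
	p, err = parse("@@#say hello world")
	fmt.Printf("%+v %v\n", p, err)
}
```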

type ReplySender

type ReplySender interface {
	SendReply(ctx context.Context, channel uint32, source Source, toCall, text string) error
}

ReplySender dispatches one reply back to the originator over the matching transport. The RF/IS routing is the implementation's concern; the runner just hands over the addressee + text. One call per line — the runner loops when MaxReplyLines > 1.

type Result

type Result struct {
	Status        Status
	StatusDetail  string
	OutputCapture string
	ExitCode      *int
	HTTPStatus    *int
}

Result is the executor outcome consumed by the runner.

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner owns one queue + worker per Action. Queues are created lazily on first Submit.
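
The "one queue plus one worker per Action, created lazily" shape can be sketched with a map of buffered channels. Everything here is illustrative: the queue depth, the boolean return from `submit` (the real Submit returns nothing and replies "busy" itself), and a `stop` that waits for workers are all assumptions of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

const queueDepth = 8 // assumption: the real depth is not documented

type runner struct {
	mu      sync.Mutex
	queues  map[string]chan func()
	wg      sync.WaitGroup
	stopped bool
}

func newRunner() *runner { return &runner{queues: map[string]chan func(){}} }

// submit enqueues job on the action's FIFO queue, creating the queue and its
// single worker on first use. It never blocks: a full queue returns false.
func (r *runner) submit(action string, job func()) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.stopped {
		return false
	}
	q, ok := r.queues[action]
	if !ok {
		q = make(chan func(), queueDepth)
		r.queues[action] = q
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			for j := range q { // one worker per action keeps jobs in order
				j()
			}
		}()
	}
	select {
	case q <- job:
		return true
	default:
		return false
	}
}

// stop closes every queue and waits for workers to drain what was queued.
func (r *runner) stop() {
	r.mu.Lock()
	if !r.stopped {
		r.stopped = true
		for _, q := range r.queues {
			close(q)
		}
	}
	r.mu.Unlock()
	r.wg.Wait()
}

// demo submits three jobs to one action and returns their execution order.
func demo() []int {
	r := newRunner()
	var got []int
	for i := 1; i <= 3; i++ {
		i := i
		r.submit("door", func() { got = append(got, i) })
	}
	r.stop()
	return got
}

func main() {
	fmt.Println(demo())
}
```

Because each action has its own channel and worker, a slow command for one action does not delay invocations of another, while invocations of the same action still run strictly in arrival order.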

func NewRunner

func NewRunner(cfg RunnerConfig) *Runner

func (*Runner) Reply

func (r *Runner) Reply(ctx context.Context, inv Invocation, channel uint32, res Result)

Reply dispatches a synthetic reply + audit row without queueing real work. Used by the classifier short-circuit paths (denied, bad_otp, bad_arg, no_credential) so those outcomes still flow through the normal reply + audit pipeline without exercising the per-Action queue or executor.

func (*Runner) Stop

func (r *Runner) Stop()

Stop drains state and closes every per-Action queue. Once Stop returns, no further Submit will enqueue or spawn goroutines for new queues; in-flight worker goroutines drain naturally as their channels close.

func (*Runner) Submit

func (r *Runner) Submit(ctx context.Context, inv Invocation, a *configstore.Action, channel uint32)

Submit queues one invocation for processing. The reply is dispatched asynchronously; Submit returns immediately. Disabled / no-credential / busy / rate-limited paths reply + audit synchronously inside Submit so the caller's request lifetime captures the full path.

type RunnerConfig

type RunnerConfig struct {
	Registry *ExecutorRegistry
	Replies  ReplySender
	Audit    AuditSink
	Logger   *slog.Logger
	Now      func() time.Time
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the composition root for the Actions subsystem. One instance per graywolf process. The wiring layer constructs it after the messages.Service is up and parks it on App for the rxfanout classifier hook to reach.

func NewService

func NewService(ctx context.Context, cfg ServiceConfig) (*Service, error)

NewService builds the subsystem and starts background workers (audit pruner). The caller is responsible for invoking Stop on shutdown.

func (*Service) Classifier

func (s *Service) Classifier() *Classifier

Classifier returns the inbound classifier; the wiring layer hooks it into the rxfanout APRS-message branch.

func (*Service) Registry

func (s *Service) Registry() *ExecutorRegistry

Registry returns the executor registry so test harnesses or future REST handlers can register custom executors.

func (*Service) ReloadListeners

func (s *Service) ReloadListeners(ctx context.Context) error

ReloadListeners refreshes the in-memory listener-addressee snapshot from the store. Called on startup and whenever the REST handler mutates the table.

func (*Service) Stop

func (s *Service) Stop()

Stop releases background resources. Safe to call multiple times.

func (*Service) TestFire

func (s *Service) TestFire(ctx context.Context, a *configstore.Action, kvs []KeyValue) (Result, uint)

TestFire runs a one-shot invocation through the executor without going through the classifier or the per-Action queue. The OTP requirement and sender allowlist are intentionally bypassed — the caller is operator-authenticated via the REST cookie, so the dry-run is the point. The arg sanitizer still runs at the call site (the handler invokes SanitizeFromMap before calling us). No reply is dispatched to RF/IS; the result is returned synchronously for the HTTP response.

An audit row is written using SenderCall=TestFireSenderCall and Source=SourceRF so operators can spot dry-runs separately from real-air invocations. The persisted invocation id is returned for the UI to deep-link into.

type ServiceConfig

type ServiceConfig struct {
	// Store is the configstore.Store; required.
	Store *configstore.Store
	// Messages is the running messages.Service; required so replies
	// flow through the same outbound path operator-composed messages
	// take.
	Messages *messages.Service
	// OurCall returns the current primary callsign; used by the
	// classifier (addressee match) and by the reply adapter (From
	// address). Required.
	OurCall func() string
	// TacticalSet is the live tactical-alias set; the classifier
	// matches against it on every inbound packet. Required.
	TacticalSet *messages.TacticalSet
	// Preflight is the shared messages preflight (auto-ACK + dedup).
	// Required when running inside the wired app; tests may pass nil
	// to opt out of preflight integration.
	Preflight *messages.Preflight
	// Logger is optional; nil falls back to slog.Default().
	Logger *slog.Logger
	// AuditPruner overrides the audit-log retention defaults. Zero
	// values fall back to package defaults.
	AuditPruner AuditPrunerConfig
}

ServiceConfig wires the subsystem to the host process.

type Source

type Source string

Source is the inbound transport the invocation arrived on.

const (
	SourceRF Source = "rf"
	SourceIS Source = "is"
)

type Status

type Status string

Status is the outcome of a single invocation attempt. Always one of the values in StatusValues; the runner panics on any other value.

const (
	StatusOK           Status = "ok"
	StatusBadOTP       Status = "bad_otp"
	StatusBadArg       Status = "bad_arg"
	StatusDenied       Status = "denied"
	StatusDisabled     Status = "disabled"
	StatusUnknown      Status = "unknown"
	StatusNoCredential Status = "no_credential"
	StatusBusy         Status = "busy"
	StatusRateLimited  Status = "rate_limited"
	StatusTimeout      Status = "timeout"
	StatusError        Status = "error"
)

type Submitter

type Submitter interface {
	Submit(ctx context.Context, inv Invocation, a *configstore.Action, channel uint32)
	Reply(ctx context.Context, inv Invocation, channel uint32, res Result)
}

Submitter is the runner-facing interface used by the classifier. Submit enqueues a normal invocation; Reply short-circuits with a synthetic result for outcomes the runner never sees (denied, bad_arg, bad_otp, no_credential).

type WebhookExecutor

type WebhookExecutor struct {
	// contains filtered or unexported fields
}

func NewWebhookExecutor

func NewWebhookExecutor() *WebhookExecutor

func (*WebhookExecutor) Execute

func (e *WebhookExecutor) Execute(ctx context.Context, req ExecRequest) Result
