Documentation ¶
Overview ¶
Package actions implements the Graywolf Actions subsystem: an APRS message-driven trigger surface that dispatches operator-defined commands and webhooks. Inbound messages prefixed `@@` and addressed to the trigger surface are diverted from the messages router by the classifier, parsed, OTP-verified (when required), and dispatched through a per-Action FIFO runner to one of the registered Executors. Every attempt is recorded in the action_invocations audit log and a reply is sent back to the originator over the matching transport.
Index ¶
- Constants
- Variables
- func BadArgKey(err error) string
- func FormatReplies(r Result, maxLines int) ([]string, bool)
- func FormatReply(r Result) (string, bool)
- func IsBadArgErr(err error) bool
- func StartAuditPruner(ctx context.Context, p InvocationPruner, cfg AuditPrunerConfig) func()
- func StartOTPSweeper(ctx context.Context, v *OTPVerifier, interval time.Duration) func()
- func ValidActionName(s string) bool
- type ActionLookup
- type AddresseeSet
- type ArgMode
- type ArgSpec
- type AuditPrunerConfig
- type AuditSink
- type BadArgError
- type Classifier
- type ClassifierConfig
- type CommandExecutor
- type CredentialLookup
- type ExecRequest
- type Executor
- type ExecutorRegistry
- type Invocation
- type InvocationPruner
- type KeyValue
- type MessagesReplySender
- type OTPVerifier
- type OTPVerifierConfig
- type ParsedInvocation
- type ReplySender
- type Result
- type Runner
- type RunnerConfig
- type Service
- type ServiceConfig
- type Source
- type Status
- type Submitter
- type WebhookExecutor
Constants ¶
const (
	DefaultMaxInvocationRows = 1000
	DefaultMaxInvocationAge  = 30 * 24 * time.Hour
	DefaultPruneInterval     = 24 * time.Hour
)
const ActionPrefix = "@@"
ActionPrefix is the on-air sentinel that diverts an inbound message from the inbox to the Actions runner.
const DefaultMaxReplyLines = 1
DefaultMaxReplyLines is the value used when an Action row pre-dates the column or the operator left it at zero.
const DefaultOTPSweepInterval = 5 * time.Minute
DefaultOTPSweepInterval is how often the Service-owned ticker invokes Sweep. The replay-ring TTL is ~150s; 5 minutes is a small constant multiple, keeping the residual entry count bounded without burning a goroutine on a tight loop.
const FreeformArgKey = "arg"
FreeformArgKey is the synthetic key used for the single freeform value. Stable so executors, audit log, and webhook templates can refer to it as `arg` (env var GW_ARG, token {{arg}}). Not operator-settable.
const (
	// FreeformValueCeiling is the absolute server-side cap on a single
	// freeform payload, regardless of operator MaxLen. Matches the APRS
	// message body limit comfortably and prevents an over-permissive
	// schema from accepting payloads larger than the message subsystem
	// can stage.
	FreeformValueCeiling = 200
)
const MaxActionNameLen = 32
MaxActionNameLen mirrors the schema column width for actions.name.
const MaxReplyLen = 67
MaxReplyLen is the per-message APRS text cap (one frame).
const MaxReplyLinesCeiling = 5
MaxReplyLinesCeiling is the hard upper bound enforced by the validator on Action.MaxReplyLines. Operators choose 1..ceiling; the runtime clamps anything outside that range. The cap exists because each extra line is one extra RF frame plus its own ack/retry budget; >5 turns a single trigger into an airtime storm.
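The clamping rule described above can be sketched as follows. This is a minimal illustration, not the package's code: `clampReplyLines` is a hypothetical helper, the constants are copied from this page, and the sketch assumes that zero means "unset" and that any other out-of-range value is pulled to the nearest bound.

```go
package main

import "fmt"

const (
	DefaultMaxReplyLines = 1 // value copied from the Constants section
	MaxReplyLinesCeiling = 5 // value copied from the Constants section
)

// clampReplyLines is a hypothetical helper. It maps an operator-supplied
// line count onto the documented 1..ceiling range.
func clampReplyLines(n int) int {
	if n == 0 {
		// Assumption: zero means the column was never set.
		return DefaultMaxReplyLines
	}
	if n < 1 {
		return 1
	}
	if n > MaxReplyLinesCeiling {
		return MaxReplyLinesCeiling
	}
	return n
}

func main() {
	for _, n := range []int{0, 3, 9} {
		fmt.Println(n, "->", clampReplyLines(n))
	}
}
```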
const TestFireSenderCall = "(test-web)"
TestFireSenderCall is the synthetic SenderCall stamped on invocations originated from the REST test-fire endpoint. It bypasses any real callsign while still producing an audit row so operators can see the dry-run history.
Variables ¶
var ErrOTPReplay = errors.New("actions: code already used")
ErrOTPReplay is returned by OTPVerifier.Verify when a code matches but has already been observed within the replay TTL. Callers should map this to StatusBadOTP and an audit detail of "replay".
var ErrParse = errors.New("actions: parse error")
Functions ¶
func BadArgKey ¶
BadArgKey returns the offending key when err is a *BadArgError, otherwise the empty string. Lets callers compose a "bad arg: KEY" reply without importing the error type directly.
func FormatReplies ¶
FormatReplies produces up to maxLines on-air reply strings from a Result. Line 1 carries the status-word prefix ("ok: ..."); lines 2..N are plain output lines (no prefix). Each line is sanitized of control characters and capped at MaxReplyLen runes. Non-OK statuses always collapse to a single line — there is no point fanning a bad-arg or timeout failure across multiple frames.
maxLines <= 0 is treated as 1. Blank lines in the captured output are dropped. Returns the produced lines and a bool that is true when any data was dropped: line count exceeded maxLines, an individual line was clamped to MaxReplyLen, or the assembled status+detail was truncated.
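A rough sketch of the line-splitting rules described above, under several assumptions: `formatReplies` and `clean` are hypothetical names, the function takes plain strings instead of a Result, sanitization is assumed to mean "drop ASCII control characters", and the exact text of a non-OK line ("status: detail") is a guess. The real formatter may differ in all of these details.

```go
package main

import (
	"fmt"
	"strings"
)

const maxReplyLen = 67 // mirrors MaxReplyLen

// clean drops ASCII control characters (assumed sanitization rule).
func clean(s string) string {
	return strings.Map(func(r rune) rune {
		if r < 0x20 || r == 0x7F {
			return -1
		}
		return r
	}, s)
}

// formatReplies is a hypothetical stand-in for FormatReplies.
func formatReplies(status, detail, output string, maxLines int) ([]string, bool) {
	if maxLines <= 0 {
		maxLines = 1
	}
	truncated := false
	clamp := func(s string) string {
		r := []rune(s)
		if len(r) > maxReplyLen {
			truncated = true
			return string(r[:maxReplyLen])
		}
		return s
	}
	if status != "ok" {
		// Non-OK statuses always collapse to one line.
		line := status
		if d := clean(detail); d != "" {
			line += ": " + d
		}
		out := clamp(line) // evaluate before reading truncated
		return []string{out}, truncated
	}
	var body []string
	for _, l := range strings.Split(output, "\n") {
		if l = strings.TrimSpace(clean(l)); l != "" {
			body = append(body, l) // blank lines are dropped
		}
	}
	if len(body) == 0 {
		return []string{"ok"}, false
	}
	if len(body) > maxLines {
		body = body[:maxLines]
		truncated = true
	}
	lines := make([]string, len(body))
	for i, l := range body {
		if i == 0 {
			l = "ok: " + l // only line 1 carries the status prefix
		}
		lines[i] = clamp(l)
	}
	return lines, truncated
}

func main() {
	lines, truncated := formatReplies("ok", "", "door open\n\nlatch ok\nextra", 2)
	fmt.Println(lines, truncated)
}
```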
func FormatReply ¶
FormatReply preserves the legacy single-line API. New code should call FormatReplies. The wrapper is kept so call sites that have not been updated keep working: the two production callers (the runner and the service test-fire audit) now use FormatReplies, while other callers, such as the webapi test-fire handler, still report a single primary line in addition to the full slice.
func IsBadArgErr ¶
func StartAuditPruner ¶
func StartAuditPruner(ctx context.Context, p InvocationPruner, cfg AuditPrunerConfig) func()
StartAuditPruner runs prune at Interval (default 24h) until the returned stop func is called or ctx is cancelled.
func StartOTPSweeper ¶
func StartOTPSweeper(ctx context.Context, v *OTPVerifier, interval time.Duration) func()
StartOTPSweeper runs Sweep at interval (or DefaultOTPSweepInterval when zero) until the returned stop func is called or ctx is cancelled. Idempotent stop.
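The "background loop with an idempotent stop func" pattern can be sketched with a ticker, a derived context, and sync.Once. This is an illustration only: `startLoop` and `runDemo` are hypothetical names, and treating any non-positive interval as "use the default" is an assumption (the text above only mentions zero).

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

const defaultInterval = 5 * time.Minute // mirrors DefaultOTPSweepInterval

// startLoop is a hypothetical stand-in for the StartOTPSweeper pattern.
// It calls fn on every tick until ctx is cancelled or stop is called.
func startLoop(ctx context.Context, interval time.Duration, fn func()) func() {
	if interval <= 0 {
		interval = defaultInterval // assumption: non-positive means default
	}
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	go func() {
		defer close(done)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				fn()
			}
		}
	}()
	var once sync.Once
	return func() {
		// sync.Once makes repeated stop calls harmless.
		once.Do(func() {
			cancel()
			<-done // wait for the goroutine to exit
		})
	}
}

// runDemo ticks a counter, stops twice, and reports the counter at stop
// time and again after a short wait.
func runDemo() (atStop, afterStop int64) {
	var n atomic.Int64
	stop := startLoop(context.Background(), 5*time.Millisecond, func() { n.Add(1) })
	time.Sleep(60 * time.Millisecond)
	stop()
	stop() // second call is a no-op
	atStop = n.Load()
	time.Sleep(20 * time.Millisecond)
	return atStop, n.Load()
}

func main() {
	a, b := runDemo()
	fmt.Println("ticks at stop:", a, "ticks later:", b)
}
```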
func ValidActionName ¶
ValidActionName reports whether s is a legal action name. Mirrors the inbound parser's grammar so outbound macro creation rejects names the receiver would refuse. Charset: ASCII letters, digits, dot, dash, underscore. The character-class check accepts both cases; lookup is case-insensitive (configstore.Store.GetActionByName uppercases the query) and storage is uppercase (Action.BeforeSave).
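The charset rule can be sketched as a byte-wise check. `validName` is a hypothetical helper; rejecting the empty string and names longer than MaxActionNameLen is an assumption, since the text above only specifies the character class.

```go
package main

import "fmt"

const maxActionNameLen = 32 // mirrors MaxActionNameLen

// validName is a hypothetical sketch of the documented charset:
// ASCII letters (both cases), digits, dot, dash, underscore.
func validName(s string) bool {
	// Assumption: empty and over-long names are rejected.
	if len(s) == 0 || len(s) > maxActionNameLen {
		return false
	}
	for i := 0; i < len(s); i++ {
		c := s[i]
		switch {
		case c >= 'a' && c <= 'z',
			c >= 'A' && c <= 'Z',
			c >= '0' && c <= '9',
			c == '.', c == '-', c == '_':
			// allowed byte
		default:
			return false
		}
	}
	return true
}

func main() {
	for _, s := range []string{"door.open", "bad name", "relay_2"} {
		fmt.Printf("%q valid=%v\n", s, validName(s))
	}
}
```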
Types ¶
type ActionLookup ¶
type ActionLookup interface {
GetActionByName(ctx context.Context, name string) (*configstore.Action, error)
}
ActionLookup loads an Action by name. Wraps the configstore for testability.
type AddresseeSet ¶
type AddresseeSet struct {
// contains filtered or unexported fields
}
AddresseeSet is a lock-free read-side cache of operator-defined listener addressees. The classifier hits Contains on every inbound message packet; the REST handler swaps in a new snapshot via Replace. Mirrors messages.TacticalSet semantics so the hot path is allocation-free.
func NewAddresseeSet ¶
func NewAddresseeSet() *AddresseeSet
NewAddresseeSet constructs an empty set. Seed via Replace.
func (*AddresseeSet) Contains ¶
func (s *AddresseeSet) Contains(name string) bool
Contains reports whether name (case-insensitively) is a member of the current snapshot.
func (*AddresseeSet) Replace ¶
func (s *AddresseeSet) Replace(items []string)
Replace atomically swaps in a new snapshot. items are normalized to uppercase/trimmed; empty strings are dropped. A nil items slice is treated as "empty" — readers never observe nil.
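One way to get the snapshot semantics described above is an atomic pointer to an immutable map. The sketch below is an assumption about the technique, not the package's implementation: `snapshotSet` is a hypothetical type, and unlike the real hot path its Contains may allocate when it uppercases the lookup key.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// snapshotSet is a hypothetical stand-in for AddresseeSet. Readers load
// the current map through an atomic pointer; writers publish a new map.
type snapshotSet struct {
	snap atomic.Pointer[map[string]struct{}]
}

func newSnapshotSet() *snapshotSet {
	s := &snapshotSet{}
	empty := map[string]struct{}{}
	s.snap.Store(&empty) // readers never observe nil
	return s
}

// Replace builds a fresh normalized map and swaps it in atomically.
func (s *snapshotSet) Replace(items []string) {
	next := make(map[string]struct{}, len(items))
	for _, it := range items {
		it = strings.ToUpper(strings.TrimSpace(it))
		if it != "" {
			next[it] = struct{}{}
		}
	}
	s.snap.Store(&next)
}

// Contains is case-insensitive. The published map is never mutated, so
// no lock is needed on the read side.
func (s *snapshotSet) Contains(name string) bool {
	_, ok := (*s.snap.Load())[strings.ToUpper(name)]
	return ok
}

func main() {
	s := newSnapshotSet()
	s.Replace([]string{" ops ", "", "Wx"})
	fmt.Println(s.Contains("ops"), s.Contains("WX"), s.Contains("other"))
}
```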
type ArgMode ¶
type ArgMode string
ArgMode controls how argv after the action verb is interpreted.
kv:       "@@<otp>#name k1=v1 k2=v2" (current behavior, default)
freeform: "@@<otp>#name <one untokenized payload>"
type ArgSpec ¶
type ArgSpec struct {
Key string `json:"key"`
Regex string `json:"regex,omitempty"`
MaxLen int `json:"max_len,omitempty"`
Required bool `json:"required,omitempty"`
}
ArgSpec is one entry in an Action's arg_schema, decoded from JSON.
func DecodeArgSchemaJSON ¶
DecodeArgSchemaJSON parses the JSON wire form of an action's arg_schema column into the runtime ArgSpec slice. Empty / "[]" strings return (nil, nil).
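A minimal sketch of the decode step, using the ArgSpec JSON tags shown above. `decodeArgSchema` is a hypothetical name, and trimming whitespace before the empty / "[]" check is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ArgSpec is copied from the type definition on this page.
type ArgSpec struct {
	Key      string `json:"key"`
	Regex    string `json:"regex,omitempty"`
	MaxLen   int    `json:"max_len,omitempty"`
	Required bool   `json:"required,omitempty"`
}

// decodeArgSchema is a hypothetical sketch of DecodeArgSchemaJSON.
func decodeArgSchema(raw string) ([]ArgSpec, error) {
	trimmed := strings.TrimSpace(raw) // assumption: whitespace is ignored
	if trimmed == "" || trimmed == "[]" {
		return nil, nil
	}
	var specs []ArgSpec
	if err := json.Unmarshal([]byte(trimmed), &specs); err != nil {
		return nil, fmt.Errorf("decode arg schema: %w", err)
	}
	return specs, nil
}

func main() {
	specs, err := decodeArgSchema(`[{"key":"zone","regex":"^[a-z]+$","max_len":8,"required":true}]`)
	fmt.Printf("%+v %v\n", specs, err)
}
```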
type AuditPrunerConfig ¶
type AuditSink ¶
type AuditSink interface {
Insert(ctx context.Context, row *configstore.ActionInvocation) error
}
AuditSink writes invocation rows. Wraps the configstore repo for testability.
type BadArgError ¶
BadArgError carries the first offending key for the reply formatter.
func (*BadArgError) Error ¶
func (e *BadArgError) Error() string
type Classifier ¶
type Classifier struct {
// contains filtered or unexported fields
}
Classifier inspects inbound APRS messages and decides whether they are Actions traffic. Lives in the rxfanout hot path; never blocks on I/O directly — store + verifier calls are bounded synchronous reads, and the runner submission is non-blocking.
func NewClassifier ¶
func NewClassifier(cfg ClassifierConfig) *Classifier
func (*Classifier) Classify ¶
func (c *Classifier) Classify(ctx context.Context, pkt *aprs.DecodedAPRSPacket) bool
Classify inspects pkt. Returns true when the packet was consumed by the Actions subsystem and the messages router must skip it.
Consumption rule: the packet is an APRS message addressed to the trigger surface (station call / tactical alias / listener addressee) AND its body begins with ActionPrefix. Parse failures on a consumed packet still return true — they emit an "unknown" audit row + reply rather than leaking partial Actions noise into the inbox.
Third-party APRS101 ch 20 envelopes are unwrapped before classification so an action gated from IS→RF (or vice-versa) is claimed by Classify, not by the messages router.
type ClassifierConfig ¶
type ClassifierConfig struct {
OurCall func() string
TacticalSet *messages.TacticalSet
Listeners *AddresseeSet
ActionStore ActionLookup
CredStore CredentialLookup
OTPVerifier *OTPVerifier
Runner Submitter
// Preflight is the shared auto-ACK + dedup component owned by
// messages.Service. When non-nil, every consumed @@ packet sends an
// auto-ACK before any executor work and a duplicate (same from, msg
// id, text hash) within the dedup window short-circuits without
// re-submitting. Nil disables the hookup (test-only).
Preflight *messages.Preflight
}
ClassifierConfig wires the classifier to the rest of the subsystem.
type CommandExecutor ¶
type CommandExecutor struct{}
CommandExecutor runs a configured Action.CommandPath via os/exec — always argv-style, never `sh -c`. Stdout+stderr are merged and truncated to cmdOutputCap bytes for capture.
func NewCommandExecutor ¶
func NewCommandExecutor() *CommandExecutor
func (CommandExecutor) Execute ¶
func (CommandExecutor) Execute(ctx context.Context, req ExecRequest) Result
type CredentialLookup ¶
type CredentialLookup interface {
GetOTPCredential(ctx context.Context, id uint) (*configstore.OTPCredential, error)
}
CredentialLookup loads an OTP credential by ID.
type ExecRequest ¶
type ExecRequest struct {
Action *configstore.Action
Invocation Invocation
Timeout time.Duration
}
ExecRequest is the contract between the runner and an Executor. The runner builds it from a sanitized invocation + the matched Action.
type Executor ¶
type Executor interface {
// Execute runs the request and returns a Result. The implementation
// must honor ctx for cancellation/timeout. It must not panic; any
// internal failure becomes Result{Status: StatusError, ...}.
Execute(ctx context.Context, req ExecRequest) Result
}
Executor runs one Action invocation. Implementations are stateless from the runner's perspective; per-call state lives in ExecRequest / Result. Adding a new Action type means writing a new Executor and registering it in the registry — no other package needs to change.
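To illustrate the contract, here is a self-contained sketch with trimmed stand-in types. Everything in it is hypothetical: the local `ExecRequest` carries only an action name, a plain map stands in for ExecutorRegistry (whose registration method is not shown on this page), and mapping a cancelled context to StatusTimeout is an assumption. The `guarded` wrapper shows one way to satisfy the "must not panic" rule.

```go
package main

import (
	"context"
	"fmt"
)

type Status string

const (
	StatusOK      Status = "ok"
	StatusTimeout Status = "timeout"
	StatusError   Status = "error"
)

// Result and ExecRequest are trimmed stand-ins for the real types.
type Result struct {
	Status        Status
	StatusDetail  string
	OutputCapture string
}

type ExecRequest struct{ ActionName string }

type Executor interface {
	Execute(ctx context.Context, req ExecRequest) Result
}

// echoExecutor honors ctx and otherwise echoes the action name.
type echoExecutor struct{}

func (echoExecutor) Execute(ctx context.Context, req ExecRequest) Result {
	if err := ctx.Err(); err != nil {
		// Assumption: cancellation is reported as a timeout.
		return Result{Status: StatusTimeout, StatusDetail: err.Error()}
	}
	return Result{Status: StatusOK, OutputCapture: "echo " + req.ActionName}
}

// panicExecutor misbehaves on purpose.
type panicExecutor struct{}

func (panicExecutor) Execute(context.Context, ExecRequest) Result { panic("boom") }

// guarded converts a panic in the inner executor into StatusError.
type guarded struct{ inner Executor }

func (g guarded) Execute(ctx context.Context, req ExecRequest) (res Result) {
	defer func() {
		if r := recover(); r != nil {
			res = Result{Status: StatusError, StatusDetail: fmt.Sprint(r)}
		}
	}()
	return g.inner.Execute(ctx, req)
}

// demo runs three calls through a map-based registry stand-in.
func demo() (ok, cancelled, recovered Result) {
	registry := map[string]Executor{
		"echo":  guarded{echoExecutor{}},
		"crash": guarded{panicExecutor{}},
	}
	req := ExecRequest{ActionName: "ping"}
	ok = registry["echo"].Execute(context.Background(), req)
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	cancelled = registry["echo"].Execute(ctx, req)
	recovered = registry["crash"].Execute(context.Background(), req)
	return
}

func main() {
	a, b, c := demo()
	fmt.Println(a.Status, b.Status, c.Status)
}
```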
type ExecutorRegistry ¶
type ExecutorRegistry struct {
// contains filtered or unexported fields
}
ExecutorRegistry maps Action.Type strings to concrete Executors.
func NewExecutorRegistry ¶
func NewExecutorRegistry() *ExecutorRegistry
type Invocation ¶
type Invocation struct {
ID uint64
ActionID uint
ActionName string
SenderCall string
Source Source
OTPCredentialID uint // 0 if no credential was consulted
OTPVerified bool
OTPCredName string
Args []KeyValue
StartedAt time.Time
}
Invocation is the runtime envelope passed to the Executor. The runner constructs it from a ParsedInvocation plus the matched configstore.Action and runtime context.
type InvocationPruner ¶
type InvocationPruner interface {
PruneActionInvocations(ctx context.Context, maxRows int, maxAge time.Duration) (int, error)
}
InvocationPruner is the subset of the configstore repo the pruner loop needs. Wrapping it keeps the loop testable without spinning up a database.
type KeyValue ¶
func Sanitize ¶
Sanitize validates raw key/value pairs against the schema and returns the canonical ordered slice handed to the executor. The returned slice preserves the schema order, not the wire order, so command argv is stable.
func SanitizeFreeform ¶
SanitizeFreeform validates a single untokenized payload against an Action's freeform arg schema. The schema must contain exactly one ArgSpec; the synthetic FreeformArgKey is imposed on the result so downstream executors can rely on a stable key.
Defense layers, in order:
- Schema shape — exactly one ArgSpec.
- Required check — empty value rejected when Required.
- Length — operator MaxLen, hard-capped by ceiling.
- Control-char floor — bytes outside printable ASCII + extended UTF-8 are rejected unconditionally. Specifically: any byte in 0x00..0x1F or 0x7F. Catches tabs, NUL, CR/LF, escape codes and other terminal/log-injection vectors regardless of operator regex permissiveness.
- Regex — operator pattern, defaulted to `.*` if empty so a blank pattern doesn't fall through to the kv default that requires the value to look like an identifier.
Order matters: the control-char floor runs BEFORE regex so a permissive `.*` cannot widen the floor.
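The layer ordering above can be sketched as a single function. This is an illustration under stated assumptions, not the package's code: `sanitizeFreeform`, `argSpec`, and `keyValue` are hypothetical local names (the KeyValue fields are not shown on this page), length is measured in bytes, a MaxLen of zero means "use the ceiling", and the operator regex is anchored to match the whole value.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

const (
	freeformArgKey       = "arg" // mirrors FreeformArgKey
	freeformValueCeiling = 200   // mirrors FreeformValueCeiling
)

type argSpec struct {
	Key      string
	Regex    string
	MaxLen   int
	Required bool
}

type keyValue struct{ Key, Value string }

// sanitizeFreeform applies the five layers in the documented order.
func sanitizeFreeform(schema []argSpec, value string) (keyValue, error) {
	// 1. Schema shape.
	if len(schema) != 1 {
		return keyValue{}, errors.New("freeform schema must have exactly one spec")
	}
	spec := schema[0]
	// 2. Required check.
	if value == "" && spec.Required {
		return keyValue{}, errors.New("value required")
	}
	// 3. Length: operator MaxLen, hard-capped by the ceiling.
	limit := freeformValueCeiling
	if spec.MaxLen > 0 && spec.MaxLen < limit {
		limit = spec.MaxLen
	}
	if len(value) > limit {
		return keyValue{}, errors.New("value too long")
	}
	// 4. Control-char floor, before the regex so `.*` cannot widen it.
	for i := 0; i < len(value); i++ {
		if b := value[i]; b <= 0x1F || b == 0x7F {
			return keyValue{}, errors.New("control character in value")
		}
	}
	// 5. Regex, defaulting to `.*` when the operator left it blank.
	pattern := spec.Regex
	if pattern == "" {
		pattern = ".*"
	}
	re, err := regexp.Compile("^(?:" + pattern + ")$") // anchoring is an assumption
	if err != nil {
		return keyValue{}, fmt.Errorf("bad regex: %w", err)
	}
	if !re.MatchString(value) {
		return keyValue{}, errors.New("value does not match pattern")
	}
	return keyValue{Key: freeformArgKey, Value: value}, nil
}

func main() {
	kv, err := sanitizeFreeform([]argSpec{{Key: "msg", MaxLen: 20}}, "hello world")
	fmt.Println(kv, err)
}
```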
func SanitizeFromMap ¶
SanitizeFromMap is the map-keyed counterpart to Sanitize. The Actions REST test-fire endpoint hands us a wire map; the classifier hands us an ordered slice. Both paths need the same downstream schema check, so this helper converts and delegates to Sanitize.
The returned slice is in schema-declared order (not map iteration order) so the executor argv stays stable.
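The ordering guarantee can be illustrated by walking the schema rather than the input. The sketch below is hypothetical (`orderBySchema`, `argSpec`, `keyValue` are local names) and omits the regex and length checks; rejecting keys that are absent from the schema is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type argSpec struct {
	Key      string
	Required bool
}

type keyValue struct{ Key, Value string }

// orderBySchema emits pairs in schema order, independent of how the
// wire map happens to iterate.
func orderBySchema(schema []argSpec, wire map[string]string) ([]keyValue, error) {
	out := make([]keyValue, 0, len(schema))
	matched := 0
	for _, spec := range schema {
		v, ok := wire[spec.Key]
		if !ok {
			if spec.Required {
				return nil, fmt.Errorf("bad arg: %s", spec.Key)
			}
			continue
		}
		matched++
		out = append(out, keyValue{Key: spec.Key, Value: v})
	}
	if matched != len(wire) {
		// Assumption: keys not declared in the schema are rejected.
		return nil, errors.New("bad arg: unknown key")
	}
	return out, nil
}

func main() {
	schema := []argSpec{{Key: "zone", Required: true}, {Key: "level"}}
	kvs, err := orderBySchema(schema, map[string]string{"level": "2", "zone": "north"})
	fmt.Println(kvs, err)
}
```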
type MessagesReplySender ¶
type MessagesReplySender struct {
// contains filtered or unexported fields
}
MessagesReplySender adapts messages.Service to the actions.ReplySender contract. It constructs a one-off outbound DM row through the same path operator-composed messages take, so replies inherit msg-id allocation, the operator outbound view, and the retry ladder. The inbound transport is reflected back to the originator: RF inbound uses RF-first-with-IS-fallback; IS inbound uses IS-only.
func NewMessagesReplySender ¶
func NewMessagesReplySender(svc *messages.Service, ourCall func() string) *MessagesReplySender
NewMessagesReplySender constructs an adapter. ourCall must return the primary station callsign (with or without SSID); messages service uses it as the From address on the outbound row.
type OTPVerifier ¶
type OTPVerifier struct {
// contains filtered or unexported fields
}
OTPVerifier validates TOTP codes and rejects replays.
func NewOTPVerifier ¶
func NewOTPVerifier(cfg OTPVerifierConfig) *OTPVerifier
func (*OTPVerifier) Sweep ¶
func (v *OTPVerifier) Sweep()
Sweep purges expired ring entries. Safe to call from a background goroutine. Service owns the periodic ticker via StartOTPSweeper; NewOTPVerifier intentionally does not start one.
func (*OTPVerifier) Verify ¶
func (v *OTPVerifier) Verify(credID uint, secretB32, code string) (bool, error)
Verify returns (true, nil) when code matches secret within the ±1-step window AND has not been used before within the replay TTL. It returns (false, nil) on any plain mismatch and (false, ErrOTPReplay) when the code matches but has already been used.
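The replay-rejection half of Verify, together with Sweep, can be sketched as a map of expiry times behind a mutex. This is a guess at the shape, not the actual ring: `replayRing` is a hypothetical type, the TOTP comparison is reduced to a `matches` flag supplied by the caller, and time is modeled as Unix seconds so the clock can be faked.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errReplay = errors.New("code already used") // stands in for ErrOTPReplay

// replayRing remembers accepted (credential, code) pairs until they expire.
type replayRing struct {
	mu   sync.Mutex
	ttl  int64           // seconds
	now  func() int64    // injectable clock, Unix seconds
	seen map[string]int64 // key -> expiry
}

func newReplayRing(ttlSeconds int64, now func() int64) *replayRing {
	return &replayRing{ttl: ttlSeconds, now: now, seen: map[string]int64{}}
}

// verify mirrors the documented return contract. matches is the result
// of the TOTP comparison, which this sketch does not implement.
func (r *replayRing) verify(credID uint, code string, matches bool) (bool, error) {
	if !matches {
		return false, nil // plain mismatch
	}
	key := fmt.Sprintf("%d:%s", credID, code)
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.now()
	if exp, ok := r.seen[key]; ok && now < exp {
		return false, errReplay
	}
	r.seen[key] = now + r.ttl
	return true, nil
}

// sweep purges expired entries so the map stays bounded.
func (r *replayRing) sweep() {
	r.mu.Lock()
	defer r.mu.Unlock()
	now := r.now()
	for k, exp := range r.seen {
		if now >= exp {
			delete(r.seen, k)
		}
	}
}

func main() {
	ring := newReplayRing(150, func() int64 { return time.Now().Unix() })
	fmt.Println(ring.verify(1, "123456", true))
	fmt.Println(ring.verify(1, "123456", true))
}
```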
type OTPVerifierConfig ¶
type ParsedInvocation ¶
type ParsedInvocation struct {
OTPDigits string // empty if message had no OTP digits
Action string
Args []KeyValue
RawArgTail string
}
ParsedInvocation is the output of parser.Parse. Args preserve key order as parsed off the wire so executors can present a stable argv.
Args contains raw, untrusted key=value tokens straight off the wire. Callers MUST run them through the runner's sanitizer (Phase C) before passing them to an Executor.
RawArgTail is the raw bytes after the action name and the first space, with no trimming or tokenization. Freeform consumers read it directly; kv consumers ignore it. May be populated even when Args is nil (kv tokenization failed but the action name parsed cleanly), so the classifier can still dispatch a freeform Action.
func Parse ¶
func Parse(body string) (*ParsedInvocation, error)
Parse converts an APRS message body into a ParsedInvocation. Grammar:
@@<otp>#<action> [k=v k=v ...]
where <otp> is empty or exactly six ASCII digits.
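A minimal sketch of a parser for this grammar. It is not the package's parser: `parse`, `parsed`, and `keyValue` are hypothetical local names, action-name validation is omitted, and returning a nil Args with a populated RawArgTail when a token is not `k=v` follows the ParsedInvocation description but is otherwise an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

const prefix = "@@" // mirrors ActionPrefix

var errParse = errors.New("parse error") // stands in for ErrParse

type keyValue struct{ Key, Value string }

type parsed struct {
	OTPDigits  string
	Action     string
	Args       []keyValue
	RawArgTail string
}

// parse handles "@@<otp>#<action> [k=v k=v ...]".
func parse(body string) (*parsed, error) {
	if !strings.HasPrefix(body, prefix) {
		return nil, errParse
	}
	otp, rest, found := strings.Cut(body[len(prefix):], "#")
	if !found {
		return nil, errParse
	}
	if otp != "" {
		// <otp> is empty or exactly six ASCII digits.
		if len(otp) != 6 {
			return nil, errParse
		}
		for i := 0; i < len(otp); i++ {
			if otp[i] < '0' || otp[i] > '9' {
				return nil, errParse
			}
		}
	}
	name, tail, _ := strings.Cut(rest, " ")
	if name == "" {
		return nil, errParse
	}
	p := &parsed{OTPDigits: otp, Action: name, RawArgTail: tail}
	for _, tok := range strings.Fields(tail) {
		k, v, isPair := strings.Cut(tok, "=")
		if !isPair || k == "" {
			// kv tokenization failed; keep RawArgTail for freeform use.
			p.Args = nil
			return p, nil
		}
		p.Args = append(p.Args, keyValue{Key: k, Value: v})
	}
	return p, nil
}

func main() {
	p, err := parse("@@123456#door k1=v1 k2=v2")
	fmt.Printf("%+v %v\n", p, err)
}
```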
type ReplySender ¶
type ReplySender interface {
SendReply(ctx context.Context, channel uint32, source Source, toCall, text string) error
}
ReplySender dispatches one reply back to the originator over the matching transport. The RF/IS routing is the implementation's concern; the runner just hands over the addressee + text. One call per line — the runner loops when MaxReplyLines > 1.
type Result ¶
type Result struct {
Status Status
StatusDetail string
OutputCapture string
ExitCode *int
HTTPStatus *int
}
Result is the executor outcome consumed by the runner.
type Runner ¶
type Runner struct {
// contains filtered or unexported fields
}
Runner owns one queue + worker per Action. Queues are created lazily on first Submit.
func NewRunner ¶
func NewRunner(cfg RunnerConfig) *Runner
func (*Runner) Reply ¶
Reply dispatches a synthetic reply + audit row without queueing real work. Used by the classifier short-circuit paths (denied, bad_otp, bad_arg, no_credential) so those outcomes still flow through the normal reply + audit pipeline without exercising the per-Action queue or executor.
func (*Runner) Stop ¶
func (r *Runner) Stop()
Stop drains state and closes every per-Action queue. Once Stop returns, no further Submit will enqueue or spawn goroutines for new queues; in-flight worker goroutines drain naturally as their channels close.
func (*Runner) Submit ¶
func (r *Runner) Submit(ctx context.Context, inv Invocation, a *configstore.Action, channel uint32)
Submit queues one invocation for processing. The reply is dispatched asynchronously; Submit returns immediately. Disabled / no-credential / busy / rate-limited paths reply + audit synchronously inside Submit so the caller's request lifetime captures the full path.
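The "one lazily created FIFO queue and worker per Action" design can be sketched with a map of buffered channels. This is an illustration only: `runner`, `submit`, `stop`, and `demo` are hypothetical names, jobs are plain closures, a full queue simply returns false where the real runner would reply "busy", and this stop waits for workers to finish so the demo is deterministic (the documented Stop lets workers drain on their own).

```go
package main

import (
	"fmt"
	"sync"
)

type runner struct {
	mu      sync.Mutex
	queues  map[string]chan func()
	wg      sync.WaitGroup
	stopped bool
	depth   int
}

func newRunner(depth int) *runner {
	return &runner{queues: map[string]chan func(){}, depth: depth}
}

// submit enqueues job on the action's queue without blocking. The queue
// and its single worker are created on first use.
func (r *runner) submit(action string, job func()) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.stopped {
		return false
	}
	q, ok := r.queues[action]
	if !ok {
		q = make(chan func(), r.depth)
		r.queues[action] = q
		r.wg.Add(1)
		go func() {
			defer r.wg.Done()
			for j := range q { // one worker per action keeps FIFO order
				j()
			}
		}()
	}
	select {
	case q <- job:
		return true
	default:
		return false // queue full
	}
}

// stop closes every queue once and waits for the workers to drain.
func (r *runner) stop() {
	r.mu.Lock()
	if !r.stopped {
		r.stopped = true
		for _, q := range r.queues {
			close(q)
		}
	}
	r.mu.Unlock()
	r.wg.Wait()
}

// demo submits three jobs to one action and reports their run order,
// plus whether a submit after stop was accepted.
func demo() (order []int, acceptedAfterStop bool) {
	r := newRunner(8)
	for i := 1; i <= 3; i++ {
		n := i
		r.submit("door", func() { order = append(order, n) })
	}
	r.stop()
	acceptedAfterStop = r.submit("door", func() {})
	return order, acceptedAfterStop
}

func main() {
	fmt.Println(demo())
}
```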
type RunnerConfig ¶
type RunnerConfig struct {
Registry *ExecutorRegistry
Replies ReplySender
Audit AuditSink
Logger *slog.Logger
Now func() time.Time
}
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is the composition root for the Actions subsystem. One instance per graywolf process. The wiring layer constructs it after the messages.Service is up and parks it on App for the rxfanout classifier hook to reach.
func NewService ¶
func NewService(ctx context.Context, cfg ServiceConfig) (*Service, error)
NewService builds the subsystem and starts background workers (audit pruner). The caller is responsible for invoking Stop on shutdown.
func (*Service) Classifier ¶
func (s *Service) Classifier() *Classifier
Classifier returns the inbound classifier; the wiring layer hooks it into the rxfanout APRS-message branch.
func (*Service) Registry ¶
func (s *Service) Registry() *ExecutorRegistry
Registry returns the executor registry so test harnesses or future REST handlers can register custom executors.
func (*Service) ReloadListeners ¶
ReloadListeners refreshes the in-memory listener-addressee snapshot from the store. Called on startup and whenever the REST handler mutates the table.
func (*Service) Stop ¶
func (s *Service) Stop()
Stop releases background resources. Safe to call multiple times.
func (*Service) TestFire ¶
func (s *Service) TestFire(ctx context.Context, a *configstore.Action, kvs []KeyValue) (Result, uint)
TestFire runs a one-shot invocation through the executor without going through the classifier or the per-Action queue. The OTP requirement and sender allowlist are intentionally bypassed — the caller is operator-authenticated via the REST cookie, so the dry-run is the point. The arg sanitizer still runs at the call site (the handler invokes SanitizeFromMap before calling us). No reply is dispatched to RF/IS; the result is returned synchronously for the HTTP response.
An audit row is written using SenderCall=TestFireSenderCall and Source=SourceRF so operators can spot dry-runs separately from real-air invocations. The persisted invocation id is returned for the UI to deep-link into.
type ServiceConfig ¶
type ServiceConfig struct {
// Store is the configstore.Store; required.
Store *configstore.Store
// Messages is the running messages.Service; required so replies
// flow through the same outbound path operator-composed messages
// take.
Messages *messages.Service
// OurCall returns the current primary callsign; used by the
// classifier (addressee match) and by the reply adapter (From
// address). Required.
OurCall func() string
// TacticalSet is the live tactical-alias set; the classifier
// matches against it on every inbound packet. Required.
TacticalSet *messages.TacticalSet
// Preflight is the shared messages preflight (auto-ACK + dedup).
// Required when running inside the wired app; tests may pass nil
// to opt out of preflight integration.
Preflight *messages.Preflight
// Logger is optional; nil falls back to slog.Default().
Logger *slog.Logger
// AuditPruner overrides the audit-log retention defaults. Zero
// values fall back to package defaults.
AuditPruner AuditPrunerConfig
}
ServiceConfig wires the subsystem to the host process.
type Status ¶
type Status string
Status is the outcome of a single invocation attempt. Always one of the values in StatusValues; the runner panics on any other value.
const (
	StatusOK           Status = "ok"
	StatusBadOTP       Status = "bad_otp"
	StatusBadArg       Status = "bad_arg"
	StatusDenied       Status = "denied"
	StatusDisabled     Status = "disabled"
	StatusUnknown      Status = "unknown"
	StatusNoCredential Status = "no_credential"
	StatusBusy         Status = "busy"
	StatusRateLimited  Status = "rate_limited"
	StatusTimeout      Status = "timeout"
	StatusError        Status = "error"
)
type Submitter ¶
type Submitter interface {
Submit(ctx context.Context, inv Invocation, a *configstore.Action, channel uint32)
Reply(ctx context.Context, inv Invocation, channel uint32, res Result)
}
Submitter is the runner-facing interface used by the classifier. Submit enqueues a normal invocation; Reply short-circuits with a synthetic result for outcomes the runner never sees (denied, bad_arg, bad_otp, no_credential).
type WebhookExecutor ¶
type WebhookExecutor struct {
// contains filtered or unexported fields
}
func NewWebhookExecutor ¶
func NewWebhookExecutor() *WebhookExecutor
func (*WebhookExecutor) Execute ¶
func (e *WebhookExecutor) Execute(ctx context.Context, req ExecRequest) Result