Documentation
¶
Overview ¶
Package splunk provides a Splunk HTTP Event Collector (HEC) output for the audit library.
The output posts audit events to a Splunk indexer (Splunk Enterprise or Splunk Cloud) via the HEC POST API. Events are batched, gzipped by default, and delivered with full HEC error-code handling and exponential backoff on retryable failures.
Endpoints ¶
Two HEC endpoints are supported, selected via [Config.Endpoint]:
- EndpointEvent (default) — POST /services/collector/event with a JSON envelope per event (`{"event":...,"time":...,"sourcetype":...,...}`). Multiple events are concatenated (no separator, whitespace tolerated).
- EndpointRaw — POST /services/collector/raw with newline-delimited bodies. Metadata (sourcetype, source, index, host) travels in the URL query string. Splunk-side `props.conf` controls parsing.
Authentication ¶
HEC tokens authenticate via the `Authorization: Splunk <token>` header — note the literal "Splunk" scheme (not "Bearer"). Tokens are opaque strings (GUIDs in Splunk Web; arbitrary on Enterprise via `inputs.conf`). The library rejects tokens that start with "Splunk " or "Bearer " (foot-gun: the consumer accidentally including the scheme prefix) and tokens containing CR/LF/NUL (HTTP header-injection defence).
Splunk Cloud ¶
Splunk Cloud HEC endpoints use the URL form `https://http-inputs-<stack>.splunkcloud.com/services/collector/event`. In PR 2 the `splunkcloud://<stack>` URL scheme expands to this form automatically. In PR 1 use the full URL.
Indexer Acknowledgement ¶
HEC indexer acknowledgement is the durability gap between "HEC accepted" (HTTP 200) and "indexer replicated at the cluster's replication factor". Three modes are exposed via [Config.AckMode]:
- AckModeOff (default) — no channel header, no polling. Lowest overhead; HTTP 200 is the only durability signal.
- AckModeBestEffort — channel GUID generated, ack polled at [Config.AckPollInterval], surfaced as metrics; buffer progress is NOT gated. Good observability without back-pressure cost.
- AckModeRequired — events stay in the outbound buffer until ack returns positive. Compliance-grade durability. PR 2.
In PR 1, only AckModeOff is implemented; non-Off values are accepted at config time but produce a "not implemented in PR 1" error from New.
TLS and SSRF ¶
TLS is configured via [Config.TLSPolicy] (which uses the core audit.TLSPolicy type — TLS 1.2 floor). Custom CA via [Config.TLSCA] (file path). mTLS via [Config.TLSCert] and [Config.TLSKey] (file paths; Splunk Enterprise 10.0+ only — Splunk Cloud does not support mTLS for HEC).
SSRF prevention is wired via the core's audit.NewSSRFDialControl. Requests to private, loopback, link-local, multicast, and cloud-metadata IPs are blocked by default; [Config.AllowPrivateRanges] = true opts in for test or dev deployments.
Import ¶
Import this package for its side effect of registering the "splunk" output type with the audit output registry:
import _ "github.com/axonops/audit/splunk"
Or import the convenience github.com/axonops/audit/outputs package which blank-imports every built-in output backend.
Index ¶
Examples ¶
Constants ¶
const ( DefaultBatchSize = 500 DefaultMaxBatchBytes = 819200 // 800 KiB (1 MB stock cap with safety margin) DefaultMaxEventBytes = 1 << 20 // 1 MiB DefaultFlushInterval = 2 * time.Second DefaultTimeout = 10 * time.Second DefaultMaxRetries = 10 DefaultBufferSize = 10_000 DefaultRetryBaseDelay = 500 * time.Millisecond DefaultRetryMaxDelay = 30 * time.Second DefaultRetryJitter = 0.2 // ±20% DefaultAckPollInterval = 10 * time.Second DefaultAckResendWindow = 5 * time.Minute DefaultStartupVerificationTimeout = 5 * time.Second )
Default values for Config fields. These align with the Splunk HEC research findings (issue #55 pre-implementation comment):
- 800 KiB batch byte cap leaves 200 KiB headroom under the 1 MB stock HEC max_content_length, so over-cap drops never reach the server (HTTP 413).
- gzip ON by default — 6-12× compression ratio on audit-shaped JSON; the CPU cost is dominated by network savings.
- 10 events/request typical at scale, but 500 events/request covers bulk-friendly HEC ingest with payload-byte-cap as the dominant limiter.
- 10s HTTP timeout matches the OpenTelemetry splunkhecexporter default; matches loki/webhook.
- 10 retries matches the syslog precedent for audit-grade persistence; the buffer absorbs longer outages.
const ( MinBatchSize = 1 MaxBatchSize = 10_000 MinMaxBatchBytes = 1024 // 1 KiB MaxMaxBatchBytes = 1024 * 1024 // 1 MiB (Splunk Cloud hard cap) MinMaxEventBytes = 1024 // 1 KiB MaxMaxEventBytes = 10 * 1024 * 1024 // 10 MiB MinFlushInterval = 100 * time.Millisecond MaxFlushInterval = 5 * time.Minute MinTimeout = 1 * time.Second MaxTimeout = 5 * time.Minute MinMaxRetries = 0 MaxMaxRetries = 100 MinBufferSize = 100 MaxBufferSize = 1_000_000 )
Bounds on numeric Config values. Set as constants so unit tests and godoc can reference them.
Variables ¶
var ( // ErrConfigInvalid wraps every validation error from [Validate] // and the constructor pre-checks. Callers use // `errors.Is(err, splunk.ErrConfigInvalid)`. ErrConfigInvalid = errors.New("audit/splunk: configuration invalid") // ErrTokenRejected is returned from a runtime send when HEC // classifies the token as disabled or invalid (HEC codes 1/2/3/4/ // 22) — the Output transitions to its stopped state. ErrTokenRejected = errors.New("audit/splunk: token rejected by HEC") // ErrIndexRejected is returned from a runtime send when HEC // rejects the configured index (HEC code 7) — Output stops. ErrIndexRejected = errors.New("audit/splunk: index rejected by HEC") // ErrHealthCheckFailed is returned from [New] when the startup // health probe fails (and [Config.DisableStartupVerification] is // false). ErrHealthCheckFailed = errors.New("audit/splunk: HEC health check failed") // ErrPR1NotImplemented is returned when PR-1 sees a config // requesting a feature deferred to PR 2 (`splunkcloud://` URL // scheme; `AckMode != AckModeOff`). Carries a remediation hint // in the wrapped message. ErrPR1NotImplemented = errors.New("audit/splunk: feature not implemented in PR 1 (ships in PR 2)") // ErrAckDisabled is returned from [New] when the configured // [AckMode] is not [AckModeOff] but the HEC token / channel has // indexer acknowledgement disabled. The library refuses to // proceed because the operator's chosen durability guarantee // (best-effort or required) cannot be honoured. ErrAckDisabled = errors.New("audit/splunk: HEC indexer acknowledgement is disabled on this token/channel") // ErrCryptoRandFailed is returned from [New] when `crypto/rand` // fails to produce the 128-bit channel GUID. The library NEVER // falls back to a zero or pseudo-random GUID — a unique channel // is load-bearing for ACK correctness, and a deterministic value // would let an attacker observe / replay ack states. ErrCryptoRandFailed = errors.New("audit/splunk: crypto/rand failed during channel GUID generation") )
Sentinel errors returned by New and by the Output's runtime path. All sentinels are wrapped with `%w` so callers can discriminate via errors.Is.
Functions ¶
func NewFactory ¶
func NewFactory(factory audit.OutputMetricsFactory) audit.OutputFactory
NewFactory returns an audit.OutputFactory that creates Splunk outputs from YAML configuration and wires per-output metrics via the supplied audit.OutputMetricsFactory. When factory is non-nil, the returned audit.Output receives its per-output audit.OutputMetrics via WithOutputMetrics at construction time. Pass nil to disable per-output metrics.
Signature mirrors the other output modules' `NewFactory` (file, syslog, webhook, loki) for consistency (#581).
func ValidateCloudStack ¶
ValidateCloudStack returns nil when `stack` matches the Splunk Cloud stack-name shape (`^[a-z0-9][a-z0-9-]{0,62}$`). Used by PR 2's `splunkcloud://<stack>` URL scheme handler; exposed now so PR 1 can test the regex independently. Defends against host smuggling.
Types ¶
type AckMetricsRecorder ¶
type AckMetricsRecorder interface {
// RecordAckPending reports the current count of batches awaiting
// acknowledgement. Called once per poll tick.
RecordAckPending(gauge int)
// RecordAckConfirmed records that `count` batches received a
// positive ack in the last poll. Called once per poll tick when
// count > 0.
RecordAckConfirmed(count int)
// RecordAckTimedOut records that `count` batches exceeded the
// configured AckResendWindow. Called once per poll tick when
// count > 0. Only fires for AckModeRequired.
RecordAckTimedOut(count int)
// RecordAckBufferFullDrop records that `count` events were
// dropped because the in-flight buffer was full (AckModeRequired
// only). Called from the batchLoop side, not from the poller.
RecordAckBufferFullDrop(count int)
}
AckMetricsRecorder is an optional extension implemented by audit.OutputMetrics instances that care about ACK telemetry. The library detects support via a runtime type assertion (same pattern as [file.RotationRecorder]). Implementations that do not implement this interface silently skip the ACK metric calls.
type AckMode ¶
type AckMode int
AckMode selects indexer-acknowledgement behaviour. See [About HEC Indexer Acknowledgment](https://help.splunk.com/en/splunk-enterprise/get-started/get-data-in/10.0/get-data-with-http-event-collector/about-http-event-collector-indexer-acknowledgment) for the protocol details. Only AckModeOff is implemented in PR 1; non-Off values produce ErrPR1NotImplemented from New.
const ( // AckModeOff (default) — no `X-Splunk-Request-Channel` header, // no polling. HTTP 200 is the only durability signal. AckModeOff AckMode = iota // AckModeBestEffort — channel GUID generated, ack polled, results // exposed as metrics; buffer progress NOT gated. AckModeBestEffort // AckModeRequired — events stay in the buffer until ack returns // positive; on `AckResendWindow` timeout the events re-send. // Compliance-grade durability. AckModeRequired )
func (AckMode) MarshalYAML ¶
MarshalYAML returns the canonical YAML string form.
func (*AckMode) UnmarshalYAML ¶
UnmarshalYAML decodes an AckMode from the strings "off", "best_effort", or "required" (case-insensitive). Empty string defaults to "off". Unknown strings return ErrConfigInvalid.
type AckSnapshot ¶
type AckSnapshot struct {
// Pending is len(inFlight) — batches with ackIDs we haven't yet
// confirmed or timed out.
Pending int
// Confirmed is the cumulative count of batches with a positive ack.
Confirmed int64
// TimedOut is the cumulative count of batches whose AckResendWindow
// elapsed without a positive ack (AckModeRequired only).
TimedOut int64
// BufferFullDrops is the cumulative count of events dropped because
// the in-flight buffer was full (AckModeRequired only).
BufferFullDrops int64
}
AckSnapshot is a point-in-time view of the tracker's counters. Returned by Output.AckMetricsSnapshot for consumers who prefer pull-based reads over the push-based AckMetricsRecorder path.
type Config ¶
type Config struct {
// URL is the HEC endpoint. Two accepted forms:
// "https://splunk.example.com:8088" - Splunk Enterprise / self-managed
// "splunkcloud://acme-prod" - Splunk Cloud stack shortcut
//
// The `splunkcloud://<stack>` form is expanded at config validation
// to `https://http-inputs-<stack>.splunkcloud.com:443`. The stack
// name MUST match `^[a-z0-9][a-z0-9-]{0,62}$`. The shortcut form
// rejects any non-empty path, port, query, fragment, or opaque
// component (use the full `https://` form for non-standard cases),
// and rejects [Config.TLSCert] / [Config.TLSKey] (Splunk Cloud HEC
// does not support mTLS — use a self-managed HTTPS proxy with
// mTLS termination if mTLS is required).
URL string
// Token is the HEC token (opaque string). Required. Validated to
// reject CR/LF/NUL (header injection) and prefixes "Splunk " /
// "Bearer " (foot-gun where the consumer accidentally included the
// scheme prefix in the secret).
Token string
Endpoint Endpoint
Sourcetype string // default: "audit:event"
Source string // default: "audit"
Index string // empty = HEC token's default index
Host string // default: os.Hostname()
// IndexedFields enumerates field names whose values are copied
// from the event JSON into the HEC envelope `fields` object for
// index-time extraction. String values only. Ignored on /raw.
IndexedFields []string
BatchSize int
MaxBatchBytes int
MaxEventBytes int
FlushInterval time.Duration
// Gzip is a *bool so the YAML decoder can distinguish "unset"
// (use default true) from "explicitly false".
Gzip *bool
// UserAgent header value. Default
// "audit-splunk/<library-version>". MUST match
// ^[A-Za-z0-9._/-]+$ — validated at New() time to prevent
// header injection.
UserAgent string
Timeout time.Duration
// Headers are additional HTTP headers sent on every request.
// Header values are validated for CRLF; credential-shaped values
// are redacted in Config.String().
Headers map[string]string
MaxRetries int
RetryBaseDelay time.Duration
RetryMaxDelay time.Duration
RetryJitter float64
BufferSize int
AckMode AckMode
AckPollInterval time.Duration
AckResendWindow time.Duration
TLSPolicy *audit.TLSPolicy
TLSCA string
TLSCert string
TLSKey string
AllowInsecureHTTP bool
AllowPrivateRanges bool
DisableStartupVerification bool
StartupVerificationTimeout time.Duration
EventRoute audit.EventRoute
}
Config configures a Splunk HEC Output. See the package doc for usage patterns. Field names match the YAML keys via snake_case conversion in the outputconfig factory (e.g. `BatchSize` ↔ `batch_size`).
Example (Redaction) ¶
ExampleConfig_redaction demonstrates that Config's String / GoString / Format methods redact the token across every fmt verb. This is a load-bearing security guarantee — the token must NEVER appear in log lines, error messages, or stack traces.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
cfg := splunk.Config{
URL: "https://splunk.example.com:8088",
Token: "super-secret-token",
}
fmt.Println(cfg.String())
}
Output: SplunkConfig{url="https://splunk.example.com:8088", endpoint=event, sourcetype="", index="", gzip=<default>, batch_size=0, max_batch_bytes=0, ack_mode=off, token=REDACTED}
func (Config) Format ¶
Format implements fmt.Formatter so both %v and %+v produce the redacted form. Without this, fmt's default reflection-based formatter would print every field including the token.
func (Config) String ¶
String returns a one-line, log-safe representation of the Config. The token is REDACTED; URL is sanitised to scheme+host. Implements fmt.Stringer. Matches the webhook precedent for log-safe Config repr.
func (*Config) Validate ¶
Validate checks the configuration for usable values, applies defaults to unset fields, and returns the first violation as a wrapped ErrConfigInvalid. Idempotent. Mutates the receiver to apply defaults.
type Endpoint ¶
type Endpoint int
Endpoint is a typed enum selecting which HEC endpoint the output posts to.
const ( // EndpointEvent (default) — POST /services/collector/event. Each // event is wrapped in a `{"event":..., "time":...}` envelope with // per-event metadata (sourcetype, source, index, host) and // optional indexed fields via the `fields` envelope object. EndpointEvent Endpoint = iota // EndpointRaw — POST /services/collector/raw. Events sent as // newline-delimited bodies; metadata travels in query string. No // envelope wrapping. Indexed `fields` extraction is not supported. EndpointRaw )
func (Endpoint) MarshalYAML ¶
MarshalYAML returns the canonical YAML string form.
func (*Endpoint) UnmarshalYAML ¶
UnmarshalYAML decodes an Endpoint from the YAML strings "event" or "raw" (case-insensitive). Empty string defaults to "event". Unknown strings return ErrConfigInvalid.
type Option ¶
type Option func(*options)
Option configures a Splunk Output at construction time. Options are passed as variadic arguments to New and applied in order before configuration validation, TLS setup, or warning emission.
func WithDiagnosticLogger ¶
WithDiagnosticLogger routes construction-time and runtime warnings (TLS policy, retry, buffer-full drops) to the given logger. When nil or not supplied, warnings go to slog.Default.
Consumers normally do not call this directly when using github.com/axonops/audit/outputconfig.New — outputconfig plumbs the auditor's diagnostic logger into every output it constructs. Use this option when constructing a Splunk output programmatically and you want its warnings to match your application's log handler.
Mirrors github.com/axonops/audit.WithDiagnosticLogger at the auditor level; the same logger may be passed to both for consistent routing.
func WithFrameworkContext ¶
func WithFrameworkContext(fctx audit.FrameworkContext) Option
WithFrameworkContext seeds the auditor-wide framework metadata (AppName, Host, Timezone, PID) used for envelope `host` defaulting. When omitted, the Output falls back to os.Hostname at construction time.
Consumers normally do not call this directly when using github.com/axonops/audit/outputconfig.New — outputconfig populates the equivalent values from the auditor configuration. Use this option when constructing a Splunk output programmatically (for example in integration tests).
func WithMaxIdleConns ¶
WithMaxIdleConns sets the HTTP transport's maximum idle connection pool size. Default is 100, matching the loki/webhook precedent. Increase for tier-1 SaaS pushing tens of MB/s to a single HEC endpoint; decrease only if you understand the keep-alive cost trade-off.
Exposed as an Option (not a Config field) because it is a HTTP transport tuning knob, not a feature surface. Mirrors the established escape-hatch pattern used by loki and webhook for the small set of low-level transport tunings (#688 / #770).
func WithOutputMetrics ¶
func WithOutputMetrics(m audit.OutputMetrics) Option
WithOutputMetrics sets the audit.OutputMetrics sink for this output. When omitted or nil, metrics calls become no-ops via audit.NoOpOutputMetrics. Mirrors WithDiagnosticLogger in usage and zero-value semantics.
Consumers normally do not call this directly when using github.com/axonops/audit/outputconfig.New — outputconfig wires per-output metrics through the audit.OutputMetricsFactory supplied via outputconfig.WithOutputMetricsFactory.
type Output ¶
type Output struct {
// contains filtered or unexported fields
}
Output pushes audit events to a Splunk HEC endpoint. Implements audit.Output and audit.DeliveryReporter.
func New ¶
New constructs a Splunk HEC Output. Returns an error if the config fails validation, if TLS material cannot be loaded, or if the startup health check fails (unless [Config.DisableStartupVerification] is true).
`metrics` may be nil — the output records via a no-op audit.NoOpOutputMetrics when omitted. Use WithOutputMetrics to pass a real audit.OutputMetrics sink.
Example ¶
ExampleNew shows the minimal construction shape against the default `/event` endpoint with JSON envelope wrapping and the `Authorization: Splunk <token>` header.
In production code, pass a real audit.OutputMetrics via splunk.WithOutputMetrics. The auditor will wire the output via audit.WithOutputs (or via the outputconfig YAML loader) and call `Write` on every emitted event.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
// Note: this example does NOT contact a real Splunk endpoint —
// DisableStartupVerification skips the /health probe so the
// example can run without network access.
cfg := &splunk.Config{
URL: "https://splunk.example.com:8088",
Token: "your-hec-token",
Sourcetype: "axonops:audit",
Index: "audit_logs",
DisableStartupVerification: true,
}
out, err := splunk.New(cfg, nil)
if err != nil {
fmt.Println("error:", err)
return
}
defer func() { _ = out.Close() }()
// In real code: pass the output to audit.New via audit.WithOutputs.
fmt.Println(out.Name() != "")
}
Output: true
Example (AckRequired) ¶
ExampleNew_ackRequired shows the compliance-grade durability mode. With AckMode=Required, events stay in an in-flight buffer until Splunk's /services/collector/ack endpoint returns positive for the batch's ackID. On AckResendWindow timeout, the events re-send. The producer (Write) remains non-blocking — when the in-flight buffer reaches BufferSize, new batches drop with metric reason=ack_buffer_full instead of stalling.
The HEC token MUST have ACK enabled on its channel. The library feature-detects at startup and refuses to launch with splunk.ErrAckDisabled if ACK is not enabled.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
cfg := &splunk.Config{
URL: "https://splunk.example.com:8088",
Token: "your-hec-token",
AckMode: splunk.AckModeRequired,
DisableStartupVerification: true,
}
out, err := splunk.New(cfg, nil)
if err != nil {
fmt.Println("error:", err)
return
}
defer func() { _ = out.Close() }()
fmt.Println("ack mode:", cfg.AckMode)
}
Output: ack mode: required
Example (CimJSON) ¶
ExampleNew_cimJSON wires the Splunk output with the audit.CIMChangeFormatter (`type: cim_change` in YAML). Pair with the reference Splunk Technology Add-on (or your own generated TA) so Splunk indexes the events as CIM Change.
The formatter is wired at the AUDITOR level (via audit.WithFormatter or per-output config) — splunk.Output itself is formatter-agnostic. This example shows the construction shape; in production, outputconfig YAML composes the auditor and the output via the cim_change formatter declaration.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
cfg := &splunk.Config{
URL: "https://splunk.example.com:8088",
Token: "your-hec-token",
Sourcetype: "axonops:audit",
DisableStartupVerification: true,
}
out, err := splunk.New(cfg, nil)
if err != nil {
fmt.Println("error:", err)
return
}
defer func() { _ = out.Close() }()
fmt.Println("constructed for CIM Change pipeline")
}
Output: constructed for CIM Change pipeline
Example (Raw) ¶
ExampleNew_raw shows the `/services/collector/raw` endpoint mode, where events are sent as newline-delimited bodies and metadata travels in the URL query string. Useful when consumer events are already line-oriented (e.g. CEF) and Splunk-side `props.conf` owns the parsing.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
cfg := &splunk.Config{
URL: "https://splunk.example.com:8088",
Token: "your-hec-token",
Endpoint: splunk.EndpointRaw,
Sourcetype: "axonops:audit",
Source: "axonops-audit",
Index: "audit_logs",
DisableStartupVerification: true,
}
out, err := splunk.New(cfg, nil)
if err != nil {
fmt.Println("error:", err)
return
}
defer func() { _ = out.Close() }()
fmt.Println("ok")
}
Output: ok
Example (SplunkCloud) ¶
ExampleNew_splunkCloud uses the `splunkcloud://<stack>` URL shortcut. The library expands the stack name (validated against `^[a-z0-9][a-z0-9-]{0,62}$`) to the canonical Splunk Cloud HEC URL `https://http-inputs-<stack>.splunkcloud.com:443`.
Splunk Cloud HEC does not support mTLS — configuring TLSCert, TLSKey, or TLSCA with a `splunkcloud://` URL is rejected at config validation. Use a self-managed HTTPS proxy with mTLS termination if mTLS is required.
package main
import (
"fmt"
"github.com/axonops/audit/splunk"
)
func main() {
cfg := &splunk.Config{
URL: "splunkcloud://acme-prod",
Token: "your-hec-token",
DisableStartupVerification: true,
}
out, err := splunk.New(cfg, nil)
if err != nil {
fmt.Println("error:", err)
return
}
defer func() { _ = out.Close() }()
// Name() reflects the EXPANDED URL host.
fmt.Println(out.Name())
}
Output: splunk:http-inputs-acme-prod.splunkcloud.com:443
func (*Output) AckMetricsSnapshot ¶
func (o *Output) AckMetricsSnapshot() AckSnapshot
AckMetricsSnapshot returns the current ACK counters. Zero value if ACK is disabled. Lock-free; safe to call from any goroutine.
func (*Output) Close ¶
Close shuts down the output, draining any buffered events up to [Config.Timeout] * 2 + 5s. Idempotent.
func (*Output) LastDeliveryAge ¶
LastDeliveryAge implements audit.DeliveryReporter.
func (*Output) ReportsDelivery ¶
ReportsDelivery implements audit.DeliveryReporter — splunk reports its own per-event delivery via audit.Metrics.RecordDelivery so the core pipeline must NOT double-record. Mirrors loki/webhook.
func (*Output) Write ¶
Write enqueues a serialised event for asynchronous delivery to HEC. Returns audit.ErrOutputClosed after [Close] has been called. Returns audit.ErrEventTooLarge if the event exceeds [Config.MaxEventBytes]. Returns nil on a successful enqueue OR on a buffer-full drop (the drop is recorded via metrics, not via the return value, matching the loki precedent).