archive

package
v0.3.0
Warning

This package is not in the latest version of its module.

Published: Sep 22, 2025 License: Apache-2.0 Imports: 13 Imported by: 1

README

Audit archive Format Overview

Archive is a utility (part of the audit package) to capture and store a large number of artifacts related to a NATS deployment.

For example, capture all 'monitor' endpoints for all servers in a given cluster, capture all details of all streams in a given account, or capture the state and configuration of streams.

Primary design choices for archive:

  • Single file: easy to share over email, web, chat, etc.
  • Compressed: most artifacts have a lot of redundancy
  • Indexed: can be queried in a generic way
  • Default to JSON: easy for humans to grok, and for programs to process

An "archive" is a ZIP file that conforms to a specific schema convention.
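The sketch below makes the single-file, compressed, indexed design concrete using only Go's standard library. It is not this package's Writer. For illustration, it assumes a manifest.json that maps each file name to a list of "label:value" tag strings, and it omits the ${prefix} directory. The real manifest schema is not described here and may differ. The buildArchive and readManifest helpers are hypothetical.

```go
package main

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"fmt"
)

// buildArchive writes each artifact as deflate-compressed JSON into an
// in-memory ZIP and records its tags in a manifest.json entry.
// The manifest layout (file name -> []"label:value") is hypothetical.
func buildArchive(artifacts map[string]any, tags map[string][]string) ([]byte, error) {
	var buf bytes.Buffer
	zw := zip.NewWriter(&buf)
	for name, artifact := range artifacts {
		w, err := zw.Create(name) // Create compresses with the Deflate method
		if err != nil {
			return nil, err
		}
		if err := json.NewEncoder(w).Encode(artifact); err != nil {
			return nil, err
		}
	}
	w, err := zw.Create("manifest.json")
	if err != nil {
		return nil, err
	}
	if err := json.NewEncoder(w).Encode(tags); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// readManifest opens the ZIP from memory and decodes manifest.json.
func readManifest(data []byte) (map[string][]string, error) {
	zr, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return nil, err
	}
	f, err := zr.Open("manifest.json")
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var manifest map[string][]string
	if err := json.NewDecoder(f).Decode(&manifest); err != nil {
		return nil, err
	}
	return manifest, nil
}

func main() {
	name := "capture/clusters/awseast_bar/bar_1/variables/0001.json"
	data, err := buildArchive(
		map[string]any{name: map[string]string{"server_name": "bar_1"}},
		map[string][]string{name: {"cluster:awseast_bar", "server:bar_1", "type:server_varz"}},
	)
	if err != nil {
		panic(err)
	}
	manifest, err := readManifest(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(data), "bytes;", name, "tagged", manifest[name])
}
```

Because the artifacts and the manifest live in one ZIP, the whole capture can be shared as a single file. Deflate also shrinks the highly redundant JSON.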

The archive package provides Reader and Writer types.

Artifact tagging and manifest

Rather than relying on path convention and contrived splitting, each artifact is added to the archive with a unique set of tags.

For example, the capture of a server VARZ endpoint may be tagged with:

  • cluster:awseast_bar
  • server:bar_1
  • type:server_varz
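A tag pairs a label with a value. The sketch below mirrors the exported Tag and TagLabel types listed under Types. It renders a tag as label:value, the form used in the examples above. That String format is an assumption inferred from these examples, not a documented contract of Tag.String.

```go
package main

import "fmt"

// TagLabel and Tag mirror the exported types documented under Types.
type TagLabel string

type Tag struct {
	Name  TagLabel
	Value string
}

// String renders the tag as "label:value", matching the examples above.
// This format is an assumption; the package's own Tag.String may differ.
func (t *Tag) String() string {
	return fmt.Sprintf("%s:%s", t.Name, t.Value)
}

func main() {
	tags := []*Tag{
		{Name: "cluster", Value: "awseast_bar"},
		{Name: "server", Value: "bar_1"},
		{Name: "type", Value: "server_varz"},
	}
	for _, t := range tags {
		fmt.Println(t) // *Tag implements fmt.Stringer
	}
}
```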

Artifact filenames are derived from their tags. For paged artifacts, the files are stored in a dedicated directory with numbered JSON pages:

capture/clusters/awseast_bar/bar_1/variables/0001.json

In addition to capture artifacts, the archive contains a few special files.

The most important special file is manifest.json which stores tags for each file in the archive. The manifest allows programmatic discovery of the archive content, and can be used to build indices at read time.

An example usage is: list all streams discovered under a given account A. Rather than inferring this information by iterating over and parsing file paths, the manifest can be traversed, filtering by account:A and type:stream_info, and all matching files in the archive can be processed.
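The query described above can be sketched as a linear scan over an in-memory manifest. This sketch assumes a hypothetical representation, with each file path mapped to "label:value" strings. It also assumes a hypothetical stream:NAME tag carrying the stream name. The real package exposes this query through Reader.AccountStreamNames and keeps its own indices. A stream is captured through several replicas, so the names are de-duplicated before sorting.

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// hasAllTags reports whether tags contains every wanted tag.
func hasAllTags(tags []string, wanted ...string) bool {
	for _, w := range wanted {
		if !slices.Contains(tags, w) {
			return false
		}
	}
	return true
}

// streamsInAccount scans a manifest (file path -> tags) for stream_info
// artifacts in the given account and returns the sorted, de-duplicated
// stream names. The "stream:" label is an assumption for illustration.
func streamsInAccount(manifest map[string][]string, account string) []string {
	seen := map[string]bool{}
	for _, tags := range manifest {
		if !hasAllTags(tags, "account:"+account, "type:stream_info") {
			continue
		}
		for _, t := range tags {
			if name, ok := strings.CutPrefix(t, "stream:"); ok {
				seen[name] = true
			}
		}
	}
	names := make([]string, 0, len(seen))
	for name := range seen {
		names = append(names, name)
	}
	slices.Sort(names)
	return names
}

func main() {
	// Hypothetical manifest entries; file names are shortened.
	manifest := map[string][]string{
		"r1.json": {"account:A", "stream:ORDERS", "server:s1", "type:stream_info"},
		"r2.json": {"account:A", "stream:ORDERS", "server:s2", "type:stream_info"},
		"r3.json": {"account:A", "stream:EVENTS", "server:s1", "type:stream_info"},
		"r4.json": {"account:B", "stream:LOGS", "server:s1", "type:stream_info"},
	}
	fmt.Println(streamsInAccount(manifest, "A")) // [EVENTS ORDERS]
}
```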

Archive organization

n.b.: Do not rely on path parsing and path conventions; query using the manifest instead.

n.b.: If a server does not belong to a cluster, then cluster_name is 'unclustered' in all the paths below.

Note on paging

Some artifacts (e.g., stream info, health, varz) are stored in paged format, split across multiple JSON files (e.g., 0001.json, 0002.json, ...), each containing one or more JSON objects.
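The sketch below shows how paged files might be named and recognized. It assumes four-digit, zero-padded page numbers, as in the examples in this document. The names pageName, isPage and pagesOf are hypothetical. Zero padding is what lets a plain lexical sort return pages in numeric order.

```go
package main

import (
	"fmt"
	"path"
	"regexp"
	"sort"
)

// pagePattern matches numbered page files such as 0001.json. The four-digit
// width is an assumption based on the examples in this document.
var pagePattern = regexp.MustCompile(`^\d{4}\.json$`)

// pageName returns the file name of the n-th page (1-based).
func pageName(n int) string {
	return fmt.Sprintf("%04d.json", n)
}

// isPage reports whether the last element of p is a numbered JSON page.
func isPage(p string) bool {
	return pagePattern.MatchString(path.Base(p))
}

// pagesOf returns the page files directly inside dir in reading order.
// Zero padding makes lexical order match numeric order (up to 9999 pages).
func pagesOf(files []string, dir string) []string {
	var pages []string
	for _, f := range files {
		if path.Dir(f) == dir && isPage(f) {
			pages = append(pages, f)
		}
	}
	sort.Strings(pages)
	return pages
}

func main() {
	files := []string{
		"capture/clusters/c1/s1/health/0002.json",
		"capture/clusters/c1/s1/health/0001.json",
		"capture/profiles/c1/s1__mem.prof",
	}
	fmt.Println(pageName(3))
	fmt.Println(pagesOf(files, "capture/clusters/c1/s1/health"))
}
```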

Other artifacts (e.g., profiles and special files) are stored as non-paged single files.

Server-specific artifacts

${prefix}/clusters/${cluster_name}/${server_name}/${artifact_type}/0001.json

Example: snapshot of health endpoint

Account artifacts

Each account artifact can appear multiple times as it is captured through different servers.

${prefix}/accounts/${account_name}/servers/${cluster_name}__${server_name}/${artifact_type}/0001.json

Example: account connections

Stream artifacts

Each stream artifact can appear multiple times as it is captured through different replicas.

${prefix}/accounts/${account_name}/streams/${stream_name}/replicas/${server_name}/${artifact_type}/0001.json

Example: stream details

Server profiles

Profiles are stored as non-paged files:

${prefix}/profiles/${cluster_name}/${server_name}__${profile_name}.prof

Example: memory profile
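The templates above can be expressed as small path builders. This sketch is only for understanding the layout; as noted earlier, readers should query the manifest rather than build or parse paths. Three details are assumptions for illustration: the function names, the use of an empty string to mean "no cluster", and artifact-type directory names such as variables or stream_info.

```go
package main

import (
	"fmt"
	"path"
)

// unclustered is the cluster_name used when a server has no cluster.
const unclustered = "unclustered"

// clusterDir maps an empty cluster name to "unclustered". Representing
// "no cluster" as an empty string is a hypothetical convention.
func clusterDir(cluster string) string {
	if cluster == "" {
		return unclustered
	}
	return cluster
}

// page formats a 1-based page number as a zero-padded JSON file name.
func page(n int) string { return fmt.Sprintf("%04d.json", n) }

// ${prefix}/clusters/${cluster_name}/${server_name}/${artifact_type}/NNNN.json
func serverArtifactPath(prefix, cluster, server, artifactType string, n int) string {
	return path.Join(prefix, "clusters", clusterDir(cluster), server, artifactType, page(n))
}

// ${prefix}/accounts/${account_name}/servers/${cluster_name}__${server_name}/${artifact_type}/NNNN.json
func accountArtifactPath(prefix, account, cluster, server, artifactType string, n int) string {
	return path.Join(prefix, "accounts", account, "servers", clusterDir(cluster)+"__"+server, artifactType, page(n))
}

// ${prefix}/accounts/${account_name}/streams/${stream_name}/replicas/${server_name}/${artifact_type}/NNNN.json
func streamArtifactPath(prefix, account, stream, server, artifactType string, n int) string {
	return path.Join(prefix, "accounts", account, "streams", stream, "replicas", server, artifactType, page(n))
}

// ${prefix}/profiles/${cluster_name}/${server_name}__${profile_name}.prof
func profilePath(prefix, cluster, server, profile string) string {
	return path.Join(prefix, "profiles", clusterDir(cluster), server+"__"+profile+".prof")
}

func main() {
	fmt.Println(serverArtifactPath("capture", "awseast_bar", "bar_1", "variables", 1))
	fmt.Println(accountArtifactPath("capture", "A", "awseast_bar", "bar_1", "connections", 1))
	fmt.Println(streamArtifactPath("capture", "A", "ORDERS", "bar_1", "stream_info", 1))
	fmt.Println(profilePath("capture", "", "bar_1", "mem"))
}
```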

Special files

In addition to artifacts, each archive contains a few special files. Special files are stored as flat, non-paged files:

Capture metadata

${prefix}/capture_metadata.json

Contains information such as the date of capture, the username, the tool version, and more.

Manifest

${prefix}/manifest.json

Contains a list of all artifacts and a list of tags associated with each one.

Capture log

${prefix}/capture.log

A log file for the process that created the archive, in case it contains useful information about artifacts (or the lack thereof).

Documentation


Constants

This section is empty.

Variables

var ErrMultipleMatches = fmt.Errorf("multiple files matched the given query")

ErrMultipleMatches is returned if multiple artifacts match the given combination of tags.

var ErrNoMatches = fmt.Errorf("no file matched the given query")

ErrNoMatches is returned if no artifact matches the given combination of tags.

Functions

func EachClusterServerArtifact added in v0.2.3

func EachClusterServerArtifact[T any](r *Reader, artifactTag *Tag, cb func(clusterTag *Tag, serverTag *Tag, err error, artifact *T) error) (int, error)

EachClusterServerArtifact iterates over all paged JSON artifact files in the archive by looping through every cluster and its servers. For each (cluster, server) pair, it constructs a tag slice consisting of the cluster tag, the server tag, and the provided artifact tag, and then calls ForEachTaggedArtifact to load all artifacts of type T associated with that combination.

The matching artifact files are obtained by intersecting the tag-specific file lists from the Reader’s inverted index, and then filtered and sorted according to the paged artifact naming convention. For each decoded artifact, the provided callback function is called with the cluster tag, the server tag, and the loaded artifact (or an error if no matching artifact was found).

The function returns the total count of processed artifacts and any error encountered during iteration.

func ForEachTaggedArtifact added in v0.2.3

func ForEachTaggedArtifact[T any](r *Reader, tags []*Tag, cb func(*T) error) error

ForEachTaggedArtifact iterates over all paged JSON artifact files in the archive that match the given set of tags and calls the provided callback function for each decoded artifact.

The function uses the Reader’s inverted index to collect the file names associated with each tag, performs an intersection of these sets to determine the files that match all the given tags, and then filters these to include only those files that match the paged artifact naming convention.

The matching files are sorted by name, opened, and decoded from JSON into an object of type T. The callback function cb is called for each decoded artifact. If the callback returns an error, iteration stops and the error is returned.

If no files match the provided tags, it returns ErrNoMatches. It also returns any error encountered while opening files or decoding JSON.

Types

type AuditMetadata

type AuditMetadata struct {
	Timestamp              time.Time `json:"capture_timestamp"`
	ConnectedServerName    string    `json:"connected_server_name"`
	ConnectedServerVersion string    `json:"connected_server_version"`
	ConnectURL             string    `json:"connect_url"`
	UserName               string    `json:"user_name"`
	CLIVersion             string    `json:"cli_version"`
}
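This sketch assumes, as its field names suggest, that capture_metadata.json is the JSON encoding of AuditMetadata. Under that assumption, it can be decoded with encoding/json alone. The struct below is copied from the definition above. parseMetadata is a hypothetical helper, and the sample values are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// AuditMetadata is copied from the type documented above.
type AuditMetadata struct {
	Timestamp              time.Time `json:"capture_timestamp"`
	ConnectedServerName    string    `json:"connected_server_name"`
	ConnectedServerVersion string    `json:"connected_server_version"`
	ConnectURL             string    `json:"connect_url"`
	UserName               string    `json:"user_name"`
	CLIVersion             string    `json:"cli_version"`
}

// parseMetadata decodes the raw contents of capture_metadata.json.
func parseMetadata(raw []byte) (*AuditMetadata, error) {
	var md AuditMetadata
	if err := json.Unmarshal(raw, &md); err != nil {
		return nil, err
	}
	return &md, nil
}

func main() {
	// Made-up sample content for illustration only.
	raw := []byte(`{
		"capture_timestamp": "2025-01-02T03:04:05Z",
		"connected_server_name": "bar_1",
		"connect_url": "nats://localhost:4222",
		"user_name": "example"
	}`)
	md, err := parseMetadata(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(md.ConnectedServerName, md.UserName, md.Timestamp.Format(time.RFC3339))
}
```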

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader encapsulates a reader for the actual underlying archive, and also provides indices for faster and more convenient iteration and querying of the archive content.

func NewReader

func NewReader(archivePath string) (*Reader, error)

NewReader creates a new reader for the file at the given archivePath. Reader expects the file to comply with the format and content created by a Writer in this same package. During creation, Reader builds in-memory indices to speed up subsequent queries.

func (*Reader) AccountNames

func (r *Reader) AccountNames() []string

AccountNames lists the unique names of accounts found in the archive. The list of names is sorted alphabetically.

func (*Reader) AccountStreamNames

func (r *Reader) AccountStreamNames(accountName string) []string

AccountStreamNames lists the unique stream names found in the archive for the given account. The list of names is sorted alphabetically.

func (*Reader) Close

func (r *Reader) Close() error

Close closes the reader

func (*Reader) ClusterNames

func (r *Reader) ClusterNames() []string

ClusterNames lists the unique names of clusters found in the archive. The list of names is sorted alphabetically.

func (*Reader) ClusterServerNames

func (r *Reader) ClusterServerNames(clusterName string) []string

ClusterServerNames lists the unique server names found in the archive for the given cluster. The list of names is sorted alphabetically.

func (*Reader) EachClusterServerAccountz

func (r *Reader) EachClusterServerAccountz(cb func(clusterTag *Tag, serverTag *Tag, err error, az *server.ServerAPIAccountzResponse) error) (int, error)

EachClusterServerAccountz iterates over all servers, ordered by cluster, and calls the callback function with the loaded Accountz response.

The callback function receives any error encountered while loading the server's Accountz artifact and should check for and handle it. If the callback returns an error, iteration stops and that error is returned.

Returned errors are those documented for Load(), or any other error encountered during iteration.

func (*Reader) EachClusterServerHealthz

func (r *Reader) EachClusterServerHealthz(cb func(clusterTag *Tag, serverTag *Tag, err error, hz *server.ServerAPIHealthzResponse) error) (int, error)

EachClusterServerHealthz iterates over all servers, ordered by cluster, and calls the callback function with the loaded Healthz response.

The callback function receives any error encountered while loading the server's Healthz artifact and should check for and handle it. If the callback returns an error, iteration stops and that error is returned.

Returned errors are those documented for Load(), or any other error encountered during iteration.

func (*Reader) EachClusterServerJsz

func (r *Reader) EachClusterServerJsz(cb func(clusterTag *Tag, serverTag *Tag, err error, jsz *server.ServerAPIJszResponse) error) (int, error)

EachClusterServerJsz iterates over all servers, ordered by cluster, and calls the callback function with the loaded Jsz response.

The callback function receives any error encountered while loading the server's Jsz artifact and should check for and handle it. If the callback returns an error, iteration stops and that error is returned.

Returned errors are those documented for Load(), or any other error encountered during iteration.

func (*Reader) EachClusterServerLeafz

func (r *Reader) EachClusterServerLeafz(cb func(clusterTag *Tag, serverTag *Tag, err error, lz *server.ServerAPILeafzResponse) error) (int, error)

EachClusterServerLeafz iterates over all servers, ordered by cluster, and calls the callback function with the loaded Leafz response.

The callback function receives any error encountered while loading the server's Leafz artifact and should check for and handle it. If the callback returns an error, iteration stops and that error is returned.

Returned errors are those documented for Load(), or any other error encountered during iteration.

func (*Reader) EachClusterServerVarz

func (r *Reader) EachClusterServerVarz(cb func(clusterTag *Tag, serverTag *Tag, err error, vz *server.ServerAPIVarzResponse) error) (int, error)

EachClusterServerVarz iterates over all servers, ordered by cluster, and calls the callback function with the loaded Varz response.

The callback function receives any error encountered while loading the server's Varz artifact and should check for and handle it. If the callback returns an error, iteration stops and that error is returned.

Returned errors are those documented for Load(), or any other error encountered during iteration.

func (*Reader) Load

func (r *Reader) Load(v any, queryTags ...*Tag) error

Load queries the indices for a single artifact matching the given input tags. If exactly one artifact is found, it is deserialized into v. If multiple artifacts or no artifacts match the input tags, ErrMultipleMatches or ErrNoMatches is returned, respectively.

func (*Reader) StreamServerNames

func (r *Reader) StreamServerNames(accountName, streamName string) []string

StreamServerNames lists the unique server names found in the archive for the given stream in the given account. The list of names is sorted alphabetically.

type Tag

type Tag struct {
	Name  TagLabel
	Value string
}

func TagAccount

func TagAccount(accountName string) *Tag

func TagAccountConnections

func TagAccountConnections() *Tag

func TagAccountInfo

func TagAccountInfo() *Tag

func TagAccountJetStream

func TagAccountJetStream() *Tag

func TagAccountLeafs

func TagAccountLeafs() *Tag

func TagAccountSubs

func TagAccountSubs() *Tag

func TagArtifactType

func TagArtifactType(artifactType string) *Tag

func TagCluster

func TagCluster(clusterName string) *Tag

func TagNoCluster

func TagNoCluster() *Tag

func TagProfileName

func TagProfileName(profileType string) *Tag

func TagServer

func TagServer(serverName string) *Tag

func TagServerAccounts

func TagServerAccounts() *Tag

func TagServerConnections

func TagServerConnections() *Tag

func TagServerGateways

func TagServerGateways() *Tag

func TagServerHealth

func TagServerHealth() *Tag

func TagServerJetStream

func TagServerJetStream() *Tag

func TagServerLeafs

func TagServerLeafs() *Tag

func TagServerProfile

func TagServerProfile() *Tag

func TagServerRoutes

func TagServerRoutes() *Tag

func TagServerSubs

func TagServerSubs() *Tag

func TagServerVars

func TagServerVars() *Tag

func TagSpecial

func TagSpecial(special string) *Tag

func TagStream

func TagStream(streamName string) *Tag

func TagStreamInfo

func TagStreamInfo() *Tag

func (*Tag) String

func (t *Tag) String() string

type TagLabel

type TagLabel string

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer encapsulates a zip writer for the underlying archive file, and also tracks metadata used by the Reader to construct indices.

func NewWriter

func NewWriter(archivePath string) (*Writer, error)

NewWriter creates a new writer for the file at the given archivePath. Writer creates a ZIP file whose content has additional structure and metadata. If archivePath is an existing file, it will be overwritten.

func (*Writer) Add

func (w *Writer) Add(artifact any, tags ...*Tag) error

Add serializes the given artifact to JSON and adds it to the archive. It creates a file name based on the provided tags and ensures that the name is unique. The artifact is also added to the manifest for indexing, enabling tag-based querying via Reader.

func (*Writer) AddRaw

func (w *Writer) AddRaw(reader io.Reader, extension string, tags ...*Tag) error

AddRaw adds the given artifact to the archive similarly to Add. The artifact is assumed to be already serialized and is copied as-is byte for byte. If the artifact is tagged as "special", it will be written as a single non-paged file.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the writer

func (*Writer) PagedWriter added in v0.2.3

func (w *Writer) PagedWriter(dir string) *pagedWriter

func (*Writer) SetTime

func (w *Writer) SetTime(t time.Time)

SetTime sets the timestamp that files in the archive should have; if it is not set, the current time is used.
