backup

package module
v0.5.1
Published: Jul 8, 2025 License: Apache-2.0 Imports: 45 Imported by: 8

README

backup-go

A Go library for backing up and restoring Aerospike data, with support for both standard and transactionally consistent backups.

The repository includes the asbackup and asrestore CLI tools, built using this library. Refer to their respective README files for usage instructions. Binaries for various platforms are released alongside the library and can be found under releases.

Backup CLI Tools

This repository currently hosts the Golang port of the asbackup and asrestore Aerospike Backup Tools, which are built using this library. These tools are planned to be moved to a separate repository in the future.
For build instructions and packaging options, refer to the Aerospike Backup Tools user guide.

Features

  • Standard backup and restore operations
  • Flexible backup configurations including:
    • Partition-based backups
    • Node-based backups
    • Incremental backups using modification time filters
    • Compression (ZSTD)
    • Encryption (AES-128/256)
    • Secret Agent integration
  • Multiple backup formats:
    • ASB (Aerospike Backup) text format
    • ASBX binary format (used for XDR backups)
  • Configurable parallelism for both reading and writing
  • Support for backup file size limits and state preservation
  • Bandwidth and records-per-second rate limiting

Design

This Aerospike backup package is built around the Aerospike Go client. The package uses a client structure to start backup and restore jobs. The client structure is thread safe, so backup and restore jobs can be started from multiple goroutines. When a job is started, a handler is returned immediately; it is used to check the job's status and errors, and to wait for the job to finish.
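Because the client is thread safe, several jobs can run concurrently, each tracked by its own handler. A minimal sketch (assuming backupClient, the configs, and the writers were created as shown in the Usage section below; error handling is elided):

ctx := context.Background()

// Each call returns immediately with a handler for that job.
handlerA, _ := backupClient.Backup(ctx, cfgA, writersA, nil)
handlerB, _ := backupClient.Backup(ctx, cfgB, writersB, nil)

// Handlers are independent: check status or wait on each job separately.
if err := handlerA.Wait(ctx); err != nil {
  log.Printf("job A failed: %v", err)
}
if err := handlerB.Wait(ctx); err != nil {
  log.Printf("job B failed: %v", err)
}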

Key Components
  • Client: The main entry point for backup operations
  • Writers/Readers: Handle backup data I/O
  • Configurations: Define backup/restore behavior

Usage

Standard Backup

The regular backup operation backs up data from an Aerospike database based on a user-defined configuration. First, a scan operation uses the configured scope to query the database and retrieve matching records. Then, an encoder converts the retrieved data into the asb format, which is subsequently stored by a supported writer.

package main

import (
  "context"
  "log"

  "github.com/aerospike/aerospike-client-go/v8"
  "github.com/aerospike/backup-go"
  ioStorage "github.com/aerospike/backup-go/io/storage"
  "github.com/aerospike/backup-go/io/storage/local"
)

func main() {
  // Create Aerospike client.
  aerospikeClient, aerr := aerospike.NewClient("127.0.0.1", 3000)
  if aerr != nil {
    log.Fatal(aerr)
  }

  // Create backup client.
  backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
  if err != nil {
    log.Fatal(err)
  }

  ctx := context.Background()

  // Configure writers for backup.
  // For backup to single file use local.WithFile(fileName).
  writers, err := local.NewWriter(
    ctx,
    ioStorage.WithRemoveFiles(),
    ioStorage.WithDir("backups_folder"),
  )
  if err != nil {
    log.Fatal(err)
  }

  // Configure backup.
  backupCfg := backup.NewDefaultBackupConfig()
  backupCfg.Namespace = "test"
  backupCfg.ParallelRead = 10
  backupCfg.ParallelWrite = 10

  // Start backup.
  backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
  if err != nil {
    log.Fatal(err)
  }

  // Wait for completion.
  // backupHandler.Wait(ctx) blocks until the job finishes or fails.
  // A different context can be passed here; cancelling it also cancels
  // the backup job started by backupClient.Backup.
  if err = backupHandler.Wait(ctx); err != nil {
    log.Printf("Backup failed: %v", err)
  }
}
Restore

The restore operation reads backup files in both asb and asbx formats and restores the data to the cluster through the configured backup client.

func main() {
  // ... create Aerospike and backup clients as above ...
  // The asb validator comes from github.com/aerospike/backup-go/io/encoding/asb.

  // Configure restore.
  restoreCfg := backup.NewDefaultRestoreConfig()
  restoreCfg.Parallel = 5

  // Optional: configure namespace mapping.
  source := "source-ns"
  dest := "dest-ns"
  restoreCfg.Namespace = &backup.RestoreNamespaceConfig{
    Source:      &source,
    Destination: &dest,
  }

  // Create reader for restore.
  reader, err := local.NewReader(
    ctx,
    ioStorage.WithValidator(asb.NewValidator()),
    ioStorage.WithDir("backups_folder"),
  )
  if err != nil {
    log.Fatal(err)
  }

  // Start restore.
  restoreHandler, err := backupClient.Restore(ctx, restoreCfg, reader)
  if err != nil {
    log.Fatal(err)
  }

  // Wait for completion.
  if err = restoreHandler.Wait(ctx); err != nil {
    log.Printf("Restore failed: %v", err)
  }

  // Check restore statistics.
  stats := restoreHandler.GetStats()
  log.Printf("Restore stats: %+v", stats)
}

Configuration Options

Backup Configuration
type ConfigBackup struct {
    // InfoPolicy applies to Aerospike Info requests made during backup and
    // restore. If nil, the Aerospike client's default policy will be used.
    InfoPolicy *a.InfoPolicy
    // ScanPolicy applies to Aerospike scan operations made during backup and
    // restore. If nil, the Aerospike client's default policy will be used.
    ScanPolicy *a.ScanPolicy
    // Only include records that last changed before the given time (optional).
    ModBefore *time.Time
    // Only include records that last changed after the given time (optional).
    ModAfter *time.Time
    // Encryption details.
    EncryptionPolicy *EncryptionPolicy
    // Compression details.
    CompressionPolicy *CompressionPolicy
    // Secret agent config.
    SecretAgentConfig *SecretAgentConfig
    // PartitionFilters specifies the Aerospike partitions to back up.
    // Partition filters can be ranges, individual partitions,
    // or records after a specific digest within a single partition.
    // Note:
    // unless the default partition filter NewPartitionFilterAll() is used,
    // each partition filter is an individual task that cannot be parallelized,
    // so you can only achieve as much parallelism as there are partition filters.
    // You may increase parallelism by dividing up partition ranges manually.
    // AfterDigest:
    // an afterDigest filter can be created with
    // NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error)
    // It backs up records after the given digest in its partition, plus all succeeding partitions.
    // Used to resume a backup from the last record received by a previous incomplete backup.
    // This parameter overwrites the PartitionFilters.Begin value.
    // Can't be used in full backup mode.
    // This parameter is mutually exclusive with partition-list (not implemented).
    // Format: base64 encoded string.
    // Example: EjRWeJq83vEjRRI0VniavN7xI0U=
    PartitionFilters []*a.PartitionFilter
    // Namespace is the Aerospike namespace to back up.
    Namespace string
    // NodeList contains a list of nodes to back up.
    // <IP addr 1>:<port 1>[,<IP addr 2>:<port 2>[,...]]
    // <IP addr 1>:<TLS_NAME 1>:<port 1>[,<IP addr 2>:<TLS_NAME 2>:<port 2>[,...]]
    // Backup the given cluster nodes only.
    // If it is set, ParallelNodes is automatically set to true.
    // This argument is mutually exclusive with partition-list/AfterDigest arguments.
    NodeList []string
    // SetList is the list of Aerospike sets to back up (optional; given an
    // empty list, all sets will be backed up).
    SetList []string
    // The list of backup bin names
    // (optional, given an empty list, all bins will be backed up)
    BinList []string
    // ParallelNodes specifies how to perform scan.
    // If set to true, we launch parallel workers for nodes; otherwise workers run in parallel for partitions.
    // Excludes PartitionFilters param.
    ParallelNodes bool
    // EncoderType describes an Encoder type that will be used on backing up.
    // Default `EncoderTypeASB` = 0.
    EncoderType EncoderType
    // ParallelRead is the number of concurrent scans to run against the Aerospike cluster.
    ParallelRead int
    // ParallelWrite is the number of concurrent backup file writers.
    ParallelWrite int
    // Don't back up any records.
    NoRecords bool
    // Don't back up any secondary indexes.
    NoIndexes bool
    // Don't back up any UDFs.
    NoUDFs bool
    // RecordsPerSecond limits backup records per second (rps) rate.
    // Will not apply rps limit if RecordsPerSecond is zero (default).
    RecordsPerSecond int
    // Limits backup bandwidth (bytes per second).
    // Will not apply bandwidth limit if Bandwidth is zero (default).
    Bandwidth int
    // File size limit (in bytes) for the backup. If a backup file exceeds this
    // size threshold, a new file will be created. 0 for no file size limit.
    FileLimit int64
    // Do not apply base-64 encoding to BLOBs: Bytes, HLL, RawMap, RawList.
    // Results in smaller backup files.
    Compact bool
    // Only include records that have no ttl set (persistent records).
    NoTTLOnly bool
    // Name of a state file that will be saved in the backup directory.
    // Works only with the FileLimit parameter.
    // Each time FileLimit is reached and the file is closed, the current state is saved.
    // Works only for default and/or partition backups.
    // Does not work with ParallelNodes or NodeList.
    StateFile string
    // Resumes an interrupted/failed backup from where it was left off, given the .state file
    // that was generated from the interrupted/failed run.
    // Works only for default and/or partition backups. Does not work with ParallelNodes or NodeList.
    Continue bool
    // How many records will be read in one iteration for a continuation backup.
    // Affects the size of the overlap when resuming a backup after an error.
    // By default it is zero. If any value is set, reading from Aerospike will be
    // paginated, which affects performance and RAM usage.
    PageSize int64
    // PipelinesMode controls how pipeline stage workers are connected. Depending
    // on the mode, the same number of workers may be created for each stage of
    // the pipeline, with each worker connected to the next stage's worker by a
    // separate unbuffered channel.
    PipelinesMode pipeline.Mode
    // When using directory parameter, prepend a prefix to the names of the generated files.
    OutputFilePrefix string
    // Retry policy for info commands.
    InfoRetryPolicy *models.RetryPolicy
}
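For example, the modification-time filters above can be combined with a default config to take an incremental backup. A minimal sketch (the lastRun timestamp is hypothetical; a real application would persist it between runs):

// Back up only records modified since the previous run.
// Requires the "time" import.
lastRun := time.Now().Add(-24 * time.Hour) // hypothetical: load from your own bookkeeping
backupCfg := backup.NewDefaultBackupConfig()
backupCfg.Namespace = "test"
backupCfg.ModAfter = &lastRun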
Restore Configuration
type ConfigRestore struct {
    // InfoPolicy applies to Aerospike Info requests made during backup and restore.
    // If nil, the Aerospike client's default policy will be used.
    InfoPolicy *a.InfoPolicy
    // WritePolicy applies to Aerospike write operations made during backup and restore.
    // If nil, the Aerospike client's default policy will be used.
    WritePolicy *a.WritePolicy
    // Namespace details for the restore operation.
    // By default, the data is restored to the namespace from which it was taken.
    Namespace *RestoreNamespaceConfig `json:"namespace,omitempty"`
    // Encryption details.
    EncryptionPolicy *EncryptionPolicy
    // Compression details.
    CompressionPolicy *CompressionPolicy
    // Configuration of retries for each restore write operation.
    // If nil, no retries will be performed.
    RetryPolicy *models.RetryPolicy
    // Secret agent config.
    SecretAgentConfig *SecretAgentConfig
    // The sets to restore (optional, given an empty list, all sets will be restored).
    // Not applicable for ASBX restore.
    SetList []string
    // The bins to restore (optional, given an empty list, all bins will be restored).
    // Not applicable for ASBX restore.
    BinList []string
    // EncoderType describes an Encoder type that will be used on restoring.
    // Default `EncoderTypeASB` = 0.
    EncoderType EncoderType
    // Parallel is the number of concurrent record readers from backup files.
    Parallel int
    // RecordsPerSecond limits restore records per second (rps) rate.
    // Will not apply rps limit if RecordsPerSecond is zero (default).
    RecordsPerSecond int
    // Limits restore bandwidth (bytes per second).
    // Will not apply bandwidth limit if Bandwidth is zero (default).
    Bandwidth int
    // Don't restore any records.
    NoRecords bool
    // Don't restore any secondary indexes.
    // Not applicable for ASBX restore.
    NoIndexes bool
    // Don't restore any UDFs.
    // Not applicable for ASBX restore.
    NoUDFs bool
    // Disables the use of batch writes when restoring records to the Aerospike cluster.
    // Not applicable for ASBX restore.
    DisableBatchWrites bool
    // The max allowed number of records per batch write call.
    // Not applicable for ASBX restore.
    BatchSize int
    // Max number of parallel writers to target AS cluster.
    // Not applicable for ASBX restore.
    MaxAsyncBatches int
    // Amount of extra time-to-live to add to records that have expirable void-times.
    // Must be set in seconds.
    // Not applicable for ASBX restore.
    ExtraTTL int64
    // Ignore permanent record-specific errors.
    // E.g.: AEROSPIKE_RECORD_TOO_BIG.
    // By default, such errors are not ignored and restore terminates.
    // Not applicable for ASBX restore.
    IgnoreRecordError bool
    // Retry policy for info commands.
    InfoRetryPolicy *models.RetryPolicy
}
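As an illustration, the batch-write knobs above can be tuned together. A minimal sketch with assumed values (illustrative, not recommendations):

restoreCfg := backup.NewDefaultRestoreConfig()
restoreCfg.BatchSize = 128      // records per batch write call
restoreCfg.MaxAsyncBatches = 16 // parallel writers to the target cluster
restoreCfg.ExtraTTL = 3600      // add one hour to expirable records' TTL
restoreCfg.IgnoreRecordError = true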

Advanced Features

Encryption

The library supports AES-128 and AES-256 encryption with keys from:

  • Files
  • Environment variables
  • Aerospike Secret Agent
keyFilePath := "/path/to/encryption.key"

// For backup encryption.
backupCfg.EncryptionPolicy = &backup.EncryptionPolicy{
    Mode:    backup.EncryptAES256,
    KeyFile: &keyFilePath,
}

// For restoring an encrypted backup.
restoreCfg.EncryptionPolicy = &backup.EncryptionPolicy{
    Mode:    backup.EncryptAES256,
    KeyFile: &keyFilePath,
}
Compression

ZSTD compression is supported with configurable compression levels:

// For backup compression.
backupCfg.CompressionPolicy = &backup.CompressionPolicy{
    Mode:  backup.CompressZSTD,
    Level: 3,
}
// For restoring a compressed backup.
restoreCfg.CompressionPolicy = &backup.CompressionPolicy{
    Mode:  backup.CompressZSTD,
    Level: 3,
}
Partition Filters

Backup specific partitions or ranges:

// Filters built from a digest return (*a.PartitionFilter, error),
// so create them separately and handle the error.
digestFilter, err := backup.NewPartitionFilterByDigest("source-ns1", "/+Ptyjj06wW9zx0AnxOmq45xJzs=")
if err != nil {
  log.Fatal(err)
}

backupCfg.PartitionFilters = []*aerospike.PartitionFilter{
  // Filter by partition range.
  backup.NewPartitionFilterByRange(0, 100),
  // Filter by partition id.
  backup.NewPartitionFilterByID(200),
  // Filter by exact record digest within its partition.
  digestFilter,
}

// To resume a backup from a digest onward, use
// backup.NewPartitionFilterAfterDigest("source-ns1", "/+Ptyjj06wW9zx0AnxOmq45xJzs="),
// which also returns (*a.PartitionFilter, error).

Prerequisites

License

Apache License, Version 2.0. See LICENSE file for details.

Documentation

Constants

const (
	// MinParallel is the minimum number of workers to use during an operation.
	MinParallel = 1
	// MaxParallel is the maximum number of workers to use during an operation.
	MaxParallel = 1024
	// MaxPartitions is the maximum number of partitions in an Aerospike cluster.
	MaxPartitions = 4096
)
const (
	// CompressNone no compression.
	CompressNone = "NONE"
	// CompressZSTD compression using ZSTD.
	CompressZSTD = "ZSTD"
)

Compression modes

const (
	// EncryptNone no encryption.
	EncryptNone = "NONE"
	// EncryptAES128 encryption using AES128 algorithm.
	EncryptAES128 = "AES128"
	// EncryptAES256 encryption using AES256 algorithm.
	EncryptAES256 = "AES256"
)

Encryption modes

Variables

This section is empty.

Functions

func NewPartitionFilterAfterDigest

func NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error)

NewPartitionFilterAfterDigest returns a partition filter to scan records after the digest.

func NewPartitionFilterAll

func NewPartitionFilterAll() *a.PartitionFilter

NewPartitionFilterAll returns a partition range containing all partitions.

func NewPartitionFilterByDigest

func NewPartitionFilterByDigest(namespace, digest string) (*a.PartitionFilter, error)

NewPartitionFilterByDigest returns a partition filter for the specified digest.

func NewPartitionFilterByID

func NewPartitionFilterByID(partitionID int) *a.PartitionFilter

NewPartitionFilterByID returns a partition filter for the specified partition ID.

func NewPartitionFilterByRange

func NewPartitionFilterByRange(begin, count int) *a.PartitionFilter

NewPartitionFilterByRange returns a partition range with boundaries specified by the provided values.

func ParsePartitionFilterListString

func ParsePartitionFilterListString(namespace, filters string) ([]*a.PartitionFilter, error)

ParsePartitionFilterListString parses a comma-separated string into a slice of partition filters. Example: "0-1000,1000-1000,2222,EjRWeJq83vEjRRI0VniavN7xI0U=" Namespace can be empty; it must be set only for filtering by digest.
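A short usage sketch, reusing the example string above (the "test" namespace and backupCfg are assumptions from the earlier examples):

filters, err := backup.ParsePartitionFilterListString(
	"test", "0-1000,1000-1000,2222,EjRWeJq83vEjRRI0VniavN7xI0U=")
if err != nil {
	log.Fatal(err)
}
backupCfg.PartitionFilters = filters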

func ParsePartitionFilterString

func ParsePartitionFilterString(namespace, filter string) (*a.PartitionFilter, error)

ParsePartitionFilterString validates the input string with a regexp, parses the values, and returns an *a.PartitionFilter or an error. Namespace can be empty; it must be set only for filtering by digest.

func ParseSecret

func ParseSecret(config *SecretAgentConfig, secret string) (string, error)

ParseSecret checks if the provided string contains a secret key and attempts to retrieve the actual secret value from a secret agent, if configured.

If the input string does not contain a secret key it is returned as is, without any modification.
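A usage sketch; saConfig is an assumed *backup.SecretAgentConfig, and the input string is a placeholder since the exact secret-reference syntax is defined by the Aerospike Secret Agent:

// If the string is not a secret reference, it is returned unchanged.
value, err := backup.ParseSecret(saConfig, "my-possibly-secret-value")
if err != nil {
	log.Fatal(err)
}
_ = value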

Types

type AerospikeClient

type AerospikeClient interface {
	GetDefaultScanPolicy() *a.ScanPolicy
	GetDefaultInfoPolicy() *a.InfoPolicy
	GetDefaultWritePolicy() *a.WritePolicy
	Put(policy *a.WritePolicy, key *a.Key, bins a.BinMap) a.Error
	CreateComplexIndex(policy *a.WritePolicy, namespace string, set string, indexName string, binName string,
		indexType a.IndexType, indexCollectionType a.IndexCollectionType, ctx ...*a.CDTContext,
	) (*a.IndexTask, a.Error)
	DropIndex(policy *a.WritePolicy, namespace string, set string, indexName string) a.Error
	RegisterUDF(policy *a.WritePolicy, udfBody []byte, serverPath string, language a.Language,
	) (*a.RegisterTask, a.Error)
	BatchOperate(policy *a.BatchPolicy, records []a.BatchRecordIfc) a.Error
	Cluster() *a.Cluster
	ScanPartitions(scanPolicy *a.ScanPolicy, partitionFilter *a.PartitionFilter, namespace string,
		setName string, binNames ...string) (*a.Recordset, a.Error)
	ScanNode(scanPolicy *a.ScanPolicy, node *a.Node, namespace string, setName string, binNames ...string,
	) (*a.Recordset, a.Error)
	Close()
	GetNodes() []*a.Node
	PutPayload(policy *a.WritePolicy, key *a.Key, payload []byte) a.Error
}

AerospikeClient describes the Aerospike client interface, defined for easy mocking.

type BackupHandler

type BackupHandler struct {
	// contains filtered or unexported fields
}

BackupHandler handles a backup job.

func (*BackupHandler) GetMetrics

func (bh *BackupHandler) GetMetrics() *models.Metrics

GetMetrics returns metrics of the backup job.

func (*BackupHandler) GetStats

func (bh *BackupHandler) GetStats() *models.BackupStats

GetStats returns the stats of the backup job.

func (*BackupHandler) Wait

func (bh *BackupHandler) Wait(ctx context.Context) error

Wait waits for the backup job to complete and returns an error if the job failed.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the main entry point for the backup package. It wraps an aerospike client and provides methods to start backup and restore operations. Example usage:

asc, aerr := a.NewClientWithPolicy(...)	// create an aerospike client
if aerr != nil {
	// handle error
}

backupClient, err := backup.NewClient(asc, backup.WithID("id"))	// create a backup client
if err != nil {
	// handle error
}

ctx := context.Background()

// create writers for the backup operation
// (ioStorage is github.com/aerospike/backup-go/io/storage,
// local is github.com/aerospike/backup-go/io/storage/local)
writers, err := local.NewWriter(ctx, ioStorage.WithDir("backups_folder"))
if err != nil {
	// handle error
}

// use the backup client to start backup and restore operations
backupCfg := backup.NewDefaultBackupConfig()
backupHandler, err := backupClient.Backup(ctx, backupCfg, writers, nil)
if err != nil {
	// handle error
}

// optionally, check the stats of the backup operation
stats := backupHandler.GetStats()
_ = stats

// use the backupHandler to wait for the backup operation to finish
if err = backupHandler.Wait(ctx); err != nil {
	// handle error
}

func NewClient

func NewClient(ac AerospikeClient, opts ...ClientOpt) (*Client, error)

NewClient creates a new backup client.

  • ac is the aerospike client to use for backup and restore operations.

options:

  • WithID to set an identifier for the client.
  • WithLogger to set a logger that this client will log to.
  • WithScanLimiter to set a semaphore that is used to limit the number of concurrent scans.

func (*Client) AerospikeClient

func (c *Client) AerospikeClient() AerospikeClient

AerospikeClient returns the underlying aerospike client.

func (*Client) Backup

func (c *Client) Backup(
	ctx context.Context,
	config *ConfigBackup,
	writer Writer,
	reader StreamingReader,
) (*BackupHandler, error)

Backup starts a backup operation that writes data to a provided writer.

  • ctx can be used to cancel the backup operation.
  • config is the configuration for the backup operation.
  • writer creates new writers for the backup operation.
  • reader is used only for reading a state file for continuation operations.

func (*Client) BackupXDR

func (c *Client) BackupXDR(
	ctx context.Context,
	config *ConfigBackupXDR,
	writer Writer,
) (*HandlerBackupXDR, error)

BackupXDR starts an xdr backup operation that writes data to a provided writer.

  • ctx can be used to cancel the backup operation.
  • config is the configuration for the xdr backup operation.
  • writer creates new writers for the backup operation.

func (*Client) Estimate

func (c *Client) Estimate(
	ctx context.Context,
	config *ConfigBackup,
	estimateSamples int64) (uint64, error)

Estimate calculates the approximate backup size from a random sample of estimateSamples records. It counts the total number of records to back up, scans the sample records, and extrapolates the sample size to the total record count, taking parallelism and compression into account.

  • ctx can be used to cancel the calculation operation.
  • config is the backup configuration for the calculation operation.
  • estimateSamples is the number of records to be scanned for the calculation.
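A usage sketch, assuming the client and config from the examples above (the sample size of 10000 is arbitrary):

// Estimate the backup size from a sample of 10000 records.
estimatedBytes, err := backupClient.Estimate(ctx, backupCfg, 10000)
if err != nil {
	log.Fatal(err)
}
log.Printf("estimated backup size: %d bytes", estimatedBytes)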

func (*Client) Restore

func (c *Client) Restore(
	ctx context.Context,
	config *ConfigRestore,
	streamingReader StreamingReader,
) (Restorer, error)

Restore starts a restore operation that reads data from given readers. The backup data may be in a single file or multiple files.

  • ctx can be used to cancel the restore operation.
  • config is the configuration for the restore operation.
  • streamingReader provides readers with access to backup data.

type ClientOpt

type ClientOpt func(*Client)

ClientOpt is a functional option that allows configuring the Client.

func WithID

func WithID(id string) ClientOpt

WithID sets the ID for the Client. This ID is used for logging purposes.

func WithLogger

func WithLogger(logger *slog.Logger) ClientOpt

WithLogger sets the logger for the Client.

func WithScanLimiter

func WithScanLimiter(sem *semaphore.Weighted) ClientOpt

WithScanLimiter sets the scan limiter for the Client.
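The options can be combined when constructing the client. A minimal sketch (the limit of 10 concurrent scans is an arbitrary example; aerospikeClient is assumed from the Usage section):

// semaphore is golang.org/x/sync/semaphore; slog is the standard log/slog package.
sem := semaphore.NewWeighted(10) // allow at most 10 concurrent scans

backupClient, err := backup.NewClient(
	aerospikeClient,
	backup.WithID("client_id"),
	backup.WithLogger(slog.Default()),
	backup.WithScanLimiter(sem),
)
if err != nil {
	log.Fatal(err)
}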

type CompressionPolicy

type CompressionPolicy struct {
	// The compression mode to be used (default is NONE).
	Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,ZSTD"`
	// The compression level to use (or -1 if unspecified).
	Level int `yaml:"level,omitempty" json:"level,omitempty"`
}

CompressionPolicy contains backup compression information.

func NewCompressionPolicy

func NewCompressionPolicy(mode string, level int) *CompressionPolicy

NewCompressionPolicy returns a new compression policy for backup/restore operations.
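For instance, the ZSTD policy from the Compression section above can equivalently be built with this constructor:

backupCfg.CompressionPolicy = backup.NewCompressionPolicy(backup.CompressZSTD, 3)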

type ConfigBackup

type ConfigBackup struct {
	// InfoPolicy applies to Aerospike Info requests made during backup and
	// restore. If nil, the Aerospike client's default policy will be used.
	InfoPolicy *a.InfoPolicy
	// ScanPolicy applies to Aerospike scan operations made during backup and
	// restore. If nil, the Aerospike client's default policy will be used.
	ScanPolicy *a.ScanPolicy
	// Only include records that last changed before the given time (optional).
	ModBefore *time.Time
	// Only include records that last changed after the given time (optional).
	ModAfter *time.Time
	// Encryption details.
	EncryptionPolicy *EncryptionPolicy
	// Compression details.
	CompressionPolicy *CompressionPolicy
	// Secret agent config.
	SecretAgentConfig *SecretAgentConfig
	// PartitionFilters specifies the Aerospike partitions to back up.
	// Partition filters can be ranges, individual partitions,
	// or records after a specific digest within a single partition.
	// Note:
	// unless the default partition filter NewPartitionFilterAll() is used,
	// each partition filter is an individual task that cannot be parallelized,
	// so you can only achieve as much parallelism as there are partition filters.
	// You may increase parallelism by dividing up partition ranges manually.
	// AfterDigest:
	// an afterDigest filter can be created with
	// NewPartitionFilterAfterDigest(namespace, digest string) (*a.PartitionFilter, error)
	// It backs up records after the given digest in its partition, plus all succeeding partitions.
	// Used to resume a backup from the last record received by a previous incomplete backup.
	// This parameter overwrites the PartitionFilters.Begin value.
	// Can't be used in full backup mode.
	// This parameter is mutually exclusive with partition-list (not implemented).
	// Format: base64 encoded string.
	// Example: EjRWeJq83vEjRRI0VniavN7xI0U=
	PartitionFilters []*a.PartitionFilter
	// Namespace is the Aerospike namespace to back up.
	Namespace string
	// NodeList contains a list of nodes to back up.
	// <addr 1>:<port 1>[,<addr 2>:<port 2>[,...]] or <node name 1>[,<node name 2>[,...]]
	// To get the correct node address, use the 'service-tls-std' info command if the database
	// is configured to use TLS, or 'service-clear-std' if no TLS is configured.
	// To get the node name, use the 'node:' info command.
	// Backup the given cluster nodes only.
	// If it is set, ParallelNodes is automatically set to true.
	// This argument is mutually exclusive with partition-list/AfterDigest arguments.
	NodeList []string
	// SetList is the list of Aerospike sets to back up (optional; given an
	// empty list, all sets will be backed up).
	SetList []string
	// The list of backup bin names
	// (optional, given an empty list, all bins will be backed up)
	BinList []string
	// The list of rack ids.
	// (optional, given an empty list, all racks will be backed up)
	RackList []int
	// ParallelNodes specifies how to perform scan.
	// If set to true, we launch parallel workers for nodes; otherwise workers run in parallel for partitions.
	// Excludes PartitionFilters param.
	ParallelNodes bool
	// EncoderType describes an Encoder type that will be used on backing up.
	// Default `EncoderTypeASB` = 0.
	EncoderType EncoderType
	// ParallelRead is the number of concurrent scans to run against the Aerospike cluster.
	ParallelRead int
	// ParallelWrite is the number of concurrent backup file writers.
	ParallelWrite int
	// Don't back up any records.
	NoRecords bool
	// Don't back up any secondary indexes.
	NoIndexes bool
	// Don't back up any UDFs.
	NoUDFs bool
	// RecordsPerSecond limits backup records per second (rps) rate.
	// Will not apply rps limit if RecordsPerSecond is zero (default).
	RecordsPerSecond int
	// Limits backup bandwidth (bytes per second).
	// The lower bound is 8MiB (maximum size of the Aerospike record).
	// Effective limit value is calculated using the formula:
	// Bandwidth * base64ratio + metaOverhead
	// Where: base64ratio = 1.34, metaOverhead = 16 * 1024
	// Will not apply bandwidth limit if Bandwidth is zero (default).
	Bandwidth int
	// File size limit (in bytes) for the backup. If a backup file exceeds this
	// size threshold, a new file will be created. 0 for no file size limit.
	FileLimit uint64
	// Do not apply base-64 encoding to BLOBs: Bytes, HLL, RawMap, RawList.
	// Results in smaller backup files.
	Compact bool
	// Only include records that have no ttl set (persistent records).
	NoTTLOnly bool
	// Name of a state file that will be saved in the backup directory.
	// Works only with the FileLimit parameter.
	// Each time FileLimit is reached and the file is closed, the current state is saved.
	// Works only for default and/or partition backups.
	// Does not work with ParallelNodes or NodeList.
	StateFile string
	// Resumes an interrupted/failed backup from where it was left off, given the .state file
	// that was generated from the interrupted/failed run.
	// Works only for default and/or partition backups. Does not work with ParallelNodes or NodeList.
	Continue bool
	// How many records will be read in one iteration for a continuation backup.
	// Affects the size of the overlap when resuming a backup after an error.
	// By default it is zero. If any value is set, reading from Aerospike will be
	// paginated, which affects performance and RAM usage.
	PageSize int64
	// When using directory parameter, prepend a prefix to the names of the generated files.
	OutputFilePrefix string
	// Retry policy for info commands.
	InfoRetryPolicy *models.RetryPolicy
	// MetricsEnabled indicates whether backup metrics collection and reporting are enabled.
	MetricsEnabled bool
}

ConfigBackup contains configuration for the backup operation.
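As a worked example of the Bandwidth formula above: with Bandwidth set to 10 MiB (10,485,760 bytes), the effective limit is 10,485,760 × 1.34 + 16,384 ≈ 14,067,302 bytes per second, i.e. roughly 13.4 MiB/s of raw throughput.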

func NewDefaultBackupConfig

func NewDefaultBackupConfig() *ConfigBackup

NewDefaultBackupConfig returns a new ConfigBackup with default values.

type ConfigBackupXDR

type ConfigBackupXDR struct {
	// InfoPolicy applies to Aerospike Info requests made during backup and
	// restore. If nil, the Aerospike client's default policy will be used.
	InfoPolicy *a.InfoPolicy
	// Encryption details.
	EncryptionPolicy *EncryptionPolicy
	// Compression details.
	CompressionPolicy *CompressionPolicy
	// Secret agent config.
	SecretAgentConfig *SecretAgentConfig
	// EncoderType describes an Encoder type that will be used on backing up.
	// For XDR must be set to `EncoderTypeASBX` = 1.
	EncoderType EncoderType
	// File size limit (in bytes) for the backup. If a backup file exceeds this
	// size threshold, a new file will be created. 0 for no file size limit.
	FileLimit uint64
	// ParallelWrite is the number of concurrent backup file writers.
	ParallelWrite int
	// DC is the name of the DC that will be created on the source instance.
	DC string
	// Local address to which the source cluster will send data.
	LocalAddress string
	// Local port to which the source cluster will send data.
	LocalPort int
	// Namespace is the Aerospike namespace to back up.
	Namespace string
	// Rewind is used to ship all existing records of a namespace.
	// When rewinding a namespace, XDR will scan through the index and ship
	// all the records for that namespace, partition by partition.
	// Can be `all` or number of seconds.
	Rewind string
	// MaxThroughput number of records per second to ship using XDR. Must be in increments of 100.
	// If 0, the default server value will be used.
	MaxThroughput int
	// TLS config for secure XDR connection.
	TLSConfig *tls.Config
	// Timeout for TCP read operations.
	// Used by TCP server for XDR.
	ReadTimeout time.Duration
	// Timeout for TCP write operations.
	// Used by TCP server for XDR.
	WriteTimeout time.Duration
	// Results queue size.
	// Used by TCP server for XDR.
	ResultQueueSize int
	// Ack messages queue size.
	// Used by TCP server for XDR.
	AckQueueSize int
	// Max number of allowed simultaneous connection to server.
	// Used by TCP server for XDR.
	MaxConnections int
	// How often the backup client sends info commands to check Aerospike cluster stats,
	// in order to measure recovery state and lag.
	InfoPolingPeriod time.Duration
	// Timeout for starting TCP server for XDR.
	// If the TCP server for XDR does not receive any data within this timeout period, it will shut down.
	// This situation can occur if the LocalAddress and LocalPort options are misconfigured.
	StartTimeout time.Duration
	// Retry policy for info commands.
	InfoRetryPolicy *models.RetryPolicy
	// By default XDR writes that originated from another XDR are not forwarded to the specified destination
	// datacenters. Setting this parameter to true sends writes that originated from another XDR to the specified
	// destination datacenters.
	Forward bool
	// MetricsEnabled indicates whether backup metrics collection and reporting are enabled.
	MetricsEnabled bool
}

ConfigBackupXDR contains configuration for the xdr backup operation.
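A configuration sketch; the DC name, addresses, and rewind value are placeholders, and fields left at their zero values (timeouts, queue sizes) may need tuning for a real deployment:

xdrCfg := &backup.ConfigBackupXDR{
	EncoderType:  backup.EncoderTypeASBX, // required for XDR backups
	DC:           "dc-backup",            // placeholder DC name
	LocalAddress: "192.0.2.10",           // address the source cluster ships data to
	LocalPort:    8080,
	Namespace:    "test",
	Rewind:       "all", // ship all existing records
}

xdrHandler, err := backupClient.BackupXDR(ctx, xdrCfg, writers)
if err != nil {
	log.Fatal(err)
}
if err = xdrHandler.Wait(ctx); err != nil {
	log.Printf("XDR backup failed: %v", err)
}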

type ConfigRestore

type ConfigRestore struct {
	// InfoPolicy applies to Aerospike Info requests made during backup and restore.
	// If nil, the Aerospike client's default policy will be used.
	InfoPolicy *a.InfoPolicy
	// WritePolicy applies to Aerospike write operations made during backup and restore.
	// If nil, the Aerospike client's default policy will be used.
	WritePolicy *a.WritePolicy
	// Namespace details for the restore operation.
	// By default, the data is restored to the namespace from which it was taken.
	Namespace *RestoreNamespaceConfig `json:"namespace,omitempty"`
	// Encryption details.
	EncryptionPolicy *EncryptionPolicy
	// Compression details.
	CompressionPolicy *CompressionPolicy
	// Configuration of retries for each restore write operation.
	// If nil, no retries will be performed.
	RetryPolicy *models.RetryPolicy
	// Secret agent config.
	SecretAgentConfig *SecretAgentConfig
	// The sets to restore (optional, given an empty list, all sets will be restored).
	// Not applicable for XDR restore.
	SetList []string
	// The bins to restore (optional, given an empty list, all bins will be restored).
	// Not applicable for XDR restore.
	BinList []string
	// EncoderType describes an Encoder type that will be used on restoring.
	// Default `EncoderTypeASB` = 0.
	EncoderType EncoderType
	// Parallel is the number of concurrent record readers from backup files.
	Parallel int
	// RecordsPerSecond limits restore records per second (rps) rate.
	// Will not apply rps limit if RecordsPerSecond is zero (default).
	RecordsPerSecond int
	// Limits restore bandwidth (bytes per second).
	// The lower bound is 8MiB (maximum size of the Aerospike record).
	// Effective limit value is calculated using the formula:
	// Bandwidth * base64ratio + metaOverhead
	// Where: base64ratio = 1.34, metaOverhead = 16 * 1024
	// Will not apply bandwidth limit if Bandwidth is zero (default).
	Bandwidth int
	// Don't restore any records.
	NoRecords bool
	// Don't restore any secondary indexes.
	// Not applicable for XDR restore.
	NoIndexes bool
	// Don't restore any UDFs.
	// Not applicable for XDR restore.
	NoUDFs bool
	// Disables the use of batch writes when restoring records to the Aerospike cluster.
	// Not applicable for XDR restore.
	DisableBatchWrites bool
	// The max allowed number of records per batch write call.
	// Not applicable for XDR restore.
	BatchSize int
	// Max number of parallel writers to target AS cluster.
	// Not applicable for XDR restore.
	MaxAsyncBatches int
	// Amount of extra time-to-live to add to records that have expirable void-times.
	// Must be set in seconds.
	// Not applicable for XDR restore.
	ExtraTTL int64
	// Ignore permanent record-specific errors.
	// E.g.: AEROSPIKE_RECORD_TOO_BIG.
	// By default, such errors are not ignored and restore terminates.
	// Not applicable for XDR restore.
	IgnoreRecordError bool
	// Retry policy for info commands.
	InfoRetryPolicy *models.RetryPolicy
	// MetricsEnabled indicates whether backup metrics collection and reporting are enabled.
	MetricsEnabled bool
	// ValidateOnly indicates whether restore should only validate the backup files.
	ValidateOnly bool
}

ConfigRestore contains configuration for the restore operation.

func NewDefaultRestoreConfig

func NewDefaultRestoreConfig() *ConfigRestore

NewDefaultRestoreConfig returns a new ConfigRestore with default values.

type Decoder

type Decoder[T models.TokenConstraint] interface {
	NextToken() (T, error)
}

Decoder is an interface for reading backup data as tokens. It is used to support different data formats. The concrete token types returned are those exposed by the models package, e.g. *models.Record, *models.UDF and *models.SecondaryIndex.

func NewDecoder

func NewDecoder[T models.TokenConstraint](eType EncoderType, src io.Reader, fileNumber uint64, fileName string,
) (Decoder[T], error)

NewDecoder returns a new Decoder according to `EncoderType`.

type Encoder

type Encoder[T models.TokenConstraint] interface {
	EncodeToken(T) ([]byte, error)
	GetHeader(uint64) []byte
	GenerateFilename(prefix, suffix string) string
}

Encoder is an interface for encoding the types from the models package. It is used to support different data formats.

func NewEncoder

func NewEncoder[T models.TokenConstraint](eType EncoderType, namespace string, compact bool) Encoder[T]

NewEncoder returns a new Encoder according to `EncoderType`.

type EncoderType

type EncoderType int

EncoderType custom type for Encoder types enum.

const (
	// EncoderTypeASB matches ASB Encoder with id 0.
	EncoderTypeASB EncoderType = iota
	// EncoderTypeASBX matches ASBX Encoder with id 1.
	EncoderTypeASBX
)

type EncryptionPolicy

type EncryptionPolicy struct {
	// The path to the file containing the encryption key.
	KeyFile *string `yaml:"key-file,omitempty" json:"key-file,omitempty"`
	// The name of the environment variable containing the encryption key.
	KeyEnv *string `yaml:"key-env,omitempty" json:"key-env,omitempty"`
	// The secret keyword in Aerospike Secret Agent containing the encryption key.
	KeySecret *string `yaml:"key-secret,omitempty" json:"key-secret,omitempty"`
	// The encryption mode to be used (NONE, AES128, AES256)
	Mode string `yaml:"mode,omitempty" json:"mode,omitempty" default:"NONE" enums:"NONE,AES128,AES256"`
}

EncryptionPolicy contains backup encryption information.
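Set one of KeyFile, KeyEnv, or KeySecret to select the key source. A sketch using an environment variable (the variable name is an arbitrary example):

keyEnv := "BACKUP_ENCRYPTION_KEY" // name of the env var holding the key
backupCfg.EncryptionPolicy = &backup.EncryptionPolicy{
	Mode:   backup.EncryptAES128,
	KeyEnv: &keyEnv,
}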

type HandlerBackupXDR

type HandlerBackupXDR struct {
	// contains filtered or unexported fields
}

HandlerBackupXDR handles a backup job over XDR protocol.

func (*HandlerBackupXDR) GetMetrics

func (bh *HandlerBackupXDR) GetMetrics() *models.Metrics

GetMetrics returns metrics of the backup job.

func (*HandlerBackupXDR) GetStats

func (bh *HandlerBackupXDR) GetStats() *models.BackupStats

GetStats returns the stats of the backup job.

func (*HandlerBackupXDR) Wait

func (bh *HandlerBackupXDR) Wait(ctx context.Context) error

Wait waits for the backup job to complete and returns an error if the job failed.

type RestoreHandler

type RestoreHandler[T models.TokenConstraint] struct {
	// contains filtered or unexported fields
}

RestoreHandler handles a restore job using the given reader.

func (*RestoreHandler[T]) GetMetrics

func (rh *RestoreHandler[T]) GetMetrics() *models.Metrics

GetMetrics returns the metrics of the restore job.

func (*RestoreHandler[T]) GetStats

func (rh *RestoreHandler[T]) GetStats() *models.RestoreStats

GetStats returns the stats of the restore job.

func (*RestoreHandler[T]) Wait

func (rh *RestoreHandler[T]) Wait(ctx context.Context) error

Wait waits for the restore job to complete and returns an error if the job failed.

type RestoreNamespaceConfig

type RestoreNamespaceConfig struct {
	// Original namespace name.
	Source *string `json:"source,omitempty" example:"source-ns" validate:"required"`
	// Destination namespace name.
	Destination *string `json:"destination,omitempty" example:"destination-ns" validate:"required"`
}

RestoreNamespaceConfig specifies an alternative namespace name for the restore operation, where Source is the original namespace name and Destination is the namespace name to which the backup data is to be restored.

type Restorer

type Restorer interface {
	GetStats() *models.RestoreStats
	Wait(ctx context.Context) error
	GetMetrics() *models.Metrics
}

Restorer represents restore handler interface.

type SecretAgentConfig

type SecretAgentConfig struct {
	// Connection type: tcp, unix.
	// Use constants from `secret-agent`: `ConnectionTypeTCP` or `ConnectionTypeUDS`.
	ConnectionType *string `yaml:"sa-connection-type,omitempty" json:"sa-connection-type,omitempty"`
	// Secret agent host for TCP connection or socket file path for UDS connection.
	Address *string `yaml:"sa-address,omitempty" json:"sa-address,omitempty"`
	// Secret agent port (only for TCP connection).
	Port *int `yaml:"sa-port,omitempty" json:"sa-port,omitempty"`
	// Secret agent connection and reading timeout.
	// Default: 1000 milliseconds.
	TimeoutMillisecond *int `yaml:"sa-timeout-millisecond,omitempty" json:"sa-timeout-millisecond,omitempty"`
	// Path to ca file for encrypted connection.
	CaFile *string `yaml:"sa-ca-file,omitempty" json:"sa-ca-file,omitempty"`
	// Flag that shows if secret agent responses are encoded with base64.
	IsBase64 *bool `yaml:"sa-is-base64,omitempty" json:"sa-is-base64,omitempty"`
}

SecretAgentConfig contains Secret Agent connection information.
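A configuration sketch for a TCP agent; the address and port values are placeholders:

connType := "tcp" // or the secret-agent package's ConnectionTypeTCP constant
addr := "127.0.0.1"
port := 3005 // placeholder port
backupCfg.SecretAgentConfig = &backup.SecretAgentConfig{
	ConnectionType: &connType,
	Address:        &addr,
	Port:           &port,
}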

type State

type State struct {

	// Counter tracks the number of times the State instance has been initialized.
	// This is used to generate a unique suffix for backup files.
	Counter int
	// RecordsStateChan is a channel for communicating serialized partition filter
	// states.
	RecordsStateChan chan models.PartitionFilterSerialized
	// RecordStates stores the current states of all partition filters.
	RecordStates map[int]models.PartitionFilterSerialized

	// RecordStatesSaved stores the partition filter states that have been persisted to the state file.
	RecordStatesSaved map[int]models.PartitionFilterSerialized
	// SaveCommandChan command to save current state for worker.
	SaveCommandChan chan int

	// FileName specifies the file name where the backup state is persisted.
	FileName string
	// FilePath specifies the file path where the backup state is persisted.
	FilePath string
	// contains filtered or unexported fields
}

State contains the current backup status data. Fields must be exported for GOB marshaling.

func NewState

func NewState(
	ctx context.Context,
	config *ConfigBackup,
	reader StreamingReader,
	writer Writer,
	logger *slog.Logger,
) (*State, error)

NewState creates and returns a State instance. If continuing a previous backup, the state is loaded from the specified state file. Otherwise, a new State instance is created.

type StreamingReader

type StreamingReader interface {
	// StreamFiles creates readers from files and sends them to the channel.
	// In case of an error, the error is sent to the error channel.
	// Must be run in a goroutine `go rh.reader.StreamFiles(ctx, readersCh, errorsCh)`.
	StreamFiles(context.Context, chan<- models.File, chan<- error)

	// StreamFile creates a single file reader and sends io.Readers to the `readersCh`
	// In case of an error, it is sent to the `errorsCh` channel.
	// Must be run in a goroutine `go rh.reader.StreamFile()`.
	StreamFile(ctx context.Context, filename string, readersCh chan<- models.File, errorsCh chan<- error)

	// GetType returns the type of storage. Used in logging.
	GetType() string

	// ListObjects return list of objects in the path.
	ListObjects(ctx context.Context, path string) ([]string, error)

	// GetSize returns the size of asb/asbx files in the path.
	GetSize() int64

	// GetNumber returns the number of asb/asbx files in the path.
	GetNumber() int64
}

StreamingReader defines an interface for accessing backup file data from a storage provider. Implementations handling different storage types are located in the io/storage package.
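As the interface comments note, the streaming methods must run in their own goroutine. A minimal launch sketch (reader is any StreamingReader implementation; models.File is from the models package):

readersCh := make(chan models.File)
errorsCh := make(chan error)

// StreamFiles blocks while streaming, so run it in a goroutine and
// consume readersCh and errorsCh from the caller.
go reader.StreamFiles(ctx, readersCh, errorsCh)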

type Writer

type Writer interface {
	// NewWriter returns new writer for backup logic to use. Each call creates
	// a new writer, they might be working in parallel. Backup logic will close
	// the writer after backup is done. Header func is executed on a writer
	// after creation (on each one in case of multipart file).
	NewWriter(ctx context.Context, filename string) (io.WriteCloser, error)

	// GetType returns the type of storage. Used in logging.
	GetType() string

	// RemoveFiles removes a backup file or files from directory.
	RemoveFiles(ctx context.Context) error

	// Remove removes a file or directory at the specified path from the backup storage.
	// Returns an error if the operation fails.
	Remove(ctx context.Context, path string) error
}

Writer defines an interface for writing backup data to a storage provider. Implementations handling different storage types are located in the io/storage package.
