pkg

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Consolidated bucket event categories.
//
// Every constant states its BucketEventType explicitly. In a const block a
// line that supplies its own expression without a type does not inherit the
// type of the preceding line, so leaving the type off would declare an
// untyped string constant instead of a BucketEventType.
const (
	BucketEventOther  BucketEventType = "Other"
	BucketEventRead   BucketEventType = "Read"
	BucketEventWrite  BucketEventType = "Write"
	BucketEventDelete BucketEventType = "Delete"
	BucketEventStat   BucketEventType = "Stat"
)
View Source
// SQLiteInMemory is a database path value for a store that is kept in memory
// rather than in a file on disk.
const SQLiteInMemory = "file::memory:"

SQLiteInMemory defines a dbpath that is stored in-memory

Variables

This section is empty.

Functions

func StoreEvents

func StoreEvents(ctx context.Context, events <-chan *BucketEvent, store BucketStore) error

StoreEvents receives new BucketEvents and delivers them to a configured BucketStore. Blocks until either the event channel is closed, or the context is cancelled.

Types

type BucketEvent

// BucketEvent holds the subset of a Minio bucket notification payload that
// is mapped to JSON:
//   - EventName: the notification event type, JSON field "EventName".
//   - Key: JSON field "Key".
//     NOTE(review): presumably the key of the object the event refers to — confirm.
//   - Records: JSON field "Records"; SetEventTime sets the EventTime field of
//     each record.
type BucketEvent struct {
	EventName notification.EventType `json:"EventName"`
	Key       string                 `json:"Key"`
	Records   []record               `json:"Records"`
}

BucketEvent is an event generated by the Minio bucket notification. Only a subset of interesting fields is captured from the larger event payload.

func (*BucketEvent) Check

func (e *BucketEvent) Check() error

Check the event for validity

func (*BucketEvent) SetEventTime

func (e *BucketEvent) SetEventTime(t time.Time)

SetEventTime sets the EventTime field for each Record in the event

func (*BucketEvent) Type

func (e *BucketEvent) Type() BucketEventType

Type returns the consolidated type of the BucketEvent if it is one of the interesting types. Every other event is reported as BucketEventOther.

type BucketEventType

// BucketEventType names a general category of bucket event, consolidated
// from the more specific S3 bucket events. The categories are the
// BucketEvent* constants: Other, Read, Write, Delete and Stat.
type BucketEventType string

BucketEventType defines general events that have been consolidated from the more specific S3 bucket events

type BucketStore

// BucketStore defines an interface for storing BucketStoreItems and for
// recording when the cluster data info was last updated.
type BucketStore interface {
	// Count the number of items in a given bucket.
	// If bucket string is empty, count across all buckets.
	Count(bucket string) (int, error)
	// Size returns the total number of bytes of the objects in the bucket.
	// NOTE(review): the result is an int, which is 32 bits wide on 32-bit
	// platforms — confirm totals cannot exceed that range there.
	Size(bucket string) (int, error)
	// Get an item by bucket and key.
	// If the item does not exist, a nil item is returned.
	// The error is non-nil only for problems with Store communication.
	Get(bucket, key string) (*BucketStoreItem, error)
	// Set 1 or more items in the store, if they do not already exist.
	Set(items ...*BucketStoreItem) error
	// SetOrUpdate sets 1 or more items in the store, and updates the access
	// time of those that already exist.
	SetOrUpdate(items ...*BucketStoreItem) error
	// Update AccessTime and Size for existing items in the Store.
	Update(items ...*BucketStoreItem) error
	// Delete existing items.
	Delete(items ...*BucketStoreItem) error
	// TakeOldest pops 0 or more items from the Store with the oldest AccessTime, up to a max total size.
	TakeOldest(bucket string, totalSize int) ([]*BucketStoreItem, error)
	// LastClusterUpdate returns the timestamp at which the cluster data info was last updated.
	// If SetLastClusterUpdate has not yet been called, returns a time with zero value and nil error.
	LastClusterUpdate() (time.Time, error)
	// SetLastClusterUpdate sets the timestamp of the last time the cluster data info was updated.
	SetLastClusterUpdate(time.Time) error

	// Close the store and free related resources.
	Close()
}

BucketStore defines an interface for storing BucketStoreItems

type BucketStoreItem

// BucketStoreItem is an item stored in a BucketStore.
//   - Bucket, Key: the pair by which BucketStore.Get looks an item up.
//   - AccessTime: the time BucketStore.TakeOldest orders items by; it is
//     updated by BucketStore.Update and BucketStore.SetOrUpdate.
//   - Size: the size of the item, updated by BucketStore.Update.
//     NOTE(review): presumably in bytes, matching BucketStore.Size — confirm.
type BucketStoreItem struct {
	Bucket     string
	Key        string
	AccessTime time.Time
	Size       int
}

BucketStoreItem is an item stored in a BucketStore

type CleanupPolicy

// CleanupPolicy defines a cleanup policy for a single bucket.
// The struct tags give the TOML key of each field.
type CleanupPolicy struct {
	// Name of the bucket that the policy applies to.
	Bucket string `toml:"bucket"`

	// Target size in bytes of the Minio storage usage, used to trigger cleanups.
	// If the target size is 0, then the policy will not be executed
	// (see AllowRemoveAll for the explicit opt-in to a zero target).
	TargetSize DataSize `toml:"target_size"`

	// Must be explicitly set to true if you want to set target_size
	// to 0 and remove all data on each policy execution.
	AllowRemoveAll bool `toml:"allow_remove_all"`

	// If greater than zero, limit the size of data that will be removed during
	// each execution of the policy.
	MaxRemoveSize DataSize `toml:"max_remove_size"`
}

CleanupPolicy defines a policy for a single bucket.

func NewCleanupPolicy

func NewCleanupPolicy(bucket, targetSize string) (*CleanupPolicy, error)

NewCleanupPolicy creates a new policy for a target bucket, and a target usage size specified as a string (e.g. "200G").

func (*CleanupPolicy) IsValid

func (p *CleanupPolicy) IsValid() bool

IsValid returns whether the policy is valid and should be executed

func (*CleanupPolicy) SetTargetSize

func (p *CleanupPolicy) SetTargetSize(size string) error

SetTargetSize sets the target minio cluster size from a string, such as "200G" or "5000 MB"

func (*CleanupPolicy) Validate

func (p *CleanupPolicy) Validate() error

Validate returns an error if the policy is not defined correctly.

type ConsoleBucketStore

type ConsoleBucketStore struct {
	// contains filtered or unexported fields
}

ConsoleBucketStore is a store type used for debugging, which prints each event to the console stdout

func NewConsoleBucketStore

func NewConsoleBucketStore() *ConsoleBucketStore

func (*ConsoleBucketStore) Close

func (s *ConsoleBucketStore) Close()

func (*ConsoleBucketStore) Count

func (s *ConsoleBucketStore) Count(_ string) (int, error)

func (*ConsoleBucketStore) Delete

func (s *ConsoleBucketStore) Delete(items ...*BucketStoreItem) error

func (*ConsoleBucketStore) Get

func (*ConsoleBucketStore) LastClusterUpdate

func (s *ConsoleBucketStore) LastClusterUpdate() (time.Time, error)

func (*ConsoleBucketStore) Set

func (s *ConsoleBucketStore) Set(items ...*BucketStoreItem) error

func (*ConsoleBucketStore) SetLastClusterUpdate

func (s *ConsoleBucketStore) SetLastClusterUpdate(t time.Time) error

func (*ConsoleBucketStore) SetOrUpdate

func (s *ConsoleBucketStore) SetOrUpdate(items ...*BucketStoreItem) error

func (*ConsoleBucketStore) Size

func (s *ConsoleBucketStore) Size(_ string) (int, error)

func (*ConsoleBucketStore) TakeOldest

func (s *ConsoleBucketStore) TakeOldest(_ string, _ int) ([]*BucketStoreItem, error)

func (*ConsoleBucketStore) Update

func (s *ConsoleBucketStore) Update(items ...*BucketStoreItem) error

type DataSize

// DataSize is an unsigned 64-bit data size. It can be set from a
// human-readable string such as "500GB" or "100 MB" (see NewDataSize and
// SetFromString) and has an UnmarshalText method.
// NOTE(review): presumably a count of bytes, as CleanupPolicy.TargetSize is
// documented in bytes — confirm.
type DataSize uint64

func NewDataSize

func NewDataSize(size string) (DataSize, error)

NewDataSize returns a new DataSize, parsed from a human-readable string, e.g. "500GB" or "100 MB".

func (*DataSize) SetFromString

func (s *DataSize) SetFromString(size string) error

func (*DataSize) UnmarshalText

func (s *DataSize) UnmarshalText(text []byte) error

type DeliveryPolicy

// DeliveryPolicy embeds jetstream.DeliverPolicy and adds an UnmarshalText
// method to it. It is the type of NatsConfig.DeliveryPolicy (TOML key
// "delivery_policy").
type DeliveryPolicy struct {
	jetstream.DeliverPolicy
}

func (*DeliveryPolicy) UnmarshalText

func (p *DeliveryPolicy) UnmarshalText(text []byte) error

type MinioConfig

// MinioConfig is the configuration passed to NewMinioManager: the settings
// for reaching a Minio endpoint, plus the cleanup schedule and policies.
// The struct tags give the TOML key of each field.
//
// NOTE(review): Secure presumably selects a TLS connection to Endpoint —
// confirm against the implementation.
type MinioConfig struct {
	Endpoint  string `toml:"endpoint"`
	AccessKey string `toml:"access_key"`
	SecretKey string `toml:"secret_key"`
	Secure    bool   `toml:"secure"`

	// How often the cleanup policies are executed.
	// If the duration is zero, then policies will not be executed.
	CheckInterval time.Duration `toml:"check_interval"`

	// One or more policies with a specific Bucket to clean.
	// The cleanup operation runs the policies in order.
	BucketPolicies []*CleanupPolicy `toml:"policies"`
}

func (*MinioConfig) Validate

func (c *MinioConfig) Validate() error

type MinioManager

type MinioManager struct {
	// contains filtered or unexported fields
}

MinioManager defines credentials to connect to a Minio cluster endpoint, and a collection of cleanup policies to execute at a defined interval.

func NewMinioManager

func NewMinioManager(cfg *MinioConfig, store BucketStore) *MinioManager

NewMinioManager creates a MinioManager using a configuration of credentials, and a BucketStore to read object access times and sizes. The manager does not take ownership of the store: the caller remains responsible for calling BucketStore.Close().

func (*MinioManager) Run

func (m *MinioManager) Run(ctx context.Context) error

Run starts a blocking loop and executes the bucket policies after each configured interval. The call returns when the context is cancelled. If the interval is not greater than zero, an error is returned.

type NatsConfig

// NatsConfig is the configuration passed to NewNatsReceiver.
// The struct tags give the TOML key of each field.
//
// NOTE(review): the per-field semantics are not documented here. Presumably
// Servers lists the Nats server URLs, Stream names an existing stream,
// Durable names the durable consumer, and DeliveryStartSeq /
// DeliveryStartTime apply only when DeliveryPolicy selects delivery by start
// sequence / start time — confirm against the implementation.
type NatsConfig struct {
	Servers           []string       `toml:"servers"`
	Stream            string         `toml:"stream"`
	Durable           string         `toml:"durable"`
	Secure            bool           `toml:"secure"`
	InactiveThreshold time.Duration  `toml:"inactive_threshold"`
	DeliveryPolicy    DeliveryPolicy `toml:"delivery_policy"`
	DeliveryStartSeq  uint64         `toml:"delivery_start_seq"`
	DeliveryStartTime *time.Time     `toml:"delivery_start_time"`
}

func (*NatsConfig) Validate

func (c *NatsConfig) Validate() error

type NatsReceiver

type NatsReceiver struct {

	// If true, handle object stat (HEAD) requests the same
	// as set (CREATE if not exists). Used for back-filling from object stats.
	SetFromStat bool
	// contains filtered or unexported fields
}

NatsReceiver consumes Minio Bucket events from a Nats stream

func NewNatsReceiver

func NewNatsReceiver(cfg NatsConfig) *NatsReceiver

NewNatsReceiver constructs a NatsReceiver from a NatsConfig, which carries the Nats connection options, the name of an existing stream, and the stream consumer options.

func (*NatsReceiver) Listen

func (r *NatsReceiver) Listen(ctx context.Context, events chan<- *BucketEvent) error

Listen to a Nats stream, consuming Bucket events and passing them to the event channel. Blocks until the context is cancelled.

type SQLiteBucketStore

type SQLiteBucketStore struct {
	// contains filtered or unexported fields
}

SQLiteBucketStore is a store implementation that uses a sqlite database to store BucketStoreItems

func NewSQLiteBucketStore

func NewSQLiteBucketStore(cfg StoreConfig) (*SQLiteBucketStore, error)

NewSQLiteBucketStore opens the sqlite database named by config path, and builds the schema as needed. The store should be closed when it is no longer needed.

func (*SQLiteBucketStore) Close

func (s *SQLiteBucketStore) Close()

func (*SQLiteBucketStore) Count

func (s *SQLiteBucketStore) Count(bucket string) (int, error)

func (*SQLiteBucketStore) Delete

func (s *SQLiteBucketStore) Delete(items ...*BucketStoreItem) error

func (*SQLiteBucketStore) Get

func (s *SQLiteBucketStore) Get(bucket, key string) (*BucketStoreItem, error)

func (*SQLiteBucketStore) LastClusterUpdate

func (s *SQLiteBucketStore) LastClusterUpdate() (time.Time, error)

func (*SQLiteBucketStore) Set

func (s *SQLiteBucketStore) Set(items ...*BucketStoreItem) error

func (*SQLiteBucketStore) SetLastClusterUpdate

func (s *SQLiteBucketStore) SetLastClusterUpdate(t time.Time) error

func (*SQLiteBucketStore) SetOrUpdate

func (s *SQLiteBucketStore) SetOrUpdate(items ...*BucketStoreItem) error

func (*SQLiteBucketStore) Size

func (s *SQLiteBucketStore) Size(bucket string) (int, error)

func (*SQLiteBucketStore) TakeOldest

func (s *SQLiteBucketStore) TakeOldest(bucket string, totalSize int) ([]*BucketStoreItem, error)

func (*SQLiteBucketStore) Update

func (s *SQLiteBucketStore) Update(items ...*BucketStoreItem) error

type StoreConfig

// StoreConfig is the configuration passed to NewSQLiteBucketStore.
//   - Path (TOML key "db_path"): names the sqlite database to open.
//     SQLiteInMemory is a path value for an in-memory database.
//   - EnableWAL (TOML key "enable_wal"): NOTE(review): presumably turns on
//     SQLite write-ahead logging — confirm against the implementation.
type StoreConfig struct {
	Path      string `toml:"db_path"`
	EnableWAL bool   `toml:"enable_wal"`
}

func (*StoreConfig) Validate

func (c *StoreConfig) Validate() error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL