Documentation
¶
Index ¶
- Constants
- func StoreEvents(ctx context.Context, events <-chan *BucketEvent, store BucketStore) error
- type BucketEvent
- type BucketEventType
- type BucketStore
- type BucketStoreItem
- type CleanupPolicy
- type ConsoleBucketStore
- func (s *ConsoleBucketStore) Close()
- func (s *ConsoleBucketStore) Count(_ string) (int, error)
- func (s *ConsoleBucketStore) Delete(items ...*BucketStoreItem) error
- func (s *ConsoleBucketStore) Get(_, _ string) (*BucketStoreItem, error)
- func (s *ConsoleBucketStore) LastClusterUpdate() (time.Time, error)
- func (s *ConsoleBucketStore) Set(items ...*BucketStoreItem) error
- func (s *ConsoleBucketStore) SetLastClusterUpdate(t time.Time) error
- func (s *ConsoleBucketStore) SetOrUpdate(items ...*BucketStoreItem) error
- func (s *ConsoleBucketStore) Size(_ string) (int, error)
- func (s *ConsoleBucketStore) TakeOldest(_ string, _ int) ([]*BucketStoreItem, error)
- func (s *ConsoleBucketStore) Update(items ...*BucketStoreItem) error
- type DataSize
- type DeliveryPolicy
- type MinioConfig
- type MinioManager
- type NatsConfig
- type NatsReceiver
- type SQLiteBucketStore
- func (s *SQLiteBucketStore) Close()
- func (s *SQLiteBucketStore) Count(bucket string) (int, error)
- func (s *SQLiteBucketStore) Delete(items ...*BucketStoreItem) error
- func (s *SQLiteBucketStore) Get(bucket, key string) (*BucketStoreItem, error)
- func (s *SQLiteBucketStore) LastClusterUpdate() (time.Time, error)
- func (s *SQLiteBucketStore) Set(items ...*BucketStoreItem) error
- func (s *SQLiteBucketStore) SetLastClusterUpdate(t time.Time) error
- func (s *SQLiteBucketStore) SetOrUpdate(items ...*BucketStoreItem) error
- func (s *SQLiteBucketStore) Size(bucket string) (int, error)
- func (s *SQLiteBucketStore) TakeOldest(bucket string, totalSize int) ([]*BucketStoreItem, error)
- func (s *SQLiteBucketStore) Update(items ...*BucketStoreItem) error
- type StoreConfig
Constants ¶
const ( BucketEventOther BucketEventType = "Other" BucketEventRead = "Read" BucketEventWrite = "Write" BucketEventDelete = "Delete" BucketEventStat = "Stat" )
const SQLiteInMemory = "file::memory:"
SQLiteInMemory defines a dbpath for a database that is kept in memory.
Variables ¶
This section is empty.
Functions ¶
func StoreEvents ¶
func StoreEvents(ctx context.Context, events <-chan *BucketEvent, store BucketStore) error
StoreEvents receives new BucketEvents and delivers them to a configured BucketStore. Blocks until either the event channel is closed, or the context is cancelled.
Types ¶
type BucketEvent ¶
type BucketEvent struct {
EventName notification.EventType `json:"EventName"`
Key string `json:"Key"`
Records []record `json:"Records"`
}
BucketEvent is an event generated by the Minio bucket notification. Only a subset of interesting fields is captured from the larger event payload.
func (*BucketEvent) SetEventTime ¶
func (e *BucketEvent) SetEventTime(t time.Time)
SetEventTime sets the EventTime field for each Record in the event
func (*BucketEvent) Type ¶
func (e *BucketEvent) Type() BucketEventType
Type returns the type of the BucketEvent when it is one of the types considered interesting. All other types are returned as BucketEventOther.
type BucketEventType ¶
type BucketEventType string
BucketEventType defines general events that have been consolidated from the more specific S3 bucket events
type BucketStore ¶
type BucketStore interface {
// Count the number of items in a given bucket.
// If bucket string is empty, count across all buckets.
Count(bucket string) (int, error)
// Size returns the total number of bytes across all objects in the bucket.
Size(bucket string) (int, error)
// Get an item by bucket and key.
// If item does not exist, return nil item.
// Error is non nil only for problems with Store communication.
Get(bucket, key string) (*BucketStoreItem, error)
// Set 1 or more items in the store, if they do not already exist.
Set(items ...*BucketStoreItem) error
// SetOrUpdate sets 1 or more items in the store, and updates the access time if they already exist.
SetOrUpdate(items ...*BucketStoreItem) error
// Update AccessTime and Size for existing items in the Store.
Update(items ...*BucketStoreItem) error
// Delete existing items.
Delete(items ...*BucketStoreItem) error
// TakeOldest pops 0 or more items from the Store with the oldest AccessTime, up to a max total size.
TakeOldest(bucket string, totalSize int) ([]*BucketStoreItem, error)
// LastClusterUpdate returns the timestamp that the last cluster data info was updated.
// If SetLastClusterUpdate has not yet been called, returns a time with zero value and nil error.
LastClusterUpdate() (time.Time, error)
// SetLastClusterUpdate sets the timestamp of the last time the cluster data info was updated
SetLastClusterUpdate(time.Time) error
// Close the store and free related resources.
Close()
}
BucketStore defines an interface for storing BucketStoreItems
type BucketStoreItem ¶
BucketStoreItem is an item stored in a BucketStore
type CleanupPolicy ¶
type CleanupPolicy struct {
Bucket string `toml:"bucket"`
// Target size in bytes of the Minio storage usage, used to trigger cleanups.
// If the target size is 0, then the policy will not be executed.
TargetSize DataSize `toml:"target_size"`
// Must be explicitly set to true if you want to set target_size
// to 0 and remove all data on each policy execution.
AllowRemoveAll bool `toml:"allow_remove_all"`
// If greater than zero, limit the size of data that will be removed during
// each execution of the policy.
MaxRemoveSize DataSize `toml:"max_remove_size"`
}
CleanupPolicy defines a policy for a single bucket.
func NewCleanupPolicy ¶
func NewCleanupPolicy(bucket, targetSize string) (*CleanupPolicy, error)
NewCleanupPolicy creates a new policy for a target bucket, and a target usage size specified as a string (e.g. "200G").
func (*CleanupPolicy) IsValid ¶
func (p *CleanupPolicy) IsValid() bool
IsValid returns whether the policy is valid and should be executed
func (*CleanupPolicy) SetTargetSize ¶
func (p *CleanupPolicy) SetTargetSize(size string) error
SetTargetSize sets the target minio cluster size from a string, such as "200G" or "5000 MB"
func (*CleanupPolicy) Validate ¶
func (p *CleanupPolicy) Validate() error
Validate returns an error if the policy is not defined correctly.
type ConsoleBucketStore ¶
type ConsoleBucketStore struct {
// contains filtered or unexported fields
}
ConsoleBucketStore is a store type used for debugging, which prints each event to stdout.
func NewConsoleBucketStore ¶
func NewConsoleBucketStore() *ConsoleBucketStore
func (*ConsoleBucketStore) Close ¶
func (s *ConsoleBucketStore) Close()
func (*ConsoleBucketStore) Delete ¶
func (s *ConsoleBucketStore) Delete(items ...*BucketStoreItem) error
func (*ConsoleBucketStore) Get ¶
func (s *ConsoleBucketStore) Get(_, _ string) (*BucketStoreItem, error)
func (*ConsoleBucketStore) LastClusterUpdate ¶
func (s *ConsoleBucketStore) LastClusterUpdate() (time.Time, error)
func (*ConsoleBucketStore) Set ¶
func (s *ConsoleBucketStore) Set(items ...*BucketStoreItem) error
func (*ConsoleBucketStore) SetLastClusterUpdate ¶
func (s *ConsoleBucketStore) SetLastClusterUpdate(t time.Time) error
func (*ConsoleBucketStore) SetOrUpdate ¶
func (s *ConsoleBucketStore) SetOrUpdate(items ...*BucketStoreItem) error
func (*ConsoleBucketStore) TakeOldest ¶
func (s *ConsoleBucketStore) TakeOldest(_ string, _ int) ([]*BucketStoreItem, error)
func (*ConsoleBucketStore) Update ¶
func (s *ConsoleBucketStore) Update(items ...*BucketStoreItem) error
type DataSize ¶
type DataSize uint64
func NewDataSize ¶
NewDataSize returns a new DataSize, parsed from a human-readable string, e.g. "500GB" or "100 MB".
func (*DataSize) SetFromString ¶
func (*DataSize) UnmarshalText ¶
type DeliveryPolicy ¶
type DeliveryPolicy struct {
jetstream.DeliverPolicy
}
func (*DeliveryPolicy) UnmarshalText ¶
func (p *DeliveryPolicy) UnmarshalText(text []byte) error
type MinioConfig ¶
type MinioConfig struct {
Endpoint string `toml:"endpoint"`
AccessKey string `toml:"access_key"`
SecretKey string `toml:"secret_key"`
Secure bool `toml:"secure"`
// How often the cleanup policies should run.
// If the duration is zero, then policies will not be executed.
CheckInterval time.Duration `toml:"check_interval"`
// One or more policies with a specific Bucket to clean.
// The cleanup operation runs the policies in order.
BucketPolicies []*CleanupPolicy `toml:"policies"`
}
func (*MinioConfig) Validate ¶
func (c *MinioConfig) Validate() error
type MinioManager ¶
type MinioManager struct {
// contains filtered or unexported fields
}
MinioManager defines credentials to connect to a Minio cluster endpoint, and a collection of cleanup policies to execute at a defined interval.
func NewMinioManager ¶
func NewMinioManager(cfg *MinioConfig, store BucketStore) *MinioManager
NewMinioManager creates a MinioManager using a configuration of credentials, and a BucketStore to read object access times and sizes. The manager does not take ownership of the store; the caller remains responsible for calling BucketStore.Close().
type NatsConfig ¶
type NatsConfig struct {
Servers []string `toml:"servers"`
Stream string `toml:"stream"`
Durable string `toml:"durable"`
Secure bool `toml:"secure"`
InactiveThreshold time.Duration `toml:"inactive_threshold"`
DeliveryPolicy DeliveryPolicy `toml:"delivery_policy"`
DeliveryStartSeq uint64 `toml:"delivery_start_seq"`
DeliveryStartTime *time.Time `toml:"delivery_start_time"`
}
func (*NatsConfig) Validate ¶
func (c *NatsConfig) Validate() error
type NatsReceiver ¶
type NatsReceiver struct {
// If true, handle object stat (HEAD) requests the same
// as set (CREATE if not exists). Used for back-filling from object stats.
SetFromStat bool
// contains filtered or unexported fields
}
NatsReceiver consumes Minio Bucket events from a Nats stream
func NewNatsReceiver ¶
func NewNatsReceiver(cfg NatsConfig) *NatsReceiver
NewNatsReceiver constructs a NatsReceiver using Nats connection options, an existing stream name, and stream consumer options
func (*NatsReceiver) Listen ¶
func (r *NatsReceiver) Listen(ctx context.Context, events chan<- *BucketEvent) error
Listen to a Nats stream, consuming Bucket events and passing them to the event channel. Blocks until the context is cancelled.
type SQLiteBucketStore ¶
type SQLiteBucketStore struct {
// contains filtered or unexported fields
}
SQLiteBucketStore is a store implementation that uses a sqlite database to store BucketStoreItems
func NewSQLiteBucketStore ¶
func NewSQLiteBucketStore(cfg StoreConfig) (*SQLiteBucketStore, error)
NewSQLiteBucketStore opens the sqlite database named by config path, and builds the schema as needed. The store should be closed when it is no longer needed.
func (*SQLiteBucketStore) Close ¶
func (s *SQLiteBucketStore) Close()
func (*SQLiteBucketStore) Delete ¶
func (s *SQLiteBucketStore) Delete(items ...*BucketStoreItem) error
func (*SQLiteBucketStore) Get ¶
func (s *SQLiteBucketStore) Get(bucket, key string) (*BucketStoreItem, error)
func (*SQLiteBucketStore) LastClusterUpdate ¶
func (s *SQLiteBucketStore) LastClusterUpdate() (time.Time, error)
func (*SQLiteBucketStore) Set ¶
func (s *SQLiteBucketStore) Set(items ...*BucketStoreItem) error
func (*SQLiteBucketStore) SetLastClusterUpdate ¶
func (s *SQLiteBucketStore) SetLastClusterUpdate(t time.Time) error
func (*SQLiteBucketStore) SetOrUpdate ¶
func (s *SQLiteBucketStore) SetOrUpdate(items ...*BucketStoreItem) error
func (*SQLiteBucketStore) TakeOldest ¶
func (s *SQLiteBucketStore) TakeOldest(bucket string, totalSize int) ([]*BucketStoreItem, error)
func (*SQLiteBucketStore) Update ¶
func (s *SQLiteBucketStore) Update(items ...*BucketStoreItem) error
type StoreConfig ¶
func (*StoreConfig) Validate ¶
func (c *StoreConfig) Validate() error