objectstorage

package
v0.0.0-...-5dd4785
Published: Sep 29, 2020 License: Apache-2.0, BSD-2-Clause Imports: 18 Imported by: 0

Documentation

Constants

const (
	BatchWriterQueueSize    = BatchWriterBatchSize
	BatchWriterBatchSize    = 10000
	BatchWriterBatchTimeout = 500 * time.Millisecond
)

Variables

var LeakDetection = struct {
	WrapCachedObject                 func(cachedObject *CachedObjectImpl, skipCallerFrames int) CachedObject
	ReportCachedObjectClosedTooOften func(wrappedCachedObject LeakDetectionWrapper, secondCallStack *reflect.CallStack)
	MonitorCachedObjectReleased      func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
	RegisterCachedObjectRetained     func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
	RegisterCachedObjectReleased     func(wrappedCachedObject LeakDetectionWrapper, options *LeakDetectionOptions)
}{
	WrapCachedObject:                 wrapCachedObject,
	ReportCachedObjectClosedTooOften: reportCachedObjectClosedTooOften,
	MonitorCachedObjectReleased:      monitorCachedObjectReleased,
	RegisterCachedObjectRetained:     registerCachedObjectRetained,
	RegisterCachedObjectReleased:     registerCachedObjectReleased,
}

Functions

This section is empty.

Types

type BatchedWriter

type BatchedWriter struct {
	// contains filtered or unexported fields
}

func NewBatchedWriter

func NewBatchedWriter(store kvstore.KVStore) *BatchedWriter

func (*BatchedWriter) StartBatchWriter

func (bw *BatchedWriter) StartBatchWriter()

func (*BatchedWriter) StopBatchWriter

func (bw *BatchedWriter) StopBatchWriter()
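
The constants above suggest a queue whose capacity equals the batch size, plus a timeout after which a partial batch is written. The listing does not document the BatchedWriter internals, so the following is only a minimal sketch of that general pattern using the standard library. The batcher type, its fields, and batchSizes are hypothetical names, and flushing on a ticker is an assumption rather than the package's confirmed behavior.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batcher is a hypothetical stand-in for a batched writer, not the package's code.
type batcher struct {
	queue     chan string
	batchSize int
	timeout   time.Duration
	flush     func(batch []string)
	done      sync.WaitGroup
}

func newBatcher(batchSize int, timeout time.Duration, flush func([]string)) *batcher {
	return &batcher{
		queue:     make(chan string, batchSize), // queue capacity equals the batch size
		batchSize: batchSize,
		timeout:   timeout,
		flush:     flush,
	}
}

// start launches the background loop that groups queued items into batches.
func (b *batcher) start() {
	b.done.Add(1)
	go func() {
		defer b.done.Done()
		ticker := time.NewTicker(b.timeout)
		defer ticker.Stop()
		var batch []string
		emit := func() {
			if len(batch) > 0 {
				b.flush(batch)
				batch = nil // the flushed slice now belongs to the callback
			}
		}
		for {
			select {
			case item, ok := <-b.queue:
				if !ok {
					emit() // queue closed: write the remainder and exit
					return
				}
				batch = append(batch, item)
				if len(batch) >= b.batchSize {
					emit()
				}
			case <-ticker.C:
				emit() // timeout: write a partial batch
			}
		}
	}()
}

// stop closes the queue and waits until the remaining items were flushed.
func (b *batcher) stop() {
	close(b.queue)
	b.done.Wait()
}

// batchSizes feeds n items through a batcher and reports the size of each flush.
func batchSizes(n, batchSize int) []int {
	var sizes []int
	b := newBatcher(batchSize, time.Hour, func(batch []string) {
		sizes = append(sizes, len(batch))
	})
	b.start()
	for i := 0; i < n; i++ {
		b.queue <- fmt.Sprintf("item-%d", i)
	}
	b.stop()
	return sizes
}

func main() {
	fmt.Println(batchSizes(5, 2)) // [2 2 1]
}
```

The one-hour timeout in batchSizes keeps the example deterministic: only the size limit and the final stop trigger a flush.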

type CachedObject

type CachedObject interface {
	Exists() bool
	Get() (result StorableObject)
	Consume(consumer func(StorableObject), forceRelease ...bool) bool
	Retain() CachedObject

	Release(force ...bool)
	Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject
	RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject
	// contains filtered or unexported methods
}

type CachedObjectImpl

type CachedObjectImpl struct {
	// contains filtered or unexported fields
}

func NewEmptyCachedObject

func NewEmptyCachedObject(key []byte) (result *CachedObjectImpl)

NewEmptyCachedObject creates an "empty" CachedObjectImpl that is not part of any ObjectStorage.

It is useful for offering a "filtered view" on the ObjectStorage, where load operations return an "empty" value even if the underlying object exists (e.g. a value tangle layered on top of the normal tangle returns only value transactions from its load operations).

func (*CachedObjectImpl) Consume

func (cachedObject *CachedObjectImpl) Consume(consumer func(StorableObject), forceRelease ...bool) bool

Consume passes the StorableObject directly to the consumer callback and automatically calls Release() once the callback is done. It returns true if the callback was called.

func (*CachedObjectImpl) Exists

func (cachedObject *CachedObjectImpl) Exists() bool

Exists returns true if the StorableObject in this container exists, i.e. it could be found in the database and was not marked as deleted.

func (*CachedObjectImpl) Get

func (cachedObject *CachedObjectImpl) Get() (result StorableObject)

Get retrieves the StorableObject that is cached in this container.

func (*CachedObjectImpl) RTransaction

func (cachedObject *CachedObjectImpl) RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

RTransaction is a synchronization primitive that executes the callback concurrently with other RTransactions but never concurrently with a normal Transaction.

The identifiers define the scope of the RTransaction. RTransactions with different scopes can run at the same time, independently of one another, and act as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock safe.

Note: It is the equivalent of a mutex.RLock/RUnlock.

func (*CachedObjectImpl) Release

func (cachedObject *CachedObjectImpl) Release(force ...bool)

Release releases the object so that it can be picked up by the persistence layer as soon as all consumers are done.

func (*CachedObjectImpl) Retain

func (cachedObject *CachedObjectImpl) Retain() CachedObject

Retain registers a new consumer for this cached object.
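
The Consume, Release and Retain descriptions above amount to a consumer counter on the cached object. The following is a minimal sketch of that contract under stated assumptions: handle, onIdle and lifecycle are hypothetical names, the counter starts at 1 for the caller that loaded the object, and releasing even when no object exists is assumed rather than confirmed by the listing.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// handle is a hypothetical reference-counted container, loosely modeled on the
// Retain/Release/Consume contract; it is not the package's CachedObjectImpl.
type handle struct {
	value     string
	exists    bool
	consumers int32
	onIdle    func() // called when the last consumer releases the handle
}

// Retain registers one more consumer.
func (h *handle) Retain() *handle {
	atomic.AddInt32(&h.consumers, 1)
	return h
}

// Release unregisters one consumer and signals when nobody is left.
func (h *handle) Release() {
	if atomic.AddInt32(&h.consumers, -1) == 0 && h.onIdle != nil {
		h.onIdle()
	}
}

// Consume runs the callback if a value exists and always releases afterwards.
// It reports whether the callback was called.
func (h *handle) Consume(consumer func(string)) bool {
	defer h.Release()
	if !h.exists {
		return false
	}
	consumer(h.value)
	return true
}

// lifecycle walks through one load, one extra Retain, one Consume and one Release.
func lifecycle(exists bool) (called bool, idleCalls int) {
	h := &handle{value: "payload", exists: exists, consumers: 1} // 1 = the loader's reference
	h.onIdle = func() { idleCalls++ }

	h.Retain()                          // a second consumer appears
	called = h.Consume(func(string) {}) // releases one reference when done
	h.Release()                         // last reference gone, onIdle fires
	return called, idleCalls
}

func main() {
	fmt.Println(lifecycle(true))  // true 1
	fmt.Println(lifecycle(false)) // false 1
}
```

Every Retain needs a matching Release (or Consume); otherwise the counter never reaches zero and the object stays pinned in the cache.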

func (*CachedObjectImpl) Transaction

func (cachedObject *CachedObjectImpl) Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

Transaction is a synchronization primitive that executes the callback atomically: if multiple Transactions are started from different goroutines, only one of them can run at a time.

The identifiers define the scope of the Transaction. Transactions with different scopes can run at the same time and act as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock safe.

Note: It is the equivalent of a mutex.Lock/Unlock.
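
One way to obtain the deadlock safety described for Transaction and RTransaction is to acquire all identifiers in a single all-or-nothing step. The following is a minimal sketch of that idea, not the package's implementation: scopeLocker, run, free, mark and contend are hypothetical names, identifiers must be comparable values, and writer starvation is not addressed.

```go
package main

import (
	"fmt"
	"sync"
)

// scopeLocker is a hypothetical illustration of identifier-scoped locking.
type scopeLocker struct {
	mu      sync.Mutex
	cond    *sync.Cond
	writers map[interface{}]int // identifiers held exclusively
	readers map[interface{}]int // identifiers held in shared mode
}

func newScopeLocker() *scopeLocker {
	l := &scopeLocker{writers: map[interface{}]int{}, readers: map[interface{}]int{}}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// Transaction runs callback while holding every identifier exclusively.
func (l *scopeLocker) Transaction(callback func(), ids ...interface{}) { l.run(callback, true, ids) }

// RTransaction runs callback while holding every identifier in shared mode.
func (l *scopeLocker) RTransaction(callback func(), ids ...interface{}) { l.run(callback, false, ids) }

func (l *scopeLocker) run(callback func(), exclusive bool, ids []interface{}) {
	l.mu.Lock()
	for !l.free(ids, exclusive) {
		l.cond.Wait() // wait until all identifiers are available at once
	}
	l.mark(ids, exclusive, +1)
	l.mu.Unlock()

	defer func() {
		l.mu.Lock()
		l.mark(ids, exclusive, -1)
		l.mu.Unlock()
		l.cond.Broadcast()
	}()
	callback()
}

// free reports whether all ids can be taken; exclusive access also requires no readers.
func (l *scopeLocker) free(ids []interface{}, exclusive bool) bool {
	for _, id := range ids {
		if l.writers[id] > 0 || (exclusive && l.readers[id] > 0) {
			return false
		}
	}
	return true
}

// mark adds delta to the holder count of every id and drops empty entries.
func (l *scopeLocker) mark(ids []interface{}, exclusive bool, delta int) {
	held := l.readers
	if exclusive {
		held = l.writers
	}
	for _, id := range ids {
		held[id] += delta
		if held[id] == 0 {
			delete(held, id)
		}
	}
}

// contend lets two goroutines lock "a" and "b" in opposite orders.
func contend(rounds int) int {
	l := newScopeLocker()
	counter := 0
	var wg sync.WaitGroup
	for _, ids := range [][]interface{}{{"a", "b"}, {"b", "a"}} {
		wg.Add(1)
		go func(ids []interface{}) {
			defer wg.Done()
			for i := 0; i < rounds; i++ {
				l.Transaction(func() { counter++ }, ids...)
			}
		}(ids)
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(contend(1000)) // 2000
}
```

Because a caller never holds one identifier while waiting for another, the opposite lock orders in contend cannot deadlock, which two plain mutexes locked in opposite orders could.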

type CachedObjects

type CachedObjects []CachedObject

func (CachedObjects) Release

func (cachedObjects CachedObjects) Release(force ...bool)

type ConsumerFunc

type ConsumerFunc = func(key []byte, cachedObject *CachedObjectImpl) bool

type Events

type Events struct {
	ObjectEvicted *events.Event
}

type Factory

type Factory struct {
	// contains filtered or unexported fields
}

Factory is a utility that offers an API for more compact creation of multiple ObjectStorage instances from within the same package. It automatically configures a new KVStore instance with the corresponding realm and provides it to the created ObjectStorage instances.

func NewFactory

func NewFactory(store kvstore.KVStore, packagePrefix byte) *Factory

NewFactory creates a new Factory with the given ObjectStorage parameters.

func (*Factory) New

func (factory *Factory) New(storagePrefix byte, objectFactory StorableObjectFactory, optionalOptions ...Option) *ObjectStorage

New creates a new ObjectStorage with the given parameters. It combines the storage-specific prefix with the package prefix to create a unique realm for the KVStore of the ObjectStorage.
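
To illustrate how two prefix bytes can yield a unique realm, the following sketch builds a key prefix from a package prefix and a storage prefix. The two-byte layout and the helper names realm and prefixedKey are assumptions made for this example; the listing does not state how the package composes the realm internally.

```go
package main

import "fmt"

// realm builds a key prefix from a package prefix and a storage prefix.
// The two-byte layout is an assumption made for this sketch.
func realm(packagePrefix, storagePrefix byte) []byte {
	return []byte{packagePrefix, storagePrefix}
}

// prefixedKey prepends the realm to an object key without modifying either slice.
func prefixedKey(realm, key []byte) []byte {
	out := make([]byte, 0, len(realm)+len(key))
	out = append(out, realm...)
	return append(out, key...)
}

func main() {
	const packagePrefix = 0x01 // hypothetical prefix shared by one package

	// Two storages of the same package get distinct realms.
	messages := realm(packagePrefix, 0x01)
	approvers := realm(packagePrefix, 0x02)

	fmt.Printf("%x\n", prefixedKey(messages, []byte("k")))  // 01016b
	fmt.Printf("%x\n", prefixedKey(approvers, []byte("k"))) // 01026b
}
```

The same object key therefore maps to different store keys in different storages, so the storages cannot overwrite each other's entries.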

type LeakDetectionOptions

type LeakDetectionOptions struct {
	MaxConsumersPerObject int
	MaxConsumerHoldTime   time.Duration
}

type LeakDetectionWrapper

type LeakDetectionWrapper interface {
	CachedObject

	Base() *CachedObjectImpl
	GetInternalId() int64
	SetRetainCallStack(callStack *reflect.CallStack)
	GetRetainCallStack() *reflect.CallStack
	GetRetainTime() time.Time
	SetReleaseCallStack(callStack *reflect.CallStack)
	GetReleaseCallStack() *reflect.CallStack
	WasReleased() bool
}

type LeakDetectionWrapperImpl

type LeakDetectionWrapperImpl struct {
	*CachedObjectImpl
	// contains filtered or unexported fields
}

func (*LeakDetectionWrapperImpl) Base

func (wrappedCachedObject *LeakDetectionWrapperImpl) Base() *CachedObjectImpl

func (*LeakDetectionWrapperImpl) Consume

func (wrappedCachedObject *LeakDetectionWrapperImpl) Consume(consumer func(StorableObject), forceRelease ...bool) bool

func (*LeakDetectionWrapperImpl) GetInternalId

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetInternalId() int64

func (*LeakDetectionWrapperImpl) GetReleaseCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetReleaseCallStack() *reflect.CallStack

func (*LeakDetectionWrapperImpl) GetRetainCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetRetainCallStack() *reflect.CallStack

func (*LeakDetectionWrapperImpl) GetRetainTime

func (wrappedCachedObject *LeakDetectionWrapperImpl) GetRetainTime() time.Time

func (*LeakDetectionWrapperImpl) RTransaction

func (wrappedCachedObject *LeakDetectionWrapperImpl) RTransaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

RTransaction is a synchronization primitive that executes the callback concurrently with other RTransactions but never concurrently with a normal Transaction.

The identifiers define the scope of the RTransaction. RTransactions with different scopes can run at the same time, independently of one another, and act as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock safe.

Note: It is the equivalent of a mutex.RLock/RUnlock.

func (*LeakDetectionWrapperImpl) Release

func (wrappedCachedObject *LeakDetectionWrapperImpl) Release(force ...bool)

func (*LeakDetectionWrapperImpl) Retain

func (wrappedCachedObject *LeakDetectionWrapperImpl) Retain() CachedObject

func (*LeakDetectionWrapperImpl) SetReleaseCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) SetReleaseCallStack(releaseCallStack *reflect.CallStack)

func (*LeakDetectionWrapperImpl) SetRetainCallStack

func (wrappedCachedObject *LeakDetectionWrapperImpl) SetRetainCallStack(retainCallStack *reflect.CallStack)

func (*LeakDetectionWrapperImpl) Transaction

func (wrappedCachedObject *LeakDetectionWrapperImpl) Transaction(callback func(object StorableObject), identifiers ...interface{}) CachedObject

Transaction is a synchronization primitive that executes the callback atomically: if multiple Transactions are started from different goroutines, only one of them can run at a time.

The identifiers define the scope of the Transaction. Transactions with different scopes can run at the same time and act as if they were secured by different mutexes.

It is also possible to provide multiple identifiers; the callback then waits until all of them can be acquired at the same time. In contrast to normal mutexes, where acquiring multiple locks can lead to deadlocks, this method is deadlock safe.

Note: It is the equivalent of a mutex.Lock/Unlock.

func (*LeakDetectionWrapperImpl) WasReleased

func (wrappedCachedObject *LeakDetectionWrapperImpl) WasReleased() bool

type ObjectStorage

type ObjectStorage struct {
	Events Events
	// contains filtered or unexported fields
}

ObjectStorage is a manual cache that keeps objects as long as consumers are using them.

func New

func New(store kvstore.KVStore, objectFactory StorableObjectFactory, optionalOptions ...Option) *ObjectStorage

New is the constructor for the ObjectStorage.

func (*ObjectStorage) ComputeIfAbsent

func (objectStorage *ObjectStorage) ComputeIfAbsent(key []byte, remappingFunction func(key []byte) StorableObject) CachedObject

func (*ObjectStorage) Contains

func (objectStorage *ObjectStorage) Contains(key []byte) (result bool)

func (*ObjectStorage) Delete

func (objectStorage *ObjectStorage) Delete(key []byte)

Delete performs a "blind delete", which does not check the object's existence. It is used to delete without accessing the value log.

func (*ObjectStorage) DeleteEntriesFromStore

func (objectStorage *ObjectStorage) DeleteEntriesFromStore(keys [][]byte)

DeleteEntriesFromStore deletes entries from the persistence layer.

func (*ObjectStorage) DeleteEntryFromStore

func (objectStorage *ObjectStorage) DeleteEntryFromStore(key []byte)

DeleteEntryFromStore deletes an entry from the persistence layer.

func (*ObjectStorage) DeleteIfPresent

func (objectStorage *ObjectStorage) DeleteIfPresent(key []byte) bool

DeleteIfPresent deletes an element and returns true if the element was deleted.

func (*ObjectStorage) Flush

func (objectStorage *ObjectStorage) Flush()

func (*ObjectStorage) ForEach

func (objectStorage *ObjectStorage) ForEach(consumer func(key []byte, cachedObject CachedObject) bool, optionalPrefix ...[]byte)

ForEach calls the consumer function on every object residing within the cache and the underlying persistence layer.

func (*ObjectStorage) ForEachKeyOnly

func (objectStorage *ObjectStorage) ForEachKeyOnly(consumer func(key []byte) bool, skipCache bool, optionalPrefix ...[]byte)

ForEachKeyOnly calls the consumer function on every storage key residing within the cache and the underlying persistence layer.
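
The iteration methods above take a consumer that returns a bool and an optional key prefix. The listing does not say what the return value means; the sketch below assumes that returning false aborts the iteration. It walks an in-memory map rather than a cache plus persistence layer, and forEachKey and keysWithPrefix are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// forEachKey calls consumer for every key that starts with the optional prefix.
// Assumption for this sketch: the iteration stops as soon as consumer returns false.
func forEachKey(store map[string]string, consumer func(key []byte) bool, optionalPrefix ...[]byte) {
	var prefix []byte
	if len(optionalPrefix) > 0 {
		prefix = optionalPrefix[0]
	}

	keys := make([]string, 0, len(store))
	for k := range store {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map order is random; sort to keep the example deterministic

	for _, k := range keys {
		if !bytes.HasPrefix([]byte(k), prefix) {
			continue
		}
		if !consumer([]byte(k)) {
			return
		}
	}
}

// keysWithPrefix collects at most limit keys that share the given prefix.
func keysWithPrefix(store map[string]string, prefix string, limit int) []string {
	var seen []string
	forEachKey(store, func(key []byte) bool {
		seen = append(seen, string(key))
		return len(seen) < limit // false aborts the iteration
	}, []byte(prefix))
	return seen
}

func main() {
	store := map[string]string{"a/1": "x", "a/2": "y", "a/3": "z", "b/1": "w"}
	fmt.Println(keysWithPrefix(store, "a/", 2)) // [a/1 a/2]
}
```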

func (*ObjectStorage) Get

func (objectStorage *ObjectStorage) Get(key []byte) CachedObject

func (*ObjectStorage) GetSize

func (objectStorage *ObjectStorage) GetSize() int

func (*ObjectStorage) Load

func (objectStorage *ObjectStorage) Load(key []byte) CachedObject

func (*ObjectStorage) LoadObjectFromStore

func (objectStorage *ObjectStorage) LoadObjectFromStore(key []byte) StorableObject

LoadObjectFromStore loads a storable object from the persistence layer.

func (*ObjectStorage) ObjectExistsInStore

func (objectStorage *ObjectStorage) ObjectExistsInStore(key []byte) bool

func (*ObjectStorage) Prune

func (objectStorage *ObjectStorage) Prune() error

func (*ObjectStorage) Put

func (objectStorage *ObjectStorage) Put(object StorableObject) CachedObject

func (*ObjectStorage) Shutdown

func (objectStorage *ObjectStorage) Shutdown()

func (*ObjectStorage) Store

func (objectStorage *ObjectStorage) Store(object StorableObject) CachedObject

func (*ObjectStorage) StoreIfAbsent

func (objectStorage *ObjectStorage) StoreIfAbsent(object StorableObject) (result CachedObject, stored bool)

StoreIfAbsent stores an object only if it was not stored before. In contrast to ComputeIfAbsent, this method does not access the value log. If the object was not stored, the returned CachedObject is nil and does not need to be released.
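
The return contract of StoreIfAbsent (a nil result together with stored == false when the key is already taken) can be illustrated with a mutex-protected map. This is only a sketch of the calling convention: the storage type here is a hypothetical in-memory stand-in with string values, not the package's ObjectStorage.

```go
package main

import (
	"fmt"
	"sync"
)

// storage is a hypothetical in-memory stand-in used to show the return contract.
type storage struct {
	mu      sync.Mutex
	objects map[string]string
}

func newStorage() *storage {
	return &storage{objects: make(map[string]string)}
}

// StoreIfAbsent stores value under key only if the key is unused.
// When nothing was stored, result is nil and stored is false.
func (s *storage) StoreIfAbsent(key, value string) (result *string, stored bool) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, exists := s.objects[key]; exists {
		return nil, false
	}
	s.objects[key] = value
	return &value, true
}

// current returns the value stored under key, or "" if there is none.
func (s *storage) current(key string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.objects[key]
}

func main() {
	s := newStorage()

	if result, stored := s.StoreIfAbsent("id", "first"); stored {
		fmt.Println("stored:", *result)
	}
	if result, stored := s.StoreIfAbsent("id", "second"); !stored {
		// Nothing to release here: result is nil.
		fmt.Println("already present, result is nil:", result == nil)
	}
	fmt.Println("value kept:", s.current("id"))
}
```

Callers should check stored before touching the result, because releasing or dereferencing a nil result would be a bug.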

type Option

type Option func(*Options)

func BatchedWriterInstance

func BatchedWriterInstance(batchedWriterInstance *BatchedWriter) Option

func CacheTime

func CacheTime(duration time.Duration) Option

func KeysOnly

func KeysOnly(keysOnly bool) Option

func LeakDetectionEnabled

func LeakDetectionEnabled(leakDetectionEnabled bool, options ...LeakDetectionOptions) Option

func LogAccess

func LogAccess(fileName string, commandsFilter ...kvstore.Command) Option

LogAccess sets up a logger that logs all calls to the underlying store in the given file. It is possible to filter the logged commands by providing an optional filter flag.

func OverrideLeakDetectionWrapper

func OverrideLeakDetectionWrapper(wrapperFunc func(cachedObject *CachedObjectImpl) LeakDetectionWrapper) Option

func PartitionKey

func PartitionKey(keyPartitions ...int) Option

func PersistenceEnabled

func PersistenceEnabled(persistenceEnabled bool) Option

func StoreOnCreation

func StoreOnCreation(store bool) Option

StoreOnCreation writes an object directly to the persistence layer on creation.
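
The Option type and the constructor functions listed above follow the functional options pattern: each function returns a closure that mutates an Options value. Since the real Options fields are unexported and not shown, the following sketch uses a hypothetical lowercase options struct with assumed fields and an assumed default.

```go
package main

import (
	"fmt"
	"time"
)

// options holds hypothetical settings; the real Options fields are not part of the listing.
type options struct {
	cacheTime          time.Duration
	persistenceEnabled bool
	keysOnly           bool
}

// option mutates an options value, mirroring the shape of `type Option func(*Options)`.
type option func(*options)

func cacheTime(duration time.Duration) option {
	return func(o *options) { o.cacheTime = duration }
}

func persistenceEnabled(enabled bool) option {
	return func(o *options) { o.persistenceEnabled = enabled }
}

func keysOnly(enabled bool) option {
	return func(o *options) { o.keysOnly = enabled }
}

// newOptions starts from defaults and applies the given options in order.
func newOptions(optionalOptions ...option) *options {
	o := &options{persistenceEnabled: true} // assumed default for this sketch
	for _, apply := range optionalOptions {
		apply(o)
	}
	return o
}

// example builds a configuration the way a constructor with variadic options would.
func example() *options {
	return newOptions(cacheTime(5*time.Second), persistenceEnabled(false))
}

func main() {
	o := example()
	fmt.Println(o.cacheTime, o.persistenceEnabled, o.keysOnly) // 5s false false
}
```

Because options are applied in order, a later option overrides an earlier one that sets the same field.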

type Options

type Options struct {
	// contains filtered or unexported fields
}

type PartitionsManager

type PartitionsManager struct {
	// contains filtered or unexported fields
}

func NewPartitionsManager

func NewPartitionsManager() *PartitionsManager

func (*PartitionsManager) IsEmpty

func (partitionsManager *PartitionsManager) IsEmpty() bool

func (*PartitionsManager) IsRetained

func (partitionsManager *PartitionsManager) IsRetained(keys []string) bool

func (*PartitionsManager) Release

func (partitionsManager *PartitionsManager) Release(keysToRelease []string) bool

func (*PartitionsManager) Retain

func (partitionsManager *PartitionsManager) Retain(keysToRetain []string)

type StorableObject

type StorableObject interface {
	// Marks the object as modified, which causes it to be written to the disk (if persistence is enabled).
	// Default value when omitting the parameter: true
	SetModified(modified ...bool)

	// Returns true if the object was marked as modified.
	IsModified() bool

	// Marks the object to be deleted from the persistence layer.
	// Default value when omitting the parameter: true
	Delete(delete ...bool)

	// Returns true if the object was marked as deleted.
	IsDeleted() bool

	// Enables or disables persistence for this object. Objects that have persistence disabled get discarded once they
	// are evicted from the cache.
	// Default value when omitting the parameter: true
	Persist(enabled ...bool)

	// Returns "true" if this object is going to be persisted.
	ShouldPersist() bool

	// Updates the object with the values of another object "in place" (so it should use a pointer receiver)
	Update(other StorableObject)

	// ObjectStorageKey returns the bytes, that are used as a key to store the object in the k/v store.
	ObjectStorageKey() []byte

	// ObjectStorageValue returns the bytes, that are stored in the value part of the k/v store.
	ObjectStorageValue() []byte
}

type StorableObjectFactory

type StorableObjectFactory func(key []byte, data []byte) (result StorableObject, err error)

StorableObjectFactory is the type of the factory method that generically creates StorableObjects. It receives the key and the serialized data of the object and returns an "empty" StorableObject that just has its key set. The ObjectStorage then fully unmarshals the object by calling UnmarshalObjectStorageValue with the data. The data is nevertheless passed to the factory as well, to allow the dynamic creation of different object types depending on the stored data.
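
The point about creating different object types depending on the stored data can be sketched with a factory that inspects a type tag. Everything here is hypothetical: the storable interface is a reduced stand-in for StorableObject, the valueTx and dataTx types are invented, and treating the first data byte as a type tag is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// storable is a reduced, hypothetical stand-in for the StorableObject interface.
type storable interface {
	Key() []byte
	Kind() string
}

type dataTx struct{ key []byte }

func (d *dataTx) Key() []byte  { return d.key }
func (d *dataTx) Kind() string { return "data" }

type valueTx struct{ key []byte }

func (v *valueTx) Key() []byte  { return v.key }
func (v *valueTx) Kind() string { return "value" }

// factory returns an "empty" object that only has its key set. It looks at the
// data solely to pick the concrete type; unmarshalling the rest happens elsewhere.
func factory(key []byte, data []byte) (result storable, err error) {
	if len(data) == 0 {
		return nil, errors.New("empty data: cannot determine object type")
	}
	switch data[0] { // assumed layout: the first byte is a type tag
	case 0:
		return &dataTx{key: key}, nil
	case 1:
		return &valueTx{key: key}, nil
	default:
		return nil, fmt.Errorf("unknown type tag %d", data[0])
	}
}

func main() {
	obj, err := factory([]byte("k"), []byte{1, 42})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(obj.Kind(), string(obj.Key())) // value k
}
```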

type StorableObjectFlags

type StorableObjectFlags struct {
	// contains filtered or unexported fields
}

func (*StorableObjectFlags) Delete

func (testObject *StorableObjectFlags) Delete(delete ...bool)

func (*StorableObjectFlags) IsDeleted

func (testObject *StorableObjectFlags) IsDeleted() bool

func (*StorableObjectFlags) IsModified

func (testObject *StorableObjectFlags) IsModified() bool

func (*StorableObjectFlags) Persist

func (testObject *StorableObjectFlags) Persist(persist ...bool)

func (*StorableObjectFlags) SetModified

func (testObject *StorableObjectFlags) SetModified(modified ...bool)

func (*StorableObjectFlags) ShouldPersist

func (testObject *StorableObjectFlags) ShouldPersist() bool

ShouldPersist returns "true" if this object is going to be persisted.
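
The flag methods of StorableObject take an optional bool whose documented default is true. The sketch below shows one way to implement that convention and reuse it by embedding. The flags and message types and the optional helper are hypothetical, and the mutex-based layout is an assumption, not the package's StorableObjectFlags.

```go
package main

import (
	"fmt"
	"sync"
)

// flags is a hypothetical helper that other types can embed to get flag handling.
type flags struct {
	mu       sync.RWMutex
	modified bool
	deleted  bool
	persist  bool
}

// optional resolves a variadic bool parameter: omitted means true.
func optional(values []bool) bool {
	return len(values) == 0 || values[0]
}

func (f *flags) SetModified(modified ...bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.modified = optional(modified)
}

func (f *flags) IsModified() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.modified
}

func (f *flags) Delete(deleted ...bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.deleted = optional(deleted)
}

func (f *flags) IsDeleted() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.deleted
}

func (f *flags) Persist(enabled ...bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.persist = optional(enabled)
}

func (f *flags) ShouldPersist() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.persist
}

// message is a hypothetical object type that gains the flag methods by embedding.
type message struct {
	flags
	id string
}

func main() {
	m := &message{id: "m1"}
	m.SetModified()  // same as SetModified(true)
	m.Persist(false) // explicitly disable persistence
	fmt.Println(m.IsModified(), m.IsDeleted(), m.ShouldPersist()) // true false false
}
```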
