util

package
v0.0.0-...-6b56f1d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2021 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UnprocessedQueue BucketType = "Unprocessed"
	InflightQueue               = "Inflight"
	InternalBucket              = "Internal"
)

Variables

View Source
var EmptySpanContext = trace.SpanContext{}

Functions

func DigestTypeToPb

func DigestTypeToPb(digestType DigestType) api.DigestType

func ExtractDistributedLockIndex

func ExtractDistributedLockIndex(ctx context.Context) int

func ExtractParentSpanContext

func ExtractParentSpanContext(ctx context.Context) trace.SpanContext

func ExtractPubSubContext

func ExtractPubSubContext(payload []byte) context.Context

func ExtractSpanContext

func ExtractSpanContext(ctx context.Context) trace.SpanContext

func Flatten

func Flatten(in map[string]interface{}) map[string]interface{}

func GetInteger

func GetInteger(x interface{}) (int64, error)

func GetMap

func GetMap(in map[string]interface{}, path []string) (interface{}, error)

func GetNumeric

func GetNumeric(x interface{}) (float64, error)

func InitHash

func InitHash(digestType DigestType) hash.Hash

InitHash constructs a hash object using a supported Digest type. If the Digest type is not supported, it returns nil.

func InjectDistributedLockIndex

func InjectDistributedLockIndex(ctx context.Context, index int) context.Context

func InjectParentSpanContext

func InjectParentSpanContext(currCtx context.Context, parentSpanCtx trace.SpanContext) context.Context

func InjectPubSubContext

func InjectPubSubContext(ctx context.Context, payload []byte) context.Context

func InjectSpanContext

func InjectSpanContext(ctx context.Context, spanCtx trace.SpanContext) context.Context

func IsWarning

func IsWarning(err error) bool

func JsonToMap

func JsonToMap(in []byte) (map[string]interface{}, error)

func MapInterfaceToInt

func MapInterfaceToInt(in map[string]interface{}) (map[string]int, error)

func MapToJson

func MapToJson(in map[string]interface{}) ([]byte, error)

func MergeMaps

func MergeMaps(lhs, rhs map[string]interface{}) (map[string]interface{}, error)

func NumericEqual

func NumericEqual(lhs, rhs interface{}) bool

func NumericResolver

func NumericResolver(x, y interface{}) (float64, float64, error)

func StringWithTrace

func StringWithTrace(err error) string

StringWithTrace returns the error string together with its stack trace

func UpdateMap

func UpdateMap(in map[string]interface{}, path []string, value interface{}) error

func WaitAll

func WaitAll(futures []Future, timeout time.Duration)

ToDo(KMG): Re-visit this function. I could not think of a way to use WaitGroups with a timeout without leaking a goroutine when the Wait() call hangs forever. The best I could think of was to track the number of waiters and decrement the count when we time out.

This can probably be done with atomic incr/decr and channels.

func WithStack

func WithStack(err error) error

WithStack will decorate an error with a stack trace

func Wrap

func Wrap(err error, msg string) error

Wrap will decorate an error with a stack trace and message

Types

type AlreadyCompletedError

type AlreadyCompletedError struct {
	// contains filtered or unexported fields
}

AlreadyCompletedError

func NewAlreadyCompletedError

func NewAlreadyCompletedError(msg string) *AlreadyCompletedError

NewAlreadyCompletedError will return an AlreadyCompletedError object

func (*AlreadyCompletedError) Error

func (err *AlreadyCompletedError) Error() string

Error returns the error message

type AlreadyOpenedError

type AlreadyOpenedError struct {
	// contains filtered or unexported fields
}

AlreadyOpenedError represents an error condition when a persistent resource is already opened by another process

func NewAlreadyOpenedError

func NewAlreadyOpenedError(msg string) *AlreadyOpenedError

NewAlreadyOpenedError is the constructor for AlreadyOpenedError

func (*AlreadyOpenedError) Error

func (e *AlreadyOpenedError) Error() string

Error returns the string representation of the AlreadyOpenedError

type BucketType

type BucketType string

type CancelledError

type CancelledError struct {
}

CancelledError

func NewCancelledError

func NewCancelledError() *CancelledError

NewCancelledError will return a CancelledError object

func (*CancelledError) Error

func (err *CancelledError) Error() string

Error returns the error message

func (*CancelledError) Is

func (err *CancelledError) Is(other error) bool

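Is reports whether other is considered a match for this CancelledError; errors.Is consults methods with this signature when comparing errors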

func (*CancelledError) IsWarning

func (err *CancelledError) IsWarning() bool

IsWarning reports whether the CancelledError should be treated as a warning

type ClosedError

type ClosedError struct {
	// contains filtered or unexported fields
}

ClosedError represents an error condition when a persistent resource is closed

func NewClosedError

func NewClosedError(msg string) *ClosedError

NewClosedError is the constructor for ClosedError

func (*ClosedError) Error

func (e *ClosedError) Error() string

Error returns the string representation of the ClosedError

type Completable

type Completable interface {
	Success(ctx context.Context, result interface{}) error
	Fail(ctx context.Context, err error) error
	Cancel(ctx context.Context) error
	Future() Future
	Close()
}

func NewCompletable

func NewCompletable() Completable

type ConflictError

type ConflictError struct {
	// contains filtered or unexported fields
}

ConflictError represents a conflict in a Data, kv or object store

func NewConflictError

func NewConflictError(msg string) *ConflictError

NewConflictError will return a ConflictError object

func (*ConflictError) Error

func (err *ConflictError) Error() string

Error returns the error message

func (*ConflictError) Is

func (err *ConflictError) Is(other error) bool

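Is reports whether other is considered a match for this ConflictError; errors.Is consults methods with this signature when comparing errors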

type ContextKey

type ContextKey string
const (
	ParentContext      ContextKey = "agglo.io/parentContext"
	SpanContext        ContextKey = "agglo.io/spanContext"
	DistributedLockKey            = "agglo.io/distributedLockKey"
)

type ContinuationNotSatisfied

type ContinuationNotSatisfied struct {
	// contains filtered or unexported fields
}

ContinuationNotSatisfied represents a condition when a continuation is false

func NewContinuationNotSatisfied

func NewContinuationNotSatisfied(msg string) *ContinuationNotSatisfied

NewContinuationNotSatisfied is the constructor for ContinuationNotSatisfied

func (*ContinuationNotSatisfied) Error

func (e *ContinuationNotSatisfied) Error() string

Error returns the string representation of the ContinuationNotSatisfied

func (*ContinuationNotSatisfied) Is

func (err *ContinuationNotSatisfied) Is(other error) bool

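Is reports whether other is considered a match for this ContinuationNotSatisfied; errors.Is consults methods with this signature when comparing errors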

func (*ContinuationNotSatisfied) IsWarning

func (err *ContinuationNotSatisfied) IsWarning() bool

IsWarning reports whether the ContinuationNotSatisfied should be treated as a warning

type CopyableMap

type CopyableMap map[string]interface{}

func (CopyableMap) DeepCompare

func (m CopyableMap) DeepCompare(in map[string]interface{}) bool

func (CopyableMap) DeepCopy

func (m CopyableMap) DeepCopy() map[string]interface{}

type CopyableSlice

type CopyableSlice []interface{}

func (CopyableSlice) DeepCompare

func (m CopyableSlice) DeepCompare(in []interface{}) bool

func (CopyableSlice) DeepCopy

func (m CopyableSlice) DeepCopy() []interface{}

type DigestType

type DigestType int

DigestType is used to distinguish between different Digest algorithms

const (
	SHA1 DigestType = iota
	SHA256
	MD5
)

func DigestTypeFromPb

func DigestTypeFromPb(digestType api.DigestType) DigestType

type DurableQueue

type DurableQueue struct {
	// contains filtered or unexported fields
}

func OpenDurableQueue

func OpenDurableQueue(dbFile string, recoverFunc func(in []byte) error, force bool) (*DurableQueue, error)

func (*DurableQueue) Ack

func (q *DurableQueue) Ack(queueItem *QueueItem) error

func (*DurableQueue) Close

func (q *DurableQueue) Close() error

func (*DurableQueue) Dequeue

func (q *DurableQueue) Dequeue() (*QueueItem, error)

func (*DurableQueue) Drop

func (q *DurableQueue) Drop() error

func (*DurableQueue) Enqueue

func (q *DurableQueue) Enqueue(v []byte) error

func (*DurableQueue) GetInflight

func (q *DurableQueue) GetInflight() ([]*QueueItem, error)

func (*DurableQueue) GetUnprocessed

func (q *DurableQueue) GetUnprocessed() ([]*QueueItem, error)

func (*DurableQueue) Length

func (q *DurableQueue) Length() int64

func (*DurableQueue) NumInflight

func (q *DurableQueue) NumInflight() int64

type EmptyQueue

type EmptyQueue struct {
	// contains filtered or unexported fields
}

EmptyQueue represents a condition when a queue is empty

func NewEmptyQueue

func NewEmptyQueue(msg string) *EmptyQueue

NewEmptyQueue is the constructor for EmptyQueue

func (*EmptyQueue) Error

func (e *EmptyQueue) Error() string

Error returns the string representation of the EmptyQueue

func (*EmptyQueue) Is

func (err *EmptyQueue) Is(other error) bool

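Is reports whether other is considered a match for this EmptyQueue; errors.Is consults methods with this signature when comparing errors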

type EndOfStreamError

type EndOfStreamError struct {
	// contains filtered or unexported fields
}

EndOfStreamError represents an end-of-stream error in a Data, kv or object store

func NewEndOfStreamError

func NewEndOfStreamError(msg string) *EndOfStreamError

NewEndOfStreamError will return an EndOfStreamError object

func (*EndOfStreamError) Error

func (err *EndOfStreamError) Error() string

Error returns the error message

type EvictedError

type EvictedError struct {
	// contains filtered or unexported fields
}

EvictedError represents an error condition where an object is unexpectedly not found

func NewEvictedError

func NewEvictedError(msg string) *EvictedError

NewEvictedError is the constructor for EvictedError

func (*EvictedError) Error

func (e *EvictedError) Error() string

Error returns the string representation of the EvictedError

type ExecOption

type ExecOption func(runnable *ExecRunnable)

func WithCmdArgs

func WithCmdArgs(cmdArgs ...string) ExecOption

func WithContext

func WithContext(ctx context.Context) ExecOption

func WithInData

func WithInData(inData interface{}) ExecOption

func WithPath

func WithPath(path string) ExecOption

type ExecRunnable

type ExecRunnable struct {
	// contains filtered or unexported fields
}

ExecRunnable will run a command that accepts a JSON-encoded map on stdin and returns a JSON-encoded map on stdout

func NewExecRunnable

func NewExecRunnable(options ...ExecOption) *ExecRunnable

func (*ExecRunnable) Run

func (runnable *ExecRunnable) Run(ctx context.Context) (interface{}, error)

func (*ExecRunnable) SetInData

func (runnable *ExecRunnable) SetInData(inData interface{}) error

type FlushDidNotCompleteError

type FlushDidNotCompleteError struct {
	// contains filtered or unexported fields
}

FlushDidNotCompleteError

func NewFlushDidNotCompleteError

func NewFlushDidNotCompleteError(name string) *FlushDidNotCompleteError

NewFlushDidNotCompleteError is the constructor for FlushDidNotCompleteError

func (*FlushDidNotCompleteError) AddError

func (e *FlushDidNotCompleteError) AddError(other error)

func (*FlushDidNotCompleteError) Error

func (e *FlushDidNotCompleteError) Error() string

Error returns the string representation of the FlushDidNotCompleteError

func (*FlushDidNotCompleteError) Is

func (e *FlushDidNotCompleteError) Is(other error) bool

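Is reports whether other is considered a match for this FlushDidNotCompleteError; errors.Is consults methods with this signature when comparing errors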

func (*FlushDidNotCompleteError) IsWarning

func (e *FlushDidNotCompleteError) IsWarning() bool

IsWarning reports whether the FlushDidNotCompleteError should be treated as a warning

type Future

type Future interface {
	Get() *FutureResult
	GetWithTimeout(duration time.Duration) *FutureResult
	Then(runnable PartialRunnable, options ...FutureOption) Future
	Cancel(ctx context.Context) error
	IsCancelled() bool
	IsCompleted() bool
	IsSucceeded() bool
	CallbacksCompleted() bool
	OnSuccess(func(context.Context, interface{})) Future
	OnCancel(func(context.Context)) Future
	OnFail(func(context.Context, error)) Future
}

func CreateFuture

func CreateFuture(runnable Runnable, options ...FutureOption) Future

type FutureOption

type FutureOption func(opts *FutureOptions)

func SetContext

func SetContext(ctx context.Context) FutureOption

func WithDelay

func WithDelay(delay time.Duration) FutureOption

func WithPrepare

func WithPrepare(prepareFn func(ctx, prev context.Context) context.Context) FutureOption

func WithRetry

func WithRetry(num int, initialDelay time.Duration) FutureOption

type FutureOptions

type FutureOptions struct {
	// contains filtered or unexported fields
}

type FutureResult

type FutureResult struct {
	// contains filtered or unexported fields
}

func (FutureResult) Context

func (result FutureResult) Context() context.Context

func (FutureResult) Error

func (result FutureResult) Error() error

func (FutureResult) Value

func (result FutureResult) Value() interface{}

type InconsistentStateError

type InconsistentStateError struct {
	// contains filtered or unexported fields
}

InconsistentStateError represents an error condition where state is unexpectedly inconsistent

func NewInconsistentStateError

func NewInconsistentStateError(msg string) *InconsistentStateError

NewInconsistentStateError is the constructor for InconsistentStateError

func (*InconsistentStateError) Error

func (e *InconsistentStateError) Error() string

Error returns the string representation of the InconsistentStateError

type IndexError

type IndexError struct {
	// contains filtered or unexported fields
}

IndexError represents an error condition involving an index

func NewIndexError

func NewIndexError(msg string) *IndexError

NewIndexError is the constructor for IndexError

func (*IndexError) Error

func (e *IndexError) Error() string

Error returns the string representation of the IndexError

func (*IndexError) Is

func (err *IndexError) Is(other error) bool

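Is reports whether other is considered a match for this IndexError; errors.Is consults methods with this signature when comparing errors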

type InternalError

type InternalError struct {
	// contains filtered or unexported fields
}

InternalError represents an internal error in a Data, kv or object store

func NewInternalError

func NewInternalError(msg string) *InternalError

NewInternalError will return an InternalError object

func (*InternalError) Error

func (err *InternalError) Error() string

Error returns the error message

type InvalidError

type InvalidError struct {
	// contains filtered or unexported fields
}

InvalidError represents a 4xx error in a Data, kv or object store

func NewInvalidError

func NewInvalidError(msg string) *InvalidError

NewInvalidError will return an InvalidError object

func (*InvalidError) Error

func (err *InvalidError) Error() string

Error returns the error message

func (*InvalidError) Is

func (err *InvalidError) Is(other error) bool

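Is reports whether other is considered a match for this InvalidError; errors.Is consults methods with this signature when comparing errors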

type NotFoundError

type NotFoundError struct {
	// contains filtered or unexported fields
}

NotFoundError represents a 404 error in a Data, kv or object store

func NewNotFoundError

func NewNotFoundError(msg string) *NotFoundError

NewNotFoundError will return a NotFoundError object

func (*NotFoundError) Error

func (err *NotFoundError) Error() string

Error returns the error message

func (*NotFoundError) Is

func (err *NotFoundError) Is(other error) bool

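Is reports whether other is considered a match for this NotFoundError; errors.Is consults methods with this signature when comparing errors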

type OutOfBoundsError

type OutOfBoundsError struct {
	// contains filtered or unexported fields
}

OutOfBoundsError represents an out-of-bounds error in a Data, kv or object store

func NewOutOfBoundsError

func NewOutOfBoundsError(msg string) *OutOfBoundsError

NewOutOfBoundsError will return an OutOfBoundsError object

func (*OutOfBoundsError) Error

func (err *OutOfBoundsError) Error() string

Error returns the error message

type PartialRunnable

type PartialRunnable interface {
	Runnable
	SetInData(inData interface{}) error
}

type PipelineError

type PipelineError struct {
	// contains filtered or unexported fields
}

PipelineError

func NewPipelineError

func NewPipelineError(name string) *PipelineError

NewPipelineError is the constructor for PipelineError

func (*PipelineError) AddError

func (e *PipelineError) AddError(other error)

func (*PipelineError) Error

func (e *PipelineError) Error() string

Error returns the string representation of the PipelineError

func (*PipelineError) Is

func (e *PipelineError) Is(other error) bool

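Is reports whether other is considered a match for this PipelineError; errors.Is consults methods with this signature when comparing errors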

func (*PipelineError) IsWarning

func (e *PipelineError) IsWarning() bool

IsWarning reports whether the PipelineError should be treated as a warning

type QueueItem

type QueueItem struct {
	Data         []byte
	QueueTime    int64
	InflightTime int64
	Idx          int64
}

func NewQueueItem

func NewQueueItem(data []byte, idx int64) *QueueItem

type QueueState

type QueueState int
const (
	QueueUnknown QueueState = iota
	QueueOpening
	QueueRecovering
	QueueOpened
	QueueClosed
)

type Runnable

type Runnable interface {
	Run(ctx context.Context) (interface{}, error)
}

type SignatureError

type SignatureError struct {
	// contains filtered or unexported fields
}

SignatureError represents a signature error in a Data, kv or object store

func NewSignatureError

func NewSignatureError(msg string) *SignatureError

NewSignatureError will return a SignatureError object

func (*SignatureError) Error

func (err *SignatureError) Error() string

Error returns the error message

type TimedOutError

type TimedOutError struct {
	// contains filtered or unexported fields
}

TimedOutError

func NewTimedOutError

func NewTimedOutError(msg string) *TimedOutError

NewTimedOutError will return a TimedOutError object

func (*TimedOutError) Error

func (err *TimedOutError) Error() string

Error returns the error message

func (*TimedOutError) Is

func (err *TimedOutError) Is(other error) bool

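Is reports whether other is considered a match for this TimedOutError; errors.Is consults methods with this signature when comparing errors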

type Warning

type Warning interface {
	IsWarning() bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL