scheduler

package
v0.0.0-...-d841f61 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2021 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type QueueLock

type QueueLock interface {
	// LockOrPush tries to lock lockID and pushes item to queue by lockID if fails.
	// Return true if locks or false if error or item is added to queue.
	LockOrPush(ctx context.Context, lockID string, item []byte) (bool, error)
	// LockMultipleOrPush tries to lock all lockIDList
	// and pushes item to queue by lockID if fails.
	LockMultipleOrPush(ctx context.Context, lockIDList []string, lockID string, item []byte) (bool, error)
	// PopOrUnlock tries to extend lock lockID and pops item from queue by lockID.
	// It unlocks lockID if either fails.
	PopOrUnlock(ctx context.Context, lockID string) ([]byte, error)
	// LockAndPop tries to lock lockID and pops item from queue by lockID.
	LockAndPop(ctx context.Context, lockID string) ([]byte, error)
	// IsLocked returns true if lock lockID is set.
	IsLocked(ctx context.Context, lockID string) bool
	// IsEmpty returns true if queue lockID is empty.
	IsEmpty(ctx context.Context, lockID string) bool
}

QueueLock interface is used to implement a lock to consistently process items for the same resource. Base implementation uses redis to set lock and store next items while current item is processing. Im-memory mutex is used to synchronize access to redis so it cannot be used in multi-instance app.

func NewQueueLock

func NewQueueLock(
	lockClient redismod.Cmdable,
	lockExpirationTime time.Duration,
	queueClient redismod.Cmdable,
	asyncUnlock bool,
	logger zerolog.Logger,
) QueueLock

NewQueueLock creates lock.

type Scheduler

type Scheduler interface {
	Start(ctx context.Context)
	Stop(ctx context.Context)
	ProcessEvent(context.Context, libamqp.Channel, string, []byte) error
	AckEvent(context.Context, libamqp.Channel, types.Event) error
}

Scheduler ...

func NewSchedulerService

func NewSchedulerService(
	redisLockStorage *redismod.Client,
	redisQueueStorage *redismod.Client,
	channelPub libamqp.Channel,
	publishToQueue string,
	logger zerolog.Logger,
	lockTtl int,
	jsonDecoder encoding.Decoder,
	enableMetaAlarmProcessing bool,
) Scheduler

NewSchedulerService ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL