operationsbus

package
v0.0.23 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Unknown     string = "Unknown"
	Pending            = "Pending"
	In_Progress        = "In_Progress"
	Completed          = "Completed"
	Failed             = "Failed"
	Cancelled          = "Cancelled"
)

All the status types that should be supported.

Variables

This section is empty.

Functions

func CreateProcessor

func CreateProcessor(
	sender sb.ServiceBusSender,
	serviceBusReceiver sb.ServiceBusReceiver,
	matcher *Matcher,
	operationController OperationController,
	customHandler shuttle.HandlerFunc,
	processorOptions *shuttle.ProcessorOptions,
	hooks []BaseOperationHooksInterface,
) (*shuttle.Processor, error)

The processor will be utilized to "process" all the operations by receiving the message, guarding against concurrency, running the operation, and updating the right database status.

func SetLogHandler added in v0.0.22

func SetLogHandler(handler slog.Handler)

Types

type ApiOperation added in v0.0.22

type ApiOperation interface {
	Init(context.Context, OperationRequest) (ApiOperation, error)
	GuardConcurrency(context.Context, Entity) (*CategorizedError, error)
	Run(context.Context) error
}

ApiOperation is the interface all operations will need to implement.

type BaseOperationHooksInterface added in v0.0.22

type BaseOperationHooksInterface interface {
	BeforeInit(ctx context.Context, req OperationRequest)
	AfterInit(ctx context.Context, op *ApiOperation, req OperationRequest, err error)

	BeforeGuardConcurrency(ctx context.Context, op *ApiOperation, entity Entity)
	AfterGuardConcurrency(ctx context.Context, op *ApiOperation, ce *CategorizedError, err error)

	BeforeRun(ctx context.Context, op *ApiOperation)
	AfterRun(ctx context.Context, op *ApiOperation, err error)
}

type CategorizedError

type CategorizedError struct {
	Message      string
	InnerMessage string
	ErrorCode    int
}

func NewCategorizedError

func NewCategorizedError(message string, innerMessage string, errorCode int) *CategorizedError

func (*CategorizedError) Error

func (ce *CategorizedError) Error() string

type Entity

type Entity interface {
	GetLatestOperationID() string
}

type HookedApiOperation added in v0.0.22

type HookedApiOperation struct {
	Operation      *ApiOperation
	OperationHooks []BaseOperationHooksInterface
}

func (*HookedApiOperation) AfterGuardConcurrency added in v0.0.22

func (h *HookedApiOperation) AfterGuardConcurrency(ctx context.Context, op *ApiOperation, ce *CategorizedError, err error)

func (*HookedApiOperation) AfterInit added in v0.0.22

func (h *HookedApiOperation) AfterInit(ctx context.Context, op *ApiOperation, req OperationRequest, err error)

func (*HookedApiOperation) AfterRun added in v0.0.22

func (h *HookedApiOperation) AfterRun(ctx context.Context, op *ApiOperation, err error)

func (*HookedApiOperation) BeforeGuardConcurrency added in v0.0.22

func (h *HookedApiOperation) BeforeGuardConcurrency(ctx context.Context, op *ApiOperation, entity Entity)

func (*HookedApiOperation) BeforeInit added in v0.0.22

func (h *HookedApiOperation) BeforeInit(ctx context.Context, req OperationRequest)

func (*HookedApiOperation) BeforeRun added in v0.0.22

func (h *HookedApiOperation) BeforeRun(ctx context.Context, op *ApiOperation)

func (*HookedApiOperation) GuardConcurrency added in v0.0.22

func (h *HookedApiOperation) GuardConcurrency(ctx context.Context, entity Entity) (*CategorizedError, error)

func (*HookedApiOperation) Init added in v0.0.22

func (*HookedApiOperation) Run added in v0.0.22

type Logger added in v0.0.22

type Logger interface {
	Info(s string)
	Warn(s string)
	Error(s string)
}

type Matcher

type Matcher struct {
	Types map[string]reflect.Type
}

The matcher is utilized in order to keep track of the name and type of each operation. This is required because we only send the OperationRequest through the service bus, but we utilize the name shown in that struct in order to create an instance of the right operation type (e.g. LongRunning) and Run with the correct logic.

func NewMatcher

func NewMatcher() *Matcher

func (*Matcher) CreateHookedInstace added in v0.0.22

func (m *Matcher) CreateHookedInstace(key string, hooks []BaseOperationHooksInterface) (*HookedApiOperation, error)

func (*Matcher) CreateInstance

func (m *Matcher) CreateInstance(key string) (ApiOperation, error)

This will create an empty instance of the type, with which you can then call op.Init() and initialize any info you need.

func (*Matcher) Get

func (m *Matcher) Get(key string) (reflect.Type, bool)

TODO(mheberling): do we need to delete this? Get retrieves a value from the map by its key

func (*Matcher) Register

func (m *Matcher) Register(key string, value ApiOperation)

Set adds a key-value pair to the map Ex: matcher.Register("LongRunning", &LongRunning{})

type OperationController added in v0.0.8

type OperationController interface {
	OperationCancel(context.Context, string) error
	OperationInProgress(context.Context, string) error
	OperationTimeout(context.Context, string) error
	OperationCompleted(context.Context, string) error
	OperationPending(context.Context, string) error
	OperationUnknown(context.Context, string) error
	OperationFailed(context.Context, string) error
	OperationGetEntity(context.Context, OperationRequest) (Entity, error)
}

Operationcontroller is the interface that handles updating the database with the correct operation state.

type OperationRequest

type OperationRequest struct {
	OperationName  string
	APIVersion     string
	RetryCount     int
	OperationId    string
	EntityId       string
	EntityType     string
	ExpirationDate *timestamppb.Timestamp

	// HTTP
	Body       []byte
	HttpMethod string

	Extension interface{}
}

All the fields that the operations might need. This struct will be part of every operation.

func NewOperationRequest added in v0.0.21

func NewOperationRequest(
	operationName string,
	apiVersion string,
	operationId string,
	entityId string,
	entityType string,
	retryCount int,
	expirationDate *timestamppb.Timestamp,
	body []byte,
	httpMethod string,
	extension interface{},
) *OperationRequest

func (*OperationRequest) Retry added in v0.0.8

func (opRequest *OperationRequest) Retry(ctx context.Context, sender sb.ServiceBusSender) error

Generalized method to retry every operation. If the operation failed or hit an error at any stage, this method will be called after the panic is handled.

func (*OperationRequest) SetExtension added in v0.0.21

func (opRequest *OperationRequest) SetExtension(newValue interface{}) error

SetExtension sets the Extension field to a new type and value, copying data if possible

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL