saga

Version: v0.0.1-alpha
Published: Feb 28, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package saga provides a generic implementation of the Saga pattern. Scoot uses the Saga pattern to track the state of long-lived Jobs for reporting and failure recovery. For info on the Saga pattern, see: https://speakerdeck.com/caitiem20/applying-the-saga-pattern

The package also contains MockSagaLog, a GoMock-generated mock of the SagaLog interface.

Constants

const (
	TaskStarted flag = 1 << iota
	TaskCompleted
	CompTaskStarted
	CompTaskCompleted
)
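
Because each constant is declared with `1 << iota`, every one occupies its own bit, so several can be combined in a single value. The following is a minimal sketch of that idea. The underlying type of `flag` (it is unexported here) and the helpers `set` and `has` are assumptions made for illustration, not part of this package's API.

```go
package main

// flag is a stand-in for the package's unexported flag type.
// Assumption: the underlying type is byte.
type flag byte

const (
	TaskStarted       flag = 1 << iota // 1
	TaskCompleted                      // 2
	CompTaskStarted                    // 4
	CompTaskCompleted                  // 8
)

// set returns f with the bits of g turned on (hypothetical helper).
func set(f, g flag) flag { return f | g }

// has reports whether every bit of g is set in f (hypothetical helper).
func has(f, g flag) bool { return f&g == g }
```

With these helpers, `has(set(TaskStarted, TaskCompleted), TaskCompleted)` is true while `has(TaskStarted, CompTaskStarted)` is false.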

Variables

This section is empty.

Functions

func FatalErr

func FatalErr(err error) bool

FatalErr checks the error returned by updating saga state. It returns true if the error is fatal, and false if the error is transient and a retry might succeed.
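
A caller would typically use this kind of check to decide whether to retry a state update. The sketch below shows one way such a loop could look. `retryUpdate`, `errTransient`, `errFatal`, and `isFatal` are hypothetical names; `isFatal` stands in for `FatalErr` so that the example compiles without importing this package.

```go
package main

import (
	"errors"
	"time"
)

// Hypothetical sentinel errors standing in for the package's error types.
var (
	errTransient = errors.New("transient log error")
	errFatal     = errors.New("fatal saga error")
)

// isFatal plays the role of saga.FatalErr in this sketch.
func isFatal(err error) bool { return errors.Is(err, errFatal) }

// retryUpdate calls update until it succeeds, fatal reports a fatal error,
// or attempts run out. It returns the last error seen, or nil on success.
// attempts should be at least 1; otherwise update is never called.
func retryUpdate(update func() error, fatal func(error) bool, attempts int, backoff time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = update(); err == nil {
			return nil
		}
		if fatal(err) {
			return err // retrying cannot help
		}
		time.Sleep(backoff)
	}
	return err
}
```

In real use the `update` closure would wrap a call such as `s.StartTask(taskId, data)` and `fatal` would be `saga.FatalErr`.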

func GenId

func GenId() gopter.Gen

Generator for a valid SagaId or TaskId

func GenSagaState

func GenSagaState(includeJob bool) gopter.Gen

Generator for a valid SagaState, with a job definition if includeJob is true

func GenSagaStateAndTaskId

func GenSagaStateAndTaskId() gopter.Gen

Generator for a SagaState and TaskId. Returns a StateTaskPair. The SagaState is always valid; the TaskId may or may not be part of the saga.

func NewCorruptedSagaLogError

func NewCorruptedSagaLogError(sagaId string, msg string) error

func NewInternalLogError

func NewInternalLogError(msg string) error

func NewInvalidRequestError

func NewInvalidRequestError(msg string) error

func NewInvalidSagaMessageError

func NewInvalidSagaMessageError(msg string) error

func NewInvalidSagaStateError

func NewInvalidSagaStateError(msg string, args ...interface{}) error

Types

type CorruptedSagaLogError

type CorruptedSagaLogError struct {
	// contains filtered or unexported fields
}

CorruptedSagaLogError is a critical error. It specifies that the data stored in the SagaLog for a specified saga is corrupted and unrecoverable.

func (CorruptedSagaLogError) Error

func (e CorruptedSagaLogError) Error() string

type InternalLogError

type InternalLogError struct {
	// contains filtered or unexported fields
}

InternalLogError should be returned by the SagaLog when the request failed but may succeed on retry. This is equivalent to an HTTP 500.

func (InternalLogError) Error

func (e InternalLogError) Error() string

type InvalidRequestError

type InvalidRequestError struct {
	// contains filtered or unexported fields
}

InvalidRequestError should be returned by the SagaLog when the request is invalid and the same request will fail on restart. This is equivalent to an HTTP 400.

func (InvalidRequestError) Error

func (e InvalidRequestError) Error() string

type InvalidSagaMessageError

type InvalidSagaMessageError struct {
	// contains filtered or unexported fields
}

func (InvalidSagaMessageError) Error

func (e InvalidSagaMessageError) Error() string

type InvalidSagaStateError

type InvalidSagaStateError struct {
	// contains filtered or unexported fields
}

func (InvalidSagaStateError) Error

func (e InvalidSagaStateError) Error() string

type MockSagaLog

type MockSagaLog struct {
	// contains filtered or unexported fields
}

MockSagaLog is a mock of SagaLog interface

func NewMockSagaLog

func NewMockSagaLog(ctrl *gomock.Controller) *MockSagaLog

NewMockSagaLog creates a new mock instance

func (*MockSagaLog) EXPECT

func (m *MockSagaLog) EXPECT() *MockSagaLogMockRecorder

EXPECT returns an object that allows the caller to indicate expected use

func (*MockSagaLog) GetActiveSagas

func (m *MockSagaLog) GetActiveSagas() ([]string, error)

GetActiveSagas mocks base method

func (*MockSagaLog) GetMessages

func (m *MockSagaLog) GetMessages(sagaId string) ([]SagaMessage, error)

GetMessages mocks base method

func (*MockSagaLog) LogBatchMessages

func (m *MockSagaLog) LogBatchMessages(messages []SagaMessage) error

LogBatchMessages mocks base method

func (*MockSagaLog) LogMessage

func (m *MockSagaLog) LogMessage(message SagaMessage) error

LogMessage mocks base method

func (*MockSagaLog) StartSaga

func (m *MockSagaLog) StartSaga(sagaId string, job []byte) error

StartSaga mocks base method

type MockSagaLogMockRecorder

type MockSagaLogMockRecorder struct {
	// contains filtered or unexported fields
}

MockSagaLogMockRecorder is the mock recorder for MockSagaLog

func (*MockSagaLogMockRecorder) GetActiveSagas

func (mr *MockSagaLogMockRecorder) GetActiveSagas() *gomock.Call

GetActiveSagas indicates an expected call of GetActiveSagas

func (*MockSagaLogMockRecorder) GetMessages

func (mr *MockSagaLogMockRecorder) GetMessages(sagaId interface{}) *gomock.Call

GetMessages indicates an expected call of GetMessages

func (*MockSagaLogMockRecorder) LogBatchMessages

func (mr *MockSagaLogMockRecorder) LogBatchMessages(messages interface{}) *gomock.Call

LogBatchMessages indicates an expected call of LogBatchMessages

func (*MockSagaLogMockRecorder) LogMessage

func (mr *MockSagaLogMockRecorder) LogMessage(message interface{}) *gomock.Call

LogMessage indicates an expected call of LogMessage

func (*MockSagaLogMockRecorder) StartSaga

func (mr *MockSagaLogMockRecorder) StartSaga(sagaId, job interface{}) *gomock.Call

StartSaga indicates an expected call of StartSaga

type Saga

type Saga struct {
	// contains filtered or unexported fields
}

Saga is a concurrent object representing a saga. Its methods update the state of the saga or provide access to the current state.

func (*Saga) AbortSaga

func (s *Saga) AbortSaga() error

Log an AbortSaga message. This indicates that the Saga has failed and all execution should be stopped and compensating transactions should be applied.

Returns an error if it fails

func (*Saga) BulkMessage

func (s *Saga) BulkMessage(messages []SagaMessage) error

BulkMessage takes a slice of SagaMessages to be applied. The messages update the saga state and log in the order given. The update is done "atomically", within a single Saga mutex lock. Note that the underlying log update will be SagaLog implementation dependent.

func (*Saga) EndCompensatingTask

func (s *Saga) EndCompensatingTask(taskId string, results []byte) error

Logs an EndCompTask message to the log when a compensating task has been successfully completed.

EndCompensatingTask is idempotent with respect to sagaId and taskId. If the data passed changes, the last written EndCompTask message wins.

Returns an error if it fails.

func (*Saga) EndSaga

func (s *Saga) EndSaga() error

Logs an EndSaga message to the log. Returns an error if it fails.

Once EndSaga is successfully called, trying to log additional messages will result in a panic.

func (*Saga) EndTask

func (s *Saga) EndTask(taskId string, results []byte) error

Logs an EndTask message to the log. This indicates that the task has been successfully completed.

EndTask is idempotent with respect to sagaId and taskId. If the data passed changes, the last written EndTask message wins.

Returns an error if it fails.

func (*Saga) GetState

func (s *Saga) GetState() *SagaState

Returns the Current Saga State

func (*Saga) ID

func (s *Saga) ID() string

func (*Saga) StartCompensatingTask

func (s *Saga) StartCompensatingTask(taskId string, data []byte) error

Logs a StartCompTask message to the log. It should only be logged after a Saga has been aborted and is in RollbackRecovery mode. It should not be used in ForwardRecovery mode.

StartCompensatingTask is idempotent with respect to sagaId and taskId. If the data passed changes, the last written StartCompTask message wins.

Returns an error if it fails.

func (*Saga) StartTask

func (s *Saga) StartTask(taskId string, data []byte) error

Logs a StartTask message to the log.

StartTask is idempotent with respect to sagaId and taskId. If the data passed changes, the last written StartTask message wins.

Returns an error if it fails.
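
The idempotence rule (repeating a call for the same task does not create a second task, and the most recent data wins) can be pictured with a map keyed by task id. This is a sketch under that assumption only; `taskLog`, `newTaskLog`, and `startTask` are hypothetical names and the real state is held in SagaState.

```go
package main

// taskLog is a hypothetical in-memory stand-in for the part of saga state
// that records StartTask data per task id.
type taskLog struct {
	startData map[string][]byte
}

func newTaskLog() *taskLog {
	return &taskLog{startData: make(map[string][]byte)}
}

// startTask records data for taskId. Repeating the call does not add a
// second task; it only replaces the stored data (last write wins).
func (l *taskLog) startTask(taskId string, data []byte) {
	// Copy the slice so later changes by the caller do not affect the log.
	l.startData[taskId] = append([]byte(nil), data...)
}
```

Calling `startTask("t1", ...)` twice with different payloads leaves exactly one entry for `t1`, holding the second payload.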

type SagaCoordinator

type SagaCoordinator struct {
	// contains filtered or unexported fields
}

SagaCoordinator is the object which provides all Saga functionality. Implementations of SagaLog should provide a factory method which returns a saga based on its implementation.

func MakeSagaCoordinator

func MakeSagaCoordinator(log SagaLog, stat stats.StatsReceiver) SagaCoordinator

MakeSagaCoordinator makes a SagaCoordinator which uses the specified SagaLog interface for durable storage.

func (SagaCoordinator) GetNumSagas

func (sc SagaCoordinator) GetNumSagas() int

GetNumSagas gets the number of sagas currently being managed in memory.

func (SagaCoordinator) GetSagaState

func (s SagaCoordinator) GetSagaState(sagaId string) (*SagaState, error)

Reads the current SagaState from the log. It is intended for status queries and does not check for recovery. RecoverSagaState should be used for recovering state in a failure scenario.

func (SagaCoordinator) MakeSaga

func (s SagaCoordinator) MakeSaga(sagaId string, job []byte) (*Saga, error)

Makes a Saga and adds it to the SagaCoordinator. If a Saga already exists with the same id, it will overwrite the existing one.

func (SagaCoordinator) RecoverSagaState

func (sc SagaCoordinator) RecoverSagaState(sagaId string, recoveryType SagaRecoveryType) (*Saga, error)

Recovers SagaState by reading all logged messages from the log. Utilizes the specified recoveryType to determine if Saga needs to be Aborted or can proceed safely.

Returns the recovered Saga, whose current SagaState is available through GetState. If no Saga exists for the requested id, nil is returned.
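
Recovery by replaying the log can be sketched as a fold over the logged messages followed by a safety check. The types below are reduced stand-ins, compensating-task messages are left out, and the definition of an unsafe state (some task started but not ended) is an assumption made for illustration; the documentation above does not spell it out.

```go
package main

// msgType and message are simplified stand-ins for SagaMessageType and
// SagaMessage.
type msgType int

const (
	startSaga msgType = iota
	endSaga
	abortSaga
	startTask
	endTask
)

type message struct {
	kind   msgType
	taskId string
}

// recovered is a hypothetical, reduced view of saga state.
type recovered struct {
	aborted   bool
	completed bool
	started   map[string]bool
	ended     map[string]bool
}

// replay folds the logged messages, in order, into a recovered state.
func replay(log []message) recovered {
	st := recovered{started: map[string]bool{}, ended: map[string]bool{}}
	for _, m := range log {
		switch m.kind {
		case endSaga:
			st.completed = true
		case abortSaga:
			st.aborted = true
		case startTask:
			st.started[m.taskId] = true
		case endTask:
			st.ended[m.taskId] = true
		}
	}
	return st
}

// needsAbort reports whether rollback recovery should abort the saga.
// Assumption: a state is unsafe when some task has started but not ended
// and the saga is neither aborted nor completed.
func needsAbort(st recovered) bool {
	if st.aborted || st.completed {
		return false
	}
	for id := range st.started {
		if !st.ended[id] {
			return true
		}
	}
	return false
}
```

Under these assumptions, a log that ends with a StartTask message and no matching EndTask would lead rollback recovery to abort the saga, while a log in which every started task has ended could proceed.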

func (SagaCoordinator) Startup

func (s SagaCoordinator) Startup() ([]string, error)

Should be called at Saga Creation time. Returns a Slice of In Progress SagaIds

type SagaLog

type SagaLog interface {
	/*
	 * Log a StartSaga message to the log.
	 * Returns an error if it fails.
	 */
	StartSaga(sagaId string, job []byte) error

	/*
	 * Update the State of the Saga by Logging a message.
	 * Returns an error if it fails.
	 */
	LogMessage(message SagaMessage) error

	/*
	 * Log multiple messages to a single saga in one call.
	 */
	LogBatchMessages(messages []SagaMessage) error

	/*
	 * Returns all of the messages logged so far for the
	 * specified saga. Does not return an error if the
	 * saga does not exist.
	 */
	GetMessages(sagaId string) ([]SagaMessage, error)

	/*
	 * Returns a list of all in progress sagaIds.
	 * This MUST include all not completed sagaIds.
	 * It may also include completed sagas.
	 * Returns an error if it fails.
	 */
	GetActiveSagas() ([]string, error)
}

SagaLog interface.
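
As an illustration of what an implementation of this interface might look like, here is a minimal map-backed sketch. The `SagaLog`, `SagaMessage`, and `SagaMessageType` declarations are copied locally so the example stands alone; `memLog` and `newMemLog` are hypothetical names. Nothing here is durable, the choice to overwrite an existing saga in StartSaga is an assumption, and the plain `fmt.Errorf` error is a placeholder for whichever of this package's error types a real implementation would return.

```go
package main

import (
	"fmt"
	"sync"
)

type SagaMessageType int

const (
	StartSaga SagaMessageType = iota
	EndSaga
	AbortSaga
	StartTask
	EndTask
	StartCompTask
	EndCompTask
)

type SagaMessage struct {
	SagaId  string
	MsgType SagaMessageType
	Data    []byte
	TaskId  string
}

type SagaLog interface {
	StartSaga(sagaId string, job []byte) error
	LogMessage(message SagaMessage) error
	LogBatchMessages(messages []SagaMessage) error
	GetMessages(sagaId string) ([]SagaMessage, error)
	GetActiveSagas() ([]string, error)
}

// memLog is a hypothetical map-backed SagaLog. It keeps messages in memory only.
type memLog struct {
	mu    sync.Mutex
	sagas map[string][]SagaMessage
}

func newMemLog() *memLog { return &memLog{sagas: make(map[string][]SagaMessage)} }

// StartSaga creates the saga's log with a StartSaga message as its first
// entry. Assumption: an existing saga with the same id is overwritten.
func (l *memLog) StartSaga(sagaId string, job []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.sagas[sagaId] = []SagaMessage{{SagaId: sagaId, MsgType: StartSaga, Data: job}}
	return nil
}

func (l *memLog) LogMessage(message SagaMessage) error {
	return l.LogBatchMessages([]SagaMessage{message})
}

// LogBatchMessages appends all messages, or none if any saga is unknown.
func (l *memLog) LogBatchMessages(messages []SagaMessage) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, m := range messages {
		if _, ok := l.sagas[m.SagaId]; !ok {
			return fmt.Errorf("saga %q has not been started", m.SagaId)
		}
	}
	for _, m := range messages {
		l.sagas[m.SagaId] = append(l.sagas[m.SagaId], m)
	}
	return nil
}

// GetMessages returns a copy of the log; an unknown saga yields no error.
func (l *memLog) GetMessages(sagaId string) ([]SagaMessage, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]SagaMessage(nil), l.sagas[sagaId]...), nil
}

// GetActiveSagas returns every known saga id, which may include completed ones.
func (l *memLog) GetActiveSagas() ([]string, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	ids := make([]string, 0, len(l.sagas))
	for id := range l.sagas {
		ids = append(ids, id)
	}
	return ids, nil
}

// Compile-time check that memLog satisfies the interface.
var _ SagaLog = (*memLog)(nil)
```

Returning every known id from GetActiveSagas is the simplest choice that still satisfies the interface comment, since it allows completed sagas to be included.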

type SagaMessage

type SagaMessage struct {
	SagaId  string
	MsgType SagaMessageType
	Data    []byte
	TaskId  string
}

SagaMessage is the data structure representation of an entry in the SagaLog. Different SagaMessageTypes utilize different fields. Factory methods are supplied for creating SagaMessages and should be used instead of directly creating a SagaMessage struct.
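
The point about different message types using different fields can be sketched with two factory-style functions. The type declarations are local copies, and `makeStartTask` and `makeEndSaga` are hypothetical lower-case analogues of the package's factory methods; which fields the real factories populate is an assumption based on their parameter lists.

```go
package main

type SagaMessageType int

const (
	StartSaga SagaMessageType = iota
	EndSaga
	AbortSaga
	StartTask
	EndTask
	StartCompTask
	EndCompTask
)

type SagaMessage struct {
	SagaId  string
	MsgType SagaMessageType
	Data    []byte
	TaskId  string
}

// makeStartTask fills every field: a task message needs a task id and
// may carry data.
func makeStartTask(sagaId, taskId string, data []byte) SagaMessage {
	return SagaMessage{SagaId: sagaId, MsgType: StartTask, TaskId: taskId, Data: data}
}

// makeEndSaga fills only the saga id and type; TaskId and Data stay empty.
func makeEndSaga(sagaId string) SagaMessage {
	return SagaMessage{SagaId: sagaId, MsgType: EndSaga}
}
```

Going through a factory keeps callers from setting a field that has no meaning for the chosen message type.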

func MakeAbortSagaMessage

func MakeAbortSagaMessage(sagaId string) SagaMessage

AbortSaga SagaMessageType
  - sagaId - id of the Saga

func MakeEndCompTaskMessage

func MakeEndCompTaskMessage(sagaId string, taskId string, results []byte) SagaMessage

EndCompTask SagaMessageType
  - sagaId - id of the Saga
  - taskId - id of the completed compensating task. Should be the same as the original taskId
  - results - any results from compensating task completion

func MakeEndSagaMessage

func MakeEndSagaMessage(sagaId string) SagaMessage

EndSaga SagaMessageType
  - sagaId - id of the Saga

func MakeEndTaskMessage

func MakeEndTaskMessage(sagaId string, taskId string, results []byte) SagaMessage

EndTask SagaMessageType
  - sagaId - id of the Saga
  - taskId - id of the completed Task
  - results - any results from task completion

func MakeStartCompTaskMessage

func MakeStartCompTaskMessage(sagaId string, taskId string, data []byte) SagaMessage

StartCompTask SagaMessageType
  - sagaId - id of the Saga
  - taskId - id of the started compensating task. Should be the same as the original taskId
  - data - data that is persisted to the log, useful for diagnostic information

func MakeStartSagaMessage

func MakeStartSagaMessage(sagaId string, job []byte) SagaMessage

StartSaga SagaMessageType
  - sagaId - id of the Saga
  - job - data needed to execute the saga

func MakeStartTaskMessage

func MakeStartTaskMessage(sagaId string, taskId string, data []byte) SagaMessage

StartTask SagaMessageType
  - sagaId - id of the Saga
  - taskId - id of the started Task
  - data - data that is persisted to the log, useful for diagnostic information

func (SagaMessage) String

func (s SagaMessage) String() string

type SagaMessageType

type SagaMessageType int
const (
	StartSaga SagaMessageType = iota
	EndSaga
	AbortSaga
	StartTask
	EndTask
	StartCompTask
	EndCompTask
)

func (SagaMessageType) String

func (s SagaMessageType) String() string

type SagaRecoveryType

type SagaRecoveryType int
const (
	RollbackRecovery SagaRecoveryType = iota
	ForwardRecovery
)

Saga Recovery Types define how to interpret SagaState in RecoveryMode.

ForwardRecovery: all tasks in the saga must be executed at least once. Tasks MUST BE idempotent.

RollbackRecovery: if the Saga is aborted or in an unsafe state, compensating tasks for all started tasks need to be executed. Compensating tasks MUST BE idempotent.
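
One way to picture the two modes is as a function that, given each task's status, lists the work still owed. The sketch below assumes per-task status is kept as bit flags like the constants at the top of this page; the `flag` underlying type, the map layout, and `pendingWork` are hypothetical, and the rollback branch assumes the saga has already been found to be aborted or unsafe.

```go
package main

import "sort"

// flag mirrors the package's unexported flag constants (underlying type assumed).
type flag byte

const (
	TaskStarted flag = 1 << iota
	TaskCompleted
	CompTaskStarted
	CompTaskCompleted
)

type SagaRecoveryType int

const (
	RollbackRecovery SagaRecoveryType = iota
	ForwardRecovery
)

// pendingWork returns, in sorted order, the ids of tasks that still need
// attention. tasks maps a task id to its status flags.
//   - ForwardRecovery: tasks that have not completed must be run (again).
//   - RollbackRecovery: started tasks whose compensating task has not
//     completed must be compensated.
func pendingWork(tasks map[string]flag, recoveryType SagaRecoveryType) []string {
	var ids []string
	for id, f := range tasks {
		switch recoveryType {
		case ForwardRecovery:
			if f&TaskCompleted == 0 {
				ids = append(ids, id)
			}
		case RollbackRecovery:
			if f&TaskStarted != 0 && f&CompTaskCompleted == 0 {
				ids = append(ids, id)
			}
		}
	}
	sort.Strings(ids)
	return ids
}
```

Because a task or compensating task may be selected again after a crash, both modes rely on the idempotence requirement stated above.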

type SagaState

type SagaState struct {
	// contains filtered or unexported fields
}

SagaState is the data structure representation of the current state of the Saga.

func (*SagaState) GetEndCompTaskData

func (state *SagaState) GetEndCompTaskData(taskId string) []byte

Gets the data associated with an EndCompTask, supplied as part of the EndCompTask message.

func (*SagaState) GetEndTaskData

func (state *SagaState) GetEndTaskData(taskId string) []byte

Gets the data associated with an EndTask, supplied as part of the EndTask message.

func (*SagaState) GetStartCompTaskData

func (state *SagaState) GetStartCompTaskData(taskId string) []byte

Gets the data associated with a StartCompTask, supplied as part of the StartCompTask message.

func (*SagaState) GetStartTaskData

func (state *SagaState) GetStartTaskData(taskId string) []byte

Gets the data associated with a StartTask, supplied as part of the StartTask message.

func (*SagaState) GetTaskIds

func (state *SagaState) GetTaskIds() []string

Returns a list of task ids associated with this Saga.

func (*SagaState) IsCompTaskCompleted

func (state *SagaState) IsCompTaskCompleted(taskId string) bool

Returns true if the specified compensating task has been completed, false otherwise.

func (*SagaState) IsCompTaskStarted

func (state *SagaState) IsCompTaskStarted(taskId string) bool

Returns true if the specified compensating task has been started, false otherwise.

func (*SagaState) IsSagaAborted

func (state *SagaState) IsSagaAborted() bool

Returns true if this Saga has been aborted, false otherwise.

func (*SagaState) IsSagaCompleted

func (state *SagaState) IsSagaCompleted() bool

Returns true if this Saga has been completed, false otherwise.

func (*SagaState) IsTaskCompleted

func (state *SagaState) IsTaskCompleted(taskId string) bool

Returns true if the specified task has been completed, false otherwise.

func (*SagaState) IsTaskStarted

func (state *SagaState) IsTaskStarted(taskId string) bool

Returns true if the specified task has been started, false otherwise.

func (*SagaState) Job

func (state *SagaState) Job() []byte

Returns the Job associated with this Saga.

func (*SagaState) SagaId

func (state *SagaState) SagaId() string

Returns the Id of the Saga this state represents.

func (*SagaState) String

func (state *SagaState) String() string

Custom ToString function for SagaState.

type StateTaskPair

type StateTaskPair struct {
	// contains filtered or unexported fields
}

func (StateTaskPair) String

func (p StateTaskPair) String() string

Directories

Path Synopsis
Package sagalogs provides implementations of SagaLog.
