Documentation ¶
Overview ¶
Package saga provides a generic implementation of the Saga pattern. Scoot uses the Saga pattern to track the state of long-lived Jobs for reporting and failure recovery. For info on the Saga pattern, see: https://speakerdeck.com/caitiem20/applying-the-saga-pattern
Package saga is a generated GoMock package.
Index ¶
- Constants
- func FatalErr(err error) bool
- func GenId() gopter.Gen
- func GenSagaState(includeJob bool) gopter.Gen
- func GenSagaStateAndTaskId() gopter.Gen
- func NewCorruptedSagaLogError(sagaId string, msg string) error
- func NewInternalLogError(msg string) error
- func NewInvalidRequestError(msg string) error
- func NewInvalidSagaMessageError(msg string) error
- func NewInvalidSagaStateError(msg string, args ...interface{}) error
- type CorruptedSagaLogError
- type InternalLogError
- type InvalidRequestError
- type InvalidSagaMessageError
- type InvalidSagaStateError
- type MockSagaLog
- func (m *MockSagaLog) EXPECT() *MockSagaLogMockRecorder
- func (m *MockSagaLog) GetActiveSagas() ([]string, error)
- func (m *MockSagaLog) GetMessages(sagaId string) ([]SagaMessage, error)
- func (m *MockSagaLog) LogBatchMessages(messages []SagaMessage) error
- func (m *MockSagaLog) LogMessage(message SagaMessage) error
- func (m *MockSagaLog) StartSaga(sagaId string, job []byte) error
- type MockSagaLogMockRecorder
- func (mr *MockSagaLogMockRecorder) GetActiveSagas() *gomock.Call
- func (mr *MockSagaLogMockRecorder) GetMessages(sagaId interface{}) *gomock.Call
- func (mr *MockSagaLogMockRecorder) LogBatchMessages(messages interface{}) *gomock.Call
- func (mr *MockSagaLogMockRecorder) LogMessage(message interface{}) *gomock.Call
- func (mr *MockSagaLogMockRecorder) StartSaga(sagaId, job interface{}) *gomock.Call
- type Saga
- func (s *Saga) AbortSaga() error
- func (s *Saga) BulkMessage(messages []SagaMessage) error
- func (s *Saga) EndCompensatingTask(taskId string, results []byte) error
- func (s *Saga) EndSaga() error
- func (s *Saga) EndTask(taskId string, results []byte) error
- func (s *Saga) GetState() *SagaState
- func (s *Saga) ID() string
- func (s *Saga) StartCompensatingTask(taskId string, data []byte) error
- func (s *Saga) StartTask(taskId string, data []byte) error
- type SagaCoordinator
- func (sc SagaCoordinator) GetNumSagas() int
- func (s SagaCoordinator) GetSagaState(sagaId string) (*SagaState, error)
- func (s SagaCoordinator) MakeSaga(sagaId string, job []byte) (*Saga, error)
- func (sc SagaCoordinator) RecoverSagaState(sagaId string, recoveryType SagaRecoveryType) (*Saga, error)
- func (s SagaCoordinator) Startup() ([]string, error)
- type SagaLog
- type SagaMessage
- func MakeAbortSagaMessage(sagaId string) SagaMessage
- func MakeEndCompTaskMessage(sagaId string, taskId string, results []byte) SagaMessage
- func MakeEndSagaMessage(sagaId string) SagaMessage
- func MakeEndTaskMessage(sagaId string, taskId string, results []byte) SagaMessage
- func MakeStartCompTaskMessage(sagaId string, taskId string, data []byte) SagaMessage
- func MakeStartSagaMessage(sagaId string, job []byte) SagaMessage
- func MakeStartTaskMessage(sagaId string, taskId string, data []byte) SagaMessage
- type SagaMessageType
- type SagaRecoveryType
- type SagaState
- func (state *SagaState) GetEndCompTaskData(taskId string) []byte
- func (state *SagaState) GetEndTaskData(taskId string) []byte
- func (state *SagaState) GetStartCompTaskData(taskId string) []byte
- func (state *SagaState) GetStartTaskData(taskId string) []byte
- func (state *SagaState) GetTaskIds() []string
- func (state *SagaState) IsCompTaskCompleted(taskId string) bool
- func (state *SagaState) IsCompTaskStarted(taskId string) bool
- func (state *SagaState) IsSagaAborted() bool
- func (state *SagaState) IsSagaCompleted() bool
- func (state *SagaState) IsTaskCompleted(taskId string) bool
- func (state *SagaState) IsTaskStarted(taskId string) bool
- func (state *SagaState) Job() []byte
- func (state *SagaState) SagaId() string
- func (state *SagaState) String() string
- type StateTaskPair
Constants ¶
const ( TaskStarted flag = 1 << iota TaskCompleted CompTaskStarted CompTaskCompleted )
Variables ¶
This section is empty.
Functions ¶
func FatalErr ¶
Checks the error returned by updating saga state. Returns true if the error is a FatalErr. Returns false if the error is transient and a retry might succeed
func GenSagaState ¶
Generator for a Valid Saga State, with a job def if includeJob
func GenSagaStateAndTaskId ¶
Generator for a SagaState and TaskId, returns a StateTaskPair SagaState is always valid. TaskId may or may not be part of the saga
func NewInternalLogError ¶
func NewInvalidRequestError ¶
Types ¶
type CorruptedSagaLogError ¶
type CorruptedSagaLogError struct {
// contains filtered or unexported fields
}
CorruptedSagaLogError this is a critical error specifies that the data stored in the sagalog for a specified saga is corrupted and unrecoverable.
func (CorruptedSagaLogError) Error ¶
func (e CorruptedSagaLogError) Error() string
type InternalLogError ¶
type InternalLogError struct {
// contains filtered or unexported fields
}
InternalLogError should be returned by the SagaLog when the request failed, but may succeed on retry this is equivalent to an HTTP 500
func (InternalLogError) Error ¶
func (e InternalLogError) Error() string
type InvalidRequestError ¶
type InvalidRequestError struct {
// contains filtered or unexported fields
}
InvalidRequestError should be returned by the SagaLog when the request is invalid and the same request will fail on restart, equivalent to an HTTP 400
func (InvalidRequestError) Error ¶
func (e InvalidRequestError) Error() string
type InvalidSagaMessageError ¶
type InvalidSagaMessageError struct {
// contains filtered or unexported fields
}
func (InvalidSagaMessageError) Error ¶
func (e InvalidSagaMessageError) Error() string
type InvalidSagaStateError ¶
type InvalidSagaStateError struct {
// contains filtered or unexported fields
}
func (InvalidSagaStateError) Error ¶
func (e InvalidSagaStateError) Error() string
type MockSagaLog ¶
type MockSagaLog struct {
// contains filtered or unexported fields
}
MockSagaLog is a mock of SagaLog interface
func NewMockSagaLog ¶
func NewMockSagaLog(ctrl *gomock.Controller) *MockSagaLog
NewMockSagaLog creates a new mock instance
func (*MockSagaLog) EXPECT ¶
func (m *MockSagaLog) EXPECT() *MockSagaLogMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockSagaLog) GetActiveSagas ¶
func (m *MockSagaLog) GetActiveSagas() ([]string, error)
GetActiveSagas mocks base method
func (*MockSagaLog) GetMessages ¶
func (m *MockSagaLog) GetMessages(sagaId string) ([]SagaMessage, error)
GetMessages mocks base method
func (*MockSagaLog) LogBatchMessages ¶
func (m *MockSagaLog) LogBatchMessages(messages []SagaMessage) error
LogBatchMessages mocks base method
func (*MockSagaLog) LogMessage ¶
func (m *MockSagaLog) LogMessage(message SagaMessage) error
LogMessage mocks base method
type MockSagaLogMockRecorder ¶
type MockSagaLogMockRecorder struct {
// contains filtered or unexported fields
}
MockSagaLogMockRecorder is the mock recorder for MockSagaLog
func (*MockSagaLogMockRecorder) GetActiveSagas ¶
func (mr *MockSagaLogMockRecorder) GetActiveSagas() *gomock.Call
GetActiveSagas indicates an expected call of GetActiveSagas
func (*MockSagaLogMockRecorder) GetMessages ¶
func (mr *MockSagaLogMockRecorder) GetMessages(sagaId interface{}) *gomock.Call
GetMessages indicates an expected call of GetMessages
func (*MockSagaLogMockRecorder) LogBatchMessages ¶
func (mr *MockSagaLogMockRecorder) LogBatchMessages(messages interface{}) *gomock.Call
LogBatchMessages indicates an expected call of LogBatchMessages
func (*MockSagaLogMockRecorder) LogMessage ¶
func (mr *MockSagaLogMockRecorder) LogMessage(message interface{}) *gomock.Call
LogMessage indicates an expected call of LogMessage
func (*MockSagaLogMockRecorder) StartSaga ¶
func (mr *MockSagaLogMockRecorder) StartSaga(sagaId, job interface{}) *gomock.Call
StartSaga indicates an expected call of StartSaga
type Saga ¶
type Saga struct {
// contains filtered or unexported fields
}
Concurrent Object Representing a Saga Methods update the state of the saga or Provide access to the Current State
func (*Saga) AbortSaga ¶
Log an AbortSaga message. This indicates that the Saga has failed and all execution should be stopped and compensating transactions should be applied.
Returns an error if it fails
func (*Saga) BulkMessage ¶
func (s *Saga) BulkMessage(messages []SagaMessage) error
BulkMessage takes a slice of SagaMessages to be applied. The messages update the saga state and log in the order given. The update is done "atomically", within a single Saga mutex lock. Note that the underlying log update will be SagaLog implementation dependent.
func (*Saga) EndCompensatingTask ¶
Log an End Compensating Task Message to the log when a Compensating Task has been successfully completed. Returns an error if it fails.
EndCompTask is idempotent with respect to sagaId & taskId. If the data passed changes the last written EndCompTask message will win
Returns an error if it fails
func (*Saga) EndSaga ¶
Log an End Saga Message to the log, returns updated SagaState Returns the resulting SagaState or an error if it fails
Once EndSaga is successfully called, trying to log additional messages will result in a panic.
func (*Saga) EndTask ¶
Log an EndTask Message to the log. Indicates that this task has been successfully completed. Returns an error if it fails.
EndTask is idempotent with respect to sagaId & taskId. If the data passed changes the last written EndTask message will win
Returns an error if it fails
func (*Saga) StartCompensatingTask ¶
Log a Start Compensating Task Message to the log. Should only be logged after a Saga has been avoided and in Rollback Recovery Mode. Should not be used in ForwardRecovery Mode returns an error if it fails
StartCompTask is idempotent with respect to sagaId & taskId. If the data passed changes the last written StartCompTask message will win
Returns an error if it fails
type SagaCoordinator ¶
type SagaCoordinator struct {
// contains filtered or unexported fields
}
Saga Object which provides all Saga Functionality Implementations of SagaLog should provide a factory method which returns a saga based on its implementation.
func MakeSagaCoordinator ¶
func MakeSagaCoordinator(log SagaLog, stat stats.StatsReceiver) SagaCoordinator
Make a Saga which uses the specied SagaLog interface for durable storage
func (SagaCoordinator) GetNumSagas ¶
func (sc SagaCoordinator) GetNumSagas() int
GetNumSagas get the number of sagas currently being managed in memory
func (SagaCoordinator) GetSagaState ¶
func (s SagaCoordinator) GetSagaState(sagaId string) (*SagaState, error)
Read the Current SagaState from the Log, intended for status queries does not check for recovery. RecoverSagaState should be used for recovering state in a failure scenario
func (SagaCoordinator) MakeSaga ¶
func (s SagaCoordinator) MakeSaga(sagaId string, job []byte) (*Saga, error)
Make a Saga add it to the SagaCoordinator, if a Saga Already exists with the same id, it will overwrite the already existing one.
func (SagaCoordinator) RecoverSagaState ¶
func (sc SagaCoordinator) RecoverSagaState(sagaId string, recoveryType SagaRecoveryType) (*Saga, error)
Recovers SagaState by reading all logged messages from the log. Utilizes the specified recoveryType to determine if Saga needs to be Aborted or can proceed safely.
Returns the current SagaState. If no Saga exists for the requested id, nil is returned
func (SagaCoordinator) Startup ¶
func (s SagaCoordinator) Startup() ([]string, error)
Should be called at Saga Creation time. Returns a Slice of In Progress SagaIds
type SagaLog ¶
type SagaLog interface { /* * Log a Start Saga Message message to the log. * Returns an error if it fails. */ StartSaga(sagaId string, job []byte) error /* * Update the State of the Saga by Logging a message. * Returns an error if it fails. */ LogMessage(message SagaMessage) error /* * Log multiple messages to a single saga in one call. */ LogBatchMessages(messages []SagaMessage) error /* * Returns all of the messages logged so far for the * specified saga. Does not return an error if the * saga does not exist. */ GetMessages(sagaId string) ([]SagaMessage, error) /* * Returns a list of all in progress sagaIds. * This MUST include all not completed sagaIds. * It may also included completed sagas * Returns an error if it fails. */ GetActiveSagas() ([]string, error) }
* SagaLog Interface
type SagaMessage ¶
type SagaMessage struct { SagaId string MsgType SagaMessageType Data []byte TaskId string }
* Data Structure representation of a entry in the SagaLog. * Different SagaMessageTypes utilize different fields. * Factory Methods are supplied for creation of Saga Messages * and should be used instead of directly creatinga sagaMessage struct
func MakeAbortSagaMessage ¶
func MakeAbortSagaMessage(sagaId string) SagaMessage
* AbortSaga SagaMessageType * - sagaId - id of the Saga
func MakeEndCompTaskMessage ¶
func MakeEndCompTaskMessage(sagaId string, taskId string, results []byte) SagaMessage
* EndCompTask SagaMessageType * - sagaId - id of the Saga * - taskId - id of the completed compensating task. Should * be the same as the original taskId * - data - any results from compensating task completion
func MakeEndSagaMessage ¶
func MakeEndSagaMessage(sagaId string) SagaMessage
* EndSaga SagaMessageType * - sagaId - id of the Saga
func MakeEndTaskMessage ¶
func MakeEndTaskMessage(sagaId string, taskId string, results []byte) SagaMessage
* EndTask SagaMessageType * - sagaId - id of the Saga * - taskId - id of the completed Task * - data - any results from task completion
func MakeStartCompTaskMessage ¶
func MakeStartCompTaskMessage(sagaId string, taskId string, data []byte) SagaMessage
* StartCompTask SagaMessageType * - sagaId - id of the Saga * - taskId - id of the started compensating task. Should * be the same as the original taskId * - data - data that is persisted to the log, useful for * diagnostic information
func MakeStartSagaMessage ¶
func MakeStartSagaMessage(sagaId string, job []byte) SagaMessage
* StartSaga SagaMessageType * - sagaId - id of the Saga * - data - data needed to execute the saga
func MakeStartTaskMessage ¶
func MakeStartTaskMessage(sagaId string, taskId string, data []byte) SagaMessage
* StartTask SagaMessageType * - sagaId - id of the Saga * - taskId - id of the started Task * - data - data that is persisted to the log, useful for * diagnostic information
func (SagaMessage) String ¶
func (s SagaMessage) String() string
type SagaMessageType ¶
type SagaMessageType int
const ( StartSaga SagaMessageType = iota EndSaga AbortSaga StartTask EndTask StartCompTask EndCompTask )
func (SagaMessageType) String ¶
func (s SagaMessageType) String() string
type SagaRecoveryType ¶
type SagaRecoveryType int
const ( RollbackRecovery SagaRecoveryType = iota ForwardRecovery )
Saga Recovery Types define how to interpret SagaState in RecoveryMode.
ForwardRecovery: all tasks in the saga must be executed at least once.
tasks MUST BE idempotent
RollbackRecovery: if Saga is Aborted or in unsafe state, compensating
tasks for all started tasks need to be executed. compensating tasks MUST BE idempotent.
type SagaState ¶
type SagaState struct {
// contains filtered or unexported fields
}
* Data Structure representation of the current state of the Saga.
func (*SagaState) GetEndCompTaskData ¶
* Get Data Associated with End Comp Task, supplied as * Part of the EndCompTask Message
func (*SagaState) GetEndTaskData ¶
* Get Data Associated with End Task, supplied as * Part of the EndTask Message
func (*SagaState) GetStartCompTaskData ¶
* Get Data Associated with Starting Comp Task, supplied as * Part of the StartCompTask Message
func (*SagaState) GetStartTaskData ¶
* Get Data Associated with Start Task, supplied as * Part of the StartTask Message
func (*SagaState) GetTaskIds ¶
* Returns a lists of task ids associated with this Saga
func (*SagaState) IsCompTaskCompleted ¶
* Returns true if the specified Compensating Task has been completed, * fasle otherwise
func (*SagaState) IsCompTaskStarted ¶
* Returns true if the specified Compensating Task has been started, * fasle otherwise
func (*SagaState) IsSagaAborted ¶
* Returns true if this Saga has been Aborted, false otherwise
func (*SagaState) IsSagaCompleted ¶
* Returns true if this Saga has been Completed, false otherwise
func (*SagaState) IsTaskCompleted ¶
* Returns true if the specified Task has been completed, * fasle otherwise
func (*SagaState) IsTaskStarted ¶
* Returns true if the specified Task has been started, * fasle otherwise
type StateTaskPair ¶
type StateTaskPair struct {
// contains filtered or unexported fields
}
func (StateTaskPair) String ¶
func (p StateTaskPair) String() string