sparkv1

package v1.30.0

Published: Mar 23, 2023 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

const JobGetStageResultQuery = "GET_STAGE_RESULT"

Variables

var (
	ErrTargetNotPointer            = errors.New("unable to set value of non-pointer")
	ErrUnableToBindUnknownMimeType = errors.New("unable to bind with unknown mime type")
)

var (
	ErrStageNotFoundInNodeChain = errors.New("stage not found in the Node SparkChain")
	ErrConditionalStageSkipped  = errors.New("conditional Stage execution")
	ErrChainIsNotValid          = errors.New("SparkChain is not valid")
)

var (
	DefaultRetryPolicy = &RetryPolicy{
		InitialInterval:        time.Second * 5,
		BackoffCoefficient:     2,
		MaximumInterval:        time.Minute * 5,
		MaximumAttempts:        1,
		NonRetryableErrorTypes: nil,
	}

	DefaultActivityOptions = &ActivityOptions{
		workflow.ActivityOptions{
			StartToCloseTimeout: time.Minute * 10,
		},
	}
)

var CompleteError = func(ctx CompleteContext) StageError {
	return NewStageErrorWithCode(errorCodeInternal, errors.New("Complete failed"))
}

var CompleteSuccess = func(ctx CompleteContext) StageError {
	return nil
}

var (
	ErrInvalidStageResultMimeType = errors.New("stage result expects mime-type of application/json")
)

var (
	MimeJsonError = codec.MimeTypeJson.WithType("error")
)

Functions

func NewBindable added in v1.20.0

func NewBindable(value Value) *bindable

Types

type ActivityOptions added in v1.20.0

type ActivityOptions struct {
	workflow.ActivityOptions
}

func (ActivityOptions) GetTemporalActivityOptions added in v1.20.0

func (ao ActivityOptions) GetTemporalActivityOptions() workflow.ActivityOptions

type Bindable added in v1.6.0

type Bindable interface {
	Bind(a any) error
	GetValue() (any, error)
	GetMimeType() string
	String() string
}

func NewBindableError added in v1.20.0

func NewBindableError(err error) Bindable

type BindableConfig added in v1.6.0

type BindableConfig interface {
	Bind(a any) error
	Raw() ([]byte, error)
}

type BindableMap added in v1.20.0

type BindableMap map[string]*bindable

type Builder

type Builder interface {
	NewChain(name string) BuilderChain
	ChainFinalizer
}

Builder is the contract for the SparkChain builder

func NewBuilder

func NewBuilder() Builder

NewBuilder is the main entry point to the builder
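
A hedged sketch of the fluent API (the chain and stage names are illustrative, and it assumes the builder retains the chain it creates so BuildChain can finalize it):

func buildExample() *SparkChain {
	b := NewBuilder()
	b.NewChain("example-chain").
		Stage("fetch", func(ctx StageContext) (any, StageError) {
			return "hello", nil // hypothetical stage body
		}).
		Complete(CompleteSuccess)
	return b.BuildChain()
}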

type BuilderChain

type BuilderChain interface {
	ChainNode
}

BuilderChain is the root of a SparkChain

type Chain

type Chain interface {
	// contains filtered or unexported methods
}

Chain finalizes a Node in the SparkChain; used internally to build a part of the SparkChain

type ChainCancelled

type ChainCancelled interface {
	Cancelled(newNode Chain) ChainComplete
}

ChainCancelled is the contract the builder must implement for cancellation

type ChainCancelledOrComplete

type ChainCancelledOrComplete interface {
	ChainCancelled
	ChainComplete
}

ChainCancelledOrComplete allows defining only cancellation or completion

type ChainCompensate

type ChainCompensate interface {
	Compensate(newNode Chain) ChainCancelledOrComplete
}

ChainCompensate is the contract the builder must implement for compensation

type ChainComplete

type ChainComplete interface {
	Complete(completeDefinitionFn CompleteDefinitionFn, options ...StageOption) Chain
}

ChainComplete is the contract the builder must implement for completion

type ChainFinalizer

type ChainFinalizer interface {
	BuildChain() *SparkChain
}

ChainFinalizer finalizes the entire SparkChain; used internally to build the SparkChain

type ChainNode

type ChainNode interface {
	ChainStage // must have at least 1 Stage
}

ChainNode is a Node in the SparkChain

type ChainReport

type ChainReport struct {
	Errors   []error
	StageMap map[string]ChainReportStage
	NodeMap  map[string]ChainReportNode
}

type ChainReportNode

type ChainReportNode struct {
	Name          string
	CanCompensate bool
	CanCancel     bool
	// contains filtered or unexported fields
}

type ChainReportStage

type ChainReportStage struct {
	Name  string
	Crumb string
}

type ChainStage

type ChainStage interface {
	Stage(name string, stageDefinitionFn StageDefinitionFn, options ...StageOption) ChainStageAny
}

ChainStage is a Stage in a SparkChain Node

type ChainStageAny

type ChainStageAny interface {
	ChainStage
	ChainCompensate
	ChainCancelled
	ChainComplete
}

ChainStageAny allows defining more Stages, as well as a Compensate, Cancelled, or Complete stage

type CompleteContext

type CompleteContext interface {
	StageContext
	Output(variables ...*Var) error
	Name() string
}

func NewCompleteContext

func NewCompleteContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) CompleteContext

type CompleteDefinitionFn

type CompleteDefinitionFn = func(ctx CompleteContext) StageError
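
A hedged sketch of a completion function that emits a single output via Output and NewVar (both documented below); the variable name and value are illustrative:

var completeFn CompleteDefinitionFn = func(ctx CompleteContext) StageError {
	if err := ctx.Output(NewVar("greeting", codec.MimeTypeJson, "hello")); err != nil {
		return NewStageError(err)
	}
	return nil
}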

type CompleteStage added in v1.20.0

type CompleteStage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

type Config

type Config struct {
	Id         string          `yaml:"id"`
	Name       string          `yaml:"Name"`
	QueueGroup string          `yaml:"queue_group"`
	Health     *configHealth   `yaml:"health"`
	Server     *configServer   `yaml:"plugin"`
	Log        *configLog      `yaml:"logging"`
	App        *configApp      `yaml:"app"`
	Temporal   *configTemporal `yaml:"temporal"`
}
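
A hedged YAML sketch keyed off the struct tags above; the nested sections are elided because their types are unexported, and the capitalized Name key follows its tag as written:

id: my-spark
Name: My Spark
queue_group: default
# health, plugin (Server), logging (Log), app, temporal: nested sections omitted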

type ConfigType added in v1.8.0

type ConfigType string

const (
	ConfigTypeYaml ConfigType = "yaml"
	ConfigTypeJson ConfigType = "json"
)

type Context

type Context interface {
	JobKey() string
	CorrelationID() string
	TransactionID() string
}

func NewJobContext

func NewJobContext(metadata Context, opts *SparkOpts) Context

func NewSparkMetadata

func NewSparkMetadata(jobKey, correlationID, transactionID string, logger Logger) Context

type ErrorCode added in v1.28.0

type ErrorCode string

const (
	ErrorCodeGeneric ErrorCode = "GENERIC"
)

type ErrorOption

type ErrorOption = func(err *stageError) *stageError

func WithErrorCode

func WithErrorCode(errorCode ErrorCode) ErrorOption

func WithMetadata

func WithMetadata(metadata any) ErrorOption

func WithRetry

func WithRetry(times uint, backoffMultiplier uint, firstBackoffWait time.Duration) ErrorOption
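
These options compose onto NewStageError (documented under StageError below); a hedged sketch with illustrative values:

func failFetch(cause error) StageError {
	return NewStageError(
		cause,
		WithErrorCode(ErrorCodeGeneric),
		WithRetry(3, 2, time.Second*5), // assumed: up to 3 retries, 5s first wait, doubling each time
		WithMetadata(map[string]any{"stage": "fetch"}),
	)
}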

type ExecuteSparkError added in v1.20.0

type ExecuteSparkError struct {
	StageName    string           `json:"stage_name"`
	ErrorCode    ErrorCode        `json:"error_code"`
	ErrorMessage string           `json:"error_message,omitempty"`
	Metadata     map[string]any   `json:"metadata,omitempty"`
	StackTrace   []StackTraceItem `json:"stack_trace"`
}

func (*ExecuteSparkError) Error added in v1.20.0

func (ese *ExecuteSparkError) Error() string

type ExecuteSparkInputs added in v1.20.0

type ExecuteSparkInputs BindableMap
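
Because ExecuteSparkInputs is a BindableMap, an inputs map can be assembled directly (for tests, say) with NewBindable; a hedged sketch, where the mime type string is an assumption:

inputs := ExecuteSparkInputs{
	"amount": NewBindable(Value{
		Value:    21,
		MimeType: "application/json",
	}),
}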

type ExecuteSparkOutput added in v1.20.0

type ExecuteSparkOutput struct {
	Outputs BindableMap        `json:"outputs,omitempty"`
	Error   *ExecuteSparkError `json:"error,omitempty"`
}

type ExecuteStageRequest added in v1.20.0

type ExecuteStageRequest struct {
	StageName     string
	Inputs        ExecuteSparkInputs
	WorkflowId    string
	RunId         string
	TransactionId string
	CorrelationId string
	JobKey        string
}

type Gettable added in v1.6.0

type Gettable interface {
	Get(name string) Bindable
}

type IOState added in v1.20.0

type IOState interface {
	GetVar(varName string) any
}

type InitContext added in v1.6.0

type InitContext interface {
	Config() BindableConfig
}

func NewInitContext added in v1.20.0

func NewInitContext(opts *SparkOpts) InitContext

type Input

type Input interface {
	Bindable
}

type Inputs

type Inputs interface {
	Get(name string) Bindable
}

type InternalStageTracker added in v1.20.0

type InternalStageTracker interface {
	SetStageResult(name string, value Bindable)
	SetStageStatus(name string, status StageStatus)
}

type JobContext added in v1.20.0

type JobContext struct {
	context.Context
	Metadata *JobMetadata
}

func (*JobContext) CorrelationID added in v1.20.0

func (jc *JobContext) CorrelationID() string

func (*JobContext) JobKey added in v1.20.0

func (jc *JobContext) JobKey() string

func (*JobContext) TransactionID added in v1.20.0

func (jc *JobContext) TransactionID() string

type JobMetadata added in v1.20.0

type JobMetadata struct {
	SparkId            string             `json:"spark_id"` // id of the spark to execute
	Inputs             ExecuteSparkInputs `json:"inputs"`   // all inputs for the given spark id
	JobKeyValue        string             `json:"job_key"`
	CorrelationIdValue string             `json:"correlation_id"`
	TransactionIdValue string             `json:"transaction_id"`

	RootExecutionWorkflowId string `json:"execution_workflow_id"`     // workflow id of the root execution to query
	RootExecutionRunId      string `json:"execution_run_id"`          // run id of the root execution to query
	JobExecutionWorkflowId  string `json:"job_execution_workflow_id"` // workflow id of the root job workflow
	JobExecutionRunId       string `json:"job_execution_run_id"`      // run id of the root job workflow
}

JobMetadata is the context for the spark we want to execute on a module. TODO: this type should come from the Module Library

type JobState added in v1.20.0

type JobState struct {
	JobContext   *JobMetadata
	StageResults map[string]Bindable
}

type JobWorkflow added in v1.20.0

type JobWorkflow interface {
	Run(ctx workflow.Context, jmd *JobMetadata) (*ExecuteSparkOutput, error)
	ExecuteStageActivity(ctx context.Context, req *ExecuteStageRequest) (Bindable, StageError)
	ExecuteCompleteActivity(ctx context.Context, req *ExecuteStageRequest) (*ExecuteSparkOutput, StageError)
}

func NewJobWorkflow added in v1.20.0

func NewJobWorkflow(ctx context.Context, sparkDataIO SparkDataIO, sparkId string, chain *SparkChain, opts ...WorkflowOption) JobWorkflow

type Logger

type Logger interface {
	Info(format string, v ...any)
	Warn(format string, v ...any)
	Debug(format string, v ...any)
	Error(err error, format string, v ...any)
	AddFields(k string, v any) Logger
}

func NewLogger

func NewLogger() Logger
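
For example (a sketch; the field key and message are illustrative):

logger := NewLogger().AddFields("job_key", "abc-123")
logger.Info("starting stage %s", "fetch")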

type Node added in v1.20.0

type Node struct {
	Stages     []*Stage
	Complete   *CompleteStage
	Cancel     *Node
	Compensate *Node
	Name       string
	// contains filtered or unexported fields
}

Node wraps all the Stages of a single SparkChain. These are represented as one or more Stages, but with only one each of cancellation, compensation, and completion (finalizer)

func (*Node) ChainName added in v1.20.0

func (n *Node) ChainName() string

func (*Node) CompletionName added in v1.20.0

func (n *Node) CompletionName() string

func (*Node) CountOfStages added in v1.20.0

func (n *Node) CountOfStages() int

func (*Node) HasCancellationStage added in v1.20.0

func (n *Node) HasCancellationStage() bool

func (*Node) HasCompensationStage added in v1.20.0

func (n *Node) HasCompensationStage() bool

func (*Node) HasCompletionStage added in v1.20.0

func (n *Node) HasCompletionStage() bool

func (*Node) IsCancel added in v1.20.0

func (n *Node) IsCancel() bool

func (*Node) IsCompensate added in v1.20.0

func (n *Node) IsCompensate() bool

func (*Node) IsRoot added in v1.20.0

func (n *Node) IsRoot() bool

type Option

type Option = func(je *SparkOpts) *SparkOpts

func WithSparkConfig added in v1.20.0

func WithSparkConfig(cfg any) Option

type RetryConfig

type RetryConfig struct {
	Times             uint          `json:"times" yaml:"times"`
	FirstBackoffWait  time.Duration `json:"first_backoff_wait" yaml:"first_backoff_wait"`
	BackoffMultiplier uint          `json:"backoff_multiplier" yaml:"backoff_multiplier"`
}

type RetryPolicy added in v1.20.0

type RetryPolicy struct {
	// Backoff interval for the first retry. If BackoffCoefficient is 1.0 then it is used for all retries.
	// If not set or set to 0, a default interval of 1s will be used.
	InitialInterval time.Duration `yaml:"initial_interval"`

	// Coefficient used to calculate the next retry backoff interval.
	// The next retry interval is previous interval multiplied by this coefficient.
	// Must be 1 or larger. Default is 2.0.
	BackoffCoefficient float64 `yaml:"backoff_coefficient"`

	// Maximum backoff interval between retries. Exponential backoff leads to interval increase.
	// This value is the cap of the interval. Default is 100x of initial interval.
	MaximumInterval time.Duration `yaml:"maximum_interval"`

	// Maximum number of attempts. When exceeded the retries stop even if not expired yet.
	// If not set or set to 0, it means unlimited, and rely on activity ScheduleToCloseTimeout to stop.
	MaximumAttempts int32 `yaml:"maximum_attempts"`

	// Non-Retriable errors. This is optional. Temporal server will stop retry if error type matches this list.
	// Note:
	//  - cancellation is not a failure, so it won't be retried,
	//  - only StartToClose or Heartbeat timeouts are retryable.
	NonRetryableErrorTypes []string `yaml:"non_retryable_error_types"`
}
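
To illustrate the backoff arithmetic described above, a small sketch (pure arithmetic, not an API call); starting from DefaultRetryPolicy, the wait grows 5s, 10s, 20s, ... and is capped at MaximumInterval:

rp := DefaultRetryPolicy // 5s initial interval, coefficient 2, 5m cap
wait := rp.InitialInterval
for attempt := 1; attempt <= 5; attempt++ {
	fmt.Printf("attempt %d: wait %v\n", attempt, wait)
	wait = time.Duration(float64(wait) * rp.BackoffCoefficient)
	if wait > rp.MaximumInterval {
		wait = rp.MaximumInterval
	}
}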

func (RetryPolicy) GetTemporalPolicy added in v1.20.0

func (rp RetryPolicy) GetTemporalPolicy() *temporal.RetryPolicy

type Spark

type Spark interface {
	BuildChain(b Builder) Chain
	Init(ctx InitContext) error
	Stop()
}

Spark is the contract a developer must implement in order to be accepted by a worker
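
A minimal hedged sketch of a Spark under this contract; the config shape and stage body are hypothetical:

type HelloSpark struct {
	greeting string
}

func (s *HelloSpark) Init(ctx InitContext) error {
	var cfg struct {
		Greeting string `yaml:"greeting" json:"greeting"` // hypothetical config field
	}
	if err := ctx.Config().Bind(&cfg); err != nil {
		return err
	}
	s.greeting = cfg.Greeting
	return nil
}

func (s *HelloSpark) BuildChain(b Builder) Chain {
	return b.NewChain("hello").
		Stage("greet", func(ctx StageContext) (any, StageError) {
			return s.greeting, nil
		}).
		Complete(CompleteSuccess)
}

func (s *HelloSpark) Stop() {}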

type SparkChain added in v1.20.0

type SparkChain struct {
	RootNode *Node
	// contains filtered or unexported fields
}

SparkChain represents the entire SparkChain. The RootNode is its main entry point and holds its own children as a tree below the RootNode

func (*SparkChain) GetStageCompleteFunc added in v1.20.0

func (sc *SparkChain) GetStageCompleteFunc(name string) CompleteDefinitionFn

func (*SparkChain) GetStageFunc added in v1.20.0

func (sc *SparkChain) GetStageFunc(name string) StageDefinitionFn

type SparkDataIO added in v1.20.0

type SparkDataIO interface {
	GetStageResult(workflowId, runId, stageName string) (Bindable, error)
}

type SparkOpts added in v1.20.0

type SparkOpts struct {
	// contains filtered or unexported fields
}

type StackTraceItem added in v1.25.0

type StackTraceItem struct {
	Type     string `json:"type"`
	Filepath string `json:"filepath"`
}

type Stage added in v1.20.0

type Stage struct {
	Node *Node
	Name string
	// contains filtered or unexported fields
}

func (*Stage) ApplyConditionalExecutionOptions added in v1.20.0

func (s *Stage) ApplyConditionalExecutionOptions(ctx Context, stageName string) StageError

type StageContext

type StageContext interface {
	Context
	Input(names string) Input
	StageResult(name string) Bindable
	Log() Logger
	Name() string
}

func NewStageContext

func NewStageContext(req *ExecuteStageRequest, sparkDataIO SparkDataIO, workflowId, runId, name string, logger Logger, inputs ExecuteSparkInputs) StageContext

type StageDefinitionFn

type StageDefinitionFn = func(ctx StageContext) (any, StageError)
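
A hedged sketch of a stage function that binds a named input, logs, and returns a result; the input name and type are illustrative:

var fetchFn StageDefinitionFn = func(ctx StageContext) (any, StageError) {
	var amount float64
	if err := ctx.Input("amount").Bind(&amount); err != nil {
		return nil, NewStageError(err)
	}
	ctx.Log().Info("processing amount %v", amount)
	return amount * 2, nil
}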

type StageError

type StageError interface {
	ErrorCode() ErrorCode
	StageName() string
	Error() string
	Metadata() map[string]any
	GetRetryConfig() *RetryConfig
	// contains filtered or unexported methods
}

func NewStageError

func NewStageError(err error, opts ...ErrorOption) StageError

func NewStageErrorWithCode added in v1.28.0

func NewStageErrorWithCode(errorCode ErrorCode, err error, opts ...ErrorOption) StageError

type StageOption

type StageOption = func(StageOptionParams) StageError

type StageOptionParams

type StageOptionParams interface {
	StageName() string
	Context() Context
}

type StageStatus

type StageStatus string

const (
	StageStatus_STAGE_PENDING   StageStatus = "STAGE_PENDING"
	StageStatus_STAGE_STARTED   StageStatus = "STAGE_STARTED"
	StageStatus_STAGE_COMPLETED StageStatus = "STAGE_COMPLETED"
	StageStatus_STAGE_FAILED    StageStatus = "STAGE_FAILED"
	StageStatus_STAGE_SKIPPED   StageStatus = "STAGE_SKIPPED"
	StageStatus_STAGE_CANCELED  StageStatus = "CANCELED"
)

type StageTracker added in v1.20.0

type StageTracker interface {
	GetStageResult(name string) (data any, mime codec.MimeType, err StageError)
	AssertStageCompleted(stageName string)
	AssertStageStarted(stageName string)
	AssertStageSkipped(stageName string)
	AssertStageCancelled(stageName string)
	AssertStageFailed(stageName string)
	AssertStageResult(stageName string, expectedStageResult any)
	AssertStageOrder(stageNames ...string)
}
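
A hedged sketch of how these assertions might read, assuming a tracker obtained from a test harness outside this package; the stage names match the sketches above:

func assertGreetFlow(tracker StageTracker) {
	tracker.AssertStageOrder("fetch", "greet")
	tracker.AssertStageCompleted("greet")
	tracker.AssertStageResult("greet", "hello")
}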

type TemporalLogger added in v1.20.0

type TemporalLogger struct {
}

TemporalLogger implements the Temporal SDK's Logger interface

func (*TemporalLogger) Debug added in v1.20.0

func (f *TemporalLogger) Debug(msg string, keyvals ...interface{})

func (*TemporalLogger) Error added in v1.20.0

func (f *TemporalLogger) Error(msg string, keyvals ...interface{})

func (*TemporalLogger) Info added in v1.20.0

func (f *TemporalLogger) Info(msg string, keyvals ...interface{})

func (*TemporalLogger) Warn added in v1.20.0

func (f *TemporalLogger) Warn(msg string, keyvals ...interface{})

type Value added in v1.20.0

type Value struct {
	Value    any    `json:"value"`
	Raw      any    `json:"raw"` // TODO Remove, only used for debug
	MimeType string `json:"mime_type"`
}

type Var

type Var struct {
	Name     string
	MimeType codec.MimeType
	Value    any
}

func NewVar

func NewVar(name string, mimeType codec.MimeType, value any) *Var

type Worker

type Worker interface {
	Run()
}

func NewSparkWorker

func NewSparkWorker(ctx context.Context, spark Spark, options ...Option) (Worker, error)
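
Putting it together, a hedged sketch of wiring a Spark (e.g. the hypothetical HelloSpark above) into a worker:

func main() {
	worker, err := NewSparkWorker(context.Background(), &HelloSpark{})
	if err != nil {
		panic(err)
	}
	worker.Run()
}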

type WorkflowOption added in v1.20.0

type WorkflowOption = func(je *workflowOpts) *workflowOpts

func WithStageTracker added in v1.20.0

func WithStageTracker(ist InternalStageTracker) WorkflowOption
