client

package
Version: v0.2.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 12, 2019 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
// String error codes and retry-count constants exported by this package.
const (
	// GlobalFailure is the error-code string "FAILED".
	GlobalFailure      = "FAILED"
	// JSONUnmarshalError is the error-code string "JSONUNMARSHALERROR"
	// (presumably reported when a response body cannot be decoded — confirm at call sites).
	JSONUnmarshalError = "JSONUNMARSHALERROR"
	// DefaultRetries is a retry count of 20
	// (presumably the usual maxRetries argument for GetRetryableError — confirm at call sites).
	DefaultRetries     = 20
	// NoRetries is a retry count of 0.
	NoRetries          = 0
)

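Application error codes (GlobalFailure, JSONUnmarshalError) and retry-count constants (DefaultRetries, NoRetries).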

Variables

This section is empty.

Functions

func GetNonRetryableError added in v0.1.3

func GetNonRetryableError(err error, method FlinkMethod, errorCode string, message ...string) error

func GetRetryableError added in v0.1.3

func GetRetryableError(err error, method FlinkMethod, errorCode string, maxRetries int32, message ...string) error

Types

type CancelJobRequest

// CancelJobRequest is a JSON payload carrying a "cancel-job" flag and an
// optional "target-directory".
type CancelJobRequest struct {
	// CancelJob is encoded under the "cancel-job" key.
	CancelJob       bool   `json:"cancel-job"`
	// TargetDirectory is encoded under "target-directory" and is omitted from
	// the JSON output when empty.
	TargetDirectory string `json:"target-directory,omitempty"`
}

type CancelJobResponse

// CancelJobResponse holds the identifier decoded from the "request-id" JSON key.
type CancelJobResponse struct {
	// TriggerID is read from "request-id"
	// (presumably the triggerID later passed to CheckSavepointStatus — confirm against caller).
	TriggerID string `json:"request-id"`
}

type CheckpointResponse

// CheckpointResponse is the result type of FlinkAPIInterface.GetCheckpointCounts.
// It groups checkpoint counters, the latest checkpoints by category, and a
// history list.
type CheckpointResponse struct {
	// Counts maps a counter name to its value (the set of keys is not defined here).
	Counts  map[string]int32       `json:"counts"`
	// Latest holds the most recent checkpoint of each category.
	Latest  LatestCheckpoints      `json:"latest"`
	// History is a list of checkpoint statistics decoded from "history".
	History []CheckpointStatistics `json:"history"`
}

type CheckpointStatistics

// CheckpointStatistics describes a single checkpoint. It is the result type of
// FlinkAPIInterface.GetLatestCheckpoint and the element type used by
// CheckpointResponse.History and LatestCheckpoints.
//
// NOTE(review): units of the timestamp, duration and size fields are not
// stated here (presumably epoch milliseconds, milliseconds and bytes — confirm
// against the Flink REST API documentation).
type CheckpointStatistics struct {
	ID                 uint             `json:"id"`
	Status             CheckpointStatus `json:"status"`
	IsSavepoint        bool             `json:"is_savepoint"`
	TriggerTimestamp   int64            `json:"trigger_timestamp"`
	LatestAckTimestamp int64            `json:"latest_ack_timestamp"`
	StateSize          int64            `json:"state_size"`
	EndToEndDuration   int64            `json:"end_to_end_duration"`
	AlignmentBuffered  int64            `json:"alignment_buffered"`
	NumSubtasks        int64            `json:"num_subtasks"`
	FailureTimestamp   int64            `json:"failure_timestamp"`
	FailureMessage     string           `json:"failure_message"`
	ExternalPath       string           `json:"external_path"`
	Discarded          bool             `json:"discarded"`
	// RestoredTimeStamp is decoded from the "restore_timestamp" key; note the
	// JSON key spelling differs from the field name.
	RestoredTimeStamp  int64            `json:"restore_timestamp"`
}

type CheckpointStatus

// CheckpointStatus is the string-typed status carried by CheckpointStatistics.Status.
type CheckpointStatus string
// Known CheckpointStatus values.
const (
	CheckpointInProgress CheckpointStatus = "IN_PROGRESS"
	CheckpointFailed     CheckpointStatus = "FAILED"
	CheckpointCompleted  CheckpointStatus = "COMPLETED"
)

type ClusterOverviewResponse

// ClusterOverviewResponse is the result type of FlinkAPIInterface.GetClusterOverview.
type ClusterOverviewResponse struct {
	// TaskManagerCount is decoded from "taskmanagers".
	TaskManagerCount  int32 `json:"taskmanagers"`
	// SlotsAvailable is decoded from "slots-available".
	SlotsAvailable    int32 `json:"slots-available"`
	// NumberOfTaskSlots is decoded from "slots-total".
	NumberOfTaskSlots int32 `json:"slots-total"`
}

type FailureCause

// FailureCause carries a failure's class name and stack trace. It is embedded
// in SavepointOperationResponse under the "failure-cause" key.
type FailureCause struct {
	Class      string `json:"class"`
	StackTrace string `json:"stack-trace"`
}

type FlinkAPIInterface

// FlinkAPIInterface is the set of operations exposed by this client package.
// NewFlinkJobManagerClient returns a value of this interface type, and
// FlinkJobManagerClient declares a method for each entry.
//
// Every method takes a context and a url string as its first two parameters;
// job-scoped methods additionally take a jobID.
// NOTE(review): url is presumably the base address of the Flink job manager — confirm.
type FlinkAPIInterface interface {
	// CancelJobWithSavepoint returns a string
	// (presumably the savepoint trigger ID used with CheckSavepointStatus — confirm).
	CancelJobWithSavepoint(ctx context.Context, url string, jobID string) (string, error)
	ForceCancelJob(ctx context.Context, url string, jobID string) error
	SubmitJob(ctx context.Context, url string, jarID string, submitJobRequest SubmitJobRequest) (*SubmitJobResponse, error)
	CheckSavepointStatus(ctx context.Context, url string, jobID, triggerID string) (*SavepointResponse, error)
	GetJobs(ctx context.Context, url string) (*GetJobsResponse, error)
	GetClusterOverview(ctx context.Context, url string) (*ClusterOverviewResponse, error)
	GetLatestCheckpoint(ctx context.Context, url string, jobID string) (*CheckpointStatistics, error)
	GetJobConfig(ctx context.Context, url string, jobID string) (*JobConfigResponse, error)
	GetTaskManagers(ctx context.Context, url string) (*TaskManagersResponse, error)
	GetCheckpointCounts(ctx context.Context, url string, jobID string) (*CheckpointResponse, error)
	GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error)
}

func NewFlinkJobManagerClient

func NewFlinkJobManagerClient(config config.RuntimeConfig) FlinkAPIInterface

type FlinkApplicationError added in v0.1.3

// FlinkApplicationError is a structured error value; it has an Error() string
// method and is built by NewFlinkApplicationError.
//
// All fields are tagged omitempty, so zero values (empty strings, false, 0 and
// a nil LastErrorUpdateTime) are left out of the JSON encoding.
type FlinkApplicationError struct {
	// AppError is the error text (appError argument of NewFlinkApplicationError).
	AppError            string       `json:"appError,omitempty"`
	// Method names the client operation the error is associated with.
	Method              FlinkMethod  `json:"method,omitempty"`
	// ErrorCode is a string code such as GlobalFailure or JSONUnmarshalError.
	ErrorCode           string       `json:"errorCode,omitempty"`
	IsRetryable         bool         `json:"isRetryable,omitempty"`
	IsFailFast          bool         `json:"isFailFast,omitempty"`
	MaxRetries          int32        `json:"maxRetries,omitempty"`
	// LastErrorUpdateTime is optional (nil when unset).
	LastErrorUpdateTime *metav1.Time `json:"lastErrorUpdateTime,omitempty"`
}

FlinkApplicationError implements the error interface to make error handling more structured.

func NewFlinkApplicationError added in v0.1.3

func NewFlinkApplicationError(appError string, method FlinkMethod, errorCode string, isRetryable bool, isFailFast bool, maxRetries int32) *FlinkApplicationError

func (*FlinkApplicationError) DeepCopy added in v0.1.3

func (f *FlinkApplicationError) DeepCopy() *FlinkApplicationError

func (*FlinkApplicationError) DeepCopyInto added in v0.1.3

func (f *FlinkApplicationError) DeepCopyInto(out *FlinkApplicationError)

func (*FlinkApplicationError) Error added in v0.1.3

func (f *FlinkApplicationError) Error() string

type FlinkJob

// FlinkJob is one entry of GetJobsResponse.Jobs: a job ID and its state.
type FlinkJob struct {
	// JobID is decoded from the "id" key.
	JobID  string   `json:"id"`
	// Status is decoded from the "status" key.
	Status JobState `json:"status"`
}

type FlinkJobManagerClient

// FlinkJobManagerClient is the concrete client type of this package. Its
// pointer type declares a method for every entry of FlinkAPIInterface.
type FlinkJobManagerClient struct {
	// contains filtered or unexported fields
}

func (*FlinkJobManagerClient) CancelJobWithSavepoint

func (c *FlinkJobManagerClient) CancelJobWithSavepoint(ctx context.Context, url string, jobID string) (string, error)

func (*FlinkJobManagerClient) CheckSavepointStatus

func (c *FlinkJobManagerClient) CheckSavepointStatus(ctx context.Context, url string, jobID, triggerID string) (*SavepointResponse, error)

func (*FlinkJobManagerClient) ForceCancelJob

func (c *FlinkJobManagerClient) ForceCancelJob(ctx context.Context, url string, jobID string) error

func (*FlinkJobManagerClient) GetCheckpointCounts

func (c *FlinkJobManagerClient) GetCheckpointCounts(ctx context.Context, url string, jobID string) (*CheckpointResponse, error)

func (*FlinkJobManagerClient) GetClusterOverview

func (c *FlinkJobManagerClient) GetClusterOverview(ctx context.Context, url string) (*ClusterOverviewResponse, error)

func (*FlinkJobManagerClient) GetJobConfig

func (c *FlinkJobManagerClient) GetJobConfig(ctx context.Context, url, jobID string) (*JobConfigResponse, error)

func (*FlinkJobManagerClient) GetJobOverview

func (c *FlinkJobManagerClient) GetJobOverview(ctx context.Context, url string, jobID string) (*FlinkJobOverview, error)

func (*FlinkJobManagerClient) GetJobs

func (c *FlinkJobManagerClient) GetJobs(ctx context.Context, url string) (*GetJobsResponse, error)

func (*FlinkJobManagerClient) GetLatestCheckpoint

func (c *FlinkJobManagerClient) GetLatestCheckpoint(ctx context.Context, url string, jobID string) (*CheckpointStatistics, error)

func (*FlinkJobManagerClient) GetTaskManagers

func (c *FlinkJobManagerClient) GetTaskManagers(ctx context.Context, url string) (*TaskManagersResponse, error)

func (*FlinkJobManagerClient) SubmitJob

func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID string, submitJobRequest SubmitJobRequest) (*SubmitJobResponse, error)

type FlinkJobOverview

// FlinkJobOverview is the result type of FlinkAPIInterface.GetJobOverview.
//
// NOTE(review): StartTime and EndTime are presumably epoch milliseconds — confirm.
type FlinkJobOverview struct {
	// JobID is decoded from the "jid" key.
	JobID     string   `json:"jid"`
	State     JobState `json:"state"`
	StartTime int64    `json:"start-time"`
	EndTime   int64    `json:"end-time"`
}

type FlinkMethod added in v0.1.3

// FlinkMethod is a string identifier for a client operation. It is the type of
// FlinkApplicationError.Method and of the method argument taken by
// GetRetryableError, GetNonRetryableError and NewFlinkApplicationError.
type FlinkMethod string
// One FlinkMethod constant per FlinkAPIInterface method; each value is the
// method's own name.
const (
	CancelJobWithSavepoint FlinkMethod = "CancelJobWithSavepoint"
	ForceCancelJob         FlinkMethod = "ForceCancelJob"
	SubmitJob              FlinkMethod = "SubmitJob"
	CheckSavepointStatus   FlinkMethod = "CheckSavepointStatus"
	GetJobs                FlinkMethod = "GetJobs"
	GetClusterOverview     FlinkMethod = "GetClusterOverview"
	GetLatestCheckpoint    FlinkMethod = "GetLatestCheckpoint"
	GetJobConfig           FlinkMethod = "GetJobConfig"
	GetTaskManagers        FlinkMethod = "GetTaskManagers"
	GetCheckpointCounts    FlinkMethod = "GetCheckpointCounts"
	GetJobOverview         FlinkMethod = "GetJobOverview"
)

type GetJobsResponse

// GetJobsResponse is the result type of FlinkAPIInterface.GetJobs.
type GetJobsResponse struct {
	// Jobs is the list decoded from the "jobs" key.
	Jobs []FlinkJob `json:"jobs"`
}

type JobConfigResponse

// JobConfigResponse is the result type of FlinkAPIInterface.GetJobConfig.
type JobConfigResponse struct {
	// JobID is decoded from the "jid" key.
	JobID           string             `json:"jid"`
	// ExecutionConfig is decoded from the "execution-config" key.
	ExecutionConfig JobExecutionConfig `json:"execution-config"`
}

type JobExecutionConfig

// JobExecutionConfig is the "execution-config" section of JobConfigResponse.
type JobExecutionConfig struct {
	// Parallelism is decoded from the "job-parallelism" key.
	Parallelism int32 `json:"job-parallelism"`
}

type JobState

// JobState is the string-typed job state used by FlinkJob.Status and
// FlinkJobOverview.State.
type JobState string
// Known JobState values. Note the spelling asymmetry in the wire values:
// "CANCELLING" has a double L while "CANCELED" has a single L.
const (
	Created     JobState = "CREATED"
	Running     JobState = "RUNNING"
	Failing     JobState = "FAILING"
	Failed      JobState = "FAILED"
	Cancelling  JobState = "CANCELLING"
	Canceled    JobState = "CANCELED"
	Finished    JobState = "FINISHED"
	Restarting  JobState = "RESTARTING"
	Suspended   JobState = "SUSPENDED"
	Reconciling JobState = "RECONCILING"
)

type LatestCheckpoints

// LatestCheckpoints is the "latest" section of CheckpointResponse. Each field
// is an optional pointer: it is nil when the corresponding key is absent from
// the decoded JSON, and is omitted from the encoding when nil.
type LatestCheckpoints struct {
	Completed *CheckpointStatistics `json:"completed,omitempty"`
	Savepoint *CheckpointStatistics `json:"savepoint,omitempty"`
	Failed    *CheckpointStatistics `json:"failed,omitempty"`
	Restored  *CheckpointStatistics `json:"restored,omitempty"`
}

type RetryHandler added in v0.1.3

// RetryHandler is the value type returned by NewRetryHandler, which takes a
// base backoff, a time to wait and a maximum backoff. It declares
// value-receiver methods for every entry of RetryHandlerInterface.
type RetryHandler struct {
	// contains filtered or unexported fields
}

RetryHandler is a retryer with methods to determine whether an error is retryable and to apply exponential backoff.

func NewRetryHandler added in v0.1.3

func NewRetryHandler(baseBackoff time.Duration, timeToWait time.Duration, maxBackOff time.Duration) RetryHandler

func (RetryHandler) GetRetryDelay added in v0.1.3

func (r RetryHandler) GetRetryDelay(retryCount int32) time.Duration

func (RetryHandler) IsErrorRetryable added in v0.1.3

func (r RetryHandler) IsErrorRetryable(err error) bool

func (RetryHandler) IsRetryRemaining added in v0.1.3

func (r RetryHandler) IsRetryRemaining(err error, retryCount int32) bool

func (RetryHandler) IsTimeToRetry added in v0.1.3

func (r RetryHandler) IsTimeToRetry(clock clock.Clock, lastUpdatedTime time.Time, retryCount int32) bool

func (RetryHandler) WaitOnError added in v0.1.3

func (r RetryHandler) WaitOnError(clock clock.Clock, lastUpdatedTime time.Time) (time.Duration, bool)

type RetryHandlerInterface added in v0.1.3

// RetryHandlerInterface is the set of retry-decision operations; RetryHandler
// declares a method for each entry.
type RetryHandlerInterface interface {
	// IsErrorRetryable reports whether err is considered retryable.
	IsErrorRetryable(err error) bool
	// IsRetryRemaining reports whether another retry is allowed for err after
	// retryCount attempts.
	IsRetryRemaining(err error, retryCount int32) bool
	// WaitOnError returns a duration and a flag derived from clock and
	// lastUpdatedTime (exact meaning of the pair is not shown here — confirm
	// against the implementation).
	WaitOnError(clock clock.Clock, lastUpdatedTime time.Time) (time.Duration, bool)
	// GetRetryDelay returns the delay to apply for the given retryCount.
	GetRetryDelay(retryCount int32) time.Duration
	// IsTimeToRetry reports whether a retry is due, given clock,
	// lastUpdatedTime and retryCount.
	IsTimeToRetry(clock clock.Clock, lastUpdatedTime time.Time, retryCount int32) bool
}

type SavepointOperationResponse

// SavepointOperationResponse is the "operation" section of SavepointResponse:
// a location string and a failure cause.
type SavepointOperationResponse struct {
	// Location is decoded from the "location" key.
	Location     string       `json:"location"`
	// FailureCause is decoded from the "failure-cause" key.
	FailureCause FailureCause `json:"failure-cause"`
}

type SavepointResponse

// SavepointResponse is the result type of FlinkAPIInterface.CheckSavepointStatus.
type SavepointResponse struct {
	// SavepointStatus is decoded from the "status" key.
	SavepointStatus SavepointStatusResponse    `json:"status"`
	// Operation is decoded from the "operation" key.
	Operation       SavepointOperationResponse `json:"operation"`
}

type SavepointStatus

// SavepointStatus is the string-typed status carried by SavepointStatusResponse.Status.
type SavepointStatus string
// Known SavepointStatus values. SavePointInvalid is the empty string, i.e. the
// zero value of the type.
const (
	SavePointInvalid    SavepointStatus = ""
	SavePointInProgress SavepointStatus = "IN_PROGRESS"
	SavePointCompleted  SavepointStatus = "COMPLETED"
)

type SavepointStatusResponse

// SavepointStatusResponse is the "status" section of SavepointResponse.
type SavepointStatusResponse struct {
	// Status is decoded from the "id" key (not "status"); the enclosing
	// SavepointResponse field is the one tagged "status".
	Status SavepointStatus `json:"id"`
}

type SubmitJobRequest

// SubmitJobRequest is the request value passed to FlinkAPIInterface.SubmitJob.
// No field is tagged omitempty, so every key is always present in the JSON
// encoding, including zero values.
type SubmitJobRequest struct {
	SavepointPath         string `json:"savepointPath"`
	Parallelism           int32  `json:"parallelism"`
	ProgramArgs           string `json:"programArgs"`
	EntryClass            string `json:"entryClass"`
	AllowNonRestoredState bool   `json:"allowNonRestoredState"`
}

type SubmitJobResponse

// SubmitJobResponse is the result type of FlinkAPIInterface.SubmitJob.
type SubmitJobResponse struct {
	// JobID is decoded from the "jobid" key.
	JobID string `json:"jobid"`
}

type TaskManagerStats

// TaskManagerStats is one entry of TaskManagersResponse.TaskManagers.
//
// NOTE(review): the unit of TimeSinceLastHeartbeat is not stated here — confirm
// against the Flink REST API documentation.
type TaskManagerStats struct {
	Path                   string `json:"path"`
	DataPort               int32  `json:"dataPort"`
	TimeSinceLastHeartbeat int64  `json:"timeSinceLastHeartbeat"`
	SlotsNumber            int32  `json:"slotsNumber"`
	FreeSlots              int32  `json:"freeSlots"`
}

type TaskManagersResponse

// TaskManagersResponse is the result type of FlinkAPIInterface.GetTaskManagers.
type TaskManagersResponse struct {
	// TaskManagers is the list decoded from the "taskmanagers" key.
	TaskManagers []TaskManagerStats `json:"taskmanagers"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL