Documentation
¶
Overview ¶
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2018 StreamSets Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func NewEdgeRunner(pipelineId string, config execution.Config, runtimeInfo *common.RuntimeInfo, ...) (execution.Runner, error)
- type BatchImpl
- type BatchMakerImpl
- type EdgeRunner
- func (edgeRunner *EdgeRunner) CommitOffset(sourceOffset common.SourceOffset) error
- func (edgeRunner *EdgeRunner) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)
- func (edgeRunner *EdgeRunner) GetErrorRecords(stageInstanceName string, size int) ([]api.Record, error)
- func (edgeRunner *EdgeRunner) GetHistory() ([]*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) GetMetrics() (metrics.Registry, error)
- func (edgeRunner *EdgeRunner) GetOffset() (common.SourceOffset, error)
- func (edgeRunner *EdgeRunner) GetPipelineConfig() common.PipelineConfiguration
- func (edgeRunner *EdgeRunner) GetStatus() (*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) IsRemotePipeline() bool
- func (edgeRunner *EdgeRunner) ResetOffset() error
- func (edgeRunner *EdgeRunner) StartPipeline(runtimeParameters map[string]interface{}) (*common.PipelineState, error)
- func (edgeRunner *EdgeRunner) StopPipeline() (*common.PipelineState, error)
- type FullPipeBatch
- func (b *FullPipeBatch) CompleteStage(batchMaker *BatchMakerImpl)
- func (b *FullPipeBatch) GetBatch(pipe StagePipe) *BatchImpl
- func (b *FullPipeBatch) GetBatchSize() int
- func (b *FullPipeBatch) GetErrorMessages() int64
- func (b *FullPipeBatch) GetErrorRecords() int64
- func (b *FullPipeBatch) GetErrorSink() *common.ErrorSink
- func (b *FullPipeBatch) GetEventRecords() int64
- func (b *FullPipeBatch) GetEventSink() *common.EventSink
- func (b *FullPipeBatch) GetInputRecords() int64
- func (b *FullPipeBatch) GetOutputRecords() int64
- func (b *FullPipeBatch) GetPreviousOffset() *string
- func (b *FullPipeBatch) GetSnapshotsOfAllStagesOutput() []execution.StageOutput
- func (b *FullPipeBatch) OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
- func (b *FullPipeBatch) SetNewOffset(newOffset *string)
- func (b *FullPipeBatch) StartStage(pipe StagePipe) *BatchMakerImpl
- type MetricsEventRunnable
- type Pipe
- type PipeBatch
- type Pipeline
- type ProductionPipeline
- type ProductionSourceOffsetTracker
- func (o *ProductionSourceOffsetTracker) CommitOffset() error
- func (o *ProductionSourceOffsetTracker) GetLastBatchTime() time.Time
- func (o *ProductionSourceOffsetTracker) GetOffset() *string
- func (o *ProductionSourceOffsetTracker) IsFinished() bool
- func (o *ProductionSourceOffsetTracker) SetOffset(newOffset *string)
- type SDCMetrics
- type StagePipe
- func (s *StagePipe) Destroy()
- func (s *StagePipe) GetEventLanes() []string
- func (s *StagePipe) GetInstanceName() string
- func (s *StagePipe) GetOutputLanes() []string
- func (s *StagePipe) GetStageContext() api.StageContext
- func (s *StagePipe) Init() []validation.Issue
- func (s *StagePipe) IsProcessor() bool
- func (s *StagePipe) IsSource() bool
- func (s *StagePipe) IsTarget() bool
- func (s *StagePipe) Process(pipeBatch PipeBatch) error
- type StageRuntime
Constants ¶
const (
	STATS_DPM_DIRECTLY_TARGET        = "com_streamsets_pipeline_stage_destination_devnull_StatsDpmDirectlyDTarget"
	REMOTE_TIMESERIES_URL            = "REMOTE_TIMESERIES_URL"
	PIPELINE_COMMIT_ID               = "PIPELINE_COMMIT_ID"
	JOB_ID                           = "JOB_ID"
	UPDATE_WAIT_TIME_MS              = "UPDATE_WAIT_TIME_MS"
	DPM_PIPELINE_COMMIT_ID           = "dpm.pipeline.commitId"
	DPM_JOB_ID                       = "dpm.job.id"
	TIME_SERIES_ANALYSIS_PARAM_ID    = "TIME_SERIES_ANALYSIS"
	TIME_SERIES_ANALYSIS_METADATA_ID = "timeSeriesAnalysis"
)
const (
	InputRecords    = ".inputRecords"
	OutputRecords   = ".outputRecords"
	ErrorRecords    = ".errorRecords"
	StageErrors     = ".stageErrors"
	BatchProcessing = ".batchProcessing"
)
const (
	AtMostOnce                    = "AT_MOST_ONCE"
	AtLeastOnce                   = "AT_LEAST_ONCE"
	PipelineBatchProcessing       = "pipeline.batchProcessing"
	PipelineBatchCount            = "pipeline.batchCount"
	PipelineBatchInputRecords     = "pipeline.batchInputRecords"
	PipelineBatchOutputRecords    = "pipeline.batchOutputRecords"
	PipelineBatchErrorRecords     = "pipeline.batchErrorRecords"
	PipelineBatchErrorMessages    = "pipeline.batchErrorMessages"
	PipelineInputRecordsPerBatch  = "pipeline.inputRecordsPerBatch"
	PipelineOutputRecordsPerBatch = "pipeline.outputRecordsPerBatch"
	PipelineErrorRecordsPerBatch  = "pipeline.errorRecordsPerBatch"
	PipelineErrorsPerBatch        = "pipeline.errorsPerBatch"
	MaxCountInCache               = 10
)
const (
IssueErrorTemplate = "Initialization Error '%s' on Instance : '%s' "
)
Variables ¶
Functions ¶
func NewEdgeRunner ¶
func NewEdgeRunner(
	pipelineId string,
	config execution.Config,
	runtimeInfo *common.RuntimeInfo,
	pipelineStoreTask pipelineStore.PipelineStoreTask,
) (execution.Runner, error)
Types ¶
type BatchImpl ¶
type BatchImpl struct {
// contains filtered or unexported fields
}
func NewBatchImpl ¶
func (*BatchImpl) GetRecords ¶
func (*BatchImpl) GetSourceOffset ¶
type BatchMakerImpl ¶
type BatchMakerImpl struct {
StageOutputSnapshot map[string][]api.Record
// contains filtered or unexported fields
}
func NewBatchMakerImpl ¶
func NewBatchMakerImpl(stagePipe StagePipe, keepSnapshot bool) *BatchMakerImpl
func (*BatchMakerImpl) AddRecord ¶
func (b *BatchMakerImpl) AddRecord(record api.Record, outputLanes ...string)
func (*BatchMakerImpl) GetLanes ¶
func (b *BatchMakerImpl) GetLanes() []string
func (*BatchMakerImpl) GetSize ¶
func (b *BatchMakerImpl) GetSize() int64
func (*BatchMakerImpl) GetStageOutput ¶
func (b *BatchMakerImpl) GetStageOutput(outputLane ...string) []api.Record
type EdgeRunner ¶
type EdgeRunner struct {
// contains filtered or unexported fields
}
func (*EdgeRunner) CommitOffset ¶
func (edgeRunner *EdgeRunner) CommitOffset(sourceOffset common.SourceOffset) error
func (*EdgeRunner) GetErrorMessages ¶
func (edgeRunner *EdgeRunner) GetErrorMessages(stageInstanceName string, size int) ([]api.ErrorMessage, error)
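func (*EdgeRunner) GetErrorRecords ¶
func (edgeRunner *EdgeRunner) GetErrorRecords(stageInstanceName string, size int) ([]api.Record, error)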
func (*EdgeRunner) GetHistory ¶
func (edgeRunner *EdgeRunner) GetHistory() ([]*common.PipelineState, error)
func (*EdgeRunner) GetMetrics ¶
func (edgeRunner *EdgeRunner) GetMetrics() (metrics.Registry, error)
func (*EdgeRunner) GetOffset ¶
func (edgeRunner *EdgeRunner) GetOffset() (common.SourceOffset, error)
func (*EdgeRunner) GetPipelineConfig ¶
func (edgeRunner *EdgeRunner) GetPipelineConfig() common.PipelineConfiguration
func (*EdgeRunner) GetStatus ¶
func (edgeRunner *EdgeRunner) GetStatus() (*common.PipelineState, error)
func (*EdgeRunner) IsRemotePipeline ¶
func (edgeRunner *EdgeRunner) IsRemotePipeline() bool
func (*EdgeRunner) ResetOffset ¶
func (edgeRunner *EdgeRunner) ResetOffset() error
func (*EdgeRunner) StartPipeline ¶
func (edgeRunner *EdgeRunner) StartPipeline(
	runtimeParameters map[string]interface{},
) (*common.PipelineState, error)
func (*EdgeRunner) StopPipeline ¶
func (edgeRunner *EdgeRunner) StopPipeline() (*common.PipelineState, error)
type FullPipeBatch ¶
type FullPipeBatch struct {
StageOutputSnapshot []execution.StageOutput
// contains filtered or unexported fields
}
func (*FullPipeBatch) CompleteStage ¶
func (b *FullPipeBatch) CompleteStage(batchMaker *BatchMakerImpl)
func (*FullPipeBatch) GetBatch ¶
func (b *FullPipeBatch) GetBatch(pipe StagePipe) *BatchImpl
func (*FullPipeBatch) GetBatchSize ¶
func (b *FullPipeBatch) GetBatchSize() int
func (*FullPipeBatch) GetErrorMessages ¶
func (b *FullPipeBatch) GetErrorMessages() int64
func (*FullPipeBatch) GetErrorRecords ¶
func (b *FullPipeBatch) GetErrorRecords() int64
func (*FullPipeBatch) GetErrorSink ¶
func (b *FullPipeBatch) GetErrorSink() *common.ErrorSink
func (*FullPipeBatch) GetEventRecords ¶
func (b *FullPipeBatch) GetEventRecords() int64
func (*FullPipeBatch) GetEventSink ¶
func (b *FullPipeBatch) GetEventSink() *common.EventSink
func (*FullPipeBatch) GetInputRecords ¶
func (b *FullPipeBatch) GetInputRecords() int64
func (*FullPipeBatch) GetOutputRecords ¶
func (b *FullPipeBatch) GetOutputRecords() int64
func (*FullPipeBatch) GetPreviousOffset ¶
func (b *FullPipeBatch) GetPreviousOffset() *string
func (*FullPipeBatch) GetSnapshotsOfAllStagesOutput ¶
func (b *FullPipeBatch) GetSnapshotsOfAllStagesOutput() []execution.StageOutput
func (*FullPipeBatch) OverrideStageOutput ¶
func (b *FullPipeBatch) OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
func (*FullPipeBatch) SetNewOffset ¶
func (b *FullPipeBatch) SetNewOffset(newOffset *string)
func (*FullPipeBatch) StartStage ¶
func (b *FullPipeBatch) StartStage(pipe StagePipe) *BatchMakerImpl
type MetricsEventRunnable ¶
type MetricsEventRunnable struct {
// contains filtered or unexported fields
}
func NewMetricsEventRunnable ¶
func NewMetricsEventRunnable(
	pipelineId string,
	pipelineConfig common.PipelineConfiguration,
	pipelineBean creation.PipelineBean,
	metricRegistry metrics.Registry,
	runtimeInfo *common.RuntimeInfo,
) *MetricsEventRunnable
func (*MetricsEventRunnable) Run ¶
func (m *MetricsEventRunnable) Run()
func (*MetricsEventRunnable) Stop ¶
func (m *MetricsEventRunnable) Stop()
type Pipe ¶
type Pipe interface {
Init() []validation.Issue
Process(pipeBatch PipeBatch) error
Destroy()
IsSource() bool
IsProcessor() bool
IsTarget() bool
GetInstanceName() string
GetStageContext() api.StageContext
GetOutputLanes() []string
GetEventLanes() []string
}
func NewStagePipe ¶
func NewStagePipe(stage StageRuntime, config execution.Config) Pipe
type PipeBatch ¶
type PipeBatch interface {
GetBatchSize() int
GetPreviousOffset() *string
SetNewOffset(offset *string)
GetBatch(pipe StagePipe) *BatchImpl
StartStage(pipe StagePipe) *BatchMakerImpl
CompleteStage(batchMaker *BatchMakerImpl)
GetErrorSink() *common.ErrorSink
GetEventSink() *common.EventSink
GetInputRecords() int64
GetOutputRecords() int64
GetEventRecords() int64
GetErrorRecords() int64
GetErrorMessages() int64
OverrideStageOutput(pipe Pipe, stageOutput *execution.StageOutput)
GetSnapshotsOfAllStagesOutput() []execution.StageOutput
}
type Pipeline ¶
func NewPipeline ¶
func NewPipeline(
	config execution.Config,
	pipelineConfig common.PipelineConfiguration,
	sourceOffsetTracker execution.SourceOffsetTracker,
	runtimeParameters map[string]interface{},
	metricRegistry metrics.Registry,
) (*Pipeline, []validation.Issue)
func (*Pipeline) GetErrorMessages ¶
func (*Pipeline) GetErrorRecords ¶
func (*Pipeline) Init ¶
func (p *Pipeline) Init() []validation.Issue
type ProductionPipeline ¶
type ProductionPipeline struct {
PipelineConfig common.PipelineConfiguration
Pipeline *Pipeline
MetricRegistry metrics.Registry
}
func NewProductionPipeline ¶
func NewProductionPipeline(
	pipelineId string,
	config execution.Config,
	runner execution.Runner,
	pipelineConfiguration common.PipelineConfiguration,
	runtimeParameters map[string]interface{},
) (*ProductionPipeline, []validation.Issue)
func (*ProductionPipeline) Init ¶
func (p *ProductionPipeline) Init() []validation.Issue
func (*ProductionPipeline) Run ¶
func (p *ProductionPipeline) Run()
func (*ProductionPipeline) Stop ¶
func (p *ProductionPipeline) Stop()
type ProductionSourceOffsetTracker ¶
type ProductionSourceOffsetTracker struct {
// contains filtered or unexported fields
}
func NewProductionSourceOffsetTracker ¶
func NewProductionSourceOffsetTracker(pipelineId string) (*ProductionSourceOffsetTracker, error)
func (*ProductionSourceOffsetTracker) CommitOffset ¶
func (o *ProductionSourceOffsetTracker) CommitOffset() error
func (*ProductionSourceOffsetTracker) GetLastBatchTime ¶
func (o *ProductionSourceOffsetTracker) GetLastBatchTime() time.Time
func (*ProductionSourceOffsetTracker) GetOffset ¶
func (o *ProductionSourceOffsetTracker) GetOffset() *string
func (*ProductionSourceOffsetTracker) IsFinished ¶
func (o *ProductionSourceOffsetTracker) IsFinished() bool
func (*ProductionSourceOffsetTracker) SetOffset ¶
func (o *ProductionSourceOffsetTracker) SetOffset(newOffset *string)
type SDCMetrics ¶
type StagePipe ¶
type StagePipe struct {
Stage StageRuntime
InputLanes []string
OutputLanes []string
EventLanes []string
// contains filtered or unexported fields
}
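func (*StagePipe) GetEventLanes ¶
func (s *StagePipe) GetEventLanes() []string
func (*StagePipe) GetInstanceName ¶
func (s *StagePipe) GetInstanceName() string
func (*StagePipe) GetOutputLanes ¶
func (s *StagePipe) GetOutputLanes() []string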
func (*StagePipe) GetStageContext ¶
func (s *StagePipe) GetStageContext() api.StageContext
func (*StagePipe) Init ¶
func (s *StagePipe) Init() []validation.Issue
func (*StagePipe) IsProcessor ¶
func (s *StagePipe) IsProcessor() bool
type StageRuntime ¶
type StageRuntime struct {
// contains filtered or unexported fields
}
func NewStageRuntime ¶
func NewStageRuntime(
	pipelineBean creation.PipelineBean,
	stageBean creation.StageBean,
	stageContext api.StageContext,
) StageRuntime
func (*StageRuntime) Destroy ¶
func (s *StageRuntime) Destroy()
func (*StageRuntime) Execute ¶
func (s *StageRuntime) Execute(
	previousOffset *string,
	batchSize int,
	batch *BatchImpl,
	batchMaker *BatchMakerImpl,
) (*string, error)
func (*StageRuntime) GetInstanceName ¶
func (s *StageRuntime) GetInstanceName() string
func (*StageRuntime) Init ¶
func (s *StageRuntime) Init() []validation.Issue