Documentation
¶
Index ¶
- Constants
- func AbortError(err error) error
- func EmptyGroup(g Group) groupCommitdeprecated
- func GroupCommit(g Group) groupCommit
- func IsAbortError(err error) bool
- func RecordKey(r Record) string
- type Group
- type GroupCommiter
- type GroupString
- type Mapper
- type Origin
- type Output
- type OutputStatus
- type Pipeline
- type PipelineStage
- func MapStage[I Record, O Record](name string, mapper Mapper[I, O], opts ...PipelineStageOption) *PipelineStage
- func ReduceStage[I Record, O Record, G Group](name string, reducer Reducer[I, O, G], opts ...PipelineStageOption) *PipelineStage
- func Stage(pr Processor, opts ...PipelineStageOption) *PipelineStage
- func StreamStage[I Record, O Record](name string, streamer Streamer[I, O]) *PipelineStage
- type PipelineStageOption
- type Processor
- type ProcessorType
- type Record
- type Reducer
- type ReducerOption
- type StageExecution
- type Streamer
- type SummarizedOutput
Constants ¶
View Source
const GroupNA = GroupString(na)
View Source
const IdentifierNA = na
Variables ¶
This section is empty.
Functions ¶
func AbortError ¶ added in v1.1.1
全体のパイプラインを中止すべきクリティカルなエラーが発生した場合は、このエラーを返してください
func EmptyGroup
deprecated
func EmptyGroup(g Group) groupCommit
Deprecated: 代わりにGroupCommitを利用してください
func GroupCommit ¶ added in v1.1.0
func GroupCommit(g Group) groupCommit
func IsAbortError ¶ added in v1.1.1
Types ¶
type GroupCommiter ¶ added in v1.2.1
type GroupCommiter interface {
GroupCommit() bool
}
type GroupString ¶
type GroupString string
シンプルな文字列として表現されるグループ
func (GroupString) String ¶
func (g GroupString) String() string
type Origin ¶ added in v1.2.0
type Origin struct{}
パイプラインの開始点を表す特殊なレコード。処理関数内では無視すること
func (Origin) Identifier ¶ added in v1.2.0
type Output ¶
type Output struct { Unit string Status OutputStatus Records []Record Err error }
func (Output) Summarized ¶
func (o Output) Summarized() SummarizedOutput
type OutputStatus ¶
type OutputStatus string
const ( OutputStatusSuccess OutputStatus = "Success" OutputStatusError OutputStatus = "Error" )
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
func New ¶
func New(stages ...*PipelineStage) *Pipeline
type PipelineStage ¶
type PipelineStage struct {
// contains filtered or unexported fields
}
func MapStage ¶
func MapStage[I Record, O Record](name string, mapper Mapper[I, O], opts ...PipelineStageOption) *PipelineStage
Mapper / Reducerを元にステージを組み立てるためのユーティリティ関数
func ReduceStage ¶
func ReduceStage[I Record, O Record, G Group](name string, reducer Reducer[I, O, G], opts ...PipelineStageOption) *PipelineStage
func StreamStage ¶ added in v1.2.1
func StreamStage[I Record, O Record](name string, streamer Streamer[I, O]) *PipelineStage
type PipelineStageOption ¶
type PipelineStageOption func(*PipelineStage)
func StageAbortIfAnyError
deprecated
func StageAbortIfAnyError(value bool) PipelineStageOption
Deprecated: 代わりにAbortErrorを利用してください
func StageTimeout ¶
func StageTimeout(timeout time.Duration) PipelineStageOption
type ProcessorType ¶
type ProcessorType string
const ( ProcessorTypeMap ProcessorType = "Map" ProcessorTypeReduce ProcessorType = "Reduce" ProcessorTypeStream ProcessorType = "Stream" )
type Reducer ¶
type Reducer[I Record, O Record, G Group] interface { Reduce(ctx context.Context, group G, inputs []I) ([]O, error) }
<group1, list(id1)> -> Reduce() -> list(<group2, id2>) 利用者が実装する型
type ReducerOption ¶
type ReducerOption func(p *reduceProcessor)
func (ReducerOption) ReduceStageOption ¶
func (o ReducerOption) ReduceStageOption()
type StageExecution ¶
type StageExecution struct { Name string Type ProcessorType GroupCount int Outputs []SummarizedOutput }
ステージの実行結果
type Streamer ¶ added in v1.2.1
type Streamer[I Record, O Record] interface { Stream(ctx context.Context, inputs <-chan I) (<-chan O, <-chan error) }
channel(<group1, id1>) -> Streamer() -> channel(<group2, id2>) 利用者が実装する型
type SummarizedOutput ¶
type SummarizedOutput struct { Unit string Status OutputStatus RecordCount int Err error }
Source Files
¶
Click to show internal directories.
Click to hide internal directories.