pipeline

package module
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: MIT Imports: 7 Imported by: 0

README

go-pipeline package

モチベーション

データのパイプライン処理においては、考慮が必要な多くの非機能要件が存在します。

  • 並列制御(並列実行数、レートリミット)
  • エラー処理(一部でのエラー or 全体でのエラー)
  • メトリクス・ロギング

このパッケージは、単一プロセスで動くパイプライン処理を抽象化した軽量な Go ライブラリを提供することで、これらの非機能要件を実装しやすくし、開発者がコアロジックの実装に集中できるようにすることを目指すものです。

MapReduce

このパッケージで実装されているパイプラインモデルは MapReduce という歴史あるプログラミングモデルを強く意識した設計となっています。前提知識としてこちらを参照しておくと理解が深まりやすいのではないかと思います。

https://www.talend.com/jp/resources/what-is-mapreduce/

モデル

346062370-f419de33-faac-41fc-9d2b-281cf84ed19c

パイプライン (Pipeline)

データの流れをモデル化したもの。各処理を行う複数のステージと、それらの間を流れるデータであるレコードから構成されます。 最初のステージに渡されるレコードは処理の開始を表す特殊な値であり、最後のステージから出力されたレコードがパイプライン全体の出力となります。

レコード (Record)

パイプラインを流れるデータを抽象化したものです。レコードの区分けを表すグループ (Group) と、グループ内で一意の値を取る識別子 (Identifier) をヘッダーとして持ち、ペイロードとしてデータの実体を持ちます。

ステージ (Stage)

list(<group1, id1>) -> Stage() -> list(<group2, id2>)

実際の計算処理を実行する部分です。各ステージはレコードの集合を受け取り、何らかの処理を行なって加工したデータをレコードの集合として返す必要があります。 MapReduce を参考に、現在 Mapper と Reducer、および Streamer という 3 つの処理アルゴリズムが実装されています。

マッパー (Mapper)

<group1, id1> -> Mapper() -> list(<group2, id2>)

1 つのレコードに対して複数のレコードを返す関数として表現され、それらをレコード単位で並列に複数実行することで全体の結果を生成します。主に外部 API からのデータの取得処理やデータの変換処理に利用することができます。

リデューサー (Reducer)

<group1, list(id1)> -> Reduce() -> list(<group2, id2>)

前段のレコードをグループ化し、各グループのレコードの集合に対して複数のレコードを返す関数として表現され、それらをグループ単位で並列に複数実行することで全体の結果を返します。主に後段の処理で加工されたデータを集約し集計や保存処理を行うために利用することができます。

ストリーマー (Streamer)

channel(<group1, id1>) -> Streamer() -> channel(<group2, id2>)

レコードを逐次受け取り、逐次出力を返すようなストリーム処理を表現します。Mapper や Reducer での表現が難しい非同期処理を記述する場合に利用します。例えば、大量のデータを生成するような Mapper を実装する際、 代わりに Streamer で逐次結果を返すようにすることで、メモリ使用量を抑えられる他後続の処理を逐次開始することができパフォーマンスの向上も見込めます。

アウトプット (Output)

処理の単位ごとに出力されたレコードをまとめたものをレコードと区別してアウトプットと呼びます。アウトプットはレコードの他、処理の成功・失敗を表すステータス、エラーの場合にはエラーの情報も含みます。

ステータスがエラーになったアウトプットは以降のパイプラインからは除外され、成功したレコードのみで処理が進みます。発生したエラーは別途ステージの実行情報として集計されます。

使い方

examples/ にサンプルコードを配置しているので参考にしてください。サンプルコードでは、VM に対する脆弱性スキャンを題材にリージョンの取得、インスタンスのリストアップ、スキャン、脆弱性の集計をモデル化しています。

1. 各ステージで扱う Record を定義する

Record は以下のようなインターフェースとして定義されています。

type Record interface {
	Group() Group
	Identifier() string
}

type Group interface {
	String() string
}
  • レコードのグループを表す値を Record.Group() として返すように実装します。Group は String() を持つ独自型として定義するか、Group にデータを持たせる必要がない場合はGroupString(string) を利用することができます。
  • レコードの識別子を表す値を Record.Identifier() として返すように実装します。
  • グループや識別子に値を持たせる必要がない場合は、 GroupNAIdentifierNA を利用することができます。
サンプルコードの実装例
type Region struct {
	Name string
}
// Implements Record
func (r *Region) Group() pipeline.Group { return pipeline.GroupNA }
func (r *Region) Identifier() string    { return r.Name }


type Instance struct {
	Region string
	ID     string
}
// Implements Record
func (i *Instance) Group() pipeline.Group { return pipeline.GroupString(i.Region) }
func (i *Instance) Identifier() string    { return i.ID }


type Vulnerability struct {
	Instance *Instance
	ID       string
}
// Implements Record
func (v *Vulnerability) Group() pipeline.Group { return pipeline.GroupString(v.ID) }
func (v *Vulnerability) Identifier() string    { return v.Instance.ID }


type VulnerabilityCount struct {
	VulnerabilityID string
	Count           int
}
// Implements Record
func (v *VulnerabilityCount) Group() pipeline.Group { return pipeline.GroupString(v.VulnerabilityID) }
func (v *VulnerabilityCount) Identifier() string    { return pipeline.IdentifierNA }
2. Mapper / Reducer / Streamer を実装する

Mapper / Reducer / Streamer はそれぞれ次のようなインターフェースとして定義されています。これらを満たす型を実装します。

input には前段で出力されたレコードが入ります。型アサーションにより型を特定した上で必要な情報を参照します。

type Mapper[I Record, O Record] interface {
	Map(ctx context.Context, input I) ([]O, error)
}

type Reducer[I Record, O Record, G Group] interface {
	Reduce(ctx context.Context, group G, inputs []I) ([]O, error)
}

type Streamer[I Record, O Record] interface {
	Stream(ctx context.Context, inputs <-chan I) (<-chan O, <-chan error)
}
サンプルコードの実装例
type RegionLister struct{}

func (l *RegionLister) Stream(ctx context.Context, inputs <-chan pipeline.Origin) (<-chan *Region, <-chan error) {
	<-inputs

	outs := make(chan *Region)
	errs := make(chan error)

	go func() {
		defer close(outs)
		defer close(errs)

		outs <- &Region{Name: "ap-northeast-1"}
		outs <- &Region{Name: "us-west-1"}
	}()

	return outs, errs
}

type VMLister struct{}

func (l *VMLister) Map(ctx context.Context, region *Region) ([]*Instance, error) {
	if region.Name == "ap-northeast-1" {
		return []*Instance{
			{ID: "i-123"},
			{ID: "i-456"},
		}, nil
	}

	return nil, nil
}

type Scanner struct{}

func (s *Scanner) Map(ctx context.Context, instance *Instance) ([]*Vulnerability, error) {
	if instance.ID == "i-123" {
		return nil, fmt.Errorf("failed to scan instance %s", instance.ID)
	}
	if instance.ID == "i-456" {
		return []*Vulnerability{
			{ID: "CVE-2020-5678"},
			{ID: "CVE-2020-9012"},
		}, nil
	}

	return nil, nil
}

type Counter struct{}

func (c *Counter) Reduce(ctx context.Context, group pipeline.Group, vulnerabilities []*Vulnerability) ([]*VulnerabilityCount, error) {
	vulnerabilityID := group.String()

	return []*VulnerabilityCount{
		{VulnerabilityID: vulnerabilityID, Count: len(vulnerabilities)},
	}, nil
}
3. Pipeline を組み立てる

定義した Mapper や Reducer を使い、パイプラインを組み立てます。

pp := pipeline.New(
    pipeline.StreamStage("RegionLister", &RegionLister{}),
    pipeline.MapStage("VMLister", &VMLister{}, pipeline.StageTimeout(1*time.Second)),
    pipeline.MapStage("Scanner", &Scanner{}, pipeline.StageMaxParallel(3)),
    pipeline.ReduceStage("Counter", &Counter{}, pipeline.StageAbortIfAnyError(true)),
)

(Mapper|Reducer|Streamer)Stage() を使って、定義した処理をパイプラインに組み込むことができます。 またオプション引数で以下の値を設定できます。

  • StageTimeout(d time.Duration): ステージ単位のタイムアウト。タイムアウト前に正常に完了したレコードは後続のステージに渡されそのまま実行されていきます。
  • StageMaxParallel(n int): 並列実行数の上限を指定します。Mapper の場合はレコード、Reducer の場合はグループの数が最大の並列数になります。(StreamStage では利用不可)
4. Pipeline を実行する

Execute(ctx context.Context) で定義したパイプラインを実行します。

outputs, stages, err := pp.Execute(context.Background())

Execute() の返り値は以下のようになります。

  • outputs []Record: 最後のステージで処理が正常に完了したレコード
  • stages []StageExecution: 各ステージでの実行結果。定義したステージ順に値が入る
  • err error: StageAbortIfAnyError 設定時にエラーが発生した場合、全体のパイプラインを中止して該当エラーがここに入る
サンプルコードの実装例
fmt.Println("--- Outputs ---")
for _, o := range outputs {
    v := o.(*VulnerabilityCount)
    fmt.Printf("VulnerabilityID: %s, Count: %d\n", v.VulnerabilityID, v.Count)
}

fmt.Println("--- Executions ---")
for _, stage := range stages {
    successCount := 0
    errorCount := 0
    recordCount := 0
    for _, o := range stage.Outputs {
        if o.Status == pipeline.OutputStatusSuccess {
            successCount++
        }
        if o.Status == pipeline.OutputStatusError {
            errorCount++
        }
        recordCount += o.RecordCount
    }

    fmt.Printf("Stage %s: %d records generated, %d success, %d errors\n", stage.Name, recordCount, successCount, errorCount)
}
--- Outputs ---
VulnerabilityID: CVE-2020-5678, Count: 1
VulnerabilityID: CVE-2020-9012, Count: 1
--- Executions ---
Stage RegionLister: 2 records generated, 1 success, 0 errors
Stage VMLister: 2 records generated, 2 success, 0 errors
Stage Scanner: 2 records generated, 1 success, 1 errors
Stage Counter: 2 records generated, 2 success, 0 errors
その他
  • Reducer はデフォルトの挙動では全体のレコードを全て待ち受けた後にそれぞれのグループに分割して処理を行います。全体のデータ量が多い場合には、この挙動ではメモリ使用量が増大する恐れがあります。前段の処理においてグループごとに処理タイミングの偏りがある場合には、GroupCommit という特殊なレコードを用いてグループのレコードを打ち切ることができ、Reducer は GroupCommit を受け取った時点でそのグループの処理を開始します。GroupCommit が送られなかったグループは、前段の全てのレコードの送出が完了した時点でまとめて処理されます。このレコードは、実体のレコードが 0 件のグループを作成したい場合にも利用することができます。

  • 実行時に全ステージの channel を作成し、各ステージで完了した出力から後段に流していく実装となっているので、1 つのステージの実行が完了していない段階でも完了したレコードについて順次後段のステージの処理が実行されていきます。ただし、Reducer は全てのレコードの出力を待ち受けるため前段のステージ全体が完了してから実行されます。

参考実装

pipeline/examples

脆弱性スキャンを題材にしたシンプルなユースケースをサンプルとして実装しています。 本 README と合わせて参照してください。

https://github.com/cloudbase-inc/go-pipeline/tree/main/examples

ライセンス

このプロジェクトは MIT ライセンスの下でライセンスされています。詳細は LICENSE ファイルを参照してください。

Documentation

Index

Constants

View Source
const GroupNA = GroupString(na)
View Source
const IdentifierNA = na

Variables

This section is empty.

Functions

func AbortError added in v1.1.1

func AbortError(err error) error

全体のパイプラインを中止すべきクリティカルなエラーが発生した場合は、このエラーを返してください

func EmptyGroup deprecated

func EmptyGroup(g Group) groupCommit

Deprecated: 代わりにGroupCommitを利用してください

func GroupCommit added in v1.1.0

func GroupCommit(g Group) groupCommit

func IsAbortError added in v1.1.1

func IsAbortError(err error) bool

func RecordKey

func RecordKey(r Record) string

その他ユーティリティ関数等

Types

type Group

type Group interface {
	String() string
}

type GroupCommiter added in v1.2.1

type GroupCommiter interface {
	GroupCommit() bool
}

type GroupString

type GroupString string

シンプルな文字列として表現されるグループ

func (GroupString) String

func (g GroupString) String() string

type Mapper

type Mapper[I Record, O Record] interface {
	Map(ctx context.Context, input I) ([]O, error)
}

<group1, id1> -> Mapper() -> list(<group2, id2>) 利用者が実装する型

type Origin added in v1.2.0

type Origin struct{}

パイプラインの開始点を表す特殊なレコード。処理関数内では無視すること

func (Origin) Group added in v1.2.0

func (o Origin) Group() Group

func (Origin) Identifier added in v1.2.0

func (o Origin) Identifier() string

type Output

type Output struct {
	Unit    string
	Status  OutputStatus
	Records []Record
	Err     error
}

func (Output) Summarized

func (o Output) Summarized() SummarizedOutput

type OutputStatus

type OutputStatus string
const (
	OutputStatusSuccess OutputStatus = "Success"
	OutputStatusError   OutputStatus = "Error"
)

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

func New

func New(stages ...*PipelineStage) *Pipeline

func (*Pipeline) Execute

func (p *Pipeline) Execute(ctx context.Context) (outputs []Record, stages []StageExecution, abortErr error)

type PipelineStage

type PipelineStage struct {
	// contains filtered or unexported fields
}

func MapStage

func MapStage[I Record, O Record](name string, mapper Mapper[I, O], opts ...PipelineStageOption) *PipelineStage

Mapper / Reducerを元にステージを組み立てるためのユーティリティ関数

func ReduceStage

func ReduceStage[I Record, O Record, G Group](name string, reducer Reducer[I, O, G], opts ...PipelineStageOption) *PipelineStage

func Stage

func Stage(pr Processor, opts ...PipelineStageOption) *PipelineStage

Stageの初期化

func StreamStage added in v1.2.1

func StreamStage[I Record, O Record](name string, streamer Streamer[I, O]) *PipelineStage

type PipelineStageOption

type PipelineStageOption func(*PipelineStage)

func StageAbortIfAnyError deprecated

func StageAbortIfAnyError(value bool) PipelineStageOption

Deprecated: 代わりにAbortErrorを利用してください

func StageMaxParallel

func StageMaxParallel(max int) PipelineStageOption

実行時オプション

func StageTimeout

func StageTimeout(timeout time.Duration) PipelineStageOption

type Processor

type Processor interface {
	Name() string
	Type() ProcessorType

	// list(<group1, id1>) -> Stage() -> list(<group2, id2>)
	Process(ctx context.Context, inputs <-chan Record, abort chan<- error) <-chan Output

	// Options
	SetMaxParallel(max int)
	SetAbortIfAnyError(value bool)
}

type ProcessorType

type ProcessorType string
const (
	ProcessorTypeMap    ProcessorType = "Map"
	ProcessorTypeReduce ProcessorType = "Reduce"
	ProcessorTypeStream ProcessorType = "Stream"
)

type Record

type Record interface {
	Group() Group
	Identifier() string
}

利用者で実装すべきインターフェース

type Reducer

type Reducer[I Record, O Record, G Group] interface {
	Reduce(ctx context.Context, group G, inputs []I) ([]O, error)
}

<group1, list(id1)> -> Reduce() -> list(<group2, id2>) 利用者が実装する型

type ReducerOption

type ReducerOption func(p *reduceProcessor)

func (ReducerOption) ReduceStageOption

func (o ReducerOption) ReduceStageOption()

type StageExecution

type StageExecution struct {
	Name       string
	Type       ProcessorType
	GroupCount int
	Outputs    []SummarizedOutput
}

ステージの実行結果

type Streamer added in v1.2.1

type Streamer[I Record, O Record] interface {
	Stream(ctx context.Context, inputs <-chan I) (<-chan O, <-chan error)
}

channel(<group1, id1>) -> Streamer() -> channel(<group2, id2>) 利用者が実装する型

type SummarizedOutput

type SummarizedOutput struct {
	Unit        string
	Status      OutputStatus
	RecordCount int
	Err         error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL