egobatch

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2025 License: MIT Imports: 6 Imported by: 1

README

GitHub Workflow Status (branch) GoDoc Coverage Status Supported Go Versions GitHub Release Go Report Card

egobatch

Generic batch task processing with type-safe custom error handling.


CHINESE README

中文说明

Main Features

  • 🎯 Type-Safe Error Handling: Generic error types with comparable constraints
  • ⚡ Batch Task Processing: Concurrent execution with TaskBatch abstraction
  • 🔄 Flexible Execution Modes: Glide mode (independent tasks) and fail-fast mode
  • 🌍 Context Cancellation: Full context propagation and timeout support
  • 📋 Result Aggregation: Filter success/failed tasks with OkTasks/WaTasks methods

Installation

go get github.com/yyle88/egobatch

Quick Start

Basic errgroup with Custom Error Type
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/yyle88/egobatch/erxgroup"
)

// MyError is the custom error type used by this example. It carries a
// Code identifying the failure and a Msg describing it.
type MyError struct {
	Code string
	Msg  string
}

// Error implements the error interface, rendering the value as "[Code] Msg".
func (m *MyError) Error() string {
	code, msg := m.Code, m.Msg
	return fmt.Sprintf("[%s] %s", code, msg)
}

// main starts three tasks on an erxgroup.Group typed on *MyError, waits for
// the group, and prints whether Wait returned an error.
func main() {
	ctx := context.Background()
	ego := erxgroup.NewGroup[*MyError](ctx)

	// Task 1: sleeps 100ms, prints a message, and returns nil (no error).
	ego.Go(func(ctx context.Context) *MyError {
		time.Sleep(100 * time.Millisecond)
		fmt.Println("Task 1 finished OK")
		return nil
	})

	// Task 2: sleeps 50ms, prints a message, and returns nil (no error).
	ego.Go(func(ctx context.Context) *MyError {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("Task 2 finished OK")
		return nil
	})

	// Task 3: sleeps 80ms, prints a message, and returns nil (no error).
	ego.Go(func(ctx context.Context) *MyError {
		time.Sleep(80 * time.Millisecond)
		fmt.Println("Task 3 finished OK")
		return nil
	})

	// Wait for the group and inspect the typed error it returns. Every task
	// above returns nil, so the else branch is the expected path here.
	if erx := ego.Wait(); erx != nil {
		fmt.Printf("Got issue: %s\n", erx.Error())
	} else {
		fmt.Println("Tasks finished OK")
	}
}

⬆️ Source: Source

Batch Task Processing
package main

import (
	"context"
	"fmt"

	"github.com/yyle88/egobatch"
	"github.com/yyle88/egobatch/erxgroup"
	"github.com/yyle88/must"
)

// MyError is a minimal custom error type that carries only an error code.
type MyError struct {
	Code string
}

// Error implements the error interface by returning the code unchanged.
func (m *MyError) Error() string {
	code := m.Code
	return code
}

// main builds a TaskBatch over five integers, runs it in glide mode on an
// erxgroup.Group, and prints the succeeded and failed tasks separately.
func main() {
	// One task is created per argument; the type parameters are the
	// argument type (int), result type (string) and error type (*MyError).
	args := []int{1, 2, 3, 4, 5}
	batch := egobatch.NewTaskBatch[int, string, *MyError](args)

	// Glide mode: keep running the remaining tasks even when some fail.
	batch.SetGlide(true)

	// Create the group that the batch will schedule its tasks into.
	ctx := context.Background()
	ego := erxgroup.NewGroup[*MyError](ctx)

	// The task function succeeds for even numbers and fails for odd ones.
	batch.EgoRun(ego, func(ctx context.Context, num int) (string, *MyError) {
		if num%2 == 0 {
			// Even numbers produce a result and no error.
			return fmt.Sprintf("even-%d", num), nil
		}
		// Odd numbers produce an empty result and an ODD_NUMBER error.
		return "", &MyError{Code: "ODD_NUMBER"}
	})

	// In glide mode, ego.Wait() returns nil because errors are captured in tasks.
	// NOTE(review): must.Null presumably aborts when given a non-nil value —
	// confirm against github.com/yyle88/must.
	must.Null(ego.Wait())

	// Split the tasks into those that succeeded and those that failed.
	okTasks := batch.Tasks.OkTasks()
	waTasks := batch.Tasks.WaTasks()

	fmt.Printf("Success: %d, Failed: %d\n", len(okTasks), len(waTasks))

	// Print the argument and result of each successful task.
	for _, task := range okTasks {
		fmt.Printf("Arg: %d -> Outcome: %s\n", task.Arg, task.Res)
	}

	// Print the argument and error of each failed task.
	for _, task := range waTasks {
		fmt.Printf("Arg: %d -> Issue: %s\n", task.Arg, task.Erx.Error())
	}
}

⬆️ Source: Source

Context Timeout Handling
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/yyle88/egobatch"
	"github.com/yyle88/egobatch/erxgroup"
	"github.com/yyle88/must"
)

// MyError is the custom error type for this example; its only field is the
// error code.
type MyError struct {
	Code string
}

// Error implements the error interface and yields the stored code as-is.
func (m *MyError) Error() string {
	code := m.Code
	return code
}

// main runs five tasks of increasing duration under a 150ms context timeout
// in glide mode, then prints which tasks finished and which did not.
func main() {
	// The context is cancelled automatically after 150ms; cancel releases
	// its resources when main returns.
	ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
	defer cancel()

	// One task is created per argument.
	args := []int{1, 2, 3, 4, 5}
	batch := egobatch.NewTaskBatch[int, string, *MyError](args)

	// Glide mode: a failing task does not stop the others, so the final
	// report shows every task's individual outcome.
	batch.SetGlide(true)

	// Convert context errors into the custom error type. This converter
	// ignores the underlying error and always reports the code TIMEOUT.
	batch.SetWaCtx(func(err error) *MyError {
		return &MyError{Code: "TIMEOUT"}
	})

	ego := erxgroup.NewGroup[*MyError](ctx)

	batch.EgoRun(ego, func(ctx context.Context, num int) (string, *MyError) {
		// Each task needs num*50 milliseconds: 50ms, 100ms, 150ms, 200ms, 250ms.
		// NOTE(review): task 3 needs exactly 150ms, the same as the context
		// timeout, so which select case fires for it is not deterministic.
		taskTime := time.Duration(num*50) * time.Millisecond

		// Use a timer rather than time.Sleep so the wait can be interrupted
		// by context cancellation; stop the timer on every return path.
		timer := time.NewTimer(taskTime)
		defer timer.Stop()

		select {
		case <-timer.C:
			// The timer fired before the context was done: the task succeeds.
			fmt.Printf("Task %d finished (%dms)\n", num, num*50)
			return fmt.Sprintf("task-%d", num), nil
		case <-ctx.Done():
			// The context was done first: report the task as cancelled.
			fmt.Printf("Task %d cancelled (%dms needed)\n", num, num*50)
			return "", &MyError{Code: "CANCELLED"}
		}
	})

	// In glide mode, ego.Wait() returns nil because errors are captured in tasks.
	// NOTE(review): must.Null presumably aborts when given a non-nil value —
	// confirm against github.com/yyle88/must.
	must.Null(ego.Wait())

	// Split the tasks into those that succeeded and those that failed.
	okTasks := batch.Tasks.OkTasks()
	waTasks := batch.Tasks.WaTasks()

	fmt.Printf("\nSuccess: %d, Timeout: %d\n", len(okTasks), len(waTasks))

	// Print the argument and result of each task that finished.
	for _, task := range okTasks {
		fmt.Printf("Arg: %d -> Outcome: %s\n", task.Arg, task.Res)
	}

	// Print the argument and error of each task that did not finish.
	for _, task := range waTasks {
		fmt.Printf("Arg: %d -> Issue: %s\n", task.Arg, task.Erx.Error())
	}
}

⬆️ Source: Source

Fail-Fast Mode
batch := egobatch.NewTaskBatch[int, string, *MyError](args)
// Default is fail-fast mode (Glide: false)

ego := erxgroup.NewGroup[*MyError](ctx)
batch.EgoRun(ego, taskFunc)

if erx := ego.Wait(); erx != nil {
    // First error stops execution
    fmt.Printf("Stopped on error: %s\n", erx.Error())
}
Task Result Transformation
tasks := batch.Tasks

// Flatten with error handling
results := tasks.Flatten(func(arg int, err *MyError) string {
    return fmt.Sprintf("error-%d: %s", arg, err.Code)
})

// Mix of success results and transformed errors
for _, result := range results {
    fmt.Println(result)
}

Core Components

erxgroup.Group[E ErrorType]

Generic wrapper around errgroup.Group with type-safe custom errors:

  • NewGroup[E](ctx): Create new group with custom error type
  • Go(func(ctx) E): Add task returning custom error
  • TryGo(func(ctx) E): Add task with limit checking
  • Wait() E: Wait and get first typed error
  • SetLimit(n): Restrict concurrent task count
TaskBatch[A, R, E]

Batch task execution with concurrent processing:

  • NewTaskBatch[A, R, E](args): Create batch from arguments
  • SetGlide(bool): Configure execution mode
  • SetWaCtx(func(error) E): Handle context errors
  • GetRun(idx, func): Get task execution function
  • EgoRun(ego, func): Run batch with errgroup
Tasks[A, R, E]

Task collection with filtering methods:

  • OkTasks(): Get tasks that completed with success
  • WaTasks(): Get tasks that failed
  • Flatten(func): Transform results with error handling

Advanced Usage

Context Timeout Handling
batch := egobatch.NewTaskBatch[int, string, *MyError](args)
batch.SetGlide(true)

// Convert context errors to custom type
batch.SetWaCtx(func(err error) *MyError {
    return &MyError{Code: "CONTEXT_ERROR", Msg: err.Error()}
})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

ego := erxgroup.NewGroup[*MyError](ctx)
batch.EgoRun(ego, taskFunc)
ego.Wait()

// Tasks that timed out have context errors recorded
for _, task := range batch.Tasks.WaTasks() {
    fmt.Printf("Task %d error: %s\n", task.Arg, task.Erx.Error())
}
TaskOutput Pattern
import "github.com/yyle88/egobatch"

// Create task outputs.
// The type arguments are spelled out because Go infers type parameters only
// from function arguments: NewOkTaskOutput has no argument of the error type
// E, and NewWaTaskOutput has no argument of the result type RES, so neither
// call can be inferred from the surrounding slice literal.
outputs := egobatch.TaskOutputList[int, string, *MyError]{
    egobatch.NewOkTaskOutput[int, string, *MyError](1, "success-1"),
    egobatch.NewWaTaskOutput[int, string, *MyError](2, &MyError{Code: "FAIL"}),
    egobatch.NewOkTaskOutput[int, string, *MyError](3, "success-3"),
}

// Filter and aggregate.
okList := outputs.OkList()       // outputs that succeeded
okCount := outputs.OkCount()     // number of outputs that succeeded
results := outputs.OkResults()   // result values of the successful outputs
reasons := outputs.WaReasons()   // error values of the failed outputs (named to avoid shadowing package errors)

fmt.Printf("Success count: %d\n", okCount)
fmt.Printf("Results: %v\n", results)
fmt.Printf("Success outputs: %d\n", len(okList))
fmt.Printf("Failure reasons: %v\n", reasons)

Design Patterns

ErrorType Constraint

Custom error types must satisfy the ErrorType constraint:

type ErrorType interface {
    error
    comparable
}

This enables:

  • Type-safe error checking with errors.Is
  • Zero-value nil detection with constraint.Pass(erx)
  • Custom error conversion with errors.As
Glide vs Fail-Fast

Glide Mode (Glide: true):

  • Tasks execute in independent mode
  • Errors recorded without stopping others
  • Context cancellation affects remaining tasks
  • Best with independent operations

Fail-Fast Mode (Glide: false):

  • First error stops batch execution
  • Context gets cancelled on first error
  • Remaining tasks receive context cancellation
  • Best with dependent operations

Examples

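See the examples directory (examples/example1, examples/example2, examples/example3) and the runnable demo commands (demos/demo1x, demos/demo2x, demos/demo3x).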

📄 License

MIT License. See LICENSE.


🤝 Contributing

Contributions are welcome! Report bugs, suggest features, and contribute code:

  • 🐛 Found a mistake? Open an issue on GitHub with reproduction steps
  • 💡 Have a feature idea? Create an issue to discuss the suggestion
  • 📖 Documentation confusing? Report it so we can improve
  • 🚀 Need new features? Share the use cases to help us understand requirements
  • Performance issue? Help us optimize through reporting slow operations
  • 🔧 Configuration problem? Ask questions about complex setups
  • 📢 Follow project progress? Watch the repo to get new releases and features
  • 🌟 Success stories? Share how this package improved the workflow
  • 💬 Feedback? We welcome suggestions and comments

🔧 Development

For new code contributions, follow this process:

  1. Fork: Fork the repo on GitHub (using the webpage UI).
  2. Clone: Clone the forked project (git clone https://github.com/yourname/repo-name.git).
  3. Navigate: Navigate to the cloned project (cd repo-name)
  4. Branch: Create a feature branch (git checkout -b feature/xxx).
  5. Code: Implement the changes with comprehensive tests
  6. Testing: (Golang project) Ensure tests pass (go test ./...) and follow Go code style conventions
  7. Documentation: Update documentation to reflect client-facing changes and write meaningful commit messages
  8. Stage: Stage changes (git add .)
  9. Commit: Commit changes (git commit -m "Add feature xxx") ensuring backward compatible code
  10. Push: Push to the branch (git push origin feature/xxx).
  11. PR: Open a pull request on GitHub (on the GitHub webpage) with a detailed description.

Please ensure tests pass and include relevant documentation updates.


🌟 Support

Contributions to this project are welcome: submit pull requests and report issues.

Project Support:

  • Give GitHub stars if this project helps you
  • 🤝 Share with teammates and (golang) programming friends
  • 📝 Write tech blogs about development tools and workflows - we provide content writing support
  • 🌟 Join the ecosystem - committed to supporting open source and the (golang) development scene

Have Fun Coding with this package! 🎉🎉🎉


GitHub Stars

Stargazers

Documentation

Overview

Package egobatch provides generic batch task processing with type-safe error handling. It provides Task and TaskBatch types enabling concurrent batch operations with custom error types, and supports result filtering (OkTasks/WaTasks) and flexible result transformation via Flatten.

egobatch: 具有类型安全错误处理的泛型批量任务处理 提供 Task 和 TaskBatch 类型,支持使用自定义错误类型的并发批量操作 支持结果过滤(OkTasks/WaTasks)和通过 Flatten 进行灵活的结果转换

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrorType

type ErrorType = constraint.ErrorType

ErrorType is an alias to the constraint defined in the internal package Enables type-safe error handling with comparable custom error types

ErrorType 是内部包中定义的约束的别名 支持使用可比较的自定义错误类型进行类型安全的错误处理

type Task

type Task[A any, R any, E ErrorType] struct {
	Arg A // Task input argument // 任务输入参数
	Res R // Task result value // 任务结果值
	Erx E // Task error (nil when success) // 任务错误(成功时为 nil)
}

Task represents a single task with argument, result, and error Generic type supporting any argument type A, result type R, and error type E

Task 代表单个任务,包含参数、结果和错误 泛型类型支持任意参数类型 A、结果类型 R 和错误类型 E

type TaskBatch

type TaskBatch[A any, R any, E ErrorType] struct {
	Tasks Tasks[A, R, E] // Task collection with arguments and results // 任务集合,包含参数和结果
	Glide bool           // Glide mode flag: false=fail-fast, true=independent tasks // 平滑模式标志:false=快速失败,true=独立任务
	// contains filtered or unexported fields
}

TaskBatch manages batch task execution with concurrent processing. It supports glide mode, which enables independent task execution, and fail-fast mode. It provides context error handling and result aggregation capabilities.

TaskBatch 管理批量任务的并发执行 支持平滑模式,可以独立执行任务或快速失败 提供上下文错误处理和结果聚合能力

func NewTaskBatch

func NewTaskBatch[A any, R any, E ErrorType](args []A) *TaskBatch[A, R, E]

NewTaskBatch creates batch task engine with starting arguments Each argument becomes a task with zero-initialized result and error Default glide mode is false (fail-fast mode)

NewTaskBatch 使用初始参数创建批量任务处理器 每个参数成为一个任务,结果和错误初始化为零值 默认平滑模式是 false(快速失败行为)

func (*TaskBatch[A, R, E]) EgoRun

func (t *TaskBatch[A, R, E]) EgoRun(ego *erxgroup.Group[E], run func(ctx context.Context, arg A) (R, E))

EgoRun demonstrates GetRun usage with an inversion-of-control pattern. When task logic is complex and scheduling logic is simple, pass the scheduling engine as an argument. EgoRun automatically schedules all tasks into the provided errgroup.

EgoRun 演示 GetRun 使用方式,采用控制反转模式 当任务逻辑较重而调度逻辑较轻时,将调度器作为参数传入 自动将所有任务调度到提供的 errgroup 中

func (*TaskBatch[A, R, E]) GetRun

func (t *TaskBatch[A, R, E]) GetRun(idx int, run func(ctx context.Context, arg A) (R, E)) func(ctx context.Context) E

GetRun creates the execution function for the task at the given index, compatible with errgroup.Go. The index must be valid (the calling code controls the iteration count as a basic contract). Returns a wrapped function that handles context cancellation and error propagation.

GetRun 在给定索引处创建与 errgroup.Go 兼容的执行函数 索引必须有效(调用者控制迭代次数作为基本约定) 返回处理上下文取消和错误传播的包装函数

func (*TaskBatch[A, R, E]) SetGlide

func (t *TaskBatch[A, R, E]) SetGlide(glide bool)

SetGlide configures glide mode When true: tasks execute in independent mode, errors recorded without stopping others When false: first error stops batch execution (fail-fast)

SetGlide 配置平滑模式行为 当为 true:任务独立执行,错误被记录但不停止其他任务 当为 false:第一个错误停止批量执行(快速失败)

func (*TaskBatch[A, R, E]) SetWaCtx

func (t *TaskBatch[A, R, E]) SetWaCtx(waCtx func(err error) E)

SetWaCtx configures the context error conversion function. It converts context.Context errors into the custom error type E and is invoked when context cancellation or a timeout happens.

SetWaCtx 配置上下文错误转换函数 将 context.Context 错误转换为自定义错误类型 E 在上下文取消或超时发生时调用

type TaskOutput

type TaskOutput[ARG any, RES any, E ErrorType] struct {
	Arg ARG // Task input argument // 任务输入参数
	Res RES // Task result value // 任务结果值
	Erx E   // Task error (nil when success) // 任务错误(成功时为 nil)
}

TaskOutput represents single task execution outcome with argument, result, and error Like Task but designed as simple data container without batch processing logic Used when returning task results without needing complete Task batch capabilities

TaskOutput 代表单个任务执行结果,包含参数、结果和错误 类似 Task 但设计为简单数据传输对象,不包含批处理逻辑 用于返回任务结果而不需要完整的 Task 批处理能力

func NewOkTaskOutput

func NewOkTaskOutput[ARG any, RES any, E ErrorType](arg ARG, res RES) *TaskOutput[ARG, RES, E]

NewOkTaskOutput creates success task output with result Error field initialized to zero value indicating success

NewOkTaskOutput 创建成功的任务输出,包含结果 错误字段初始化为零值,表示成功

func NewWaTaskOutput

func NewWaTaskOutput[ARG any, RES any, E ErrorType](arg ARG, erx E) *TaskOutput[ARG, RES, E]

NewWaTaskOutput creates failed task output with error Result field initialized to zero value as task failed

NewWaTaskOutput 创建失败的任务输出,包含错误 结果字段初始化为零值,因为任务失败

type TaskOutputList

type TaskOutputList[ARG any, RES any, E ErrorType] []*TaskOutput[ARG, RES, E]

TaskOutputList is a collection of task outputs supporting filtering and aggregation Provides methods to separate success and failed results Enables result extraction and error collection patterns

TaskOutputList 是任务输出的集合,支持过滤和聚合 提供分离成功和失败结果的方法 支持结果提取和错误收集模式

func (TaskOutputList[ARG, RES, E]) OkCount

func (rs TaskOutputList[ARG, RES, E]) OkCount() int

OkCount counts the successful task outputs. Returns the number of outputs without errors.

OkCount 统计成功的任务输出 返回成功的输出数量

func (TaskOutputList[ARG, RES, E]) OkList

func (rs TaskOutputList[ARG, RES, E]) OkList() TaskOutputList[ARG, RES, E]

OkList filters and returns the outputs that completed successfully. Returns the subset of outputs without errors.

OkList 过滤并返回成功完成的输出 返回成功的子集

func (TaskOutputList[ARG, RES, E]) OkResults

func (rs TaskOutputList[ARG, RES, E]) OkResults() []RES

OkResults extracts result values from success outputs Returns slice containing just results from outputs without errors

OkResults 从成功的输出中提取结果值 返回仅包含无错误输出的结果切片

func (TaskOutputList[ARG, RES, E]) WaCount

func (rs TaskOutputList[ARG, RES, E]) WaCount() int

WaCount counts the failed task outputs. Returns the number of outputs that carry an error.

WaCount 统计失败的任务输出 返回出错的输出数量

func (TaskOutputList[ARG, RES, E]) WaList

func (rs TaskOutputList[ARG, RES, E]) WaList() TaskOutputList[ARG, RES, E]

WaList filters and returns the outputs that failed with errors. Returns the subset of outputs that carry an error.

WaList 过滤并返回失败的输出 返回出错的子集

func (TaskOutputList[ARG, RES, E]) WaReasons

func (rs TaskOutputList[ARG, RES, E]) WaReasons() []E

WaReasons extracts error values from failed outputs Returns slice containing just errors from outputs with failures

WaReasons 从失败的输出中提取错误值 返回仅包含失败输出的错误切片

type Tasks

type Tasks[A any, R any, E ErrorType] []*Task[A, R, E]

Tasks is a slice of Task pointers supporting batch operations Provides filtering and transformation methods on task collections

Tasks 是 Task 指针切片,支持批量操作 提供任务集合的过滤和转换方法

func (Tasks[A, R, E]) Flatten

func (tasks Tasks[A, R, E]) Flatten(newWaFunc func(arg A, erx E) R) []R

Flatten transforms task results into flat slice with error handling Uses newWaFunc to convert failed tasks into result type R Returns slice of results mixing success cases and transformed errors

Flatten 将任务结果转换成扁平切片并处理错误 使用 newWaFunc 将失败的任务转换为结果类型 R 返回混合成功结果和转换后错误的结果切片

func (Tasks[A, R, E]) OkTasks

func (tasks Tasks[A, R, E]) OkTasks() Tasks[A, R, E]

OkTasks filters and returns the tasks that completed successfully. Returns the subset of tasks without errors.

OkTasks 过滤并返回成功完成的任务 返回成功的任务子集

func (Tasks[A, R, E]) WaTasks

func (tasks Tasks[A, R, E]) WaTasks() Tasks[A, R, E]

WaTasks filters and returns the tasks that failed with errors. Returns the subset of tasks that carry an error.

WaTasks 过滤并返回失败的任务 返回出错的任务子集

Directories

Path Synopsis
Package erxgroup provides generic wrapper around errgroup with type-safe custom error handling Enables using custom error types instead of standard error interface while maintaining errgroup semantics
Package erxgroup provides generic wrapper around errgroup with type-safe custom error handling Enables using custom error types instead of standard error interface while maintaining errgroup semantics
internal
demos/demo1x command
demos/demo2x command
demos/demo3x command
examples/example1
Package example1 demonstrates basic batch task processing patterns Shows guest and order processing with error handling
Package example1 demonstrates basic batch task processing patterns Shows guest and order processing with error handling
examples/example2
Package example2 demonstrates nested batch processing patterns Shows class, student, and subject score aggregation with error handling
Package example2 demonstrates nested batch processing patterns Shows class, student, and subject score aggregation with error handling
examples/example3
Package example3 demonstrates multi-step pipeline processing patterns Shows cascading task execution with nested batch operations
Package example3 demonstrates multi-step pipeline processing patterns Shows cascading task execution with nested batch operations
utils
Package utils provides common utilities 包 utils 提供通用工具
Package utils provides common utilities 包 utils 提供通用工具

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL