chain

v0.1.0
Published: Aug 16, 2024 License: MIT Imports: 8 Imported by: 0

README

Chain: A Generic Operation Chain for Go

Introduction

Chain is a Go package that provides a generic operation chain for running a series of tasks in a defined order. It supports serial and parallel execution, error handling, interceptors, and context management, which suits multi-step workflows and data processing pipelines.

Features

  • Generic Implementation: Works with any input and output types.
  • Serial and Parallel Execution: Supports both sequential and concurrent task execution.
  • Context Management: Integrates with Go's context package for cancellation and timeout control.
  • Error Handling: Stops execution at the first error encountered.
  • Thread-Safe: Provides mutex-based synchronization for shared resource access.
  • Timeout Control: Allows setting a timeout for the entire chain execution.
  • Fluent Interface: Offers a chainable API for easy and readable setup.
  • Interceptor Support: Enables custom logic before and after each operation.

Installation

To install Chain, use go get:

go get github.com/sysulq/chain-go

Usage

Here's a simple example demonstrating the basic usage of Chain:

package chain_test

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"github.com/sysulq/chain-go"
)

// Input represents the input data structure
type Input struct {
	Numbers []int
}

// Output represents the output data structure
type Output struct {
	Sum       int
	Product   int
	Processed bool
}

func Example() {
	slog.SetLogLoggerLevel(slog.LevelDebug)

	// Initialize input and output
	input := &Input{Numbers: []int{1, 2, 3, 4, 5}}
	output := &Output{}

	// Create a new chain
	c := chain.New(input, output).
		WithTimeout(5*time.Second).
		Use(chain.RecoverInterceptor, chain.LogInterceptor)

	// Define chain operations
	c.Serial(
		func(ctx context.Context, c *chain.State[Input, Output]) error {
			fmt.Println("Starting serial operations")
			return nil
		},
		calculateSum,
	).Parallel(
		simulateSlowOperation,
		calculateProduct,
	).Serial(
		markProcessed,
	)

	// Execute the chain
	result, err := c.Execute()
	if err != nil {
		fmt.Printf("Error: %v\n", err)
		return
	}

	fmt.Printf("Sum: %d, Product: %d, Processed: %v\n", result.Sum, result.Product, result.Processed)

	// Output:
	// Starting serial operations
	// Calculating sum
	// Calculating product
	// Simulating slow operation
	// Marking as processed
	// Sum: 15, Product: 120, Processed: true
}

func calculateSum(ctx context.Context, c *chain.State[Input, Output]) error {
	fmt.Println("Calculating sum")
	sum := 0
	for _, num := range c.Input().Numbers {
		sum += num
	}
	c.SetOutput(func(o *Output) {
		o.Sum = sum
	})
	return nil
}

func calculateProduct(ctx context.Context, c *chain.State[Input, Output]) error {
	fmt.Println("Calculating product")
	product := 1
	for _, num := range c.Input().Numbers {
		product *= num
	}
	c.SetOutput(func(o *Output) {
		o.Product = product
	})
	return nil
}

func simulateSlowOperation(ctx context.Context, c *chain.State[Input, Output]) error {
	select {
	case <-time.After(100 * time.Millisecond):
		fmt.Println("Simulating slow operation")
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func markProcessed(ctx context.Context, c *chain.State[Input, Output]) error {
	fmt.Println("Marking as processed")
	c.SetOutput(func(o *Output) {
		o.Processed = true
	})
	return nil
}

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Chain

type Chain[I, O any] struct {
	// contains filtered or unexported fields
}

Chain represents a generic operation chain, supporting input type I and output type O

func New

func New[I, O any](input *I, output *O) *Chain[I, O]

New creates a new Chain, specifying input and output types

func (*Chain[I, O]) Execute

func (c *Chain[I, O]) Execute() (*O, error)

Execute runs all operations in the chain
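
The Features list says execution stops at the first error encountered. The idea can be sketched without the package, as below. The `step` type and `runSerial` function are hypothetical stand-ins written for this illustration; they are not the package's HandleFunc or its actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a hypothetical stand-in for an operation (not the package's HandleFunc).
type step func() error

// runSerial runs steps in order and stops at the first error. It returns
// how many steps completed successfully, plus the wrapped error if any.
func runSerial(steps ...step) (int, error) {
	for i, s := range steps {
		if err := s(); err != nil {
			return i, fmt.Errorf("step %d: %w", i, err)
		}
	}
	return len(steps), nil
}

// demoSerial runs three steps where the second one fails, and reports
// how many succeeded and which ones were invoked at all.
func demoSerial() (int, []string) {
	var ran []string
	ok := func(name string) step {
		return func() error { ran = append(ran, name); return nil }
	}
	fail := func() error { ran = append(ran, "fail"); return errors.New("boom") }

	n, _ := runSerial(ok("a"), fail, ok("b"))
	return n, ran
}

func main() {
	n, ran := demoSerial()
	fmt.Println(n, ran) // step "b" never runs
}
```

With the real package, the equivalent check is the `err` returned by Execute, as in the usage example above.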

func (*Chain[I, O]) Parallel

func (c *Chain[I, O]) Parallel(fns ...HandleFunc[I, O]) *Chain[I, O]

Parallel adds operations to be executed concurrently

func (*Chain[I, O]) Serial

func (c *Chain[I, O]) Serial(fns ...HandleFunc[I, O]) *Chain[I, O]

Serial adds operations to be executed sequentially

func (*Chain[I, O]) Use

func (c *Chain[I, O]) Use(interceptors ...Interceptor[I, O]) *Chain[I, O]

Use adds interceptors to the chain. Each interceptor wraps an operation, so it can run logic before and after it

func (*Chain[I, O]) WithContext

func (c *Chain[I, O]) WithContext(ctx context.Context) *Chain[I, O]

WithContext sets a custom context for the Chain
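
A custom context is mainly useful when operations honor it. The sketch below is self-contained and uses only the standard library; `slowOp` is a hypothetical operation body modeled on `simulateSlowOperation` from the usage example. It assumes, without the source confirming it, that the chain hands the supplied context (or one derived from it) to each operation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOp is a hypothetical operation body: it finishes after d
// unless ctx is cancelled or times out first.
func slowOp(ctx context.Context, d time.Duration) error {
	select {
	case <-time.After(d):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// cancelledEarly cancels the parent context shortly after the operation
// starts and reports whether the operation returned context.Canceled.
func cancelledEarly() bool {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(10 * time.Millisecond)
		cancel()
	}()

	err := slowOp(ctx, 2*time.Second)
	return errors.Is(err, context.Canceled)
}

func main() {
	fmt.Println(cancelledEarly())
}
```

Under the same assumption, a context built this way would be passed to the chain as `chain.New(input, output).WithContext(ctx)`.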

func (*Chain[I, O]) WithMaxGoroutines

func (c *Chain[I, O]) WithMaxGoroutines(max int) *Chain[I, O]

WithMaxGoroutines sets the maximum number of goroutines for parallel execution
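
One common way to cap concurrency in Go is a buffered channel used as a semaphore. The sketch below shows that technique only; whether the package implements its limit this way is not stated in the source, and `runBounded` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded starts n tasks with at most limit running at once and
// returns the highest number of tasks observed running concurrently.
func runBounded(n, limit int) int {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var running, peak atomic.Int32

	for i := 0; i < n; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit tasks are already running
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task ends

			cur := running.Add(1)
			// Record a new peak if this task raised the running count.
			for {
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond) // simulated work
			running.Add(-1)
		}()
	}
	wg.Wait()
	return int(peak.Load())
}

func main() {
	fmt.Println(runBounded(20, 3) <= 3)
}
```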

func (*Chain[I, O]) WithTimeout

func (c *Chain[I, O]) WithTimeout(d time.Duration) *Chain[I, O]

WithTimeout sets a timeout duration for the entire chain execution

type HandleFunc

type HandleFunc[I, O any] func(context.Context, *State[I, O]) error

HandleFunc represents a function that operates on a Chain's State

func LogInterceptor

func LogInterceptor[I, O any](fn HandleFunc[I, O]) HandleFunc[I, O]

LogInterceptor is an interceptor that prints the input and output of the wrapped HandleFunc

func RecoverInterceptor

func RecoverInterceptor[I, O any](fn HandleFunc[I, O]) HandleFunc[I, O]

RecoverInterceptor is an interceptor that recovers from panics in the wrapped HandleFunc

type Interceptor

type Interceptor[I, O any] func(HandleFunc[I, O]) HandleFunc[I, O]

Interceptor represents a function that wraps a HandleFunc
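
Because an Interceptor takes a HandleFunc and returns a HandleFunc, a custom one is a plain higher-order function. The sketch below declares local stand-in types that mirror the documented signatures so it compiles without the package; `TraceInterceptor` and the fields of the local `State` are hypothetical and exist only for this illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins mirroring the documented signatures. Real code would use
// chain.State, chain.HandleFunc and chain.Interceptor instead.
type State[I, O any] struct {
	in  I
	out O
}

type HandleFunc[I, O any] func(context.Context, *State[I, O]) error

type Interceptor[I, O any] func(HandleFunc[I, O]) HandleFunc[I, O]

// TraceInterceptor is a hypothetical interceptor that records an event
// before and after the wrapped operation runs.
func TraceInterceptor[I, O any](events *[]string) Interceptor[I, O] {
	return func(next HandleFunc[I, O]) HandleFunc[I, O] {
		return func(ctx context.Context, s *State[I, O]) error {
			*events = append(*events, "before")
			err := next(ctx, s)
			*events = append(*events, fmt.Sprintf("after err=%v", err))
			return err
		}
	}
}

// runTraced wraps a doubling operation and returns the recorded events
// together with the operation's result.
func runTraced() ([]string, int) {
	var events []string
	double := func(ctx context.Context, s *State[int, int]) error {
		s.out = s.in * 2
		return nil
	}

	wrapped := TraceInterceptor[int, int](&events)(double)
	s := &State[int, int]{in: 21}
	_ = wrapped(context.Background(), s)
	return events, s.out
}

func main() {
	events, out := runTraced()
	fmt.Println(events, out)
}
```

A function with the package's own Interceptor signature would be registered through Use, next to RecoverInterceptor and LogInterceptor.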

type State

type State[I, O any] struct {
	// contains filtered or unexported fields
}

State holds the input and output data and a mutex for synchronization

func (*State[I, O]) Input

func (s *State[I, O]) Input() I

Input returns a copy of the input data of the Chain

When the chain is running in parallel, it uses a mutex to get the input data

func (*State[I, O]) Output

func (s *State[I, O]) Output() O

Output returns a copy of the output data of the Chain

When the chain is running in parallel, it uses a mutex to get the output data

func (*State[I, O]) SetOutput

func (s *State[I, O]) SetOutput(fn func(*O))

SetOutput sets the output data of the Chain

When the chain is running in parallel, it uses a mutex to set the output data
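
The callback form of SetOutput lets a whole update happen under one lock. The sketch below reproduces that pattern with a hypothetical `guarded` type; it illustrates the documented behavior and is not the package's State implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// guarded is a hypothetical stand-in for the output half of State.
type guarded[O any] struct {
	mu  sync.Mutex
	out *O
}

// Output returns a copy of the value, read under the lock.
func (g *guarded[O]) Output() O {
	g.mu.Lock()
	defer g.mu.Unlock()
	return *g.out
}

// SetOutput applies fn to the value while holding the lock, so a
// read-modify-write inside fn cannot interleave with other updates.
func (g *guarded[O]) SetOutput(fn func(*O)) {
	g.mu.Lock()
	defer g.mu.Unlock()
	fn(g.out)
}

type counter struct{ N int }

// countTo increments a shared counter from n goroutines and returns the total.
func countTo(n int) int {
	g := &guarded[counter]{out: &counter{}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.SetOutput(func(c *counter) { c.N++ })
		}()
	}
	wg.Wait()
	return g.Output().N
}

func main() {
	fmt.Println(countTo(100))
}
```

Since Input and Output are documented as returning copies, assigning to a field of the returned value does not change the chain's data; changes go through SetOutput.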
