funnel

v1.0.2
Published: Mar 27, 2023 License: MIT Imports: 5 Imported by: 0

README

Funnel

Funnel is a Go library that provides unification of identical operations (e.g. API requests).

Installation

go get github.com/intuit/funnel

Usage

import "github.com/intuit/funnel"

The funnel package is designed for scenarios where multiple goroutines try to execute an identical operation at the same time. It deduplicates the calls, executes the operation only once, and returns the result to all of them. A simple example:

package main

import (
        "fmt"
        "net/http"
        "sync"
        "sync/atomic"

        "github.com/intuit/funnel"
)

func main() {
        fnl := funnel.New()
        var numOfOps uint64

        // The operation below is performed only once; all 100 goroutines get the same result.
        operation := func() (interface{}, error) {
                url := "http://www.golang.org"
                response, err := http.Get(url)
                if err != nil {
                        return nil, err
                }
                defer response.Body.Close()
                atomic.AddUint64(&numOfOps, 1)
                return response.Status, nil
        }

        var wg sync.WaitGroup
        wg.Add(100)
        for i := 0; i < 100; i++ {
                go func() {
                        defer wg.Done()
                        // "operation_id" is a string that uniquely identifies the operation.
                        opRes, err := fnl.Execute("operation_id", operation)
                        if err != nil {
                                fmt.Println("operation failed:", err)
                                return
                        }
                        fmt.Println(opRes)
                }()
        }
        wg.Wait()
        opsFinal := atomic.LoadUint64(&numOfOps)
        fmt.Println(opsFinal) // numOfOps == 1
}

In addition, the result of an operation can be cached, so that identical operations are not performed again for a set period of time. A simple example:

package main

import (
        "fmt"
        "time"

        "github.com/intuit/funnel"
)

func main() {
        fnl := funnel.New(funnel.WithCacheTtl(time.Second * 5))

        numOfOps := 0

        operation1 := func() (interface{}, error) {
                time.Sleep(time.Second)
                numOfOps++
                return "op1-res", nil
        }

        res, err := fnl.Execute("operation1", operation1)
        fmt.Println(res, err)

        time.Sleep(time.Second * 2)

        // The second call gets the cached result of the first; the operation is not performed again.
        res, err = fnl.Execute("operation1", operation1)
        fmt.Println(res, err)
        fmt.Println(numOfOps) // numOfOps == 1
}

Note that only timeout-related errors are discarded. Any other result, including 5xx-class responses and other errors, is cached as the response. To control which results are cached, use WithShouldCachePredicate.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

License

See LICENSE for details

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

A Config structure is used to configure the Funnel.

type Funnel

type Funnel struct {
	sync.Mutex
	// contains filtered or unexported fields
}

The purpose of Funnel is to prevent identical operations from running concurrently. When a request arrives for an operation that is already in progress, it waits for the running operation to finish and then uses the same result.
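The behavior described above can be illustrated with a minimal standard-library sketch. This is not the library's implementation: the dedup type, its Do and waiting methods, and runDemo are hypothetical names introduced only for illustration, and the sketch leaves out timeouts and caching.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// call tracks one in-flight operation and the callers waiting on it.
type call struct {
	done    chan struct{} // closed when the operation finishes
	res     interface{}
	err     error
	waiters int // callers that joined while the operation was running
}

// dedup is a hypothetical, simplified stand-in for Funnel (no timeout, no cache).
type dedup struct {
	mu       sync.Mutex
	inFlight map[string]*call
}

func newDedup() *dedup { return &dedup{inFlight: make(map[string]*call)} }

// Do runs op once per id at a time; concurrent callers share the result.
func (d *dedup) Do(id string, op func() (interface{}, error)) (interface{}, error) {
	d.mu.Lock()
	if c, ok := d.inFlight[id]; ok {
		c.waiters++
		d.mu.Unlock()
		<-c.done // wait for the caller that started the operation
		return c.res, c.err
	}
	c := &call{done: make(chan struct{})}
	d.inFlight[id] = c
	d.mu.Unlock()

	c.res, c.err = op()

	d.mu.Lock()
	delete(d.inFlight, id)
	d.mu.Unlock()
	close(c.done)
	return c.res, c.err
}

// waiting reports how many callers are currently blocked on id.
func (d *dedup) waiting(id string) int {
	d.mu.Lock()
	defer d.mu.Unlock()
	if c, ok := d.inFlight[id]; ok {
		return c.waiters
	}
	return 0
}

// runDemo starts n concurrent callers and returns how many times op ran.
func runDemo(n int) int {
	d := newDedup()
	release := make(chan struct{})
	runs := 0
	op := func() (interface{}, error) {
		runs++
		<-release // hold the operation open until every caller has joined
		return "result", nil
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.Do("operation_id", op)
		}()
	}
	for d.waiting("operation_id") < n-1 {
		time.Sleep(time.Millisecond)
	}
	close(release)
	wg.Wait()
	return runs
}

func main() {
	fmt.Println(runDemo(100)) // prints 1
}
```

The demo holds the operation open until all other callers are waiting, so the count does not depend on goroutine scheduling.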

func New

func New(option ...Option) *Funnel

New returns a pointer to a new Funnel. By default the timeout is one minute and the cacheTtl is 0. You can pass options to change these values, for example:

// Create Funnel with cacheTtl of 5 seconds and timeout of 3 minutes.
funnel.New(funnel.WithCacheTtl(time.Second*5), funnel.WithTimeout(time.Minute*3))

func (*Funnel) Execute

func (f *Funnel) Execute(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error)

Execute receives an identifier of the operation and a callback function to execute. The first request to the Funnel with this identifier results in the callback function being executed in a new goroutine. All other requests with the same identifier wait for the result of the first execution.

IMPORTANT: The returned object is shared between all the requesting callers. Use ExecuteAndCopyResult to return a dedicated (copied) object.

func (*Funnel) ExecuteAndCopyResult

func (f *Funnel) ExecuteAndCopyResult(operationId string, opExeFunc func() (interface{}, error)) (res interface{}, err error)

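ExecuteAndCopyResult works like Execute, but returns a dedicated (copied) object to each caller instead of a shared one. IMPORTANT: Only exported field values can be copied over.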
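To see why sharing matters and why a copy may lose unexported fields, here is a small standard-library sketch. It does not show how the library copies results; the profile type and the copyExported helper are hypothetical, and the helper assumes a reflection-based shallow copy, which is one common reason such a restriction exists.

```go
package main

import (
	"fmt"
	"reflect"
)

// profile is a hypothetical result type with one exported and one unexported field.
type profile struct {
	Name   string
	secret string
}

// copyExported returns a shallow copy of a struct pointer, copying only the
// fields that reflection is allowed to set (the exported ones).
// Any other kind of value is returned unchanged.
func copyExported(src interface{}) interface{} {
	v := reflect.ValueOf(src)
	if v.Kind() != reflect.Ptr || v.IsNil() || v.Elem().Kind() != reflect.Struct {
		return src
	}
	out := reflect.New(v.Elem().Type())
	for i := 0; i < v.Elem().NumField(); i++ {
		if dst := out.Elem().Field(i); dst.CanSet() {
			dst.Set(v.Elem().Field(i))
		}
	}
	return out.Interface()
}

func main() {
	shared := &profile{Name: "alice", secret: "token"}

	// Two callers holding the same pointer see each other's writes.
	callerA, callerB := shared, shared
	callerA.Name = "bob"
	fmt.Println(callerB.Name) // prints "bob"

	// A copy isolates the caller, but the unexported field stays at its zero value.
	own := copyExported(shared).(*profile)
	own.Name = "carol"
	fmt.Println(shared.Name, own.Name, own.secret == "") // prints "bob carol true"
}
```

If callers only read the result, the shared object from Execute is sufficient; a copy is worth its cost when callers mutate what they receive.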

func (*Funnel) IsOpInProgress

func (f *Funnel) IsOpInProgress(operationId string) bool

type Option

type Option func(*Config)

func WithCacheTtl

func WithCacheTtl(cTtl time.Duration) Option

WithCacheTtl defines how long a result can remain cached (the default is 0).

func WithShouldCachePredicate added in v1.0.2

func WithShouldCachePredicate(p func(interface{}, error) bool) Option

WithShouldCachePredicate allows more control over which responses are cached. If this option is used, a response from Execute is cached only if the provided predicate returns true.

func WithTimeout

func WithTimeout(t time.Duration) Option

WithTimeout defines the maximum time that goroutines will wait for an operation to finish (the default is one minute).
