gworker

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2020 License: BSD-3-Clause Imports: 2 Imported by: 0

README

gworker

Package gworker provides wrapper of github.com/panjf2000/ants/v2.

Documentation

Index

Examples

Constants

View Source
const (
	// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
	//
	// actual size is math.MaxInt32.
	DefaultAntsPoolSize = ants.DefaultAntsPoolSize

	// DefaultCleanIntervalTime is the interval time to clean up goroutines.
	//
	// actual interval time is time.Second.
	DefaultCleanIntervalTime = ants.DefaultCleanIntervalTime
)

Variables

View Source
var ErrNotImplement = errors.New("not implement")

ErrNotImplement retruns the not implement error.

Functions

func IsInvalidPoolExpiryError

func IsInvalidPoolExpiryError(err error) bool

IsInvalidPoolExpiryError reports whether the err is ants.ErrInvalidPoolExpiry which returned when setting a negative number as the periodic duration to purge goroutines.

func IsInvalidPoolSizeError

func IsInvalidPoolSizeError(err error) bool

IsInvalidPoolSizeError reports whether the err is ants.ErrInvalidPoolSize which returned when setting a negative number as pool capacity.

func IsLackPoolFuncError

func IsLackPoolFuncError(err error) bool

IsLackPoolFuncError reports whether the err is ants.ErrLackPoolFunc which returned when invokers don't provide function for pool.

func IsPoolClosedError

func IsPoolClosedError(err error) bool

IsPoolClosedError reports whether the err is ants.ErrPoolClosed which returned when submitting task to a closed pool.

func IsPoolOverloadError

func IsPoolOverloadError(err error) bool

IsPoolOverloadError reports whether the err is ants.ErrPoolOverload which returned when the pool is full and no workers available.

Types

type Func

type Func func(args interface{})

Func represents a worker function.

type Logger added in v0.0.2

type Logger interface {
	Error(msg string, key string, value interface{})
}

Logger represents a gworker Logger.

type PanicHandlerFunc

type PanicHandlerFunc func(p interface{})

PanicHandlerFunc is used to handle panics from each worker goroutine. if nil, panics will be thrown out again from worker goroutines.

This function handles when follows situation.

 if p := recover(); p != nil {
 	if ph := w.pool.options.PanicHandler; ph != nil {
 		ph(p)
 	} else {
 		log.Printf("worker exits from a panic: %v\n", p)
 		var buf [4096]byte
 		n := runtime.Stack(buf[:], false)
 		log.Printf("worker exits from panic: %s\n", string(buf[:n]))
 	}
	}

func NewPanicHandler

func NewPanicHandler(logger Logger) PanicHandlerFunc

NewPanicHandler return the PanicHandler using zap.Logger.

type Worker

type Worker interface {
	// Running returns the number of the currently running goroutines.
	Running() int

	// Free returns a available goroutines to work.
	Free() int

	// Cap returns the capacity of this pool.
	Cap() int

	// Tune changes the capacity of this pool.
	Tune(size int)

	// Release closes this pool.
	Release()

	// Reboot reboots a released pool.
	Reboot()

	// Submit submits a task to this pool.
	Submit(task func()) error

	// Invoke submits a task to pool.
	Invoke(args interface{}) error
}

Worker represents a goroutine worker.

func NewWorker

func NewWorker(size int32, options ...ants.Option) (Worker, error)

NewWorker returns the new Worker which Submit style goroutine worker.

Example
package main

import (
	"fmt"
	"log"
	"sync"
	"sync/atomic"

	"github.com/zchee/gworker"
)

func main() {
	testSubmitFunc := func(args *int64) {
		atomic.AddInt64(args, 1)
	}

	w, err := gworker.NewWorker(1000)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Release()

	const loopCount = 100
	var wg sync.WaitGroup
	arg := int64(0) // atmoic
	for i := 0; i < loopCount; i++ {
		wg.Add(1)
		if err := w.Submit(func() {
			defer wg.Done()
			testSubmitFunc(&arg)
		}); err != nil {
			log.Fatal(err)
		}
	}
	wg.Wait()

	fmt.Println(atomic.LoadInt64(&arg) == loopCount) // increase args per loopCount atomically

}
Output:

true

func NewWorkerFunc

func NewWorkerFunc(size int32, fn Func, options ...ants.Option) (Worker, error)

NewWorkerFunc returns the new Worker which Invoke style goroutine worker.

Example
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/zchee/gworker"
)

type Bill struct {
	UserID    int64     `json:"UserId"`
	Status    string    `json:"Status"`
	CreatedAt time.Time `json:"CreatedAt"`
}

type invoker struct {
	wg *sync.WaitGroup
}

func (invoker) getBill(ctx context.Context) *Bill {
	return &Bill{
		UserID:    0,
		Status:    "buy",
		CreatedAt: time.Unix(0, 0).UTC(),
	}
}

func (i *invoker) work(iface interface{}) {
	defer i.wg.Done()

	bill := iface.(*Bill)
	fmt.Printf("UserID: %d, Status: %s, CreatedAt: %s\n", bill.UserID, bill.Status, bill.CreatedAt)
}

func main() {
	ctx := context.Background()

	var wg sync.WaitGroup
	invoker := &invoker{&wg}

	w, err := gworker.NewWorkerFunc(1000, invoker.work)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Release()

	wg.Add(1)
	if err := w.Invoke(invoker.getBill(ctx)); err != nil {
		log.Fatal(err)
	}
	wg.Wait()

}
Output:

UserID: 0, Status: buy, CreatedAt: 1970-01-01 00:00:00 +0000 UTC

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL