octopus

package module
v0.0.0-...-aadef97 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 2, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

README

Octopus

Octopus is a golang library for managing a goroutine pool that can dynamic adjust the number of goroutine, the api is a bit like java concurrent pool api.

Octopus can new a pool to submit Callable job or Runnable job, Callable job is a function with a interface{} return value and no arguments, Runnable job is a function without arguments and return value, a job will be allocated to a worker when it becomes available.

Features

  1. dynamic adjust the number of goroutine according the idle of goroutine
  2. support synchronous and asynchronous to get calculating result
  3. support timeout to get calculating result
  4. support to get status of a job
  5. can drop jobs when pool is busy
  6. automatic recovery from a job's panic
  7. can set a log function to record pool's log infos
  8. the api is a bit like java concurrent pool and more easily to use

Docs

https://godoc.org/github.com/Comdex/octopus

Installation

go get github.com/Comdex/octopus

Simple example for a cachedWorkerPool

package main

import (
	"time"
	"fmt"
	"github.com/Comdex/octopus"
)

func main() {
	// the cachedpool will dynamic adjust the number of goroutine called worker according 
	// the timeout of workers process job and idle time of workers
	pool, err := octopus.NewCachedWorkerPool()
	if err != nil {
		fmt.Println(err)
	}
	// you can set a log func to get pool's  log info
	pool.SetLogFunc(func(msg string){
		fmt.Println(msg)
	})
	// the Runnable is a simple function
	var r Runnable = func() {
		fmt.Println("test runnable var")
	}
	pool.SubmitRunnable(r)
	// the Callable is a function with a return value
	var c Callable = func() interface{} {
		s := "test callable var"
		return 
	}
	pool.SubmitCallable(c)
	
	pool.SubmitRunnable(func(){
		fmt.Println("test1")
	})
	
	future, err2 := pool.SubmitCallable(func() interface{} {
		time.Sleep(2*time.Second)
		return "test2"
	})
	if err2 != nil {
		fmt.Println(err2)
	}
	// the Get method of future will wait for return value is prepared
	// Is it like a java concurrent pool api?
	value, err3 := future.Get()
	if err3 != nil {
		fmt.Println(err3)
	}
	fmt.Println("value: ", value)
	
	future2 , _ := pool.SubmitCallable(func() interface{} {
		time.Sleep(2*time.Second)
		return "test3"
	})
	
	//Get Value support timeout
	value2, timeoutErr := future2.GetTimed(1*time.Second)
	if timeoutErr != nil {
		fmt.Println(timeoutErr)
	}
	fmt.Println(value2)
	
	// close the pool and wait for all goroutines done
	pool.Shutdown()	
}

Example for a dataprocess pool

package main

import (
	"fmt"
	"github.com/Comdex/octopus"
)

func main() {
	pool, err := octopus.NewCachedDataProcessPool(func(object interface{}) interface{} {
		v := object.(int)
		return "data: " + strconv.Itoa(v)
	})
	if err != nil {
		fmt.Println(err)
	}
	
	pool.Submit(8)
	pool.Submit(29)
	
	future, err2 := pool.Submit(100)
	if err != nil {
		fmt.Println(err)
	}
	// the api is synchronous
	value, err3 := future.Get()
	if err3 != nil {
		fmt.Println(err3)
	}
	fmt.Println("100 value: ", value)
	
	future2, _ := pool.Submit(200)
	// Get method support timeout
	value2, _ := future2.GetTimed(2*time.Second)
	fmt.Println("200 value: ", value2)
	
	// close the pool and wait for all goroutine done
	pool.Shutdown()
}

License

Apache License

more api usage please refer to docs

Documentation

Overview

package octopus implements a simple goroutine pool like java concurrent pool.

octopus project octopus.go

Index

Constants

View Source
const (
	JOBUNSTART uint32 = 0
	JOBDOING   uint32 = 1
	JOBDONE    uint32 = 2
	JOBCRASH   uint32 = 3
)
View Source
const (
	JOBUNSTART uint32 = 0
	JOBDOING   uint32 = 1
	JOBDONE    uint32 = 2
)
View Source
const (
	UNINTERRUPT uint32 = 3
	INTERRUPT   uint32 = 4
)
View Source
const (
	WORKERFREE    uint32 = 5
	WORKERRUNNING uint32 = 6
	WORKERSTOP    uint32 = 7
)
View Source
const (
	POOLOPEN  uint32 = 8
	POOLCLOSE uint32 = 9
)

Variables

View Source
var (
	ErrPoolShutdown           = errors.New("the pool is closed")
	ErrJobTimedOut            = errors.New("job request timed out")
	ErrRunnableNoResult       = errors.New("runnable job has not a result value")
	ErrResultChannelClose     = errors.New("result channel close")
	ErrInvalidArguments       = errors.New("Invalid Arguments")
	ErrKeepAliveTimeArguments = errors.New("KeepAliveTime must be greater than 1 second")
)

Functions

This section is empty.

Types

type Callable

type Callable func() interface{}

type DataProcessFunc

type DataProcessFunc func(interface{}) interface{}

type DataProcessPool

type DataProcessPool struct {
	// contains filtered or unexported fields
}

func NewBaseDataProcessPool

func NewBaseDataProcessPool(MinPoolSize uint64, MaxPoolSize uint64, KeepAliveTime time.Duration, AwaitWokerTime time.Duration, ProcessFn DataProcessFunc) (dataProcessPool *DataProcessPool, err error)

Creates a goroutine pool for processing data by defining a DataProcessFunction with MinPoolSize , MaxPoolSize, the KeepAliveTime of a worker, the time of manager await worker. Please note that the KeepAliveTime must be greater than one second.

func NewCachedDataProcessPool

func NewCachedDataProcessPool(fn DataProcessFunc) (dataProcessPool *DataProcessPool, err error)

Creates a goroutine pool that creates new goroutines as needed for processing data by defining a DataProcessFunction, but will reuse previously constructed goroutines when they are available.

func NewFixDataProcessPool

func NewFixDataProcessPool(workerNum uint64, fn DataProcessFunc) (dataProcessPool *DataProcessPool, err error)

Creates a goroutine pool for processing data by defining a DataProcessFunction that reuses a fixed number of goroutines.

func (*DataProcessPool) CanDropJob

func (pool *DataProcessPool) CanDropJob() bool

CanDropJob will return if the manager will drop job datas when pool is busy.

func (*DataProcessPool) GetActiveCount

func (pool *DataProcessPool) GetActiveCount() uint64

Return the approximate total number of woker executing a job data in pool.

func (*DataProcessPool) GetAwaitWorkerTime

func (pool *DataProcessPool) GetAwaitWorkerTime() time.Duration

Return the awaitWorkerTime of pool manager.

func (*DataProcessPool) GetCompletedJobCount

func (pool *DataProcessPool) GetCompletedJobCount() uint64

Return the approximate total number of jobs that have completed execution.

func (*DataProcessPool) GetKeepAliveTime

func (pool *DataProcessPool) GetKeepAliveTime() time.Duration

Return the KeepAliveTime of a worker.

func (*DataProcessPool) GetMaxPoolSize

func (pool *DataProcessPool) GetMaxPoolSize() uint64

Return the maximum allowed number of goroutines.

func (*DataProcessPool) GetMinPoolSize

func (pool *DataProcessPool) GetMinPoolSize() uint64

Return the minimum number of goroutines.

func (*DataProcessPool) GetPoolSize

func (pool *DataProcessPool) GetPoolSize() uint64

Return approximate total number of goroutines in pool.

func (*DataProcessPool) IsShutDown

func (pool *DataProcessPool) IsShutDown() bool

if pool is close it will return true.

func (*DataProcessPool) SetDropJob

func (pool *DataProcessPool) SetDropJob(ok bool)

Set drop job data if await worker timeout, it will drop job datas when manager appears awaitWorkerTime timeout.

func (*DataProcessPool) SetKeepAliveTime

func (pool *DataProcessPool) SetKeepAliveTime(keepAliveTime time.Duration) error

Set the KeepAliveTime of a worker. Please note that it must be greater than one second.

func (*DataProcessPool) SetLogFunc

func (pool *DataProcessPool) SetLogFunc(function LogFunc)

Set a log function to record log infos.

func (*DataProcessPool) SetMaxPoolSize

func (pool *DataProcessPool) SetMaxPoolSize(maxPoolSize uint64)

Set the maximum allowed number of goroutines.

func (*DataProcessPool) SetMinPoolSize

func (pool *DataProcessPool) SetMinPoolSize(minPoolSize uint64)

Set the minimum number of goroutines.

func (*DataProcessPool) Shutdown

func (pool *DataProcessPool) Shutdown()

Close the pool and wait for all goroutines done, it may be block.

func (*DataProcessPool) ShutdownNow

func (pool *DataProcessPool) ShutdownNow()

Close the pool but will not wait for all goroutines done, it will be never block.

func (*DataProcessPool) Submit

func (pool *DataProcessPool) Submit(job interface{}) (future Future, err error)

Submit a job data for execution and return a Future representing the calculating result of that job data.

type Future

type Future interface {
	// Cancel method will set a cancel tag attempt to cancel execute job before starting this job represented by Future.
	Cancel() error
	// Get method can get value from Callable , if not ready it will block.
	Get() (interface{}, error)
	// GetTimed method can get value from Callable with setting timeout.
	GetTimed(time.Duration) (interface{}, error)
	// IsCancelled will return whether the job was setting a cancel tag, but it does not mean that the job has been terminated.
	IsCancelled() bool //default 0 ,interrupt 1
	// IsDone will return whether the job was done.
	IsDone() bool
}

type LogFunc

type LogFunc func(string)

type Runnable

type Runnable func()

type WorkPool

type WorkPool interface {
	IsPoolOpen() bool
	Close()
	Release()
	SubmitRunnable(Runnable) (Future, error)
	SubmitCallable(job Callable) (Future, error)
	InvokeAllRunnable([]Runnable) ([]Future, error)
	InvokeAllCallable([]Callable) ([]Future, error)
	// contains filtered or unexported methods
}

func NewBaseCachedWorkerPool

func NewBaseCachedWorkerPool(MinPoolSize uint64, MaxPoolSize uint64, KeepAliveTime time.Duration, AwaitWokerTime time.Duration) (workpool *WorkPool, err error)

Creates a goroutine pool with MinPoolSize , MaxPoolSize, the KeepAliveTime of a worker, the time of manager await worker. Please note that the KeepAliveTime must be greater than one second.

func NewCachedWorkerPool

func NewCachedWorkerPool() (workpool *WorkPool, err error)

Creates a goroutine pool that creates new goroutines as needed, but will reuse previously constructed goroutines when they are available.

func NewFixWorkerPool

func NewFixWorkerPool(workerNum uint64) (workpool *WorkPool, err error)

Creates a goroutine pool that reuses a fixed number of goroutines.

func (*WorkPool) CanDropJob

func (pool *WorkPool) CanDropJob() bool

CanDropJob will return if the manager will drop jobs when pool is busy.

func (*WorkPool) GetActiveCount

func (pool *WorkPool) GetActiveCount() uint64

Return the approximate total number of woker executing a job in pool.

func (*WorkPool) GetAwaitWorkerTime

func (pool *WorkPool) GetAwaitWorkerTime() time.Duration

Return the awaitWorkerTime of pool manager.

func (*WorkPool) GetCompletedJobCount

func (pool *WorkPool) GetCompletedJobCount() uint64

Return the approximate total number of jobs that have completed execution.

func (*WorkPool) GetKeepAliveTime

func (pool *WorkPool) GetKeepAliveTime() time.Duration

Return the KeepAliveTime of a worker.

func (*WorkPool) GetMaxPoolSize

func (pool *WorkPool) GetMaxPoolSize() uint64

Return the maximum allowed number of goroutines.

func (*WorkPool) GetMinPoolSize

func (pool *WorkPool) GetMinPoolSize() uint64

Return the minimum number of goroutines.

func (*WorkPool) GetPoolSize

func (pool *WorkPool) GetPoolSize() uint64

Return approximate total number of goroutines in pool.

func (*WorkPool) InvokeAllCallable

func (pool *WorkPool) InvokeAllCallable(jobs []Callable) (futures []Future, err error)

Submit Callable jobs for execution and return Futures representing those jobs.

func (*WorkPool) InvokeAllRunnable

func (pool *WorkPool) InvokeAllRunnable(jobs []Runnable) (futures []Future, err error)

Submit Runnable jobs for execution and return Futures representing those jobs.

func (*WorkPool) IsShutDown

func (pool *WorkPool) IsShutDown() bool

if pool is close it will return true.

func (*WorkPool) SetDropJob

func (pool *WorkPool) SetDropJob(ok bool)

Set drop job if await worker timeout, it will drop jobs when manager appears awaitWorkerTime timeout .

func (*WorkPool) SetKeepAliveTime

func (pool *WorkPool) SetKeepAliveTime(keepAliveTime time.Duration) error

Set the KeepAliveTime of a worker. Please note that it must be greater than one second.

func (*WorkPool) SetLogFunc

func (pool *WorkPool) SetLogFunc(function LogFunc)

Set a log function to record log infos.

func (*WorkPool) SetMaxPoolSize

func (pool *WorkPool) SetMaxPoolSize(maxPoolSize uint64)

Set the maximum allowed number of goroutines.

func (*WorkPool) SetMinPoolSize

func (pool *WorkPool) SetMinPoolSize(minPoolSize uint64)

Set the minimum number of goroutines.

func (*WorkPool) Shutdown

func (pool *WorkPool) Shutdown()

Close the pool and wait for all goroutines done, it may be block.

func (*WorkPool) ShutdownNow

func (pool *WorkPool) ShutdownNow()

Close the pool but will not wait for all goroutines done, it will be never block.

func (*WorkPool) SubmitCallable

func (pool *WorkPool) SubmitCallable(job Callable) (future Future, err error)

Submit a Callable job for execution and return a Future representing that job.

func (*WorkPool) SubmitRunnable

func (pool *WorkPool) SubmitRunnable(job Runnable) (future Future, err error)

Submit a Runnable job for execution and return a Future representing that job.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL