mongopool

package module
v0.0.0-...-c40a79b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2017 License: MIT Imports: 6 Imported by: 0

README

mongopool

Provides a concurrency limit and automatic reconnection for mgo.

See docs on godoc.org.

Example

package main

import (
    "log"
    "os"
    "sync"
    "time"

    "github.com/andreyvit/mongopool"
    "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
)

func main() {
    mpool := mongopool.Dial(os.Getenv("MONGO_URI"), mongopool.Options{
        Concurrency: 10,
        Configure: func(session *mgo.Session) {
            session.SetMode(mgo.Monotonic, true)
            session.SetBatch(10000)
        },
    })
    defer mpool.Close()

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go Handle(i, mpool, wg.Done)
    }
    wg.Wait()
}

func Handle(idx int, mpool *mongopool.Pool, done func()) {
    defer done()
    err := handle(idx, mpool)
    if err != nil {
        log.Printf("ERROR (worker %d): %v", idx, err)
    }
}

func handle(idx int, mpool *mongopool.Pool) error {
    db, err := mpool.Acquire()
    if err != nil {
        return err
    }
    defer mpool.Release(db)

    log.Printf("Worker %d proceeding.", idx)

    err = db.C("foo").Insert(bson.M{"i": idx})
    if err != nil {
        return err
    }

    // Slow things down for more informative output.
    time.Sleep(500 * time.Millisecond)

    return nil
}

Documentation

Overview

Package mongopool adds a concurrency limit and automatic reconnection to mgo sessions.

Example
package main

import (
	"log"
	"os"
	"sync"
	"time"

	"github.com/andreyvit/mongopool"
	"gopkg.in/mgo.v2"
	"gopkg.in/mgo.v2/bson"
)

func main() {
	mpool := mongopool.Dial(os.Getenv("MONGO_URI"), mongopool.Options{
		Concurrency: 10,
		Configure: func(session *mgo.Session) {
			session.SetMode(mgo.Monotonic, true)
			session.SetBatch(10000)
		},
	})
	defer mpool.Close()

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go Handle(i, mpool, wg.Done)
	}
	wg.Wait()
}

func Handle(idx int, mpool *mongopool.Pool, done func()) {
	defer done()
	err := handle(idx, mpool)
	if err != nil {
		log.Printf("ERROR (worker %d): %v", idx, err)
	}
}

func handle(idx int, mpool *mongopool.Pool) error {
	db, err := mpool.Acquire()
	if err != nil {
		return err
	}
	defer mpool.Release(db)

	log.Printf("Worker %d proceeding.", idx)

	err = db.C("foo").Insert(bson.M{"i": idx})
	if err != nil {
		return err
	}

	// Slow things down for more informative output.
	time.Sleep(500 * time.Millisecond)

	return nil
}
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrNotConfigured = errors.New("MongoDB connection has not been configured")

ErrNotConfigured is returned when the pool is created with an empty URI.

Functions

This section is empty.

Types

type Error

type Error struct {
	Msg        string
	Underlying error
}

Error wraps errors returned by the pool

func (*Error) Error

func (e *Error) Error() string

type Options

type Options struct {
	// Concurrency determines the number of MongoDB sessions that may be active at the same time. Defaults to 10.
	Concurrency int

	// Set Logger to obtain verbose output from the pool.
	Logger Printfer

	// DebugName is an arbitrary string that identifies this pool for debugging purposes.
	DebugName string

	// If Lazy is true, the pool won't try to connect to MongoDB until the first call to Acquire.
	Lazy bool

	// PingInterval defines how often the connection is verified to be alive when Acquire is called. The ping is skipped if it has already been performed within PingInterval. If the ping fails, the connection is reestablished. Defaults to 10 seconds.
	PingInterval time.Duration

	// If the last connection attempt has failed, will attempt to reconnect after ReconnectInterval. Defaults to 10 seconds.
	ReconnectInterval time.Duration

	// A function to configure a newly established MongoDB session.
	Configure func(sess *mgo.Session)
}

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(uri string, opt Options) *Pool

Dial creates a new pool that connects to MongoDB using the specified connection string.

Dial does not block and never fails. It will start connecting in background, and will keep trying to reconnect (with Options.ReconnectInterval intervals) if the initial connection or a subsequent ping fails. You can check IsOnline() to see

If you pass an empty URI to Dial, Acquire will always return ErrNotConfigured. This can be useful if Mongo functionality is optional in your app.

Provide Options.Configure func to customize safety, batching and timeout options of the established session.

Options.Concurrency specifies the limit on the concurrent use of Mongo sessions. The pool will maintain this many sessions via mgo.Session.Copy(). Concurrency defaults to 10.

Before returning a session, the pool will check that it hasn't timed out by calling Session.Ping. The pings are performed only once per Options.PingInterval.

func (*Pool) Acquire

func (pool *Pool) Acquire() (*mgo.Database, error)

Acquire obtains a new session from the pool, establishing one if necessary. If Concurrency sessions are already in use, Acquire blocks until other goroutine calls Release.

The caller must subsequently call Release with the return value of this method, for example:

db, err := pool.Acquire()
if err != nil {
	return err
}
defer pool.Release(db)

func (*Pool) Close

func (pool *Pool) Close()

Close shuts down the pool and close any outstanding free connections.

func (*Pool) Exec

func (pool *Pool) Exec(f func(db *mgo.Database) error) error

Exec is a tiny wrapper around Acquire and Release. It is recommended to use Acquire and Release directly; please only use Exec if it makes the code easier to read in your case.

func (*Pool) IsOnline

func (pool *Pool) IsOnline() bool

IsOnline returns whether the pool currently has a functional MongoDB connection.

func (*Pool) Release

func (pool *Pool) Release(db *mgo.Database)

Release returns the given session into the pool's free list.

func (*Pool) StatusString

func (pool *Pool) StatusString() string

StatusString returns a short description of the pool status.

func (*Pool) String

func (pool *Pool) String() string

String returns the name of this pool.

type Printfer

type Printfer interface {
	Printf(format string, v ...interface{})
}

Printfer allows for any logger that implement Printf

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL