pgbuffer

package module
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2021 License: GPL-3.0 Imports: 7 Imported by: 1

README

pgbuffer

Buffer data in memory and bulk copy to postgres. This is especially useful when using the timescaledb postresql extension for timeseries workloads. Useful for any append only workloads such as timeseries streams.

Features

  • Public flush signaling for custom flush handling such as time based, or os signal based.
  • Custom column definition
  • Multi-worker concurrent COPY
  • Utilizes COPY instead of insert for greater performance.

Installation

go get github.com/dev-mull/pgbuffer

Basic Usage


import 	(
    "github.com/dev-mull/pgbuffer"
    "database/sql"
    _ "github.com/lib/pq"

)
//Setup an optional logger
logger := logrus.New()
logger.SetOutput(os.Stdin)

//Setup a new buffer config
cfg := &pgbuffer.Config{
    Limit: 100,
    Workers: 2,
    Logger: logger, 
    Tables: []*pgbuffer.BufferedData{
    	{
    		Table: "test",
    		Columns: []string{"time","foo","bar"},
    	},
    },
}
//Connect to the db
db, err := sql.Open("postgres", dbUrl)
if err != nil {
    log.Fatal(err)
}

//Initialize the buffer
buff,err := pgbuffer.NewBuffer(db, cfg)
if err != nil {
    log.Fatal(err)
}


//Write some test data every second to the buffer.
//It will flush after 101 writes because the limit is set to 100
go func() {
    time.Sleep(time.Second * 1)
    buff.Write("test",time.Now(),"check","this")
}()

//Clean shutdown
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
    <-sigs
    buff.Stop()
}()

//Force a flush every minute
go func() {
    t := time.NewTicker(time.Minute)
    for {
        select {
        case <-t.C:
            buff.FlushAll()
        }   
    }
}()

//Block and run until finished
buff.Run()


TODO

  • write statistics handling
  • optional buffer to disk instead of memory

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer

func NewBuffer(db *sql.DB, cfg *Config) (*Buffer, error)

func (*Buffer) Flush

func (b *Buffer) Flush(buff *BufferedData, data [][]interface{})

func (*Buffer) FlushAll added in v0.1.6

func (b *Buffer) FlushAll()

func (*Buffer) Run

func (b *Buffer) Run()

func (*Buffer) Stop added in v0.1.6

func (b *Buffer) Stop()

func (*Buffer) Write

func (b *Buffer) Write(table string, rows ...interface{})

type BufferedData

type BufferedData struct {
	sync.Mutex

	LastExit  time.Time
	LastWrite time.Time

	Table   string   `yaml:"table"`
	Columns []string `yaml:"columns"`
	Limit   int64    `yaml:"limit"`
	// contains filtered or unexported fields
}

type Config

type Config struct {
	Limit   int64           `yaml:"limit"`
	Tables  []*BufferedData `yaml:"tables"`
	Workers int             `yaml:"workers"`
	Logger  *logrus.Logger
}

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL