sql

package
v0.0.0-...-5d6a5f9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2022 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	// MysqlDatetimeFormat is the Mysql date time format.
	MysqlDatetimeFormat = "2006-01-02 15:04:05"
)

Variables

View Source
var ErrInvalidSQLQueryType = errors.New("invalid SQL query type")

ErrInvalidSQLQueryType is the error for when the query passed in is isn't one of the know types.

View Source
var ErrStopExtract = errors.New("stop iterating over the *sqlx.Rows")

ErrStopExtract is the error to be used by a rowHandler to indicate that the iterating over the rows results should not continue.

Functions

func ExtractRecordsFromRows

func ExtractRecordsFromRows(rows *sqlx.Rows, rowHandler func(message.OrderedRecord, error) error) error

ExtractRecordsFromRows iterates over the *sqlx.Rows and calls the handler for each or error if something went wrong. The row handler can return it's own error in which case the iterating still stop. If the error is not the ErrStopExtract error, then it will be pass on through as the error of the overall func call. Keep in mind you will need to make sure the rows passed in are close elsewhere. (Ex: rows.Close() ) This func will not close the rows.

Types

type Conn

type Conn struct {
	DSN    string // connection string (required)
	Driver string // used to make a new connection if DB is nil
	DB     *sqlx.DB
	// contains filtered or unexported fields
}

Conn is a wrapper around a connection to handle creating the connection and shutting it down. This allows you to just set a DSN and this will handle connecting to it.

Example

ExampleConn shows how to create a database connection that is used by the Query and Exec transformers. Conn is rarely used directly on it's own, but it wrapped by Exec and Query and used by them.

package main

import (
	"github.com/Reisender/pipe/extras/sql"
)

func main() {
	c := sql.Conn{Driver: "postgres", DSN: "postgres://user:pass@localhost/dbname?sslmode=verify-full"}
	err := c.Open()
	if err != nil {
		panic(err)
	}
	defer c.Close()

	// now the connection is ready to use
}
Output:

func (*Conn) Close

func (m *Conn) Close() error

Close will close the connection to the database if we created the connection in Startup.

func (*Conn) Open

func (m *Conn) Open() error

Open will connect to the database if we are handed a DSN instead of an *sqlx.DB .

type Exec

type Exec Conn

Exec executes a query

Example
package main

import (
	"github.com/Reisender/pipe/extras/sql"
	"github.com/Reisender/pipe/line"
	"github.com/Reisender/pipe/message"
)

func main() {
	line.New().SetP(func(out chan<- interface{}, errs chan<- error) {

		// using a string directly as the query
		out <- "INSERT INTO foo (name) VALUES ('bar')"

		// using message.Query
		out <- message.Query{SQL: "INSERT INTO foo (name) VALUES ('bar')"}

		// using message.Query with args (using sqlx under the hood for arg matching)
		query := message.Query{SQL: "INSERT INTO foo (name) VALUES (?)"}
		query.Args = append(query.Args, "bar")
		out <- query

		// using delta messages
		record := message.NewRecordFromMSI(map[string]interface{}{"name": "bar"})
		delta := message.InsertDelta{Table: "foo", Record: record}
		out <- delta

	}).Add(
		sql.Exec{Driver: "postgres", DSN: "postgres://user:pass@localhost/dbname?sslmode=verify-full"}.T,
	).Run()
}
Output:

func (Exec) I

func (m Exec) I(msg interface{}) (interface{}, error)

I implements the InlineFunc interface. It is context aware if the incoming message is a mysql.Query and has the context set.

func (Exec) T

func (m Exec) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T will take in records and use them in a sql query. The input message is expected to implement the fmt.Stringer interface and string is expected to be the SQL to run.

type Get

type Get struct {
	Conn
	SQL   string // (required)
	Table string

	PageSize int
	OrderBy  string
	BodyCol  string // the column to put as the body of the message (blank is all as json)
}

Get gets records from a query or table

func (Get) P

func (m Get) P(out chan<- interface{}, errs chan<- error)

P starts sourcing the data for the pipeline from a table

func (Get) T

func (m Get) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T will take in records and use them in a sql query. The input can be a single record or a batch of records. The predefined template functions can be used to extract individual keys from the metadata record, or the message body as a single string value. Or if this is a batch the keys can be used to extract one columns data as a comma seperated list e.g. for use in an IN clause.

The output by default will send one resulting record in one out message. Otherwise this can be configured to send all resulting records from one

type Query

type Query Conn

Query runs an SQL query on a db connection

Example
package main

import (
	"github.com/Reisender/pipe/extras/sql"
	"github.com/Reisender/pipe/line"
	"github.com/Reisender/pipe/message"

	_ "github.com/proullon/ramsql/driver"
)

func main() {
	line.New().SetP(func(out chan<- interface{}, errs chan<- error) {

		// using a string directly as the query
		out <- "SELECT * FROM foo"

		// using message.Query
		out <- message.Query{SQL: "SELECT * FROM foo"}

		// using message.Query with args (using sqlx under the hood for arg matching)
		query := message.Query{SQL: "SELECT * FROM foo WHERE name=? AND email=?"}
		query.Args = append(query.Args, "bar", "bar@foo.com")
		out <- query

	}).Add(
		sql.Query{Driver: "postgres", DSN: "postgres://user:pass@localhost/dbname?sslmode=verify-full"}.T,
	).Run()
}
Output:

func (Query) I

func (m Query) I(msg interface{}) (interface{}, error)

I actually does the query again the database and implements the InlineTfunc interface.

func (Query) T

func (m Query) T(in <-chan interface{}, out chan<- interface{}, errs chan<- error)

T will take in records and use them in a sql query.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL