gomaxscale

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 20, 2022 License: MIT Imports: 11 Imported by: 0

README

gomaxscale

Go Reference license

Go library that allows consuming from MaxScale CDC listener. Useful for detecting database changes via binlog.

This consumer follows the connection protocol defined by MaxScale 6 here.

Testing with Docker

For this test environment the following file structure was used:

  • 📂 mariadb-config
    • 📄 mariadb.cnf
  • 📂 mariadb-init
    • 📄 00_schema.sql
  • 📂 maxscale-config
    • 📄 maxscale.cnf
  • 📄 docker-compose.yml
  • 📄 consumer.go
mariadb.cnf

We need to enable replication in the MariaDB master database:

[mysqld]
server_id=1
binlog_format=row
binlog_row_image=full
log-bin=/var/log/mysql/mariadb-bin
00_schema.sql

A basic schema adding the MaxScale user, and some testing database to play with:

RESET MASTER;

CREATE USER 'maxuser'@'%' IDENTIFIED BY 'maxpwd';
GRANT REPLICATION SLAVE ON *.* TO 'maxuser'@'%';
GRANT REPLICATION CLIENT ON *.* TO 'maxuser'@'%';
-- GRANT FILE ON *.* TO 'maxuser'@'%';
GRANT SELECT ON mysql.user TO 'maxuser'@'%';
GRANT SELECT ON mysql.db TO 'maxuser'@'%';
GRANT SELECT ON mysql.tables_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.columns_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.procs_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.proxies_priv TO 'maxuser'@'%';
GRANT SELECT ON mysql.roles_mapping TO 'maxuser'@'%';
GRANT SHOW DATABASES ON *.* TO 'maxuser'@'%';
-- GRANT SELECT ON *.* TO 'maxuser'@'%';

DROP DATABASE IF EXISTS example;
CREATE DATABASE IF NOT EXISTS example;

USE example;

DROP TABLE IF EXISTS users;
CREATE TABLE users (
  `id` INT NOT NULL AUTO_INCREMENT,
  `name` VARCHAR(255) NOT NULL,
  `email` VARCHAR(255) NOT NULL,
  PRIMARY KEY (id)
);

INSERT INTO `users` (`name`, `email`) VALUES ('John Doe', 'john@doe.com');
INSERT INTO `users` (`name`, `email`) VALUES ('Jane Doe', 'jane@doe.com');
maxscale.cnf

MaxScale configuration to configure the Avro router and expose a listener so gomaxscale can retrieve the information.

[MaxScale]
threads=1
admin_secure_gui=false
threads=auto
admin_host=0.0.0.0

[server1]
type=server
address=db
port=3306
protocol=MariaDBBackend

[cdc-service]
type=service
router=avrorouter
servers=server1
user=maxuser
password=maxpwd

[cdc-listener]
type=listener
service=cdc-service
protocol=CDC
port=4001

[MariaDB-Monitor]
type=monitor
module=mariadbmon
servers=server1
user=maxuser
password=maxpwd
monitor_interval=5000
docker-compose.yml

To setup a MariaDB database and a MaxScale server we will use docker-compose with the following configuration:

version: '2.4'
services:
  db:
    container_name: "lab-db"
    image: mariadb:10.3.8
    volumes:
      - ./mariadb-config:/etc/mysql/conf.d
      - ./mariadb-init:/docker-entrypoint-initdb.d
    environment:
      MYSQL_ROOT_PASSWORD: abc123
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "--silent"]

  dbproxy:
    container_name: "lab-maxscale"
    image: mariadb/maxscale:6.2
    volumes:
      - ./maxscale-config/maxscale.cnf:/etc/maxscale.cnf
    ports:
      - 4001:4001
    depends_on:
      - db
consumer.go

A local Go file will consume and log the modified items in the database:

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"
  "syscall"

  "github.com/rafaeljusto/gomaxscale"
)

func main() {
  consumer := gomaxscale.NewConsumer("127.0.0.1:4001", "example", "users",
    gomaxscale.WithAuth("maxuser", "maxpwd"),
  )
  dataStream, err := consumer.Start()
  if err != nil {
    log.Fatal(err)
  }
  defer consumer.Close()

  fmt.Printf("started consuming events from database '%s' table '%s'\n",
    dataStream.Database, dataStream.Table)

  done := make(chan bool)
  go func() {
    consumer.Process(func(event gomaxscale.Event) {
      fmt.Printf("event '%s' detected\n", event.Type)
    })
    done <- true
  }()

  signalChanel := make(chan os.Signal, 1)
  signal.Notify(signalChanel, syscall.SIGINT, syscall.SIGTERM)

  select {
  case <-signalChanel:
  case <-done:
  }

  fmt.Println("terminating")
}
Running

First, start all services:

% docker-compose up -d

Then we can start consuming the items:

% go run consumer.go

To see the magic happening you could do some database changes:

% docker-compose exec db mysql -u root -p abc123 -D example \
  -e "INSERT INTO users (name, email) VALUES ('James Doe', 'james@doe.com')"

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithAuth

func WithAuth(user, password string) func(*Options)

WithAuth sets the authentication options.

func WithBufferSize

func WithBufferSize(bufferSize int) func(*Options)

WithBufferSize sets the buffer size for the data stream.

func WithGTID

func WithGTID(gtid string) func(*Options)

WithGTID sets the GTID position.

func WithLogger

func WithLogger(logger logger) func(*Options)

WithLogger sets the logger to report issues when processing the data stream.

func WithTimeout

func WithTimeout(readTimeout, writeTimeout time.Duration) func(*Options)

WithTimeout sets connection timeouts.

func WithUUID

func WithUUID(uuid string) func(*Options)

WithUUID sets the UUID of the client.

func WithVersion added in v1.0.1

func WithVersion(version int) func(*Options)

WithVersion sets the binlog version to use.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a Go implementation of a MaxScale CDC consumer.

func NewConsumer

func NewConsumer(address, database, table string, optFuncs ...func(*Options)) *Consumer

NewConsumer creates a new Consumer instance.

func (*Consumer) Close

func (g *Consumer) Close()

Close closes the connection to MaxScale.

func (*Consumer) Process

func (g *Consumer) Process(eventFunc func(Event))

Process starts consuming events and trigerring the callback function for each event.

func (*Consumer) Start

func (g *Consumer) Start() (*DataStream, error)

Start connects to MaxScale and starts consuming events.

https://mariadb.com/kb/en/mariadb-maxscale-6-change-data-capture-cdc-protocol/

Example
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/rafaeljusto/gomaxscale"
)

func main() {
	consumer := gomaxscale.NewConsumer("127.0.0.1:4001", "database", "table",
		gomaxscale.WithAuth("maxuser", "maxpwd"),
	)
	dataStream, err := consumer.Start()
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	fmt.Printf("started consuming events from database '%s' table '%s'\n",
		dataStream.Database, dataStream.Table)

	done := make(chan bool)
	go func() {
		consumer.Process(func(event gomaxscale.Event) {
			fmt.Printf("event '%s' detected\n", event.Type)
		})
		done <- true
	}()

	signalChanel := make(chan os.Signal, 1)
	signal.Notify(signalChanel, syscall.SIGINT, syscall.SIGTERM)

	select {
	case <-signalChanel:
	case <-done:
	}

	fmt.Println("terminating")
}
Output:

type DataStream

type DataStream struct {
	Namespace string `json:"namespace"`
	Type      string `json:"type"`
	Name      string `json:"name"`
	Table     string `json:"table"`
	Database  string `json:"database"`
	Version   int    `json:"version"`
	GTID      string `json:"gtid"`
	Fields    []struct {
		Name     string      `json:"name"`
		Type     interface{} `json:"type"`
		RealType string      `json:"real_type"`
		Length   int         `json:"length"`
		Unsigned bool        `json:"unsigned"`
	} `json:"fields"`
}

DataStream is the first event response with the target table information.

https://avro.apache.org/docs/1.11.0/spec.html#schemas

type Event

type Event struct {
	Domain      int    `json:"domain"`
	ServerID    int    `json:"server_id"`
	Sequence    int    `json:"sequence"`
	EventNumber int    `json:"event_number"`
	Timestamp   int64  `json:"timestamp"`
	Type        string `json:"event_type"`
	RawData     []byte `json:"-"`
}

Event is a MaxScale event.

type Options

type Options struct {
	// contains filtered or unexported fields
}

Options stores all available options to connect with MaxScale.

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL