sources

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 20, 2020 License: Apache-2.0 Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
// Event action names, replication offset modes and database flavor names.
const (
	// Action names.
	// NOTE(review): presumably the kind of row change carried by an event — confirm against the emitters.
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"

	// Offset modes.
	// NOTE(review): presumably the accepted values of MysqlCDCConfig.Mode — confirm.
	ModeGTID   = "GTID"
	ModeBinlog = "binlog"

	// MariaDB is the flavor name for MariaDB servers.
	// NOTE(review): presumably an accepted value of MysqlCDCConfig.Flavor — confirm.
	MariaDB = "mariadb"
	// MariadDB is the original, misspelled name of MariaDB, kept so existing
	// callers keep compiling.
	//
	// Deprecated: use MariaDB instead.
	MariadDB = MariaDB
	// Mysql is the flavor name for MySQL servers.
	Mysql = "mysql"
)
View Source
// Source status values and the default channel size.
const (
	// Possible statuses of a source.
	// NOTE(review): presumably the values stored in Source.Status — confirm.
	SourceStatusOnError        = "ON_ERROR"
	SourceStatusRunning        = "RUNNING"
	SourceStatusWaiting        = "WAITING"
	SourceStatusWaitingForMETA = "WAITING_FOR_META"
	SourceStatusInit           = "INITIALIZING"

	// DefaultChannelSize is the default channel size.
	// NOTE(review): presumably the buffer capacity used when creating the
	// source channels — confirm at the call site.
	DefaultChannelSize = 100
)

Possible Statuses

View Source
// FileReadingFollowerType is the source type name that keys
// NewFileReadingFollower in Factory.
const FileReadingFollowerType = "FileReadingFollower"

FileReadingFollowerType type of source

View Source
// MysqlCDCType is the source type name that keys NewMysqlCdc in Factory.
const MysqlCDCType = "MysqlCDC"

MysqlCDCType type of source

View Source
// MysqlQueryType is the source type name that keys NewMysqlQuery in Factory.
const MysqlQueryType = "MysqlQuery"

MysqlQueryType type of source

View Source
// PRI is the string "PRI".
// NOTE(review): presumably the column-key value marking a primary-key column
// (compare Column.ColumnKey) — confirm against the schema queries.
const PRI = "PRI"
View Source
// PostgreSQLCDCType is the source type name that keys NewPostgreSQLCdc in Factory.
const PostgreSQLCDCType = "PostgresqlCDC"

PostgreSQLCDCType type of source

View Source
// PostgreSQLQueryType is the source type name that keys NewPostgreSQLQuery in Factory.
const PostgreSQLQueryType = "PostgresqlQuery"

PostgreSQLQueryType type of source

View Source
// RandomType is the source type name that keys NewRandom in Factory.
const RandomType = "Random"

RandomType type of source

View Source
// SqlserverCDCType is the source type name that keys NewSqlserverCDC in Factory.
const SqlserverCDCType = "SqlserverCDC"

SqlserverCDCType type of source

View Source
// SqlserverQueryType is the source type name that keys NewSqlserverSQLQuery in Factory.
const SqlserverQueryType = "SqlserverQuery"

SqlserverQueryType type of source

View Source
// SyslogType is the source type name that keys NewSyslog in Factory.
const SyslogType = "Syslog"

SyslogType type of source

View Source
// TickerValue is the number of seconds to wait between two ticks.
const TickerValue = 10

TickerValue is the number of seconds to wait between two ticks

Variables

View Source
// Factory maps each source type name to the constructor that creates a source
// of that type.
// NOTE(review): presumably New looks up its sourceType argument here — confirm.
var Factory = map[string]sourceCreatorT{
	RandomType:              NewRandom,
	MysqlQueryType:          NewMysqlQuery,
	MysqlCDCType:            NewMysqlCdc,
	PostgreSQLQueryType:     NewPostgreSQLQuery,
	PostgreSQLCDCType:       NewPostgreSQLCdc,
	SqlserverQueryType:      NewSqlserverSQLQuery,
	SqlserverCDCType:        NewSqlserverCDC,
	SyslogType:              NewSyslog,
	FileReadingFollowerType: NewFileReadingFollower,
}

Factory source Factory

Functions

This section is empty.

Types

type AgentHeader

// AgentHeader holds the agent information (tenant, hostname, UUID) attached
// to events.
type AgentHeader struct {
	// Tenant is the tenant information, as defined by the events package.
	Tenant events.LookatchTenantInfo
	// Hostname is the agent host name.
	// NOTE(review): presumably the machine running the agent — confirm.
	Hostname string
	// UUID identifies the agent.
	UUID string
}

AgentHeader is the representation of the agent header; it contains the agent authentication information attached to events

type Column added in v0.2.0

// Column describes a single column of a source schema. It is the value type
// of the maps returned by the GetSchema methods.
type Column struct {
	// Database, Schema and Table locate the column; each is left out of the
	// JSON output when empty.
	Database string `json:"database,omitempty"`
	Schema   string `json:"schema,omitempty"`
	Table    string `json:"table,omitempty"`
	// Column is the column name.
	Column string `json:"column"`
	// ColumnOrdPos is the ordinal position of the column.
	// NOTE(review): presumably its position within the table — confirm base (0 or 1).
	ColumnOrdPos int `json:"column_ord_pos"`
	// Nullable reports whether the column accepts NULL values.
	Nullable bool `json:"nullable"`
	// DataType is the data type name of the column.
	DataType string `json:"data_type"`
	// The following three values are nullable integers (null.Int).
	CharacterMaximumLength null.Int `json:"character_maximum_length,omitempty"`
	NumericPrecision       null.Int `json:"numeric_precision,omitempty"`
	NumericScale           null.Int `json:"numeric_scale,omitempty"`
	// ColumnType is the column type.
	// NOTE(review): presumably the full type declaration as opposed to DataType — confirm.
	ColumnType string `json:"column_type"`
	// ColumnKey is the key kind of the column; left out of the JSON output when empty.
	// NOTE(review): presumably holds values such as PRI for primary keys — confirm.
	ColumnKey string `json:"column_key,omitempty"`
}

Column representation of schema

type DBSQLQuery added in v0.2.0

// DBSQLQuery is the SQL query source type embedded by MySQLQuery,
// PostgreSQLQuery and SqlserverQuery.
type DBSQLQuery struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source
	// Config is the query configuration.
	Config DBSQLQueryConfig
	// contains filtered or unexported fields
}

DBSQLQuery representation of DBSQL query

func NewDBSQLQuery added in v0.2.0

func NewDBSQLQuery(s *Source) DBSQLQuery

NewDBSQLQuery create new DBSQL query client

func (*DBSQLQuery) ExtractDatabaseTable added in v0.2.0

func (d *DBSQLQuery) ExtractDatabaseTable(query string) (string, string)

ExtractDatabaseTable Extract the database (or schema, depending on your RDBMS) name and table name from a SQL query

func (*DBSQLQuery) GetCapabilities added in v0.2.0

func (d *DBSQLQuery) GetCapabilities() map[string]*utils.TaskDescription

GetCapabilities returns the actions the collector can perform on this source

func (*DBSQLQuery) GetPrimary added in v0.2.0

func (d *DBSQLQuery) GetPrimary(schema, table string) string

GetPrimary check if columns is primary key

func (*DBSQLQuery) GetSchema added in v0.2.0

func (d *DBSQLQuery) GetSchema() map[string]map[string]*Column

GetSchema returns source schema

func (*DBSQLQuery) GetStatus added in v0.2.0

func (d *DBSQLQuery) GetStatus() interface{}

GetStatus returns the collector's source status

func (*DBSQLQuery) HealthCheck added in v0.2.0

func (d *DBSQLQuery) HealthCheck() bool

HealthCheck returns true if the source is correctly configured and the collector is connected to it

func (*DBSQLQuery) ProcessLines added in v0.2.0

func (d *DBSQLQuery) ProcessLines(columns []string, lines [][]interface{}, info QueryInfo, wg *sizedwaitgroup.SizedWaitGroup)

ProcessLines process batches of lines from a resultset to map them before sending them to a marshall goroutine

func (*DBSQLQuery) Query added in v0.2.0

func (d *DBSQLQuery) Query(database string, query string) (err error)

Query send a SQL query to the configured source

func (*DBSQLQuery) QueryMeta added in v0.2.0

func (d *DBSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)

QueryMeta execute query metadata

func (*DBSQLQuery) QuerySchema added in v0.2.0

func (d *DBSQLQuery) QuerySchema(q string) (err error)

QuerySchema retrieves source's schema directly from the source itself

type DBSQLQueryConfig added in v0.2.0

// DBSQLQueryConfig is the configuration shared by the SQL query sources; it
// is embedded by MysqlQueryConfig, PostgreSQLQueryConfig and
// SqlserverQueryConfig.
type DBSQLQueryConfig struct {
	// Host, Port, User and Password are the database connection settings.
	Host     string `json:"host"`
	Port     int    `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	// BatchSize is read from the "batch_size" key.
	// NOTE(review): presumably the number of result lines handed to ProcessLines at once — confirm.
	BatchSize int `json:"batch_size" mapstructure:"batch_size"`
	// NbWorker is read from the "nb_worker" key.
	// NOTE(review): presumably the number of concurrent workers — confirm.
	NbWorker int `json:"nb_worker"  mapstructure:"nb_worker"`
	// ColumnsMetaValue is read from the "columns_meta" key.
	// NOTE(review): presumably toggles column metadata on emitted events — confirm.
	ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"`
	// DefinedPk is read from the "defined_pk" key.
	// NOTE(review): presumably user-defined primary keys, keyed by table — confirm key format.
	DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"`
}

DBSQLQueryConfig representation of DBSQL query configuration

type FileReadingFollower added in v0.1.0

// FileReadingFollower is the FileReadingFollower source; it is created by
// NewFileReadingFollower.
type FileReadingFollower struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source
	// contains filtered or unexported fields
}

FileReadingFollower representation of FileReadingFollower

func (*FileReadingFollower) GetCapabilities added in v0.2.0

func (f *FileReadingFollower) GetCapabilities() map[string]*utils.TaskDescription

GetCapabilities returns available actions

func (*FileReadingFollower) GetMeta added in v0.1.0

func (f *FileReadingFollower) GetMeta() map[string]utils.Meta

GetMeta get source meta

func (*FileReadingFollower) GetSchema added in v0.1.0

func (f *FileReadingFollower) GetSchema() map[string]map[string]*Column

GetSchema Get source Schema

func (*FileReadingFollower) Process added in v0.1.0

func (f *FileReadingFollower) Process(action string, params ...interface{}) interface{}

Process action

func (*FileReadingFollower) Start added in v0.1.0

func (f *FileReadingFollower) Start(i ...interface{}) (err error)

Start source

func (*FileReadingFollower) UpdateCommittedLsn added in v0.2.0

func (f *FileReadingFollower) UpdateCommittedLsn()

UpdateCommittedLsn update CommittedLsn

type FileReadingFollowerConfig added in v0.1.0

// FileReadingFollowerConfig is the configuration of the FileReadingFollower source.
type FileReadingFollowerConfig struct {
	// Path is read from the "path" key.
	// NOTE(review): presumably the path of the file to follow — confirm.
	Path string `json:"path"`
	// Offset is read from the "offset" key.
	// NOTE(review): presumably the position in the file to start reading from — confirm unit.
	Offset int64 `json:"offset"`
}

FileReadingFollowerConfig representation of FileReadingFollower Config

type Message

// Message is one change entry decoded from JSON; it is the element type of
// Messages.Change.
// NOTE(review): the field layout looks like wal2json output — confirm.
type Message struct {
	// Columnnames, Columntypes and Columnvalues are parallel lists.
	// NOTE(review): presumably index i of each list describes the same column — confirm.
	Columnnames  []string      `json:"columnnames"`
	Columntypes  []string      `json:"columntypes"`
	Columnvalues []interface{} `json:"columnvalues"`
	// Kind is the kind of change.
	// NOTE(review): presumably insert/update/delete — confirm.
	Kind string `json:"kind"`
	// Schema and Table name the changed table.
	Schema string `json:"schema"`
	Table  string `json:"table"`
	// Oldkeys carries the old key names, types and values.
	Oldkeys Oldkeys `json:"oldkeys"`
}

Message representation of message

type Messages

// Messages is a list of Message values, decoded from the JSON "change" key.
type Messages struct {
	Change []Message `json:"change"`
}

Messages representation of messages

type Meta

// Meta is source metadata expressed with pglogrepl LSN values.
// NOTE(review): presumably the metadata of the PostgreSQL CDC source — confirm.
type Meta struct {
	// LastState is serialized as "laststate".
	LastState string `json:"laststate"`
	// CurrentLsn and CommittedLsn are the current and committed LSNs.
	CurrentLsn   pglogrepl.LSN `json:"current_lsn"`
	CommittedLsn pglogrepl.LSN `json:"committed_lsn"`
	// SlotStatus is serialized as "slotstatus".
	// NOTE(review): presumably the value reported by GetSlotStatus — confirm.
	SlotStatus bool `json:"slotstatus"`
	// ServerWALEnd is serialized under the key "werver_wal_end".
	// NOTE(review): the key looks like a typo for "server_wal_end"; it is left
	// unchanged because renaming it would change the serialized format.
	ServerWALEnd pglogrepl.LSN `json:"werver_wal_end"`
}

Meta representation of metadata

type MySQLQuery

// MySQLQuery is the MySQL query source.
type MySQLQuery struct {
	// DBSQLQuery is embedded, so its exported fields and methods are promoted.
	*DBSQLQuery
	// contains filtered or unexported fields
}

MySQLQuery representation of MySQL Query source

func (*MySQLQuery) Connect

func (m *MySQLQuery) Connect(schema string) error

Connect connection to database

func (*MySQLQuery) GetStatus

func (m *MySQLQuery) GetStatus() interface{}

GetStatus returns current status of connection

func (*MySQLQuery) HealthCheck

func (m *MySQLQuery) HealthCheck() bool

HealthCheck returns true if source is ok

func (*MySQLQuery) Init

func (m *MySQLQuery) Init()

Init initialisation of Mysql Query source

func (*MySQLQuery) Process

func (m *MySQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*MySQLQuery) Query

func (m *MySQLQuery) Query(query string) error

Query execute query string

func (*MySQLQuery) QueryMeta

func (m *MySQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)

QueryMeta execute query meta string

func (*MySQLQuery) QuerySchema

func (m *MySQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type MysqlCDC

// MysqlCDC is the MySQL change data capture source; it is created by NewMysqlCdc.
type MysqlCDC struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source
	// contains filtered or unexported fields
}

MysqlCDC representation of Mysql change data capture

func (*MysqlCDC) GetFirstBinlog

func (m *MysqlCDC) GetFirstBinlog() (mysql.Position, error)

GetFirstBinlog Get First Binlog offset

func (*MysqlCDC) GetGTIDFromMariaDBPosition added in v0.2.0

func (m *MysqlCDC) GetGTIDFromMariaDBPosition(pos mysql.Position) (mysql.GTIDSet, error)

GetGTIDFromMariaDBPosition return mariadb GTID from binlog position

func (*MysqlCDC) GetLastBinlog added in v0.2.0

func (m *MysqlCDC) GetLastBinlog() (mysql.Position, error)

GetLastBinlog Get last Binlog offset

func (*MysqlCDC) GetMariaDBPosGTID added in v0.2.2

func (m *MysqlCDC) GetMariaDBPosGTID() (mysql.GTIDSet, error)

GetMariaDBPosGTID return mariadb GTIDSet from slave

func (*MysqlCDC) GetMeta

func (m *MysqlCDC) GetMeta() map[string]utils.Meta

GetMeta get metadata

func (*MysqlCDC) GetSchema

func (m *MysqlCDC) GetSchema() map[string]map[string]*Column

GetSchema get schema

func (*MysqlCDC) GetValidBinlogFromOffset added in v0.2.0

func (m *MysqlCDC) GetValidBinlogFromOffset(offset string) mysql.Position

GetValidBinlogFromOffset returns a valid binlog offset: if the offset is invalid, it returns the first binlog offset; if the offset is greater than the master position, it returns the master position; otherwise it returns the parsed offset

func (*MysqlCDC) GetValidMariaDBGTIDFromOffset added in v0.2.0

func (m *MysqlCDC) GetValidMariaDBGTIDFromOffset(offset string) (mysql.GTIDSet, error)

GetValidMariaDBGTIDFromOffset returns a valid MariaDB GTID offset: if the offset is invalid, it returns the first GTID offset; if the offset is greater than the master position, it returns the master position; otherwise it returns the parsed GTID offset

func (*MysqlCDC) GetValidMysqlGTIDFromOffset added in v0.2.0

func (m *MysqlCDC) GetValidMysqlGTIDFromOffset(offset string) (mysql.GTIDSet, error)

GetValidMysqlGTIDFromOffset returns a valid MySQL GTID offset: it parses the string offset and adds the purged GTID set if one exists

func (*MysqlCDC) GetValidOffset added in v0.2.0

func (m *MysqlCDC) GetValidOffset(mode string, flavor string, offset string) error

GetValidOffset return a valid offset

func (*MysqlCDC) Init

func (m *MysqlCDC) Init()

Init source

func (*MysqlCDC) OnDDL added in v0.2.0

func (m *MysqlCDC) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error

OnDDL

func (*MysqlCDC) OnGTID added in v0.2.0

func (m *MysqlCDC) OnGTID(gset mysql.GTIDSet) error

OnGTID store GTID Position

func (*MysqlCDC) OnPosSynced added in v0.2.0

func (m *MysqlCDC) OnPosSynced(pos mysql.Position, gset mysql.GTIDSet, force bool) error

OnPosSynced Use your own way to sync position. When force is true, sync position immediately.

func (*MysqlCDC) OnRotate added in v0.2.0

func (m *MysqlCDC) OnRotate(e *replication.RotateEvent) error

OnRotate store binlog Position

func (*MysqlCDC) OnRow added in v0.2.0

func (m *MysqlCDC) OnRow(e *canal.RowsEvent) error

OnRow send row to multiplexer

func (*MysqlCDC) OnTableChanged added in v0.2.0

func (m *MysqlCDC) OnTableChanged(schema string, table string) error

OnTableChanged

func (*MysqlCDC) OnXID added in v0.2.0

func (m *MysqlCDC) OnXID(pos mysql.Position) error

OnXID store binlog Position

func (*MysqlCDC) ParsePosition added in v0.2.0

func (m *MysqlCDC) ParsePosition(offset string) (mysql.Position, error)

ParsePosition parse binlog offset from string

func (*MysqlCDC) Process

func (m *MysqlCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*MysqlCDC) Start

func (m *MysqlCDC) Start(i ...interface{}) (err error)

Start source

func (*MysqlCDC) StartCanal added in v0.2.0

func (m *MysqlCDC) StartCanal() error

StartCanal decode event

func (*MysqlCDC) String added in v0.2.0

func (m *MysqlCDC) String() string

String

func (*MysqlCDC) UpdateCommittedLsn added in v0.2.0

func (m *MysqlCDC) UpdateCommittedLsn()

UpdateCommittedLsn update CommittedLsn

type MysqlCDCConfig

// MysqlCDCConfig is the configuration of the MySQL change data capture source.
type MysqlCDCConfig struct {
	// Enabled is read from the "enabled" key.
	// NOTE(review): presumably what IsEnable reports — confirm.
	Enabled bool `json:"enabled"`
	// OldValue is read from the "old_value" key.
	// NOTE(review): presumably toggles sending previous row values — confirm.
	OldValue bool `json:"old_value" mapstructure:"old_value"`
	// ColumnsMetaValue is read from the "columns_meta" key.
	ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"`
	// SlaveID is read from the "slave_id" key.
	// NOTE(review): presumably the replication server id used by the collector — confirm.
	SlaveID uint32 `json:"slave_id" mapstructure:"slave_id"`
	// Host, Port, User and Password are the database connection settings.
	Host     string `json:"host"`
	Port     int    `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	// Offset is the configured offset, as a string.
	// NOTE(review): presumably validated through GetValidOffset — confirm.
	Offset string `json:"offset"`
	// Flavor is the server flavor.
	// NOTE(review): presumably "mariadb" or "mysql" (the flavor constants) — confirm.
	Flavor string `json:"flavor"`
	// Mode is the offset mode.
	// NOTE(review): presumably ModeGTID or ModeBinlog — confirm.
	Mode string `json:"mode"`
	// FilterPolicy and Filter configure event filtering.
	// NOTE(review): accepted policy values and filter layout are not visible here — confirm.
	FilterPolicy string                 `json:"filter_policy" mapstructure:"filter_policy"`
	Filter       map[string]interface{} `json:"filter"`
	// DefinedPk is read from the "defined_pk" key.
	// NOTE(review): presumably user-defined primary keys, keyed by table — confirm key format.
	DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"`
}

MysqlCDCConfig representation of Mysql change data capture configuration

type MysqlCDCMeta added in v0.2.0

// MysqlCDCMeta is the metadata of the MySQL change data capture source.
type MysqlCDCMeta struct {
	// ErrorEventNumber is serialized as "error_event_number".
	// NOTE(review): presumably a count of events that failed processing — confirm.
	ErrorEventNumber int `json:"error_event_number"`
	// CommittedOffset is the committed offset, as a string.
	CommittedOffset string `json:"committed_offset"`
}

MysqlCDCMeta representation of metadata

type MysqlOffset added in v0.2.0

// MysqlOffset holds a binlog position and a GTID set, read through Position
// and GTIDSet and written through Update, UpdatePos and UpdateGTIDSet.
type MysqlOffset struct {
	// RWMutex is embedded, so its Lock/Unlock/RLock/RUnlock methods are promoted.
	// NOTE(review): presumably guards the unexported offset fields — confirm.
	// A MysqlOffset must not be copied after first use because it contains a mutex.
	sync.RWMutex
	// contains filtered or unexported fields
}

MysqlOffset representation of binlog and GTID Offset

func (*MysqlOffset) GTIDSet added in v0.2.0

func (m *MysqlOffset) GTIDSet() mysql.GTIDSet

GTIDSet get GTID

func (*MysqlOffset) OffsetString added in v0.2.0

func (m *MysqlOffset) OffsetString(mode string) string

OffsetString convert offset to string

func (*MysqlOffset) Position added in v0.2.0

func (m *MysqlOffset) Position() mysql.Position

Position get Position

func (*MysqlOffset) Update added in v0.2.0

func (m *MysqlOffset) Update(pos mysql.Position)

Update set pos

func (*MysqlOffset) UpdateGTIDSet added in v0.2.0

func (m *MysqlOffset) UpdateGTIDSet(gset mysql.GTIDSet)

UpdateGTIDSet set GTID pos

func (*MysqlOffset) UpdatePos added in v0.2.2

func (m *MysqlOffset) UpdatePos(pos uint32)

UpdatePos set pos

type MysqlQueryConfig

// MysqlQueryConfig is the configuration of the MySQL query source.
type MysqlQueryConfig struct {
	// DBSQLQueryConfig is embedded, so its fields are promoted.
	*DBSQLQueryConfig
	// Schema is read from the "schema" key.
	Schema string `json:"schema"`
	// Exclude is read from the "exclude" key.
	// NOTE(review): presumably names to skip when extracting the schema — confirm.
	Exclude []string `json:"exclude"`
}

MysqlQueryConfig representation MySQL Query configuration

type OffsetCommittedState added in v0.2.2

// OffsetCommittedState keeps track of the LSNs of sent events.
type OffsetCommittedState struct {
	// RWMutex is embedded, so its Lock/Unlock/RLock/RUnlock methods are promoted.
	// NOTE(review): presumably guards SendedLsn — confirm.
	sync.RWMutex
	// SendedLsn is the list of tracked LSNs; Add appends to it and
	// CleanFromLsn removes old entries.
	SendedLsn []pglogrepl.LSN
}

OffsetCommittedState keeps track of sent events

func NewOffsetCommittedState added in v0.2.2

func NewOffsetCommittedState() *OffsetCommittedState

func (*OffsetCommittedState) Add added in v0.2.2

func (c *OffsetCommittedState) Add(lsn pglogrepl.LSN)

Add add new lsn to state

func (*OffsetCommittedState) CleanFromLsn added in v0.2.2

func (c *OffsetCommittedState) CleanFromLsn(lsn pglogrepl.LSN)

CleanFromLsn clean old lsn state

func (*OffsetCommittedState) IsEmpty added in v0.2.2

func (c *OffsetCommittedState) IsEmpty() bool

IsEmpty check if no offset in State

type Oldkeys

// Oldkeys holds the old key names, types and values of a Message.
type Oldkeys struct {
	// Keynames, Keytypes and Keyvalues are parallel lists.
	// NOTE(review): presumably index i of each list describes the same key — confirm.
	Keynames  []string      `json:"keynames"`
	Keytypes  []string      `json:"keytypes"`
	Keyvalues []interface{} `json:"keyvalues"`
}

Oldkeys representation of old event

type PostgreSQLCDC

// PostgreSQLCDC is the PostgreSQL change data capture source; it is created
// by NewPostgreSQLCdc.
type PostgreSQLCDC struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source

	// CommittedState tracks the LSNs of sent events.
	CommittedState *OffsetCommittedState
	// contains filtered or unexported fields
}

PostgreSQLCDC representation of PostgreSQL change data capture

func (*PostgreSQLCDC) GetConfirmedFlushLsn added in v0.2.2

func (p *PostgreSQLCDC) GetConfirmedFlushLsn() (pglogrepl.LSN, error)

GetConfirmedFlushLsn gets the confirmed flush LSN

func (*PostgreSQLCDC) GetMeta

func (p *PostgreSQLCDC) GetMeta() map[string]utils.Meta

GetMeta get metadata

func (*PostgreSQLCDC) GetSchema

func (p *PostgreSQLCDC) GetSchema() map[string]map[string]*Column

GetSchema get schema

func (*PostgreSQLCDC) GetSlotStatus added in v0.2.0

func (p *PostgreSQLCDC) GetSlotStatus() bool

GetSlotStatus get slot status

func (*PostgreSQLCDC) HealthCheck

func (p *PostgreSQLCDC) HealthCheck() bool

HealthCheck returns true if ok

func (*PostgreSQLCDC) Init

func (p *PostgreSQLCDC) Init()

Init source

func (*PostgreSQLCDC) NewConn added in v0.2.0

func (p *PostgreSQLCDC) NewConn() (*pgconn.PgConn, error)

NewConn create new pg connection

func (*PostgreSQLCDC) Process

func (p *PostgreSQLCDC) Process(action string, params ...interface{}) interface{}

Process action

func (*PostgreSQLCDC) Start

func (p *PostgreSQLCDC) Start(i ...interface{}) (err error)

Start source

func (*PostgreSQLCDC) StartReplication added in v0.1.0

func (p *PostgreSQLCDC) StartReplication()

StartReplication Start Replication

func (*PostgreSQLCDC) UpdateCommittedLsn added in v0.2.0

func (p *PostgreSQLCDC) UpdateCommittedLsn()

UpdateCommittedLsn update CommittedLsn

type PostgreSQLCDCConf

// PostgreSQLCDCConf is the configuration of the PostgreSQL change data
// capture source.
type PostgreSQLCDCConf struct {
	// Enabled is read from the "enabled" key.
	// NOTE(review): presumably what IsEnable reports — confirm.
	Enabled bool `json:"enabled"`
	// OldValue is read from the "old_value" key.
	// NOTE(review): presumably toggles sending previous row values — confirm.
	OldValue bool `json:"old_value" mapstructure:"old_value"`
	// ColumnsMetaValue is read from the "columns_meta" key.
	ColumnsMetaValue bool `json:"columns_meta" mapstructure:"columns_meta"`
	// Host, Port, User, Password, SslMode and Database are the database
	// connection settings.
	Host     string `json:"host"`
	Port     int    `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
	// SlotName is read from the "slot_name" key.
	// NOTE(review): presumably the replication slot name — confirm.
	SlotName string `json:"slot_name" mapstructure:"slot_name"`
	// FilterPolicy and Filter configure event filtering.
	// NOTE(review): accepted policy values and filter layout are not visible here — confirm.
	FilterPolicy string                 `json:"filter_policy" mapstructure:"filter_policy"`
	Filter       map[string]interface{} `json:"filter"`
	// DefinedPk is read from the "defined_pk" key.
	// NOTE(review): presumably user-defined primary keys, keyed by table — confirm key format.
	DefinedPk map[string]string `json:"defined_pk" mapstructure:"defined_pk"`
}

PostgreSQLCDCConf representation of PostgreSQL change data capture configuration

type PostgreSQLQuery

// PostgreSQLQuery is the PostgreSQL query source.
type PostgreSQLQuery struct {
	// DBSQLQuery is embedded, so its exported fields and methods are promoted.
	*DBSQLQuery
	// contains filtered or unexported fields
}

PostgreSQLQuery representation of PostgreSQL Query source

func (*PostgreSQLQuery) Connect

func (p *PostgreSQLQuery) Connect()

Connect connection to database

func (*PostgreSQLQuery) GetStatus

func (p *PostgreSQLQuery) GetStatus() interface{}

GetStatus returns current status of connection

func (*PostgreSQLQuery) HealthCheck

func (p *PostgreSQLQuery) HealthCheck() bool

HealthCheck returns true if the source is correctly configured and the collector is connected to it

func (*PostgreSQLQuery) Init

func (p *PostgreSQLQuery) Init()

Init initialisation of PostgreSQL Query source

func (*PostgreSQLQuery) Process

func (p *PostgreSQLQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*PostgreSQLQuery) Query

func (p *PostgreSQLQuery) Query(query string) error

Query execute query string

func (*PostgreSQLQuery) QueryMeta

func (p *PostgreSQLQuery) QueryMeta(query string) ([]map[string]interface{}, error)

QueryMeta execute query meta string

func (*PostgreSQLQuery) QuerySchema

func (p *PostgreSQLQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type PostgreSQLQueryConfig

// PostgreSQLQueryConfig is the configuration of the PostgreSQL query source.
type PostgreSQLQueryConfig struct {
	// DBSQLQueryConfig is embedded, so its fields are promoted.
	*DBSQLQueryConfig
	// SslMode is read from the "sslmode" key.
	SslMode string `json:"sslmode"`
	// Database is read from the "database" key.
	Database string `json:"database"`
}

PostgreSQLQueryConfig representation PostgreSQL Query configuration

type Query

// Query describes the parameters of the query action.
type Query struct {
	// Query is the SQL query to execute; its struct tag names it "query",
	// carries a description and marks it as required.
	Query string `name:"query" description:"SQL query to execute on the source by the collector" required:"true"`
}

Query representation of query action

type QueryInfo added in v0.2.2

// QueryInfo holds the information needed to build query events; it is passed
// to DBSQLQuery.ProcessLines.
type QueryInfo struct {
	// Database, Schema and Table identify where the result lines come from.
	Database string
	Schema   string
	Table    string
	// PrimaryKey is the primary key, as a string.
	// NOTE(review): format for composite keys is not visible here — confirm.
	PrimaryKey string
	// ExecTimestamp is a timestamp kept as a string.
	// NOTE(review): presumably when the query was executed; format unknown — confirm.
	ExecTimestamp string
	// ColumnMeta holds column metadata.
	// NOTE(review): presumably keyed by column name — confirm.
	ColumnMeta map[string]events.ColumnsMeta
}

QueryInfo holds the information needed for a query event

type Random

// Random is the Random source; it is created by NewRandom.
type Random struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source

	// NbMessages is a message counter.
	// NOTE(review): presumably the number of messages emitted so far — confirm.
	NbMessages uint64
	// contains filtered or unexported fields
}

Random representation of Random

func (*Random) GetMeta

func (r *Random) GetMeta() map[string]utils.Meta

GetMeta get source meta

func (*Random) GetSchema

func (r *Random) GetSchema() map[string]map[string]*Column

GetSchema Get source Schema

func (*Random) GetStatus

func (r *Random) GetStatus() interface{}

GetStatus Get source status

func (*Random) HealthCheck

func (r *Random) HealthCheck() bool

HealthCheck returns true if the source is correctly configured and the collector is connected to it

func (*Random) Process

func (r *Random) Process(action string, params ...interface{}) interface{}

Process action

func (*Random) Start

func (r *Random) Start(i ...interface{}) error

Start source

type RandomConfig

// RandomConfig is the configuration of the Random source.
type RandomConfig struct {
	// Wait is read from the "wait" key.
	// NOTE(review): presumably a duration string for the delay between messages — confirm format.
	Wait string `json:"wait"`
}

RandomConfig representation of Random Config

type SQLSchema added in v0.1.0

// SQLSchema is a three-level map ending in *Column.
// NOTE(review): presumably keyed by schema name, then table name, then column
// name or position — confirm the meaning of the innermost key.
type SQLSchema map[string]map[string]map[string]*Column

SQLSchema schema table Position

type Source

// Source is the base representation of a source; the concrete source types
// embed *Source.
type Source struct {
	// Name is the source name.
	Name string
	// OutputChannel carries the events produced by the source.
	OutputChannel chan events.LookatchEvent
	// CommitChannel is the commit channel attached to the source.
	// NOTE(review): the kind of values sent on it is not visible here — confirm.
	CommitChannel chan interface{}
	// AgentInfo is the agent header information.
	AgentInfo *AgentHeader
	// Conf is the viper configuration.
	Conf *viper.Viper
	// Offset is an offset value.
	// NOTE(review): its unit and which sources use it are not visible here — confirm.
	Offset int64
	// Status is the source status.
	// NOTE(review): presumably one of the SourceStatus* values — confirm.
	Status string
}

Source representation of source

func (*Source) GetCapabilities added in v0.2.0

func (s *Source) GetCapabilities() map[string]*utils.TaskDescription

GetCapabilities returns available actions

func (*Source) GetCommitChan added in v0.2.0

func (s *Source) GetCommitChan() chan interface{}

GetCommitChan return commit channel attach to source

func (*Source) GetMeta added in v0.2.0

func (s *Source) GetMeta() map[string]utils.Meta

GetMeta returns source meta

func (*Source) GetName added in v0.2.0

func (s *Source) GetName() string

GetName get name of source

func (*Source) GetOutputChan added in v0.2.0

func (s *Source) GetOutputChan() chan events.LookatchEvent

GetOutputChan get output channel

func (*Source) GetStatus added in v0.2.0

func (s *Source) GetStatus() interface{}

GetStatus returns the collector's source status

func (*Source) HealthCheck added in v0.2.0

func (s *Source) HealthCheck() bool

HealthCheck returns true if the source is correctly configured and the collector is connected to it

func (*Source) Init added in v0.2.0

func (s *Source) Init()

Init source

func (*Source) IsEnable

func (s *Source) IsEnable() bool

IsEnable check if the configured source is enabled

func (*Source) Start added in v0.2.0

func (s *Source) Start(i ...interface{}) (err error)

Start source

func (*Source) Stop added in v0.2.0

func (s *Source) Stop() error

Stop source

func (*Source) UpdateCommittedLsn added in v0.2.0

func (s *Source) UpdateCommittedLsn()

UpdateCommittedLsn does nothing; it exists to avoid a deadlock

type SourceI

// SourceI is the interface of a source; it is the type returned by New and by
// every constructor registered in Factory.
type SourceI interface {
	// Init initializes the source.
	Init()
	// Stop stops the source.
	Stop() error
	// Start starts the source.
	Start(...interface{}) error
	// GetName returns the name of the source.
	GetName() string
	// GetOutputChan returns the output channel.
	GetOutputChan() chan events.LookatchEvent
	// GetCommitChan returns the commit channel attached to the source.
	GetCommitChan() chan interface{}
	// UpdateCommittedLsn updates the committed LSN.
	UpdateCommittedLsn()
	// GetMeta returns the source metadata.
	GetMeta() map[string]utils.Meta
	// GetSchema returns the source schema.
	GetSchema() map[string]map[string]*Column
	// GetStatus returns the source status.
	GetStatus() interface{}
	// IsEnable reports whether the configured source is enabled.
	IsEnable() bool
	// HealthCheck reports whether the source is healthy.
	HealthCheck() bool
	// GetCapabilities returns the available actions.
	GetCapabilities() map[string]*utils.TaskDescription
	// Process processes the named action with the given parameters.
	Process(string, ...interface{}) interface{}
}

SourceI interface of source

func New

func New(name string, sourceType string, config *viper.Viper) (s SourceI, err error)

New create new source

func NewFileReadingFollower added in v0.2.2

func NewFileReadingFollower(s *Source) (SourceI, error)

NewFileReadingFollower create new FileReadingFollower source

func NewMysqlCdc added in v0.2.2

func NewMysqlCdc(s *Source) (SourceI, error)

NewMysqlCdc create new mysql CDC source

func NewMysqlQuery added in v0.2.2

func NewMysqlQuery(s *Source) (SourceI, error)

NewMysqlQuery create a Mysql Query source

func NewPostgreSQLCdc added in v0.2.2

func NewPostgreSQLCdc(s *Source) (SourceI, error)

NewPostgreSQLCdc create new PostgreSQL CDC source

func NewPostgreSQLQuery added in v0.2.2

func NewPostgreSQLQuery(s *Source) (SourceI, error)

NewPostgreSQLQuery create a PostgreSQL Query source

func NewRandom added in v0.2.2

func NewRandom(s *Source) (SourceI, error)

NewRandom create new Random source

func NewSqlserverCDC added in v0.2.2

func NewSqlserverCDC(s *Source) (SourceI, error)

NewSqlserverCDC create a Sqlserver CDC source

func NewSqlserverSQLQuery added in v0.2.2

func NewSqlserverSQLQuery(s *Source) (SourceI, error)

NewSqlserverSQLQuery create a Sqlserver Query source

func NewSyslog added in v0.2.2

func NewSyslog(s *Source) (SourceI, error)

NewSyslog create new syslog source

type SqlserverCDC added in v0.2.0

// SqlserverCDC is the Sqlserver change data capture source; it is created by
// NewSqlserverCDC.
type SqlserverCDC struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source
	// contains filtered or unexported fields
}

SqlserverCDC representation of Sqlserver CDC source

func (*SqlserverCDC) Connect added in v0.2.0

func (s *SqlserverCDC) Connect()

Connect connection to database

func (*SqlserverCDC) GetCapabilities added in v0.2.0

func (s *SqlserverCDC) GetCapabilities() map[string]*utils.TaskDescription

GetCapabilities returns available actions

func (*SqlserverCDC) GetChangesForTables added in v0.2.0

func (s *SqlserverCDC) GetChangesForTables(table string, fromLsn []byte, toLsn []byte) []map[string]interface{}

GetChangesForTables get all change for table between fromLsn and toLsn

func (*SqlserverCDC) GetMaxLsn added in v0.2.0

func (s *SqlserverCDC) GetMaxLsn() []byte

GetMaxLsn get max lsn of server

func (*SqlserverCDC) GetMeta added in v0.2.0

func (s *SqlserverCDC) GetMeta() map[string]utils.Meta

GetMeta get metadata

func (*SqlserverCDC) GetMinLsn added in v0.2.0

func (s *SqlserverCDC) GetMinLsn(table string) []byte

GetMinLsn get start lsn of a table

func (*SqlserverCDC) GetNextLsn added in v0.2.0

func (s *SqlserverCDC) GetNextLsn(lsn []byte) []byte

GetNextLsn get the lsn following the given lsn

func (*SqlserverCDC) GetRecordedChances added in v0.2.0

func (s *SqlserverCDC) GetRecordedChances()

GetRecordedChances get cdc change for table list

func (*SqlserverCDC) GetSchema added in v0.2.0

func (s *SqlserverCDC) GetSchema() map[string]map[string]*Column

GetSchema get schema

func (*SqlserverCDC) GetTimestampFromLsn added in v0.2.0

func (s *SqlserverCDC) GetTimestampFromLsn(lsn []byte) int64

GetTimestampFromLsn get Timestamp associated to lsn

func (*SqlserverCDC) HealthCheck added in v0.2.0

func (s *SqlserverCDC) HealthCheck() bool

HealthCheck returns true if source is ok

func (*SqlserverCDC) Init added in v0.2.0

func (s *SqlserverCDC) Init()

Init initialisation of Sqlserver CDC source

func (*SqlserverCDC) Process added in v0.2.0

func (s *SqlserverCDC) Process(action string, params ...interface{}) interface{}

Process process an action

func (*SqlserverCDC) ProcessRow added in v0.2.0

func (s *SqlserverCDC) ProcessRow(row map[string]interface{}, schema string, table string, pk string, method string)

ProcessRow send row to chan

func (*SqlserverCDC) Query added in v0.2.0

func (s *SqlserverCDC) Query(query string) []map[string]interface{}

Query execute query

func (*SqlserverCDC) Start added in v0.2.0

func (s *SqlserverCDC) Start(i ...interface{}) (err error)

Start source

func (*SqlserverCDC) Stop added in v0.2.0

func (s *SqlserverCDC) Stop() error

Stop source

func (*SqlserverCDC) UpdateChangeTables added in v0.2.0

func (s *SqlserverCDC) UpdateChangeTables()

UpdateChangeTables update list of activated cdc tables

func (*SqlserverCDC) UpdateCommittedLsn added in v0.2.0

func (s *SqlserverCDC) UpdateCommittedLsn()

UpdateCommittedLsn update CommittedLsn

type SqlserverCDCConfig added in v0.2.0

// SqlserverCDCConfig is the configuration of the Sqlserver change data
// capture source.
type SqlserverCDCConfig struct {
	// Host, Port, User, Password, SslMode and Database are the database
	// connection settings.
	Host     string `json:"host"`
	Port     int    `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	SslMode  string `json:"sslmode"`
	Database string `json:"database"`
	// PollInterval is read from the "poll_interval" key.
	// NOTE(review): presumably a duration string for the delay between polls — confirm format.
	PollInterval string `json:"poll_interval" mapstructure:"poll_interval"`
	// FilterPolicy and Filter configure event filtering.
	// NOTE(review): accepted policy values and filter layout are not visible here — confirm.
	FilterPolicy string                 `json:"filter_policy" mapstructure:"filter_policy"`
	Filter       map[string]interface{} `json:"filter"`
	// Enabled is read from the "enabled" key.
	// NOTE(review): presumably what IsEnable reports — confirm.
	Enabled bool `json:"enabled"`
	// Lsn is read from the "lsn" key.
	// NOTE(review): presumably the LSN to start from; string encoding unknown — confirm.
	Lsn string `json:"lsn"`
}

SqlserverCDCConfig representation of Sqlserver CDC configuration

type SqlserverCDCMeta added in v0.2.0

// SqlserverCDCMeta is the metadata of the Sqlserver change data capture source.
type SqlserverCDCMeta struct {
	// CurrentLsn is the current LSN, as a string.
	CurrentLsn string `json:"current_lsn"`
}

SqlserverCDCMeta representation of metadata

type SqlserverQuery added in v0.2.0

// SqlserverQuery is the Sqlserver query source.
type SqlserverQuery struct {
	// DBSQLQuery is embedded, so its exported fields and methods are promoted.
	*DBSQLQuery
	// contains filtered or unexported fields
}

SqlserverQuery representation of Sqlserver Query source

func (*SqlserverQuery) Connect added in v0.2.0

func (m *SqlserverQuery) Connect()

Connect connection to database

func (*SqlserverQuery) GetStatus added in v0.2.0

func (m *SqlserverQuery) GetStatus() interface{}

GetStatus returns the collector's source status

func (*SqlserverQuery) HealthCheck added in v0.2.0

func (m *SqlserverQuery) HealthCheck() bool

HealthCheck returns true if the source is correctly configured and the collector is connected to it

func (*SqlserverQuery) Init added in v0.2.0

func (m *SqlserverQuery) Init()

Init initialisation of Sqlserver Query source

func (*SqlserverQuery) Process added in v0.2.0

func (m *SqlserverQuery) Process(action string, params ...interface{}) interface{}

Process process an action

func (*SqlserverQuery) Query added in v0.2.0

func (m *SqlserverQuery) Query(query string) error

Query execute query string

func (*SqlserverQuery) QueryMeta added in v0.2.0

func (m *SqlserverQuery) QueryMeta(query string) ([]map[string]interface{}, error)

QueryMeta execute query meta string

func (*SqlserverQuery) QuerySchema added in v0.2.0

func (m *SqlserverQuery) QuerySchema() (err error)

QuerySchema extract schema from database

type SqlserverQueryConfig added in v0.2.0

// SqlserverQueryConfig is the configuration of the Sqlserver query source.
type SqlserverQueryConfig struct {
	// DBSQLQueryConfig is embedded, so its fields are promoted.
	*DBSQLQueryConfig
	// SslMode is read from the "sslmode" key.
	SslMode string `json:"sslmode"`
	// Database is read from the "database" key.
	Database string `json:"database"`
}

SqlserverQueryConfig representation Sqlserver Query configuration

type Syslog added in v0.1.0

// Syslog is the Syslog source; it is created by NewSyslog.
type Syslog struct {
	// Source is embedded, so its exported fields and methods are promoted.
	*Source

	// NbMessages is a message counter.
	// NOTE(review): presumably the number of messages received so far — confirm.
	NbMessages int
	// contains filtered or unexported fields
}

Syslog representation of Syslog source

func (*Syslog) GetCapabilities added in v0.2.0

func (s *Syslog) GetCapabilities() map[string]*utils.TaskDescription

GetCapabilities returns available actions

func (*Syslog) GetCommitChan added in v0.2.0

func (s *Syslog) GetCommitChan() chan interface{}

GetCommitChan return commit channel attach to source

func (*Syslog) GetMeta added in v0.1.0

func (s *Syslog) GetMeta() map[string]utils.Meta

GetMeta returns source meta

func (*Syslog) GetName added in v0.1.0

func (s *Syslog) GetName() string

GetName get source name

func (*Syslog) GetOutputChan added in v0.1.0

func (s *Syslog) GetOutputChan() chan events.LookatchEvent

GetOutputChan get output channel

func (*Syslog) GetSchema added in v0.1.0

func (s *Syslog) GetSchema() map[string]map[string]*Column

GetSchema returns schema

func (*Syslog) GetStatus added in v0.1.0

func (s *Syslog) GetStatus() interface{}

GetStatus get source status

func (*Syslog) HealthCheck added in v0.1.0

func (s *Syslog) HealthCheck() bool

HealthCheck returns the health status of the source

func (*Syslog) Init added in v0.1.0

func (s *Syslog) Init()

Init syslog source

func (*Syslog) IsEnable added in v0.1.0

func (s *Syslog) IsEnable() bool

IsEnable check if source is enable

func (*Syslog) Process added in v0.1.0

func (s *Syslog) Process(action string, params ...interface{}) interface{}

Process action

func (*Syslog) Start added in v0.1.0

func (s *Syslog) Start(i ...interface{}) error

Start syslog source

func (*Syslog) Stop added in v0.1.0

func (s *Syslog) Stop() error

Stop syslog source

type SyslogConfig added in v0.1.0

// SyslogConfig is the configuration of the Syslog source. Its JSON keys are
// capitalized ("Type", "Port"), unlike the other configuration types here.
type SyslogConfig struct {
	// Type is read from the "Type" key.
	// NOTE(review): presumably the network/protocol type to listen on — confirm accepted values.
	Type string `json:"Type"`
	// Port is read from the "Port" key.
	// NOTE(review): presumably the listening port — confirm.
	Port int `json:"Port"`
}

SyslogConfig representation of Syslog Config

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL