Documentation
Overview ¶
Package binlogmsg contains a publisher implementation that 'publishes' (stores) messages to MySQL 8 tables, then flushes them to a downstream publisher using binlog notifications.
Since messages are stored in normal MySQL tables, all ACID properties apply to them.
Index ¶
- Variables
- func CreateMsgTable(ctx context.Context, q sqlh.Queryer, schema, table string) error
- func NewMsgPublisher(encoder npenc.Encoder, q sqlh.Queryer, schema, table string) MsgPublisherFunc
- func NewPbJsonPublisher(q sqlh.Queryer, schema, table string) MsgPublisherFunc
- type MsgPipe
- type MsgPipeOption
- type MsgTableFilter
Variables ¶
var (
	// DefaultMaxInflight is the default value of PipeOptMaxInflight.
	DefaultMaxInflight = 4096
	// DefaultRetryWait is the default value of PipeOptRetryWait.
	DefaultRetryWait = 5 * time.Second
)
Functions ¶
func CreateMsgTable ¶
CreateMsgTable creates a msg table to store msgs.
func NewMsgPublisher ¶
NewMsgPublisher creates a publisher that publishes (stores) messages to MySQL msg tables:
- encoder: encoder for messages.
- q: *sql.DB/*sql.Tx/*sql.Conn/...
- schema: database name.
- table: msg table name, the table must be created by CreateMsgTable.
func NewPbJsonPublisher ¶
NewPbJsonPublisher creates a msg publisher using protobuf or json for encoding:
- If msg is proto.Message, then use protobuf.
- Otherwise use json.
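The encoding rule above amounts to a check on the dynamic type of the message. The sketch below illustrates that dispatch using only the standard library. It is not the package's code: binaryMarshaler is a hypothetical stand-in for proto.Message, and encodeMsg and fakeProto are names invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// binaryMarshaler is a hypothetical stand-in for proto.Message, used so the
// sketch needs no third-party imports.
type binaryMarshaler interface {
	MarshalBinary() ([]byte, error)
}

// encodeMsg picks the encoding from the dynamic type of msg: the binary path
// if msg implements binaryMarshaler, JSON otherwise.
func encodeMsg(msg interface{}) (format string, data []byte, err error) {
	if m, ok := msg.(binaryMarshaler); ok {
		data, err = m.MarshalBinary()
		return "binary", data, err
	}
	data, err = json.Marshal(msg)
	return "json", data, err
}

// fakeProto pretends to be a protobuf message for the demonstration.
type fakeProto struct{ payload string }

func (p fakeProto) MarshalBinary() ([]byte, error) { return []byte(p.payload), nil }

func main() {
	for _, msg := range []interface{}{fakeProto{"abc"}, map[string]int{"n": 1}} {
		format, data, err := encodeMsg(msg)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s: %s\n", format, data)
	}
}
```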
Types ¶
type MsgPipe ¶
type MsgPipe struct {
// contains filtered or unexported fields
}
MsgPipe pipes messages from MySQL (>=8.0.2) msg tables to downstream. Messages emitted by MsgPipe have type *rawenc.RawData, so downstream must be able to handle messages of this type.
func NewMsgPipe ¶
func NewMsgPipe(
	downstream interface{},
	masterCfg *mycanal.Config,
	slaveCfg *mycanal.Config,
	tableFilter MsgTableFilter,
	opts ...MsgPipeOption,
) (*MsgPipe, error)
NewMsgPipe creates a new msg pipe:
- downstream: must be MsgPublisher or MsgAsyncPublisher.
- masterCfg: connection to the master, used to delete published messages.
- slaveCfg: connection to the slave, used for fulldump and incrdump.
- tableFilter: determines whether a table is used to store messages.
type MsgPipeOption ¶
MsgPipeOption is an option for creating a MsgPipe.
func PipeOptLogger ¶
func PipeOptLogger(logger logr.Logger) MsgPipeOption
PipeOptLogger sets logger for MsgPipe.
func PipeOptMaxInflight ¶
func PipeOptMaxInflight(maxInflight int) MsgPipeOption
PipeOptMaxInflight sets the max number of messages inflight (publishing).
func PipeOptRetryWait ¶
func PipeOptRetryWait(t time.Duration) MsgPipeOption
PipeOptRetryWait sets the wait interval before retrying after any kind of error.
type MsgTableFilter ¶
MsgTableFilter returns true if a given table is a msg table.
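As an illustration, a filter could select msg tables by naming convention. The sketch assumes a filter takes a schema name and a table name and returns a bool. That signature is an assumption, since this documentation does not show the definition of MsgTableFilter. The tableFilter type, the prefixFilter helper, and the "msg_" prefix are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// tableFilter is an assumed shape for a table filter; check the real
// MsgTableFilter definition before relying on this signature.
type tableFilter func(schema, table string) bool

// prefixFilter returns a filter that accepts tables in the given schema whose
// names start with prefix.
func prefixFilter(schema, prefix string) tableFilter {
	return func(s, t string) bool {
		return s == schema && strings.HasPrefix(t, prefix)
	}
}

func main() {
	isMsgTable := prefixFilter("app", "msg_")
	fmt.Println(isMsgTable("app", "msg_orders"))
	fmt.Println(isMsgTable("app", "users"))
	fmt.Println(isMsgTable("other", "msg_orders"))
}
```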