Configuring Flink for CDC streaming synchronization of MySQL, DM, Oracle, and SQL Server

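These notes record how to set up Flink streaming synchronization for MySQL, DM (Dameng), Oracle, and SQL Server. MySQL, Oracle, and SQL Server are all covered by packages from the official flink-cdc project, while DM uses a CDC package provided by Dameng itself. The MySQL, Oracle, and SQL Server connections are therefore described first.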

MySQL

Configuring MySQL

MySQL must have binlog enabled on the database before CDC can work.
Run show variables like 'log_bin';. If it returns ON, binlog is enabled.
If MySQL does not have binlog enabled, the MySQL CDC binlog must be configured first.
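Beyond log_bin, the mysql-cdc connector reads row-level change events, so it can help to check the binlog format as well. The following is a minimal sketch rather than a required procedure. The account name flink_cdc and its password are hypothetical (the example later in this post simply uses root), and the privilege list follows what the Flink CDC documentation lists for the MySQL connector, so adjust it to your environment.

```sql
-- Check that binary logging is on and row-based.
SHOW VARIABLES LIKE 'log_bin';          -- should be ON
SHOW VARIABLES LIKE 'binlog_format';    -- should be ROW for CDC
SHOW VARIABLES LIKE 'binlog_row_image'; -- should be FULL

-- Hypothetical dedicated account for the CDC job (name and password are assumptions).
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```

If any of the variables has a different value, it has to be changed in the MySQL server configuration, which normally requires a restart.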

Adding the MySQL dependency

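In this project, all four databases (MySQL, Oracle, SQL Server, and DM) have to support CDC streaming synchronization in the same environment, so the dependencies pulled in by the four connectors must not conflict with each other. This means the dependency paths of the officially provided packages have to be modified; for details see "Compiling the flink-connector-cdc package".
Put the recompiled jar into /opt/flink/lib,
where /opt/flink/lib is the lib folder of the Flink installation directory.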

Setting up mysql-cdc

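In Flink SQL, use the mysql-cdc connector to monitor changes as a stream. Below is an example set of statements that streams CDC changes from one MySQL database into another MySQL database.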

-- Drop the source table if it already exists
DROP TABLE IF EXISTS mysql_binlog021701;

-- Create the source table, backed by the mysql-cdc connector
CREATE TABLE mysql_binlog021701 (
    id INT NOT NULL,
    xiangmu1 STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'xxx.xxx.xxx.xxx',
    'port' = 'xxxx',
    'username' = 'root',
    'password' = 'password',
    'database-name' = 'hl',
    'table-name' = 'flink1',
    'debezium.snapshot.locking.mode' = 'none',
    'scan.incremental.snapshot.enabled' = 'false'
);
-- Drop the sink table if it already exists
DROP TABLE IF EXISTS mysql_binlog021702;
-- Create the sink table, written through the jdbc connector
CREATE TABLE mysql_binlog021702 (
    id INT,
    xiangmu1 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:xxxx/hl',
    'username' = 'root',
    'password' = 'password',
    'table-name' = 'flink2'
);
-- Start the streaming job: copy every change from the source to the sink
INSERT INTO mysql_binlog021702
SELECT * FROM mysql_binlog021701;
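When these statements are run from the Flink SQL client, job-level settings can be applied with SET before the INSERT is submitted. The sketch below is only an illustration: the interval of 3s and the job name are assumed values, not settings taken from the original project. Enabling checkpointing lets the job restore its read position after a failure.

```sql
-- Run these in the Flink SQL client before submitting the INSERT statement.
-- The interval and job name are illustrative assumptions.
SET 'execution.checkpointing.interval' = '3s';
SET 'pipeline.name' = 'mysql-cdc-flink1-to-flink2';
```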

Setting up oracle-cdc

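Most of the oracle-cdc setup is the same as for mysql-cdc. Likewise, if CDC for several databases is used in the same environment, the package has to be rebuilt as described in "Compiling the flink-connector-cdc package".
Below is an example of oracle-cdc streaming synchronization.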

-- Source table, backed by the oracle-cdc connector
DROP TABLE IF EXISTS oracle_binlog021701;
CREATE TABLE oracle_binlog021701 (
    ID DECIMAL(38,0),
    XIANGMU1 STRING
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'xxx.xxx.xxx.xxx',
    'port' = 'xxxx',
    'username' = 'flink_user',
    'password' = 'Password123456',
    'database-name' = 'ORCLCDB',
    'schema-name' = 'FLINK_USER',
    'table-name' = 'FLINK1',
    'scan.startup.mode' = 'initial',
    'debezium.snapshot.locking.mode' = 'extended',
    'scan.incremental.snapshot.enabled' = 'true'
);
-- Sink table, written through the jdbc connector
DROP TABLE IF EXISTS oracle_binlog021702;
CREATE TABLE oracle_binlog021702 (
    ID DECIMAL(38,0),
    XIANGMU1 STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:oracle:thin:@//xxx.xxx.xxx.xxx:xxxx/ORCLCDB',
    'username' = 'flink_user',
    'password' = 'Password123456',
    'table-name' = 'FLINK_USER.FLINK2'
);
-- Start the streaming job
INSERT INTO oracle_binlog021702
SELECT * FROM oracle_binlog021701;
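The oracle-cdc connector reads changes through LogMiner, so the Oracle side usually needs archive logging and supplemental logging before a job like the one above sees any changes. The statements below are a hedged sketch of those checks, to be run by a DBA on the Oracle server (not in Flink SQL). They assume the FLINK_USER.FLINK1 table from the example and do not cover the grants that flink_user needs.

```sql
-- Run on the Oracle server as a DBA, not in the Flink SQL client.
-- Check whether the database is in ARCHIVELOG mode.
SELECT LOG_MODE FROM V$DATABASE;

-- Enable minimal supplemental logging at the database level.
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

-- Log all columns for the captured table so change records carry full rows.
ALTER TABLE FLINK_USER.FLINK1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```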

Setting up sqlserver-cdc

Set up the flink-sql-connector-sqlserver-cdc package in the same way as above; see "Compiling the flink-connector-cdc package".
Below is a sqlserver-cdc example.

-- Source table, backed by the sqlserver-cdc connector
DROP TABLE IF EXISTS sqlserver_binlog021701;
CREATE TABLE sqlserver_binlog021701 (
    id INT NOT NULL,
    xiangmu1 STRING
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'xxx.xxx.xxx.xxx',
    'port' = 'xxxxx',
    'username' = 'sa',
    'password' = 'iD8lM8rZ0bE1',
    'database-name' = 'dmp_test',
    'table-name' = 'dbo.flink1'
);
-- Sink table, written through the jdbc connector
DROP TABLE IF EXISTS sqlserver_binlog021702;

CREATE TABLE sqlserver_binlog021702 (
    id INT,
    xiangmu1 STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    -- The backslash before the semicolon is intentional
    'url' = 'jdbc:sqlserver://xxx.xxx.xxx.xxx:xxxx\;databaseName=dmp_test',
    'username' = 'sa',
    'password' = 'iD8lM8rZ0bE1',
    'table-name' = 'dbo.flink2'
);
-- Start the streaming job
INSERT INTO sqlserver_binlog021702
SELECT * FROM sqlserver_binlog021701;

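Note that in the SQL Server JDBC URL an escape character has to be placed before the semicolon, so that Flink does not split the statement at that semicolon. The escape character, however, then causes an error when connecting to SQL Server, so flink-connector-jdbc has to be modified to strip the escape character from the URL; for details see "Adding a SQL Server dialect to flink-connector-jdbc_2.4.1".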
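Separately from the URL issue, the sqlserver-cdc connector only sees changes for tables that have CDC enabled in SQL Server itself. The following T-SQL is a minimal sketch of that step, assuming the dmp_test database and dbo.flink1 table from the example, a login with sufficient rights, and a running SQL Server Agent. It is run against SQL Server (for example in sqlcmd or SSMS), not in Flink SQL.

```sql
-- Run against SQL Server, not in the Flink SQL client.
USE dmp_test;
GO
-- Enable CDC for the database.
EXEC sys.sp_cdc_enable_db;
GO
-- Enable CDC for the captured table; @role_name = NULL skips the gating role.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'flink1',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
-- Verify that CDC is on for the database and the table.
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'dmp_test';
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'flink1';
GO
```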