1. Requirements
Publish a Spring Boot jar on Flink that syncs a MySQL table in real time: every insert, update, and delete on the source table is mirrored to the target table.
2. Design
- Business code is developed in Java on Spring Boot;
- the real-time MySQL sync is expressed in Flink SQL on top of Flink CDC;
- the application is packaged as a jar, then run and managed by Flink.
3. Environment

| Component | Version |
| --- | --- |
| MySQL | 8.* |
| Flink | 1.16.2 |
| Flink CDC | 2.3.0 |
| Java | 8 |
| Spring Boot | 2.7.12 |
4. Implementation
4.1 The pom file
The pom below can be copied wholesale; packaging and running your own build tends to hit all kinds of dependency errors, so copying the whole file is the safest start.
Be sure to replace the mainClass in the pom with your own main class.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.12</version>
        <relativePath/>
    </parent>
    <groupId>com.usoten</groupId>
    <artifactId>processminingreport</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ProcessMiningReport</name>
    <description>ProcessMiningReport</description>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.16.2</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.handlers</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.schemas</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/spring.factories</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
4.2 Code
To sync the source table into the target table we need to execute three SQL statements (two DDLs plus an INSERT), so the endpoint takes a List as input. The concrete SQL is shown later in the Postman example.
The main class
```java
package com.usoten.processminingcdcmanager;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProcessMiningCdcManagerApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ProcessMiningCdcManagerApplication.class, args);
        // Keep main alive: when Flink runs this jar, returning from main
        // ends the client program (see error 2 below).
        while (true) {
            Thread.sleep(30000);
        }
    }
}
```
The controller layer
```java
package com.usoten.processminingcdcmanager.controller;

import com.usoten.processminingcdcmanager.service.CdcBaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController
@RequestMapping("/datasource")
public class CdcBaseController {

    private final CdcBaseService cdcBaseService;

    public CdcBaseController(CdcBaseService cdcBaseService) {
        this.cdcBaseService = cdcBaseService;
    }

    // Accepts the list of SQL statements (two DDLs plus an INSERT)
    // and hands it to the service layer.
    @PostMapping("/cdc/executeSql")
    public void executeSql(@RequestBody List<String> sqlList) {
        cdcBaseService.executeSql(sqlList);
    }
}
```
The service layer
```java
package com.usoten.processminingcdcmanager.service;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class CdcBaseService {

    public void executeSql(List<String> sqlList) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required for the CDC source to commit its progress.
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Each CREATE TABLE registers a catalog table; the INSERT statement
        // is what actually submits the streaming job to Flink.
        sqlList.forEach(tEnv::executeSql);
    }
}
```
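Rather than hand-writing the request body for every table, the three statements can be generated programmatically. The helper below is a hypothetical sketch, not part of the original project: the host, port, and root/12345678 credentials mirror the Postman example later in the article, and the id/username/password schema is hard-coded to match the demo tables.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds the three statements the /cdc/executeSql
// endpoint expects for a given database and source/target table pair.
public class CdcSqlBuilder {

    public static List<String> buildPipeline(String db, String sourceTable, String targetTable) {
        // CDC source table: reads the binlog of the source MySQL table.
        String sourceDdl = String.format(
                "CREATE TABLE mysql_source (id INT, username STRING, password STRING, "
                        + "PRIMARY KEY (id) NOT ENFORCED) WITH ("
                        + "'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', "
                        + "'username' = 'root', 'password' = '12345678', "
                        + "'database-name' = '%s', 'table-name' = '%s', "
                        + "'scan.startup.mode' = 'initial')",
                db, sourceTable);
        // JDBC sink table: upserts into the target MySQL table.
        String sinkDdl = String.format(
                "CREATE TABLE jdbc_sink (id INT, username STRING, password STRING, "
                        + "PRIMARY KEY (id) NOT ENFORCED) WITH ("
                        + "'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/%s', "
                        + "'table-name' = '%s', 'username' = 'root', 'password' = '12345678')",
                db, targetTable);
        // The INSERT is what actually starts the streaming job.
        String insert = "INSERT INTO jdbc_sink SELECT id, username, password FROM mysql_source";
        return Arrays.asList(sourceDdl, sinkDdl, insert);
    }

    public static void main(String[] args) {
        buildPipeline("test", "test", "test1").forEach(System.out::println);
    }
}
```

The returned list can be posted directly as the JSON body of the request shown in section 8.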
5. Packaging
Run `mvn clean package` and copy the resulting jar into Flink's lib directory.
6. Running the jar
nohup ./bin/flink run --class com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication ./lib/processminingreport-0.0.1-SNAPSHOT.jar &
- Error 1
20:30:59.960 [http-nio-8100-exec-1] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: Unable to instantiate java compiler] with root cause
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)
This happens because Flink changed the client's ClassLoader resolution order to child-first, so the user jar must not contain Flink framework classes (for example Calcite, Flink planner dependencies, or Hive dependencies). Either move the jars with conflicting classes into $FLINK_HOME/lib, or switch the strategy back to parent-first. Here we take the second option and add the following line to $FLINK_HOME/conf/flink-conf.yaml:
classloader.resolve-order: parent-first
- Error 2
If the job exits right after submission, it is because the main method returned; the fix is to keep main alive, e.g. with the sleep loop shown in the main class above.
Restart Flink, run the jar again, and confirm from the startup log that the application stays up.
7. Preparing the data
- Create a database named test
- Create two tables: source table test and target table test1
- Insert a few rows into the test table first
```sql
CREATE TABLE `test` (
  `id` int NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

CREATE TABLE `test1` (
  `id` int NOT NULL,
  `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```
- MySQL must have binlog enabled, and the tables must have a primary key.
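A quick way to check the binlog prerequisite and seed the source table (the sample usernames and passwords below are made up for illustration):

```sql
-- MySQL 8 enables the binlog by default; this should return ON
SHOW VARIABLES LIKE 'log_bin';

-- seed a few rows in the source table
INSERT INTO test (id, username, password) VALUES
  (1, 'alice', 'pw1'),
  (2, 'bob',   'pw2');
```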
8. Sending the request
POST localhost:8100/datasource/cdc/executeSql
Body:
[
"CREATE TABLE mysql_source ( id INT, username STRING, password STRING,PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'table-name' = 'test', 'hostname' = 'localhost', 'database-name' = 'test', 'port' = '3306', 'username' = 'root', 'password' = '12345678', 'scan.startup.mode' = 'initial')",
"CREATE TABLE oceanbase_sink ( id INT NOT NULL, username STRING, password STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'test1', 'username' = 'root', 'password' = '12345678')",
"insert into oceanbase_sink select id,username,password from mysql_source"
]
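The same call can be made with curl instead of Postman (assuming the application listens on port 8100):

```shell
# save the JSON body shown above as pipeline.json, then:
curl -X POST http://localhost:8100/datasource/cdc/executeSql \
     -H "Content-Type: application/json" \
     -d @pipeline.json
```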
Once the request succeeds, the running job appears in the Flink web UI; it keeps running, listening to MySQL and syncing changes.
Verification
At this point the data in the test table has been synced into the test1 table, and any insert, update, or delete on test is reflected in test1 accordingly.
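The sync can be exercised directly from a MySQL client; after each change, the target table should catch up within moments:

```sql
UPDATE test SET username = 'renamed' WHERE id = 1;
DELETE FROM test WHERE id = 2;

-- after a short delay, test1 should reflect both changes
SELECT * FROM test1;
```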