百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

SpringBoot 整合 Flink 实时同步 MySQL

ztj100 2025-04-02 00:36 5 浏览 0 评论

1、需求

在 Flink 发布SpringBoot 打包的 jar 包能够实时同步 MySQL 表,做到原表进行新增、修改、删除的时候目标表都能对应同步。

2、设计

  1. 在 SpringBoot 用 Java 做业务代码的开发;
  2. 基于Flink CDC 用 FlinkSQL 做 Mysql 实时同步处理;
  3. 打包成 jar 包后用 Flink 运行并管理。

3、环境要求


MySQL

8.*

Flink

1.16.2

Flink CDC

2.3.0

Java

8

SpringBoot

2.7.12

3、代码实现

3.1、pom 文件

pom 文件可以整个复制过来,自己打包运行可能会遇到各种各样的错,可以直接全部复制。

pom 中的 mainClass 一定要替换成自己的



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.12
         
    
    com.usoten
    processminingreport
    0.0.1-SNAPSHOT
    ProcessMiningReport
    ProcessMiningReport
    
        1.8
        1.16.2
        2.3.0
    
    
        
            org.springframework.boot
            spring-boot-starter-web
           
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            com.mysql
            mysql-connector-j
        

        
        
            org.apache.flink
            flink-java
            ${flink.version}
            
                
                    org.slf4j
                    slf4j-api
                
            
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-loader
            ${flink.version}
            provided
        
        
            org.apache.flink
            flink-table-runtime
            ${flink.version}
        
        
        
            org.apache.flink
            flink-runtime-web
            ${flink.version}
            test
        
        
            org.apache.flink
            flink-connector-base
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-jdbc
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-files
            ${flink.version}
        

        
        
            com.ververica
            flink-sql-connector-mysql-cdc
            ${flink-cdc.version}
        

    

    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                
                    true
                    true
                    
                        
                            *:*
                            
                                META-INF/*.SF
                                META-INF/*.DSA
                                META-INF/*.RSA
                            
                        
                    
                
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    META-INF/spring.handlers
                                
                                
                                    META-INF/spring.schemas
                                
                                
                                    META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
                                
                                
                                    META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports
                                
                                
                                    META-INF/spring.factories
                                
                                
                                
                                    
                                    com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication
                                
                            
                        
                    
                
            
        
    

3.2、代码实现

需要将原始表同步到目标表,这里我们需要执行三个 SQL,所以入参用 List。具体 SQL 见后面 postman 调用时的例子。

启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProcessMiningCdcManagerApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ProcessMiningCdcManagerApplication.class, args);

        while (true){
            Thread.sleep(30000);
        }
    }

}

controller层

import com.usoten.processminingcdcmanager.service.CdcBaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController

@RequestMapping("/datasource")

public class CdcBaseController {

    private CdcBaseService cdcBaseService;

    public CdcBaseController(CdcBaseService cdcBaseService) {
        this.cdcBaseService = cdcBaseService;
    }

    @PostMapping("/cdc/executeSql")
    public void getColumnMetadata(@RequestBody List sqlList) {

        cdcBaseService.executeSql(sqlList);
    }
}

service层

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@Service
public class CdcBaseService {
    public void executeSql(@RequestBody List sqlList) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        sqlList.forEach(sql -> {
                tEnv.executeSql(sql);
        });

    }
}

4、打包

执行 mvn clean package 打包,并将包放进 Flink 的 lib 目录下

5、运行 jar

nohup ./bin/flink run --class com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication ./lib/processminingreport-0.0.1-SNAPSHOT.jar &
  • 报错1
20:30:59.960 [http-nio-8100-exec-1] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: Unable to instantiate java compiler] with root cause
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)

这是因为Flink 把客户端的ClassLoader解析顺序调整为了Child优先,这就导致用户的Jar包不能包含Flink框架的classes,比如常见的Calcite、Flink-Planner依赖、Hive依赖等等。用户需要把有冲突classes的jar放到flink-home/lib下,或者调整策略为Parent优先,这里直接调整为 Parent 优先,$
FLINK_HOME/conf/flink-conf.yaml中 添加

classloader.resolve-order: parent-first
  • 报错2

如果是报下面的错,是因为main方法停止了,解决的话可以看前面的启动类加一个死循环

重新启动 Flink 再次运行 jar,出现下面日志即可

6、准备数据

  • 建一个 test 数据库
  • 建两张表 原表:test 目标表:test1
  • 先在 test 表中造几条数据
  • CREATE TABLE `test` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    CREATE TABLE `test` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  • mysql 需要开启 binlog,并且表有主键

7、发送请求

localhost:8100/datasource/cdc/executeSql

body

[
    "CREATE TABLE mysql_source ( id INT, username STRING,  password STRING,PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'table-name' = 'test', 'hostname' = 'localhost', 'database-name' = 'test', 'port' = '3306', 'username' = 'root', 'password' = '12345678', 'scan.startup.mode' = 'initial')",
    "CREATE TABLE oceanbase_sink ( id INT NOT NULL, username STRING,  password STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'test1', 'username' = 'root', 'password' = '12345678')",
    "insert into oceanbase_sink select id,username,password from mysql_source"
]

请求响应成功过后,我们进入 Flink 页面就可以看到运行的任务,此任务会一直运行,监听并同步 MySQL。

验证

此时我们会发现test表中的数据会同步test1表中,然后u对test表做新增、修改、删除操作时,test1表都会做相应变化


Flink 入门系列入口:

第1章 Flink 基础概念

第3章 Flink的运行架构

第4章 Flink 基础API 三:转换算子(Transformation)

.....

更多内容持续更新,点关注、不迷路。。。

相关推荐

从IDEA开始,迈进GO语言之门(idea got)

前言笔者在学习GO语言编程的时候,GO语言在国内还没有像JAVA/Php/Python那样普及,绕了不少的弯路,要开始入门学习一门编程语言,最好就先从选择一个好的编程语言的开发环境开始,有了这个开发环...

基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于SpringBoot...

基于springboot的个人服装管理系统java网上商城jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...

基于springboot的美食网站Java食品销售jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...

贸易管理进销存springboot云管货管账分析java jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述贸易管理进销存spring...

SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql

本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍SpringBoot+V...

目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗

帮粉丝找了一个基于SpringBoot的天猫商城项目,快速部署运行,所用技术:MySQL,Druid,Log4j2,Maven,Echarts,Bootstrap...免费给大家分享出来前台演示...

SpringBoot+Mysql实现的手机商城附带源码演示导入视频

今天为大家带来的是基于SpringBoot+JPA+Thymeleaf框架的手机商城管理系统,商城系统分为前台和后台、前台用的是Bootstrap框架后台用的是SpringBoot+JPA都是现在主...

全网首发!马士兵内部共享—1658页《Java面试突击核心讲》

又是一年一度的“金九银十”秋招大热门,为助力广大程序员朋友“面试造火箭”,小编今天给大家分享的便是这份马士兵内部的面试神技——1658页《Java面试突击核心讲》!...

SpringBoot数据库操作的应用(springboot与数据库交互)

1.JDBC+HikariDataSource...

SpringBoot 整合 Flink 实时同步 MySQL

1、需求在Flink发布SpringBoot打包的jar包能够实时同步MySQL表,做到原表进行新增、修改、删除的时候目标表都能对应同步。...

SpringBoot + Mybatis + Shiro + mysql + redis智能平台源码分享

后端技术栈基于SpringBoot+Mybatis+Shiro+mysql+redis构建的智慧云智能教育平台基于数据驱动视图的理念封装element-ui,即使没有vue的使...

Springboot+Mysql舞蹈课程在线预约系统源码附带视频运行教程

今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的Springboot+Mysql舞蹈课程在线预约系统,系统项目源代码在【猿来入此】获取!https://www.yuan...

SpringBoot+Mysql在线众筹系统源码+讲解视频+开发文档(参考论文

今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线众筹管理系统,主要实现了普通用户在线参与众筹基本操作流程的全部功能,系统分普通用户、超级管理员等角色,除基础脚手架外...

Docker一键部署 SpringBoot 应用的方法,贼快贼好用

这两天发现个Gradle插件,支持一键打包、推送Docker镜像。今天我们来讲讲这个插件,希望对大家有所帮助!GradleDockerPlugin简介...

取消回复欢迎 发表评论: