微服务架构下的 Spring Boot 3 集成 Flink CDC 1.17 实现 MySQL 数据同步
随着微服务架构的普及,数据同步成为保证系统一致性和可用性的重要环节。Flink CDC(Change Data Capture)为流处理应用提供了一种有效的方式来捕获数据库的变化,并实时同步到其他系统。本文将介绍如何使用 Spring Boot 3 集成 Flink CDC 1.17,实现 MySQL 数据的实时同步。
一、环境准备
- 开发工具:使用 IntelliJ IDEA 或者任何支持 Maven 的 IDE。
- JDK:Java 17 以上。
- MySQL:安装并配置 MySQL 数据库。
- Maven:管理项目依赖。
二、项目搭建
首先,我们创建一个 Spring Boot 项目,并在 pom.xml
文件中添加必要的依赖。
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Flink CDC -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.17.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mysql-cdc_2.12</artifactId>
<version>1.17.0</version>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
三、配置 MySQL 数据源
在 src/main/resources/application.yml
文件中配置 MySQL 数据源的信息,例如:
spring:
datasource:
url: jdbc:mysql://localhost:3306/your_database
username: your_username
password: your_password
四、编写 Flink CDC 任务
接下来,我们编写一个简单的 Flink CDC 任务,该任务将实时同步 MySQL 表的数据。
1. 创建 DataSyncService
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.mysql.cdc.MySQLSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.createTypeInformation;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@Component
public class DataSyncService {
public void startSync() throws Exception {
// 创建 Flink 流处理环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置 MySQL CDC Source
MySQLSource<String> source = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.username("your_username")
.password("your_password")
.databaseList("your_database") // 监听的数据库
.tableList("your_database.your_table") // 监听的表
.deserializer(new JsonDebeziumDeserializationSchema()) // 数据反序列化
.build();
// 使用 Source 读取数据
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");
// 配置 JDBC Sink
stream.addSink(JdbcSink.sink(
"INSERT INTO target_table (column1, column2) VALUES (?, ?)",
(PreparedStatement ps, String record) -> {
// 设置 SQL 参数
ps.setString(1, record.column1());
ps.setString(2, record.column2());
},
JdbcSinkOptions.create()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl("jdbc:mysql://localhost:3306/target_database")
.withUsername("your_username")
.withPassword("your_password")
)
);
// 启动 Flink 任务
env.execute("MySQL CDC Data Sync");
}
}
2. 启动任务
在 Spring Boot 启动类中,我们可以通过 CommandLineRunner
启动数据同步任务。
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class DataSyncApplication {
public static void main(String[] args) {
SpringApplication.run(DataSyncApplication.class, args);
}
@Bean
public CommandLineRunner run(DataSyncService dataSyncService) {
return args -> {
dataSyncService.startSync();
};
}
}
五、总结
通过以上步骤,我们成功实现了 Spring Boot 3 集成 Flink CDC 1.17 的 MySQL 数据同步功能。Flink CDC 提供了一种简便且高效的方式来捕获数据库变更并实时同步,是微服务架构下数据一致性的有效解决方案。希望本篇文章对你有所帮助,激发你更多的实践和探索!