微服务架构下的 Spring Boot 3 集成 Flink CDC 1.17 实现 MySQL 数据同步

随着微服务架构的普及,数据同步成为保证系统一致性和可用性的重要环节。Flink CDC(Change Data Capture)为流处理应用提供了一种有效的方式来捕获数据库的变化,并实时同步到其他系统。本文将介绍如何使用 Spring Boot 3 集成 Flink CDC 1.17,实现 MySQL 数据的实时同步。

一、环境准备

  1. 开发工具:使用 IntelliJ IDEA 或者任何支持 Maven 的 IDE。
  2. JDK:Java 17 以上。
  3. MySQL:安装并配置 MySQL 数据库。
  4. Maven:管理项目依赖。

二、项目搭建

首先,我们创建一个 Spring Boot 项目,并在 pom.xml 文件中添加必要的依赖。

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Flink CDC -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc_2.12</artifactId>
        <version>1.17.0</version>
    </dependency>

    <!-- MySQL Connector -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.29</version>
    </dependency>

    <!-- Spring Boot Test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

三、配置 MySQL 数据源

src/main/resources/application.yml 文件中配置 MySQL 数据源的信息,例如:

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_database
    username: your_username
    password: your_password

四、编写 Flink CDC 任务

接下来,我们编写一个简单的 Flink CDC 任务,该任务将实时同步 MySQL 表的数据。

1. 创建 DataSyncService

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.mysql.cdc.MySQLSource;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.streaming.api.scala.createTypeInformation;

import java.sql.PreparedStatement;
import java.sql.SQLException;

@Component
public class DataSyncService {

    public void startSync() throws Exception {
        // 创建 Flink 流处理环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置 MySQL CDC Source
        MySQLSource<String> source = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("your_username")
                .password("your_password")
                .databaseList("your_database") // 监听的数据库
                .tableList("your_database.your_table") // 监听的表
                .deserializer(new JsonDebeziumDeserializationSchema()) // 数据反序列化
                .build();

        // 使用 Source 读取数据
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // 配置 JDBC Sink
        stream.addSink(JdbcSink.sink(
                "INSERT INTO target_table (column1, column2) VALUES (?, ?)",
                (PreparedStatement ps, String record) -> {
                    // 设置 SQL 参数
                    ps.setString(1, record.column1());
                    ps.setString(2, record.column2());
                },
                JdbcSinkOptions.create()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://localhost:3306/target_database")
                        .withUsername("your_username")
                        .withPassword("your_password")
                )
        );

        // 启动 Flink 任务
        env.execute("MySQL CDC Data Sync");
    }
}

2. 启动任务

在 Spring Boot 启动类中,我们可以通过 CommandLineRunner 启动数据同步任务。

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DataSyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(DataSyncApplication.class, args);
    }

    @Bean
    public CommandLineRunner run(DataSyncService dataSyncService) {
        return args -> {
            dataSyncService.startSync();
        };
    }
}

五、总结

通过以上步骤,我们成功实现了 Spring Boot 3 集成 Flink CDC 1.17 的 MySQL 数据同步功能。Flink CDC 提供了一种简便且高效的方式来捕获数据库变更并实时同步,是微服务架构下数据一致性的有效解决方案。希望本篇文章对你有所帮助,激发你更多的实践和探索!

点赞(0) 打赏

微信小程序

微信扫一扫体验

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部