Flink CDC 实时同步 MySQL 数据

Flink CDC(Change Data Capture)是一个用于实时数据流处理的强大工具,可以用于捕捉和同步数据库中的数据变化。在实际业务场景中,实时同步 MySQL 数据是一个常见的需求,特别是在数据分析、报告生成和数据仓库建设等方面。本文将介绍如何使用 Flink CDC 实现 MySQL 数据的实时同步,并提供一些代码示例。

环境准备

在开始之前,我们需要准备一些基本的环境:

  1. Java Development Kit (JDK):确保安装了 JDK 8 或更高版本。
  2. Apache Flink:下载并解压 Apache Flink 的最新版本。
  3. MySQL 数据库:确保你的机器上有运行中的 MySQL 数据库,并创建一个测试数据库。
  4. Maven:如果需要构建项目,可以使用 Maven 来管理依赖。

以下是一个简单的演示,目标是将 MySQL 中某个表的数据实时同步到另一个数据存储(如 Kafka、ElasticSearch)中。

MySQL 数据库配置

首先,我们需要在 MySQL 中创建一个测试表并插入一些数据。可以使用以下 SQL 语句:

CREATE DATABASE flink_cdc;

USE flink_cdc;

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100),
    age INT
);

INSERT INTO users (name, age) VALUES ('Alice', 30), ('Bob', 25);

Flink CDC 项目代码

接下来,我们需要创建一个 Maven 项目,并在 pom.xml 中添加 Flink CDC 依赖:

<dependencies>
    <dependency>
        <groupId>com.ververica.cdc</groupId>
        <artifactId>debezium-connector-mysql_2.12</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
</dependencies>

Flink CDC 代码示例

下面是一个简单的 Flink 应用程序,用于实时同步 MySQL 数据:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.engine.*;

import java.util.Properties;

public class FlinkCDCMysql {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka Producer 配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 地址

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "flink-output",                  // 目标主题
                new SimpleStringSchema(),        // 序列化方案
                properties);

        // Debezium 配置
        MySqlConnectorConfig config = MySqlConnectorConfig.configure()
                .with("name", "mysql-server")                   // 连接器名称
                .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                .with("tasks.max", "1")
                .with("database.hostname", "localhost")
                .with("database.port", "3306")
                .with("database.user", "root")
                .with("database.password", "your_password")
                .with("database.server.id", "184054")
                .with("database.server.name", "mysql-server")
                .with("database.include.list", "flink_cdc") // 需要同步的数据库
                .with("table.include.list", "flink_cdc.users") // 需要同步的表
                .build();

        // 启动 Debezium Engine
        DebeziumEngine<String> engine = DebeziumEngine.create(String.class)
                .using(config.asProperties())
                .notifying(record -> {
                    // 将接收到的记录发送到 Kafka
                    env.fromElements(record).sinkTo(kafkaProducer);
                })
                .build();

        // 启动引擎
        new Thread(engine).start();

        // 执行 Flink Job
        env.execute("Flink CDC MySQL Example");
    }
}

代码说明

  • 我们首先设置了 Flink 的执行环境。
  • 然后配置 Kafka 生产者,发送同步数据到 Kafka。
  • 接着,我们用 Debezium 进行 MySQL 的配置,将其连接到我们的 MySQL 数据库。
  • 最后,我们启动 Debezium 引擎,并将捕获到的变更数据发送到 Kafka。

结论

使用 Flink CDC 实现 MySQL 数据的实时同步是一种高效的方式。通过使用 Debezium 作为变更数据捕获工具,我们可以轻松地将数据库中的数据变更实时推送到其他系统。这为数据流处理、实时分析等场景提供了极大的便利。通过上述示例,希望你能快速上手 Flink CDC,并在自己的项目中灵活应用。

点赞(0) 打赏

微信小程序

微信扫一扫体验

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部