Flink CDC 实时同步 MySQL 数据
Flink CDC(Change Data Capture)是一个用于实时数据流处理的强大工具,可以用于捕捉和同步数据库中的数据变化。在实际业务场景中,实时同步 MySQL 数据是一个常见的需求,特别是在数据分析、报告生成和数据仓库建设等方面。本文将介绍如何使用 Flink CDC 实现 MySQL 数据的实时同步,并提供一些代码示例。
环境准备
在开始之前,我们需要准备一些基本的环境:
- Java Development Kit (JDK):确保安装了 JDK 8 或更高版本。
- Apache Flink:下载并解压 Apache Flink 的最新版本。
- MySQL 数据库:确保你的机器上有运行中的 MySQL 数据库,并创建一个测试数据库。
- Maven:如果需要构建项目,可以使用 Maven 来管理依赖。
以下是一个简单的演示,目标是将 MySQL 中某个表的数据实时同步到另一个数据存储(如 Kafka、ElasticSearch)中。
MySQL 数据库配置
首先,我们需要在 MySQL 中创建一个测试表并插入一些数据。可以使用以下 SQL 语句:
CREATE DATABASE flink_cdc;
USE flink_cdc;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
age INT
);
INSERT INTO users (name, age) VALUES ('Alice', 30), ('Bob', 25);
Flink CDC 项目代码
接下来,我们需要创建一个 Maven 项目,并在 pom.xml
中添加 Flink CDC 依赖:
<dependencies>
<dependency>
<groupId>com.ververica.cdc</groupId>
<artifactId>debezium-connector-mysql_2.12</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.0</version>
</dependency>
</dependencies>
Flink CDC 代码示例
下面是一个简单的 Flink 应用程序,用于实时同步 MySQL 数据:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.api.datastream.DataStream;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.engine.*;
import java.util.Properties;
public class FlinkCDCMysql {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kafka Producer 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // Kafka 地址
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"flink-output", // 目标主题
new SimpleStringSchema(), // 序列化方案
properties);
// Debezium 配置
MySqlConnectorConfig config = MySqlConnectorConfig.configure()
.with("name", "mysql-server") // 连接器名称
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("tasks.max", "1")
.with("database.hostname", "localhost")
.with("database.port", "3306")
.with("database.user", "root")
.with("database.password", "your_password")
.with("database.server.id", "184054")
.with("database.server.name", "mysql-server")
.with("database.include.list", "flink_cdc") // 需要同步的数据库
.with("table.include.list", "flink_cdc.users") // 需要同步的表
.build();
// 启动 Debezium Engine
DebeziumEngine<String> engine = DebeziumEngine.create(String.class)
.using(config.asProperties())
.notifying(record -> {
// 将接收到的记录发送到 Kafka
env.fromElements(record).sinkTo(kafkaProducer);
})
.build();
// 启动引擎
new Thread(engine).start();
// 执行 Flink Job
env.execute("Flink CDC MySQL Example");
}
}
代码说明
- 我们首先设置了 Flink 的执行环境。
- 然后配置 Kafka 生产者,发送同步数据到 Kafka。
- 接着,我们用 Debezium 进行 MySQL 的配置,将其连接到我们的 MySQL 数据库。
- 最后,我们启动 Debezium 引擎,并将捕获到的变更数据发送到 Kafka。
结论
使用 Flink CDC 实现 MySQL 数据的实时同步是一种高效的方式。通过使用 Debezium 作为变更数据捕获工具,我们可以轻松地将数据库中的数据变更实时推送到其他系统。这为数据流处理、实时分析等场景提供了极大的便利。通过上述示例,希望你能快速上手 Flink CDC,并在自己的项目中灵活应用。