Debezium通过监听MySQL binlog实现数据实时同步,需配置MySQL、部署Connector、设置Kafka Connect并消费变更事件;选择合适配置需根据需求设定server.id、连接信息、包含/排除表及快照模式;变更事件以JSON格式发布至Kafka,含before、after、op等字段,下游应用解析后执行对应操作;可通过Kafka Streams或Flink处理;使用Kafka Connect REST API和JMX指标监控Connector状态与性能;Schema演化通过Schema History Topic和注册表(如Confluent Schema Registry)管理;初始快照可配置模式与锁策略以减少数据库压力;性能优化包括提升资源、调整参数与数据库配置;数据一致性可通过事务性Outbox、Heartbeat、Kafka事务及数据比对保障。

Debezium通过捕获MySQL的变更数据,可以实时同步数据到其他系统,实现数据集成和微服务架构。它监听MySQL的binlog,将数据变更转化为事件流,供下游应用消费。
使用Debezium进行MySQL CDC实战主要涉及配置MySQL、部署Debezium Connector、配置Kafka Connect以及消费变更事件。
配置MySQL以启用binlog。 部署Debezium Connector到Kafka Connect集群。 配置Connector以连接到MySQL数据库并指定要捕获的数据库和表。 下游应用通过Kafka消费变更事件。
选择合适的Debezium Connector配置取决于你的具体需求。关键配置包括:
database.server.id
database.hostname
database.port
database.user
database.password
database.include.list
database.exclude.list
table.include.list
table.exclude.list
snapshot.mode
initial
never
topic.prefix
例如,如果你只想捕获
inventory
customers
{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial"
  }
}这个配置首先指定了连接器类型为
MySqlConnector
database.server.name
database.include.list
table.include.list
database.history.kafka.bootstrap.servers
database.history.kafka.topic
snapshot.mode
initial
Debezium捕获的变更事件以JSON格式发布到Kafka主题。每个事件包含
before
after
source
op
before
null
after
null
source
op
c
u
d
r
下游应用需要解析这些JSON事件,并根据
op
op
c
after
op
u
op
d
使用Kafka Streams或Apache Flink等流处理框架可以方便地处理这些事件。例如,使用Kafka Streams可以这样处理:
KStream<String, String> stream = builder.stream("inventory.customers");
stream.foreach((key, value) -> {
  try {
    JsonNode root = objectMapper.readTree(value);
    String op = root.get("payload").get("op").asText();
    if ("c".equals(op)) {
      JsonNode after = root.get("payload").get("after");
      // 将after中的数据插入到目标数据库
      System.out.println("Insert: " + after.toString());
    } else if ("u".equals(op)) {
      JsonNode after = root.get("payload").get("after");
      // 更新目标数据库中对应的数据
      System.out.println("Update: " + after.toString());
    } else if ("d".equals(op)) {
      JsonNode before = root.get("payload").get("before");
      // 从目标数据库中删除对应的数据
      System.out.println("Delete: " + before.toString());
    }
  } catch (Exception e) {
    e.printStackTrace();
  }
});这段代码从
inventory.customers
op
监控和管理Debezium Connector对于确保数据同步的稳定性和可靠性至关重要。Kafka Connect提供了REST API,可以用于监控Connector的状态、配置和任务。
可以使用以下API来获取Connector的状态:
curl -X GET http://localhost:8083/connectors/mysql-inventory-connector/status
这个API会返回Connector的状态信息,包括状态(running、failed等)、任务状态以及错误信息。
还可以使用以下API来更新Connector的配置:
curl -X PUT \
  http://localhost:8083/connectors/mysql-inventory-connector/config \
  -H 'Content-Type: application/json' \
  -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "never"
  }'这个API会更新Connector的配置,例如修改
snapshot.mode
never
除了Kafka Connect API,还可以使用Debezium提供的JMX指标来监控Connector的性能。这些指标包括捕获的事件数量、延迟、错误率等。
如果Connector出现问题,例如无法连接到MySQL或无法解析binlog事件,可以查看Kafka Connect的日志来排查问题。
Schema演化是CDC过程中常见的问题。当MySQL表的结构发生变化时,例如添加、删除或修改列,Debezium需要能够正确处理这些变化。
Debezium通过Schema History Topic来管理Schema演化。每次表的结构发生变化时,Debezium会将新的Schema信息写入到Schema History Topic。下游应用可以读取Schema History Topic,并根据新的Schema来解析变更事件。
为了处理Schema演化,可以使用Avro或Protobuf等Schema注册表。这些注册表可以存储Schema信息,并为每个Schema分配一个唯一的ID。Debezium可以将Schema ID写入到变更事件中,下游应用可以使用Schema ID从注册表中获取Schema信息。
例如,使用Confluent Schema Registry可以这样配置Debezium Connector:
{
  "name": "mysql-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql.example.com",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "inventory",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "snapshot.mode": "initial",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}这个配置指定了使用AvroConverter作为Key和Value的转换器,并配置了Schema Registry的URL。Debezium会将Avro Schema信息写入到Schema Registry,并将Schema ID写入到变更事件中。
下游应用可以使用Confluent提供的AvroDeserializer来读取变更事件,并从Schema Registry中获取Schema信息。
Debezium Connector在首次启动时会执行初始快照,将MySQL数据库中的所有数据读取到Kafka主题。初始快照可能会对MySQL数据库造成性能影响,特别是对于大型数据库。
为了减少初始快照对MySQL数据库的影响,可以采取以下措施:
snapshot.mode
schema_only
never
snapshot.locking.mode
minimal
snapshot.new.tables
如果初始快照失败,可以查看Kafka Connect的日志来排查问题。常见的错误包括无法连接到MySQL、权限不足或内存不足。
优化Debezium Connector的性能可以提高数据同步的速度和可靠性。以下是一些优化建议:
tasks.max
consumer.override.max.poll.records
binlog_format
binlog_row_image
确保Debezium Connector的数据一致性是CDC的关键目标。以下是一些建议:
总的来说,使用Debezium进行MySQL CDC需要仔细规划和配置,并根据实际情况进行优化。通过合理的配置和监控,可以实现高效、可靠的数据同步,为微服务架构和数据集成提供强大的支持。
以上就是使用Debezium进行MySQL变更数据捕获(CDC)实战的详细内容,更多请关注php中文网其它相关文章!
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
                
                                
                                
                                
                                
                                
                                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号