Debezium 是一组分布式服务,用于捕获数据库中的更改(通过读取数据库日志的方式来完成数据增删改的记录),以便您的应用程序可以看到这些更改并做出响应。Debezium 将每个数据库表中的所有行级更改记录在更改事件流中,应用程序只需读取这些流,即可按更改事件发生的顺序查看更改事件。
目前debezium有三种部署方式:
Kafka Connect 模式
Debezium 作为一个 Kafka Connect 的 Source Connector 运行,将数据库的变更事件发送到 Kafka 中。
适用场景:适用于需要将数据库变更事件持久化到 Kafka,并且需要 Kafka Connect 提供的分布式、可扩展和容错能力的场景。
主要特点:可以利用 Kafka 的可靠性和容错性,支持高吞吐量和低延迟的数据传输。
Debezium Server 模式
Debezium Server 是一个独立的应用程序,它可以将数据库的变更事件流式传输到各种消息传递基础设施,如 Amazon Kinesis、Google Cloud Pub/Sub 或 Apache Pulsar。
适用场景:适用于需要将数据库变更事件发送到非 Kafka 的消息队列或流处理系统的场景。
主要特点:提供了更多的灵活性,可以支持多种不同的消息传递基础设施。
Embedded Engine 模式
在这种模式下,Debezium 不通过 Kafka Connect 运行,而是作为一个库嵌入到自定义的 Java 应用程序中。
适用场景:适用于需要在应用程序内部直接消费数据库变更事件,而不希望通过 Kafka 进行中转的场景。
主要特点:减少了对外部系统的依赖,适合于轻量级的应用程序或微服务架构。
Debezium特点:
简单易上手
快速稳定,可以扩展,可以通过kafka构建
能够监控多种数据库 mysql pgsql等等
下面介绍基于kafka connector部署 Debeziunm:
Strimzi简化了Kafka在Kubernetes上的部署和管理:
安装Strimzi Operator
1 | # 添加Strimzi Helm仓库 |
这个是添加的最新的仓库,我需要安装的是历史版本(0.33.0)的,所以把strimzi-kafka-operator-helm-3-chart-0.33.0.tgz下载到本地安装:
1 | helm install strimzi-kafka ./strimzi-kafka-operator-helm-3-chart-0.33.0.tgz -n qifu-develop |
安装完成后会有这个pod:

ps:卸载命令:
1 | helm uninstall strimzi-kafka -n qifu-develop |
部署kafka集群:
debezium-cluster.yaml文件:
1 | apiVersion: kafka.strimzi.io/v1beta2 |
部署:
1 | kubectl apply -f debezium-cluster.yaml |
部署完成后会有3个pod:

构建包含Debezium插件的kafka Connect镜像
创建Dockerfile,将Debezium插件添加到Kafka Connect:
1 | FROM quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 |
构建并推送镜像到容器仓库:
1 | docker build -t harbor.qifu.com/qifu-develop/debezium-connect:2.3.4.Final . |
部署Kafka Connect
创建KafkaConnect资源(kafka-connect.yaml
):
1 | apiVersion: kafka.strimzi.io/v1beta2 |
部署:
1 | kubectl apply -f kafka-connect.yaml |
部署完成后会有以下pod

创建Debezium连接器
创建连接器配置(mysql-connector-test.yaml
):
1 | apiVersion: kafka.strimzi.io/v1beta2 |
部署:
1 | kubectl apply -f mysql-connector-test.yaml |
验证:
可以到debezium-connect-cluster-connect容器里执行以下命令查看连接器:
1 | curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/mysql-connector-test |

或者使用以下命令查看:
1 | kubectl describe KafkaConnector mysql-connector-test -n qifu-develop |

查看Kafka主题中的变更事件:
1 | kubectl -n qifu-develop run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.33.0-kafka-3.2.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables |
登录kafka容器查看主题列表:
1 | bin/kafka-topics.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list |
启动一个消费者,消费主题:
1 | bin/kafka-console-consumer.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --topic test.qifu_saas_oms.tables |
查看消费群组详情:
1 | bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --list |

查看消费群组消息积压:
1 | bin/kafka-consumer-groups.sh --bootstrap-server debezium-cluster-kafka-bootstrap:9092 --describe --group consumer-oms |

修改连接器配置
之后需要修改mysql连接器配置的话,可以到debezium-connect-cluster-connect容器里执行以下命令修改:
1 | curl -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/mysql-connector-test/config \ |
删除连接器:
1 | curl -i -X DELETE localhost:8083/connectors/mysql-connector-test/ |
或者修改mysql-connector-test.yaml文件之后再部署:
1 | kubectl apply -f mysql-connector-test.yaml |
删除连接器:
1 | kubectl delete -f mysql-connector-test.yaml |
遇到的问题
问题1:
遇到开发环境部署mysql连接器之后同步完数据后报错’performance_schema.session_status’ doesn’t exist,然后重复一直同步的问题。到数据库里查看performance_schema库,确实没有session_status表,其他环境的数据库都有。
解决办法:
在开发环境数据库执行以下命令:
1 | mysql_upgrade -u root -p --force |

然后就会重新生成performance_schema.session_status,需要重启数据库生效。
问题2:
mysql连接器配置里加了include.query: true,并且mysql开启binlog_rows_query_log_events之后,会记录具体的sql语句,其他字段的中文正常显示,但是query字段的中文会乱码:

问了Debezium社区的人,他们说3.0的版本没有这个问题,于是用docker临时启动了服务来测试:
zookeeper:
1 | docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 --security-opt seccomp=unconfined quay.io/debezium/zookeeper:3.0 |
kafka:
1 | docker run -it --rm --name kafka -p 9092:9092 --security-opt seccomp=unconfined --link zookeeper:zookeeper quay.io/debezium/kafka:3.0 |
mysql:
1 | docker run -it --rm --name mysql -p 3306:3306 -v /root/my.cnf:/etc/my.cnf --security-opt seccomp=unconfined -e LANG=en_US.UTF-8 -e LC_ALL=en_US.UTF-8 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:3.0 |
connect:
1 | docker run -it --rm --name connect -p 8083:8083 --security-opt seccomp=unconfined -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.0 |
注册mysql连接器:
1 | curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "dev-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "10.168.2.187", "database.port": "30336", "database.user": "root", "database.password": "******", "database.server.id": "184059", "topic.prefix": "dev", "database.include.list": "dev_qifu_saas_aggregation", "table.include.list": "dev_qifu_saas_aggregation.base_warehouse_area", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory-dev", "include.schema.changes": "true", "include.query":"true", "provide.transaction.metadata": "true" } }' |
查看消费内容:

发现确实可以正常显示query字段的中文,研发说这个不影响,就没在k8s上面升级版本,在此记录下docker部署的3.0版本各组件的版本情况,以便以后k8s部署的Debezium服务如果需要升级,可直接安装对应版本:
- zookeeper:3.8.4
- kafka:3.9.0
- Debezium:3.0.8.final