4Debezium抽取部署-创新互联

本文目标

debezium,简称dbz,伪装为MySQL从库,当主库发生变化后,主库会主动将变化的信息同步到dbz内,dbz将收到的信息转为JSON推送到Kafka内。

公司主营业务:成都做网站、成都网站建设、成都外贸网站建设、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。成都创新互联是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。成都创新互联推出柳城免费做网站回馈大家。安装JDK11
yum -y install java-11-openjdk-devel
解压部署
tar xfz debezium-server-dist-2.0.0.Final.tar.gz
修改配置文件

application.properties

[yinyx@localhost conf]$ cat application.properties
quarkus.http.port=8999
rkus.log.level=INFO
quarkus.log.console.json=false

debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=6306
debezium.source.database.user=test
debezium.source.database.password=test
debezium.source.database.server.id=2
debezium.source.database.include.list=test

debezium.source.topic.prefix=yyx
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092
debezium.source.schema.history.internal.kafka.topic=schemahistory

debezium.source.decimal.handling.mode=string
debezium.source.lob.enabled=true
debezium.source.database.history.skip.unparseable.ddl=true
debezium.source.tombstones.on.delete=false

debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false

[yinyx@localhost conf]$ 
启动
./run.sh

注意先启动kafka

测试 检查topic是否已经创建
[yinyx@localhost bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
__consumer_offsets
schemahistory
yinyx
yyx
yyx.test.t1
启动kafka的消费

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.test.t1

到MySQL更新t1表的数据

insert update delete 随便整

查看kafka消费,应出现类似如下信息

{“before”:{“f1”:3,“f2”:“cc|33”,“f3”:1670056305000},“after”:{“f1”:3,“f2”:“cc|333”,“f3”:1670056305000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670030109000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:25”,“file”:“mysql-bin.000002”,“pos”:7907,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525419,“transaction”:null}

{“before”:{“f1”:2,“f2”:“bb|222”,“f3”:1670002422000},“after”:{“f1”:2,“f2”:“bb|222haha”,“f3”:1670002422000},“source”:{“version”:“2.0.0.Final”,“connector”:“mysql”,“name”:“yyx”,“ts_ms”:1670031487000,“snapshot”:“false”,“db”:“test”,“sequence”:null,“table”:“t1”,“server_id”:1,“gtid”:“7bdc8394-71cf-11ed-b2d5-000c293c9462:27”,“file”:“mysql-bin.000002”,“pos”:8561,“row”:0,“thread”:39,“query”:null},“op”:“u”,“ts_ms”:1670031525422,“transaction”:null}

总结

至此,MySQL的变化,会实时反应到Kafka的JSON数据里面,后续自己开发程序从Kafka接收处理即可。

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站标题:4Debezium抽取部署-创新互联
网页地址:http://csdahua.cn/article/diicid.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流