Debezium连接器与Kafka Connect框架一起使用,以捕获数据库中的更改并生成更改事件。然后,Kafka Connect工作程序将为连接器配置的转换应用于连接器生成的每个消息,使用工作程序的转换器将每个消息键和值序列化为二进制形式,最后将每个消息写入正确的Kafka主题。
Debezium的PostgreSQL连接器包含两个不同的部分,它们可以一起工作,以便能够读取和处理服务器更改,必须在PostgreSQL服务器中安装和配置的逻辑解码输出插件,其中之一:
- decoderbufs(由Debezium社区维护,基于ProtoBuf)
- wal2json(由wal2json社区维护,基于JSON)
- pgoutput,PostgreSQL 10+中的标准逻辑解码插件(由Postgres社区维护,由Postgres自身用于逻辑复制);该插件始终存在,这意味着不必安装任何其他库,并且Debezium连接器将直接将原始复制事件流解释为更改事件。
由于我虚拟机安装的是PostgreSQL 9.6.8版本,所以并不支持pgoutput插件,所以需要额外安装。Debezium官网有安装wal2json的教程,为了方便起见,这里安装wal2json插件。
官方文档:
Debezium connector for PostgreSQL
wal2json Installation
sudo yum install wal2json12 |
编译成功的信息显示:
Cloning into 'wal2json'... |
如果在make时,检测到没有权限,则使用root账户执行命令chmod 777 -R /dic
PostgreSQL Server Configuration
Setting up libraries, WAL and replication parameters
编辑$PGDATA目录中postgresql.conf
vi $PGDATA/postgresql.conf |
Setting up replication permissions
编辑$PGDATA目录中pg_hba.conf文件
vi $PGDATA/pg_hba.conf |
编辑完以上两文件,重启数据库服务:
pg_ctl restart |
Database Test Environment Set-up
--切换到postgres用户,进入到postgresql交互式命令行 |
Decoding Output Plug-in Test
- 使用
wal2json
,为数据库test
创建一个名叫test_slot
的slot
pg_recvlogical -d test --slot test_slot --create-slot -P wal2json |
- 开始使用
test_slot
对数据库test
进行数据streaming变化的监测
pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f - |
- 对表
test_table
做INSERT
/UPDATE
/DELETE
操作
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1'); |
在监测窗口会接收到如下信息:
Output for INSERT
event
{ |
Output for UPDATE
event
{ |
Output for DELETE
event
{ |
当测试完成,对数据库test
进行监测的test_slot
也可以被移除:
pg_recvlogical -d test --slot test_slot --drop-slot |
至此,wal2json插件算是安装成功并测试通过了。
Debezium PostgreSQL Connector相关配置
Debezium PostgreSQL Connector 安装
注意版本问题:目前的稳定版为1.2.0,我下载的是1.0.3
将下载好的debezium-connector-postgres-1.0.3.Final-plugin.tar.gz文件解压到kafka对应的connect目录下
[monkey@hadoop102 kafka]$ cd /opt/module/kafka |
Kafka Connect目前支持两种模式: standalone(单进程)和distributed。
由于我是单机,所以用standalone模式来测试。
编辑worker配置文件:
[monkey@hadoop102 kafka]$ cd /opt/module/kafka/config |
Connector 配置样例
配置样例有两种形式:本地编辑properties文件和使用POST
方式提交到Kafka Connect 服务
1、properties文件格式
name=student-connector |
2、使用POST方式,提交JSON格式文件
{ |
以上的slot.name
为从插件和数据库实例进行流式更改而创建的Postgres逻辑解码插槽的名称。
值必须符合Postgres复制插槽的命名规则,该规则指出:“每个复制插槽都有一个名称,该名称可以包含小写字母,数字和下划线字符”。不指定默认为debezium,如果需要添加多个connector,不指定的话,会报错:Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID 52197
standalone模式测试
1、开启connector
[monkey@hadoop102 kafka]$ bin/connect-standalone.sh config/connect-standalone.properties postgres-student.properties |
2、操作PostgreSQL数据库test下的student表
test=# INSERT INTO student (id, name) VALUES('5', 'sam'); |
3、启用kafka消费者,查看对应topic的消息接收情况
[monkey@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic info.public.student |
Output for INSERT
event
{ |
Output for UPDATE
event
{ |
Output for DEETE
event
{ |
主要有用的信息在:
{ |
distributed模式测试
[monkey@hadoop102 kafka]$ bin/connect-distributed.sh config/connect-distributed.properties |
得到的结果和standalone模式是一致的。