Syncing PostgreSQL Data to Kafka with Debezium

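A Debezium connector works within the Kafka Connect framework to capture changes in a database and generate change events. The Kafka Connect worker then applies the transformations configured for the connector to each message the connector produces, serializes each message key and value into binary form using the worker's converters, and finally writes each message to the correct Kafka topic.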

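Debezium's PostgreSQL connector consists of two parts that work together to read and process server changes. The first is a logical decoding output plug-in, which must be installed and configured in the PostgreSQL server and is one of the following:

  • decoderbufs (maintained by the Debezium community, based on ProtoBuf)
  • wal2json (maintained by the wal2json community, based on JSON)
  • pgoutput, the standard logical decoding plug-in in PostgreSQL 10+ (maintained by the Postgres community and used by Postgres itself for logical replication); this plug-in is always present, so no additional library has to be installed, and the Debezium connector interprets the raw replication event stream directly into change events.

The second part is the Java code of the connector itself, which runs in Kafka Connect and reads the changes produced by the chosen plug-in.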

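The PostgreSQL on my virtual machine is version 9.6.8, which does not support pgoutput, so one of the other plug-ins has to be installed. The Debezium website has a tutorial for installing wal2json, so for convenience I install wal2json here.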

Official documentation:

Debezium connector for PostgreSQL

Kafka Connect

wal2json Installation

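# Option 1: install from a package (wal2json12 targets PostgreSQL 12; choose the package that matches your server version)
sudo yum install wal2json12
# Option 2: build from source (the Makefile needs pg_config on the PATH)
cd /opt/module
git clone https://github.com/eulerto/wal2json -b master --single-branch
cd wal2json
git checkout d2b7fef021c46e0d429f2c1768de361069e58696
make
make install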

A successful build prints output like this:

Cloning into 'wal2json'...
remote: Counting objects: 445, done.
remote: Total 445 (delta 0), reused 0 (delta 0), pack-reused 445
Receiving objects: 100% (445/445), 180.70 KiB | 0 bytes/s, done.
Resolving deltas: 100% (317/317), done.
Note: checking out 'd2b7fef021c46e0d429f2c1768de361069e58696'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

git checkout -b new_branch_name

HEAD is now at d2b7fef... Improve style
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -I. -I./ -I/usr/pgsql-9.6/include/server -I/usr/pgsql-9.6/include/internal -D_GNU_SOURCE -I/usr/include/libxml2 -I/usr/include -c -o wal2json.o wal2json.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -g -pipe -Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -m64 -mtune=generic -fPIC -L/usr/pgsql-9.6/lib -Wl,--as-needed -L/usr/lib64 -Wl,--as-needed -Wl,-rpath,'/usr/pgsql-9.6/lib',--enable-new-dtags -shared -o wal2json.so wal2json.o
/usr/bin/mkdir -p '/usr/pgsql-9.6/lib'
/usr/bin/install -c -m 755 wal2json.so '/usr/pgsql-9.6/lib/'

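If make reports a permission error, run chmod 777 -R /dic as the root user. Mode 777 grants every user full access, so this only suits a throwaway test machine.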

PostgreSQL Server Configuration

Setting up libraries, WAL and replication parameters

Edit postgresql.conf in the $PGDATA directory:

vi $PGDATA/postgresql.conf
# Edit the following settings:
listen_addresses = '*'
port = 5432
wal_level = logical
max_wal_senders = 8
wal_keep_segments = 4
max_replication_slots = 4
shared_preload_libraries = 'wal2json'
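
The settings above can be sanity-checked before the server is restarted. The following is a minimal sketch, assuming the file uses plain `key = value` lines with no `#` inside quoted values; `parse_conf` and `check_logical_decoding` are hypothetical helper names, not part of PostgreSQL or Debezium.

```python
def parse_conf(text):
    """Parse simple `key = value` lines; ignore comments and blank lines."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop trailing comments
        if not line or "=" not in line:
            continue
        key, value = line.split("=", 1)
        settings[key.strip()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings):
    """Return a list of problems that would prevent logical decoding."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be 'logical'")
    for key in ("max_wal_senders", "max_replication_slots"):
        if int(settings.get(key, "0")) < 1:
            problems.append(key + " must be at least 1")
    return problems


sample = """
listen_addresses = '*'
wal_level = logical
max_wal_senders = 8
max_replication_slots = 4
"""
print(check_logical_decoding(parse_conf(sample)))
```

An empty list means the three settings this sketch looks at are in place; it does not verify that the wal2json library itself is installed.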

Setting up replication permissions

Edit the pg_hba.conf file in the $PGDATA directory:

vi $PGDATA/pg_hba.conf
# Edit the following entries:

# IPv4 local connections:
host all all 0.0.0.0/0 md5
# replication privilege.
local replication postgres trust
host replication postgres 127.0.0.1/32 trust
host replication postgres ::1/128 trust
host replication postgres 192.168.142.102/32 trust

After editing both files, restart the database service:

pg_ctl restart

Database Test Environment Set-up

-- Switch to the postgres user and open the psql interactive shell
[postgres@hadoop102 monkey]$ psql

-- Create the test database and the test table
postgres=# CREATE DATABASE test;
postgres=# \c test;
postgres=# CREATE TABLE test_table (
id char(10) NOT NULL,
code char(10),
PRIMARY KEY (id)
);

Decoding Output Plug-in Test

  • Using wal2json, create a slot named test_slot for the database test
pg_recvlogical -d test --slot test_slot --create-slot -P wal2json
  • Start streaming changes from the database test through test_slot
pg_recvlogical -d test --slot test_slot --start -o pretty-print=1 -f -
  • Run INSERT/UPDATE/DELETE statements against the table test_table
test=# INSERT INTO test_table (id, code) VALUES('id1', 'code1');
INSERT 0 1
test=# update test_table set code='code2' where id='id1';
UPDATE 1
test=# delete from test_table where id='id1';
DELETE 1

The monitoring window receives the following messages:

Output for INSERT event

{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code1 "]
}
]
}

Output for UPDATE event

{
"change": [
{
"kind": "update",
"schema": "public",
"table": "test_table",
"columnnames": ["id", "code"],
"columntypes": ["character(10)", "character(10)"],
"columnvalues": ["id1 ", "code2 "],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["id1 "]
}
}
]
}

Output for DELETE event

{
"change": [
{
"kind": "delete",
"schema": "public",
"table": "test_table",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["character(10)"],
"keyvalues": ["id1 "]
}
}
]
}

Once testing is complete, the test_slot that was watching the database test can be dropped:

pg_recvlogical -d test --slot test_slot --drop-slot

At this point the wal2json plug-in is installed and has passed its test.
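
The wal2json messages shown above are easy to consume programmatically. Here is a minimal sketch that flattens one message into per-row dictionaries; `rows_from_wal2json` is a hypothetical helper, and stripping the space padding of the char(10) values is a choice made for this example only.

```python
import json


def rows_from_wal2json(message):
    """Flatten one wal2json message into (kind, table, row) tuples."""
    result = []
    for change in json.loads(message)["change"]:
        if "columnnames" in change:
            # insert and update carry the new row image
            names, values = change["columnnames"], change["columnvalues"]
        else:
            # delete carries only the old key
            names = change["oldkeys"]["keynames"]
            values = change["oldkeys"]["keyvalues"]
        # char(10) columns are space-padded, so strip string values
        row = {n: v.strip() if isinstance(v, str) else v
               for n, v in zip(names, values)}
        result.append((change["kind"], change["table"], row))
    return result


insert_msg = '''{"change": [{"kind": "insert", "schema": "public",
  "table": "test_table", "columnnames": ["id", "code"],
  "columntypes": ["character(10)", "character(10)"],
  "columnvalues": ["id1 ", "code1 "]}]}'''
print(rows_from_wal2json(insert_msg))
# prints [('insert', 'test_table', {'id': 'id1', 'code': 'code1'})]
```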

Debezium PostgreSQL Connector Configuration

Debezium PostgreSQL Connector Installation

connector’s plug-in archive

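Mind the version: at the time of writing the stable release is 1.2.0, and I downloaded 1.0.3.

Extract the downloaded debezium-connector-postgres-1.0.3.Final-plugin.tar.gz into the connect directory under the Kafka installation: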

[monkey@hadoop102 kafka]$ cd /opt/module/kafka
[monkey@hadoop102 kafka]$ mkdir connect
[monkey@hadoop102 kafka]$ cd /opt/software
[monkey@hadoop102 software]$ tar -zxvf debezium-connector-postgres-1.0.3.Final-plugin.tar.gz -C /opt/module/kafka/connect

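Kafka Connect currently supports two modes: standalone (a single process) and distributed.

Since I only have a single machine, I use standalone mode for testing.

Edit the worker configuration file: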

[monkey@hadoop102 kafka]$ cd /opt/module/kafka/config
[monkey@hadoop102 kafka]$ vi connect-standalone.properties
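# Edit plugin.path on the last line. Point it at the directory that holds the Debezium connector jar files; it does not have to be the lowest-level directory. Multiple paths are separated by commas; the trailing comma mirrors the sample in the stock file and is optional.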
plugin.path=/opt/module/kafka/connect,


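Connector Configuration Examples

A connector can be configured in two ways: by editing a local properties file, or by submitting the configuration to the Kafka Connect service with a POST request.

1. Properties file format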

name=student-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
tasks.max=1
database.hostname=192.168.142.102
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=test
database.server.name=info
table.whitelist=public.student
plugin.name=wal2json
slot.name=my_slot

2. Submitting a JSON document with a POST request

{
"name": "student-connector",
"config": {
"name": "student-connector",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "192.168.142.102",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "test",
"database.server.name": "info",
"table.whitelist": "public.student",
"plugin.name": "wal2json",
"slot.name":"my_slot"
}
}
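
The two forms carry the same settings, so one can be derived from the other. The sketch below converts properties text into the JSON body and builds the POST request without sending it. It assumes the Kafka Connect REST interface listens on `http://localhost:8083` (8083 is the usual default port; adjust for your worker); `properties_to_payload` and `build_request` are hypothetical helper names.

```python
import json
import urllib.request


def properties_to_payload(text):
    """Convert `key=value` lines into the {"name", "config"} JSON shape."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, value = line.split("=", 1)
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


def build_request(payload, base_url="http://localhost:8083"):
    """Build (but do not send) the POST request for Kafka Connect."""
    return urllib.request.Request(
        base_url + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


props = """
name=student-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.dbname=test
slot.name=my_slot
"""
request = build_request(properties_to_payload(props))
print(request.get_method(), request.full_url)
# To submit for real: urllib.request.urlopen(request)
```

Keeping the request construction separate from the network call makes it possible to inspect the body before anything is sent to the worker.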

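The slot.name above is the name of the Postgres logical decoding slot created for streaming changes from the plug-in and database instance.
The value must conform to the Postgres replication slot naming rules, which state: "Each replication slot has a name, which can contain lower-case letters, numbers, and the underscore character." If it is not specified, it defaults to debezium. When several connectors are added without setting it, they all try to use the same slot and the following error is raised: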
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "debezium" is active for PID 52197
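
Both the naming rule and the shared-slot pitfall can be caught before a connector is started. This is a small sketch under the character rule quoted above only (it does not check length); the helper names and the `teacher-connector` entry are hypothetical.

```python
import re

# Lower-case letters, digits and underscores only, per the quoted rule.
SLOT_NAME = re.compile(r"[a-z0-9_]+")


def is_valid_slot_name(name):
    """True if `name` uses only the characters allowed in a slot name."""
    return SLOT_NAME.fullmatch(name) is not None


def find_duplicate_slots(connectors):
    """Given {connector_name: slot_name}, return slots used more than once."""
    seen, duplicates = set(), set()
    for slot in connectors.values():
        (duplicates if slot in seen else seen).add(slot)
    return duplicates


print(is_valid_slot_name("my_slot"), is_valid_slot_name("My-Slot"))
# prints True False
print(find_duplicate_slots({
    "student-connector": "debezium",
    "teacher-connector": "debezium",  # hypothetical second connector
}))
# prints {'debezium'}
```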

Testing in standalone mode

1. Start the connector

[monkey@hadoop102 kafka]$ bin/connect-standalone.sh config/connect-standalone.properties postgres-student.properties

2. Run statements against the student table in the PostgreSQL database test

test=# INSERT INTO student (id, name) VALUES('5', 'sam');
INSERT 0 1
test=# update student set name = 'ethan' where id = '5';
UPDATE 1
test=# delete from student where id='5';
DELETE 1

3. Start a Kafka consumer and check the messages arriving on the corresponding topic

[monkey@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic info.public.student
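
The topic name used above, info.public.student, is made up of the connector's database.server.name followed by the schema and table names. A tiny sketch of that composition, with `topic_for` as a hypothetical helper:

```python
def topic_for(server_name, qualified_table):
    """Build the topic name from database.server.name and schema.table."""
    schema, table = qualified_table.split(".", 1)
    return ".".join([server_name, schema, table])


print(topic_for("info", "public.student"))
# prints info.public.student
```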

Output for INSERT event

{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"info.public.student.Envelope"
},
"payload":{
"before":null,
"after":{
"id":"5 ",
"name":"sam "
},
"source":{
"version":"1.0.3.Final",
"connector":"postgresql",
"name":"info",
"ts_ms":1583920562395,
"snapshot":"false",
"db":"test",
"schema":"public",
"table":"student",
"txId":1760,
"lsn":23397480,
"xmin":null
},
"op":"c",
"ts_ms":1583920562442
}
}

Output for UPDATE event

{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"info.public.student.Envelope"
},
"payload":{
"before":{
"id":"5 ",
"name":null
},
"after":{
"id":"5 ",
"name":"ethan "
},
"source":{
"version":"1.0.3.Final",
"connector":"postgresql",
"name":"info",
"ts_ms":1583920898322,
"snapshot":"false",
"db":"test",
"schema":"public",
"table":"student",
"txId":1761,
"lsn":23398864,
"xmin":null
},
"op":"u",
"ts_ms":1583920898326
}
}

Output for DELETE event

{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":true,
"field":"name"
}
],
"optional":true,
"name":"info.public.student.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
}
],
"optional":false,
"name":"info.public.student.Envelope"
},
"payload":{
"before":{
"id":"5 ",
"name":null
},
"after":null,
"source":{
"version":"1.0.3.Final",
"connector":"postgresql",
"name":"info",
"ts_ms":1583921079909,
"snapshot":"false",
"db":"test",
"schema":"public",
"table":"student",
"txId":1762,
"lsn":23399936,
"xmin":null
},
"op":"d",
"ts_ms":1583921079912
}
}

The most useful information is in the payload:

{
"payload":{
"before":{
"id":"5 ",
"name":null
},
"after":{
"id":"5 ",
"name":"ethan "
},
"source":{
"version":"1.0.3.Final",
"connector":"postgresql",
"name":"info",
"ts_ms":1583920898322,
"snapshot":"false",
"db":"test",
"schema":"public",
"table":"student",
"txId":1761,
"lsn":23398864,
"xmin":null
},
"op":"u",
"ts_ms":1583920898326
}
}
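
A consumer usually needs only a few fields from the payload. The following minimal sketch reduces one event to its operation, table and row images; `summarize` is a hypothetical helper, and the sample event is a trimmed copy of the UPDATE payload above.

```python
import json

# Debezium operation codes: create, update, delete, snapshot read.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(event_json):
    """Reduce one Debezium event to operation, table and row images."""
    payload = json.loads(event_json)["payload"]
    source = payload["source"]
    return {
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "table": source["schema"] + "." + source["table"],
        "before": payload["before"],
        "after": payload["after"],
    }


event = json.dumps({
    "payload": {
        "before": {"id": "5 ", "name": None},
        "after": {"id": "5 ", "name": "ethan "},
        "source": {"schema": "public", "table": "student"},
        "op": "u",
        "ts_ms": 1583920898326,
    }
})
print(summarize(event))
```

In the events above, before carries only the primary key and name is null. This is the expected behavior when the table keeps PostgreSQL's default REPLICA IDENTITY, which logs only key columns for the old row.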
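
Testing in distributed mode

[monkey@hadoop102 kafka]$ bin/connect-distributed.sh config/connect-distributed.properties

In distributed mode the worker starts without any connector, so the connector configuration is submitted with the POST request in the JSON format described earlier. The results are the same as in standalone mode.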
