PostgreSQL 变化数据捕捉(CDC)
基于CDC(变更数据捕捉)的增量数据集成总体步骤:
1.捕获源数据库中的更改数据
2.将变更的数据转换为您的消费者可以接受的格式
3.将数据发布到消费者或目标数据库
PostgreSQL支持触发器(trigger)和预写日志(WAL)两种CDC机制。
如果您希望 Postgres 数据更改发生时逐行流式传输,则需要逻辑解码或 Postgres 逻辑复制功能。
使用 Postgres 逻辑解码
逻辑解码是 PostgreSQL 的基于日志的 CDC(逻辑复制)的正式名称。逻辑解码使用 PostgreSQL 预写日志的内容来存储数据库中发生的所有活动。
step1:修改PostgreSQL数据库配置postgresql.conf文件
wal_level = logical
max_replication_slots = 10
max_wal_senders = 20
wal_level :设置为 logical,允许 WAL 日志记录逻辑解码所需的信息。
max_replication_slots :确保 max_replication_slots >= 使用 WAL 的 PostgreSQL 连接器的数量加上您的数据库使用的其他复制槽的数量。
max_wal_senders :指定 WAL 的最大并发连接数的参数,确保 max_wal_senders 至少是逻辑复制槽数的两倍。例如,如果您的数据库总共使用 10 个复制槽,则该 max_wal_senders 值必须为 20 或更大。
配置修改以后,需要重复 PostgreSQL 服务生效配置。
step2:为需要同步的数据库(db)创建逻辑复制插槽(replication slot)
关于复制 SQL 函数更多详情,可以参考官方文档(9.26.系统管理函数)介绍: http://www.postgres.cn/docs/12/functions-admin.html
函数:pg_create_logical_replication_slot(slot_name name, plugin name)
返回类型:(slot_name name, lsn pg_lsn)
说明:使用输出插件plugin创建一个名为 slot_name的新逻辑(解码)复制槽。
创建时需要指定逻辑复制插槽名称和输出插件:
SELECT pg_create_logical_replication_slot('replication_slot01', 'test_decoding'); -- 使用 test_decoding 输出插件
SELECT pg_create_logical_replication_slot('replication_slot01', 'pgoutput'); -- 使用 pgoutput 输出插件
注意权限:
如果用户没有权限可能会报错:ERROR: must be superuser or replication role to use replication slots
授权:
\c - postgres
ALTER ROLE test REPLICATION; --流复制权限
\du
验证插槽是否创建成功:
SELECT * FROM pg_replication_slots;
testdb01=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
--------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
replication_slot01 | test_decoding | logical | 16437 | testdb01 | f | f | | | 585 | 0/1807EA8 | 0/1807EE0
replication_slot02 | pgoutput | logical | 16437 | testdb01 | f | f | | | 585 | 0/1809CD8 | 0/1809D10
(2 rows)
testdb01=>
注意:
1)逻辑复制插槽名称:每个逻辑复制插槽都有一个名称,创建时需要指定名称,可以包含小写字母、数字和下划线
2)输出插件:创建逻辑复制插槽时需要指定输出插件,输出插件有:test_decoding、pgoutput、wal2json等
test_decoding 输出插件 :PostgreSQL 9.4+原生附带了 test_decoding 输出插件($PG_HOME/lib 目录下对应有 test_decoding.so),如果您的消费者支持 test_decoding ,则可以使用 test_decoding 输出插件。
pgoutput 输出插件 :PostgreSQL 10+原生附带了 pgoutput 输出插件($PG_HOME/lib 目录下对应有 pgoutput.so),如果您的消费者支持 pgoutput ,则可以使用 pgoutput 输出插件。pgoutput插件输出的是二进制的数据
wal2json 输出插件 :wal2json 是另一个流行的逻辑解码输出插件,PostgreSQL原生不携带、需要数据库服务单独安装插件才能使用。wal2json输出的是json格式的数据
step3:为数据库中的所有表或指定表创建一个发布。如果以指定表方式创建发布则后面可以在发布中管理(添加、删除)表
帮助说明:\h create publication
DROP PUBLICATION IF EXISTS pub01;
CREATE PUBLICATION pub01 FOR TABLE test01;
或者
DROP PUBLICATION IF EXISTS pub01;
CREATE PUBLICATION pub01 FOR TABLE test01, test02, test03;
或者
DROP PUBLICATION IF EXISTS pub01;
CREATE PUBLICATION pub01 FOR ALL TABLES;
查看创建发布的结果:
testdb01=> select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-------+---------+----------+--------------+-----------+-----------+-----------+-------------
24629 | pub01 | 16436 | f | t | t | t | t
(1 row)
也可以执行查看:
testdb01=> \dRp
List of publications
Name | Owner | All tables | Inserts | Updates | Deletes | Truncates
-------+-------+------------+---------+---------+---------+-----------
pub01 | test | f | t | t | t | t
(1 row)
创建发布时,还可以选择在发布中包含哪些操作。例如,下面仅创建table01表 & 仅包含INSERT和UPDATE的发布:
CREATE PUBLICATION pub01 FOR TABLE table01 WITH (publish = 'INSERT, UPDATE');
注意:
非超级用户创建发布只能指定表创建、不能 FOR ALL TABLES 创建,否则报错:ERROR: must be superuser to create FOR ALL TABLES publication
FOR ALL TABLES 只允许超级用户创建发布,或者创建后由超级用户修改已创建的发布的配置开启foralltables。
testdb01=> \c - postgres
testdb01=# update pg_publication set puballtables=true where pubname is not null; -- 设置puballtables开关
testdb01=# select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-------+---------+----------+--------------+-----------+-----------+-----------+-------------
24629 | pub01 | 16436 | t | t | t | t | t
(1 row)
step4:验证指定的表是否在发布中
testdb01=> SELECT * FROM pg_publication_tables WHERE pubname='pub01';
注意:创建发布不会开始复制。它只为未来的订阅者定义一个分组和过滤逻辑。
step5:查看逻辑复制效果:
获取结果的相关命令:
pg_logical_slot_get_changes : 查询并删除数据。仅在第一次返回结果,多次调用可能会返回空结果集,这意味着当get命令执行时,结果会被提供和删除,这增强了我们编写使用这些事件创建表副本的逻辑的能力。
函数:pg_logical_slot_get_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[])
返回类型:(lsn pg_lsn, xid xid, data text)
说明:返回槽slot_name中的改变,从上一次已经被消费的点开始返回。 如果upto_lsn和upto_nchanges为 NULL,逻辑解码将一 直继续到 WAL 的末尾。如果upto_lsn为非 NULL,解码将只包括那些在指 定 LSN 之前提交的事务。如果upto_nchanges为非 NULL, 解码将在其产生的行数超过指定值后停止。不过要注意, 被返回的实际行数可能更大,因为对这个限制的检查只会在增加了解码每个新的提交事务产生 的行之后进行。
pg_logical_slot_get_binary_changes:变化数据以bytea返回。
pg_logical_slot_peek_changes : 只查询不删数据。多次调用每次都会返回相同的结果。是另一个 PostgreSQL 命令,用于在不使用 WAL 条目的情况下查看更改。
函数:pg_logical_slot_peek_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[])
返回类型:(lsn text, xid xid, data text)
说明:行为就像pg_logical_slot_get_changes()函数, 不过改变不会被消费, 即在未来的调用中还会返回这些改变。
pg_logical_slot_peek_binary_changes:变化数据以bytea返回。
关于复制 SQL 函数更多详情,可以参考官方文档(9.26.系统管理函数)介绍: http://www.postgres.cn/docs/12/functions-admin.html
测试表中I/D/U变更数据操作,然后查看逻辑复制效果:
testdb01=> SELECT * FROM pg_logical_slot_peek_changes('replication_slot01', NULL, NULL); -- 只查询不删数据
......
testdb01=> SELECT * FROM pg_logical_slot_get_changes('replication_slot01', NULL, NULL); -- 查询并删除数据
lsn | xid | data
-----------+-----+------------------------------------------------------------------------------------------
0/180A050 | 586 | BEGIN 586
0/180A050 | 586 | table public.test01: INSERT: id[bigint]:1 info[character varying]:'aa' cnt[integer]:123
0/180A160 | 586 | table public.test01: INSERT: id[bigint]:2 info[character varying]:'bb' cnt[integer]:456
0/180A1E8 | 586 | table public.test01: INSERT: id[bigint]:3 info[character varying]:'cc' cnt[integer]:55
0/180A2A0 | 586 | COMMIT 586
0/180A2A0 | 587 | BEGIN 587
0/180A2A0 | 587 | table public.test01: INSERT: id[bigint]:4 info[character varying]:'uuu' cnt[integer]:66
0/180A358 | 587 | COMMIT 587
0/180A358 | 588 | BEGIN 588
0/180A358 | 588 | table public.test01: DELETE: id[bigint]:2
0/180A3D0 | 588 | COMMIT 588
0/180A3D0 | 589 | BEGIN 589
0/180A3D0 | 589 | table public.test01: UPDATE: id[bigint]:3 info[character varying]:'xxxx' cnt[integer]:55
0/180A458 | 589 | COMMIT 589
(14 rows)
testdb01=> SELECT * FROM pg_logical_slot_get_changes('replication_slot01', NULL, NULL); -- 查询并删除数据
lsn | xid | data
-----+-----+------
(0 rows)
testdb01=>
pgoutput输出插件的结果查看,则执行:
testdb01=> SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot02', null, null, 'proto_version', '1', 'publication_names', 'pub01'); -- 查询但不删除数据
......
testdb01=> SELECT * FROM pg_logical_slot_get_binary_changes('replication_slot02', null, null, 'proto_version', '1', 'publication_names', 'pub01'); -- 查询并删除数据
lsn | xid | data
-----------+-----+------------------------------------------------------------------------------------------------------------------------------
0/1809DF8 | 585 | \x420000000001809fd0000296c5f556e57d00000249
0/1809DF8 | 585 | \x52000060417075626c696300746573743031006400030169640000000014ffffffff00696e666f00000004130000006800636e740000000017ffffffff
0/1809DF8 | 585 | \x44000060414b00037400000001316e6e
0/1809F40 | 585 | \x44000060414b00037400000001346e6e
0/1809F88 | 585 | \x44000060414b00037400000001336e6e
0/180A000 | 585 | \x43000000000001809fd0000000000180a000000296c5f556e57d
0/180A050 | 586 | \x42000000000180a270000296c5f7bb75300000024a
0/180A050 | 586 | \x49000060414e0003740000000131740000000261617400000003313233
0/180A160 | 586 | \x49000060414e0003740000000132740000000262627400000003343536
0/180A1E8 | 586 | \x49000060414e00037400000001337400000002636374000000023535
0/180A2A0 | 586 | \x4300000000000180a270000000000180a2a0000296c5f7bb7530
0/180A2A0 | 587 | \x42000000000180a328000296c5f7d8abe90000024b
0/180A2A0 | 587 | \x49000060414e0003740000000134740000000375757574000000023636
0/180A358 | 587 | \x4300000000000180a328000000000180a358000296c5f7d8abe9
0/180A358 | 588 | \x42000000000180a3a0000296c5f80ae6ca0000024c
0/180A358 | 588 | \x44000060414b00037400000001326e6e
0/180A3D0 | 588 | \x4300000000000180a3a0000000000180a3d0000296c5f80ae6ca
0/180A3D0 | 589 | \x42000000000180a428000296c5f826073e0000024d
0/180A3D0 | 589 | \x55000060414e000374000000013374000000047878787874000000023535
0/180A458 | 589 | \x4300000000000180a428000000000180a458000296c5f826073e
(20 rows)
testdb01=> SELECT * FROM pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
--------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
replication_slot01 | test_decoding | logical | 16437 | testdb01 | f | f | | | 590 | 0/180A540 | 0/180A578
replication_slot02 | pgoutput | logical | 16437 | testdb01 | f | f | | | 590 | 0/180A540 | 0/180A578
(2 rows)
PG logical replication plug-in ("pgoutput") 输出的消息格式详情可以参考PG官方说明:https://www.postgresql.org/docs/10/protocol-logicalrep-message-formats.html
step6:销毁逻辑复制插槽
SELECT pg_drop_replication_slot('replication_slot01');
使用复制插槽的注意事项:
1)每个插槽只有一个输出插件(创建插槽时由您来选择使用哪个)。
2)每个插槽仅提供来自一个数据库的更改。
3)一个数据库可以有多个插槽。
4)每个数据更改通常在每个插槽中发出一次。
5)但是当 Postgres 实例重新启动时,插槽可能会重新发出更改。消费者必须处理这种情况。
6)未使用的插槽对 Postgres 实例的可用性构成威胁。Postgres 将为这些未使用的更改保存所有 WAL 文件。这可能导致存储溢出。
关于 PostgreSQL WAL 消费者:
可以获取 Postgres 逻辑解码流的任何应用程序都是 PostgreSQL WAL 消费者。
pg_recvlogical:
pg_recvlogical 是一个 PostgreSQL 应用程序,他是 PostgreSQL 原生的逻辑解码工具,它可以管理槽并使用槽中的流。它包含在 Postgres 发行版中,因此它可能已经随 PostgreSQL 一起安装,在 $PG_HOME/bin 目录下。
pg_recvlogical 使用默认的 test_decoding 逻辑解码插件,pg安装完后会在pg安装目录的lib目录下创建test_decoding链接库文件。
pg_recvlogical 使用介绍:
创建逻辑复制插槽:
$ pg_recvlogical --create-slot -S replication_slot01 -d testdb01
启动复制槽解码(启动后会实时的将日志解码到制定的文件中,也可以不启动、等需要解码时再启动解码)
$ pg_recvlogical --start -S replication_slot01 -d testdb01 -f replication_slot01_decoding.log &
查看解码结果:
$ cat replication_slot01_decoding.log
查看数据库lsn:
postgres=# select pg_current_wal_lsn();
使用pg_recvlogical进行日志区间解码:
$ pg_recvlogical --start -S replication_slot01 -d testdb01 -I 4C/180B020 -E 4C/180B1EE -f ret.log