PostgreSQL 变化数据捕捉(CDC)

news/2024/7/9 20:21:13 标签: postgresql, 数据库, oracle

PostgreSQL 变化数据捕捉(CDC)

基于CDC(变更数据捕捉)的增量数据集成总体步骤:

1.捕获源数据库中的更改数据

2.将变更的数据转换为您的消费者可以接受的格式

3.将数据发布到消费者或目标数据库

PostgreSQL支持触发器(trigger)和预写日志(WAL)两种CDC机制。

如果您希望 Postgres 数据更改发生时逐行流式传输,则需要逻辑解码或 Postgres 逻辑复制功能。

使用 Postgres 逻辑解码

逻辑解码是 PostgreSQL 的基于日志的 CDC(逻辑复制)的正式名称。逻辑解码使用 PostgreSQL 预写日志的内容来存储数据库中发生的所有活动。

step1:修改PostgreSQL数据库配置postgresql.conf文件

wal_level = logical

max_replication_slots = 10

max_wal_senders = 20

wal_level :设置为 logical,允许 WAL 日志记录逻辑解码所需的信息。

max_replication_slots :确保 max_replication_slots >= 使用 WAL 的 PostgreSQL 连接器的数量加上您的数据库使用的其他复制槽的数量。

max_wal_senders :指定 WAL 的最大并发连接数的参数,确保 max_wal_senders 至少是逻辑复制槽数的两倍。例如,如果您的数据库总共使用 10 个复制槽,则该 max_wal_senders 值必须为 20 或更大。

配置修改以后,需要重复 PostgreSQL 服务生效配置。

step2:为需要同步的数据库(db)创建逻辑复制插槽(replication slot)

关于复制 SQL 函数更多详情,可以参考官方文档(9.26.系统管理函数)介绍: http://www.postgres.cn/docs/12/functions-admin.html

函数:pg_create_logical_replication_slot(slot_name name, plugin name)

返回类型:(slot_name name, lsn pg_lsn)

说明:使用输出插件plugin创建一个名为 slot_name的新逻辑(解码)复制槽。

创建时需要指定逻辑复制插槽名称和输出插件:

SELECT pg_create_logical_replication_slot('replication_slot01', 'test_decoding'); -- 使用 test_decoding 输出插件

SELECT pg_create_logical_replication_slot('replication_slot01', 'pgoutput'); -- 使用 pgoutput 输出插件

注意权限:

如果用户没有权限可能会报错:ERROR: must be superuser or replication role to use replication slots

授权:

\c - postgres

ALTER ROLE test REPLICATION; --流复制权限

\du

验证插槽是否创建成功:

SELECT * FROM pg_replication_slots;

testdb01=> SELECT * FROM pg_replication_slots;

slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

--------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

replication_slot01 | test_decoding | logical | 16437 | testdb01 | f | f | | | 585 | 0/1807EA8 | 0/1807EE0

replication_slot02 | pgoutput | logical | 16437 | testdb01 | f | f | | | 585 | 0/1809CD8 | 0/1809D10

(2 rows)

testdb01=>

注意:

1)逻辑复制插槽名称:每个逻辑复制插槽都有一个名称,创建时需要指定名称,可以包含小写字母、数字和下划线

2)输出插件:创建逻辑复制插槽时需要指定输出插件,输出插件有:test_decoding、pgoutput、wal2json等

test_decoding 输出插件 :PostgreSQL 9.4+原生附带了 test_decoding 输出插件($PG_HOME/lib 目录下对应有 test_decoding.so),如果您的消费者支持 test_decoding ,则可以使用 test_decoding 输出插件。

pgoutput 输出插件 :PostgreSQL 10+原生附带了 pgoutput 输出插件($PG_HOME/lib 目录下对应有 pgoutput.so),如果您的消费者支持 pgoutput ,则可以使用 pgoutput 输出插件。pgoutput插件输出的是二进制的数据

wal2json 输出插件 :wal2json 是另一个流行的逻辑解码输出插件,PostgreSQL原生不携带、需要数据库服务单独安装插件才能使用。wal2json输出的是json格式的数据

step3:为数据库中的所有表或指定表创建一个发布。如果以指定表方式创建发布则后面可以在发布中管理(添加、删除)表

帮助说明:\h create publication

DROP PUBLICATION IF EXISTS pub01;

CREATE PUBLICATION pub01 FOR TABLE test01;

或者

DROP PUBLICATION IF EXISTS pub01;

CREATE PUBLICATION pub01 FOR TABLE test01, test02, test03;

或者

DROP PUBLICATION IF EXISTS pub01;

CREATE PUBLICATION pub01 FOR ALL TABLES;

查看创建发布的结果:

testdb01=> select * from pg_publication;

oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate

-------+---------+----------+--------------+-----------+-----------+-----------+-------------

24629 | pub01 | 16436 | f | t | t | t | t

(1 row)

也可以执行查看:

testdb01=> \dRp

List of publications

Name | Owner | All tables | Inserts | Updates | Deletes | Truncates

-------+-------+------------+---------+---------+---------+-----------

pub01 | test | f | t | t | t | t

(1 row)

创建发布时,还可以选择在发布中包含哪些操作。例如,下面仅创建table01表 & 仅包含INSERT和UPDATE的发布:

CREATE PUBLICATION pub01 FOR TABLE table01 WITH (publish = 'INSERT, UPDATE');

注意:

非超级用户创建发布只能指定表创建、不能 FOR ALL TABLES 创建,否则报错:ERROR: must be superuser to create FOR ALL TABLES publication

FOR ALL TABLES 只允许超级用户创建发布,或者创建后由超级用户修改已创建的发布的配置开启foralltables。

testdb01=> \c - postgres

testdb01=# update pg_publication set puballtables=true where pubname is not null; -- 设置puballtables开关

testdb01=# select * from pg_publication;

oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate

-------+---------+----------+--------------+-----------+-----------+-----------+-------------

24629 | pub01 | 16436 | t | t | t | t | t

(1 row)

step4:验证指定的表是否在发布中

testdb01=> SELECT * FROM pg_publication_tables WHERE pubname='pub01';

注意:创建发布不会开始复制。它只为未来的订阅者定义一个分组和过滤逻辑。

step5:查看逻辑复制效果:

获取结果的相关命令:

pg_logical_slot_get_changes : 查询并删除数据。仅在第一次返回结果,多次调用可能会返回空结果集,这意味着当get命令执行时,结果会被提供和删除,这增强了我们编写使用这些事件创建表副本的逻辑的能力。

函数:pg_logical_slot_get_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[])

返回类型:(lsn pg_lsn, xid xid, data text)

说明:返回槽slot_name中的改变,从上一次已经被消费的点开始返回。 如果upto_lsn和upto_nchanges为 NULL,逻辑解码将一 直继续到 WAL 的末尾。如果upto_lsn为非 NULL,解码将只包括那些在指 定 LSN 之前提交的事务。如果upto_nchanges为非 NULL, 解码将在其产生的行数超过指定值后停止。不过要注意, 被返回的实际行数可能更大,因为对这个限制的检查只会在增加了解码每个新的提交事务产生 的行之后进行。

pg_logical_slot_get_binary_changes:变化数据以bytea返回。

pg_logical_slot_peek_changes : 只查询不删数据。多次调用每次都会返回相同的结果。是另一个 PostgreSQL 命令,用于在不使用 WAL 条目的情况下查看更改。

函数:pg_logical_slot_peek_changes(slot_name name, upto_lsn pg_lsn, upto_nchanges int, VARIADIC options text[])

返回类型:(lsn text, xid xid, data text)

说明:行为就像pg_logical_slot_get_changes()函数, 不过改变不会被消费, 即在未来的调用中还会返回这些改变。

pg_logical_slot_peek_binary_changes:变化数据以bytea返回。

关于复制 SQL 函数更多详情,可以参考官方文档(9.26.系统管理函数)介绍: http://www.postgres.cn/docs/12/functions-admin.html

测试表中I/D/U变更数据操作,然后查看逻辑复制效果:

testdb01=> SELECT * FROM pg_logical_slot_peek_changes('replication_slot01', NULL, NULL); -- 只查询不删数据

......

testdb01=> SELECT * FROM pg_logical_slot_get_changes('replication_slot01', NULL, NULL); -- 查询并删除数据

lsn | xid | data

-----------+-----+------------------------------------------------------------------------------------------

0/180A050 | 586 | BEGIN 586

0/180A050 | 586 | table public.test01: INSERT: id[bigint]:1 info[character varying]:'aa' cnt[integer]:123

0/180A160 | 586 | table public.test01: INSERT: id[bigint]:2 info[character varying]:'bb' cnt[integer]:456

0/180A1E8 | 586 | table public.test01: INSERT: id[bigint]:3 info[character varying]:'cc' cnt[integer]:55

0/180A2A0 | 586 | COMMIT 586

0/180A2A0 | 587 | BEGIN 587

0/180A2A0 | 587 | table public.test01: INSERT: id[bigint]:4 info[character varying]:'uuu' cnt[integer]:66

0/180A358 | 587 | COMMIT 587

0/180A358 | 588 | BEGIN 588

0/180A358 | 588 | table public.test01: DELETE: id[bigint]:2

0/180A3D0 | 588 | COMMIT 588

0/180A3D0 | 589 | BEGIN 589

0/180A3D0 | 589 | table public.test01: UPDATE: id[bigint]:3 info[character varying]:'xxxx' cnt[integer]:55

0/180A458 | 589 | COMMIT 589

(14 rows)

testdb01=> SELECT * FROM pg_logical_slot_get_changes('replication_slot01', NULL, NULL); -- 查询并删除数据

lsn | xid | data

-----+-----+------

(0 rows)

testdb01=>

pgoutput输出插件的结果查看,则执行:

testdb01=> SELECT * FROM pg_logical_slot_peek_binary_changes('replication_slot02', null, null, 'proto_version', '1', 'publication_names', 'pub01'); -- 查询但不删除数据

......

testdb01=> SELECT * FROM pg_logical_slot_get_binary_changes('replication_slot02', null, null, 'proto_version', '1', 'publication_names', 'pub01'); -- 查询并删除数据

lsn | xid | data

-----------+-----+------------------------------------------------------------------------------------------------------------------------------

0/1809DF8 | 585 | \x420000000001809fd0000296c5f556e57d00000249

0/1809DF8 | 585 | \x52000060417075626c696300746573743031006400030169640000000014ffffffff00696e666f00000004130000006800636e740000000017ffffffff

0/1809DF8 | 585 | \x44000060414b00037400000001316e6e

0/1809F40 | 585 | \x44000060414b00037400000001346e6e

0/1809F88 | 585 | \x44000060414b00037400000001336e6e

0/180A000 | 585 | \x43000000000001809fd0000000000180a000000296c5f556e57d

0/180A050 | 586 | \x42000000000180a270000296c5f7bb75300000024a

0/180A050 | 586 | \x49000060414e0003740000000131740000000261617400000003313233

0/180A160 | 586 | \x49000060414e0003740000000132740000000262627400000003343536

0/180A1E8 | 586 | \x49000060414e00037400000001337400000002636374000000023535

0/180A2A0 | 586 | \x4300000000000180a270000000000180a2a0000296c5f7bb7530

0/180A2A0 | 587 | \x42000000000180a328000296c5f7d8abe90000024b

0/180A2A0 | 587 | \x49000060414e0003740000000134740000000375757574000000023636

0/180A358 | 587 | \x4300000000000180a328000000000180a358000296c5f7d8abe9

0/180A358 | 588 | \x42000000000180a3a0000296c5f80ae6ca0000024c

0/180A358 | 588 | \x44000060414b00037400000001326e6e

0/180A3D0 | 588 | \x4300000000000180a3a0000000000180a3d0000296c5f80ae6ca

0/180A3D0 | 589 | \x42000000000180a428000296c5f826073e0000024d

0/180A3D0 | 589 | \x55000060414e000374000000013374000000047878787874000000023535

0/180A458 | 589 | \x4300000000000180a428000000000180a458000296c5f826073e

(20 rows)

testdb01=> SELECT * FROM pg_replication_slots;

slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn

--------------------+---------------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------

replication_slot01 | test_decoding | logical | 16437 | testdb01 | f | f | | | 590 | 0/180A540 | 0/180A578

replication_slot02 | pgoutput | logical | 16437 | testdb01 | f | f | | | 590 | 0/180A540 | 0/180A578

(2 rows)

PG logical replication plug-in ("pgoutput") 输出的消息格式详情可以参考PG官方说明:https://www.postgresql.org/docs/10/protocol-logicalrep-message-formats.html

step6:销毁逻辑复制插槽

SELECT pg_drop_replication_slot('replication_slot01');

使用复制插槽的注意事项:

1)每个插槽只有一个输出插件(创建插槽时由您来选择使用哪个)。

2)每个插槽仅提供来自一个数据库的更改。

3)一个数据库可以有多个插槽。

4)每个数据更改通常在每个插槽中发出一次。

5)但是当 Postgres 实例重新启动时,插槽可能会重新发出更改。消费者必须处理这种情况。

6)未使用的插槽对 Postgres 实例的可用性构成威胁。Postgres 将为这些未使用的更改保存所有 WAL 文件。这可能导致存储溢出。

关于 PostgreSQL WAL 消费者:

可以获取 Postgres 逻辑解码流的任何应用程序都是 PostgreSQL WAL 消费者。

pg_recvlogical:

pg_recvlogical 是一个 PostgreSQL 应用程序,他是 PostgreSQL 原生的逻辑解码工具,它可以管理槽并使用槽中的流。它包含在 Postgres 发行版中,因此它可能已经随 PostgreSQL 一起安装,在 $PG_HOME/bin 目录下。

pg_recvlogical 使用默认的 test_decoding 逻辑解码插件,pg安装完后会在pg安装目录的lib目录下创建test_decoding链接库文件。

pg_recvlogical 使用介绍:

创建逻辑复制插槽:

$ pg_recvlogical --create-slot -S replication_slot01 -d testdb01

启动复制槽解码(启动后会实时的将日志解码到制定的文件中,也可以不启动、等需要解码时再启动解码)

$ pg_recvlogical --start -S replication_slot01 -d testdb01 -f replication_slot01_decoding.log &

查看解码结果:

$ cat replication_slot01_decoding.log

查看数据库lsn:

postgres=# select pg_current_wal_lsn();

使用pg_recvlogical进行日志区间解码:

$ pg_recvlogical --start -S replication_slot01 -d testdb01 -I 4C/180B020 -E 4C/180B1EE -f ret.log


http://www.niftyadmin.cn/n/49282.html

相关文章

SSJ-21A AC220V静态【时间继电器】

系列型号: SSJ-11B静态时间继电器;SSJ-21B静态时间继电器 SSJ-21A静态时间继电器;SSJ-22A静态时间继电器 SSJ-22B静态时间继电器SSJ-42B静态时间继电器 SSJ-42A静态时间继电器SSJ-41A静态时间继电器 SSJ-41B静态时间继电器SSJ-32B静态时间继电…

2023 AIME 答案与解析 - 第一期

这里放广告 原题目 Five men and nine women stand equally spaced around a circle in random order. The probability that every man stands diametrically opposite a woman is where and are relatively prime positive integers. Find 绿树教育中心独家解析 先选取 5 对座…

CUDA虚拟内存管理

CUDA中的虚拟内存管理 文章目录CUDA中的虚拟内存管理1. Introduction2. Query for support3. Allocating Physical Memory3.1. Shareable Memory Allocations3.2. Memory Type3.2.1. Compressible Memory4. Reserving a Virtual Address Range5. Virtual Aliasing Support6. Ma…

国产化适配迁移记录

国产化适配迁移记录 本项目基于RuoYi-Vue的框架进行迁移。目前已完成覆盖测试暂无其他问题。 国产化环境 名称版本达梦数据库DmJdbcDriver18 8.1.2.144通用mapper – tk.mybatismapper-spring-boot-starter 4.2.5<!-- 达梦数据库--><dependency><groupId>…

Python学习------起步7(字符串的连接、删除、修改、查询与统计、类型判断及字符串字母大小写转换)

目录 前言&#xff1a; 1.字符串的连接 join() 函数 2.字符串的删除&取代 replace()函数 3.字符串的修改&切割 &#xff08;1&#xff09;strip() 函数 &#xff08;2&#xff09;lstrip()函数 和 rstrip()函数 &#xff08;3&#xff09;split()函数-->…

新特性解读 | 针对用户定制不同格式执行计划

作者&#xff1a;杨涛涛 资深数据库专家&#xff0c;专研 MySQL 十余年。擅长 MySQL、PostgreSQL、MongoDB 等开源数据库相关的备份恢复、SQL 调优、监控运维、高可用架构设计等。目前任职于爱可生&#xff0c;为各大运营商及银行金融企业提供 MySQL 相关技术支持、MySQL 相关课…

微信小程序开发你可能不知道的开发技巧

1. 页面级data&#xff0c;组件data的查看 页面级data的查看&#xff0c;很多开发者应该都知道&#xff1b;组件级的数据查看我是开发了大半年才发现的&#xff1b; 页面级的data查看&#xff1a; 组件的data查看&#xff1a; 2. 放大模拟器【调整一些UI细节】 效果&#x…

【LeetCode】石子游戏 IV [H](动态规划)

1510. 石子游戏 IV - 力扣&#xff08;LeetCode&#xff09; 一、题目 Alice 和 Bob 两个人轮流玩一个游戏&#xff0c;Alice 先手。 一开始&#xff0c;有 n 个石子堆在一起。每个人轮流操作&#xff0c;正在操作的玩家可以从石子堆里拿走 任意 非零 平方数 个石子。 如果石…