postgresql逻辑复制槽的推进

news/2024/7/9 23:23:54 标签: postgresql, 网络, 服务器

逻辑复制槽为什么要推进?

  • 我们知道逻辑复制槽有两个作用,一个是保护系统表避免被vacuum,一个是保护xlog,防止需要解码的xlog被回收
  • 如果系统表不往前推进,则系统表就会发生膨胀
  • 如果xlog不往前推进,xlog就会堆积在磁盘。
  • 所以,我们根据当前逻辑复制的进度,推进逻辑复制槽

逻辑复制槽的推进主要表现在哪些字段的推进?

  • 对于xlog,与物理复制槽相同,通过restart_lsn控制
  • 对于系统表,通过catalog_xmin控制

restart_lsn的推进

  • 物理复制restart lsn的推进比较简单,当备机flush某个lsn以后,告诉主机,主机收到后,立即就会推进restart lsn
  • 而逻辑复制,则相对比较复杂。
    • 我们用confirmed_flush表示当前订阅端逻辑复制的进度。但是,不是说confirmed_flush之前的xlog就可以回收。
    • 因为逻辑解码需要历史快照,而历史快照的构建本质就是根据running_xacts以及后续的commit等xlog来完成的
    • 所以为了能够解析后面的xlog,restart lsn还要对后续解码需要用的快照xlog进行保护。
    • 那么此时就得看confirmed_flush更新时,需要用到的最老的快照lsn是多少了。
    • 所以我们用两个lsn实现restart_lsn的推进,candidate_restart_valid就表示当前解码的进度lsn,而candidate_restart_lsn表示解码到candidate_restart_valid的时候,需要用的最老的快照的lsn。
    • 当confirmed_flush超过candidate_restart_valid时,就可以推进restart_lsn到candidate_restart_lsn了,可以进行更新了。
// 主机或者发布端收到备机或订阅端的flush后的处理
ProcessStandbyReplyMessage(void)
{
    if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
    {
        if (SlotIsLogical(MyReplicationSlot))
            LogicalConfirmReceivedLocation(flushPtr);
        else
            PhysicalConfirmReceivedLocation(flushPtr);
    }
}

// 物理复制槽,只要备机flush某个lsn,主机收到后,立即就会推进restart lsn
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
    if (slot->data.restart_lsn != lsn)
    {
        changed = true;
        slot->data.restart_lsn = lsn;
    }
}


// 逻辑复制槽,如果订阅端flush了某个lsn,并不会直接推进restart lsn。而是根据candidate的情况,判断是否推进restart lsn。
// 如果可以就推进,如果不行,只把当前订阅端flush的lsn记录到confirmed_flush
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
    if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
        MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    {
        if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
            MyReplicationSlot->candidate_restart_valid <= lsn)
        {
            MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
            MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
            MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
            updated_restart = true;
        }
    }
    else
    {
        SpinLockAcquire(&MyReplicationSlot->mutex);
        MyReplicationSlot->data.confirmed_flush = lsn;
        SpinLockRelease(&MyReplicationSlot->mutex);
    }
}

// 为什么逻辑复制不能直接根据flush推进restart lsn呢?
// 因为逻辑解码需要历史快照,而历史快照的构建本质就是根据running_xacts以及后续的commit等xlog来完成的
// 所以为了能够解析后面的xlog,restart lsn还要保护对后续xlog解码需要用的快照xlog进行保护。
// 因为,我们每次会在解析running_xacts后,序列化一次快照,所以我们可以在此时推进一下我们需要保护的lsn,也就是所谓的candidate
// 等订阅端的flush满足条件后,就可以真正的推进restart lsn了
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
    // 处理running_xacts后,序列化一次快照
    SnapBuildSerialize(builder, lsn);

    // 我们从reorder buffer中找最老的txn,这样我们就可以获取最老的需要保护的xlog
    txn = ReorderBufferGetOldestTXN(builder->reorder);
    if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
        LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    // 如果当前没有在处理的事务,则使用最新序列化的快照位置即可
    else if (txn == NULL &&
             builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
             builder->last_serialized_snapshot != InvalidXLogRecPtr)
        LogicalIncreaseRestartDecodingForSlot(lsn,
                                              builder->last_serialized_snapshot);
}

// 这里有两个lsn,candidate_restart_valid是一个参照物,他是当前的lsn,当订阅端confirmed_flush超过他的时候,就可以推进此时对应的restart_lsn
// 为什么这么设计?因为confirmed_flush只代表flush的进度,或者说只代表解码的xlog进度,而不代表快照的xlog进度。
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
    if (current_lsn <= slot->data.confirmed_flush)
    {
        slot->candidate_restart_valid = current_lsn;
        slot->candidate_restart_lsn = restart_lsn;
        updated_lsn = true;
    }

    if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    {
        slot->candidate_restart_valid = current_lsn;
        slot->candidate_restart_lsn = restart_lsn;
    }

    if (updated_lsn)
        LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
}

catalog_xmin的推进

  • 与restart lsn的推进是类似的。
  • 在序列化快照的时候,使用candidate_xmin_lsn表示当前解码的进度,使用candidate_catalog_xmin表示当前解码需要用的最老xmin
  • 当confirmed_flush超过candidate_xmin_lsn的时候,就可以更新复制槽的catalog_xmin为candidate_catalog_xmin了
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
    SnapBuildSerialize(builder, lsn);
    xmin = ReorderBufferGetOldestXmin(builder->reorder);
    LogicalIncreaseXminForSlot(lsn, xmin);
}

LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
{
    if (current_lsn <= slot->data.confirmed_flush)
    {
        slot->candidate_catalog_xmin = xmin;
        slot->candidate_xmin_lsn = current_lsn;
        updated_xmin = true;
    }
    else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    {
        slot->candidate_catalog_xmin = xmin;
        slot->candidate_xmin_lsn = current_lsn;
    }
}

http://www.niftyadmin.cn/n/5171828.html

相关文章

抖音小店怎么做?五个步骤教会你,让你从0到1学会做店!

我是电商珠珠 抖音小店大家都在做&#xff0c;有的人月入几万&#xff0c;有的人把自己赔的裤衩子都不剩。 包括身边做抖音小店的这些小伙伴&#xff0c;他们前期的时候就是不懂流程&#xff0c;不知道怎么做&#xff0c;顶着一股铆劲往前冲&#xff0c;不管不顾的把店铺搞赔…

python设计模式12:状态模式

什么是状态机&#xff1f; 关键属性&#xff1a; 状态和转换 状态&#xff1a; 系统当前状态 转换&#xff1a;一种状态到另外一种状态的变化。 转换由触发事件或是条件启动。 状态机-状态图 状态机使用场景&#xff1a; 自动售货机 电梯 交通灯 组合锁 停车计时…

【从0到1设计一个网关】上岸大厂的秘诀之一

文章目录 前言【从0到1设计一个网关】什么是网关&#xff1f;以及为什么需要自研网关&#xff1f;【从0到1设计一个网关】自研网关的设计要点以及架构设计【从0到1设计一个网关】自研网关的架构搭建【从0到1设计一个网关】网络通信框架Netty的设计【从0到1设计一个网关】整合Na…

2300. 咒语和药水的成功对数 : 经典二分运用题

题目描述 这是 LeetCode 上的 「2300. 咒语和药水的成功对数」 &#xff0c;难度为 「中等」。 Tag : 「排序」、「二分」 给你两个正整数数组 spells 和 potions&#xff0c;长度分别为 n 和 m&#xff0c;其中 spells[i] 表示第 i 个咒语的能量强度&#xff0c;potions[j] 表…

【ROS系统】colcon编译器的使用

colcon编译器 参考链接&#xff1a;https://www.rstk.cn/news/33292.html?actiononClick 指令教学&#xff1a;https://blog.csdn.net/u014603518/article/details/127717928 基础指令colcon build 参数及效果 无参数 colcon build : 编译子环境下的全部ROS2项目依赖 --…

DevEco Studio harmonyOS 模拟器 Unable to install HAXM

在Intel CPU的Windows电脑下&#xff0c;启动模拟器失败&#xff0c;提示“Unable to install HAXM.”&#xff0c;无法安装HAXM。 打开任务管理器&#xff0c;在“性能”选项&#xff0c;检查CPU虚拟化是否已经启用。如果未启用&#xff0c;需要进入电脑的BIOS中&#xff0c;将…

【linux卸载已安装软件的命令】

在Linux系统中&#xff0c;我们可以使用不同的命令来卸载已安装的软件。下面是一些常用的命令和方法&#xff1a; 1. 使用apt-get命令&#xff08;适用于Debian和Ubuntu系统&#xff09;&#xff1a; - 要卸载一个已安装的软件&#xff0c;可以使用以下命令&#xff1a; sud…

Lightgraph.js节点图引擎【低代码开发利器】

Lightgraph.js是一个 Javascript 节点图引擎库&#xff0c;可以实现类似虚幻引擎的蓝图编程&#xff0c;包括一个编辑器来构建和测试节点图&#xff0c;支持浏览器和Node.js&#xff0c;可以轻松集成到任何现有的 Web 应用程序中&#xff0c;并且无需编辑器即可运行节点图。 在…