Postgresql源码(117)libpq的两套实现(socket/shm_mq)

news/2024/7/9 22:05:40 标签: postgresql, 数据库, libpq

libpq_0">libpq的通信方式

libpq提供了两套通信方式

  • socket
  • shm_mq

分别实现在下面两个文件中

  • pqcomm.c
  • pqmq.c

什么时候用socket通信?

除了下述并行场景,其他场景全部使用socket通信。

static const PQcommMethods PqCommSocketMethods = {
	.comm_reset = socket_comm_reset,
	.flush = socket_flush,
	.flush_if_writable = socket_flush_if_writable,
	.is_send_pending = socket_is_send_pending,
	.putmessage = socket_putmessage,
	.putmessage_noblock = socket_putmessage_noblock
};

什么时候使用mq通信?

并行框架中会将子进程的libpq的通信改成mq通信,用于子进程给父进程发送错误信息。

static const PQcommMethods PqCommMqMethods = {
	.comm_reset = mq_comm_reset,
	.flush = mq_flush,
	.flush_if_writable = mq_flush_if_writable,
	.is_send_pending = mq_is_send_pending,
	.putmessage = mq_putmessage,
	.putmessage_noblock = mq_putmessage_noblock
};

使用MQ通信需要用pq_redirect_to_shm_mq函数指定使用的dsm和mq。

注意这个pq_mq_handle是申请在dsm上的,专门用于并行框架。

void
pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
{
	PqCommMethods = &PqCommMqMethods;
	pq_mq_handle = mqh;
	whereToSendOutput = DestRemote;
	FrontendProtocol = PG_PROTOCOL_LATEST;
	on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
}

使用位置在并行框架子进程入口

void ParallelWorkerMain(...)
{
  ...
  ...
  // 拿到父进程在共享内存中申请mq的内存其实地址
  error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
  
  // 用自己的woker num偏移得到自己的mq
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
  
  // 配置PGPROC到mq上
	shm_mq_set_sender(mq, MyProc);
  // mq是单纯的mq抽象,用的时候一般使用mq handle,在这里包装一层成为mqh
	mqh = shm_mq_attach(mq, seg, NULL);
  // 配置libpq的消息队列为mqh
	pq_redirect_to_shm_mq(seg, mqh);
  // 记录父进程的pid为leader pid
	pq_set_parallel_leader(fps->parallel_leader_pid,
						   fps->parallel_leader_backend_id);
}

配置好后,子进程已经记录了父进程的pid,在子进程中需要发送消息时:

int
mq_putmessage(...)
{
  ...
  for (;;)
	{
		// 先把书库放入mq中,flush到共享内存
		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);

		if (pq_mq_parallel_leader_pid != 0)
		{
				... // 这里只有子进程能走进来,通知父进程读取
				SendProcSignal(pq_mq_parallel_leader_pid,
							   PROCSIG_PARALLEL_MESSAGE,
							   pq_mq_parallel_leader_backend_id);
        ...
			}
		}

		if (result != SHM_MQ_WOULD_BLOCK)
			break;

		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE);
		ResetLatch(MyLatch);
		CHECK_FOR_INTERRUPTS();
	}
  ...
}

子进程发完了,信息会留存在mq中。然后给父进程发信号。

父进程收到kill过来的信号,进入信号处理函数(函数已经绑定sigusr1了),标记ParallelMessagePending

void
procsignal_sigusr1_handler(SIGNAL_ARGS)
{
	...
	if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE))
		HandleParallelMessageInterrupt();
	...
}

void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}

等下次调用CHECK_FOR_INTERRUPTS宏,执行ProcessInterrupts时处理具体的消息。

在函数中会shm_mq_receive接受子进程发到mq中的消息。

void
HandleParallelMessages(void)
{
	...
	HOLD_INTERRUPTS();
	...
  ...
				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					HandleParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	...

	RESUME_INTERRUPTS();
}

elog如何发送错误日志?

无论是并发时的子进程,还是普通进程,调用elog发送日志都会经过两步:

errstart(int elevel, const char *domain)    

做一些初始化和配置

errfinish(const char *filename, int lineno, const char *funcname)

调用EmitErrorReport发送错误

EmitErrorReport负责将日志发送到client和server log

EmitErrorReport
  /* Send to server log, if enabled */
	if (edata->output_to_server)
		send_message_to_server_log(edata);

	/* Send to client, if enabled */
	if (edata->output_to_client)
		send_message_to_frontend(edata);

这里发送到client的日志send_message_to_frontend中,会走libpq的逻辑:

  1. 普通场景libpq使用PqCommSocketMethods的实现,将日志发送给客户端。
  2. 并发场景子进程中libpq使用PqCommMqMethods的实现,将日志发送给父进程。

http://www.niftyadmin.cn/n/5272910.html

相关文章

【网络安全】-Linux操作系统—CentOS安装、配置

文章目录 准备工作下载CentOS创建启动盘确保硬件兼容 安装CentOS启动安装程序分区硬盘网络和主机名设置开始安装完成安装 初次登录和配置更新系统安装额外的软件仓库安装网络工具配置防火墙设置SELinux安装文本编辑器配置SSH服务 总结 CentOS是一个基于Red Hat Enterprise Linu…

20231218给Firefly的AIO-3399J【RK3399】开发板刷Android12挖掘机方案

20231218给Firefly的AIO-3399J【RK3399】开发板刷Android12挖掘机方案 2023/12/18 21:07 一、整体编译Rockchip的的Android12的挖掘机方案! 由于RK3399的Android12系统默认是IND工业方案,需要修改一下【为挖掘机方案】。 Z:\3TB\81rk_android12_220722\…

福德植保无人机工厂:创新科技与绿色农业的完美结合

亲爱的读者们,欢迎来到福德植保无人机工厂的世界。这里,科技与农业的完美结合为我们描绘出一幅未来农业的新篇章。福德植保无人机工厂作为行业的领军者,以其领先的无人机技术,创新的理念,为我们展示了一种全新的农业服…

MATLAB图解傅里叶变换(初学者也可以理解)

1、概述 相信很多人对于傅里叶变换可能觉得比较复杂和有点难懂,其实不难,它只是一种积分变换。 傅里叶变换,表示能将满足一定条件的某个函数表示成三角函数(正弦和/或余弦函数)或者它们的积分的线性组合。也就是说&qu…

spring boot版本升级遇到的一些问题

背景:由于项目需求,需要将nacos 1.4.6版本升级到2.x版本,由此引发的springboot、springcloud、springcloud Alibaba一系列版本变更。 旧版本分别为: Spring Boot 2.3.5.RELEASE Spring Cloud Hoxton.SR9 Spring Cloud Alibaba 2.2…

Linux-----21、挂载

# 挂载命令 将硬件资源,或文件资源💿,和📂空目录🔗连接起来的过程 # mount linux 所有存储设备都必须挂载使用,包括硬盘 ​ 命令名称:mount ​ 命令所在路径:/bin/mount ​ 执行…

人工智能_机器学习069_SVM支持向量机_网格搜索_交叉验证参数优化_GridSearchCV_找到最优的参数---人工智能工作笔记0109

然后我们再来说一下SVC支持向量机的参数优化,可以看到 这次我们需要,test_data这个是测试数据,容纳后 train_data这个是训练数据 这里首先我们,导出 import numpy as np 导入数学计算包 from sklearn.svm import SVC 导入支持向量机包 分类器包 def read_data(path): wit…

SpringMVC映射请求数据

1、获取超链接的参数和值 <a href"vote/vote01?namelove">获取超链接的参数</a> //获取超链接的参数RequestMapping(value "/vote01")public String vote01(RequestParam(value "name", requiredfalse) String name){System.out…