【Postgresql源码分析之三】同步复制源码分析
2016-07-27 19:59
561 查看
前面介绍了,在开启同步复制模式后,进程在主机上提交事物之后,需要等待同步备机的响应(确保备机已经接收到这条xlog日志),主要是在函数SyncRepWaitForLSN中等待同步状态的更新(由SYNC_REP_WAITING变为SYNC_REP_WAIT_COMPLETE)。下面介绍状态更新的过程,即主机如何感知到等待的xlog日志发送到备机。
我们知道,在建立正常的流复制关系之后,备机会不断的向主机发送状态,报告自己的当前的位置。主机在处理状态信息的时候,就会检查是否需要解除阻塞,即更新状态。在函数ProcessStandbyReplyMessage中,根据条件判断,如下:
if (!am_cascading_walsender)
<strong>SyncRepReleaseWaiters</strong>(); 函数SyncRepReleaseWaiters是解除阻塞的关键函数,下面重点介绍该函数的流程。
从注释中可以看出,该函数主要是基于最新的状态更新LSN队列。实现方式如下:
步骤1:判断是否是同步备机、复制状态以及flush的位置是否合法
步骤2:找到优先级最高的备机对应的发送进程
步骤3:判断是否是当前的发送进程
步骤4:如果是同步复制备机,调用函数SyncRepWakeQueue唤醒对应模式下的等待队列
接下来介绍函数SyncRepWakeQueue,唤醒的等待队列的过程,其实就是更新同步复制状态信息。
从注释中可以看出,该函数的主要作用是从队列的头部开始遍历,设置任何需要备唤醒的后台进程的状态,从队列中移除并唤醒他们。如果传输的参数为all,表示唤醒所有队列,否则,仅仅唤醒walsender进程指定的LSN。
步骤1:获取从头部开始队列的proc进程信息
步骤2:判断当前walsndctl中等待LSN与proc中记录的LSN之间的关系
步骤3:更改状态信息,并唤醒队列
在这里更新同步复制的状态信息,前面提交的事物在检测到状态改变之后,解除阻塞。
我们知道,在建立正常的流复制关系之后,备机会不断的向主机发送状态,报告自己的当前的位置。主机在处理状态信息的时候,就会检查是否需要解除阻塞,即更新状态。在函数ProcessStandbyReplyMessage中,根据条件判断,如下:
if (!am_cascading_walsender)
<strong>SyncRepReleaseWaiters</strong>(); 函数SyncRepReleaseWaiters是解除阻塞的关键函数,下面重点介绍该函数的流程。
SyncRepReleaseWaiters
首先看看该函数的注释/* * Update the LSNs on each queue based upon our latest state. This * implements a simple policy of first-valid-standby-releases-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. */
从注释中可以看出,该函数主要是基于最新的状态更新LSN队列。实现方式如下:
步骤1:判断是否是同步备机、复制状态以及flush的位置是否合法
if (MyWalSnd->sync_standby_priority == 0 || MyWalSnd->state < WALSNDSTATE_STREAMING || XLogRecPtrIsInvalid(MyWalSnd->flush)) return;
步骤2:找到优先级最高的备机对应的发送进程
for (i = 0; i < max_wal_senders; i++) { /* use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &walsndctl->walsnds[i]; if (walsnd->pid != 0 && walsnd->state == WALSNDSTATE_STREAMING && walsnd->sync_standby_priority > 0 && (priority == 0 || <strong> priority > walsnd->sync_standby_priority</strong>) && !XLogRecPtrIsInvalid(walsnd->flush)) { priority = walsnd->sync_standby_priority; syncWalSnd = walsnd; } }寻找方法中,实现的关键步骤是判断中的priority > walsnd->sync_standby_priority,如果当前的优先级大于发送进程记录的优先级,就进行更新(这里需要注意的是:优先级数字越低,代表的优先级越高,1是同步备机中优先级最高的)。
步骤3:判断是否是当前的发送进程
if (syncWalSnd != MyWalSnd) { LWLockRelease(SyncRepLock); announce_next_takeover = true; return; }
步骤4:如果是同步复制备机,调用函数SyncRepWakeQueue唤醒对应模式下的等待队列
/* * Set the lsn first so that when we wake backends they will release up to * this location. */ if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_WRITE], MyWalSnd->write)) { walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } if (XLByteLT(walsndctl->lsn[SYNC_REP_WAIT_FLUSH], MyWalSnd->flush)) { walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); }
接下来介绍函数SyncRepWakeQueue,唤醒的等待队列的过程,其实就是更新同步复制状态信息。
SyncRepWakeQueue
前面介绍过,该函数的主要作用是唤醒等待队列,其实就是更新同步复制状态信息。首先看看函数的注释:/* * Walk the specified queue from head. Set the state of any backends that * need to be woken, remove them from the queue, and then wake them. * Pass all = true to wake whole queue; otherwise, just wake up to * the walsender's LSN. * * Must hold SyncRepLock. */
从注释中可以看出,该函数的主要作用是从队列的头部开始遍历,设置任何需要备唤醒的后台进程的状态,从队列中移除并唤醒他们。如果传输的参数为all,表示唤醒所有队列,否则,仅仅唤醒walsender进程指定的LSN。
步骤1:获取从头部开始队列的proc进程信息
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(WalSndCtl->SyncRepQueue[mode]), offsetof(PGPROC, syncRepLinks));
步骤2:判断当前walsndctl中等待LSN与proc中记录的LSN之间的关系
if (!all && XLByteLT(walsndctl->lsn[mode], proc->waitLSN)) return numprocs;如果不是唤醒所有等待队列以及walsndctl中记录的等待LSN小于proc中记录的waitLSN,直接返回,表示不需要等待。这里的原因是:walsndctl中记录是备机响应主机的最新的LSN,如果其小于proc中记录的waitLSN,表明进程中的LSN仍然需要等待(proc中记录的waitLSN是按照先后顺序排列的)
步骤3:更改状态信息,并唤醒队列
/* * Move to next proc, so we can delete thisproc from the queue. * thisproc is valid, proc may be NULL after this. */ thisproc = proc; proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]), &(proc->syncRepLinks), offsetof(PGPROC, syncRepLinks)); /* * Set state to complete; see SyncRepWaitForLSN() for discussion of * the various states. */ <strong>thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;</strong> /* * Remove thisproc from queue. */ SHMQueueDelete(&(thisproc->syncRepLinks)); /* * Wake only when we have set state and removed from queue. */ SetLatch(&(thisproc->procLatch));
在这里更新同步复制的状态信息,前面提交的事物在检测到状态改变之后,解除阻塞。
SyncRepUpdateSyncStandbysDefined
该函数主要是更新同步复制的全局变量sync_standbys_defined,该函数主要是给checkpointer进程使用的,如果检测到配置参数信息有变化,就更新该标志,如果取消同步复制模式就唤醒所有等待队列。bool sync_standbys_defined = SyncStandbysDefined(); if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) { LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* * If synchronous_standby_names has been reset to empty, it's futile * for backends to continue to waiting. Since the user no longer * wants synchronous replication, we'd better wake them up. */ if (!sync_standbys_defined) { int i; <strong> for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) SyncRepWakeQueue(true, i);</strong> } /* * Only allow people to join the queue when there are synchronous * standbys defined. Without this interlock, there's a race * condition: we might wake up all the current waiters; then, some * backend that hasn't yet reloaded its config might go to sleep on * the queue (and never wake up). This prevents that. */ WalSndCtl->sync_standbys_defined = sync_standbys_defined; LWLockRelease(SyncRepLock); }
相关文章推荐
- Android之获取手机上的图片和视频缩略图thumbnails
- 从源码安装Mysql/Percona 5.5
- 数据库链接字符串查询网站
- DB2实例管理
- DB2实例管理
- 保障MySQL数据安全的14个最佳方法
- mysql问答汇集
- 第三章 数据库备份和还原
- 创建一个空的IBM DB2 ECO数据库的方法
- Access 2000 数据库 80 万记录通用快速分页类
- 开通一个数据库失败的原因的和解决办法
- 一个简单的asp数据库操作类
- CentOS下DB2数据库安装过程详解
- PostgreSQL新手入门教程
- EasyASP v1.5发布(包含数据库操作类,原clsDbCtrl.asp)第1/2页
- PostgreSQL教程(十):性能提升技巧
- PostgreSQL教程(二):模式Schema详解
- PostgreSQL教程(十三):数据库管理详解