[自制操作系统] 原子操作&核间中断&读写锁&PRWLock
2017-08-27 20:14
357 查看
本文主要为读论文Scalable Read-mostly Synchronization Using Passive Reader-Writer Locks的记录。
并将其在
本文中支持的新特性:
支持原子操作
支持读写锁
支持针对单一核心
支持
Github : https://github.com/He11oLiu/MOS
单核主要为计算密集型模型,多核主要为并行化模型
并行化模型面临的问题:多个处理器共享数据结构,需要同步原语来保证其一致性
更底层:内存一致性 然而会导致可扩展性问题
Strict Consistency Memory Barrier
Sequential Consistency TSO
其中
内存屏障时等待之前的指令执行完
MESI模型中等待获取最新值
等待其他处理器释放锁
多核之间的通讯带宽受限,阻塞
适合读数据频率远大于写数据频率的应用
单核上的实现思路:读者锁不冲突,当需要加写者锁的时候需要等所有的读者锁释放。利用一个读者计数器来实现。
多核上最直观的实现思路:每个核上保存自己的读者锁,写者需要等到所有的读者锁全部释放了才能开始获取锁。
现有的
当只有读者时,由于只是访问自己的核上的锁,所以有良好的扩展性
写者需要获取所有核上的互斥锁,恒定延迟,与处理器数量有关。
SNZI利用树进行了一定优化
读者无锁,写着读者可以同时进行
先写值再改指针
写者开销大,要处理旧的数据
垃圾回收(无抢占调度一圈)
![](http://img.blog.csdn.net/20170827201307549?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvSGUxMW9fTGl1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
![](http://img.blog.csdn.net/20170827201324433?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvSGUxMW9fTGl1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
利用TSO架构的优势,版本控制隐式表示退出CS区域
并不完全依赖于短时间可见,对于特殊情况,保证一致性,利用IPI要求进行Report IPI消息传递开销较小,且可以相互掩盖。
两种模式 支持调度(睡眠与抢占)
共享内存陈旧化
弱控制,通过版本控制隐式反馈+少数情况IPI
通过维护active检测数量
强控制,主动监听
主动等待
对于需要较长时间才能看到最新的版本号或没有读者期望获取读者锁提供了IPI来降低等待时间,避免无限等待。
若上下文切换到了其他的进程,就没必要管这个核的锁了
锁域的上下线可以避免一些没有必要的一致性检测
注意利用内存屏障来保证一致性
用户态不能随时关闭抢占(Preemption)
用户态不能发送核间中断(IPI)
所以
利用抢断标记位避免特殊窗口时被抢断
写者必须陷入内核态发送IPI
锁域上下线本来就是极少的操作,用来改善性能的,所以其中的内存屏障影响不大
对于长CS区的读者,与传统一样
多个写者可以直接把锁传递
利用了TSO的特性设计的版本控制来隐式维护语义
利用IPI来保证特殊情况
利用两种模式支持调度
读者之间无关联(内存屏障),提升读者性能
PWAKE 分布式唤醒,提高了唤醒并行性
移植
关于
在一个基于
此外,还有一个CPU外面的
想要开启
使用
周期触发模式
周期触发模式中,程序设置一个”初始计数“寄存器(Initial Count),同时Local APIC会将这个数复制到”当前计数“寄存器(Current Count)。Local APIC会将这个数(当前计数)递减,直到减到0为止,这时候将触发一个IRQ(可以理解为触发一次中断),与此同时将当前计数恢复到初始计数寄存器的值,然后周而复始的执行上述逻辑。可以使用这种方法通过Local APIC实现定时按照一定时间间隔触发中断的功能。
一次性触发模式
同之前一样,但是不会恢复到初始计数。
来源Blog
每个本地
Intel x86架构提供LINT0和LINT1两个中断引脚,他们通常与Local APIC相连,用于接收Local APIC传递的中断信号,另外,当Local APIC被禁用的时候,LINT0和LINT1即被配置为INTR和NMI管脚,即外部IO中断管脚和非屏蔽中断管脚。
来源博客
![](http://img.blog.csdn.net/20170904113814514?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvSGUxMW9fTGl1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)
The local APIC registers are memory mapped to an address that can be found in the MP/MADT tables. Make sure you map these to virtual memory if you are using paging. Each register is 32 bits long, and expects to written and read as a 32 bit integer. Although each register is 4 bytes, they are all aligned on a 16 byte boundary.
再一次详细查看
由于这段映射,设置了
这里的宏定义为
下面来看一下主要的
EOI Register
Write to the register with offset 0xB0 using the value 0 to signal an end of interrupt. A non-zero values causes a general protection fault.
Local Vector Table Registers
There are some special interrupts that the processor and LAPIC can generate themselves. While external interrupts are configured in the I/O APIC, these interrupts must be configured using registers in the LAPIC. The most interesting registers are:
Spurious Interrupt Vector Register
The offset is 0xF0. The low byte contains the number of the spurious interrupt. As noted above, you should probably set this to 0xFF. To enable the APIC, set bit 8 (or 0x100) of this register. If bit 12 is set then EOI messages will not be broadcast. All the other bits are currently reserved.
Interrupt Command Register
The interrupt command register is made of two 32-bit registers; one at 0x300 and the other at 0x310.
It is used for sending interrupts to different processors.
The interrupt is issued when 0x300 is written to, but not when 0x310 is written to. Thus, to send an interrupt command one should first write to 0x310, then to 0x300.
需要先写
At 0x310 there is one field at bits 24-27, which is local APIC ID of the target processor (for a physical destination mode).
给
ICRLO的分布比较重要
其中目标模式有(
其中发送模式有(
不设置的话则为发送给
综上,打包了一个
用于发送
这里测试一下,先设置
在
最后在
设置
非
然后发现一个比较大的问题,
仿造
然后在内核中对读写锁的功能进行测试。
遇到两个问题
一个是
另一个是设计内核中的测试
多核上的输出可能会并行化,要减短输出内容。
在用户空间的锁分享目前不好做,linux是基于文件的。
故设计了两个锁来进行测试
一个是
对于每个核上,分别获取
在给
可以观察到一旦
这与期望的读写锁的功能是一致的。至此普通读写锁的实现完成。
首先有几个重点:
锁的具体实现
调度时调用内容
对于一个
而全局内核所拥有的读写锁通过
剩下的与论文中伪代码的实现思路相同,只是具体调用的函数有一些差别。
读者锁中包括向核心发送
每个核心接到
具体的
测试
给一个核心发送
这里先用
测试选择
当
并将其在
JOS上实现。其中包括
lapic原理,
IPI实现。
本文中支持的新特性:
支持原子操作
支持读写锁
支持针对单一核心
IPI
支持
PRWLock
Github : https://github.com/He11oLiu/MOS
论文阅读记录
研究背景
单核性能提升遇到瓶颈 转向多核提升性能单核主要为计算密集型模型,多核主要为并行化模型
并行化模型面临的问题:多个处理器共享数据结构,需要同步原语来保证其一致性
更底层:内存一致性 然而会导致可扩展性问题
Strict Consistency Memory Barrier
Sequential Consistency TSO
可扩展性 Scalable
提到可扩展性,不得不提Amdahl's law
S(latency)(s) = 1/((1-p)+p/s)
其中
1-p极为不可以并行的部分,而对于一个处理器,造成(1-P)部分有以下几种原因:
内存屏障时等待之前的指令执行完
MESI模型中等待获取最新值
等待其他处理器释放锁
多核之间的通讯带宽受限,阻塞
关于读写锁
读读不阻塞,读写阻塞适合读数据频率远大于写数据频率的应用
单核上的实现思路:读者锁不冲突,当需要加写者锁的时候需要等所有的读者锁释放。利用一个读者计数器来实现。
多核上最直观的实现思路:每个核上保存自己的读者锁,写者需要等到所有的读者锁全部释放了才能开始获取锁。
现有的RWLock
所做的尝试
BRLOCK C-SNZI
读者申请自己核上的锁当只有读者时,由于只是访问自己的核上的锁,所以有良好的扩展性
写者需要获取所有核上的互斥锁,恒定延迟,与处理器数量有关。
SNZI利用树进行了一定优化
RCU
弱化语义,可能会读到脏的数据(逻辑矛盾)读者无锁,写着读者可以同时进行
先写值再改指针
写者开销大,要处理旧的数据
垃圾回收(无抢占调度一圈)
Bounded staleness 短内存可见性
所谓短内存可见性,也就是在很短的时间周期内,由于每个核上面的单独cache非常的小,很大几率会被替换掉,从而能看到最新的数据。下面是具体的图表
PRWLock的设计思路
在短时间内各个处理器均可以看到最新的版本号利用TSO架构的优势,版本控制隐式表示退出CS区域
并不完全依赖于短时间可见,对于特殊情况,保证一致性,利用IPI要求进行Report IPI消息传递开销较小,且可以相互掩盖。
两种模式 支持调度(睡眠与抢占)
PRWLock 的两种模式
Passive Mode
用于处理没有经过调度的读者共享内存陈旧化
弱控制,通过版本控制隐式反馈+少数情况IPI
Active Mode (用于支持睡眠与调度类似BRLock)
用于处理被调度过的进程(睡眠/抢占)通过维护active检测数量
强控制,主动监听
主动等待
PRWLock流程视频:
优酷视频PRWLock的正确性
写者发现读者获取了最新的版本变量时,由于TSO的特性,也一定看到了写者上的写锁,确信其不会再次进入临界区。对于需要较长时间才能看到最新的版本号或没有读者期望获取读者锁提供了IPI来降低等待时间,避免无限等待。
PRWLock 内核中减少IPIs
锁域(Lock Domain)用于表示一个锁对应的处理器范围若上下文切换到了其他的进程,就没必要管这个核的锁了
锁域的上下线可以避免一些没有必要的一致性检测
注意利用内存屏障来保证一致性
PRWLock 用户态实现
由于在用户态有以下两个特点用户态不能随时关闭抢占(Preemption)
用户态不能发送核间中断(IPI)
所以
PRWLock在用户态实现的思路如下:
利用抢断标记位避免特殊窗口时被抢断
写者必须陷入内核态发送IPI
PRWLock 性能分析
读者
读者之间无内存屏障 (无关联)锁域上下线本来就是极少的操作,用来改善性能的,所以其中的内存屏障影响不大
对于长CS区的读者,与传统一样
写者
IPI只要几百个cycle 本身也要等待多个写者可以直接把锁传递
总结
利用了短内存写全局可见时间利用了TSO的特性设计的版本控制来隐式维护语义
利用IPI来保证特殊情况
利用两种模式支持调度
读者之间无关联(内存屏障),提升读者性能
PWAKE 分布式唤醒,提高了唤醒并行性
移植PRWLock
到JOS
JOS
的核间中断实现
关于Local APIC
在一个基于APIC的系统中,每一个核心都有一个
Local APIC,
Local APIC负责处理
CPU中特定的中断配置。还有其他事情,它包含了
Local Vector Table(LVT)负责配置事件中断。
此外,还有一个CPU外面的
IO APIC(例如
Intel82093AA)的芯片组,并且提供基于多处理器的中断管理,在多个处理器之间,实现静态或动态的中断触发路由。
Inter-Processor Interrupts(IPIs)是一种由
Local APIC触发的中断,一般可以用于多CPU间调度之类的使用
想要开启
Local APIC接收中断,则需要设置
Spurious Interrupt Vector Register的第8位即可。
使用
APIC Timer的最大好处是每个cpu内核都有一个定时器。相反
PIT(Programmable Interval Timer)就不这样,
PIT是共用的一个。
周期触发模式
周期触发模式中,程序设置一个”初始计数“寄存器(Initial Count),同时Local APIC会将这个数复制到”当前计数“寄存器(Current Count)。Local APIC会将这个数(当前计数)递减,直到减到0为止,这时候将触发一个IRQ(可以理解为触发一次中断),与此同时将当前计数恢复到初始计数寄存器的值,然后周而复始的执行上述逻辑。可以使用这种方法通过Local APIC实现定时按照一定时间间隔触发中断的功能。
一次性触发模式
同之前一样,但是不会恢复到初始计数。
TSC-Deadline Modie
cpu的时间戳到达
deadline的时候会触发
IRQ
来源Blog
每个本地
APIC都有 32 位的寄存器,一个内部时钟,一个本地定时设备以及为本地中断保留的两条额外的 IRQ 线 LINT0 和 LINT1。所有本地 APIC 都连接到 I/O APIC,形成一个多级 APIC 系统。
Intel x86架构提供LINT0和LINT1两个中断引脚,他们通常与Local APIC相连,用于接收Local APIC传递的中断信号,另外,当Local APIC被禁用的时候,LINT0和LINT1即被配置为INTR和NMI管脚,即外部IO中断管脚和非屏蔽中断管脚。
来源博客
The local APIC registers are memory mapped to an address that can be found in the MP/MADT tables. Make sure you map these to virtual memory if you are using paging. Each register is 32 bits long, and expects to written and read as a 32 bit integer. Although each register is 4 bytes, they are all aligned on a 16 byte boundary.
再一次详细查看
JOS中的核间中断的实现方式,
由于这段映射,设置了
nocache和直写的特性,便于对于
IO的操作。
void lapic_init(void) { if (!lapicaddr) return; // lapicaddr is the physical address of the LAPIC's 4K MMIO // region. Map it in to virtual memory so we can access it. lapic = mmio_map_region(lapicaddr, 4096); // Enable local APIC; set spurious interrupt vector. lapicw(SVR, ENABLE | (IRQ_OFFSET + IRQ_SPURIOUS)); // The timer repeatedly counts down at bus frequency // from lapic[TICR] and then issues an interrupt. // If we cared more about precise timekeeping, // TICR would be calibrated using an external time source. lapicw(TDCR, X1); lapicw(TIMER, PERIODIC | (IRQ_OFFSET + IRQ_TIMER)); lapicw(TICR, 10000000); // Leave LINT0 of the BSP enabled so that it can get // interrupts from the 8259A chip. // // According to Intel MP Specification, the BIOS should initialize // BSP's local APIC in Virtual Wire Mode, in which 8259A's // INTR is virtually connected to BSP's LINTIN0. In this mode, // we do not need to program the IOAPIC. if (thiscpu != bootcpu) lapicw(LINT0, MASKED); // Disable NMI (LINT1) on all CPUs lapicw(LINT1, MASKED); // Disable performance counter overflow interrupts // on machines that provide that interrupt entry. if (((lapic[VER] >> 16) & 0xFF) >= 4) lapicw(PCINT, MASKED); // Map error interrupt to IRQ_ERROR. lapicw(ERROR, IRQ_OFFSET + IRQ_ERROR); // Clear error status register (requires back-to-back writes). lapicw(ESR, 0); lapicw(ESR, 0); // Ack any outstanding interrupts. lapicw(EOI, 0); // Send an Init Level De-Assert to synchronize arbitration ID's. lapicw(ICRHI, 0); lapicw(ICRLO, BCAST | INIT | LEVEL); while (lapic[ICRLO] & DELIVS) ; // Enable interrupts on the APIC (but not on the processor). lapicw(TPR, 0); }
lapic_init将
LAPIC映射到
lapicaddr地址上,并且初始化
LAPIC各种中断参数。
// Local APIC registers, divided by 4 for use as uint32_t[] indices. #define ID (0x0020 / 4) // ID
这里的宏定义为
/4是因为
MMIO映射到
MMIOaddr,保存在
volatile uint32_t *lapic;中。这个单位是
uint32_t,故所有的地址均
/4
下面来看一下主要的
APIC Registers
EOI Register
Write to the register with offset 0xB0 using the value 0 to signal an end of interrupt. A non-zero values causes a general protection fault.
#define EOI (0x00B0 / 4) // EOI // Acknowledge interrupt. void lapic_eoi(void) { if (lapic) lapicw(EOI, 0); }
Local Vector Table Registers
There are some special interrupts that the processor and LAPIC can generate themselves. While external interrupts are configured in the I/O APIC, these interrupts must be configured using registers in the LAPIC. The most interesting registers are:
0x320 = lapic timer
0x350 = lint0
0x360 = lint1
JOS在这里只保留了
BSP的
LINT0用于接受
8259A的中断,其他的
LINT0与
LINT1非屏蔽中断,均设置为
MASKED
// Leave LINT0 of the BSP enabled so that it can get // interrupts from the 8259A chip. // // According to Intel MP Specification, the BIOS should initialize // BSP's local APIC in Virtual Wire Mode, in which 8259A's // INTR is virtually connected to BSP's LINTIN0. In this mode, // we do not need to program the IOAPIC. if (thiscpu != bootcpu) lapicw(LINT0, MASKED); // Disable NMI (LINT1) on all CPUs lapicw(LINT1, MASKED);
Spurious Interrupt Vector Register
The offset is 0xF0. The low byte contains the number of the spurious interrupt. As noted above, you should probably set this to 0xFF. To enable the APIC, set bit 8 (or 0x100) of this register. If bit 12 is set then EOI messages will not be broadcast. All the other bits are currently reserved.
// Enable local APIC; set spurious interrupt vector. lapicw(SVR, ENABLE | (IRQ_OFFSET + IRQ_SPURIOUS));
Interrupt Command Register
The interrupt command register is made of two 32-bit registers; one at 0x300 and the other at 0x310.
#define ICRHI (0x0310 / 4) // Interrupt Command [63:32] #define ICRLO (0x0300 / 4) // Interrupt Command [31:0]
It is used for sending interrupts to different processors.
The interrupt is issued when 0x300 is written to, but not when 0x310 is written to. Thus, to send an interrupt command one should first write to 0x310, then to 0x300.
需要先写
ICRHI,然后在写
ICRLO的时候就会产生中断。
At 0x310 there is one field at bits 24-27, which is local APIC ID of the target processor (for a physical destination mode).
lapicw(ICRHI, apicid << 24);
给
ICRHI中断目标核心的
local APIC ID。这里的
apicid是在
MP Floating Pointer Structure读的时候顺序给的
cpu_id。
ICRLO的分布比较重要
其中目标模式有(
8-10)
#define INIT 0x00000500 // INIT/RESET #define STARTUP 0x00000600 // Startup IPI
其中发送模式有(
18~19)
#define SELF 0x00040000 // Send to self #define BCAST 0x00080000 // Send to all APICs, including self. #define OTHERS 0x000C0000 // Send to all APICs, excluding self.
不设置的话则为发送给
0x310 ICRHI制定的核心。
综上,打包了一个
IPI发送的接口,
void lapic_ipi(int vector) { lapicw(ICRLO, OTHERS | FIXED | vector); while (lapic[ICRLO] & DELIVS) ; }
用于发送
IPI与
IPI ACK均是利用MMIO直接对相应地址书写,比较简单。
这里测试一下,先设置
trap中的
IPI中断
#define T_PRWIPI 20 // IPI report for PRWLock void prw_ipi_report(struct Trapframe *tf) { cprintf("%d in ipi report\n",cpunum()); }
在
trap_dispatch中加入对这个中断的分发
case T_PRWIPI: prw_ipi_report(tf); break;
最后在
init的时候用
bsp发送
IPI给所有其他核心
lapic_ipi(T_PRWIPI);
设置
QEMU模拟4个核心来测试
IPI是否正确
1 in ipi report 3 in ipi report 2 in ipi report
非
BSP可以正确的接受
IPI并进入中断处理历程。
JOS
实现传统内核态读写锁
typedef struct dumbrwlock { struct spinlock lock; atomic_t readers; }dumbrwlock; void rw_initlock(dumbrwlock *rwlk) { spin_initlock(&rwlk->lock); rwlk->readers.counter = 0; } void dumb_wrlock(dumbrwlock *rwlk) { spin_lock(&rwlk->lock); while (rwlk->readers.counter > 0) asm volatile("pause"); } void dumb_wrunlock(dumbrwlock *rwlk) { spin_unlock(&rwlk->lock); } void dumb_rdlock(dumbrwlock *rwlk) { while (1) { atomic_inc(&rwlk->readers); if (!rwlk->lock.locked) return; atomic_dec(&rwlk->readers); while (rwlk->lock.locked) asm volatile("pause"); } } void dumb_rdunlock(dumbrwlock *rwlk) { atomic_dec(&rwlk->readers); }
然后发现一个比较大的问题,
JOS没有实现原子操作,先实现原子操作再进行下面的尝试。
JOS
实现原子操作
仿造linux 2.6内核,实现原子操作
#ifndef JOS_INC_ATOMIC_H_ #define JOS_INC_ATOMIC_H_ /* * Atomic operations that C can't guarantee us. Useful for * resource counting etc.. */ #include <inc/types.h> #define LOCK "lock ; " /* * Make sure gcc doesn't try to be clever and move things around * on us. We need to use _exactly_ the address the user gave us, * not some alias that contains the same information. */ typedef struct { volatile int counter; } atomic_t; #define ATOMIC_INIT(i) \ { \ (i) \ } /** * atomic_read - read atomic variable * @v: pointer of type atomic_t * * Atomically reads the value of @v. */ #define atomic_read(v) ((v)->counter) /** * atomic_set - set atomic variable * @v: pointer of type atomic_t * @i: required value * * Atomically sets the value of @v to @i. */ #define atomic_set(v, i) (((v)->counter) = (i)) /** * atomic_add - add integer to atomic variable * @i: integer value to add * @v: pointer of type atomic_t * * Atomically adds @i to @v. */ static __inline__ void atomic_add(int i, atomic_t *v) { __asm__ __volatile__( LOCK "addl %1,%0" : "=m"(v->counter) : "ir"(i), "m"(v->counter)); } /** * atomic_sub - subtract the atomic variable * @i: integer value to subtract * @v: pointer of type atomic_t * * Atomically subtracts @i from @v. */ static __inline__ void atomic_sub(int i, atomic_t *v) { __asm__ __volatile__( LOCK "subl %1,%0" : "=m"(v->counter) : "ir"(i), "m"(v->counter)); } /** * atomic_sub_and_test - subtract value from variable and test result * @i: integer value to subtract * @v: pointer of type atomic_t * * Atomically subtracts @i from @v and returns * true if the result is zero, or false for all * other cases. */ static __inline__ int atomic_sub_and_test(int i, atomic_t *v) { unsigned char c; __asm__ __volatile__( LOCK "subl %2,%0; sete %1" : "=m"(v->counter), "=qm"(c) : "ir"(i), "m"(v->counter) : "memory"); return c; } /** * atomic_inc - increment atomic variable * @v: pointer of type atomic_t * * Atomically increments @v by 1. */ static __inline__ void atomic_inc(atomic_t *v) { __asm__ __volatile__( LOCK "incl %0" : "=m"(v->counter) : "m"(v->counter)); } /** * atomic_dec - decrement atomic variable * @v: pointer of type atomic_t * * Atomically decrements @v by 1. */ static __inline__ void atomic_dec(atomic_t *v) { __asm__ __volatile__( LOCK "decl %0" : "=m"(v->counter) : "m"(v->counter)); } /** * atomic_dec_and_test - decrement and test * @v: pointer of type atomic_t * * Atomically decrements @v by 1 and * returns true if the result is 0, or false for all other * cases. */ static __inline__ int atomic_dec_and_test(atomic_t *v) { unsigned char c; __asm__ __volatile__( LOCK "decl %0; sete %1" : "=m"(v->counter), "=qm"(c) : "m"(v->counter) : "memory"); return c != 0; } /** * atomic_inc_and_test - increment and test * @v: pointer of type atomic_t * * Atomically increments @v by 1 * and returns true if the result is zero, or false for all * other cases. */ static __inline__ int atomic_inc_and_test(atomic_t *v) { unsigned char c; __asm__ __volatile__( LOCK "incl %0; sete %1" : "=m"(v->counter), "=qm"(c) : "m"(v->counter) : "memory"); return c != 0; } /** * atomic_add_negative - add and test if negative * @v: pointer of type atomic_t * @i: integer value to add * * Atomically adds @i to @v and returns true * if the result is negative, or false when * result is greater than or equal to zero. */ static __inline__ int atomic_add_negative(int i, atomic_t *v) { unsigned char c; __asm__ __volatile__( LOCK "addl %2,%0; sets %1" : "=m"(v->counter), "=qm"(c) : "ir"(i), "m"(v->counter) : "memory"); return c; } /** * atomic_add_return - add and return * @v: pointer of type atomic_t * @i: integer value to add * * Atomically adds @i to @v and returns @i + @v */ static __inline__ int atomic_add_return(int i, atomic_t *v) { int __i; /* Modern 486+ processor */ __i = i; __asm__ __volatile__( LOCK "xaddl %0, %1;" : "=r"(i) : "m"(v->counter), "0"(i)); return i + __i; } static __inline__ int atomic_sub_return(int i, atomic_t *v) { return atomic_add_return(-i, v); } #define atomic_inc_return(v) (atomic_add_return(1, v)) #define atomic_dec_return(v) (atomic_sub_return(1, v)) /* These are x86-specific, used by some header files */ #define atomic_clear_mask(mask, addr) \ __asm__ __volatile__(LOCK "andl %0,%1" \ : \ : "r"(~(mask)), "m"(*addr) \ : "memory") #define atomic_set_mask(mask, addr) \ __asm__ __volatile__(LOCK "orl %0,%1" \ : \ : "r"(mask), "m"(*(addr)) \ : "memory") #endif
然后在内核中对读写锁的功能进行测试。
遇到两个问题
一个是
asm volatile("pause");容易死在那个循环里面,不会重新换到这个
CPU中,在
DEBUG的时候发现在前后加上
cprintf其就会顺利换回来。
while (rwlk->lock.locked) { cprintf(""); asm volatile("pause"); }
另一个是设计内核中的测试
多核上的输出可能会并行化,要减短输出内容。
在用户空间的锁分享目前不好做,linux是基于文件的。
故设计了两个锁来进行测试
一个是
CPU 0的
writer锁,一个是
reader锁。
// test reader-writer lock rw_initlock(&lock1); rw_initlock(&lock2); dumb_wrlock(&lock1); cprintf("[rw] CPU %d gain writer lock1\n", cpunum()); dumb_rdlock(&lock2); cprintf("[rw] CPU %d gain reader lock2\n", cpunum()); // Starting non-boot CPUs boot_aps(); cprintf("[rw] CPU %d going to release writer lock1\n", cpunum()); dumb_wrunlock(&lock1); cprintf("[rw] CPU %d going to release reader lock2\n", cpunum()); dumb_rdunlock(&lock2);
对于每个核上,分别获取
lock1的读着锁与
lock2的写者锁。添加
asm volatile("pause");是想让其他核模拟上线来检测各种情况。
dumb_rdlock(&lock1); cprintf("[rw] %d l1\n", cpunum()); asm volatile("pause"); dumb_rdunlock(&lock1); cprintf("[rw] %d unl1\n", cpunum()); dumb_wrlock(&lock2); cprintf("[rw] %d l2\n", cpunum()); asm volatile("pause"); cprintf("[rw] %d unl2\n", cpunum()); dumb_wrunlock(&lock2);
在给
QEMU四核参数
CPUS=4的时候下的运行情况如下:
[rw] CPU 0 gain writer lock1 [rw] CPU 0 gain reader lock2 [MP] CPU 1 starting [MP] CPU 2 starting [MP] CPU 3 starting [rw] CPU 0 going to release writer lock1 [rw] CPU 0 going to release reader lock2 [rw] 1 l1 [rw] 2 l1 [rw] 3 l1 [rw] 2 unl1 [rw] 2 l2 [rw] 3 unl1 [rw] 1 unl1 [rw] 2 unl2 [MP] CPU 2 sched [rw] 3 l2 [rw] 3 unl2 [rw] 1 l2 [MP] CPU 3 sched [rw] 1 unl2 [MP] CPU 1 sched
可以观察到一旦
CPU0释放了
lock1的写者锁,所有的核均可以获得
lock1的读者锁。而后
CPU2获得了
lock2的写者锁后,其他核上线,
CPU3与
CPU1只是释放了
lock1,无法获得
lock2,只有等
CPU2释放了
lock2才能获取。
这与期望的读写锁的功能是一致的。至此普通读写锁的实现完成。
JOS
实现PRWLock
首先有几个重点:PRWLock数据结构设计
锁的具体实现
调度时调用内容
PRWLock的测试
PRWLock
的数据结构
enum lock_status { FREE = 0, LOCKED, PASS, PASSIVE }; struct percpu_prwlock { enum lock_status reader; atomic_t version; }; typedef struct prwlock { enum lock_status writer; struct percpu_prwlock lockinfo[NCPU]; atomic_t active; atomic_t version; } prwlock;
对于一个
prwlock,除了其主要的版本以及
ACTIVE的读者数量,还需要保存每个核心持有该锁的版本号,以及每个核上该锁的读者状态。这里直接通过
lockinfo数组索引每个核对应的该锁信息。
而全局内核所拥有的读写锁通过
locklist进行索引,在
init的时候加入到这个
list中去。
extern unsigned int prwlocknum; extern prwlock *locklist[MAXPRWLock];
锁的具体操作
初始化操作的时候需要设置各种初值,并将其添加到list中
void prw_initlock(prwlock *rwlk) { int i = 0; rwlk->writer = FREE; for (i = 0; i < NCPU; i++) { rwlk->lockinfo[i].reader = FREE; atomic_set(&rwlk->lockinfo[i].version, 0); } atomic_set(&rwlk->active, 0); atomic_set(&rwlk->version, 0); locklist[prwlocknum++] = rwlk; }
剩下的与论文中伪代码的实现思路相同,只是具体调用的函数有一些差别。
读者锁中包括向核心发送
ipi。这里只是示意,就没有写
PASS的具体部分,可以通过添加一个等待标志变量来实现。
void prw_wrlock(prwlock *rwlk) { int newVersion; int id = 0; unsigned int corewait = 0; if (rwlk->writer == PASS) return; rwlk->writer = LOCKED; newVersion = atomic_inc_return(&rwlk->version); for (id = 0; id < ncpu; id++) { #ifdef TESTPRW cprintf("CPU %d Ver %d\n", id, atomic_read(&rwlk->lockinfo[id].version)); #endif if (id != cpunum() && atomic_read(&rwlk->lockinfo[id].version) != newVersion) { lapic_ipi_dest(id, PRWIPI); corewait |= binlist[id]; #ifdef TESTPRW cprintf("send ipi %d\n", id); #endif } } for (id = 0; id < ncpu; id++) { if (corewait & binlist[id]) { while (atomic_read(&rwlk->lockinfo[id].version) != newVersion) asm volatile("pause"); } } while (atomic_read(&rwlk->active) != 0) { lock_kernel(); sched_yield(); } } void prw_wrunlock(prwlock *rwlk) { // if someone waiting to gain write lock rwlk->writer should be PASS rwlk->writer = FREE; } void prw_rdlock(prwlock *rwlk) { struct percpu_prwlock *st; int lockversion; st = &rwlk->lockinfo[cpunum()]; st->reader = PASSIVE; while (rwlk->writer != FREE) { st->reader = FREE; lockversion = atomic_read(&rwlk->version); atomic_set(&st->version, lockversion); while (rwlk->writer != FREE) asm volatile("pause"); st = &rwlk->lockinfo[cpunum()]; st->reader = PASSIVE; } } void prw_rdunlock(prwlock *rwlk) { struct percpu_prwlock *st; int lockversion; st = &rwlk->lockinfo[cpunum()]; if (st->reader == PASSIVE) st->reader = FREE; else atomic_dec(&rwlk->active); lockversion = atomic_read(&rwlk->version); atomic_set(&st->version, lockversion); }
每个核心接到
PRWIPI的处理函数
void prw_ipi_report(struct Trapframe *tf) { int lockversion, i; struct percpu_prwlock *st; cprintf("In IPI_report CPU %d\n", cpunum()); for (i = 0; i < prwlocknum; i++) { st = &locklist[i]->lockinfo[cpunum()]; if (st->reader != PASSIVE) { lockversion = atomic_read(&locklist[i]->version); atomic_set(&st->version, lockversion); } } }
调度时调用内容
调度时需要将所有的锁均进行处理,所以要遍历locklist
// Implement PRWLock if (prwlocknum != 0) for (j = 0; j < prwlocknum; j++) prw_sched(locklist[j]);
具体的
prw_sched如下:
void prw_sched(prwlock *rwlk) { struct percpu_prwlock *st; int lockversion; st = &rwlk->lockinfo[cpunum()]; if (st->reader == PASSIVE) { atomic_inc(&rwlk->active); st->reader = FREE; } lockversion = atomic_read(&rwlk->version); atomic_set(&st->version, lockversion); }
PRWLock
的测试
测试PRWLock也比较复杂,由于我们使用的是
big kernel lock,所以内核态里面不好测试,直接在初始化开始
RR之前测试。这里引入一个新的
IPI进行测试。
void prw_debug(struct Trapframe *tf) { int needlock = 0; cprintf("====CPU %d in prw debug====\n",cpunum()); if(kernel_lock.cpu == thiscpu && kernel_lock.locked == 1) { unlock_kernel(); needlock = 1; } prw_wrlock(&lock1); cprintf("====%d gain lock1====\n",cpunum()); prw_wrunlock(&lock1); cprintf("====%d release lock1====\n",cpunum()); if(needlock) lock_kernel(); }
给一个核心发送
DEBUGPRW中断,即让其获取
lock1的写者锁。
#ifdef TESTPRW unlock_kernel(); prw_initlock(&lock1); prw_wrlock(&lock1); prw_wrunlock(&lock1); prw_rdlock(&lock1); cprintf("====%d Gain Reader Lock====\n", cpunum()); lapic_ipi_dest(3, DEBUGPRW); for (int i = 0; i < 10000; i++) asm volatile("pause"); prw_rdunlock(&lock1); cprintf("====%d release Reader Lock====\n", cpunum()); lock_kernel(); #endif
这里先用
unlock_kernel,避免其他核心无法接收中断,最后再
lock_kernel,才能开始
sched。
测试选择
6个核心
SMP: CPU 0 found 6 CPU(s) enabled interrupts: 1 2 4 [MP] CPU 1 starting [MP] CPU 2 starting [MP] CPU 3 starting [MP] CPU 4 starting [MP] CPU 5 starting [MP] CPU 1 sched [MP] CPU 2 sched [MP] CPU 3 sched [MP] CPU 4 sched [MP] CPU 5 sched CPU 0 Ver 0 CPU 1 Ver 0 send ipi 1 CPU 2 Ver 0 send ipi 2 CPU 3 Ver 0 send ipi 3 CPU 4 Ver 0 send ipi 4 CPU 5 Ver 0 ====0 Gain Reader Lock==== In IPI_report CPU 1 In IPI_report CPU 2 In IPI_report CPU 4 FS is running ====CPU 3 in prw debug==== FS can do I/O CPU 0 Ver 0 Dsend ipi 0 evice 1 presence: 1 CPU 1 Ver 1 send ipi 1 CPU 2 Ver 2 CPU 3 Ver 1 CPU 4 Ver 1 send ipi 4 CPU 5 Ver 1 send ipi 5 In IPI_report CPU 5 $ block cache is good superblock is good bitmap is good alloc_block is good file_open is good file_get_block is good file_flush is good file_truncate is good file rewrite is good ====0 release Reader Lock==== Init finish! Sched start... ====3 gain lock1==== ====3 release lock1====
当
CPU0释放了读着锁之后,
CPU3才能够获取
lock1,测试正确
相关文章推荐
- 操作系统学习笔记(20)--开中断、关中断及原子操作
- 发布在《30天自制操作系统》之前的操作捷径必读贴
- (30天自制操作系统)中断13h 在vmware和virtualbox中发生故障的问题
- 学习笔记——操作系统_Linux原子操作
- [读书笔记]30 天自制操作系统 day6 分割编译与中断处理
- 「30天自制操作系统」 Stop & 「OS67 」 Start
- java并发:线程同步机制之Volatile关键字&原子操作Atomic
- 自制操作系统(七) 加快中断处理,和加入FIFO缓冲区
- 中断屏蔽 原子操作 信号量
- 《30天自制操作系统》读书笔记(5) GDT&IDT
- Windows并发&异步编程(2)原子操作Interlocked
- linux驱动中的互斥途径一二:中断屏蔽和原子操作
- [Linux]互斥机制(中断屏蔽、原子操作、自旋锁、信号量)
- 同步代码快效率>同步函数,原子操作效率>非原子操作
- 黑马程序员_线程(3) 线程定时器&ThreadLoad&原子性操作
- Driver:内核的竞态和并发:中断屏蔽、原子操作、自旋锁、信号量
- 30天自制操作系统------窗口操作
- 2、CC2541芯片中级教程-OSAL操作系统(进一步了解-OLED && 普通按键和5方向按键-中断!!!)这个系统驱动层和应用层不一样~
- 操作系统概念-内核同步-原子操作