预留位置队列PRQueue:多线程程序中消息输入队列和消息输出队列保持同序
2012-03-12 13:57
267 查看
译自: http://accu.org/var/uploads/journals/overload101.pdf
在多线程应用程序中,要求消息输入队列和消息输出队列顺序要求保持一致,而忽略多线程并发处理的顺序,这种情况是比较难处理的。在本文中,作者设计了一种新型解决方案:PRQueue(预留位置队列),较很好的解决这个问题。
PRQueue是使用c++的两个STL的deque还有pthread线程库实现的,并且在例子中使用了两个简单的类-Mutex和Lock来展示这个逻辑。StringMsg类表现一个样本消息,QueueTest类用来测试。
我选择STL的deque是因为deque拥有很多必要的操作(包括operator[])来实现PRQueue。特别的,有一点很重要的是push_back和pop_front()操作对于deque的元素的指针或引用来说都是有效地。
这里有一个简单的例子来展示PRQueue的作用。首先,我们需要记录大量的多域的消息流。转化文本字符串的数字域是一个慢的且并不关键的过程,因此我们决定将这份任务分派给能够生成日志的线程来做。处理的流程如下图所示:
因为消息的核心处理过程是发生在多线程中,消息的就绪顺序也许跟原始的输入队列不同。例如:如果一个线程从输入队列中拿走了一个报文消息后进入休眠状态,另外一个线程取出下一个报文消息,在第一个线程之前就运行完成处理这个报文消息,并把这个报文放入输出队列中。因此,这输出日志也许会无序了(我们假设消息被处理完之后才日志)。
使用PRQueue 则以上这种场景就能避免,它将会确保在输出队列中报文的顺序和输入队列中保持一致,而不管线程处理报文的顺序如何。PRQueue的基本逻辑比较简单,当下一个报文从输入队列中取出的时候,仍然放入锁中,下一个push_back的位置。然后释放锁,并且继续处理。在这个消息报文完全处理完之后,先前请求的位置用来把这个消息放入输出队列。
PRQueue 使用两个队列构造:'data’ 和'filled’。
'filled’队列的一个元素使用数据填充并且能够从PRQueue弹出。一个封装类DataQueue是'data’和'filled’队列的装载器。 这种设计允许我们在确切的实现过程中分离线程安全代码,因此用户不需要关心任何锁/解锁的逻辑。
下面让我们更详细的讨论PRQUEUE。
PRQueue方法主要做两件事情:它从输入队列中弹出数据,并且在输出队列中保留一个位置。PUSH方法使用之前保存的位置在输出队列中保存数据。
为了使用多线程测试PRQUEUE,PROCESS_MSG执行。它从输入队列中取出一个StringMsg,通过调用StringMsg::process()方法来处理这个消息并且push这个报文。
POP方法不仅等待输入队列中下一个报文的到来,也通过查看’filled’队列中元素来检查这个报文是否准备弹出了。如果数据还没有填充,pop将继续阻塞等待
POP方法的处理逻辑:
1. 锁定输入队列
2. 如果输入队列非空并且顶层元素填充了数据,则pop它(否则释放掉锁并继续睡眠)
3. 锁住输出队列
4. 保留输出队列底部位置
5. 解锁输出队列
6. 解锁输入队列
代码段如下:
PUSH方法拷贝数据到输出队列的保留位置并且设置'filled’指示为真。它也通过发送一个通知信号来释放掉等待一个条件变量的线程。
代码段如下:
现在,这个消息报文按序的到达了输出队列,如果我们想要更深的扩展我们的处理链的话,可以在后面再加上一个PRQueue。在以上的测试用例中我们不会这样做:我们使用一个单一的线程简单的从输出队列中读取处理完的报文并将它们打印出来。在最后的一步,只简单的使用了pop方法(未使用第二、第三个参数:指向输出队列和保留的位置的值)。
总结
在多线程应用程序中,当处理的消息流顺序需要保证的时候,本文所说的预留位置队列将会是有用的。PRQueue将会确保输出队列中报文顺序同输入队列保持一致,因为在输出队列中下一个push_back的位置在输入队列取出报文的时候就同步的保留了。当报文消息处理完成之后,所保留的位置随后将会被数据填充。
注:完整代码于此处下载:http://accu.org/content/journals/ol101/prqueue.zip(译此文时,时间较仓促,因此译文很粗糙,待时间较宽松时再细细校验)。
在多线程应用程序中,要求消息输入队列和消息输出队列顺序要求保持一致,而忽略多线程并发处理的顺序,这种情况是比较难处理的。在本文中,作者设计了一种新型解决方案:PRQueue(预留位置队列),较很好的解决这个问题。
PRQueue是使用c++的两个STL的deque还有pthread线程库实现的,并且在例子中使用了两个简单的类-Mutex和Lock来展示这个逻辑。StringMsg类表现一个样本消息,QueueTest类用来测试。
我选择STL的deque是因为deque拥有很多必要的操作(包括operator[])来实现PRQueue。特别的,有一点很重要的是push_back和pop_front()操作对于deque的元素的指针或引用来说都是有效地。
这里有一个简单的例子来展示PRQueue的作用。首先,我们需要记录大量的多域的消息流。转化文本字符串的数字域是一个慢的且并不关键的过程,因此我们决定将这份任务分派给能够生成日志的线程来做。处理的流程如下图所示:
因为消息的核心处理过程是发生在多线程中,消息的就绪顺序也许跟原始的输入队列不同。例如:如果一个线程从输入队列中拿走了一个报文消息后进入休眠状态,另外一个线程取出下一个报文消息,在第一个线程之前就运行完成处理这个报文消息,并把这个报文放入输出队列中。因此,这输出日志也许会无序了(我们假设消息被处理完之后才日志)。
使用PRQueue 则以上这种场景就能避免,它将会确保在输出队列中报文的顺序和输入队列中保持一致,而不管线程处理报文的顺序如何。PRQueue的基本逻辑比较简单,当下一个报文从输入队列中取出的时候,仍然放入锁中,下一个push_back的位置。然后释放锁,并且继续处理。在这个消息报文完全处理完之后,先前请求的位置用来把这个消息放入输出队列。
PRQueue 使用两个队列构造:'data’ 和'filled’。
'filled’队列的一个元素使用数据填充并且能够从PRQueue弹出。一个封装类DataQueue是'data’和'filled’队列的装载器。 这种设计允许我们在确切的实现过程中分离线程安全代码,因此用户不需要关心任何锁/解锁的逻辑。
下面让我们更详细的讨论PRQUEUE。
PRQueue方法主要做两件事情:它从输入队列中弹出数据,并且在输出队列中保留一个位置。PUSH方法使用之前保存的位置在输出队列中保存数据。
为了使用多线程测试PRQUEUE,PROCESS_MSG执行。它从输入队列中取出一个StringMsg,通过调用StringMsg::process()方法来处理这个消息并且push这个报文。
//------------ static void* process_msg( void* arg) { int thidx = ++Thidx; QueueTest* quetest =(QueueTest*)arg; Msg* msg; PRQueue< Msg*>::position pos; cout << "Input thread=" << thidx << " started" << endl; for( ;;) { // Wait for next message appeared in input queue, // pop it up and get push position allocated in output queue quetest->input_que.pop( msg, quetest->output_que, pos); // Process message msg->process( thidx); // Push processed message into output queue using acquired position quetest->output_que.push( msg, pos); } return NULL; }//
POP方法不仅等待输入队列中下一个报文的到来,也通过查看’filled’队列中元素来检查这个报文是否准备弹出了。如果数据还没有填充,pop将继续阻塞等待
POP方法的处理逻辑:
1. 锁定输入队列
2. 如果输入队列非空并且顶层元素填充了数据,则pop它(否则释放掉锁并继续睡眠)
3. 锁住输出队列
4. 保留输出队列底部位置
5. 解锁输出队列
6. 解锁输入队列
代码段如下:
// Pop data from input queue and reserve position in the output queue void pop( DT& data, PRQueue& outque, position& pos) { Lock lk( m_mux); while( true) { if( m_que.pop( data)) break; wait_while_empty(); } outque.reserve_pos( pos); }
PUSH方法拷贝数据到输出队列的保留位置并且设置'filled’指示为真。它也通过发送一个通知信号来释放掉等待一个条件变量的线程。
代码段如下:
// Simple push void push( const DT& data) { Lock lk( m_mux); m_que.push( data); notify_not_empty(); }
现在,这个消息报文按序的到达了输出队列,如果我们想要更深的扩展我们的处理链的话,可以在后面再加上一个PRQueue。在以上的测试用例中我们不会这样做:我们使用一个单一的线程简单的从输出队列中读取处理完的报文并将它们打印出来。在最后的一步,只简单的使用了pop方法(未使用第二、第三个参数:指向输出队列和保留的位置的值)。
//------------ static void* print_msg( void* arg) { QueueTest* quetest =(QueueTest*)arg; Msg* msg; cout << "Output thread started" << endl; for( ;;) { quetest->output_que.pop( msg); msg->print(); delete msg; } return NULL; }//
总结
在多线程应用程序中,当处理的消息流顺序需要保证的时候,本文所说的预留位置队列将会是有用的。PRQueue将会确保输出队列中报文顺序同输入队列保持一致,因为在输出队列中下一个push_back的位置在输入队列取出报文的时候就同步的保留了。当报文消息处理完成之后,所保留的位置随后将会被数据填充。
注:完整代码于此处下载:http://accu.org/content/journals/ol101/prqueue.zip(译此文时,时间较仓促,因此译文很粗糙,待时间较宽松时再细细校验)。
相关文章推荐
- 提问:如何控制控制台程序的输入/输出焦点的位置
- 编写一段程序,从标准输入读取string对象的序列直到连续出现两个相同的单词或者所有单词都读完为止。使用while循环一次读取一个单词,当一个单词连续出现两次是使用break语句终止循环。输出连续重复出现的单词,或者输出一个消息说明没有人任何单词是重复出现的。
- 多线程下面日志输出-线程安全-消息队列循环输出
- Joseph环程序,接受用户输入的Joseph长度Length,开始计数的位置Ini,以及读到第Count个节点时,将该节点从Joseph环中删除,最后输出出环序列。
- 练习1-22 编写一个程序,把较长的输入行折成短一些的两行或者多行,折行的位置在输入行的第N列之前的最后一个非空格之后。要保持程序能够智能地处理输入行很长以及在制定的列前没有空格或者制表符时的情况。
- 字符串数组在输入的时候应该给\0预留一个位置,否则将在c++中无法输出,并且编译的时候回报错!!!
- MSMQ消息通知系统之消息队列多线程监视器
- 一个关于消息队列通信的小程序
- 编写一个程序,当输入小写字母a,输出大写字母Z,当输入小写字母b,输出大写字母Y,以此类推,当输入小写字母z,输出大写字母A。
- 【任意输入一串整数输出该数的位数】新手每天学写C程序(1)
- IPC之消息队列·即时通讯小程序(一) 推荐
- 编写一个程序,从标准输入读取几行输入并把他们打印在标准输出上,它同时应该计算checksum值,并写在字符后面
- 【C语言】编写一个程序从标准输入读取字符,并把他们写到标准输出。除了大写字母转换成小写字母之外,其他的原样输出。
- 输入两个整数n和m, 从数列1,2,...,n中任意选择几个数,使其和等于m, 要求编写程序输出所有的组合
- 今天开始学Java 给定一个正整数,编写程序计算有多少对质数的和等于输入的这个正整数,并输出结果。
- 编写一个将输入复制到输出的程序,并将其中连续的多个空格用一个空格代替
- 基于Linux的消息队列及多线程编程实现的聊天室(一)
- /*编写程序,其中自定义一函数,用来判断一个整数是否为素数,主函数输入一个数,输出是否为素数*/
- 用指针方法编写一个程序,输入3个整数,将它们按由小到大的顺序输出
- 华为机试样题解析:已知2条地铁线路,其中A为环线,B为东西向线路,线路都是双向的。经过的站点名分别如下,两条线交叉的换乘点用T1、T2表示。 编写程序,任意输入两个站点名称,输出最少需要经过的站点数