您的位置:首页 > 其它

ZeroMQ指南-第1章-基础-分而治之

2013-02-17 23:49 246 查看

分而治之

作为最终示例(你肯定对生动的代码开始生厌并希望回头去钻研关于比较性、抽象性准则的语言学探讨),让我们来做一个小型超级计算。然后喝个咖啡。我们的超级计算程序是个非常典型的并行处理模型。我们有:

一个通风机(ventilator)来产生可以并行处理的任务
一组工人(worker)来处理任务
一个水槽(sink)来回收工人处理的结果

事实上,工人运行于超快的机子,没准是GPU(图形处理单元)来做困难运算。这是通风机代码,生成100个任务,每个任务都是一条消息告诉工人休眠(sleep)几毫秒。

taskvent: Parallel task ventilator in C

//
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
//
#include "zhelpers.h"

int main (void)
{
void *context = zmq_ctx_new ();

// Socket to send messages on
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");

// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");

printf ("Press Enter when the workers are ready: ");
getchar ();
printf ("Sending tasks to workers…\n");

// The first message is "0" and signals start of batch
s_send (sink, "0");

// Initialize random number generator
srandom ((unsigned) time (NULL));

// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = randof (100) + 1;
total_msec += workload;
char string [10];
sprintf (string, "%d", workload);
s_send (sender, string);
}
printf ("Total expected cost: %d msec\n", total_msec);
sleep (1); // Give 0MQ time to deliver

zmq_close (sink);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}

图 5 - 并行管道



这是工人程序。接收消息,休眠指定的时间,然后表明自己完成任务:

taskwork: Parallel task worker in C

//
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
//
#include "zhelpers.h"

int main (void)
{
void *context = zmq_ctx_new ();

// Socket to receive messages on
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect (receiver, "tcp://localhost:5557");

// Socket to send messages to
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sender, "tcp://localhost:5558");

// Process tasks forever
while (1) {
char *string = s_recv (receiver);
// Simple progress indicator for the viewer
fflush (stdout);
printf ("%s.", string);

// Do the work
s_sleep (atoi (string));
free (string);

// Send results to sink
s_send (sender, "");
}
zmq_close (receiver);
zmq_close (sender);
zmq_ctx_destroy (context);
return 0;
}

这是水槽程序。它收集这100个任务,然后计算整个处理消耗的时间,让我们能够证实如果有多个工人时他们真的是并行运转的:

tasksink: Parallel task sink in C

//
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
//
#include "zhelpers.h"

int main (void)
{
// Prepare our context and socket
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_bind (receiver, "tcp://*:5558");

// Wait for start of batch
char *string = s_recv (receiver);
free (string);

// Start our clock now
int64_t start_time = s_clock ();

// Process 100 confirmations
int task_nbr;
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
char *string = s_recv (receiver);
free (string);
if ((task_nbr / 10) * 10 == task_nbr)
printf (":");
else
printf (".");
fflush (stdout);
}
// Calculate and report duration of batch
printf ("Total elapsed time: %d msec\n",
(int) (s_clock () - start_time));

zmq_close (receiver);
zmq_ctx_destroy (context);
return 0;
}

批处理的平均消耗为5秒。当我们启动1个、2个、4个工人时,我们从水槽取得的结果是这样的:

#   1 worker
Total elapsed time: 5034 msec
#   2 workers
Total elapsed time: 2421 msec
#   4 workers
Total elapsed time: 1018 msec

让我们更细致的查看这段代码的某些方面:

工人们上游连接通风机,下游连接水槽。这意味着你可以任意添加工人。如果工人绑定到他们的端点,你会需要(a)更多的端点(b)每添加一个工人都得修改通风机或水槽。我们说通风机和水槽是结构中的“稳定”部分,而工人们是“动态”部分。
我们不得不在批次的开始与所有工人们都起来运行两者间做出同步。这是一个ØMQ中特别常见的陷阱,也没有简单方案。“连接”方法需要一定时间。所以当一组工人连接到通风机,第一个成功连接的工人会在瞬间得到消息的全部负载,而其他人仍在连接。如果你总是不去同步批次的开始,系统完全不会并行运转。试着移除等待看看。
通风机的推送(PUSH)套接字均匀的分发任务到工人们(假定批次开始送出之前他们都已连接)。这叫做负载均衡,我们会再详细看看。
水槽的拉取(PULL)套接字均匀的收集工人的成果。这叫做公平队列。

图6 - 公平队列



管道模式也表现出“迟钝加入者”综合症,导致了对推送套接字不能正确负载均衡的控诉。如果你使用推送和拉取,而且其中一个工人比其他人得到更多的消息,那是因为他的推送套接字比别人连接的更快,然后在其他人连接达成之前捕获了一大堆消息。如果你想要正确的负载均衡,你可能想要看看第3章 - 高级请求应答模式中的小节:负载均衡模式。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: