
A Big-Data Processing Problem from Baidu's Intern Recruitment (Part 2)

2012-07-01 16:11
Figure 4 shows the CPU utilization during the sorting phase: only one core reaches 100%. Below is a multi-threaded sorting version (with as many threads as there are cores). Each thread quick-sorts its own part of the 1 GB of data, and once all of them have finished, another thread merges the sorted parts and writes the result to a file.

The multi-threaded sorting code is as follows:
/* multi_thread_sort.c
 * Author: Chaos Lee
 * Date: 2012-06-30
 * Description: load, merge, store data with a single core, but sort the data
 *              with all the cores provided by the SMP machine.
 */
#include <stdio.h>
#include <pthread.h>
#include <sys/sysinfo.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <stdint.h>
#include <stdlib.h>
#include <assert.h>
#include "../error.h"
#include "timer.h"

/* Shared state: the whole data set lives in one buffer; counter records how many
 * sorting threads have finished and is protected by counter_mutex. */
uint64_t *buffer = NULL;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t merge_start = PTHREAD_COND_INITIALIZER;
int cores_number;
int counter;

/* qsort comparator: orders uint64_t values ascending. */
int uint64_compare(const void *ptr1, const void *ptr2)
{
    return *((uint64_t *)ptr1) > *((uint64_t *)ptr2) ? 1 :
           *((uint64_t *)ptr1) < *((uint64_t *)ptr2) ? -1 : 0;
}

/* Half-open index range [start, end) of buffer handled by one sorting thread. */
typedef struct segment_tag
{
    uint64_t start;
    uint64_t end;
} segment_t, *segment_p;

/* Each sorting thread calls barrier() when it is done; the last one to arrive
 * signals the merging thread that it may start. */
void barrier()
{
    int status;

    status = pthread_mutex_lock(&counter_mutex);
    if(0 != status)
        err_abort("locking error.", status);
    counter++;
    if(cores_number == counter)
    {
        pthread_cond_signal(&merge_start);
    }
    status = pthread_mutex_unlock(&counter_mutex);
    if(0 != status)
        err_abort("unlocking error.", status);
}

/* Sorting thread: quick-sort its own segment of the buffer, then enter the barrier. */
void * sort_thread_routin(void *args)
{
    DPRINTF(("%s", "sorting thread start...\n"));
    segment_p seg = (segment_p)args;
    assert(buffer != NULL);
    DPRINTF(("%s", "begin to sort...\n"));
    qsort(buffer + seg->start, seg->end - seg->start, sizeof(uint64_t), uint64_compare);
    DPRINTF(("%s", "Entering barrier...\n"));
    barrier();
    pthread_exit((void *)0);
}

/* Merging thread: wait until all sorting threads have passed the barrier, then
 * do a k-way merge of the sorted segments and write the result to disk. */
void * merge_thread_routin(void *args)
{
    int status, finish_count, elapsed_seconds;
    FILE *fp_result;
    uint64_t tmp;

    restart_timer();
    DPRINTF(("%s", "merging thread start...\n"));
    fp_result = fopen("multi-result.dat", "wb");
    /* counter_mutex must be held while calling pthread_cond_wait(). */
    status = pthread_mutex_lock(&counter_mutex);
    if(0 != status)
        err_abort("locking error.", status);
    while(cores_number != counter)
    {
        status = pthread_cond_wait(&merge_start, &counter_mutex);
        if(0 != status)
            err_abort("waiting condition error.", status);
    }
    elapsed_seconds = get_elapsed_time();
    fprintf(stdout, "sorting cost %d seconds.\n", elapsed_seconds);
    status = pthread_mutex_unlock(&counter_mutex);
    if(0 != status)
        err_abort("unlocking error.", status);

    DPRINTF(("begin to merge...\n"));
    finish_count = 0;
    segment_p segs = (segment_p)args;
    restart_timer();
    while(finish_count < cores_number)
    {
        int i, first = 0, j;
        /* Linear scan over the segments to find the smallest head element. */
        for(i = 0; i < cores_number; i++)
        {
            if(0 == first)
            {
                if(segs[i].start < segs[i].end)
                {
                    tmp = buffer[segs[i].start];
                    j = i;
                    first = 1;
                }
            }
            else
            {
                if(segs[i].start < segs[i].end && buffer[segs[i].start] < tmp)
                {
                    tmp = buffer[segs[i].start];
                    j = i;
                }
            }
        }
        /* Consume the chosen element; count its segment as finished once it empties. */
        segs[j].start++;
        if(segs[j].start >= segs[j].end)
        {
            finish_count++;
        }
        fwrite(&tmp, sizeof(uint64_t), 1, fp_result);
    }
    elapsed_seconds = get_elapsed_time();
    fprintf(stdout, "merging cost %d seconds.\n", elapsed_seconds);
    DPRINTF(("merging is over\n"));
    fclose(fp_result);
    pthread_exit((void *)0);
}

int main(int argc, char *argv[])
{
    int elapsed_seconds, status, i;
    segment_p segments;
    pthread_t *sort_threads;
    pthread_t *merge_thread;
    uint64_t size, length, seg_len;
    FILE *fp;
    struct stat data_stat;

    /* One sorting thread per core; the data file is split into that many segments. */
    cores_number = get_nprocs();
    status = stat("data.dat", &data_stat);
    if(0 != status)
        error_abort("stat file error.\n");
    size = data_stat.st_size;
    length = size / sizeof(uint64_t);
    seg_len = length / cores_number;
    buffer = (uint64_t *)malloc(size);
    if(NULL == buffer)
    {
        fprintf(stderr, "mallocing error.\n");
        exit(1);
    }
    fp = fopen("data.dat", "rb");
    if(NULL == fp)
    {
        fprintf(stderr, "file open error.\n");
        exit(1);
    }
    /* Load the whole file into memory and report how long it took. */
    start_timer();
    fread(buffer, size, 1, fp);
    elapsed_seconds = get_elapsed_time();
    fprintf(stdout, "loading cost %d seconds\n", elapsed_seconds);
    segments = (segment_p)malloc(sizeof(segment_t) * cores_number);
    if(NULL == segments)
    {
        fprintf(stderr, "at %s:%d : %s", __FILE__, __LINE__, "malloc error.\n");
        exit(1);
    }
    /* Split the buffer into equal segments; the last one also takes the remainder. */
    for(i = 0; i < cores_number; i++)
    {
        segments[i].start = i * seg_len;
        if(i != cores_number - 1)
            segments[i].end = (i + 1) * seg_len;
        else
            segments[i].end = length;
    }
    sort_threads = (pthread_t *)malloc(sizeof(pthread_t) * cores_number);
    if(NULL == sort_threads)
    {
        fprintf(stderr, "at %s:%d : %s", __FILE__, __LINE__, "malloc failure.\n");
        exit(1);
    }
    merge_thread = (pthread_t *)malloc(sizeof(pthread_t));
    if(NULL == merge_thread)
    {
        fprintf(stderr, "at %s:%d : %s", __FILE__, __LINE__, "malloc failure.\n");
        exit(1);
    }
    /* Start one sorting thread per segment, plus a single merging thread. */
    for(i = 0; i < cores_number; i++)
    {
        status = pthread_create(&sort_threads[i], NULL, sort_thread_routin, (void *)&segments[i]);
        if(0 != status)
            err_abort("creating threads failure.\n", status);
    }
    status = pthread_create(merge_thread, NULL, merge_thread_routin, (void *)segments);
    if(0 != status)
        err_abort("creating thread failure.\n", status);
    for(i = 0; i < cores_number; i++)
    {
        status = pthread_join(sort_threads[i], NULL);
        if(0 != status)
            err_abort("joining threads error.\n", status);
    }
    status = pthread_join(*merge_thread, NULL);
    if(0 != status)
        err_abort("joining thread error.\n", status);
    free(buffer);
    fclose(fp);
    return 0;
}


Now compile and run it; the test results are as follows:

[lichao@sg01 thread_power]$ gcc multi_thread_sort.c -o multi_thread_sort timer.o -lpthread

[lichao@sg01 thread_power]$ ./multi_thread_sort

loading cost 14 seconds

sorting cost 22 seconds.

merging cost 44 seconds.

Figure 5 below shows the CPU utilization during the multi-threaded sort. All four cores reach 100%, so the hardware investment is not wasted :D. The sorting time is also much better: 22 s versus the 81 s of the single-threaded version, roughly a 3.7x speedup, close to the ideal 4x on four cores. I also noticed that the file loading and write-back speeds improved, which puzzled me at first. Let's run the single-threaded version again.





Figure 5: CPU utilization during the sorting phase

[lichao@sg01 thread_power]$ ./single_thread_sort

loading cost 17 seconds

sorting cost 81 seconds

writing results cost 12 seconds

The loading and write-back speeds improved significantly, although the sorting time barely changed.
Let's run the multi-threaded version again:

[lichao@sg01 thread_power]$ ./multi_thread_sort

loading cost 31 seconds

sorting cost 22 seconds.

merging cost 23 seconds.

This time loading took longer again (31 s), the sorting time stayed almost the same, and the merge/write-back got considerably faster. I suspect this is mainly the file system's own buffering at work: data from a recently used file stays cached in memory (the OS page cache), so the next time it is needed it can be loaded very quickly, since the pages are read from memory rather than from disk. Write-back should benefit from the same mechanism. Merging and writing the 1 GB result file back to disk took only 23 s, a write-back rate of roughly 50 MB/s.
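One way to test this page-cache hypothesis (not part of the original experiment) is to ask the kernel to drop the cached pages of data.dat before a timed run; if the caching explanation is right, the load time should climb back toward the first-run figure. A minimal sketch, assuming the same data.dat file used by the sort programs:

/* drop_cache.c - hint the kernel to evict data.dat from the page cache.
 * A test sketch for the caching hypothesis above, not part of the original code. */
#define _POSIX_C_SOURCE 200112L
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

int main(void)
{
    int fd, rc;

    fd = open("data.dat", O_RDONLY);
    if(fd < 0)
    {
        perror("open");
        return 1;
    }
    /* Advise the kernel that the whole file (offset 0, length 0 = to end of file)
     * will not be needed soon, so its clean cached pages may be dropped. */
    rc = posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED);
    if(rc != 0)
        fprintf(stderr, "posix_fadvise: %s\n", strerror(rc));
    close(fd);
    return 0;
}

Running this between two runs of the sorter and seeing the load time move back toward the first-run number would support the cache explanation; this is only a hint to the kernel, not a guaranteed eviction.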
Suppose the file system cache keeps working and we can sustain the second experiment's figures: 22 s to sort each 1 GB chunk and 23 s to merge it and write it back. Since the two are nearly equal, the sorting computation and the merge/write-back could be overlapped. For the 200 GB file A, the chunk-sorting phase then takes roughly 200 * 22 s =~ 1.2 h; round it up to 1 hour 15 minutes. File B likewise takes about 1 hour 15 minutes, so 2.5 hours in total. Next comes the merge-and-compare pass. Assuming the file system's buffering still holds, i.e. about 50 MB/s, both 200 GB files have to pass through memory once, which takes roughly 400*10^3/50 = 8000 s, about 2 hours 15 minutes. Adding the earlier 2.5 hours, finding the common values in the two 200 GB files takes about 5 hours in total, which at least beats 3 million years.
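Spelled out, the arithmetic behind this estimate (using the second run's figures and the 50 MB/s assumption from above) is:

chunk sort, file A : 200 chunks x 22 s = 4400 s  =~ 1 h 15 min
chunk sort, file B : 200 chunks x 22 s = 4400 s  =~ 1 h 15 min
merge and compare  : (200 GB + 200 GB) / 50 MB/s = 400000 MB / 50 MB/s = 8000 s  =~ 2 h 15 min
total              : =~ 4 h 45 min, i.e. about 5 hours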
PS: the symbol =~ above means "approximately equal to".

This article comes from the blog "相信并热爱着"; please keep this attribution: http://hipercomer.blog.51cto.com/4415661/915110