您的位置:首页 > 编程语言 > C语言/C++

Conway’s Game of Life中看C++SSE2并行化计算

2017-01-08 16:20 525 查看

一、Conway’s Game of Life描述

康威生命游戏(英语:Conway's Game of Life),又称康威生命棋,是英国数学家约翰·何顿·康威在1970年发明的细胞自动机。

 

1、规则

生命游戏中,对于任意细胞,规则如下。每个细胞有两种状态:存活或死亡,每个细胞与以自身为中心的周围八格细胞产生互动。

当前细胞为存活状态时,当周围低于2个(不包含2个)存活细胞时, 该细胞变成死亡状态。(模拟生命数量稀少)
当前细胞为存活状态时,当周围有2个或3个存活细胞时, 该细胞保持原样。
当前细胞为存活状态时,当周围有3个以上的存活细胞时,该细胞变成死亡状态。(模拟生命数量过多)
当前细胞为死亡状态时,当周围有3个存活细胞时,该细胞变成存活状态。 (模拟繁殖)

可以把最初的细胞结构定义为种子,当所有在种子中的细胞同时被以上规则处理后, 可以得到第一代细胞图。按规则继续处理当前的细胞图,可以得到下一代的细胞图,周而复始。随题给出了一个细胞图的不断演变过程,见Gospers_glider_gun.gif。



Gospers_glider_gun.gif

二、实现思路

准备先用java和C++各先实现一次串行的。因为java不支持SIMD所以并行版本只用C++实现。

2.1基本实现思路

输入数据格式
输入数据格式如下所示:
因为屏幕限制,不能显示全部,数据为50X100。以空格为间隔。



数据结构及实现思路

使用以为数组存储,在本例中采用一维数组存储。也可以采用二维数组实现,基本原理是一样的。遍历整个一维数组,对每个数组的周围的八个值进行判断并计算周围八个cell存活的个数count(即周围为1的值个数)。然后根据个数count以及自身是否存活(0 or 1),判断这次迭代中,本细胞该不该存活。依次类推,遍历50000次。

输入输出的问题

文件输入的话,读取出来的是字符,所以1变成了49,0变成了48。还要注意去除空格。
输出数据的话,如果在程序内部把字符1(49)转成了数字1,那么输出的时候要想输出字符1,则要把数组里面的数字1转化成字符1(49)。(0亦然)
具体细节请看博客:http://blog.csdn.net/anialy/article/details/7961179

三、串行java实现

Conway2D

import java.io.*;

/**
* Java class for simulation of Conway's Game of Life.
*/

class Conway2D {
private final int width;
private final int height;
private final int size;
private byte[] data;

/**
* @param width
* @param height
*/
Conway2D(int width, int height) {
this.width = width;
this.height = height;
this.size = width * height;
data = new byte[size];
}

/**
* 生命游戏迭代一次
*/
void iterate() {
byte[] prev = new byte[size];
System.arraycopy(data, 0, prev, 0, size);
byte[] next = new byte[size];
for (int i = 0; i < width; i++) {
for (int j = 0; j < height; j++) {
if (isAlive(i, j, prev)) {
next[j * width + i] = 49;
} else {
next[j * width + i] = 48;
}
}
}
System.arraycopy(next, 0, data, 0, size);
}

/**
* 检查cell是否存活
*
* @param x The x position
* @param y The y position
* @param d The grid data.
* @return
*/

private boolean isAlive(int x, int y, byte[] d) {
int count = 0;
int pos1 = y * width + x;
//循环该cell的周围,累计周围活细胞个数
for (int i = x - 1; i <= x + 1; i++) {
for (int j = y - 1; j <= y + 1; j++) {
int pos = j * width + i;
if (pos >= 0 && pos < size - 1 && pos != pos1) //如果点在圈内 并且 不是自己
{
if (d[pos] == 49) {
count++;
}
}
}
}
//如果该细胞死亡
if (d[pos1] == 48) {
return count == 3;
} else  {//如果该细胞活着
return !(count < 2 || count > 3);
}
}

/**
* 读取文件中的数据
* 注意byte的不同,空格和换行在byte中的表示不同
*
* @param file
*/
void readData(String file) throws IOException {
BufferedInputStream in = new BufferedInputStream(new FileInputStream(file));
ByteArrayOutputStream out = new ByteArrayOutputStream(1);

byte[] temp = new byte[1];
int size;
while ((size = in.read(temp)) != -1) {
if (temp[0] != 32 && temp[0] != 10) {
out.write(temp, 0, size);
}
}
in.close();
data = out.toByteArray();
}

void writeToFile(String file) throws IOException {
FileOutputStream fos =new FileOutputStream(file);
BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(data));
byte[] tmp = new byte[100];
int size;
while((size = bis.read(tmp)) != -1){
fos.write(tmp);
fos.write(new byte[]{10});
}
}
}


main
import java.io.IOException;

/**
* Created by anyuan on 2017/1/4.
*/
public class Main {
public static final int MAXTIMES = 50000;

public static void main(String[] args) {
Conway2D conway2D = new Conway2D(100, 50);
try {
conway2D.readData("C:\\Users\\anyuan\\IdeaProjects\\操作系统实验\\LifeGame\\input_50x100");
double start = System.currentTimeMillis();
for (int i = 0; i < MAXTIMES; i++) {
conway2D.iterate();
}
double end = System.currentTimeMillis();
System.out.println("运行时间:" + (end - start)/1000 + "秒");//应该是end - start
conway2D.writeToFile("output_java串行");
} catch (IOException e) {
e.printStackTrace();
}
}
}


结果:



运行时间大概6秒左右。

四、C++串行和并行实现

C++串行思路和java差不多。只不过在并行代码中的步骤稍微有些不同。并行思路如下:

读入数据//去空格读取
数据转换49to1//将48转换为0,49转换为1。
迭代50000次//思路为将第一遍历,一次性读入八个数据到__m128i寄存器,然后对该寄存器的数据周边八个数值进行相加。作为count以判断是否该存活。所以实际上只需要迭代50000/8次。
数据转换1to49//1转换成49,0转换成48
输出数据

下面是C++的代码。关于SIMD的相关介绍,我会在下一节稍微介绍一下,并给出几个供参考的博客。下一节中也会介绍一下并行化的思路。

Header.h

#pragma once
#include <algorithm>
#include <fstream>
#include <iostream>
#include <string>
#define D_SCL_SECURE_NO_WARNINGS

#define WIDTH 100
#define HEIGHT 50
#define MAXSIZE 50*100

class Conway2D
{
const int width;
const int height;
const int size;
int data[MAXSIZE];

public:
Conway2D() : width(100), height(50), size(width * height)
{
}

Conway2D(int width, int height)
: width(width),
height(height),
size(width * height)
{
}

void iterate()
{
int prev[MAXSIZE];
std::copy(data, data + size, prev);
int next[MAXSIZE];
for (int i = 0; i < width; i++)
{
for (int j = 0; j < height; j++)
{
if (isAlive(i, j, prev))
{
next[j * width + i] = 49;
}
else
{
next[j * width + i] = 48;
}
}
}
std::copy(next, next + size, data);
}

bool isAlive(int x, int y, int d[]) const
{
int count = 0;
int pos1 = y * width + x;
//循环该cell的周围,累计周围活细胞个数
for (int i = x - 1; i <= x + 1; i++)
{
for (int j = y - 1; j <= y + 1; j++)
{
int pos = j * width + i;
if (pos >= 0 && pos < size - 1 && pos != pos1) //如果点在圈内 并且 不是自己
if (d[pos] == 49)
++count;
}
}
//如果该细胞死亡
if (d[pos1] == 48)
{
return count == 3;
}
//如果该细胞活着
return !(count < 2 || count > 3);
}

void readData(std::string file)
{
std::ifstream f1(file);
if (!f1)
std::cout << "文件打开失败" << std::endl;
char tmp[1];
int count = 0;
while (count < MAXSIZE)
{
f1.read(tmp, 1);
if (tmp[0] == '1' || tmp[0] == '0')
{
data[count] = static_cast<int>(tmp[0]);
++count;
}
}
}

void writeToFile(std::string file)
{
std::ofstream f1(file);
if (!f1)
std::cout << "文件创建失败" << std::endl;
char tmp[WIDTH];
for (int i = 0; i < HEIGHT; ++i)
{
std::copy(data + i*WIDTH, data + (i + 1)*WIDTH, tmp);
f1.write(tmp,WIDTH);
f1.write("\n",1);
}
}
};


Header_simd_2.h

#pragma once
#include <fstream>
#include <iostream>
#include <emmintrin.h>//sse2
#define D_SCL_SECURE_NO_WARNINGS
#define WIDTH 100
#define HEIGHT 50
#define MAXSIZE 50*100

class Conway2D_simd_2
{
const __int16 width;
const __int16 height;
const __int16 size;
__int16 data_s[MAXSIZE + 2 * WIDTH + 2];//这样就可以考虑边界cell的四周的cell了
__int16* data = &data_s[WIDTH + 1];//从data_s的WIDTH+1开始,到MAXSIZE+WIDTH+1结束
//之后要使用的8个寄存器
__m128i _m128_1;
__m128i _m128_2;
__m128i _m128_r;
__m128i _m128_s;

public:

Conway2D_simd_2(__int16 width, __int16 height)
: width(width),
height(height),
size(width * height)
{
for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i)
{
data_s[i] = 48;
}
//		data = &data_s[WIDTH + 1];
}

void iterate()
{
__int16 prev[MAXSIZE];
//		std::copy(data, data + size, prev);
__int16 pos_s = 0;
for (; pos_s < MAXSIZE; pos_s += 8)
{
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH - 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH]));
_m128_r = _mm_add_epi16(_m128_1, _m128_2);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH + 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH - 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH + 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
for (int i = 0; i < 8; ++i)
{
//如果该细胞死亡
if (data[pos_s+i] == 0)
{
//					if (_m128_r.m128i_i16[i] == 3)
//					{
//						_m128_r.m128i_i16[i] = 1;
//					}
//					else
//					{
//						_m128_r.m128i_i16[i] = 0;
//					}
_m128_r.m128i_i16[i] = (_m128_r.m128i_i16[i] == 3);
}
else
{
//如果该细胞活着
//					if(_m128_r.m128i_i16[i] == 2 || _m128_r.m128i_i16[i] == 3)
//					{
//						_m128_r.m128i_i16[i] = 1;
//					}
//					else
//					{
//						_m128_r.m128i_i16[i] = 0;
//
//					}
_m128_r.m128i_i16[i] = (!(_m128_r.m128i_i16[i] < 2 || _m128_r.m128i_i16[i] > 3));
}
}
_mm_storeu_si128(reinterpret_cast<__m128i *>(&prev[pos_s]), _m128_r);
}
std::copy(prev, prev + size, data);
}

void readData(std::string file) const
{
std::ifstream f1(file);
if (!f1)
std::cout << "文件打开失败" << std::endl;
char tmp[1];
__int16 count = 0;
while (count < MAXSIZE)
{
f1.read(tmp, 1);
if (tmp[0] == '1' || tmp[0] == '0')
{
data[count] = static_cast<int>(tmp[0]);
++count;
}
}
}

void writeToFile(std::string file) const
{
std::ofstream f1(file);
if (!f1)
std::cout << "文件创建失败" << std::endl;
char tmp[WIDTH];
for (__int16 i = 0; i < HEIGHT; ++i)
{
std::copy(data + i * WIDTH, data + (i + 1) * WIDTH, tmp);
f1.write(tmp, WIDTH);
f1.write("\n", 1);
}
}

void dataFrom1to49()
{
for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i)
{
if (data_s[i] == 1)
{
data_s[i] = 49;
}
else
{
data_s[i] = 48;
}
}
}

void dataFrom49to1()
{
for (int i = 0; i < MAXSIZE + 2 * WIDTH + 2; ++i)
{
if (data_s[i] == 49)
{
data_s[i] = 1;
}
else
{
data_s[i] = 0;
}
}
}
};

Source.cpp

#include "Header.h"
#include <time.h>
//#include "Header_simd.h"
#include "Header_simd_2.h"
#define MAXTIMES 50000
void main()
{
//串行
Conway2D conway2_d = Conway2D(100, 50);
conway2_d.readData("input_50x100");
clock_t start_time = clock();
for (int i = 0; i < MAXTIMES; ++i)
{
conway2_d.iterate();
}
clock_t end_time = clock();
std::cout << "串行 Runing time is:" << static_cast<double>(end_time - start_time) / CLOCKS_PER_SEC << "s" << std::endl;
conway2_d.writeToFile("output_Cpp串行");

//	//并行
//	Conway2D_simd conway2_d_simd = Conway2D_simd(100, 50);
//	conway2_d_simd.readData("input_50x100");
//	clock_t start_time_simd = clock();
//	for (int i = 0; i < MAXTIMES; ++i)
//	{
//		conway2_d_simd.iterate();
//	}
//	clock_t end_time_simd = clock();
//	std::cout << "Runing time is:" << static_cast<double>(end_time_simd - start_time_simd) / CLOCKS_PER_SEC << "s" << std::endl;
//	conway2_d_simd.writeToFile("output_Cpp并行");

//并行2
Conway2D_simd_2 conway2_d_simd_2 = Conway2D_simd_2(100, 50);
conway2_d_simd_2.readData("input_50x100");
conway2_d_simd_2.dataFrom49to1();
clock_t start_time_simd_2 = clock();
for (int i = 0; i < MAXTIMES; ++i)
{
conway2_d_simd_2.iterate();
}
clock_t end_time_simd_2 = clock();
std::cout << "并行2 Runing time is:" << static_cast<double>(end_time_simd_2 - start_time_simd_2) / CLOCKS_PER_SEC << "s" << std::endl;
conway2_d_simd_2.dataFrom1to49();
conway2_d_simd_2.writeToFile("output_Cpp并行_2");
system("pause");
}


五、SIMD介绍和C++并行思路

SIMD简要介绍

SIMD  intrinsics有一些类似于C语言中的函数,可以被其它代码直接调用,可以像其它函数一样给它传递参数,Intel C++编译器支持SIMD intrinsics(VS2005/2010也支持,GCC也支持),并且可以针对函数进行内联等优化。需要包含的头文件:

#include   //MMX
#include   //SSE (include mmintrin.h)
#include   //SSE2 (include xmmintrin.h)
#include   //SSE3 (include emmintrin.h)

这些头文件定了一些数据类型对应于那些SIMD指令要适应的浮点数和整数。

这些数据类型名以两个下划线开始:
__m64用于表示一个MMX寄存器的内容,用于64bi的MMX整数指令,可以封装正8个8bit,4个16bit,2个32bit和一个64bit的整数。
__m128用于SSE,表示封装了四个32bit的单精度浮点数的SSE寄存器。
__m128d可以封装2个64bit的双精度浮点数

__m128i用于表示支持128bi的内存变量,位于16B的边界。声明为__m64的内存变量位于8B的边界。

SSE(一种SIMD指令集)基本操作

SSE的基本命令有以下几种:

load系列命令(读取连续的内存内容到寄存器中)
set系列命令(读取内存内容到寄存器中,与load的区别在于不要连续,可以传入多个参数)
算数逻辑系列命令(这些命令和汇编比较类似,执行一些简单的算数和逻辑操作)
store系列命令(把寄存器内容存储到内存中)

SSE基本操作流程:

SSE的操作流程比较单纯,所以目前使用SIMD编程的难度比较大。指令较少,要实现复杂功能比较困难。

load/set指令把内存数据读取到寄存器中。
算数指令对寄存器进行相应的操作。
store指令将操作结果存回到内存中。

SSE指令格式

SMD intrinsics函数采用一个非常标准的命名格式,大部分采取:_mm__的格式,函数名以_mm开头,然后表示函数要执行的SIMD指令,比如,add,sub,srli分别表示加法,减法,以为运算,最后是后缀,后缀的一部分给出了药进行运算的函数的数据范围,其中p表示封装操作,s表示变量操作,而ep表示扩展操作,接下来的部分表示要进行运算的数据类型,其中s表示单精度操作,d表示双精度操作,i表示整数操作,u表示无符号整数,数字表示整数的比特数。例如:__m128
_mm_set_ss (float w),__m128 _mm_add_ss (__m128 a, __m128 b)。

SSE另外要注意的事项

u代表不需要对齐,没有u的命令代表需要对齐。(对齐和连续概念是不一样的)。本例中使用的命令例如:_mm_loadu_si128,是不需要对齐的。如要要求数据对齐,则在分配内存的时候要特别声明。如:
#include <intrin.h>
__declspec(align(16)) float op1[4] = {1.0, 2.0, 3.0, 4.0};


关于SIMD的其他博客介绍:
http://blog.csdn.net/hellochenlu/article/details/52370757
http://www.cnblogs.com/dragon2012/p/5200698.html

本例中使用的寄存器和数据类型

typedef union __declspec(intrin_type) __declspec(align(16)) __m128i {
__int8              m128i_i8[16];
__int16             m128i_i16[8];
__int32             m128i_i32[4];
__int64             m128i_i64[2];
unsigned __int8     m128i_u8[16];
unsigned __int16    m128i_u16[8];
unsigned __int32    m128i_u32[4];
unsigned __int64    m128i_u64[2];
} __m128i;

本例用__m128i存8个__int16数据。

一次迭代的操作

void iterate()
{
__int16 prev[MAXSIZE];
//		std::copy(data, data + size, prev);
__int16 pos_s = 0;
for (; pos_s < MAXSIZE; pos_s += 8)
{
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH - 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH]));
_m128_r = _mm_add_epi16(_m128_1, _m128_2);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - WIDTH + 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s - 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + 1]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH - 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
_m128_1 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH]));
_m128_2 = _mm_loadu_si128(reinterpret_cast<__m128i const*>(&data[pos_s + WIDTH + 1]));
_m128_s = _mm_add_epi16(_m128_1, _m128_2);
_m128_r = _mm_add_epi16(_m128_r, _m128_s);
for (int i = 0; i < 8; ++i)
{
//如果该细胞死亡
if (data[pos_s+i] == 0)
{
//					if (_m128_r.m128i_i16[i] == 3)
//					{
//						_m128_r.m128i_i16[i] = 1;
//					}
//					else
//					{
//						_m128_r.m128i_i16[i] = 0;
//					}
_m128_r.m128i_i16[i] = (_m128_r.m128i_i16[i] == 3);
}
else
{
//如果该细胞活着
//					if(_m128_r.m128i_i16[i] == 2 || _m128_r.m128i_i16[i] == 3)
//					{
//						_m128_r.m128i_i16[i] = 1;
//					}
//					else
//					{
//						_m128_r.m128i_i16[i] = 0;
//
//					}
_m128_r.m128i_i16[i] = (!(_m128_r.m128i_i16[i] < 2 || _m128_r.m128i_i16[i] > 3));
}
}
_mm_storeu_si128(reinterpret_cast<__m128i *>(&prev[pos_s]), _m128_r);
}
std::copy(prev, prev + size, data);
}


代码中一次取八个数,把这八个数的周边八个(逻辑上)数值依次相加,(//另外说一句,为了防止可能出现周边没有值的情况,也就是边界的cell。在分配数组时,特意多分配2*WIGHT+2个__int16的内存。而data取其中间区域,多余区域设置为0,从而避免数组越界问题。)相加的结果即为count,然后根据条件判断给寄存器赋值,最后写会内存中。因为每一次迭代中,处理的数据之间不能互相影响,所以也要有一次数组拷贝。具体细节请参考上一小节的源码。
至此C++并行思路的介绍已经完毕。最后迭代50000次的结果和java一样。时间上并行要比串行块10倍左右(8次一处理,然后再加上是寄存器操作,这种加速比很正常)。但是目前我电脑的CPU被占满了。所以和上次跑的不一样。电脑空闲时候的时间是
串行:19秒左右
并行:2秒多一点(2.3左右)
这次CPU被其他程序占满了90%以上的情况下结果如图:



加速比倒是差不多。不过还是可以看出来SSE并行化真的比串行程序快很多啊!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: