您的位置:首页 > 其它

8天玩转并行开发——第三天 plinq的使用

2014-11-26 14:14 423 查看
相信在.net平台下,我们都玩过linq,是的,linq让我们的程序简洁优美,简直玩的是爱不释手,但是传统的linq只是串行代码,在并行的

年代如果linq不支持并行计算那该是多么遗憾的事情啊。

当然linq有很多种方式,比如linq to sql ,xml,object 等等,如果要将linq做成并行还是很简单的,这里我就举一个比较实际一点的例子,

我们知道为了更快的响应用户操作,码农们想尽了各种办法,绞尽了脑汁,其中有一个办法就是将数据库数据预加载到内存中,然后通过各种

数据结构的手段来加速CURD,是的,比如一个排序地球人只能做到N(lgN),那么如果我还想再快一点的话该怎么办呢?那么现在的并行就能发

挥巨大的优势,尤其是现在的服务器配置都是在8个硬件线程的情况下,你简直会狂笑好几天啊,好,不乱扯了。

1:AsParallel(并行化)

下面我们模拟给ConcurrentDictionary灌入1500w条记录,看看串行和并行效率上的差异,注意我的老爷机是2个硬件线程。

1 using System;
2 using System.Threading;
3 using System.Threading.Tasks;
4 using System.Diagnostics;
5 using System.Collections.Concurrent;
6 using System.Collections.Generic;
7
8 using System.Linq;
9
10 class Program
11 {
12     static void Main(string[] args)
13     {
14         var dic = LoadData();
15
16         Stopwatch watch = new Stopwatch();
17
18         watch.Start();
19
20         //串行执行
21         var query1 = (from n in dic.Values
22                       where n.Age > 20 && n.Age < 25
23                       select n).ToList();
24
25         watch.Stop();
26
27         Console.WriteLine("串行计算耗费时间:{0}", watch.ElapsedMilliseconds);
28
29         watch.Restart();
30
31         var query2 = (from n in dic.Values.AsParallel()
32                       where n.Age > 20 && n.Age < 25
33                       select n).ToList();
34
35         watch.Stop();
36
37         Console.WriteLine("并行计算耗费时间:{0}", watch.ElapsedMilliseconds);
38
39         Console.Read();
40     }
41
42     public static ConcurrentDictionary<int, Student> LoadData()
43     {
44         ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
45
46         //预加载1500w条记录
47         Parallel.For(0, 15000000, (i) =>
48         {
49             var single = new Student()
50             {
51                 ID = i,
52                 Name = "hxc" + i,
53                 Age = i % 151,
54                 CreateTime = DateTime.Now.AddSeconds(i)
55             };
56             dic.TryAdd(i, single);
57         });
58
59         return dic;
60     }
61
62     public class Student
63     {
64         public int ID { get; set; }
65
66         public string Name { get; set; }
67
68         public int Age { get; set; }
69
70         public DateTime CreateTime { get; set; }
71     }
72 }




执行的结果还是比较震撼的,将近7倍,这是因为plinq的查询引擎会尽量利用cpu的所有硬件线程。

2:常用方法的使用

<1> orderby

有时候我们并不是简单的select一下就ok了,可能需要将结果进行orderby操作,并行化引擎会把要遍历的数据分区,然后在每个区上进行

orderby操作,最后来一个总的orderby,这里很像算法中的“归并排序”。



1 using System;
2 using System.Threading;
3 using System.Threading.Tasks;
4 using System.Diagnostics;
5 using System.Collections.Concurrent;
6 using System.Collections.Generic;
7
8 using System.Linq;
9
10 class Program
11 {
12     static void Main(string[] args)
13     {
14         var dic = LoadData();
15
16         var query1 = (from n in dic.Values.AsParallel()
17                       where n.Age > 20 && n.Age < 25
18                       select n).ToList();
19
20
21         Console.WriteLine("默认的时间排序如下:");
22         query1.Take(10).ToList().ForEach((i) =>
23         {
24             Console.WriteLine(i.CreateTime);
25         });
26
27         var query2 = (from n in dic.Values.AsParallel()
28                       where n.Age > 20 && n.Age < 25
29                       orderby n.CreateTime descending
30                       select n).ToList();
31
32         Console.WriteLine("排序后的时间排序如下:");
33         query2.Take(10).ToList().ForEach((i) =>
34         {
35             Console.WriteLine(i.CreateTime);
36         });
37
38         Console.Read();
39     }
40
41     public static ConcurrentDictionary<int, Student> LoadData()
42     {
43         ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();
44
45         //预加载1500w条记录
46         Parallel.For(0, 15000000, (i) =>
47         {
48             var single = new Student()
49             {
50                 ID = i,
51                 Name = "hxc" + i,
52                 Age = i % 151,
53                 CreateTime = DateTime.Now.AddSeconds(i)
54             };
55             dic.TryAdd(i, single);
56         });
57
58         return dic;
59     }
60
61     public class Student
62     {
63         public int ID { get; set; }
64
65         public string Name { get; set; }
66
67         public int Age { get; set; }
68
69         public DateTime CreateTime { get; set; }
70     }
71 }




<2> sum(),average()等等这些聚合函数的效果跟orderby类型一样,都是实现了类型归并排序的效果,这里就不举例子了。

3:指定并行度,这个我在前面文章也说过,为了不让并行计算占用全部的硬件线程,或许可能要留一个线程做其他事情。

1         var query2 = (from n in dic.Values.AsParallel()
2                       .WithDegreeOfParallelism(Environment.ProcessorCount - 1)
3                       where n.Age > 20 && n.Age < 25
4                       orderby n.CreateTime descending
5                       select n).ToList();


4: 了解ParallelEnumerable类

首先这个类是Enumerable的并行版本,提供了很多用于查询实现的一组方法,截个图,大家看看是不是很熟悉,要记住,他们都是并行的。



下面列举几个简单的例子。

1 class Program
2 {
3     static void Main(string[] args)
4     {
5         ConcurrentBag<int> bag = new ConcurrentBag<int>();
6
7         var list = ParallelEnumerable.Range(0, 10000);
8
9         list.ForAll((i) =>
10         {
11             bag.Add(i);
12         });
13
14         Console.WriteLine("bag集合中元素个数有:{0}", bag.Count);
15
16         Console.WriteLine("list集合中元素个数总和为:{0}", list.Sum());
17
18         Console.WriteLine("list集合中元素最大值为:{0}", list.Max());
19
20         Console.WriteLine("list集合中元素第一个元素为:{0}", list.FirstOrDefault());
21
22         Console.Read();
23     }
24 }




5: plinq实现MapReduce算法

mapReduce是一个非常流行的编程模型,用于大规模数据集的并行计算,非常的牛X啊,记得mongodb中就用到了这个玩意。

map: 也就是“映射”操作,可以为每一个数据项建立一个键值对,映射完后会形成一个键值对的集合。

reduce:“化简”操作,我们对这些巨大的“键值对集合“进行分组,统计等等。

具体大家可以看看百科:http://baike.baidu.com/view/2902.htm

下面我举个例子,用Mapreduce来实现一个对age的分组统计。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Concurrent;

using System.Collections.Generic;

using System.Linq;

class Program
{
static void Main(string[] args)
{
List<Student> list = new List<Student>()
{
new Student(){ ID=1, Name="jack", Age=20},
new Student(){ ID=1, Name="mary", Age=25},
new Student(){ ID=1, Name="joe", Age=29},
new Student(){ ID=1, Name="Aaron", Age=25},
};

//这里我们会对age建立一组键值对
var map = list.AsParallel().ToLookup(i => i.Age, count => 1);

//化简统计
var reduce = from IGrouping<int, int> singleMap
in map.AsParallel()
select new
{
Age = singleMap.Key,
Count = singleMap.Count()
};

///最后遍历
reduce.ForAll(i =>
{
Console.WriteLine("当前Age={0}的人数有:{1}人", i.Age, i.Count);
});
}

public class Student
{
public int ID { get; set; }

public string Name { get; set; }

public int Age { get; set; }

public DateTime CreateTime { get; set; }
}
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: