Spark中的键值对操作
2015-11-30 20:08
369 查看
1.PairRDD介绍 Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairRDD
![](http://images2015.cnblogs.com/blog/689069/201511/689069-20151130200809437-365190244.jpg)
针对两个PairRDD的转化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
程序实例:针对2 中程序生成的PairRDD,删选掉长度超过20个字符的行。
6.数据排序在Java中以字符串顺序对正数进行自定义排序(1)对RDD进行排序:
9.Java设置自定义分区方式 Spark允许你通过自定义Partitioner对象来控制RDD的分区方式,这样可以让你利用领域知识进一步减少通信消耗。 举个例子,假设我们要在一个网页的集合上运行前一届中的PageRank算法。在这里,每个页面的ID是页面的URL。当我们使用简单的哈希函数进行分区时,拥有相似的URL的页面比如 http://www.baidu.com/news 与 http://www.baidu.com/map 可能被分在完全不同的节点上。但是我们知道,同一个域名下的网页更有可能相互连接。由于PageRank需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因袭把这些页面分组在同一个分区中会更好。可以使用自定义的分区器来实现仅根据域名而不是整个URL进行分区。 要实现先自定义Partitioner,需要继承Partitioner类并实现其下述方法: public int numPartitions() 返回创建的分区数量 public int getPartition(Object key) 返回给定键的分区编号 public boolean equals(Object obj) Spark需要这个方法来检查分区器对象是否与其他分区器实例相同,这样Spark才能判断两个RDD的分区方式是否相同。
来自为知笔记(Wiz)
List<String> list=new ArrayList<String>(); list.add("this is a test"); list.add("how are you?"); list.add("do you love me?"); list.add("can you tell me?"); JavaRDD<String> lines=sc.parallelize(list); JavaPairRDD<String,String> map =lines.mapToPair( new PairFunction<String, String, String>() { public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<String, String>(s.split(" ")[0],s); //获取第一个单词作为key,s为value } } );3.PairRDD的转化操作 PairRDD可以使用所有标准RDD上可用的转化操作。传递函数的规则也适用于PairRDD。由于PairRDD中包含二元组,所以需要传递的函数应当操作而元素而不是独立的元素。 PairRDD的相关转化操作如下表所示
![](http://images2015.cnblogs.com/blog/689069/201511/689069-20151130200809437-365190244.jpg)
针对两个PairRDD的转化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
函数名 | 目的 | 示例 | 结果 |
substractByKey | 删掉RDD中键与other RDD 中的键相同的元素 | rdd.subtractByKey(other) | {(1,2)} |
join | 对两个RDD进行内连接 | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
rightOuterJoin | 对两个RDD进行连接操作,右外连接 | rdd.rightOuterJoin(other) | {(3,(4,9)),(3,(6,9))} |
leftOuterJoin | 对两个RDD进行连接操作,左外连接 | rdd.rightOuterJoin(other) | {(1,(2,None)),(3,(4,9)),(3,(6,9))} |
cogroup | 将两个RDD中拥有相同键的数据分组 | rdd.cogroup(other) | {1,([2],[]),(3,[4,6],[9])} |
JavaPairRDD<String,String> result=map.filter( new Function<Tuple2<String, String>, Boolean>() { public Boolean call(Tuple2<String, String> value) throws Exception { return value._2().length()<20; } } ); for(Tuple2 tuple:result.collect()){ System.out.println(tuple._1()+": "+tuple._2());4.聚合操作 RDD上有fold(),combine(),reduce()等行动操作,pair RDD上则有相应的针对键的转化操作。 (1)reduceByKey()与reduce()操作类似,它们都接收一个函数,并使用该函数对值进行合并。reduceByKey()会为数据集中的每个键进行并行的规约操作,每个规约操作会将键相同的值合并起来。reduceBykey()最终返回一个由各键规约出来的结果值组成的新的RDD。程序示例:用reduceByKey实现单词计数
strLine.add("how are you"); strLine.add("I am ok"); strLine.add("do you love me"); JavaRDD<String> input=sc.parallelize(strLine); JavaRDD<String> words=input.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } } ); JavaPairRDD<String,Integer> result=words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2(s, 1); } } ).reduceByKey( new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } } ) ;(2)foldByKey()与fold()操作类似,他们都使用一个与RDD和合并函数中的数据类型相同的零值作为初始值。与fold()一样,foldByKey()操作所使用的合并函数对零值与另一个元素进行合并,结果仍为该元素。 程序示例:求对应key的value之和
List<Tuple2<Integer,Integer>> list=new ArrayList<Tuple2<Integer, Integer>>(); list.add(new Tuple2<Integer,Integer>(1,1)); list.add(new Tuple2<Integer, Integer>(1,3)); list.add(new Tuple2<Integer, Integer>(2,2)); list.add(new Tuple2<Integer, Integer>(2,8)); JavaPairRDD<Integer,Integer> map=sc.parallelizePairs(list); JavaPairRDD<Integer,Integer> results=map.foldByKey(0, new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); for(Tuple2<Integer,Integer> tuple:results.collect()) System.out.println(tuple._1()+"->"+tuple._2());结果:1->42->10 (3) combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它实现的。和aggregate()一样,combineByKey()可以让用户返回与输入数据类型不同的返回值。combineByKey()会遍历分区中的所有元素,因此,每个元素的键要么还么有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现每个键时发生,而不是在整个RDD中第一次出现一个键时发生。 如果这是一个处理当前分区之前就已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。 以下程序示例使用combineBykey()求每个键对应的平均值。
public class AvgCount implements Serializable{ private int total_; private int num_; public AvgCount(int total,int num){ total_=total; num_=num; } public float avg(){ return total_/(float) num_; }//createCombiner()[/b] static Function<Integer,AvgCount> createAcc =new Function<Integer,AvgCount>(){ public AvgCount call(Integer x){ return new AvgCount(x,1); } };//mergeValue() static Function2<AvgCount,Integer,AvgCount> addAndCount=new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) throws Exception { a.total_+=x; a.num_+=1; return a; } }; //m[/b]m[/b]ergeCombiners()[/b] static Function2<AvgCount,AvgCount,AvgCount> combine=new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) throws Exception { a.total_+=b.total_; a.num_+=b.num_; return a; } }; public static void main(String args[]){ AvgCount initial =new AvgCount(0,0); SparkConf conf = new SparkConf().setMaster("local").setAppName("my app"); JavaSparkContext sc = new JavaSparkContext(conf); List<Tuple2<Integer,Integer>> list=new ArrayList<Tuple2<Integer, Integer>>(); list.add(new Tuple2<Integer,Integer>(1,1)); list.add(new Tuple2<Integer, Integer>(1,3)); list.add(new Tuple2<Integer, Integer>(2,2)); list.add(new Tuple2<Integer, Integer>(2,8)); JavaPairRDD<Integer,Integer> nums=sc.parallelizePairs(list); JavaPairRDD<Integer,AvgCount> avgCounts=nums.combineByKey(createAcc,addAndCount,combine); Map<Integer,AvgCount> countMap= avgCounts.collectAsMap(); for(Map.Entry<Integer,AvgCount> entry:countMap.entrySet()) System.out.println(entry.getKey()+": "+entry.getValue().avg()); } }结果:2: 5.01: 2.0成功求出每个key对应value对应的平均值*(4)并行度调优 每个RDD都有固定数目的分区,分区数决定了在RDD上执行操作时的并行度。 在执行聚合或者分组操作时,可以要求Spark使用给定的分区数。Spark始终尝试根据集群的大小推断出一个有意义的默认值,但是你可以通过对并行度进行调优来获得更好的性能表现。 在Java中,combineByKey()函数和reduceByKey()函数的最后一个可选的参数用于指定分区的数目,即numPartitions,使用如下:
JavaPairRDD<Integer,AvgCount> avgCounts=nums.combineByKey(createAcc,addAndCount,combine,10);5.数据分组(1)groupByKey() groupByKey()会使用RDD中的键来对数据进行分组。对于一个由类型K的键和类型V的值组成的RDD,锁的到的结果RDD类型会是[K,Iterable[v]]。 以下是程序示例,对PairRDD调用groupByKey()函数之后,会返回JavaPairRDD<Integer,Iterable<Integer>>类型的结果,也就是所有同一个Key的value都可以调用Iterator进行遍历。
List<Tuple2<Integer,Integer>> list1=new ArrayList<Tuple2<Integer, Integer>>(); list1.add(new Tuple2<Integer,Integer>(1,1)); list1.add(new Tuple2<Integer, Integer>(2,2)); list1.add(new Tuple2<Integer, Integer>(1,3)); list1.add(new Tuple2<Integer, Integer>(2,4)); JavaPairRDD<Integer,Integer> nums1=sc.parallelizePairs(list1); JavaPairRDD<Integer,Iterable<Integer>>results =nums1.groupByKey(); //接下来遍历输出results,注意其中关于Iterable遍历的处理 for(Tuple2<Integer,Iterable<Integer>> tuple :results.collect()){ System.out.print(tuple._1()+": "); Iterator<Integer> it= tuple._2().iterator(); while(it.hasNext()){ System.out.print(it.next()+" "); } System.out.println(); }输出结果:1: 1 3 2: 2 4 (2)cogroup() 除了对单个RDD的数据进行分组,还可以使用cogroup()函数对对个共享同一个键的RDD进行分组。对两个键的类型均为K而值得类型分别为V和W的RDD进行cogroup()时,得到结果的RDD类型为[(K,(Iterable[V],Iterable[W]))]。如果其中一个RDD对于另一个RDD中存在的某个键没有对应的记录,那么对应的迭代器则为空。举例:
List<Tuple2<Integer,Integer>> list1=new ArrayList<Tuple2<Integer, Integer>>(); List<Tuple2<Integer,Integer>> list2=new ArrayList<Tuple2<Integer, Integer>>(); list1.add(new Tuple2<Integer,Integer>(1,1)); list1.add(new Tuple2<Integer, Integer>(2,2)); list1.add(new Tuple2<Integer, Integer>(1,3)); list1.add(new Tuple2<Integer, Integer>(2,4)); list1.add(new Tuple2<Integer, Integer>(3,4)); list2.add(new Tuple2<Integer,Integer>(1,1)); list2.add(new Tuple2<Integer, Integer>(1,3)); list2.add(new Tuple2<Integer, Integer>(2,3)); JavaPairRDD<Integer,Integer> nums1=sc.parallelizePairs(list1); JavaPairRDD<Integer,Integer> nums2=sc.parallelizePairs(list2); JavaPairRDD<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> results=nums1.cogroup(nums2); for(Tuple2<Integer,Tuple2<Iterable<Integer>,Iterable<Integer>>> tuple:results.collect()){ System.out.print(tuple._1()+" [ "); Iterator it1=tuple._2()._1().iterator(); while(it1.hasNext()){ System.out.print(it1.next()+" "); } System.out.print("] [ "); Iterator it2=tuple._2()._2().iterator(); while(it2.hasNext()){ System.out.print(it2.next()+" "); } System.out.print("] \n"); } }输出:1 [ 1 3 ] [ 1 3 ] 3 [ 4 ] [ ] 2 [ 2 4 ] [ 3 ]
6.数据排序在Java中以字符串顺序对正数进行自定义排序(1)对RDD进行排序:
JavaRDD<Integer> nums=sc.parallelize(Arrays.asList(1,5,3,2,6,3)); JavaRDD<Integer> results =nums.sortBy(new Function<Integer, Object>() { public Object call(Integer v1) throws Exception { return v1; } },false,1); for(Integer a:results.collect()) System.out.println(a);(2)对PairRDD,按key的值进行排序
ist<Tuple2<Integer, Integer>> list1 = new ArrayList<Tuple2<Integer, Integer>>(); list1.add(new Tuple2<Integer, Integer>(1, 1)); list1.add(new Tuple2<Integer, Integer>(2, 2)); list1.add(new Tuple2<Integer, Integer>(1, 3)); list1.add(new Tuple2<Integer, Integer>(2, 4)); list1.add(new Tuple2<Integer, Integer>(3, 4)); JavaPairRDD<Integer, Integer> nums1 = sc.parallelizePairs(list1); class comp implements Comparator<Integer>, Serializable { public int compare(Integer a, Integer b) { return a.compareTo(b); } }; JavaPairRDD<Integer,Integer> results=nums1.sortByKey(new comp()); for(Tuple2<Integer,Integer> tuple: results.collect()){ System.out.println(tuple._1()+": "+tuple._2()); }7.数据分区(1)创建数据分区 在分布式程序中,通信的代价很大,控制数据分布以获得最少的网络传输可以极大地提升整体性能。Spark程序可以通过控制RDD分区的方式来减少通信消耗。只有当数据集多次在诸如连接这种基于键的操作中,分区才会有作用。 Spark中所有的键值对RDD都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark可以确保同一组的键出现在一个节点上。 举个简单的例子,应用如下:内存中保存着很大的用户信息表,由(UserID,UserInfo[])组成的RDD,UserInfo是用户所订阅的所有主题列表。该应用会周期性地将这张表和一个小文件进行组合,这个小文件中存这过去5分钟发生的时间,其实就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用户访问的链接的主题。我们需要对用户访问其未订阅主题的页面情况进行统计。我们可以使用Spark的join()操作进行组合操作。将两者根据UserId连接之后,过滤出不在UserInfo[]中的LinkInfo,就是用户访问其未订阅主题的情况。
List<Tuple2<String,Iterable<String>>> list1=new ArrayList<Tuple2<String, Iterable<String>>>(); list1.add(new Tuple2<String, Iterable<String>>("zhou",Arrays.asList("it","math"))); list1.add(new Tuple2<String, Iterable<String>>("gan",Arrays.asList("money","book"))); JavaPairRDD<String,Iterable<String>> userData=sc.parallelizePairs(list1); List<Tuple2<String,String>> list2=new ArrayList<Tuple2<String, String>>(); list2.add(new Tuple2<String, String>("zhou","it") ); list2.add(new Tuple2<String,String>("zhou","stock")); list2.add(new Tuple2<String, String>("gan","money")); list2.add(new Tuple2<String, String>("gan","book")); JavaPairRDD<String,String> events=sc.parallelizePairs(list2); JavaPairRDD<String, Tuple2<Iterable<String>, String>> joined = userData.join(events); long a=joined.filter( new Function<Tuple2<String, Tuple2<Iterable<String>, String>>, Boolean>() { public Boolean call(Tuple2<String, Tuple2<Iterable<String>, String>> tuple) throws Exception { boolean has = false; Iterable<String> user=tuple._2()._1(); String link=tuple._2()._2(); for (String s : user) { if (s.compareTo(link) == 0) { has = true; break; } } //保留不在用户订阅表中的RDD return !has; } } ).count(); System.out.println(a);输出:1 这段代码可以正确运行,但是效率不高。因为每5分钟就要进行一次join()操作,而我们对数据集如何分区却一无所知。默认情况下,连接操作会将两个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作。因为userData表比每5分钟出现的访问日志表events要大很多,所以要浪费时间进行额外的工作:在每次调用时都对userDAta表进行哈希值计算和跨节点数据混洗,虽然这些数据从来不会变化。 要解决此问题:在程序开始的时候,对userData表进行partitionBy()转化操作,将这张表转化为哈希分区。可以通过向patitionBy传递一个spark.HashPartitioner对象来实现该操作。 Java自定义分区方式:
List<Tuple2<String,Iterable<String>>> list1=new ArrayList<Tuple2<String, Iterable<String>>>(); list1.add(new Tuple2<String, Iterable<String>>("zhou",Arrays.asList("it","math"))); list1.add(new Tuple2<String, Iterable<String>>("gan",Arrays.asList("money","book"))); JavaPairRDD<String,Iterable<String>> userData=sc.parallelizePairs(list1);//请注意,partitionBy是转化操作 userData=userData.partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_AND_DISK());这样以后在调用join()时,Spark就知道了了该RDD是根据键的哈希值来分区的,这样在调用join()时,Spark就会利用这一点,只会对events进行数据混洗操作,将events中特定userId的记录发送到userData的对应分区所在的那台机器上。这样,需要网络传输的数据就大大见笑了,程序运行的速度也显著提高。 请注意,我们还对userData 这个RDD进行了持久化操作,默认情况下,每一个由转化操作得到的RDD都会在每次执行启动操作时重新计算生成,将userData持久化之后,就能保证userData能够在访问时被快速获取。 *进一步解释数据分区带来的好处: 如果没有将partitionBy()转化操作的结果进行持久化,那么后面每次用到这个RDD时都会重复对数据进行分区操作。不进行持久化会导致整个RDD谱系图重新求值。那样的话,partitionBy()带来的好处就会抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况是十分相似的。(2)获取数据分区的方式接(1)中程序:
Optional<Partitioner> partitioner = userData.partitioner(); System.out.println(partitioner.get()); System.out.println(partitioner.isPresent());在Java中,通过调用partitioner()方法,可以获取一个Optional<Partitioner>对象,这是Scala中用来存放可能存在的对象的容器类。对该对象调用isPresent()方法可以检查其中是否有值,调用get可以获取partitionBy()的参数->1个partitioner对象。(3)从分区中获益的操作 Spark中的很多操作都引入了根据键跨结点进行混洗的过程。所有这些操作都会从数据分区中获益。能够从数据分区中获益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。 对于像reduceByKey()这样只作用于单个RDD的操作,运行在未分区的RDD的时候或导致每个键所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结果值从各工作节点传回主节点,所以原本的网络开销就不太大。而对于诸如cogroup()和join()这样的二元操作,预先进行数据分区会导致其中至少一个RDD(使用已知分区器的那个RDD)不发生数据混洗。如果两个RDD使用同样的分区方式,并且它们还缓存在同样的机器上(比如一个RDD是通过mapValues()从另一个RDD中创建出来的,这两个RDD就会拥有相同的键和分区方式),或者其中一个RDD还没有计算出来,那么跨节点数据混洗就不会发生了。(4)影响分区方式的操作 所有会为生成的结果RDD设好分区方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分区方式的话),filter()(如果父RDD有分区方式的话)。其他所有操作生成的结果都不会存在特定的分区方式。注意: 对于二元操作,输出数据的分区方式取决于父RDD的分区方式。默认情况下,结果会采用哈希分区,分区的数量和操作的并行度是一样的。如果其中一个父RDD已经设置过分区方式,那么结果就会采用那种分区方式;如果两个父RDD都设置过分区方式,结果RDD会采用第一个RDD的分区方式。8.示例程序-PageRank PageRank算法是一种从RDD分区中获益的更复杂的算法,我们以它为例进行分析。PageRank算法用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。该算法可以用于对网页进行排序,当然,也可以用于排序科技文章或社交网络中有影响的用户。 算法会维护两个数据集,一个由(pageID,linklist[])组成,包含每个页面的链接到的页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按以下步骤进行计算: ① 将每个页面的排序值初始化为1.0 ②在每次迭代中,向每个有直接链接的页面,发送一个值为rank(p)/numNeighbors(p)(出链数目) 的贡献量 ③将每个页面的排序值设置为0.15+0.85*contributionsReceived 最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要进行十个迭代。下面用Spark来实现PageRank算法:
public class main { private static class Sum implements Function2<Double, Double, Double> { public Double call(Double a, Double b) { return a + b; } } public static void main(String args[]){ SparkConf conf =new SparkConf(); conf.setAppName("my spark app"); conf.setMaster("local"); JavaSparkContext sc =new JavaSparkContext(conf); JavaRDD<String> inputs= sc.textFile("C:\\url.txt"); /* #以下是url的内容: www.baidu.com www.hao123.com www.baidu.com www.2345.com www.baidu.com www.zhouyang.com www.hao123.com www.baidu.com www.hao123.com www.zhouyang.com www.zhouyang.com www.baidu.com */ JavaPairRDD<String, Iterable<String>> links = inputs.mapToPair( new PairFunction<String, String, String>() { public Tuple2<String, String> call(String s) throws Exception { String[] parts = s.split(" "); return new Tuple2<String, String>(parts[0], parts[1]); } } ).distinct().groupByKey().cache(); JavaPairRDD<String, Double> ranks = links.mapValues( new Function<Iterable<String>, Double>() { public Double call(Iterable<String> v1) throws Exception { return 1.0; } } ); JavaPairRDD<String, Tuple2<Iterable<String>, Double>> join = links.join(ranks); for(int current=0;current<10;current++){ // Calculates URL contributions to the rank of other URLs. JavaPairRDD<String, Double> contribs = links.join(ranks).values() .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() { public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) { int urlCount = Iterables.size(s._1()); List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>(); for (String n : s._1()) { results.add(new Tuple2<String, Double>(n, s._2() / urlCount)); } return results; } }); // Re-calculates URL ranks based on neighbor contributions. ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() { public Double call(Double sum) { return 0.15 + sum * 0.85; } }); } // Collects all URL ranks and dump them to console. List<Tuple2<String, Double>> output = ranks.collect(); for (Tuple2<?,?> tuple : output) { System.out.println(tuple._1() + " has rank: " + tuple._2() + "."); } } }
9.Java设置自定义分区方式 Spark允许你通过自定义Partitioner对象来控制RDD的分区方式,这样可以让你利用领域知识进一步减少通信消耗。 举个例子,假设我们要在一个网页的集合上运行前一届中的PageRank算法。在这里,每个页面的ID是页面的URL。当我们使用简单的哈希函数进行分区时,拥有相似的URL的页面比如 http://www.baidu.com/news 与 http://www.baidu.com/map 可能被分在完全不同的节点上。但是我们知道,同一个域名下的网页更有可能相互连接。由于PageRank需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因袭把这些页面分组在同一个分区中会更好。可以使用自定义的分区器来实现仅根据域名而不是整个URL进行分区。 要实现先自定义Partitioner,需要继承Partitioner类并实现其下述方法: public int numPartitions() 返回创建的分区数量 public int getPartition(Object key) 返回给定键的分区编号 public boolean equals(Object obj) Spark需要这个方法来检查分区器对象是否与其他分区器实例相同,这样Spark才能判断两个RDD的分区方式是否相同。
class myPartitioner extends Partitioner{ int num;//分区数目 public myPartitioner(int num){//构造方法,初始化num this.num=num; } @Override public int numPartitions() {//返回分区数目 return num; } @Override public int getPartition(Object key) { String url =(String)key; String domain=""; try { domain=new URL(url).getHost();//获取域名 } catch (MalformedURLException e) { e.printStackTrace(); } int code =domain.hashCode()%num;//获取该域名对应的hash值 if(code<0){//getPartition()方法只能返回非负数,对负数进行处理 code+=num; } return code; } @Override public boolean equals(Object obj) { if(obj instanceof myPartitioner){//如果obj是myPartitioner的实例 return ((myPartitioner) obj).num==num;//看分区数是否相同 } else//否则直接返回false return false; } }
来自为知笔记(Wiz)
相关文章推荐
- ruby字符串学习笔记4
- Tensorflow二分类处理dense或者sparse(文本分类)的输入数据
- FIFO,LRU,OPT置换算法
- java程序员常犯的几个错误
- 日期格式字符说明【转载】
- iOS开发网络—数据安全
- LeetCode 之 Intersection of Two Linked Lists
- 【Win10开发】绘制静态UI
- uboot 中 env相关问题
- 为树莓派安装配置openJDK
- 快速搭建一个成熟,强壮的App框架【转载】
- arm-linux内核编译过程小结
- Redhat7开机图形或文字界面
- OC-1-面向对象
- 第十一周学习总结
- Reactor 与 Proactor
- 【公告】个人站点
- win10 输入法禁用IME
- OpenCV访问图像数据并设定灰度值
- OC-7-内存管理