Left Outer Join Solutions in Spark
2017-11-08 20:05
(For the Hadoop/MapReduce solution to the same left outer join problem, see the earlier post.)
The task: given a users file with (userID, location) records and a transactions file whose records carry a productID and a userID, compute for each product the set of locations where it was bought and the number of distinct locations, keeping a product even when its buyer has no location record.
1. Left outer join in Spark without leftOuterJoin()
import org.apache.spark.{SparkConf, SparkContext}

object LeftOutJoinTest {
  def main(args: Array[String]): Unit = {
    // Connect to the Spark master
    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")
    val sc = new SparkContext(conf)
    // Read the input files from HDFS and create the RDDs
    // users.txt: userID \t location  ->  (userID, ("L", location))
    val users = sc.textFile("hdfs://pc1:9000/input/users.txt")
    val user_map = users.map { line =>
      val fields = line.split("\t")
      (fields(0), ("L", fields(1)))
    }
    // transactions.txt: the third field is the userID, the second the productID
    // -> (userID, ("P", productID))
    val transactions = sc.textFile("hdfs://pc1:9000/input/transactions.txt")
    val transaction_map = transactions.map { line =>
      val fields = line.split("\t")
      (fields(2), ("P", fields(1)))
    }
    // Union the two RDDs and group by userID, so a user's location record
    // ("L") and product records ("P") land in the same group
    val all = transaction_map.union(user_map)
    val groupedRDD = all.groupByKey()
    val productLocationsRDD = groupedRDD.flatMap { case (_, pairs) =>
      // The default location preserves left-outer semantics for users without a location record
      var location = "UNKNOWN"
      val products = scala.collection.mutable.ArrayBuffer[String]()
      pairs.foreach { t2 =>
        if (t2._1 == "L") location = t2._2
        else products += t2._2
      }
      products.map(product => (product, location))
    }
    // For each product, emit its distinct locations and how many there are
    productLocationsRDD.distinct().groupByKey().map { pair =>
      val locations = pair._2
      (pair._1, (locations, locations.size))
    }.saveAsTextFile("hdfs://pc1:9000/output/leftoutjoin_1")
  }
}
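To make the flag-and-group trick concrete, here is a minimal self-contained sketch of the same pipeline. The sample data and object name are hypothetical (not from the original post); it replaces the HDFS reads with sc.parallelize so it can run locally:

import org.apache.spark.{SparkConf, SparkContext}

object FlagJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FlagJoinSketch").setMaster("local"))
    // Hypothetical sample data in the same shape as user_map and transaction_map above
    val userMap = sc.parallelize(Seq(("u1", ("L", "UT")), ("u2", ("L", "GA"))))
    val transactionMap = sc.parallelize(Seq(("u1", ("P", "p3")), ("u2", ("P", "p1")), ("u3", ("P", "p1"))))
    val productLocations = transactionMap.union(userMap)
      .groupByKey()
      .flatMap { case (_, pairs) =>
        // The "L" record, if present, carries the location; "P" records carry products
        val location = pairs.collectFirst { case ("L", loc) => loc }.getOrElse("UNKNOWN")
        pairs.collect { case ("P", product) => (product, location) }
      }
    productLocations.collect().foreach(println)
    // Expected output (order may vary):
    // (p3,UT)  (p1,GA)  (p1,UNKNOWN)   <- u3 has no location record, but its product is kept
    sc.stop()
  }
}

The UNKNOWN default is what gives this formulation its left-outer semantics: a product bought by a user with no location record is kept rather than dropped.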
2. Left outer join in Spark with leftOuterJoin(): no flag values or other bookkeeping needed
import org.apache.spark.{SparkConf, SparkContext}

object LeftOutJoinTest {
  def main(args: Array[String]): Unit = {
    // Connect to the Spark master
    val conf = new SparkConf().setAppName("Chenjie's first spark App").setMaster("local")
    val sc = new SparkContext(conf)
    // Read the input files from HDFS and create the RDDs
    val users = sc.textFile("hdfs://pc1:9000/input/users.txt")
    val user_map = users.map { line =>
      val fields = line.split("\t")
      (fields(0), fields(1)) // (userID, location)
    }
    val transactions = sc.textFile("hdfs://pc1:9000/input/transactions.txt")
    val transaction_map = transactions.map { line =>
      val fields = line.split("\t")
      (fields(2), fields(1)) // (userID, productID)
    }
    // leftOuterJoin keeps every transaction; the user side becomes an Option,
    // which is None for transactions whose userID has no location record
    val joined = transaction_map.leftOuterJoin(user_map)
    joined.map { case (_, (product, locationOpt)) =>
      (product, locationOpt.getOrElse("UNKNOWN")) // a bare .get would throw on None
    }.distinct().groupByKey().map { pair =>
      val locations = pair._2
      (pair._1, (locations, locations.size))
    }.saveAsTextFile("hdfs://pc1:9000/output/leftoutjoin_2")
  }
}
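The one subtlety is that leftOuterJoin wraps the right side in an Option, so calling .get on it throws for exactly the rows a left outer join is meant to keep. A minimal sketch with hypothetical in-memory data (names and values are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LeftOuterJoinSketch").setMaster("local"))
    val transactionMap = sc.parallelize(Seq(("u1", "p3"), ("u3", "p1"))) // (userID, productID)
    val userMap = sc.parallelize(Seq(("u1", "UT")))                      // (userID, location)
    // Every left-side (transaction) record is kept; the right side is an Option[String]
    val joined = transactionMap.leftOuterJoin(userMap)
    joined.collect().foreach(println)
    // (u1,(p3,Some(UT)))
    // (u3,(p1,None))   <- .get would throw here; .getOrElse("UNKNOWN") keeps the row
    sc.stop()
  }
}

This is why the code above uses getOrElse("UNKNOWN") rather than the bare .get an inner join would allow.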