
Notes on Python + Jupyter + Spark Programming

2018-04-01 23:41

Speeding up input in Jupyter with the TAB key

Jupyter offers completion hints while you write code. When writing a Spark program in Jupyter and operating on an RDD, you can press TAB after typing the dot (.) to auto-complete the transformation or action you want.

For example, after typing rdd = sc.pa, pressing TAB completes it to rdd = sc.parallelize. (When writing Spark programs in an Eclipse environment, the completion feature is even more capable.)
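
As a quick illustration of what the completed call does, here is a minimal snippet; it assumes a SparkContext named sc is already available, as it is in a PySpark-enabled Jupyter kernel:

# Create an RDD from a local Python list; typing "rdd." and pressing TAB
# then lists the available transformations (map, filter, ...) and actions
# (count, collect, ...).
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.map(lambda x: x * 2).collect())  # [2, 4, 6, 8, 10]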

Saving program output in a specified format

Spark programs usually produce output as (K, V) pairs. Sometimes the file needs to be saved in a specific format (for example, with the columns separated by spaces), in which case the output has to be reshaped before it is written.

……
counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: x[0] + ' ' + str(x[1])) \
    .saveAsTextFile("result.txt")  # separate the fields with a space
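
For reference, a self-contained version of the snippet above could look like the following sketch. The setup elided by the original (the import of add and the creation of lines) is filled in here as an assumption, and input.txt is just a placeholder file name:

from operator import add
from pyspark import SparkContext

sc = SparkContext(appName='WordCount')
lines = sc.textFile("input.txt")  # placeholder input file
counts = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: x[0] + ' ' + str(x[1]))  # "word count" instead of ('word', count)
counts.saveAsTextFile("result.txt")

Note that saveAsTextFile writes a directory named result.txt containing part files, not a single text file.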


An RDD programming example in Python

The student file (student.txt):

yang 85 90 30
wang 20 60 50
zhang 90 90 100
zhang 90 90 100
li 100 54 0
li 100 54 0
yanf 0 0 0


def map_func(x):
    # Parse a line "name score1 score2 score3" into (name, [score1, score2, score3])
    s = x.split()
    return (s[0], [int(s[1]), int(s[2]), int(s[3])])

def has100(x):
    # True if any of the scores is 100
    for y in x:
        if y == 100:
            return True
    return False

def allIs0(x):
    # True if all of the scores are 0
    if type(x) == list and sum(x) == 0:
        return True
    return False

def subMax(x, y):
    # Element-wise maximum of two students' score lists
    m = [x[1][i] if x[1][i] > y[1][i] else y[1][i] for i in range(3)]
    return ('', m)

def sumKeyValue(x, y):
    # Concatenate the names and add the score lists element-wise
    return (x[0] + y[0], [x[1][0] + y[1][0], x[1][1] + y[1][1], x[1][2] + y[1][2]])

def sumAll(x, y):
    # Add the score lists element-wise, dropping the names
    return ('', [x[1][i] + y[1][i] for i in range(3)])

from pyspark import SparkContext

sc = SparkContext(appName='Student')
lines = sc.textFile("student.txt").map(map_func).cache()
count = lines.count()
whohas100 = lines.filter(lambda x: has100(x[1])).collect()   # students with a 100 in any subject
whoIs0 = lines.filter(lambda x: allIs0(x[1])).collect()      # students whose scores are all 0
# print(lines)
subM = lines.reduce(lambda x, y: subMax(x, y))               # per-subject maximum
sumA = lines.reduce(lambda x, y: sumAll(x, y))               # per-subject total
sumKV = lines.reduce(lambda x, y: sumKeyValue(x, y))         # per-subject total, names concatenated
sumScore = lines.map(lambda x: (x[0], sum(x[1]))).collect()  # total score per student
maxScore = max(sumScore, key=lambda x: x[1])
minScore = min(sumScore, key=lambda x: x[1])
avgA = [x / count for x in sumA[1]]                          # per-subject average
sorted = lines.sortBy(lambda x: sum(x[1]))                   # sort by total score, ascending
sortedWithRank = sorted.zipWithIndex().collect()
first3 = sorted.takeOrdered(3, key=lambda x: -sum(x[1]))     # top three by total score
first3RDD = sc.parallelize(first3)\
    .map(lambda x: str(x[0]) + ' ' + str(x[1][0]) + ' ' + str(x[1][1]) + ' ' + str(x[1][2]))\
    .saveAsTextFile("test2")

print(whohas100)
print(whoIs0)
print(subM)
print(sumA)
print(sumKV)
print(sumScore)
print(maxScore)
print(minScore)
print(avgA)
print(sorted)
print(sortedWithRank)
print(first3)
print(first3RDD)


The output is:

[('zhang', [90, 90, 100]), ('zhang', [90, 90, 100]), ('li', [100, 54, 0]), ('li', [100, 54, 0])]
[('yanf', [0, 0, 0])]
('', [100, 90, 100])
('', [485, 438, 280])
('yangwangzhangzhangliliyanf', [485, 438, 280])
[('yang', 205), ('wang', 130), ('zhang', 280), ('zhang', 280), ('li', 154), ('li', 154), ('yanf', 0)]
('zhang', 280)
('yanf', 0)
[69.28571428571429, 62.57142857142857, 40.0]
PythonRDD[23] at RDD at PythonRDD.scala:48
[(('yanf', [0, 0, 0]), 0), (('wang', [20, 60, 50]), 1), (('li', [100, 54, 0]), 2), (('li', [100, 54, 0]), 3), (('yang', [85, 90, 30]), 4), (('zhang', [90, 90, 100]), 5), (('zhang', [90, 90, 100]), 6)]
[('zhang', [90, 90, 100]), ('zhang', [90, 90, 100]), ('yang', [85, 90, 30])]
None
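
Two details of this output are worth noting. print(sorted) only shows the RDD's textual representation because sortBy is a lazy transformation, and print(first3RDD) shows None because saveAsTextFile is an action that returns nothing. To inspect the sorted records on the driver, an extra call such as the following (not part of the original script) can be used:

print(sorted.collect())  # materialize the sorted RDD as a list of (name, scores) pairs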
Tags: Python, Spark, Jupyter, example