您的位置:首页 > 其它

Spark pyspark package

2015-11-12 21:52 579 查看

1. contents

PySpark是Spark的PythonAPI。

公共类:

SparkContext: Spark运行的主要集成类。

它负责与Spark集群的connection,并且负责数据的生成和计算,以及其中的task的调度。

RDD:弹性分布式数据集,Spark中的基础抽象,Spark支持对RDD进行两类操作:transformations和actions。根据已经存在的数据集创建新的数据集的操作称为transformation;对数据集做计算并将结果返回driver program的操作被称为action。

如:map是根据传入的函数参数对已有的RDD做处理,其运行结果得到一个新的RDD,所以是一个transformation操作,而reduce则是根据传入的参数对已有RDD进行计算,计算结果不再是个RDD,而是个具体的值,所以是action操作。

特别的是,Spark对所有的transformations操作都采用lazy evaluation的策略,即spark在调度时并不是对遇到的每个transformation都立即求值得到新的RDD,而是将针对某个RDD的一系列transformation操作记录下来,只有最终遇到action操作时,Spark才会计算先前记录的每个transformations。

这种lazy evaluation 的设计思路使得Spark得以更高效运行,因为调度器可以对从初始RDD到最终action操作路径上的transformations做合并或其他变换,且只有最终的action操作结果才会返回给driver program,节省了transformations操作的中间结果在集群worker node和driver program间的传输开销。

默认情况下,调用action操作时,初始RDD经过的每个transformation操作均会被执行一次,在多个actions会经过一系列相同的trans操作时,这种recompute显得并不高效。因此,在实际开发Spark计算任务脚本时,会被多个actions共用是transformations结果最好调用persist或cache缓存起来,这样会节省不少时间。

Broadcast:在任务中重复使用的broadcast变量。

通过Broadcast变量的作用域对应用所申请的每个节点上的executor进程都是可见的,而且广播后,变量会一直存在于每个worker节点的executor进程中,直到任务结束。这样可以避免RDD数据集在driver和worker节点的executor进程间频繁传输带来的开销。尤其是对于某些用到制度共享变量的应用(如需要加载字典并且所有计算节点均需访问该字典),广播可以高效的实现变量共享的目的。

Accumulator:一个“add-only”的共享变量,任务只能向其中增加值。

worker节点上的进程可以通过add()操作更新变量,更新后的变量会自动传播回driver program。

SparkConf:配置Spark

通过pyspark.SparkConf,可在提交的应用代码中动态创建spark app的配置并作为conf参数传给pyspark.SparkContext实例的构造函数。若未动态创建conf,则pyspark.SparkContext实例从conf/spark-defaults.conf中读取默认的全局配置。

SparkFiles:

当应用通过SparkContext.addFile()向集群提交任务用到的文件时,调用SparkFiles类的相关方法可以解析这些文件并访问文件。

StorageLevel:

StorageLevel可以指定RDD的存储级别,如只使用内存,只使用磁盘,内存为主,磁盘为辅等。

参考文章

2.SparkConf

class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

配置Spark应用,用来设置Spark参数作为键值对。

所有的设置方法支持链式操作,如:

conf.setMaster(“local”).setAppName(“My App”)

一旦SparkConf对象被送入Spark,它将被克隆,并且不能再被user改变。

3.SparkContext

class pyspark.SparkContext(
master=None,
appName=None,
sparkHome=None,
pyFiles=None,
environment=None,
batchSize=0,
serializer=PickleSerializer(),
conf=None,
gateway=None,
jsc=None,
profiler_cls=<class'pyspark.profiler.BasicProfiler'>)


SparkContext是Spark的主入口点,一个SparkContext表示与Spark集群的一次连接,可以在该集群上创建RDD和broadcast变量。

PACKAGE_EXTENSIONS=(’.zip’,’.egg’,’.jar’)

accumulator(value,accum_param=None)

根据给定的初始值创建一个Accumulator,用给定的Accumulator or Param帮助对象来定义:如何add data type values if provided.

如果不提供值,默认的AccumulatorParams是整数和浮点数。对其他的类型来说,使用自定义的AccumulatorParam.

addFile(path): 在每个节点上增加一个可以下载到Spark job的文件。传入的路径可以是一个本地文件,或者是HDFS里的文件,HTTP,HTTPS或FTP URI

为了在Spark jobs里连接文件,使用

L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} 通过文件名来找出它的下载位置。


from pyspark import SparkFiles
path = os.path.join(tempdir,"test.txt")
with open(path,'w') as testFile:
_ = testFile.write('100')
sc.addFile(path)
def func(iterator):
with open(SparkFiles.get("test.txt")) as testFile:
fileVal = int(testFile.readline())
return [x * fileVal for x in iterator]
sc.parallelize([1,2,3,4]).mapPartitions(func).collect()
[100, 200, 300, 400]


addPyFile

applicationId

Spark应用的一个唯一的标识。他的格式依赖于调度器的实现。

print sc.applicationId

u’local-…’

binaryFiles(path,minPartitions=None)

binaryRecords(path,recordLength)

broadcast(value)

cancelAllJobs():取消已经被安排的或者正在跑的任务

cancelJobGroup(groupId):取消特定组的活动对象

clearFiles():清理addFile或addPyFile增加的job的文件列表,从而它们不会下载到任意新的节点。

de
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: