Spark pyspark package
2015-11-12 21:52
579 查看
1. contents
PySpark是Spark的PythonAPI。公共类:
SparkContext: Spark运行的主要集成类。
它负责与Spark集群的connection,并且负责数据的生成和计算,以及其中的task的调度。
RDD:弹性分布式数据集,Spark中的基础抽象,Spark支持对RDD进行两类操作:transformations和actions。根据已经存在的数据集创建新的数据集的操作称为transformation;对数据集做计算并将结果返回driver program的操作被称为action。
如:map是根据传入的函数参数对已有的RDD做处理,其运行结果得到一个新的RDD,所以是一个transformation操作,而reduce则是根据传入的参数对已有RDD进行计算,计算结果不再是个RDD,而是个具体的值,所以是action操作。
特别的是,Spark对所有的transformations操作都采用lazy evaluation的策略,即spark在调度时并不是对遇到的每个transformation都立即求值得到新的RDD,而是将针对某个RDD的一系列transformation操作记录下来,只有最终遇到action操作时,Spark才会计算先前记录的每个transformations。
这种lazy evaluation 的设计思路使得Spark得以更高效运行,因为调度器可以对从初始RDD到最终action操作路径上的transformations做合并或其他变换,且只有最终的action操作结果才会返回给driver program,节省了transformations操作的中间结果在集群worker node和driver program间的传输开销。
默认情况下,调用action操作时,初始RDD经过的每个transformation操作均会被执行一次,在多个actions会经过一系列相同的trans操作时,这种recompute显得并不高效。因此,在实际开发Spark计算任务脚本时,会被多个actions共用是transformations结果最好调用persist或cache缓存起来,这样会节省不少时间。
Broadcast:在任务中重复使用的broadcast变量。
通过Broadcast变量的作用域对应用所申请的每个节点上的executor进程都是可见的,而且广播后,变量会一直存在于每个worker节点的executor进程中,直到任务结束。这样可以避免RDD数据集在driver和worker节点的executor进程间频繁传输带来的开销。尤其是对于某些用到制度共享变量的应用(如需要加载字典并且所有计算节点均需访问该字典),广播可以高效的实现变量共享的目的。
Accumulator:一个“add-only”的共享变量,任务只能向其中增加值。
worker节点上的进程可以通过add()操作更新变量,更新后的变量会自动传播回driver program。
SparkConf:配置Spark
通过pyspark.SparkConf,可在提交的应用代码中动态创建spark app的配置并作为conf参数传给pyspark.SparkContext实例的构造函数。若未动态创建conf,则pyspark.SparkContext实例从conf/spark-defaults.conf中读取默认的全局配置。
SparkFiles:
当应用通过SparkContext.addFile()向集群提交任务用到的文件时,调用SparkFiles类的相关方法可以解析这些文件并访问文件。
StorageLevel:
StorageLevel可以指定RDD的存储级别,如只使用内存,只使用磁盘,内存为主,磁盘为辅等。
参考文章
2.SparkConf
class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)配置Spark应用,用来设置Spark参数作为键值对。
所有的设置方法支持链式操作,如:
conf.setMaster(“local”).setAppName(“My App”)
一旦SparkConf对象被送入Spark,它将被克隆,并且不能再被user改变。
3.SparkContext
class pyspark.SparkContext( master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class'pyspark.profiler.BasicProfiler'>)
SparkContext是Spark的主入口点,一个SparkContext表示与Spark集群的一次连接,可以在该集群上创建RDD和broadcast变量。
PACKAGE_EXTENSIONS=(’.zip’,’.egg’,’.jar’)
accumulator(value,accum_param=None)
根据给定的初始值创建一个Accumulator,用给定的Accumulator or Param帮助对象来定义:如何add data type values if provided.
如果不提供值,默认的AccumulatorParams是整数和浮点数。对其他的类型来说,使用自定义的AccumulatorParam.
addFile(path): 在每个节点上增加一个可以下载到Spark job的文件。传入的路径可以是一个本地文件,或者是HDFS里的文件,HTTP,HTTPS或FTP URI
为了在Spark jobs里连接文件,使用
L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} 通过文件名来找出它的下载位置。
from pyspark import SparkFiles path = os.path.join(tempdir,"test.txt") with open(path,'w') as testFile: _ = testFile.write('100') sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] sc.parallelize([1,2,3,4]).mapPartitions(func).collect() [100, 200, 300, 400]
addPyFile
applicationId
Spark应用的一个唯一的标识。他的格式依赖于调度器的实现。
print sc.applicationId
u’local-…’
binaryFiles(path,minPartitions=None)
binaryRecords(path,recordLength)
broadcast(value)
cancelAllJobs():取消已经被安排的或者正在跑的任务
cancelJobGroup(groupId):取消特定组的活动对象
clearFiles():清理addFile或addPyFile增加的job的文件列表,从而它们不会下载到任意新的节点。
de
相关文章推荐
- 公司内网与外网一起上
- OData V4 系列 Action 与 Function
- android传递数据bundle封装传递map对象
- ListView的多选模式
- HDU 1848 Fibonacci again and again(博弈_SG函数)
- 单例模式线程安全的实现方式
- 一条sql语句查出多个表的数据
- Windows平台下Ubuntu系统安装
- 各文件系统对单个文件大小的限制
- ptlib编译时,会自动扫描电脑系统,找出头文件以及库文件并进行引用,生成头文件ptbuildopts.h
- Swift——Command failed due to signal: Segmentation fault: 11
- powerdesigner16.5 列显示comment
- opencv笔记 3.访问图像元素的四种方法
- Objective-数组操作
- iOS之Drawing<2>
- sicily 1344. 数列
- OpenCV中保存视频的一般方法
- OpenCV中保存视频的一般方法
- ubuntu 解决 “E: Problem wih MergeList /var/lib/apt/lists/”错误
- Caffe编译出问题:This tool requires OpenCV; compile with USE_OPENCV.