
Extending SparkContext with a custom textFiles method that reads text files from multiple input directories

2015-10-20 10:07
Requirement

Extend SparkContext with a custom textFiles method so that text files can be read from multiple input directories at once.

Extension

import pyspark
from pyspark.serializers import PickleSerializer


class SparkContext(pyspark.SparkContext):

    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=0, serializer=PickleSerializer(),
                 conf=None, gateway=None, jsc=None):
        pyspark.SparkContext.__init__(self, master=master, appName=appName,
                                      sparkHome=sparkHome, pyFiles=pyFiles,
                                      environment=environment, batchSize=batchSize,
                                      serializer=serializer, conf=conf,
                                      gateway=gateway, jsc=jsc)

    def textFiles(self, dirs):
        # Join the directories into one comma-separated input path and
        # enable recursive traversal of the input directories.
        hadoopConf = {
            "mapreduce.input.fileinputformat.inputdir": ",".join(dirs),
            "mapreduce.input.fileinputformat.input.dir.recursive": "true",
        }

        # Read (LongWritable offset, Text line) pairs via the old-API TextInputFormat.
        pair = self.hadoopRDD(inputFormatClass="org.apache.hadoop.mapred.TextInputFormat",
                              keyClass="org.apache.hadoop.io.LongWritable",
                              valueClass="org.apache.hadoop.io.Text",
                              conf=hadoopConf)

        # Keep only the line text, dropping the byte-offset keys.
        return pair.map(lambda kv: kv[1])
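The core of the extension is the Hadoop configuration: the directory list collapses into a single comma-separated string under `mapreduce.input.fileinputformat.inputdir`, and `mapreduce.input.fileinputformat.input.dir.recursive` lets the input format descend into subdirectories. A minimal sketch of the conf construction, with made-up HDFS paths for illustration:

```python
# Hypothetical HDFS paths, for illustration only.
dirs = ["hdfs://namenode:8020/data/dir1", "hdfs://namenode:8020/data/dir2"]

hadoopConf = {
    "mapreduce.input.fileinputformat.inputdir": ",".join(dirs),
    "mapreduce.input.fileinputformat.input.dir.recursive": "true",
}

print(hadoopConf["mapreduce.input.fileinputformat.inputdir"])
# hdfs://namenode:8020/data/dir1,hdfs://namenode:8020/data/dir2
```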


Example

from pyspark import SparkConf
from dip.spark import SparkContext

conf = SparkConf().setAppName("spark_textFiles_test")

sc = SparkContext(conf=conf)

dirs = ["hdfs://dip.cdh5.dev:8020/user/yurun/dir1",
        "hdfs://dip.cdh5.dev:8020/user/yurun/dir2"]

def printLines(lines):
    if lines:
        for line in lines:
            print(line)

lines = sc.textFiles(dirs).collect()

printLines(lines)

sc.stop()
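textFiles returns bare line text because hadoopRDD yields (offset, line) pairs and the final map keeps only the value. Outside a cluster, the same projection can be sketched with plain tuples (the sample data below is made up):

```python
# Simulated (byte offset, line) pairs as TextInputFormat would produce them.
pairs = [(0, "first line"), (11, "second line"), (23, "third line")]

# Same projection as pair.map(lambda kv: kv[1]) in textFiles.
lines = [kv[1] for kv in pairs]
print(lines)  # ['first line', 'second line', 'third line']
```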