
sqlContext: filter() returns an empty RDD

2015-11-04 19:00
Hive already contains a table `records`:

hive> desc records;
OK
year        	string
temperature 	int
quality     	int

hive> select * from records;
OK
2013	15	18
2014	23	32
2015	19	91
The goal is to select the rows of `records` whose temperature != 15 and store the filtered data in a new table. The code:

from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    if int(row[1]) == 15:
        print "*******************************" + str(row[1])
        return False

if __name__ == "__main__":
    sc = SparkContext(appName = "records")
    sqlContext = HiveContext(sc)

    table_df = sqlContext.sql("select * from records").rdd
    rltrdd = table_df.filter(lambda row : inside(row))
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)

    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()


Spark reports that the RDD is empty, with this traceback:

Traceback (most recent call last):
  File "/home/sky/spark/bin/workspace/query.py", line 24, in <module>
    newdf = sqlContext.createDataFrame(rltrdd)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 284, in createDataFrame
    schema = self._inferSchema(rdd, samplingRatio)
  File "/home/sky/spark/python/pyspark/sql/context.py", line 164, in _inferSchema
    first = rdd.first()
  File "/home/sky/spark/python/pyspark/rdd.py", line 1245, in first
    raise ValueError("RDD is empty")
ValueError: RDD is empty
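The last frame explains the failure: createDataFrame() infers the schema by looking at the RDD's first element, and first() raises when there is nothing to look at. A minimal pure-Python stand-in for that behavior (a sketch, not PySpark's actual implementation):

```python
def first(items):
    # Hypothetical stand-in for RDD.first(): return the first element,
    # or raise the same error PySpark raises on an empty RDD.
    for item in items:
        return item
    raise ValueError("RDD is empty")

try:
    first([])                 # an empty "RDD"
except ValueError as e:
    print(e)                  # prints: RDD is empty
```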

The corrected code:

from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    if int(row[1]) != 15:
        print "*******************************" + str(row[1])
        return True
    else:
        return False

if __name__ == "__main__":
    sc = SparkContext(appName = "records")
    sqlContext = HiveContext(sc)

    table_df = sqlContext.sql("select * from records").rdd
    print "*************************************"
    print table_df
    rltrdd = table_df.filter(lambda row : inside(row))
    print "*************************************" + str(rltrdd)
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)

    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()


Cause of the error:

The predicate passed to filter() must return a truthy value (True) for every row that should be kept. In the first version, inside() returned False when temperature == 15 and fell through to Python's implicit `return None` for every other row; since both False and None are falsy, filter() dropped all rows and the resulting RDD was empty.
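The truthiness rule can be demonstrated without Spark at all, using Python's built-in filter() on the sample rows (a sketch of the semantics, not PySpark itself):

```python
# Sample rows in the same (year, temperature, quality) shape as the Hive table.
rows = [("2013", 15, 18), ("2014", 23, 32), ("2015", 19, 91)]

def inside_buggy(row):
    # First version: returns False for temperature == 15,
    # and implicitly returns None for every other row.
    if int(row[1]) == 15:
        return False

def inside_fixed(row):
    # Corrected version: returns True for every row to keep.
    return int(row[1]) != 15

print(list(filter(inside_buggy, rows)))  # [] -- None is falsy, nothing survives
print(list(filter(inside_fixed, rows)))  # [('2014', 23, 32), ('2015', 19, 91)]
```

Writing the fixed predicate as a single boolean expression, as inside_fixed does, also avoids the missing-branch mistake entirely.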