The RDD returned by sqlContext.filter() is empty
2015-11-04 19:00
There is already a table named records in Hive:
hive> desc records;
OK
year string
temperature int
quality int
hive> select * from records;
OK
2013    15    18
2014    23    32
2015    19    91
Select the rows of records whose temperature != 15, and store the filtered data in a new table. The code is as follows:
from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    # Bug: returns False when temperature == 15, but falls through
    # (returning None) for every other row.
    if int(row[1]) == 15:
        print "*******************************" + str(row[1])
        return False

if __name__ == "__main__":
    sc = SparkContext(appName="records")
    sqlContext = HiveContext(sc)
    table_df = sqlContext.sql("select * from records").rdd
    rltrdd = table_df.filter(lambda row: inside(row))
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)
    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()
Spark reports that the RDD is empty, with the following traceback:
Traceback (most recent call last):
File "/home/sky/spark/bin/workspace/query.py", line 24, in <module>
newdf = sqlContext.createDataFrame(rltrdd)
File "/home/sky/spark/python/pyspark/sql/context.py", line 284, in createDataFrame
schema = self._inferSchema(rdd, samplingRatio)
File "/home/sky/spark/python/pyspark/sql/context.py", line 164, in _inferSchema
first = rdd.first()
File "/home/sky/spark/python/pyspark/rdd.py", line 1245, in first
raise ValueError("RDD is empty")
ValueError: RDD is empty
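Note where the failure surfaces: not in filter() itself, since transformations are lazy, but in createDataFrame(), whose schema inference calls rdd.first() on the now-empty RDD. A minimal sketch reproducing just that failure (the appName is arbitrary):

from pyspark import SparkContext

sc = SparkContext(appName="empty-rdd-demo")
# A predicate that is never truthy leaves nothing in the RDD.
empty = sc.parallelize([1, 2, 3]).filter(lambda x: False)
try:
    empty.first()  # the same call schema inference makes internally
except ValueError as e:
    print str(e)   # prints "RDD is empty"
sc.stop()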
The corrected code is as follows:
from pyspark import SparkContext
from pyspark.sql import HiveContext

def inside(row):
    # Return an explicit boolean for every row.
    if int(row[1]) != 15:
        print "*******************************" + str(row[1])
        return True
    else:
        return False

if __name__ == "__main__":
    sc = SparkContext(appName="records")
    sqlContext = HiveContext(sc)
    table_df = sqlContext.sql("select * from records").rdd
    print "*************************************"
    print table_df
    rltrdd = table_df.filter(lambda row: inside(row))
    print "*************************************" + str(rltrdd)
    count = rltrdd.count()
    if count == 0:
        print "**************************nothing****************"
    else:
        print "********************************************" + str(count)
    tablename = "temp"
    newdf = sqlContext.createDataFrame(rltrdd)
    newdf.registerAsTable(tablename)
    sql_create = "create table temptable like records"
    sql_insert = "insert into table temptable select * from temp"
    sqlContext.sql(sql_create)
    sqlContext.sql(sql_insert)
    sc.stop()
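Since the condition is a simple column comparison, the RDD round-trip (and the schema inference that failed above) can be skipped entirely by filtering in SQL. A sketch of that alternative, reusing the sqlContext and table names from the script above:

# The DataFrame keeps its schema, so no inference step is needed.
newdf = sqlContext.sql("select * from records where temperature != 15")
newdf.registerAsTable("temp")
sqlContext.sql("create table temptable like records")
sqlContext.sql("insert into table temptable select * from temp")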
Cause of the error:
The predicate passed to filter() must return True for every row that should be kept. The original inside() only returns False (when temperature == 15) and implicitly returns None for all other rows; None is falsy, so every row is dropped and the resulting RDD is empty.
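To make the rule concrete: filter() keeps an element only when the predicate returns a truthy value, and a Python function that falls off the end returns None, which is falsy. A minimal sketch of the difference, assuming a running SparkContext named sc:

rows = sc.parallelize([("2013", 15, 18), ("2014", 23, 32)])

def bad(row):
    if int(row[1]) == 15:
        return False
    # falls through for every other row, returning None (falsy)

def good(row):
    return int(row[1]) != 15  # explicit boolean for every row

print rows.filter(bad).count()   # 0: every row is dropped
print rows.filter(good).count()  # 1: the 2014 row survives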