Spark - 导入JSON、Text文件为Dataframe 做SQL查询
2016-04-10 16:39
555 查看
import os import sys import json import pyspark os.chdir("C:\\Temp") if 'SPARK_HOME' not in os.environ: os.environ['SPARK_HOME']="C:\\MyFolder\\spark-1.6.1-bin-hadoop2.6" SPARK_HOME=os.environ['SPARK_HOME'] sys.path.insert(0,os.path.join(SPARK_HOME, "python", "build")) sys.path.insert(0,os.path.join(SPARK_HOME, "python")) print SPARK_HOME from pyspark import SparkContext sc=SparkContext('local[4]','pyspark') from pyspark.sql.types import * from pyspark.sql import SQLContext, Row from pyspark.sql import SQLContext sqlContext = SQLContext(sc) lines=sc.textFile("C:\\Temp\\SC_Test.txt") lines.count() # 69 line1=lines.map(lambda x: x.split("|")) line1.count() line2=line1.map(lambda p: Row(trans_id=p[0],sndr_country=p[2],usd_amt=p[3],usd_margin=p[4] )) trans=sqlContext.createDataFrame(line2) trans.registerTempTable("trans") sql1=sqlContext.sql("select trans_id, sndr_country from trans where sndr_country='CA'") sql1.count() sql2=sqlContext.sql("select trans_id, usd_amt from trans where usd_amt > 100 ") sql2.count() sql2.collect() sql3=sqlContext.sql("select sndr_country, sum(usd_amt) from trans group by sndr_country ") sql3.count() sql3.collect() """ Load JSON files and Convert to dataframe ############# Impor & Convert JSON to dataframe fp = open('C:\\Temp\\ColumnConfig_6009.json','r') data = json.loads(fp.read()) data=sqlContext.createDataFrame(data) data.registerTempTable("data") fp.close() sql1=sqlContext.sql("select * from data ") sql1.count() # 2 """
sqlContext.dropTempTable("trans")
相关文章推荐
- mysql数据库批量快速插入
- Mac安装 mysql 数据库总结
- SQL SERVER存储过程,参数默认值设置
- Redis中的简单动态字符串
- oracle学习 第一章 简单的查询语句 ——03
- MYSQL 中的常见问题(包括数据筛选)
- MongoDB 决策分析
- kettle中MySQL批量加载
- kettle中MySQL批量加载
- hadoop生态系统学习之路(九)MR将结果输出到数据库(DB)
- 数据库设计 Step by Step (2)——数据库生命周期
- 初始化MySQL
- activiti5.13 框架 数据库表结构说明
- sql备份文件导出到excel
- 利用kettle对postgresql进行批量加载
- 不同的msyq版本和不同的mysql驱动会出现不兼容问题
- 分页数据有重复的问题
- 数据库设计 Step by Step (1)——扬帆启航
- Redis命令-有序集合-zlexcount
- 数据库的灵活操作