使用Java实战RDD与Dataframe动态转换
2016-05-07 22:49
465 查看
import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.relaxng.datatype.Datatype; /** * @author 作者 E-mail: * @version 创建时间:2016年3月16日 下午9:46:11 * 类说明 */ public class RDD2DataFrame { public static void main( String[] args ) { SparkConf conf = new SparkConf().setMaster("local").setAppName(""); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); JavaRDD<String> lines = sc.textFile( "D://person.txt" ); /**] * 在RDD的基础上创建类型为Row的RDD, */ JavaRDD<Row> personRDD = lines.map(new Function<String, Row>() { private static final long serialVersionUID = 1L; public Row call( String line ) throws Exception { String[] split = line.split(","); return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2])); } }); /** *1、 动态的构建DataFrame的元数据,一般而言,有多少列以及酶类的具体类型可能来源于JSON文件或者数据库 */ List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField( "id", DataTypes.IntegerType, true )); structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true )); structFields.add(DataTypes.createStructField( "age", DataTypes.IntegerType, true )); /** * 2、构建StructType用于DataFrame 元数据的描述 * */ StructType structType = DataTypes.createStructType( structFields ); /** * 3、基于MeataData以及RDD<Row>来构造DataFrame */ DataFrame personsDF = sqlContext.createDataFrame( personRDD, structType); /** * 4、注册成为临时表以供后续的SQL查询操作 */ personsDF.registerTempTable("persons"); /** * 5、进行数据的多维度分析 */ DataFrame dataResults = sqlContext.sql("select * from persons where age > 8"); /** * 6对结果进行处理,包括由DataFrame转换为RDD<Row> 以及结果的持久化 */ List<Row> collect = dataResults.javaRDD().collect(); for (Row lists : collect){ System.out.println(lists); } } }
RDD与Dataframe动态转换
相关文章推荐
- 《java入门第一季》之面向对象(形式参数和返回值问题的深入研究2)
- 《java入门第一季》之面向对象(形式参数和返回值问题的深入研究2)
- 我的进步是站在巨人的肩膀,java随机数详解
- JavaWeb学习笔记——JavaBean的保存范围和删除
- Java多线程编程4--Lock的实例--实现生产者/消费者模式:一对一、多对多交替打印
- 《java入门第一季》之面向对象(形式参数和返回值问题深入研究1)
- 《java入门第一季》之面向对象(形式参数和返回值问题深入研究1)
- Java中的String详解
- 【GOF23设计模式】_建造者模式详解_类图关系JAVA232
- 20145218 《Java程序设计》第10周学习总结
- 最简单的例子告诉你什么是面向对象(java)
- ZipFile v.s. ZipInputStream in java.util.zip
- Java的代理—JDK Proxy
- java 完全解耦和弱耦合
- Java Swing 秒表
- 维护代码简洁,使用lombok消除冗长的Java代码
- Java学习4_一些基础4_输入输出_16.5.7
- 如何定义 Java 中的方法(二)
- java Io流中对象序列化和反序列问题
- Spring MVC 4 文件上传下载 Hibernate+MySQL例子 (带源码)