您的位置:首页 > 编程语言 > Java开发

使用Java实战RDD与Dataframe动态转换

2016-05-07 22:49 465 查看
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.relaxng.datatype.Datatype;

/**
* @author 作者 E-mail:
* @version 创建时间:2016年3月16日 下午9:46:11
* 类说明
*/
public class RDD2DataFrame {
public static void main( String[] args ) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lines = sc.textFile( "D://person.txt" );
/**]
* 在RDD的基础上创建类型为Row的RDD,
*/
JavaRDD<Row> personRDD  = lines.map(new Function<String, Row>() {

private static final long serialVersionUID = 1L;

public Row call( String line )
throws Exception {
String[] split = line.split(",");
return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
}
});
/**
*1、 动态的构建DataFrame的元数据,一般而言,有多少列以及酶类的具体类型可能来源于JSON文件或者数据库
*/
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField( "id", DataTypes.IntegerType, true ));
structFields.add(DataTypes.createStructField( "name", DataTypes.StringType, true ));
structFields.add(DataTypes.createStructField( "age", DataTypes.IntegerType, true ));
/**
* 2、构建StructType用于DataFrame 元数据的描述
*
*/
StructType structType = DataTypes.createStructType( structFields );
/**
* 3、基于MeataData以及RDD<Row>来构造DataFrame
*/
DataFrame personsDF = sqlContext.createDataFrame( personRDD, structType);

/**
* 4、注册成为临时表以供后续的SQL查询操作
*/
personsDF.registerTempTable("persons");
/**
* 5、进行数据的多维度分析
*/
DataFrame dataResults = sqlContext.sql("select * from persons where  age > 8");
/**
* 6对结果进行处理,包括由DataFrame转换为RDD<Row> 以及结果的持久化
*/
List<Row> collect = dataResults.javaRDD().collect();
for (Row lists : collect){
System.out.println(lists);
}
}
}


RDD与Dataframe动态转换
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: