您的位置:首页 > 编程语言 > Java开发

spark程序读写protobuf格式数据(java语言)

2017-04-20 10:27 501 查看

在spark上,用protobuf替代json格式作为数据序列化存储

谷歌的protobuf一般用来将复杂数据结构序列化为二进制数组,非常适合网络传输等领域,其效率和空间占用都优于json格式。

这一次,我在用spark做建模时,打算使用protobuf替换原json格式数据,以获得性能提升。在此记录下实现方式,以及如何避过我遇到的坑。

我的环境是spark1.5.0 + java7 + protobuf2.5。

首先,要编写.proto文件以描述数据结构。这里不详细解释,有兴趣的可参见别人写的:

http://www.cnblogs.com/dkblog/archive/2012/03/27/2419010.html

这里放一个proto文件的例子:

// protobufTest.proto
syntax = "proto2";
option java_package = "com.ismartv.recommendv2.test";
option java_outer_classname = "PersonEntity";//生成的数据访问类的类名
message Person {
required string sn = 1;// sn
required string name = 2;//必须字段,在后面的使用中必须为该段设置值
}


使用命令protoc –java_out=src pathToProto/protobufTest.proto 即可将proto文件所描述的数据类型生成为java类。

接下来只需要编写spark程序,先将Person数据类型由java对象转为protobuf二进制数组输出到hdfs,再由hdfs读取二进制数组数据转换为java对象。完成读写操作。

以protobuf为结构,写java对象到HDFS二进制文件代码:

// 先生成若干个Person对象
JavaRDD<Integer> numbersRDD = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
JavaRDD<Person> persons = numbersRDD.map(new Function<Integer, PersonEntity.Person>() {

public Person call(Integer x) throws Exception {
// TODO Auto-generated method stub
PersonEntity.Person.Builder builder = PersonEntity.Person.newBuilder();
builder.setSn(x.toString());
builder.setName(x.toString() + "name");
PersonEntity.Person person = builder.build();
return person;
}
});
// 将JavaRDD<Person> 转换为JavaPairRDD<NullWritable, BytesWritable>
// 最后保存到HDFS
persons.mapToPair(new PairFunction<PersonEntity.Person, NullWritable, BytesWritable>() {

public Tuple2<NullWritable, BytesWritable> call(Person person) throws Exception {
// 这里new BytesWritable(person.toByteArray()) 是将java对象序列化为protobuf二进制数组
return new Tuple2<NullWritable, BytesWritable>(NullWritable.get(), new BytesWritable(person.toByteArray()));
}
}).saveAsNewAPIHadoopFile("hdfs://nameservice1/test/protobufTest", NullWritable.class, BytesWritable.class, SequenceFileOutputFormat.class);


以protobuf为结构,读HDFS二进制文件到java对象代码:

// 注意要用sequenceFile函数
JavaRDD<Person> readperson = sc.sequenceFile("hdfs://nameservice1/test/protobufTest/", NullWritable.class, BytesWritable.class)
.map(new Function<Tuple2<NullWritable, BytesWritable>, PersonEntity.Person>() {

public Person call(Tuple2<NullWritable, BytesWritable> tuple) throws Exception {
// 解析byte[]为java对象,注意,一定要用copyBytes()而不是getBytes()
PersonEntity.Person p3 = PersonEntity.Person.parseFrom(tuple._2.copyBytes());
return p3;
}
});
// 看一下结果
List<PersonEntity.Person> list = readperson.collect();
for(PersonEntity.Person person : list){
System.out.println(person.toString());
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark java protobuf