如何使用Spark大规模并行构建索引
2016-02-01 13:52
323 查看
使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。
先看下,整体的拓扑图:
然后,再来看下,使用scala写的spark程序:
Java代码
package com.easy.build.index
import java.util
import org.apache.solr.client.solrj.beans.Field
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.annotation.meta.field
/**
* Created by qindongliang on 2016/1/21.
*/
//注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下
case class Record(
@(Field@field)("rowkey") rowkey:String,
@(Field@field)("title") title:String,
@(Field@field)("content") content:String,
@(Field@field)("isdel") isdel:String,
@(Field@field)("t1") t1:String,
@(Field@field)("t2")t2:String,
@(Field@field)("t3")t3:String,
@(Field@field)("dtime") dtime:String
)
/***
* Spark构建索引==>Solr
*/
object SparkIndex {
//solr客户端
val client=new HttpSolrClient("http://192.168.1.188:8984/solr/monitor");
//批提交的条数
val batchCount=10000;
def main2(args: Array[String]) {
val d1=new Record("row1","title","content","1","01","57","58","3");
val d2=new Record("row2","title","content","1","01","57","58","45");
val d3=new Record("row3","title","content","1","01","57","58",null);
client.addBean(d1);
client.addBean(d2)
client.addBean(d3)
client.commit();
println("提交成功!")
}
/***
* 迭代分区数据(一个迭代器集合),然后进行处理
* @param lines 处理每个分区的数据
*/
def indexPartition(lines:scala.Iterator[String] ): Unit ={
//初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等
val datas = new util.ArrayList[Record]()
//迭代处理每条数据,符合条件会提交数据
lines.foreach(line=>indexLineToModel(line,datas))
//操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据
commitSolr(datas,true);
}
/***
* 提交索引数据到solr中
*
* @param datas 索引数据
* @param isEnd 是否为最后一次提交
*/
def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={
//仅仅最后一次提交和集合长度等于批处理的数量时才提交
if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {
client.addBeans(datas);
client.commit(); //提交数据
datas.clear();//清空集合,便于重用
}
}
/***
* 得到分区的数据具体每一行,并映射
* 到Model,进行后续索引处理
*
* @param line 每行具体数据
* @param datas 添加数据的集合,用于批量提交索引
*/
def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={
//数组数据清洗转换
val fields=line.split("\1",-1).map(field =>etl_field(field))
//将清洗完后的数组映射成Tuple类型
val tuple=buildTuble(fields)
//将Tuple转换成Bean类型
val recoder=Record.tupled(tuple)
//将实体类添加至集合,方便批处理提交
datas.add(recoder);
//提交索引到solr
commitSolr(datas,false);
}
/***
* 将数组映射成Tuple集合,方便与Bean绑定
* @param array field集合数组
* @return tuple集合
*/
def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={
array match {
case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)
}
}
/***
* 对field进行加工处理
* 空值替换为null,这样索引里面就不会索引这个字段
* ,正常值就还是原样返回
*
* @param field 用来走特定规则的数据
* @return 映射完的数据
*/
def etl_field(field:String):String={
field match {
case "" => null
case _ => field
}
}
/***
* 根据条件清空某一类索引数据
* @param query 删除的查询条件
*/
def deleteSolrByQuery(query:String): Unit ={
client.deleteByQuery(query);
client.commit()
println("删除成功!")
}
def main(args: Array[String]) {
//根据条件删除一些数据
deleteSolrByQuery("t1:03")
//远程提交时,需要提交打包后的jar
val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";
//远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统
System.setProperty("user.name", "webmaster");
//初始化SparkConf
val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");
//上传运行时依赖的jar包
val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"
conf.setJars(seq)
//初始化SparkContext上下文
val sc = new SparkContext(conf);
//此目录下所有的数据,将会被构建索引,格式一定是约定好的
val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");
//通过rdd构建索引
indexRDD(rdd);
//关闭索引资源
client.close();
//关闭SparkContext上下文
sc.stop();
}
/***
* 处理rdd数据,构建索引
* @param rdd
*/
def indexRDD(rdd:RDD[String]): Unit ={
//遍历分区,构建索引
rdd.foreachPartition(line=>indexPartition(line));
}
}
ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client ) 模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建
先看下,整体的拓扑图:
然后,再来看下,使用scala写的spark程序:
Java代码
package com.easy.build.index
import java.util
import org.apache.solr.client.solrj.beans.Field
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.annotation.meta.field
/**
* Created by qindongliang on 2016/1/21.
*/
//注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下
case class Record(
@(Field@field)("rowkey") rowkey:String,
@(Field@field)("title") title:String,
@(Field@field)("content") content:String,
@(Field@field)("isdel") isdel:String,
@(Field@field)("t1") t1:String,
@(Field@field)("t2")t2:String,
@(Field@field)("t3")t3:String,
@(Field@field)("dtime") dtime:String
)
/***
* Spark构建索引==>Solr
*/
object SparkIndex {
//solr客户端
val client=new HttpSolrClient("http://192.168.1.188:8984/solr/monitor");
//批提交的条数
val batchCount=10000;
def main2(args: Array[String]) {
val d1=new Record("row1","title","content","1","01","57","58","3");
val d2=new Record("row2","title","content","1","01","57","58","45");
val d3=new Record("row3","title","content","1","01","57","58",null);
client.addBean(d1);
client.addBean(d2)
client.addBean(d3)
client.commit();
println("提交成功!")
}
/***
* 迭代分区数据(一个迭代器集合),然后进行处理
* @param lines 处理每个分区的数据
*/
def indexPartition(lines:scala.Iterator[String] ): Unit ={
//初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等
val datas = new util.ArrayList[Record]()
//迭代处理每条数据,符合条件会提交数据
lines.foreach(line=>indexLineToModel(line,datas))
//操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据
commitSolr(datas,true);
}
/***
* 提交索引数据到solr中
*
* @param datas 索引数据
* @param isEnd 是否为最后一次提交
*/
def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={
//仅仅最后一次提交和集合长度等于批处理的数量时才提交
if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {
client.addBeans(datas);
client.commit(); //提交数据
datas.clear();//清空集合,便于重用
}
}
/***
* 得到分区的数据具体每一行,并映射
* 到Model,进行后续索引处理
*
* @param line 每行具体数据
* @param datas 添加数据的集合,用于批量提交索引
*/
def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={
//数组数据清洗转换
val fields=line.split("\1",-1).map(field =>etl_field(field))
//将清洗完后的数组映射成Tuple类型
val tuple=buildTuble(fields)
//将Tuple转换成Bean类型
val recoder=Record.tupled(tuple)
//将实体类添加至集合,方便批处理提交
datas.add(recoder);
//提交索引到solr
commitSolr(datas,false);
}
/***
* 将数组映射成Tuple集合,方便与Bean绑定
* @param array field集合数组
* @return tuple集合
*/
def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={
array match {
case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)
}
}
/***
* 对field进行加工处理
* 空值替换为null,这样索引里面就不会索引这个字段
* ,正常值就还是原样返回
*
* @param field 用来走特定规则的数据
* @return 映射完的数据
*/
def etl_field(field:String):String={
field match {
case "" => null
case _ => field
}
}
/***
* 根据条件清空某一类索引数据
* @param query 删除的查询条件
*/
def deleteSolrByQuery(query:String): Unit ={
client.deleteByQuery(query);
client.commit()
println("删除成功!")
}
def main(args: Array[String]) {
//根据条件删除一些数据
deleteSolrByQuery("t1:03")
//远程提交时,需要提交打包后的jar
val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";
//远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统
System.setProperty("user.name", "webmaster");
//初始化SparkConf
val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");
//上传运行时依赖的jar包
val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"
conf.setJars(seq)
//初始化SparkContext上下文
val sc = new SparkContext(conf);
//此目录下所有的数据,将会被构建索引,格式一定是约定好的
val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");
//通过rdd构建索引
indexRDD(rdd);
//关闭索引资源
client.close();
//关闭SparkContext上下文
sc.stop();
}
/***
* 处理rdd数据,构建索引
* @param rdd
*/
def indexRDD(rdd:RDD[String]): Unit ={
//遍历分区,构建索引
rdd.foreachPartition(line=>indexPartition(line));
}
}
ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client ) 模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建
相关文章推荐
- hdoj--1281--棋盘游戏(最小点覆盖+枚举)
- 贪心 5
- html三种空格
- day03回顾
- Qt 学习之路 2(31):贪吃蛇游戏(1)
- 64位CentOS上编译 Hadoop 2.2.0
- The string "--" is not permitted within comments
- VC2010 MFC程序制作Flash动画欢迎界面
- 使用Qt开发俄罗斯方块游戏(1)
- 解压assets下的zip包并复制相应的文件
- Linux常用指令---rpm/yum命令
- cocos2dx 3.3 + QT5.3制作游戏编辑器
- Java控制台输出程序运行时间
- window下面搭建php集成环境xampp
- 阿里开源Mysql分布式中间件:Cobar
- Linux下/proc目录简介
- .Net在线付款---Paypal在线付款开发过程
- Git
- 用(*.frm *.MYD *.MYI)文件恢复MySql数据库
- iOS 定义具有位移操作的枚举的意义