使用AKKA做分布式爬虫的思路
2017-03-26 02:56
459 查看
上周公司其他小组在讨论做分布式爬虫,我也思考了一下,提了一个方案:使用akka分布式RPC框架,自己编写master和worker程序。client向master提交开始(begin)任务或其他爬虫需求,master再让worker去爬网页。所有worker属于Kafka的同一个消费者组(consumer group),从Kafka中拉取数据(URL),爬取网页后解析内容,通过正则表达式匹配出网页中嵌套的链接URL,然后请求一个检查actor判断这些URL是否已经爬过(防止重复爬取、在链接形成的环路中反复打转,使爬取路径成为树形结构)。这个检查actor应该是一个单独的actor:由于actor一次只处理一条消息,多个请求同时到来也不会出现线程同步问题。最后把没有爬过的URL放回Kafka,如此循环,直到Kafka中的URL被全部拉取完。
这里有个简单的图例:
下面的代码中没有实现具体的爬虫逻辑,也没有实现checkActor,只是简单地做了模拟,写了一个简单的分布式示例作为参考。
代码结构如下:
其中POM文件与这篇博客中的相同:http://blog.csdn.net/qq_20641565/article/details/65488828
Master的代码:
Worker代码如下:
ActorUtils代码如下:
WorkBean代码如下:
caseClass代码如下:
运行时先启动master,再启动worker(我这里运行了两个worker;在同一台机器上运行第二个worker时需要把Worker中的端口8088改成其他端口,否则会端口冲突),最后运行client(client代码本文未给出),结果如下:
Master:
Worker01:
Worker02:
Client:
这里有个简单的图例:
下面的代码中没有实现具体的爬虫逻辑,也没有实现checkActor,只是简单地做了模拟,写了一个简单的分布式示例作为参考。
代码结构如下:
其中POM文件与这篇博客中的相同:http://blog.csdn.net/qq_20641565/article/details/65488828
Master的代码:
package com.lijie.scala.service

import scala.collection.mutable.HashMap
import scala.concurrent.duration.DurationInt
import com.lijie.scala.bean.WorkBean
import com.lijie.scala.utils.ActorUtils
import akka.actor.Actor
import akka.actor.Cancellable
import akka.actor.Props
import akka.actor.actorRef2Scala
import com.lijie.scala.bean.WorkBeanInfo
import com.lijie.scala.caseclass.Submit
import com.lijie.scala.caseclass.SubmitAble
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.CheckConn
import com.lijie.scala.caseclass.Register
import com.lijie.scala.caseclass.SubmitCrawler
import com.lijie.scala.caseclass.BeginCrawler

/**
 * Master actor of the crawler demo.
 *
 * Keeps a registry of workers (refreshed by heartbeats) and of clients, periodically evicts
 * workers whose last heartbeat is older than `OVERTIME` ms, and broadcasts every crawl job
 * submitted by a client to all currently registered workers.
 *
 * @param masterHost        host of the master's actor system (stored only)
 * @param masterPort        port of the master's actor system (stored only)
 * @param masterActorSystem name of the master's actor system (stored only)
 * @param masterName        actor name of this master (stored only)
 */
class Master(val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {

  // workerId -> (selection of the remote worker actor, time of registration / last heartbeat in ms)
  var workerConn = new HashMap[String, WorkBean]
  // clientId -> (selection of the remote client actor, registration time in ms)
  var clientConn = new HashMap[String, WorkBean]
  // Heartbeat timeout in ms; also the period of the CheckConn sweep.
  val OVERTIME = 20000

  // Handle of the periodic CheckConn timer, kept so it stops firing once this actor stops.
  private var checkConnTask: Option[Cancellable] = None

  override def preStart(): Unit = {
    // ExecutionContext required by the scheduler.
    import context.dispatcher
    // Periodically send ourselves CheckConn so dead workers are removed from workerConn.
    checkConnTask = Some(context.system.scheduler.schedule(0.millis, OVERTIME.millis, self, CheckConn))
  }

  override def postStop(): Unit = {
    checkConnTask.foreach(_.cancel())
    checkConnTask = None
  }

  def receive: Actor.Receive = {
    // A worker registers itself.
    case Register(workerId, workerHost, workerPort, workerActorSystem, workerName) =>
      println(workerId + "," + workerHost + "," + workerPort + "," + workerActorSystem + "," + workerName)
      // Selection pointing back at the registering worker actor.
      val workerRef = context.actorSelection(s"akka.tcp://$workerActorSystem@$workerHost:$workerPort/user/$workerName")
      // Stamp with the current time rather than 0: with 0, a CheckConn sweep that runs before the
      // worker's first heartbeat would evict it, and its later heartbeats would then be ignored.
      workerConn += (workerId -> new WorkBean(workerRef, System.currentTimeMillis()))
      sender() ! RegisterSucess

    // Heartbeat: refresh the timestamp of a known worker; ids that are not registered are ignored.
    case Hearbeat(workerId) =>
      workerConn.get(workerId).foreach(bean => bean.time = System.currentTimeMillis())

    // Periodic sweep: drop workers whose last heartbeat is older than OVERTIME.
    case CheckConn =>
      val now = System.currentTimeMillis()
      val expired = workerConn.collect { case (id, bean) if now - bean.time > OVERTIME => id }.toList
      workerConn --= expired
      val alive = workerConn.size
      println(s"还有$alive 个worker活着")

    // A client announces itself; reply that it may submit jobs.
    case Submit(clientId, clientHost, clientPort, clientActorSystem, clientName) =>
      println(clientId + "," + clientHost + "," + clientPort + "," + clientActorSystem + "," + clientName)
      // Selection pointing back at the client actor.
      val clientRef = context.actorSelection(s"akka.tcp://$clientActorSystem@$clientHost:$clientPort/user/$clientName")
      clientConn += (clientId -> new WorkBean(clientRef, System.currentTimeMillis()))
      sender() ! SubmitAble

    // Crawl job from a client: broadcast it to every registered worker.
    case SubmitCrawler(kafka, redis, other) =>
      for (workerBean <- workerConn.values) {
        workerBean.worker ! BeginCrawler(kafka, redis, other)
      }
  }
}

object Master {
  // Fallback arguments: host, port, actor system name, actor name.
  private val DefaultArgs = Array[String]("127.0.0.1", "8080", "masterSystem", "actorMaster")

  /**
   * Starts the master actor system and the master actor, then blocks until the system terminates.
   *
   * @param args optional `host port actorSystem actorName`; if fewer than four values are given,
   *             the defaults 127.0.0.1 / 8080 / masterSystem / actorMaster are used.
   */
  def main(args: Array[String]): Unit = {
    val argss = if (args.length >= 4) args else DefaultArgs
    val host = argss(0)
    val port = argss(1).toInt
    val actorSystem = argss(2)
    val actorName = argss(3)
    val masterSystem = ActorUtils.getActorSystem(host, port, actorSystem)
    masterSystem.actorOf(Props(new Master(host, port, actorSystem, actorName)), actorName)
    // NOTE(review): awaitTermination() is deprecated since Akka 2.4 and removed in 2.5; on newer
    // Akka use Await.result(masterSystem.whenTerminated, Duration.Inf) instead.
    masterSystem.awaitTermination()
  }
}
Worker代码如下:
package com.lijie.scala.service

import akka.actor.Actor
import akka.actor.ActorSelection
import akka.actor.Cancellable
import java.util.UUID
import scala.concurrent.duration._
import com.lijie.scala.caseclass.SendHearbeat
import com.lijie.scala.utils.ActorUtils
import akka.actor.Props
import com.lijie.scala.caseclass.BeginCrawler
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.Register

/**
 * Worker actor of the crawler demo.
 *
 * On start it registers with the master. Once the master answers RegisterSucess, it sends a
 * Hearbeat every 10 s. On BeginCrawler it runs the (currently only simulated) crawl job.
 *
 * @param workerHost        host of this worker's actor system (sent to the master in Register)
 * @param workerPort        port of this worker's actor system (sent to the master in Register)
 * @param workerActorSystem name of this worker's actor system (sent to the master in Register)
 * @param workerName        actor name of this worker (sent to the master in Register)
 * @param masterHost        host of the master's actor system
 * @param masterPort        port of the master's actor system
 * @param masterActorSystem name of the master's actor system
 * @param masterName        actor name of the master
 */
class Worker(val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {
  // Selection pointing at the remote master actor; resolved in preStart.
  var master: ActorSelection = _
  // Unique id of this worker instance, used in Register and Hearbeat.
  val workerId = UUID.randomUUID().toString()

  // Handle of the heartbeat timer: lets a repeated RegisterSucess replace (not duplicate) the
  // timer, and lets postStop stop it.
  private var heartbeatTask: Option[Cancellable] = None

  override def preStart(): Unit = {
    master = context.actorSelection(s"akka.tcp://$masterActorSystem@$masterHost:$masterPort/user/$masterName")
    // Register with the master.
    master ! Register(workerId, workerHost, workerPort, workerActorSystem, workerName)
  }

  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  def receive: Actor.Receive = {
    // Registration confirmed: start (or restart) the periodic heartbeat.
    case RegisterSucess =>
      println("收到注册成功的消息,开始发送心跳")
      // ExecutionContext required by the scheduler.
      import context.dispatcher
      heartbeatTask.foreach(_.cancel())
      heartbeatTask = Some(context.system.scheduler.schedule(0.millis, 10000.millis, self, SendHearbeat))

    // Timer tick: send a heartbeat to the master.
    // NOTE(review): the master ignores heartbeats from ids it has evicted, and this worker never
    // re-registers, so an evicted worker stays out of the pool until it is restarted.
    case SendHearbeat =>
      println("向master发送心跳")
      master ! Hearbeat(workerId)

    // Crawl job (simulated: only prints the intended steps).
    case BeginCrawler(kafka, redis, other) =>
      println("开始执行爬虫任务...")
      println("kafka和redis以及其他消息内容:" + kafka + "," + redis + "," + other)
      println("初始化kafka连接和redis连接...")
      println("从队列里面取出url...")
      println("开始爬数据...")
      println("如果失败重试几次...")
      println("............")
      println("解析这个网页的内容,解析出里面的url...")
      // Would ask the (not yet implemented) check actor whether each URL was crawled already.
      println("请求actionCheck...")
      println("检查是否爬过...")
      println("把该刚爬了的url扔到redis")
      println("把该网页解析的没有爬过的url扔到队列...")
      println("继续从队列里面拿url直到队列里面url被爬完...")
  }
}

object Worker {
  // Fallback arguments: worker host, port, actor system, actor name, then the same four for the master.
  private val DefaultArgs = Array[String]("127.0.0.1", "8088", "workSystem", "actorWorker", "127.0.0.1", "8080", "masterSystem", "actorMaster")

  /**
   * Starts a worker actor system and the worker actor, then blocks until the system terminates.
   *
   * @param args optional `host port actorSystem actorName masterHost masterPort masterActorSystem
   *             masterName`; if fewer than eight values are given, the built-in defaults are used.
   */
  def main(args: Array[String]): Unit = {
    val argss = if (args.length >= 8) args else DefaultArgs
    // worker
    val host = argss(0)
    val port = argss(1).toInt
    val actorSystem = argss(2)
    val actorName = argss(3)
    // master
    val hostM = argss(4)
    val portM = argss(5).toInt
    val actorSystemM = argss(6)
    val actorNameM = argss(7)
    val workerSystem = ActorUtils.getActorSystem(host, port, actorSystem)
    workerSystem.actorOf(Props(new Worker(host, port, actorSystem, actorName, hostM, portM, actorSystemM, actorNameM)), actorName)
    // NOTE(review): awaitTermination() is deprecated since Akka 2.4 and removed in 2.5; on newer
    // Akka use Await.result(workerSystem.whenTerminated, Duration.Inf) instead.
    workerSystem.awaitTermination()
  }
}
ActorUtils代码如下:
package com.lijie.scala.utils

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

/** Factory helpers for remoting-enabled actor systems. */
object ActorUtils {

  /**
   * Creates an actor system that uses the remote actor-ref provider and listens on the given
   * address via the netty.tcp transport.
   *
   * @param host        hostname the system binds to
   * @param port        port the system binds to
   * @param actorSystem name of the actor system to create
   * @return the newly created actor system
   */
  def getActorSystem(host: String, port: Int, actorSystem: String): ActorSystem =
    ActorSystem(
      actorSystem,
      ConfigFactory.parseString(
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = "$host"
           |akka.remote.netty.tcp.port = "$port"
         """.stripMargin))
}
WorkBean代码如下:
package com.lijie.scala.bean

import akka.actor.ActorSelection

// Pairs a remote actor's selection with a timestamp.
// The Master keeps one per registered worker and one per client (for clients, `worker`
// points at the client actor despite its name). `time` is compared against
// System.currentTimeMillis(), so it is in milliseconds; it is a var so heartbeats can refresh it.
class WorkBean(var worker: ActorSelection, var time: Long)

// Describes a worker endpoint (id, host, port, actor system, actor name) plus a mutable timestamp.
// NOTE(review): no visible code constructs or reads this class (Master only imports it) — confirm it is still needed.
class WorkBeanInfo(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, var time: Long)
caseClass代码如下:
package com.lijie.scala.caseclass

// Message protocol of the crawler demo, grouped by direction (sender -> receiver).
// Case classes and case objects are Serializable by construction, which matters for the
// messages that cross the network via remoting, so no explicit `extends Serializable` is needed.

// ------------------------------ client -> client ------------------------------

/** Client-internal trigger to start submitting a job. */
case object BeginSubmit

// ------------------------------ client -> master ------------------------------

/** Client announces itself (with the coordinates of its actor); the master replies [[SubmitAble]]. */
case class Submit(clientId: String, clientHost: String, clientPort: Int, clientActorSystem: String, clientName: String)

/** Crawl job from a client; the master forwards it to every worker as [[BeginCrawler]]. */
case class SubmitCrawler(kafkaInfo: String, redisInfo: String, otherInfo: String)

// ------------------------------ master -> client ------------------------------

/** Master's reply to [[Submit]]: the client may now submit a crawl job. */
case object SubmitAble

// ------------------------------ master -> master ------------------------------

/** Periodic self-message: evict workers whose heartbeat has timed out. */
case object CheckConn

// ------------------------------ master -> worker ------------------------------

/** Master's reply to [[Register]]; the worker then starts sending heartbeats. */
case object RegisterSucess

// ------------------------------ worker -> master ------------------------------

/** Worker registration, carrying the coordinates the master uses to address the worker actor. */
case class Register(workerId: String, workerHost: String, workerPort: Int, workerActorSystem: String, workerName: String)

/** Worker liveness signal; `workId` is the id the worker registered with. */
case class Hearbeat(workId: String)

// ------------------------------ worker -> worker ------------------------------

/** Self-message telling the worker to send a [[Hearbeat]] to the master. */
case object SendHearbeat

/** Crawl job as delivered to a worker (forwarded from [[SubmitCrawler]]). */
case class BeginCrawler(kafkaInfo: String, redisInfo: String, otherInfo: String)
运行时先启动master,再启动worker(我这里运行了两个worker;在同一台机器上运行第二个worker时需要把Worker中的端口8088改成其他端口,否则会端口冲突),最后运行client(client代码本文未给出),结果如下:
Master:
Worker01:
Worker02:
Client:
相关文章推荐
- 使用AKKA做分布式爬虫的思路
- akka分布式爬虫框架(一)——设计思路与demo
- 使用scrapy,redis, mongodb实现的一个分布式网络爬虫
- 分布式爬虫:使用Scrapy抓取数据
- 开发一个分布式的爬虫需要用到哪些技术或是算法,请高手给个思路
- 分布式爬虫:使用Scrapy抓取数据
- 使用scrapy-redis实现分布式爬虫
- 使用Docker Swarm搭建分布式爬虫集群
- 分布式爬虫:使用Scrapy抓取数据
- 正确使用CORBA,ICE等分布式技术的一种思路
- 功能比Scrapy强,却使用最方便的Gerapy分布式爬虫管理框架
- 使用Docker Swarm搭建分布式爬虫集群
- RocketMQ使用及分布式事务解决思路
- 是否大部分分布式系统使用的技术思路都差不多?
- Java网络爬虫(十)--使用多线程提升爬虫性能的思路小结
- 基于Python使用scrapy-redis框架实现分布式爬虫 注
- 今天使用sQL2005中Try_catch,以及分布式使用远程服务的函数
- 毕业设计中怎样用python写一个搜索引擎的分布式爬虫---异样的美感
- 使用JOTM实现分布式事务管理(多数据源)