zoukankan      html  css  js  c++  java
  • 使用AKKA做分布式爬虫的思路

    上周公司其它小组在讨论做分布式爬虫,我也思考了一下。提了一个方案,就是使用akka分布式rpc框架来做,自己写master和worker程序,client向master提交begin任务或者其它爬虫需求,master让worker去爬网页,worker都是kafka的同一个group然后从kafka里面拉取数据(URL),然后处理爬了的网页,解析内容,把爬下来的网页通过正則表達式匹配出嵌套的网页,然后请求actor推断是否爬过(防止生成有向图。让其变成树形结构)(这里应该是个单独的actor。这样多个请求过来不会出现线程同步问题),最后把没有爬的URL扔到Kafka,直到kafka的URL被拉去完

    这里有个简单的图例:

    这里写图片描写叙述

    代码上面没有写爬虫的东西,也没有写checkActor,仅仅是简单的做了下模拟,就写了个简单的分布式事例作为參考

    代码结构例如以下:

    这里写图片描写叙述

    当中POM同这篇博客一样http://blog.csdn.net/qq_20641565/article/details/65488828

    Master的代码:

    package com.lijie.scala.service
    
    import scala.collection.mutable.HashMap
    import scala.concurrent.duration.DurationInt
    
    import com.lijie.scala.bean.WorkBean
    import com.lijie.scala.utils.ActorUtils
    
    import akka.actor.Actor
    import akka.actor.Props
    import akka.actor.actorRef2Scala
    import com.lijie.scala.bean.WorkBeanInfo
    import com.lijie.scala.bean.WorkBeanInfo
    import com.lijie.scala.caseclass.Submit
    import com.lijie.scala.caseclass.SubmitAble
    import com.lijie.scala.caseclass.Hearbeat
    import com.lijie.scala.caseclass.RegisterSucess
    import com.lijie.scala.caseclass.CheckConn
    import com.lijie.scala.caseclass.Register
    import com.lijie.scala.caseclass.SubmitCrawler
    import com.lijie.scala.caseclass.BeginCrawler
    
    class Master(val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {
    
      //保存work的Actor连接
      var workerConn = new HashMap[String, WorkBean]
    
      //保存客户端的连接
      var clientConn = new HashMap[String, WorkBean]
    
      //超时时间
      val OVERTIME = 20000
    
      override def preStart(): Unit = {
    
        //隐式转换
        import context.dispatcher
    
        //启动的时候定时检查worker是否挂了,假设挂了就从workerConn移除
        context.system.scheduler.schedule(0 millis, OVERTIME millis, self, CheckConn)
      }
    
      def receive: Actor.Receive = {
    
        //注冊
        case Register(workerId, workerHost, workerPort, workerActorSystem, workerName) => {
    
          //打印worker上线消息
          println(workerId + "," + workerHost + "," + workerPort + "," + workerActorSystem + "," + workerName)
    
          //获取Master的代理对象
          var workerRef = context.actorSelection(s"akka.tcp://$workerActorSystem@$workerHost:$workerPort/user/$workerName")
    
          //保存连接
          workerConn += (workerId -> new WorkBean(workerRef, 0))
    
          //给worker返回应答注冊成功
          sender ! RegisterSucess
    
        }
    
        //接受心跳
        case Hearbeat(workerId) => {
          if (workerConn.contains(workerId)) {
    
            //取出workerBean
            var workBean = workerConn.get(workerId).get
    
            //又一次设置时间
            workBean.time = System.currentTimeMillis()
    
            //移除之前的值
            workerConn -= workerId
    
            //将新值放入conn
            workerConn += (workerId -> workBean)
          }
        }
    
        //定时检查
        case CheckConn => {
    
          //得到超时的值
          var over = workerConn.filter(System.currentTimeMillis() - _._2.time > OVERTIME)
    
          //得到超时的值
          for (key <- over.keySet) {
    
            //将超时的从链接中移除
            workerConn -= key
          }
    
          //測试输出还有多少个链接
          val alive = workerConn.size
          println(s"还有$alive 个worker活着")
        }
    
        case Submit(clientId, clientHost, clientPort, clientActorSystem, clientName) => {
          //打印client上线消息
          println(clientId + "," + clientHost + "," + clientPort + "," + clientActorSystem + "," + clientName)
    
          //获取Master的代理对象
          var clientRef = context.actorSelection(s"akka.tcp://$clientActorSystem@$clientHost:$clientPort/user/$clientName")
    
          //保存连接
          clientConn += (clientId -> new WorkBean(clientRef, 0))
    
          //给client返回能够提交申请
          sender ! SubmitAble
        }
    
        //收到爬虫任务分发给worker
        case SubmitCrawler(kafka, redis, other) => {
    
          //让全部worker開始爬虫任务
          for (workerBean <- workerConn.values) {
    
            //向全部存活的worker发送爬虫任务
            workerBean.worker ! BeginCrawler(kafka, redis, other)
          }
        }
      }
    
    }
    
    object Master {
    
      def main(args: Array[String]): Unit = {
        val argss = Array[String]("127.0.0.1", "8080", "masterSystem", "actorMaster")
    
        val host = argss(0)
    
        val port = argss(1).toInt
    
        val actorSystem = argss(2)
    
        val actorName = argss(3)
    
        //获取master的actorSystem
        val masterSystem = ActorUtils.getActorSystem(host, port, actorSystem)
    
        val master = masterSystem.actorOf(Props(new Master(host, port, actorSystem, actorName)), actorName)
    
        masterSystem.awaitTermination()
      }
    }

    Worker代码例如以下:

    package com.lijie.scala.service
    
    import akka.actor.Actor
    import akka.actor.ActorSelection
    import java.util.UUID
    import scala.concurrent.duration._
    import com.lijie.scala.caseclass.SendHearbeat
    import com.lijie.scala.utils.ActorUtils
    import akka.actor.Props
    import com.lijie.scala.caseclass.BeginCrawler
    import com.lijie.scala.caseclass.Hearbeat
    import com.lijie.scala.caseclass.RegisterSucess
    import com.lijie.scala.caseclass.Register
    
    class Worker(val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {
    
      //master的代理对象
      var master: ActorSelection = _
    
      //每一个worker的id
      val workerId = UUID.randomUUID().toString()
    
      override def preStart(): Unit = {
    
        //获取Master的代理对象
        master = context.actorSelection(s"akka.tcp://$masterActorSystem@$masterHost:$masterPort/user/$masterName")
    
        //向master注冊
        master ! Register(workerId, workerHost, workerPort, workerActorSystem, workerName)
      }
    
      def receive: Actor.Receive = {
    
        //收到注冊成功的消息,定时发送心跳
        case RegisterSucess => {
          println("收到注冊成功的消息,開始发送心跳")
    
          //隐式转换
          import context.dispatcher
    
          //创建定时器,并发送心跳
          context.system.scheduler.schedule(0 millis, 10000 millis, self, SendHearbeat)
    
        }
    
        //发送心跳
        case SendHearbeat => {
          println("向master发送心跳")
    
          //发送心跳
          master ! Hearbeat(workerId)
        }
    
        //開始爬虫
        case BeginCrawler(kafka, redis, other) => {
    
          println("開始执行爬虫任务...")
          println("kafka和redis以及其它消息内容:" + kafka + "," + redis + "," + other)
          println("初始化kafka连接和redis连接...")
          println("从队列里面取出url...")
          println("開始爬数据...")
          println("假设失败重试几次...")
          println("............")
          println("解析这个网页的内容,解析出里面的url...")
          //请求actionCheck
          println("请求actionCheck...")
          println("检查是否爬过...")
          println("把该刚爬了的url扔到redis")
          println("把该网页解析的没有爬过的url扔到队列...")
          println("继续从队列里面拿url直到队列里面url被爬完...")
        }
    
      }
    }
    
    object Worker {
    
      def main(args: Array[String]): Unit = {
        val argss = Array[String]("127.0.0.1", "8088", "workSystem", "actorWorker", "127.0.0.1", "8080", "masterSystem", "actorMaster")
    
        //worker
        val host = argss(0)
    
        val port = argss(1).toInt
    
        val actorSystem = argss(2)
    
        val actorName = argss(3)
    
        //master
        val hostM = argss(4)
    
        val portM = argss(5).toInt
    
        val actorSystemM = argss(6)
    
        val actorNameM = argss(7)
    
        //获取woker的actorSystem
        val workerSystem = ActorUtils.getActorSystem(host, port, actorSystem)
    
        val worker = workerSystem.actorOf(Props(new Worker(host, port, actorSystem, actorName, hostM, portM, actorSystemM, actorNameM)), actorName)
    
        workerSystem.awaitTermination()
      }
    }

    ActionUtils代码例如以下:

    package com.lijie.scala.utils
    
    import com.typesafe.config.ConfigFactory
    import akka.actor.ActorSystem
    import akka.actor.Props
    import akka.actor.Actor
    
    object ActorUtils {
    
      //获取actor工具类
      def getActorSystem(host: String, port: Int, actorSystem: String) = {
        val conf = s"""
          |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.netty.tcp.hostname = "$host"
          |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        val config = ConfigFactory.parseString(conf)
    
        //创建注冊worker的的ActorSystem
        val actorSys = ActorSystem(actorSystem, config)
    
        //返回actorSystem
        actorSys
      }
    }

    WorkBean代码例如以下:

    package com.lijie.scala.bean
    
    import akka.actor.ActorSelection
    
    //封装worker的引用和当前时间戳
    class WorkBean(var worker: ActorSelection, var time: Long)
    
    class WorkBeanInfo(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, var time: Long)

    caseClass代码例如以下:

    package com.lijie.scala.caseclass
    
    //開始提交任务 client2client
    case object BeginSubmit
    // client2client-------------------------------
    
    //client提交任务  client2master
    case class Submit(val clientId: String, val clientHost: String, val clientPort: Int, val clientActorSystem: String, val clientName: String) extends Serializable
    
    //提交爬虫任务
    case class SubmitCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
    // client2master-------------------------------
    
    //能够提交任务 master2client
    case object SubmitAble
    // master2client-------------------------------
    
    //检查哪些worker挂了 master2master
    case object CheckConn
    
    //返回注冊成功 master2worker
    case object RegisterSucess extends Serializable
    // master2worker-------------------------------
    
    //worker注冊 worker2master
    case class Register(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String) extends Serializable
    
    //发送心跳 worker2master
    case class Hearbeat(workId: String) extends Serializable
    // worker2master-------------------------------
    
    //发送心跳 worker2worker
    case object SendHearbeat
    
    //爬虫 worker2worker
    case class BeginCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
    // worker2worker-------------------------------
    

    最后先执行master。然后执行worker,我这里执行的两个worker。最后执行client 结果例如以下

    Master:

    这里写图片描写叙述

    Worker01:

    这里写图片描写叙述

    Worker02:

    这里写图片描写叙述

    Client:

    这里写图片描写叙述

  • 相关阅读:
    ASP.NET Core的配置信息
    ASP .NET Core 建立列表和表单View
    ASP.NET Core 如何使用Mvc相关技术建立Controller、Tag Helper (下)
    MySQL日志突然暴涨
    MySQL函数索引及优化
    MySQL统计库表大小
    MySQL8.0窗口函数实践及小结
    MySQL按指定字符合并及拆分
    分享2个近期遇到的MySQL数据库的BUG案例
    mysql大表在不停机的情况下增加字段该怎么处理
  • 原文地址:https://www.cnblogs.com/cxchanpin/p/7340865.html
Copyright © 2011-2022 走看看