zoukankan      html  css  js  c++  java
  • scala Actor -03

      1.对于上一篇讲解的scala的一些补充

        val files = Array[String]("a.txt","b.txt","c.txt")

        for(f <- files){xxxx}

      目标一:熟悉Scala Actor并发编程

      目标二:为学习Akka做准备

        注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。

        Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,

        老版本的Actor已经废弃

       2.概念

        Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的

        并发机制,

        Scala是运用消息(message)的发送、接收来实现多线程的。

        使用Scala能够更容易地实现多线程应用的开发

       3.Actor方法执行顺序

        1.首先调用start()方法执行Actor

        2.调用start()方法后其act()方法会被执行

        3.向Actor发送消息

       4.wordCount的Actor的计算方法,虽然现在不用,但是思路还是有用的

        

    package main.cn.wj.test
    import scala.actors.{Actor, Future}
    import scala.collection.immutable.HashSet
    import scala.io.Source
    import scala.collection.mutable.ListBuffer
    /**
      * Created by WJ on 2016/12/22.
      */
    
    class Task extends Actor{
      override def act(): Unit = {
          loop{
                 react{
                   case   SubmitTask(filename) =>{
                     val result = Source.fromFile(filename).getLines().flatMap(_.split(" ")).map((_,1)).toList.groupBy(_._1).mapValues(_.size)
                     sender ! ResultTask(result)
                   }
                   case StopTask =>{
                     exit()
                   }
                 }
          }
      }
    }
    
    case class SubmitTask(filename:String)
    
    case class ResultTask (result:Map[String,Int])
    
    case object StopTask
    
    object ActorWordCount {
      def main(args: Array[String]): Unit = {
        var replySet = new HashSet[Future[Any]]()
        val resultList =  new ListBuffer[ResultTask]
        val files = Array[String]("E://Test/words.log", "E://Test/words.txt")
        for (f <- files) {
             val actor = new Task
             val reply = actor.start() !! SubmitTask(f)    //<reply 等同于Future>
             replySet  += reply
        }
        while(replySet.size > 0 ){
          val toCompute = replySet.filter(_.isSet)
          for(f <- toCompute) {
             val result = f.apply().asInstanceOf[ResultTask]
            resultList += result
            replySet -= f
          }
          Thread.sleep(100)
        }
    
        // reduce功能 ,汇总
        //List
        val fr = resultList.flatMap(_.result).groupBy((_._1)).mapValues(_.foldLeft(0)(_+_._2))
        println(fr)
      }
    }
    

      5.看了上面的关于多线程相关的知识点,看看我们的线程池的代码

      

    package main.cn.wj.test
    
    import java.util.concurrent.{Executor, Executors}
    
    /**
      * Created by WJ on 2016/12/22.
      */
    object ThreadDemo {
      def main(args: Array[String]): Unit = {
           val pool = Executors.newFixedThreadPool(5);
        for (i <- 1 to 10){
          pool.execute(new Runnable {
            override def run(): Unit = {
              println(Thread.currentThread().getName)
              Thread.sleep(1000)
            }
          })
        }
      }
    }
    何当共剪西窗烛,却话巴山夜雨时
  • 相关阅读:
    List<string>里的集合和字符串互转
    黑马程序员学习9
    黑马程序员学习7
    黑马程序员学习11
    黑马程序员学习10
    黑马程序员学习8
    黑马程序员学习12
    为什么Huffman编码不会发生冲突
    mule esb 配置maven 何苦
    php实现kafka功能开发 何苦
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6213423.html
Copyright © 2011-2022 走看看