zoukankan      html  css  js  c++  java
  • chapter9_4 非抢占式的多线程

      协同程序与常规的多线程不同之处:协同程序是非抢占式的。

    当一个协同程序运行时,是无法从外部停止它的。只有当协同程序显式地调用yield时,它才会停止。

    当不存在抢先时,编程会变得简单很多,无须为同步的bug抓狂。

    在程序中所有的同步都是显式的,只需要确保一个协同程序在它的临界区域之外调用yield即可。

    对于这样非抢占式的多线程来说,只要有一个线程调用了一个阻塞操作,整个程序在该操作完成前,都会停止下来。

    下面用一个有趣的方法来解决这个问题:通过HTTP下载几个远程文件。

    下面的例子测试下载lua源代码,其中会用到LuaSocket模块:

    local socket = require "socket"
    
    local host = "www.lua.org"
    local file1 = "/ftp/lua-5.3.3.tar.gz "
    
    local HTTP = "HTTP/1.0
    User-Agent: Wget/1.12 (linux-gnu)
    Accept: */*
    Host: www.lua.org
    Connection: Keep-Alive
    
    "
    
    local sock = assert(socket.connect(host1,80))
    sock:send("GET " .. file1 ..  HTTP)
    
    repeat
        local chunk,status,partial = sock:receive(4096)
        print("chuck:size:  ",string.len(chunk or partial),status or "ok")
    until status == "closed"
    
    sock:close()

    在正常情况下receive函数会返回一个字符串。若发生错误,则返回nil,并且附加错误码及出错前读取到的内容(partial).

    接下来下载几个文件,最笨的办法就是逐个下载,但是太慢。程序大部分时间花费在等待数据的接收上。

    更明确地说,是将时间花在了receive阻塞调用上。

    采用的解决办法是,当一个链接没有可用数据时,程序便可以从其他链接出读取数据。

    很明显协同程序提供了一种简便的方式来构建这种并发下载。

    为每个下载任务创建一个新的线程,只要一个线程无可用数据,它就把控制权转让给一个简单的调度程序。

    而这个调度程序则会调用其他下载线程。

    在以协同程序来重写程序,先将前面的下载代码重新写:

    function receive(connection)
        local s,status,partial = connection:receive(2^10)
        return s or partial,status
    end 
    
    function download(host,file)
        local sock = assert(socket.connect(host,80))
        local count = 0            --记录接收到的字节数
        sock:send("GET " .. file ..  HTTP)
        repeat
            local chunk,status = receive(sock)
            count = count + #chunk
        until status == "closed"
        sock:close()
        print(file,count)
    end
    download(host,file1)

    这是下载一个文件的函数封装,只需调用download就可以。单独下载一个文件需要18秒左右。

    但是在并发的情况中,receive代码不能阻塞,因此在它没有可用数据时应该挂起:

    function receive(connection)
        connection:settimeout(0)    --设置为非阻塞
        local s,status,partial = connection:receive(2^10)
        if status == "timeout" thencoroutine.yield(connection)
        end
        return s or partial,status
    end 

    settimeout的调用,使得对此链接的操作不会阻塞。

    即使在超时的情况下,连接也是会返回已经读取到的内容,即记录在partial变量中。

    以下代码用table threads为调度程序保存所有正在运行中的线程。

    get函数保证每个下载任务都在一个独立的线程中执行。

    调度程序本身主要就是一个循环,遍历所有的线程,逐个唤醒它们的执行。

    当有线程完成时,就将该线程从列表中删除。

    threads = {}             --保存活跃线程的表
    function get(host,file)
        local co = coroutine.create(function() --创建协同程序
            download(host,file)
        end)
        table.insert(threads,co)      --插入列表
    end 
    
    function dispatch()
        local i = 1
        while true do 
            if threads[i] == nil then        --没有线程了
                if threads[1] == nil then break end --表是空表吗
                i = 1                        --重新开始循环
            end
            local status,res = coroutine.resume(threads[i]) --唤醒改线程继续下载文件
            if not res then             --线程是否已经完成了任务
                table.remove(threads,i) --移除list中第i个线程
            else
                i = i + 1               --检查下一个线程
            end
        end
    end

    最后,主程序需要创建所有的线程,并调用调度程序。

    local file1 = "/ftp/lua-5.3.3.tar.gz "
    local file2 = "/ftp/lua-5.3.2.tar.gz "
    local file3 = "/ftp/lua-5.3.1.tar.gz "
    local file4 = "/ftp/lua-5.3.0.tar.gz "
    local file5 = "/ftp/lua-5.2.4.tar.gz "
    local file6 = "/ftp/lua-5.2.3.tar.gz "
    local file7 = "/ftp/lua-5.2.2.tar.gz "
    local file8 = "/ftp/lua-5.2.1.tar.gz "
    local file9 = "/ftp/lua-5.2.0.tar.gz "
    get(host,file1)
    get(host,file2)
    get(host,file3)
    get(host,file4)
    get(host,file5)
    get(host,file6)
    get(host,file7)
    get(host,file8)
    get(host,file9)
    
    dispatch() --main loop

    同时下载9个文件总共耗时36秒,比串行下载9个文件速度快很多。

    但是发现CPU占用率跑到98%。

    为了避免这样的情况,可以使用LuaSocket中的select函数(socket.select(recvt, sendt [, timeout]))。

    在等待时陷入阻塞状态,若要在当前实现中应用这个函数,只需要修该调度即可:

    function dispatch_new()
        local i = 1
        local timedout = {}                  --Recvt 集合
        while true do 
            if threads[i] == nil then        --没有线程了
                if threads[1] == nil then break end --表是空表
                i = 1                        --重新开始循环
                timedout = {}                --遍历完所有线程,开始新一轮的遍历
            end
            local status,res = coroutine.resume(threads[i]) --唤醒该线程继续下载文件
            if not res then               --若完成了res就为nil,只有status一个返回值true。否则res为yield传入的参数connection。
                table.remove(threads,i)   --移除list中第i个线程
            else
                i = i + 1                 --检查下一个线程
                timedout[#timedout +1] = res
                if #timedout == #threads then  --所有线程都阻塞了吗?
                    socket.select(timedout)    --如果线程有数据,就返回
                end
            end
        end
    end
    
    ... ...
    dispatch_new()
    --main loop

    receive会将超时的连接通过yield传给resume的res。如果所有的连接都超时了,调度程序就用select来等待这些链接的状态发生变化。

    最后运行改良版的程序后,9个文件下载总耗时24秒,cpu占用率不到5%。

  • 相关阅读:
    kotlin 通过 下标比对
    textarea元素调整
    jquery给两个标签绑定一个事件
    开发过程中遇到的错误
    response.setHeader各种用法详解
    如何在eclipse里删除一个类 然后SVN服务器也同时删了这个类
    @pathvariable 与@requestparam 写rest接口时遇到的
    $.getJSON
    easyUI学习
    jQuery validator addMethod 动态提示信息
  • 原文地址:https://www.cnblogs.com/daiker/p/5827239.html
Copyright © 2011-2022 走看看