zoukankan      html  css  js  c++  java
  • 《Lua程序设计》9.2 管道(pipe)与过滤器(filter) 包含使用协同函数实现“生产者——消费者”问题的实例代码

    一个关于协同程序的经典示例是“生产者-消费者”问题。这其中涉及到两个函数,一个函数不断地产生值(比如从一个文件中读取值),另一个则不断地消费这些值(比如将这些值写到另一个文件)。通常,这两个函数大致是这样的:

    function producer ()
        while true do
            local x = io.read()     -- 产生新的值
            send(x)                 -- 发送给消费者
        end
    end
    
    function consumer ()
        while true do
            local x = receive()     -- 从生产者接收值
            io.write(x, "
    ")       -- 消费新的值
        end
    end

    这里有一个问题是如何将send与receive匹配起来。这是一个典型的“谁具有主循环(who-has-the-main-loop)”的问题。由于生产者和消费者都处于活动状态,它们个字具有一个朱迅欢,并且都将对方视为一个磕掉用的服务。
    协同程序被称为是一种匹配生产者和消费者的理想工具,一对resume-yield完全一改典型的调用者与被调用者之间的关系。当一个协同程序调用yield时,他不是进入了一个新的函数,而是从一个悬而未决的resume调用中返回。同样地,对于resume的调用也不会启动一个新函数,而是从一次yield调用中返回。这项特性正可用于匹配send和receive,这两者都认为自己是主动方,对方是被动方。receive唤醒生产者的执行,促使其能产出一个新值。而send则产生一个新值返还给消费者:

    function receive ()
        local status, value = coroutine.resume(producer)
        return value
    end
    
    function send (x)
        coroutine.yield(x)
    end

    因此,生产者现在一定是一个协同程序:

    producer = coroutine.create(
        function ()
            while true do
                local x = io.read()     -- 产生新的值
                send(x)                 -- 发送给消费者
            end
        end
    end)

    在这种设计中,程序通过调用消费者来启动。当消费者需要一个新值时,它唤醒生产者。生产者返回一个新值后停止运行,并等待消费者的自此唤醒。将这种设计称为“消费者驱动(consumer-driven)”。
    还可以扩展上述功能,实现“过滤器(filter)”。过滤器是一种位于生产者和消费者之间的处理功能,可用于对数据的一些变幻。
    过滤器既是一个消费者也会死一个生产者,它唤醒一个生产者促使其产生新值,然后又将变换后的值传递给消费者。例如可以在前面代码中添加一个过滤器,在每行起始处插入一个行号。代码如下:

    function receive (prod)
        local status, value = coroutine.resume(prod)
        return value
    end
    
    function send (x)
        coroutine.yield(x)
    end
    
    function producer ()
        return coroutine.create(function ()
           while true do
            local x = io.read()     -- 产生新值
            send(x)
           end
        end)
    end
    
    function filter (prod)
        return coroutine.create(function ()
            for line = 1,math.huge do
                local x = receive(prod) -- 获取新值
                x = string.format("%5d %s", line, x)
                send(x)     -- 将新值发送给消费者
            end
        end)
    end
    
    function consumer (prod)
        while true do
            local x = receive(prod)
            io.write(x, "
    ")
        end
    end

    接下来创建运行代码就非常简单了,只需将这些函数串联起来,然后启动消费者:

    p = producer()
    f = filter(p)
    consumer(f)

    或者,更简单地写为:

    consumer(filter(producer()))

    如果接触过UNIX的pipe(管道),那么本例的内容就不会很陌生。毕竟,协同程序也是一种(非抢先的)多线程。在pipe中没想任务都在各自独立的进程中运行,而在协同程序中每项任务都在各自独立的协同程序中运行。pipe在writer(消费者)与reader(生产者)之间提供一个缓冲器,因此它们的运行速度允许存在一定差异。值得注意的是,在pipe中进程间的切换代价恨到。而在协同程序中,切换代价则小得多,因此writer和reader可以彼此协作地运行。

    ----------

    完整程序示例:

    function receive (prod)
        local status, value = coroutine.resume(prod)
        return value
    end
    
    function send (x)
        coroutine.yield(x)
    end
    
    function producer ()
        return coroutine.create(function ()
           while true do
            local x = io.read()     -- 产生新值
            send(x)
           end
        end)
    end
    
    function filter (prod)
        return coroutine.create(function ()
            for line = 1,math.huge do
                local x = receive(prod) -- 获取新值
                x = string.format("%5d %s", line, x)
                send(x)     -- 将新值发送给消费者
            end
        end)
    end
    
    function consumer (prod)
        while true do
            local x = receive(prod)
            io.write(x, "
    ")
        end
    end
    
    p = producer()
    f = filter(p)
    consumer(f)
    -- consumer(filter(producer()))
    View Code
  • 相关阅读:
    二维数组的循环遍历
    es6 学习笔记
    变量、作用域与内存的一些总结
    MapReduce历史服务器
    NameNode和SecondaryNameNode
    MapReduce入门
    ZooKeeper实现HA HDFS
    hdfs临时文件更改
    linux中没有tree命令,command not found,解决办法
    Hadoop伪集群搭建环境
  • 原文地址:https://www.cnblogs.com/moonlightpoet/p/5685837.html
Copyright © 2011-2022 走看看