zoukankan      html  css  js  c++  java
  • A Lua thread pool implemented in C++

    工作原理

    由于 Lua 只能单线程运行,该库要求所有 Lua 代码都在同一个线程中执行,而多线程部分只能由 C/C++ 代码实现

    具体用法上要求多线程部分必须用c实现

    相关模块

    线程池

    异步函数实现框架

    Currently only a sleep function is provided.

    Usage:

    --- Demo: drive a chain of zero-second sleeps through the thread pool.
    -- `step` bumps a counter, prints it, and re-schedules itself as the
    -- completion callback of `sleep1` until the counter reaches 50, so the
    -- output runs from 'f 2' to 'f 50'. `pool:join()` blocks until every
    -- scheduled task has finished and its callback has been run.
    function test2_threadpool()
        local pool = Dll.MyTHdPool()
        local count = 1

        local function step()
            count = count + 1
            print('f ' .. count)
            if count ~= 50 then
                -- not done yet: ask the pool to call us back after the sleep
                pool:sleep1(0, step)
            end
        end

        step()
        pool:join()
    end

    C++ code:

    #include "stdafx.h"

    #include <cassert>
    #include <cstdio>
    #include <deque>
    #include <map>
    #include <memory>
    #include <queue>
    #include <vector>

    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
    #include <boost/thread/thread.hpp>

    #include <luabind.hpp>

    using namespace luabind;
    
    class ThreadPool
    {
        boost::asio::io_service ioService;
        boost::thread_group threadpool;
        boost::asio::io_service::work work;
    public:
        ThreadPool() :work(ioService)
        {
            /*
            * This will start the ioService processing loop. All tasks
            * assigned with ioService.post() will start executing.
            */
            //boost::asio::io_service::work work(ioService);
    
            /*
            * This will add 2 threads to the thread pool. (You could just put it in a for loop)
            */
            threadpool.create_thread(
                boost::bind(&boost::asio::io_service::run, &ioService)
                );
            threadpool.create_thread(
                boost::bind(&boost::asio::io_service::run, &ioService)
                );
    
        }
        ~ThreadPool()
        {
    
        }
    
        void post(boost::function<void()> f)
        {
            ioService.post(f);
        }
    
        void join()
        {
            threadpool.join_all();
        }
    private:
    
    };
    
    namespace bamthread
    {
        // Owning handle for the io_service::work object that keeps
        // io_service::run() from returning while the task queue is empty.
        typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

        /**
         * Thread pool that wraps a boost::asio::io_service.
         *
         * The constructor spawns the requested number of threads, each running
         * io_service::run(). The destructor releases the work object, joins
         * every thread, and finally stops the service.
         */
        struct ThreadPool {
            /** @param threads number of worker threads to start. */
            ThreadPool(size_t threads)
                : service()
                , working(new boost::asio::io_service::work(service))
            {
                for (size_t started = 0; started < threads; ++started)
                {
                    g.add_thread(new boost::thread(
                        boost::bind(&boost::asio::io_service::run, &service)));
                }
            }

            ~ThreadPool() {
                // Dropping the work object lets run() return once the queue
                // is empty, so the join below can complete.
                working.reset();
                g.join_all();
                service.stop();
            }

            /** Queues the callable f for execution on a worker thread. */
            template<class F>
            void post(F f){
                service.post(f);
            }

        private:
            boost::asio::io_service service; //< the io_service being wrapped
            asio_worker working;             //< keeps run() alive while idle
            boost::thread_group g;           //< worker threads, kept so they can be joined
        };
    }
    
    // Sample job for the pool demos: pauses via Sleep(1000) (milliseconds,
    // assuming the Win32 Sleep), then writes "mytask" to stdout.
    void my_task()
    {
        const unsigned long delay_ms = 1000;
        Sleep(delay_ms);
        printf("mytask");
    }
    
    void test1()
    {
        bamthread::ThreadPool tp(3);
        tp.post(boost::bind(my_task));
        //tp.join();
    }
    
    /**
     * Demonstrates a hand-rolled two-thread pool: an asio::io_service as the
     * task queue plus a thread_group whose threads all run io_service::run().
     *
     * The previous version called ioService.stop() right after post(). stop()
     * makes run() return as soon as possible, so the queued task could be
     * abandoned before any worker picked it up. Here the work object is
     * scoped instead: once it is destroyed, run() returns only after the
     * queue has been drained, so my_task is guaranteed to execute.
     */
    void test()
    {
        /*
        * Create an asio::io_service and a thread_group (together: the pool).
        */
        boost::asio::io_service ioService;
        boost::thread_group threadpool;

        {
            /*
            * While this object exists, io_service::run() does not return
            * even when the queue is empty, so the workers stay alive.
            */
            boost::asio::io_service::work work(ioService);

            /*
            * Add 2 threads to the thread pool.
            */
            for (int i = 0; i < 2; ++i)
            {
                threadpool.create_thread(
                    boost::bind(&boost::asio::io_service::run, &ioService)
                    );
            }

            /*
            * Assign a task to the thread pool.
            * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions"
            */
            ioService.post(boost::bind(my_task));
        }   // `work` is destroyed here: run() may return once the queue is empty

        /*
        * Wait until every worker has finished its remaining tasks and exited.
        */
        threadpool.join_all();
    }
    
    
    template <typename T>
    class queue
    {
    private:
        boost::mutex              d_mutex;
        boost::condition_variable d_condition;
        std::deque<T>           d_queue;
    public:
        void push(T const& value) {
            {
                boost::unique_lock<boost::mutex> lock(this->d_mutex);
                d_queue.push_front(value);
            }
            this->d_condition.notify_one();
        }
        T pop() {
            boost::unique_lock<boost::mutex> lock(this->d_mutex);
            this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
            T rc(std::move(this->d_queue.back()));
            this->d_queue.pop_back();
            return rc;
        }
    };
    
    class MyTHdPool
    {
        bamthread::ThreadPool tp;
    
        boost::mutex m;
        std::map<int, boost::function<void()> > f2s;    // key: taskid, value: post processing
        
        //boost::thread t_;
    
        queue<int> q_;
        int taskid_;
    
    public:
        MyTHdPool() :tp(3), taskid_(0){}
    
        ~MyTHdPool(){
            join();
        }
    
        void Call(boost::function<void()> f1, boost::function<void()> f2)
        {
            int taskid = taskid_++;
    
            printf("begin call task %d
    ", taskid);
    
            boost::function<void()> f = [=]() mutable {
                f1();
    
                q_.push(taskid);
                printf("done task %d
    ", taskid);
            };
    
            {
                boost::lock_guard<boost::mutex> lock(m);
                f2s[taskid] = (f2);
            }
    
            tp.post(f);
            printf("end post task %d
    ", taskid);
        }
    
        void join()
        {
            while (true)
            {
                boost::function<void()> f2;
                int taskid = 0;
                {
                    {
                        boost::lock_guard<boost::mutex> lock(m);
                        if (f2s.empty())
                            return;
                    }
    
                    printf("start pop a task from queue
    ");
                    int taskid = q_.pop();
                    printf("got a task %d from queue
    ", taskid);
    
                    {
                        boost::lock_guard<boost::mutex> lock(m);
                        auto it = f2s.find(taskid);
                        assert(it != f2s.end());
                        f2 = it->second;
                        f2s.erase(it);
                    }
                }
    
                printf("exec task post ftn %d
    ", taskid);
                f2();
            }
        }
    
        void sleep1(double n, object f2)
        {
            Call([n](){Sleep(n * 1000); }, [f2, this]() mutable {
                f2(); 
            });
        }
    
        void sleep2(double n)
        {
            Call([n](){Sleep(n * 1000); }, [](){});
        }
    private:
    
    };
    
    // Calls the Lua value `o` with no arguments, printing a trace line on
    // stdout before and after the call. The "\n" escapes had been replaced
    // by raw line breaks inside the string literals, which does not compile.
    void callback(object o)
    {
        printf("before callback\n");
        o();
        printf("after callback\n");
    }
    
    /**
     * Lua entry point for the "Dll" module.
     *
     * Registers MyTHdPool (constructor, sleep1, sleep2, join) and the free
     * functions test1 and callback through luabind, then returns the module
     * table on the Lua stack and clears the global that luabind created.
     *
     * NOTE(review): no extern "C" or export specifier is visible here;
     * confirm the symbol is exported unmangled if it is meant to be found
     * by require().
     *
     * @return 1, the number of values left on the Lua stack.
     */
    int luaopen_Dll(lua_State* L)
    {
        const char* const module_name = "Dll";

        luaL_openlibs(L);
        open(L);    // luabind::open, via `using namespace luabind`

        // Register the bindings as the global table _G["Dll"].
        module(L, module_name)
        [
            class_<MyTHdPool>("MyTHdPool")
                .def(constructor<>())
                .def("sleep1", &MyTHdPool::sleep1)
                .def("sleep2", &MyTHdPool::sleep2)
                .def("join", &MyTHdPool::join),

            def("test1", &test1),

            def("callback", &callback)
        ];

        // Push _G["Dll"] onto the stack; this becomes the return value.
        lua_getglobal(L, module_name);

        // Set _G["Dll"] = nil; the table stays alive through the stack slot.
        lua_pushnil(L);
        lua_setglobal(L, module_name);

        return 1;
    }
  • 相关阅读:
    nacos 命名空间
    Error Code: 1175. You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column. To disable safe mode, toggle the option in Preferences
    gitee
    maven引入junit 4.12,项目import org.junit.Test 还是报错.
    gitflow
    202011
    idea 忽略显示不需要的文件
    服务熔断 & 降级区别
    各种微服务框架对比
    zookeeper not connected
  • 原文地址:https://www.cnblogs.com/cutepig/p/10203967.html
Copyright © 2011-2022 走看看