zoukankan      html  css  js  c++  java
  • task一个Lua的多任务库


    title: ltask-一个Lua的多任务库
    categories:

    • Lua
      date: 2021-10-29 10:11:09
      updated: 2021-10-29 10:11:09
      tags:
    • Lua

    这是云风最新设计的一个多任务库,设计的思路见云风的博客,github 上的地址为 https://github.com/cloudwu/ltask。云风都设计大多是简单但不好理解,所以需要看一下才能把他方便的使用起来。

    实际上 ltask 提供了四个库:

    1. ltask 常规使用的API
    2. ltask.bootstrap 框架搭建,设置,启动服务但不运行
    3. ltask.exclusive
    4. ltask.root

    如果用官方的 Lua 解析器作为入口,那么入口代码只可以使用 ltask.bootstrap 这个子库。它提供的 api 可以完成一系列框架的搭建工作。可以视为把 skynet 设置和启动线程的部分,以库的形式提供了出来,方便用 lua 代码驱动。

    所有设定工作完成后,ltask.bootstrap.run 这个 api 会启动调度器并阻塞住主线程,之后的一切任务都是由设定完成的工作线程驱动。

    和 skynet 类似,一切任务( task )都被放在 service 中完成。每个 service 是一个独立的 lua 虚拟机。它们均配有一个和外界通讯的通信管道。我们用 32bit 来标识服务及所属的通信管道。和 skynet 不同,内核不负责管理服务的名字。

    0 号 id 是保留的,可以用来表示无效的服务或系统服务。1 号服务是一个特殊的 root 服务,它会有一些特权。我计划保留 2-1023 号服务,用来做特定的系统任务。例如,名字解析,timer ,socket 等等。这样,一些必要的服务就可以直接用固定的数字 id 而不需要起字符串名字。

    ltask.bootstrap 里有一系列 api 可以用来做上面的配置工作:启动服务但不运行。

    有点需要注意的是,服务的ID是由我们来指定的,而不是由 ltask 管理的。这和 skynet 不同。

    看其示例的 main.lua 脚本大概可以看到如下流程:

    • 扫描配置信息
    • 将配置信息给到 ltask.bootstrap.init(config),进行整个框架的配置工作
    • boot.init_timer() 初始化定时器
    • 创建独占的定时器服务
    • 初始化 root 服务
    • boot.run() 开始运行框架

    关于配置,大概有这么多项:

    • worker 默认256 或者是核心数减1
    • queue 默认 4096
    • queue_sending 默认 和 queue 一样 用来发送消息的队列?
    • max_service 最大服务数 默认 65536

    ltask.bootstrap.init

    这个函数,会在 LUA 虚拟机,建立一个全局的服务对象 LTASK_GLOBAL,这个全局对象中有:

    • 日志队列
    • 工作线程对象
    • max_service 个 service 结构
    • schedule max_service 个调度队列

    然后就会初始化工作线程。

    root 服务

    这个是云风内建的服务。

    local function new_service(label, id)
      local sid = boot.new_service(label, config.init_service, id)
      assert(sid == id)
      return sid
    end
    local function bootstrap()
      new_service("root", SERVICE_ROOT)
      boot.init_root(SERVICE_ROOT)
      -- send init message to root service
      local init_msg, sz = ltask.pack("init", {
        lua_path     = config.lua_path,
        lua_cpath    = config.lua_cpath,
        service_path = config.service_path,
        name         = "root",
        args         = { config },
      })
      -- self bootstrap
      boot.post_message {
        from    = SERVICE_ROOT,
        to      = SERVICE_ROOT,
        session = 0, -- 0 for root init
        type    = MESSSAGE_SYSTEM,
        message = init_msg,
        size    = sz,
      }
    end
    

    boot 服务启动后,即调用 C API 进行初始化,然后还发去一条消息,init,root 服务收到这条消息才会执行起来,具体体现在会执行:

    local function boot()
    	local request = ltask.request()
    	for i, t in ipairs(config.exclusive) do
    		local name, args
    		if type(t) == "table" then
    			name = table.remove(t, 1)
    			args = t
    		else
    			name = t
    			args = {}
    		end
    		local id = i + 1
    		register_service(id, name)
    		request:add { id, proto = "system", "init", {
    			lua_path = config.lua_path,
    			lua_cpath = config.lua_cpath,
    			service_path = config.service_path,
    			name = name,
    			args = args,
    			exclusive = true,
    		}}
    	end
    	for req, resp in request:select() do
    		if not resp then
    			print(string.format("exclusive %d init error: %s", req[1], req.error))
    			return
    		end
    	end
    	S.uniqueservice(table.unpack(config.logger))
    	S.spawn(table.unpack(config.bootstrap))
    end
    

    会向所有的读占线程发消息,并启动唯一服务 logger,然后再启动我们指定的 bootstrap ,启动器服务。

    关于服务

    实际上,任何服务,都通过了 'service/service.lua' 的包装,在配置的时候将这个作为 init_service,当 root 服务新建服务的时候,都会首先调用这个脚本, root 服务也不例外。

    local function init_service(address, name, ...)
    	root.init_service(address, name, config.init_service)
    	ltask.syscall(address, "init", {
    		lua_path = config.lua_path,
    		lua_cpath = config.lua_cpath,
    		service_path = config.service_path,
    		name = name,
    		args = {...},
    	})
    end
    
    local function new_service(name, ...)
    	local address = assert(ltask.post_message(0, 0, MESSAGE_SCHEDULE_NEW))
    	anonymous_services[address] = true
    	local ok, err = pcall(init_service, address, name, ...)
    	if not ok then
    		S.kill(address)
    		return nil, err
    	end
    	return address
    end
    

    服务本身有一些系统消息的处理,比如:init,这些在 server.lua 里面实现:

    function sys_service.init(t)
    	-- The first system message
    	assert(service == nil)
    	if t.lua_path then
    		package.path = t.lua_path
    	end
    	if t.lua_cpath then
    		package.cpath = t.lua_cpath
    	end
    	local filename = assert(package.searchpath(t.name, t.service_path))
    	local f = assert(loadfile(filename))
    	_G.require = yieldable_require
    	if t.exclusive then
    		init_exclusive()
    	end
    	local r = f(table.unpack(t.args))
    	if service == nil then
    		service = r
    	end
    	if service == nil then
    		ltask.quit()
    	end
    end
    
    while true do
    	local s = ltask.schedule_message()
    	if s == SCHEDULE_QUIT then
    		break
    	end
    	yield_service()
    end
    

    每次建立一个一个服务,具体承载的脚本都是由这个消息来驱动执行的。每个服务建立后,实际上都是处于一个死循环中读取 ltask 的消息,然后进行对应的处理。

    当收到消息的时候,就会从对应的模块中进行查找处理:

    function ltask.schedule_message()
    	local from, session, type, msg, sz = ltask.recv_message()
    	local f = SESSION[type]
    	if f then
    		-- new session for this message
    		local co = new_session(f, from, session)
    		wakeup_session(co, type, msg, sz)
    	elseif session then
    		local co = session_coroutine_suspend_lookup[session]
    		if co == nil then
    			print("Unknown response session : ", session)
    			ltask.remove(msg, sz)
    			ltask.quit()
    		else
    			session_coroutine_suspend_lookup[session] = nil
    			wakeup_session(co, type, session, msg, sz)
    		end
    	else
    		return SCHEDULE_IDLE
    	end
    	dispatch_wakeup()
    	if quit then
    		return SCHEDULE_QUIT
    	end
    	return SCHEDULE_SUCCESS
    end
    
    local function request(command, ...)
    	local s = service[command]
    	if not s then
    		error("Unknown request message : " .. command)
    		return
    	end
    	send_response(s(...))
    end
    
    SESSION[MESSAGE_REQUEST] = function (type, msg, sz)
    	request(ltask.unpack_remove(msg, sz))
    end
    

    基本上的应用就是这样了。

  • 相关阅读:
    deepsort+yolov3实现多类别多目标跟踪
    WAR2020暑期补题集
    【数据结构】浅谈主席树
    Github本地上传命令
    【蓝桥杯】2017年第八届蓝桥杯C/C++B组省赛——C题 承压计算
    【蓝桥杯】2017年第八届蓝桥杯C/C++B组省赛——B题 等差素数列
    【蓝桥杯】2019年第十届蓝桥杯C/C++ B组省赛——I题 后缀表达式
    防御Mimikatz-转载
    SQL注入之判断数据库
    XPATH注入
  • 原文地址:https://www.cnblogs.com/dieangel/p/15587400.html
Copyright © 2011-2022 走看看