zoukankan      html  css  js  c++  java
  • 强化学习落地:竞态场景下基于锁机制的闲置端口查用

    本文首发于:行者AI

    在强化学习的游戏领域落地中,我们常把逻辑复杂的真实游戏当做一个黑盒子,使用网络通信与其数据交互达到训练的目的。为了训练效率,在Python中常使用多进程扮演多个Actor的角色来与游戏黑盒子进行数据交互;因此在这样的环境下,极大可能出现多进程争抢同一通信端口的现象(竞态场景),容易导致进程报错。

    本文根据以上背景,首先给出一个常规的解决办法,接着分析常规办法的局限性,然后介绍基于锁机制的解决办法,最后给出实际python例程。本文主要分为以下三个部分:

    (1)常规解决办法

    (2)基于锁机制的闲置端口查用

    (3)python代码实现

    1. 常规解决办法

    针对上述问题,有一种最容易想到的解决方法:每次进程调用端口时都检查端口号是否已经被使用了,如果是那就跳过继续搜寻下一个端口,直到找到一个未使用的端口号将它返回给调用者。我们简单用python实现这一过程:

    import socket 
    
    def get_free_port(ip, init_port): 
        while True: 
            try:
                sock = socket.socket()
                sock.bind((ip, init_port))
                ip, port = sock.getnameinfo()
                sock.close()
                return port
            except Exception:
                init_port += 1
                continue
    
    

    对以上思路稍加分析,可以将之大致拆分为3个步骤:

    (1)使用sock.bind()函数去自动绑定端口,以此判断端口是否可用;

    (2)若可以绑定证明该端口可用,则使用sock.close()释放掉该端口;若不可绑定则证明该端口已被使用,那么端口号加1继续尝试绑定直到成功然后同样使用sock.close()释放掉该端口为止;

    (3)把释放掉的端口号返回给需要调用的程序。

    上述思路看上去能工作,但是其实远远不够。采用这个思路只能让端口在非常短的时间内不被绑定,难以满足在竞态场景中使用。在竞态环境中,随着进程数的增多,大概率会发生以下情况:


    上图所示,同一个端口号X被进程A和进程B查用。对于进程B,查用进度比进程A稍慢,在进程A将端口X释放并准备返回程序进行绑定时,进程B绑定了端口X进行检查,此时进程A将无法再绑定端口X导致报错。

    因此可以看出,竞态场景中如何返回一个安全的端口号并不简单,即使利用随机种子随机初始端口号也不能彻底避免这种情况;我们需要一种机制保证一个未绑定的端口号不能被其它进程任意绑定。

    2. 基于锁机制的闲置端口查用

    使用lock file能够保证即使端口号未绑定,在未拿到锁之前也不会被其他进程绑定。利用fasteners.process_lock.InterProcessLock对端口加锁,具体思路如下:

    (1)sock.bind()自动绑定检测端口是否可用;

    (2)若端口可用则用InterProcessLock对端口加锁;否则继续检查,直到检测到可用端口后加锁;

    (3)解除之前的端口绑定;

    (4)安全返回这个端口号;

    (5)对该端口解锁;

    流程如下:


    回顾上述过程,在端口X被绑定后的每一个步骤之间,端口都是安全的。

    3. python代码实现

    首先定义两个类class BindFreePort()class FreePort(),前者用于搜索并检测可用端口;后者输入前者的检测结果,使用第2章所述的机制将端口安全的输出给需要的程序。

    import os
    import fasteners
    import threading
    
    
    class BindFreePort(object):
        def __init__(self, start, stop):
            self.port = None
    
            import random, socket
    
            self.sock = socket.socket()
    
            while True:
                port = random.randint(start, stop)
                try:
                    self.sock.bind(('127.0.0.1', port))
                    self.port = port
    				import time
    				time.sleep(0.01)
                    break
                except Exception:
                    continue
    
        def release(self):
            assert self.port is not None
            self.sock.close()
        
    
    class FreePort(object):
        used_ports = set()
    
        def __init__(self, start=4000, stop=6000):
            self.lock = None
            self.bind = None
            self.port = None
    
            from fasteners.process_lock import InterProcessLock
            import time
            pid = os.getpid()
    
            while True:
                bind = BindFreePort(start, stop)
    
                print(f'{pid} got port : {bind.port}')
    
                if bind.port in self.used_ports:
                    print(f'{pid} will release port : {bind.port}')
                    bind.release()
                    continue
    
                '''
                Since we cannot be certain the user will bind the port 'immediately' (actually it is not possible using
                this flow. We must ensure that the port will not be reacquired even it is not bound to anything
                '''
                lock = InterProcessLock(path='/tmp/socialdna/port_{}_lock'.format(bind.port))
                success = lock.acquire(blocking=False)
    
                if success:
                    self.lock = lock
                    self.port = bind.port
                    self.used_ports.add(bind.port)
                    bind.release()
                    break
    
                bind.release()
                time.sleep(0.01)
    
        def release(self):
            assert self.lock is not None
            assert self.port is not None
            self.used_ports.remove(self.port)
            self.lock.release()
    

    然后,基于这两个类的功能,我们简单测试一下:

    def get_and_bind_freeport(*args):
        freeport = FreePort(start=4000, stop=4009)
    	import time
    	time.sleep(0.5)
        return freeport.port
    
    def test():
        from multiprocessing.pool import Pool
        jobs = 10
        p = Pool(jobs)
        ports = p.map(get_and_bind_freeport, range(jobs))
        print(f'[ports]: {ports}')
        assert len(ports) == len(set(ports))
        p.close()
    
    if __name__ == '__main__':
        test()
    

    上述代码中,我们构建了函数get_and_bind_freeport()按照第2章所述机制返回一个端口,用time.sleep(0.5)模拟进程内的时间扰动,其中端口的搜索范围是4000~4009;函数test()从进程池中启动10个进程,每个进程映射一个函数get_and_bind_freeport()4000~4009中搜索一个端口号并将之安全返回。

    如果整个过程中端口号是安全的,那么返回结果应当是len(ports) == len(set(ports))即10个端口分别被10个进程查用,不存在多个进程返回同一端口号的情况。

    4. 总结

    本文对比了在现实游戏的强化学习训练中,解决多进程竞争通信端口这一现象的两种方法;通过原理分析对比以及实际动手实验,我们得出结论:在竞态场景中查用通信端口号时,对比常规思路基于锁机制的端口查用能够更安全地适用于这个场景。同时也提醒我们,在游戏领域的强化学习落地过程中会面临诸多现实问题,只有在实际工程中不断动手实操不断总结,才能达成目标。

    5. 参考

    [1] Fasteners

    [2] Synchronization between processes


    PS:更多技术干货,快关注【公众号 | xingzhe_ai】,与行者一起讨论吧!

  • 相关阅读:
    MySQL服务器变量:MySQL系列之八
    存储引擎:MySQL系列之七
    用户与授权:MySQL系列之六
    视图、存储函数、存储过程、触发器:MySQL系列之五
    SQL语法:MySQL系列之四
    关系型数据库基础概念:MySQL系列之开篇
    基础篇:MySQL系列之三
    多实例:MySQL系列之二
    安装篇:MySQL系列之一
    DNS
  • 原文地址:https://www.cnblogs.com/xingzheai/p/14808075.html
Copyright © 2011-2022 走看看