zoukankan      html  css  js  c++  java
  • twisted的defer模式和线程池

    Reference: http://www.cnblogs.com/mumuxinfei/p/4528910.html

    前言:
      最近帮朋友review其模块服务代码, 使用的是python的twisted网络框架. 鉴于之前并没有使用过, 于是决定好好研究一番.
      twisted的reactor模型很好的处理了网络IO事件, 以及定时任务触发. 但包处理后的业务逻辑操作, 需要根据具体的场景来决定.
      本文将讲述twisted如何实现half-sync/half-async的模式, 其线程池和defer模式是如何设计和使用的.

    场景构造:
      twisted服务接受业务请求, 后端需要访问mysql. 由于mysql的接口是同步的, 如果安装twisted默认的方式处理话, 其业务操作(mysql)会阻塞reactor的IO事件循环. 这大大降低了twisted的服务能力. 
      为了解决该类问题, twisted支持线程池. 把业务逻辑和IO事件分离, IO操作依旧是异步的, 而业务逻辑则采用线程池来处理.

      

    工作线程池:
      在具体讲述defer模式之前, 先谈谈reactor自带的线程池, 这也符合使用half-sync/half-async模式的直观理解.
      先来构造下一个基础样例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    #! /usr/bin/python
    #-*- coding: UTF-8 -*-
     
    from twisted.internet import reactor
    from twisted.internet import protocol
    from twisted.protocols.basic import LineReceiver
     
    import time
     
    class DemoProtocol(LineReceiver):
                    
        def lineReceived(self, line):
            # 进行数据包的处理
            reactor.callInThread(self.handle_request, line)
         
        def handle_request(self, line):
            """
                hanlde_request:
                    进行具体的业务逻辑处理
            """
            # 边使用sleep(1)来代替模拟
            time.sleep(1)
            # 借助callFromThread响应结果
            reactor.callFromThread(self.write_response, line)
         
        def write_response(self, result):
            self.transport.write("ack:" + str(result) + " ")
     
    class DemoProtocolFactory(protocol.Factory):
        def buildProtocol(self, addr):
            return DemoProtocol()
         
     
    reactor.listenTCP(9090, DemoProtocolFactory())
    reactor.run()

      DemoProtocol在收到一行消息, 需要处理一个业务需耗时一秒, 于是其调用callInThread来借助reactor的线程池来执行.
      其callInThread的函数定义如下:

    1
    2
    def callInThread(self, _callable, *args, **kwargs):
            self.getThreadPool().callInThread(_callable, *args, **kwargs)

      从中, 我们可以印证之前的观点, 借助线程池来完成耗时阻塞的业务工作.
      再来看一下callFromThread的函数定义:

    1
    2
    3
    4
    def callFromThread(self, f, *args, **kw):
            assert callable(f), "%s is not callable" % (f,)
            self.threadCallQueue.append((f, args, kw))
            self.wakeUp()

      其作用是把回调放入主线程(也是reactor主事件循环)的待执行队列中, 并及时唤醒reactor.
      我们把写入响应的操作放入主循环中, 是为了让IO集中在主循环中进行, 避免潜在的线程不安全的问题.

    defer模式:
      直接使用reactor的线程池, 非常容易实现half-sync/half-async的模式, 也让IO和业务逻辑隔离. 但reactor设计之初, 更倾向于隐藏其内部的线程池. 于是其引入了defer模式.
      让我们实现与上等同的代码片段:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    #! /usr/bin/python
    #-*- coding: UTF-8 -*-
     
    from twisted.internet import reactor
    from twisted.internet import protocol
    from twisted.protocols.basic import LineReceiver
    from twisted.internet.threads import deferToThread
     
    import time
     
    class DemoProtocol(LineReceiver):
                    
        def lineReceived(self, line):
            # 进行数据包的处理
            deferToThread(self.handle_request, line).addCallback(self.write_response)
         
        def handle_request(self, line):
            """
                hanlde_request:
                    进行具体的业务逻辑处理
            """
            # 边使用sleep(1)来代替模拟
            time.sleep(1)
            return line
         
        def write_response(self, result):
            self.transport.write("ack:" + str(result) + " ")
         
     
    class DemoProtocolFactory(protocol.Factory):
        def buildProtocol(self, addr):
            return DemoProtocol()
         
     
    reactor.listenTCP(9090, DemoProtocolFactory())
    reactor.run()

      使用defer后, 代码更加的简洁. 其defer对象, 其实借用了线程池. 
      threads.deferToThread定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def deferToThread(f, *args, **kwargs):
        from twisted.internet import reactor
        return deferToThreadPool(reactor, reactor.getThreadPool(),
                                 f, *args, **kwargs)
     
    def deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
        = defer.Deferred()
     
        def onResult(success, result):
            if success:
                reactor.callFromThread(d.callback, result)
            else:
                reactor.callFromThread(d.errback, result)
     
        threadpool.callInThreadWithCallback(onResult, f, *args, **kwargs)
     
        return d

      这边我们可以发现deferToThread, 就是间接调用了callInThread函数, 另一方面, 对其回调函数的执行结果, 进行了onCallback, 以及onErrback的调用. 这些回调函数在主线程中运行.
      defer模式简化了程序编写, 也改变了人们开发的思维模式.

    测试回顾:
      使用telnet进行测试, 结果正常.
      
      另一方面, twisted的线程池, 其默认是采用延迟初始化的方式.
      服务开启时, 只有主线程一个, 随着请求的到来, 其按需产生更多的worker thread.
      而其线程池默认为10. 我们可以借助suggestThreadPoolSize方法来修改.

  • 相关阅读:
    (五)SpringCloud学习系列-构建部门微服务消费者Module
    (四)SpringCloud学习系列-构建部门微服务提供者Module
    (三)SpringCloud学习系列-Rest微服务构建
    git提交 显示作者名不是自己
    linux一些常见命令
    支付宝退款demo
    ffmpeg截取视频
    excel导入数据到mysql
    二分法与冒泡排序
    mysql的级联删除
  • 原文地址:https://www.cnblogs.com/skying555/p/5983120.html
Copyright © 2011-2022 走看看