zoukankan      html  css  js  c++  java
  • python网络编程——SocketServer/Twisted/paramiko模块

          在之前博客C/S架构的网络编程中,IO多路复用是将多个IO操作复用到1个服务端进程中进行处理,即无论有多少个客户端进行连接请求,服务端始终只有1个进程对客户端进行响应,这样的好处是节省了系统开销(select不适合单个客户端长会话操作,这样其它客户端连接请求就会一直等待,poll/epoll对select进行了改进)。下面介绍结合了IO多路复用和多进程(多线程)的SocketServer模块。

    1 SocketServer模块

        SocketServer内部使用IO多路复用以及“多线程”和“多进程” ,从而实现并发处理多个客户端请求的Socket服务端。即:每个客户端请求连接到服务器时,Socket服务端都会创建一个“线程”或者“进程”专门负责处理当前客户端的所有请求。

        SocketServer与select/poll/epoll的本质区别:客户端第1次连接时,服务端就为该客户端创建一个线程或进程,此后服务端就利用此线程或进程与客户端进行通信,后续的数据传输几乎不要server端的直接参与。如果服务端创建的是进程,那么client1和client2同时向server端传输数据时是互不影响的;如果服务端创建的是线程(python中多线程,在同一时间只有一个线程在运行,底层会自动进行上下文切换,即python中不存在真正的多线程),那么client1和client2交替上传数据。

        知识回顾: python中的多线程,有一个GIL(全局解释器锁)限制在同一时刻只有1个线程在运行,底层自动进行上下文切换,保证多个线程轮流运行(cpu切片),也就是python中不存在真正的多线程问题,伪多线程,实际多个线程不能真正实现并发处理。

       中间处理过程如图所示

            1.1 ThreadingTCPServer

                     ThreadingTCPServer实现的Soket服务器内部会为每个client创建一个 “线程”,该线程用来和客户端进行交互。

            1、ThreadingTCPServer基础

             使用ThreadingTCPServer要求:

              (1)创建一个继承自SocketServer.BaseRequestHandler的类   

              (2)类中必须定义一个名称为 handle 的方法

              (3)启动ThreadingTCPServer

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import SocketServer
     5 
     6 class MyServer(SocketServer.BaseRequestHandler):
     7 
     8     def handle(self):
     9         # print self.request,self.client_address,self.server
    10         conn = self.request
    11         conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
    12         Flag = True
    13         while Flag:
    14             data = conn.recv(1024)
    15             if data == 'exit':
    16                 Flag = False
    17             elif data == '0':
    18                 conn.sendall('通过可能会被录音.balabala一大推')
    19             else:
    20                 conn.sendall('请重新输入.')
    21 
    22 
    23 if __name__ == '__main__':
    24     server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    25     server.serve_forever()
    SocketServer实现的服务端
     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import socket
     5 
     6 ip_port = ('127.0.0.1',8009)
     7 sk = socket.socket()
     8 sk.connect(ip_port)
     9 sk.settimeout(5)
    10 
    11 while True:
    12     data = sk.recv(1024)
    13     print 'receive:',data
    14     inp = raw_input('please input:')
    15     sk.sendall(inp)
    16     if inp == 'exit':
    17         break
    18 
    19 sk.close()
    客户端

                    2、ThreadingTCPServer源码剖析

                 ThreadingTCPServer的类图关系如下:

          注:实际上在类的继承时,子类会继承父类的方法,所以我们在分析类的继承关系时,直接把父类的方法放到子类中,这样就直观些,在python中还有1点要注意的是子类到底是继承哪个父类的方法,因为python中存在多继承。

        内部调用流程为

    启动服务端程序
    1、执行TCPServer.__init__ 方法,创建服务端Socket对象并绑定IP和端口
    2、执行BaseServer.__init__ 方法,将自定义的继承自SocketServer.BaseRequestHandler的类MyRequestHandle赋值给self.RequestHandlerClass
    3、执行BaseServer.server_forever方法,While 循环一直监听是否有客户端请求到达 ...

    当客户端连接到达服务器
    4、执行ThreadingMixIn.process_request方法,创建一个“线程”用来处理请求
    5、执行ThreadingMixIn.process_request_thread方法
    6、执行BaseServer.finish_request方法,执行self.RequestHandlerClass(),即执行自定义MyRequestHandler的构造方法
      (自动调用基类BaseRequestHandler的构造方法,在该构造方法中又会调用MyRequestHandler的handle方法)

    ThreadingTCPServer相关源码

      1 class BaseServer:
      2 
      3     """Base class for server classes.
      4 
      5     Methods for the caller:
      6 
      7     - __init__(server_address, RequestHandlerClass)
      8     - serve_forever(poll_interval=0.5)
      9     - shutdown()
     10     - handle_request()  # if you do not use serve_forever()
     11     - fileno() -> int   # for select()
     12 
     13     Methods that may be overridden:
     14 
     15     - server_bind()
     16     - server_activate()
     17     - get_request() -> request, client_address
     18     - handle_timeout()
     19     - verify_request(request, client_address)
     20     - server_close()
     21     - process_request(request, client_address)
     22     - shutdown_request(request)
     23     - close_request(request)
     24     - handle_error()
     25 
     26     Methods for derived classes:
     27 
     28     - finish_request(request, client_address)
     29 
     30     Class variables that may be overridden by derived classes or
     31     instances:
     32 
     33     - timeout
     34     - address_family
     35     - socket_type
     36     - allow_reuse_address
     37 
     38     Instance variables:
     39 
     40     - RequestHandlerClass
     41     - socket
     42 
     43     """
     44 
     45     timeout = None
     46 
     47     def __init__(self, server_address, RequestHandlerClass):
     48         """Constructor.  May be extended, do not override."""
     49         self.server_address = server_address
     50         self.RequestHandlerClass = RequestHandlerClass
     51         self.__is_shut_down = threading.Event()
     52         self.__shutdown_request = False
     53 
     54     def server_activate(self):
     55         """Called by constructor to activate the server.
     56 
     57         May be overridden.
     58 
     59         """
     60         pass
     61 
     62     def serve_forever(self, poll_interval=0.5):
     63         """Handle one request at a time until shutdown.
     64 
     65         Polls for shutdown every poll_interval seconds. Ignores
     66         self.timeout. If you need to do periodic tasks, do them in
     67         another thread.
     68         """
     69         self.__is_shut_down.clear()
     70         try:
     71             while not self.__shutdown_request:
     72                 # XXX: Consider using another file descriptor or
     73                 # connecting to the socket to wake this up instead of
     74                 # polling. Polling reduces our responsiveness to a
     75                 # shutdown request and wastes cpu at all other times.
     76                 r, w, e = _eintr_retry(select.select, [self], [], [],
     77                                        poll_interval)
     78                 if self in r:
     79                     self._handle_request_noblock()
     80         finally:
     81             self.__shutdown_request = False
     82             self.__is_shut_down.set()
     83 
     84     def shutdown(self):
     85         """Stops the serve_forever loop.
     86 
     87         Blocks until the loop has finished. This must be called while
     88         serve_forever() is running in another thread, or it will
     89         deadlock.
     90         """
     91         self.__shutdown_request = True
     92         self.__is_shut_down.wait()
     93 
     94     # The distinction between handling, getting, processing and
     95     # finishing a request is fairly arbitrary.  Remember:
     96     #
     97     # - handle_request() is the top-level call.  It calls
     98     #   select, get_request(), verify_request() and process_request()
     99     # - get_request() is different for stream or datagram sockets
    100     # - process_request() is the place that may fork a new process
    101     #   or create a new thread to finish the request
    102     # - finish_request() instantiates the request handler class;
    103     #   this constructor will handle the request all by itself
    104 
    105     def handle_request(self):
    106         """Handle one request, possibly blocking.
    107 
    108         Respects self.timeout.
    109         """
    110         # Support people who used socket.settimeout() to escape
    111         # handle_request before self.timeout was available.
    112         timeout = self.socket.gettimeout()
    113         if timeout is None:
    114             timeout = self.timeout
    115         elif self.timeout is not None:
    116             timeout = min(timeout, self.timeout)
    117         fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
    118         if not fd_sets[0]:
    119             self.handle_timeout()
    120             return
    121         self._handle_request_noblock()
    122 
    123     def _handle_request_noblock(self):
    124         """Handle one request, without blocking.
    125 
    126         I assume that select.select has returned that the socket is
    127         readable before this function was called, so there should be
    128         no risk of blocking in get_request().
    129         """
    130         try:
    131             request, client_address = self.get_request()
    132         except socket.error:
    133             return
    134         if self.verify_request(request, client_address):
    135             try:
    136                 self.process_request(request, client_address)
    137             except:
    138                 self.handle_error(request, client_address)
    139                 self.shutdown_request(request)
    140 
    141     def handle_timeout(self):
    142         """Called if no new request arrives within self.timeout.
    143 
    144         Overridden by ForkingMixIn.
    145         """
    146         pass
    147 
    148     def verify_request(self, request, client_address):
    149         """Verify the request.  May be overridden.
    150 
    151         Return True if we should proceed with this request.
    152 
    153         """
    154         return True
    155 
    156     def process_request(self, request, client_address):
    157         """Call finish_request.
    158 
    159         Overridden by ForkingMixIn and ThreadingMixIn.
    160 
    161         """
    162         self.finish_request(request, client_address)
    163         self.shutdown_request(request)
    164 
    165     def server_close(self):
    166         """Called to clean-up the server.
    167 
    168         May be overridden.
    169 
    170         """
    171         pass
    172 
    173     def finish_request(self, request, client_address):
    174         """Finish one request by instantiating RequestHandlerClass."""
    175         self.RequestHandlerClass(request, client_address, self)
    176 
    177     def shutdown_request(self, request):
    178         """Called to shutdown and close an individual request."""
    179         self.close_request(request)
    180 
    181     def close_request(self, request):
    182         """Called to clean up an individual request."""
    183         pass
    184 
    185     def handle_error(self, request, client_address):
    186         """Handle an error gracefully.  May be overridden.
    187 
    188         The default is to print a traceback and continue.
    189 
    190         """
    191         print '-'*40
    192         print 'Exception happened during processing of request from',
    193         print client_address
    194         import traceback
    195         traceback.print_exc() # XXX But this goes to stderr!
    196         print '-'*40
    BaseServer
      1 class TCPServer(BaseServer):
      2 
      3     """Base class for various socket-based server classes.
      4 
      5     Defaults to synchronous IP stream (i.e., TCP).
      6 
      7     Methods for the caller:
      8 
      9     - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
     10     - serve_forever(poll_interval=0.5)
     11     - shutdown()
     12     - handle_request()  # if you don't use serve_forever()
     13     - fileno() -> int   # for select()
     14 
     15     Methods that may be overridden:
     16 
     17     - server_bind()
     18     - server_activate()
     19     - get_request() -> request, client_address
     20     - handle_timeout()
     21     - verify_request(request, client_address)
     22     - process_request(request, client_address)
     23     - shutdown_request(request)
     24     - close_request(request)
     25     - handle_error()
     26 
     27     Methods for derived classes:
     28 
     29     - finish_request(request, client_address)
     30 
     31     Class variables that may be overridden by derived classes or
     32     instances:
     33 
     34     - timeout
     35     - address_family
     36     - socket_type
     37     - request_queue_size (only for stream sockets)
     38     - allow_reuse_address
     39 
     40     Instance variables:
     41 
     42     - server_address
     43     - RequestHandlerClass
     44     - socket
     45 
     46     """
     47 
     48     address_family = socket.AF_INET
     49 
     50     socket_type = socket.SOCK_STREAM
     51 
     52     request_queue_size = 5
     53 
     54     allow_reuse_address = False
     55 
     56     def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
     57         """Constructor.  May be extended, do not override."""
     58         BaseServer.__init__(self, server_address, RequestHandlerClass)
     59         self.socket = socket.socket(self.address_family,
     60                                     self.socket_type)
     61         if bind_and_activate:
     62             try:
     63                 self.server_bind()
     64                 self.server_activate()
     65             except:
     66                 self.server_close()
     67                 raise
     68 
     69     def server_bind(self):
     70         """Called by constructor to bind the socket.
     71 
     72         May be overridden.
     73 
     74         """
     75         if self.allow_reuse_address:
     76             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
     77         self.socket.bind(self.server_address)
     78         self.server_address = self.socket.getsockname()
     79 
     80     def server_activate(self):
     81         """Called by constructor to activate the server.
     82 
     83         May be overridden.
     84 
     85         """
     86         self.socket.listen(self.request_queue_size)
     87 
     88     def server_close(self):
     89         """Called to clean-up the server.
     90 
     91         May be overridden.
     92 
     93         """
     94         self.socket.close()
     95 
     96     def fileno(self):
     97         """Return socket file number.
     98 
     99         Interface required by select().
    100 
    101         """
    102         return self.socket.fileno()
    103 
    104     def get_request(self):
    105         """Get the request and client address from the socket.
    106 
    107         May be overridden.
    108 
    109         """
    110         return self.socket.accept()
    111 
    112     def shutdown_request(self, request):
    113         """Called to shutdown and close an individual request."""
    114         try:
    115             #explicitly shutdown.  socket.close() merely releases
    116             #the socket and waits for GC to perform the actual close.
    117             request.shutdown(socket.SHUT_WR)
    118         except socket.error:
    119             pass #some platforms may raise ENOTCONN here
    120         self.close_request(request)
    121 
    122     def close_request(self, request):
    123         """Called to clean up an individual request."""
    124         request.close()
    TCPServer
     1 class ThreadingMixIn:
     2     """Mix-in class to handle each request in a new thread."""
     3 
     4     # Decides how threads will act upon termination of the
     5     # main process
     6     daemon_threads = False
     7 
     8     def process_request_thread(self, request, client_address):
     9         """Same as in BaseServer but as a thread.
    10 
    11         In addition, exception handling is done here.
    12 
    13         """
    14         try:
    15             self.finish_request(request, client_address)
    16             self.shutdown_request(request)
    17         except:
    18             self.handle_error(request, client_address)
    19             self.shutdown_request(request)
    20 
    21     def process_request(self, request, client_address):
    22         """Start a new thread to process the request."""
    23         t = threading.Thread(target = self.process_request_thread,
    24                              args = (request, client_address))
    25         t.daemon = self.daemon_threads
    26         t.start()
    27 
    28 复制代码
    ThreadingMixIn
    1 class ThreadingTCPServer(ThreadingMixIn, TCPServer): 
    2         pass
    ThreadingTCPServer

    RequestHandler相关源码

     1 class BaseRequestHandler:
     2 
     3     """Base class for request handler classes.
     4 
     5     This class is instantiated for each request to be handled.  The
     6     constructor sets the instance variables request, client_address
     7     and server, and then calls the handle() method.  To implement a
     8     specific service, all you need to do is to derive a class which
     9     defines a handle() method.
    10 
    11     The handle() method can find the request as self.request, the
    12     client address as self.client_address, and the server (in case it
    13     needs access to per-server information) as self.server.  Since a
    14     separate instance is created for each request, the handle() method
    15     can define arbitrary other instance variariables.
    16 
    17     """
    18 
    19     def __init__(self, request, client_address, server):
    20         self.request = request
    21         self.client_address = client_address
    22         self.server = server
    23         self.setup()
    24         try:
    25             self.handle()
    26         finally:
    27             self.finish()
    28 
    29     def setup(self):
    30         pass
    31 
    32     def handle(self):
    33         pass
    34 
    35     def finish(self):
    36         pass
    SocketServer.BaseRequestHandler

    源码精简

     1 import socket
     2 import threading
     3 import select
     4 
     5 def process(request, client_address):
     6     print request,client_address
     7     conn = request
     8     conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
     9     flag = True
    10     while flag:
    11         data = conn.recv(1024)
    12         if data == 'exit':
    13             flag = False
    14         elif data == '0':
    15             conn.sendall('通过可能会被录音.balabala一大推')
    16         else:
    17             conn.sendall('请重新输入.')
    18 
    19 sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    20 sk.bind(('127.0.0.1',8002))
    21 sk.listen(5)
    22 
    23 while True:
    24     r, w, e = select.select([sk,],[],[],1)
    25     print 'looping'
    26     if sk in r:
    27         print 'get request'
    28         request, client_address = sk.accept()
    29         t = threading.Thread(target=process, args=(request, client_address))
    30         t.daemon = False
    31         t.start()
    32 
    33 sk.close()
    模拟SocketServer代码

        从精简代码可以看出,SocketServer的ThreadingTCPServer之所以可以同时处理请求得益于selectThreading两个东西,其实本质上就是在服务器端为每一个客户端创建一个线程,用于后续的数据处理,当前线程用来处理对应客户端的请求,所以可以支持同时n个客户端链接(长连接)。

          1.2 ForkingTCPServer

        ForkingTCPServer与ThreadingTCPServer的使用和执行流程基本一致,只不过在内部分别为请求者建立“进程”和“线程”。

        基本使用:

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import SocketServer
     5 
     6 class MyServer(SocketServer.BaseRequestHandler):
     7 
     8     def handle(self):
     9         # print self.request,self.client_address,self.server
    10         conn = self.request
    11         conn.sendall('欢迎致电 10086,请输入1xxx,0转人工服务.')
    12         Flag = True
    13         while Flag:
    14             data = conn.recv(1024)
    15             if data == 'exit':
    16                 Flag = False
    17             elif data == '0':
    18                 conn.sendall('通过可能会被录音.balabala一大推')
    19             else:
    20                 conn.sendall('请重新输入.')
    21 
    22 
    23 if __name__ == '__main__':
    24     server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    25     server.serve_forever()
    服务端
     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import socket
     5 
     6 ip_port = ('127.0.0.1',8009)
     7 sk = socket.socket()
     8 sk.connect(ip_port)
     9 sk.settimeout(5)
    10 
    11 while True:
    12     data = sk.recv(1024)
    13     print 'receive:',data
    14     inp = raw_input('please input:')
    15     sk.sendall(inp)
    16     if inp == 'exit':
    17         break
    18 
    19 sk.close()
    客户端

        以上ForkingTCPServer只是将 ThreadingTCPServer 实例中的代码:

    server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
    变更为:
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)

        SocketServer的ThreadingTCPServer之所以可以同时处理请求得益于selectos.fork两个东西,其实本质上就是在服务器端为每一个客户端创建一个进程,用于后续数据处理,当前新创建的进程用来处理对应客户端的请求,所以,可以支持同时n个客户端链接(长连接)。

    源码剖析参考 ThreadingTCPServer

    2 Twisted模块

        使用传统的BIO(Blocking IO/阻塞IO)进行网络编程时,进行网络IO读写时都会阻塞当前线程,如果实现一个TCP服务器,都需要对每个客户端连接开启一个线程,而很多线程可能都在傻傻的阻塞住等待读写数据,系统资源消耗大。

        Twisted是用Python实现的基于事件驱动的网络引擎框架,功能非常丰富,基本包括了常用的网络组件,例如:网络协议、线程、数据库管理、网络操作、电子邮件等

          编程框架,即别人预先定义好的一个框架(一个项目),如.net某个web框架有25个class,从BeginRequest依次执行类里的process方法。程序员自定义一个类添加到框架里,应用程序则从上到下运行,就会执行自定义代码。框架只知道这个类的列表,不关心具体内容,从上到下执行,类似于一个执行链,C#里叫委托链。也就是把代码类放到这个列表中,委托这个框架替你执行。

        事件驱动(not event),把自定义代码注册到框架中,框架代替你执行。或者框架提供几个接口,让你插入数据(python无接口概念,只有事件驱动)。委托不能为空,事件可以为空。通俗来讲,所谓事件驱动,就是说程序就像是一个报警器(reactor),时刻等待着外部事件(event),诸如有人入侵等,一旦有事件发生,程序就会触发一些特定的操作(callback),注入拨打报警电话等。

        简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。

        自定义事件驱动框架如下:

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 #event_drive.py
     5 
     6 event_list = []
     7 
     8 def run():
     9     for event in event_list:
    10         obj = event()
    11         obj.execute()
    12 
    13 class BaseHandler(object):
    14     """
    15     用户必须继承该类,从而规范所有类的方法(类似于接口的功能)
    16     """
    17     def execute(self):
    18         raise Exception('you must overwrite execute')
    自定义事件驱动框架代码

          程序员使用该自定义事件驱动框架

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 from source import event_drive
     5 
     6 class MyHandler(event_drive.BaseHandler):
     7 
     8     def execute(self):
     9         print 'event-drive execute MyHandler'
    10 
    11 event_drive.event_list.append(MyHandler)
    12 event_drive.run()
    程序员使用该框架代码
    执行过程:
    1 导入自定义框架event_drive
    2 自定义类MyClass,这个类必须继承event_drive中的BaseHandler类
    3 MyClass类中重载execute方法
    4 注册事件到框架的委托链,把自定义的类MyClass加入到事件列表event_list中(下面的Twisted框架是创建对象后改一个字段为类名也是同样的目的)
    5 执行run方法

    事件驱动只不过是框架规定了执行顺序,程序员在使用框架时,可以向原执行顺序中注册“事件”,从而在框架执行时可以出发已注册 的“事件”。

        基于事件驱动Twisted模块的Socket

       (1)由于twisted是第三方模块,默认没有安装,需要先安装 

    cd Twisted-15.5.0
    python setup.py build   #编译
    python setup.py install #安装
    上述安装方法适用于windows和linux的命令行安装,实际上也可以直接执行python setup.py install
    注意:twisted依赖于zope和win32api模块,需要先安装依赖。

      (2)实例

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3  
     4 from twisted.internet import protocol
     5 from twisted.internet import reactor
     6  
     7 class Echo(protocol.Protocol):   #继承protocol.py中的Protocol类
     8     def dataReceived(self, data): 
     9         self.transport.write(data)  #将收到的内容直接发送回去
    10  
    11 def main():
    12     factory = protocol.ServerFactory()  #实例化
    13     factory.protocol = Echo   #将自定义类传给对象
    14  
    15     reactor.listenTCP(8000,factory)   #将端口和实例化对象作为参数传给reactor
    16     reactor.run()
    17  
    18 if __name__ == '__main__':
    19     main()
    服务端
     1 import socket
     2 
     3 ip_port=('127.0.0.1',8000)
     4 sk=socket.socket()
     5 sk.connect(ip_port)
     6 sk.settimeout(5)
     7  
     8 while True:
     9     inp=raw_input("please input:")
    10     sk.sendall(inp)
    11     print sk.recv(1024)
    12  
    13 sk.close()
    客户端

       源码类图关系

         实例执行流程

    运行服务端程序
        创建Protocol的派生类Echo
        创建ServerFactory对象,并将Echo类封装到其protocol字段中
        执行reactor的listenTCP方法,内部使用tcp.Port创建socket server对象,并将该对象添加到了reactor的set类型的字段_read 中
        执行reactor的run方法,内部执行while循环,并通过select来监视_read中文件描述符是否有变化,循环中...

    客户端请求到达
        执行reactor的_doReadOrWrite方法,其内部通过反射调用tcp.Port类的doRead方法,内部accept客户端连接并创建Server对象实例(用于封装客户端socket信息)和创建Echo对象实例(用于处理请求),然后调用Echo对象实例的makeConnection方法,创建连接。
        执行tcp.Server类的doRead方法,读取数据,
        执行tcp.Server类的_dataReceived方法,如果读取数据内容为空(关闭链接),否则,出发Echo的dataReceived方法
        执行Echo的dataReceived方法

        从源码可以看出,上述实例本质上使用了事件驱动的方法 和 IO多路复用的机制来进行Socket的处理。

        Pycharm debug模式调试调用关系

        Twisted优点:

         1、使用基于事件驱动的编程模型,而不是多线程模型。

         2、跨平台:为主流操作系统平台暴露出的事件通知系统提供统一的接口。

         3、“内置电池”的能力:提供流行的应用层协议实现,因此Twisted马上就可为开发人员所用。

         4、符合RFC规范,已经通过健壮的测试套件证明了其一致性。

         5、能很容易的配合多个网络协议一起使用。

         6、可扩展。

           更多Twisted内容,请参考:

           http://www.cnblogs.com/c9com/archive/2013/01/05/2845552.html(Twisted reactor解剖)

           http://www.cnblogs.com/zhangjing0502/archive/2012/07/11/2586666.html(Twisted的网络通信模型

           http://www.cnblogs.com/zhangjing0502/archive/2012/07/11/2586575.html(Python中reactor,factory,protocol)

           http://www.cnblogs.com/zhangjing0502/archive/2012/05/16/2504415.html(Twisted异步编程--Deferred)

           http://www.cnblogs.com/zhangjing0502/archive/2012/05/30/2526552.html([Python-Twisted] Twisted入门之端口转发服务器

           http://www.cnblogs.com/Rex7/p/4752581.html(跟踪 twisted 里deferred 的Callback)

    3 paramiko模块

       linux运维中都需要对服务器进行配置,如果服务器数量较多,那么可以进行远程自动化批量操作。在python中的paramiko模块就是实现远程执行命令的模块。使用paramiko模块仅需要在本地安装相应的模块(pycrypto以及paramiko模块),对远程服务器没有配置要求,paramiko模块基于ssh协议,实现对远程服务器的相关操作,对于连接多台服务器,进行复杂的连接操作特别有帮助。

        3.1 paramiko安装

    1 windows下的安装paramiko
    (1)解压pycrypto-2.6.tar.gz源码到路径C:Python27Libsite-packages
    (2)在windows控制台下进入目录pycrypto-2.6,依次执行python setup.py build和python setup.py install
       window是如果没有安装编译器,那么会报错,解决办法是安装VCForPython27.msi(Microsoft Visual C++ Compiler for Python 2.7)
       编译过程中会出现“Unable to find vcvarsall.bat”的错误,解决方法参考http://blog.csdn.net/donger_soft/article/details/44838109
       测试是否安装成功:在python命令行下输入:import pycrypto,检查是否报错
    (3)解压paramiko-1.10.1.tar.gz源码到路径C:Python27Libsite-packages
    (4)在windows控制台下进入目录paramiko-1.10.1,依次执行python setup.py build和python setup.py install
       测试是否安装成功:在python命令行下输入:import paramiko,检查是否报错

    2 ubuntu下的安装paramiko
    (1)先安装python-devel(前提是要安装编译器gcc)
    (2)解压pycrypto-2.6.tar.gz源码,进入目录pycrypto-2.6,执行python setup.py build && python setup.py install
       测试是否安装成功:在python命令行下输入:import pycrypto,检查是否报错
    (3)解压paramiko-1.10.1.tar.gz源码,进入目录paramiko-1.10.1,执行python setup.py build && python setup.py install
       测试是否安装成功:在python命令行下输入:import paramiko,检查是否报错

          3.2 paramiko使用

    SSHClient方法

     1 #!/usr/bin/env python
     2 #-*- coding:utf-8 -*-
     3 
     4 import paramiko
     5 
     6 ssh = paramiko.SSHClient() #创建ssh对象
     7 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #允许连接不在know_hosts文件中的主机
     8 ssh.connect(hostname='192.168.1.100',port=22,username='root',password='111111') #hostname='主机名'或'ip地址'
     9 stdin,stdout,stderror = ssh.exec_command('df') #在远程服务器执行命令df
    10 
    11 print stdout.read() #获取命令结果
    12 print stderror.read() #如果命令执行错误,则返回标准错误输出
    13 ssh.close() #关闭连接
    实例1:在远程服务器执行命令
     1 #!/usr/bin/env python
     2 #-*- coding:utf-8 -*-
     3 
     4 import paramiko
     5 
     6 transport = paramiko.Transport(('192.168.7.100',22)) #创建transport对象
     7 transport.connect(username='root',password='nihao123!')#调用连接方法connect
     8 
     9 ssh = paramiko.SSHClient() #创建ssh对象
    10 ssh._transport = transport #把上面创建的transport对象赋值给ssh对象中的_transport字段
    11 
    12 stdin,stdout,stderr = ssh.exec_command('ifconfig') #执行命令ifconfig
    13 
    14 print stdout.read()
    15 print stderr.read()
    16 
    17 transport.close()
    实例2:在远程服务器执行命令

      在上述两个实例中,其实实例1中connect内部封装了Transport,即:
       ssh = paramiko.SSHClient()
           t = self._transport = Transport(sock, gss_kex=gss_kex, gss_deleg_creds=gss_deleg_creds)
      注意:在操作文件时只能用实例2的方法

    SFTPClient方法

     1 #!/usr/bin/env python
     2 #-*- coding:utf-8 -*-
     3 
     4 import paramiko
     5 
     6 private_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
     7 
     8 ssh = paramiko.SSHClient()  #创建ssh对象
     9 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #允许连接不在know_host文件中的的主机
    10 ssh.connect(hostname='192.168.1.100',port=22,username='root',pkey=private_key) #连接服务器
    11 stdin,stdout,stderr = ssh.exec_command('ifconfig') #执行命令
    12 print stdout.read() #获取命令执行结果
    13 ssh.close()
    14 
    15 '''
    16 如果是运维人员这里不需要看
    17 1、首先创建一个公钥和私钥
    18 ssh-keygen
    19 2、复制id_rsa.pub至要被远程执行命令的机器,并把id_rsa.pub里的内容增加至authorized_keys文件中
    20 如果authorized_keys文件不存在创建即可
    21 '''
    实例5:基于用户名密码的上传下载
     1 #!/usr/bin/env python
     2 #-*- coding:utf-8 -*-
     3 
     4 import paramiko
     5 
     6 private_key = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
     7 
     8 transport = paramiko.Transport(('192.168.1.100',22))
     9 transport.connect(username='root',pkey=private_key) #连接
    10 
    11 sftp = paramiko.SFTPClient.from_transport(transport)#创建SFTPClient对象
    12 
    13 sftp.put('test.zip','/tmp/test.zip')     #将test.zip上传到目标机器的/tmp/目录下,并命名为test.zip
    14 sftp.get('/tmp/messages.log','test.log') #下载目标服务器/tmp/messages.log 到本地,并命名为test.log
    15 
    16 transport.close()
    实例6:基于用户名密钥对的上传下载

       在远程服务器执行命令时,其实时间主要消耗在建立连接上了。

    自定义类的,在连接后进行相应的上传下载操作,这样就可以在一次连接中进行其它操作,避免频繁的创建连接,关闭连接,减少资源消耗

     1 #!/usr/bin/env python
     2 #-*- coding:utf-8 -*-
     3 
     4 import paramiko
     5 import uuid
     6 
     7 class DownUpLoad(object):
     8     def __init__(self,ip,port,user,passwd):
     9         self.hostname = ip
    10         self.port = port
    11         self.username = user
    12         self.password = passwd
    13         
    14     def create_file(self):
    15         file_name = str(uuid.uuid4())  #uuid.uuid4()会生成一个文件UUID,当作文件名
    16         with open(file_name,'wb') as f:
    17             f.write('This is test file!')
    18             return file_name
    19 
    20     def run(self):
    21         self.connect()
    22         self.upload()
    23         self.rename()
    24         self.close()
    25 
    26     def connect(self): #连接方法
    27         transport = paramiko.Transport((self.hostname, self.port)) #创建一个连接对象
    28         transport.connect(username=self.username, password=self.password)#调用transport对象中的连接方法
    29         self.__transport = transport #把transport赋值给__transport
    30 
    31     def close(self): #关闭连接
    32         self.__transport.close()
    33 
    34     def upload(self): #上传文件方法
    35         file_name = self.create_file() #创建文件
    36         sftp = paramiko.SFTPClient.from_transport(self.__transport) #创建基于transport连接的SFTPClient
    37         sftp.put(file_name,'/tmp/test.txt') #上传文件
    38 
    39     def rename(self): #执行命名方法
    40         ssh = paramiko.SSHClient() #建立ssh对象
    41         ssh._transport = self.__transport #替换ssh_transport字段为self.__transport
    42         stdin,stdout,stderr = ssh.exec_command('mv /tmp/test1 /tmp/test2') #执行命令
    43         print stdout.read() #读取执行命令
    44 
    45 if __name__ == '__main__':
    46     ha = DownUpLoad()
    47     ha.run()
    自定义包含连接和上传下载方法的类

    参考资料:

          http://www.cnblogs.com/wupeiqi/articles/5095821.html

          http://www.cnblogs.com/wupeiqi/articles/5040823.html

          http://www.cnblogs.com/luotianshuai/p/5111587.html

          http://www.cnblogs.com/luotianshuai/p/5131053.html

         

  • 相关阅读:
    Redis 2种持久化模式的缺陷
    我看过得最易懂的一段AOP的解释
    mysql-高性能索引策略
    几款效率神器助你走上人生巅峰
    shell脚本报错:"[: =: unary operator expected"
    CentOS7中使用iptables
    php foreach用法和实例
    shell 学习四十五天---xargs
    chain issues incorrect order,EXtra certs,Contains anchor
    Ubuntu 能ping通DNS 地址 无法解析域名
  • 原文地址:https://www.cnblogs.com/maociping/p/5139681.html
Copyright © 2011-2022 走看看