zoukankan      html  css  js  c++  java
  • Python网络编程学习_Day11

    一、协程

    1.理论知识

    协程,又称伪线程,是一种用户态的轻量级线程。

    协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

    优点:

    • 无需线程上下文切换的开销
    • 无需原子操作锁定及同步的开销
      •   "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
    • 方便切换控制流,简化编程模型
    • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

    缺点:

    • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
    • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

    协程满足条件:

    1. 必须在只有一个单线程里实现并发
    2. 修改共享数据不需加锁
    3. 用户程序里自己保存多个控制流的上下文栈
    4. 一个协程遇到IO操作自动切换到其它协程

    2.代码实例

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

     1 import gevent
     2 def func1():
     3     print('33[31;1m李闯在跟海涛搞...33[0m')
     4     gevent.sleep(2)
     5     print('33[31;1m李闯又回去跟继续跟海涛搞...33[0m')
     6 
     7 
     8 def func2():
     9     print('33[32;1m李闯切换到了跟海龙搞...33[0m')
    10     gevent.sleep(1)
    11     print('33[32;1m李闯搞完了海涛,回来继续跟海龙搞...33[0m')
    12 
    13 def func3():
    14     print("33333")
    15     gevent.sleep(1)
    16     print("4444")
    17 
    18 gevent.joinall([
    19     gevent.spawn(func1),
    20     gevent.spawn(func2),
    21     gevent.spawn(func3),
    22 ])
    输出结果:
    
    李闯在跟海涛搞...
    李闯切换到了跟海龙搞...
    33333
    李闯搞完了海涛,回来继续跟海龙搞...
    4444
    李闯又回去跟继续跟海涛搞...

    3.同步与异步的性能区别

    import gevent
     
    def task(pid):
        """
        Some non-deterministic task
        """
        gevent.sleep(0.5)
        print('Task %s done' % pid)
     
    def synchronous():
        for i in range(1,10):
            task(i)
     
    def asynchronous():
        threads = [gevent.spawn(task, i) for i in range(10)]
        gevent.joinall(threads)
     
    print('Synchronous:')
    synchronous()
     
    print('Asynchronous:')
    asynchronous()

    4.遇到IO阻塞自动切换任务(爬虫实例)

     

     1 import gevent
     2 from gevent import monkey
     3 monkey.patch_all()
     4 from  urllib.request import urlopen
     5 import time
     6 
     7 def pa_web_page(url):
     8     print("GET url",url)
     9     req = urlopen(url)
    10     data =req.read()
    11     print(data)
    12     print('%d bytes received from %s.' % (len(data), url))
    13 
    14 t_start = time.time()
    15 pa_web_page("http://www.autohome.com.cn/beijing/")
    16 pa_web_page("http://www.xiaohuar.com/")
    17 print("time cost:",time.time()-t_start)
    18 
    19 t2_start = time.time()
    20 gevent.joinall([
    21         #gevent.spawn(pa_web_page, 'https://www.python.org/'),
    22         gevent.spawn(pa_web_page, 'http://www.autohome.com.cn/beijing/'),
    23         gevent.spawn(pa_web_page, 'http://www.xiaohuar.com/'),
    24         #gevent.spawn(pa_web_page, 'https://github.com/'),
    25 ])
    26 print("time cost t2:",time.time()-t2_start)

    二、事件驱动与异步IO

    事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

    在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

    在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

    在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

    当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

    1. 程序中有许多任务,而且…
    2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
    3. 在等待事件到来时,某些任务会阻塞。

    当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

    网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

    1.select多并发socket例子

     1 #_*_coding:utf-8_*_
     2 __author__ = 'Alex Li'
     3 
     4 import select
     5 import socket
     6 import sys
     7 import queue
     8 
     9 
    10 server = socket.socket()
    11 server.setblocking(0)
    12 
    13 server_addr = ('localhost',10000)
    14 
    15 print('starting up on %s port %s' % server_addr)
    16 server.bind(server_addr)
    17 
    18 server.listen(5)
    19 
    20 
    21 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
    22 outputs = []
    23 
    24 message_queues = {}
    25 
    26 while True:
    27     print("waiting for next event...")
    28 
    29     readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
    30 
    31     for s in readable: #每个s就是一个socket
    32 
    33         if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
    34             #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
    35             #新连接进来了,接受这个连接
    36             conn, client_addr = s.accept()
    37             print("new connection from",client_addr)
    38             conn.setblocking(0)
    39             inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
    40             #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
    41             #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
    42 
    43             message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
    44 
    45         else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
    46             #客户端的数据过来了,在这接收
    47             data = s.recv(1024)
    48             if data:
    49                 print("收到来自[%s]的数据:" % s.getpeername()[0], data)
    50                 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
    51                 if s not  in outputs:
    52                     outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
    53 
    54 
    55             else:#如果收不到data代表什么呢? 代表客户端断开了呀
    56                 print("客户端断开了",s)
    57 
    58                 if s in outputs:
    59                     outputs.remove(s) #清理已断开的连接
    60 
    61                 inputs.remove(s) #清理已断开的连接
    62 
    63                 del message_queues[s] ##清理已断开的连接
    64 
    65 
    66     for s in writeable:
    67         try :
    68             next_msg = message_queues[s].get_nowait()
    69 
    70         except queue.Empty:
    71             print("client [%s]" %s.getpeername()[0], "queue is empty..")
    72             outputs.remove(s)
    73 
    74         else:
    75             print("sending msg to [%s]"%s.getpeername()[0], next_msg)
    76             s.send(next_msg.upper())
    77 
    78 
    79     for s in exeptional:
    80         print("handling exception for ",s.getpeername())
    81         inputs.remove(s)
    82         if s in outputs:
    83             outputs.remove(s)
    84         s.close()
    85 
    86         del message_queues[s]
     1 import socket
     2 import sys
     3 
     4 messages = [ b'This is the message. ',
     5              b'It will be sent ',
     6              b'in parts.',
     7              ]
     8 server_address = ('localhost', 10000)
     9 
    10 # Create a TCP/IP socket
    11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    12           socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    13           ]
    14 
    15 # Connect the socket to the port where the server is listening
    16 print('connecting to %s port %s' % server_address)
    17 for s in socks:
    18     s.connect(server_address)
    19 
    20 for message in messages:
    21 
    22     # Send messages on both sockets
    23     for s in socks:
    24         print('%s: sending "%s"' % (s.getsockname(), message) )
    25         s.send(message)
    26 
    27     # Read responses on both sockets
    28     for s in socks:
    29         data = s.recv(1024)
    30         print( '%s: received "%s"' % (s.getsockname(), data) )
    31         if not data:
    32             print(sys.stderr, 'closing socket', s.getsockname() )
    33 
    34 select socket client

    2.selectors模块

     1 import selectors
     2 import socket
     3 
     4 def accept(sock, mask):
     5     conn, addr = sock.accept()  # Should be ready
     6     print('accepted', conn, 'from', addr)
     7     conn.setblocking(False)#非阻塞,或者设置为0
     8     sel.register(conn, selectors.EVENT_READ, read)
     9 def read(conn, mask):
    10     try:
    11         data = conn.recv(1000)  # Should be ready
    12         if data:
    13             print('echoing', repr(data), 'to', conn)
    14             conn.send(data)  # Hope it won't block
    15         else:
    16             print('closing', conn)
    17             sel.unregister(conn)
    18             conn.close()
    19     except ConnectionResetError as e:
    20         sel.unregister(conn)
    21 sock = socket.socket()
    22 sock.bind(('localhost', 10000))
    23 sock.listen(100)
    24 sock.setblocking(False)
    25 
    26 sel = selectors.DefaultSelector()#生成实例
    27 sel.register(sock, selectors.EVENT_READ, accept)#注册sock连接,读事件,如果有请求调用accept
    28 #select.select(inputs,outputs...)
    29 while True:
    30     events = sel.select() #如果没有事件,一直等待,返回列表
    31     for key, mask in events: #有事件,循环events列表
    32         callback = key.data #accept内存地址,发送数据后变成read内存地址
    33         print("--->",key,mask)
    34         callback(key.fileobj, mask)#fileobj是conn,
    35         #fileobj=<socket.socket fd=220, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 10000)>,

    三、RabbitMQ队列

    1.安装

    安装python rabbitMQ module 

    pip install pika
    or
    easy_install pika
    or
    源码
      
    https://pypi.python.org/pypi/pika

    2.最简单的通讯队列

    send端

     1 #!/usr/bin/env python
     2 import pika
     3  
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5                'localhost'))
     6 channel = connection.channel()
     7  
     8 #声明queue
     9 channel.queue_declare(queue='hello')
    10  
    11 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    12 channel.basic_publish(exchange='',
    13                       routing_key='hello',
    14                       body='Hello World!')
    15 print(" [x] Sent 'Hello World!'")
    16 connection.close()

    receive端

     1 #_*_coding:utf-8_*_
     2 __author__ = 'Alex Li'
     3 import pika
     4  
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6                'localhost'))
     7 channel = connection.channel()
     8  
     9  
    10 #You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    11 # We could avoid that if we were sure that the queue already exists. For example if send.py program
    12 #was run before. But we're not yet sure which program to run first. In such cases it's a good
    13 # practice to repeat declaring the queue in both programs.
    14 channel.queue_declare(queue='hello')
    15  
    16 def callback(ch, method, properties, body):
    17     print(" [x] Received %r" % body)
    18  
    19 channel.basic_consume(callback,
    20                       queue='hello',
    21                       no_ack=True)#这种情况下一旦被deliver出去,就已经被确认了,在consumer异常时会导致消息丢失。
    22 23 print(' [*] Waiting for messages. To exit press CTRL+C') 24 channel.start_consuming()

     3.Work Queues模式

    这种模式下,RabbitMQ会默认把消息依次分发给各个消费者,跟负载均衡差不多。

    消息提供着代码(send):

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 # Author:Liumj
     4 import pika
     5 import time
     6 import sys
     7 connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) #建立socket连接
     8 channel = connection.channel() #打开一个通道
     9 channel.queue_declare(queue='hello') #声明queue,名称是hello
    10 message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
    11 channel.basic_publish(exchange = '',
    12                       routing_key='hello', #queue名
    13                       body = message, #消息内容
    14                       properties=pika.BasicProperties(
    15                           delivery_mode=2
    16                       )  #basic_publist发消息
    17 )
    18 
    19 connection.close()
    View Code

    消费者代码(recv):

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 # Author:Liumj
     4 import pika,time
     5 connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1')) #建立连接
     6 channel = connection.channel() #建立通道
     7 channel.queue_declare(queue='hello') #如果确定hello存在,该条可以不写
     8 def callback(ch,method,properties,body): #channel对象,属性信息
     9     print("[x] Received %r" % body)
    10     #time.sleep(20)
    11     print("[x] Done")
    12     print("method.delivery_tag",method.delivery_tag)
    13     ch.basic_ack(delivery_tag=method.delivery_tag)
    14 channel.basic_consume(callback,  #从hello里面收消息,收到之后调用callback函数
    15                       queue='hello',
    16                       no_ack=True)
    17 print('[*] waiting for message. To exit press CTRL+C')
    18 channel.start_consuming()
    View Code

    消息会自动依次分配到各个消费者身上。

    4.消息持久化和公平分发

    为防止消息发送过程中出现异常需要将消息持久化,这样重启服务消息不会丢失。

    消息公平分发:根据每个机器配置不同,处理的任务不同,配置perfetch = 1,告诉RabbitMQ,在这个消费者当前消息没有处理完之前,不要发送新的消息。

    完整代码如下:

    生产者(send):

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 # Author:Liumj
     4 # !/usr/bin/env python
     5 import pika
     6 import sys
     7 connection = pika.BlockingConnection(pika.ConnectionParameters(
     8     host='127.0.0.1'))   #建立连接
     9 channel = connection.channel() #打开通道
    10 channel.queue_declare(queue='task_queue', durable=True) #声明queue、队列持久化
    11 message = ' '.join(sys.argv[1:]) or "Hello World!"#消息内容
    12 channel.basic_publish(exchange='',
    13                       routing_key='task_queue',
    14                       body=message,
    15                       properties=pika.BasicProperties(
    16                           delivery_mode=2,  # make message persistent
    17                       ))
    18 print(" [x] Sent %r" % message)
    19 connection.close()

    消费者(recv):

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 # Author:Liumj
     4 import pika
     5 import time
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7     host='127.0.0.1'))
     8 channel = connection.channel()
     9 channel.queue_declare(queue='task_queue', durable=True)
    10 print(' [*] Waiting for messages. To exit press CTRL+C')
    11 def callback(ch, method, properties, body):
    12     print(" [x] Received %r" % body)
    13     time.sleep(body.count(b'.'))
    14     print(" [x] Done")
    15     ch.basic_ack(delivery_tag=method.delivery_tag)
    16 channel.basic_qos(prefetch_count=1) #公平分发
    17 channel.basic_consume(callback,
    18                       queue='task_queue')  #从task_queue里面接收消息后调用callback函数
    19 channel.start_consuming()

     5.PublishSubscribe(消息发布订阅)

     类似于广播,只要符合条件都可以接收消息

    fanout:所有bind到此exchange的queue都可以接收消息

    direct:通过routingKey和exchange决定哪一个唯一的queue可以接收消息,队列绑定关键字,发送者讲根据数据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送制定队列。

    topic:所有符合routingKey所bind的queue可以接收消息

    publisher_fanout:

     1 import pika
     2 import sys
     3 #credentials = pika.PlainCredentials('alex', 'alex3714')
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5     host='127.0.0.1'))
     6 channel = connection.channel()
     7 channel.exchange_declare(exchange='logs', type='fanout')
     8 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
     9 channel.basic_publish(exchange='logs',
    10                       routing_key='',
    11                       body=message)
    12 print(" [x] Sent %r" % message)
    13 connection.close()
    View Code

    subscriber_fanout:

     1 import pika
     2 #credentials = pika.PlainCredentials('alex', 'alex3714')
     3 connection = pika.BlockingConnection(pika.ConnectionParameters(
     4     host='127.0.0.1'))
     5 channel = connection.channel()
     6 channel.exchange_declare(exchange='logs',type='fanout')
     7 result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
     8 queue_name = result.method.queue
     9 channel.queue_bind(exchange='logs',queue=queue_name)
    10 print(' [*] Waiting for logs. To exit press CTRL+C')
    11 def callback(ch, method, properties, body):
    12     print(" [x] %r" % body)
    13 channel.basic_consume(callback,
    14                       queue=queue_name,
    15                       )
    16 channel.start_consuming()
    View Code

    publisher_direct:

     1 import pika
     2 import sys
     3  
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='127.0.0.1'))
     6 channel = connection.channel()
     7  
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10  
    11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='direct_logs',
    14                       routing_key=severity,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (severity, message))
    17 connection.close()
    View Code

    subscriber_direct:

     1 import pika
     2 import sys
     3  
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='127.0.0.1'))
     6 channel = connection.channel()
     7  
     8 channel.exchange_declare(exchange='direct_logs',
     9                          type='direct')
    10  
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13  
    14 severities = sys.argv[1:]
    15 if not severities:
    16     sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
    17     sys.exit(1)
    18  
    19 for severity in severities:
    20     channel.queue_bind(exchange='direct_logs',
    21                        queue=queue_name,
    22                        routing_key=severity)
    23  
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25  
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28  
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32  
    33 channel.start_consuming()
    View Code

    publisher_topic:

     1 import pika
     2 import sys
     3  
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='127.0.0.1'))
     6 channel = connection.channel()
     7  
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10  
    11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    13 channel.basic_publish(exchange='topic_logs',
    14                       routing_key=routing_key,
    15                       body=message)
    16 print(" [x] Sent %r:%r" % (routing_key, message))
    17 connection.close()
    View Code

    subscriber_topic:

     1 import pika
     2 import sys
     3  
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(
     5         host='127.0.0.1'))
     6 channel = connection.channel()
     7  
     8 channel.exchange_declare(exchange='topic_logs',
     9                          type='topic')
    10  
    11 result = channel.queue_declare(exclusive=True)
    12 queue_name = result.method.queue
    13  
    14 binding_keys = sys.argv[1:]
    15 if not binding_keys:
    16     sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
    17     sys.exit(1)
    18  
    19 for binding_key in binding_keys:
    20     channel.queue_bind(exchange='topic_logs',
    21                        queue=queue_name,
    22                        routing_key=binding_key)
    23  
    24 print(' [*] Waiting for logs. To exit press CTRL+C')
    25  
    26 def callback(ch, method, properties, body):
    27     print(" [x] %r:%r" % (method.routing_key, body))
    28  
    29 channel.basic_consume(callback,
    30                       queue=queue_name,
    31                       no_ack=True)
    32  
    33 channel.start_consuming()
    View Code

     6.RPC

    RabbitMQ_RPC_send:

     1 import pika
     2 import uuid
     3 class SSHRpcClient(object):
     4     def __init__(self):
     5 #        credentials = pika.PlainCredentials('alex', 'alex3714')
     6         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
     7                             host='127.0.0.1'))
     8         self.channel = self.connection.channel()
     9         result = self.channel.queue_declare(exclusive=True) # 客户端的结果必须要返回到这个queue
    10         self.callback_queue = result.method.queue
    11         self.channel.basic_consume(self.on_response,queue=self.callback_queue) #声明从这个queue里收结果
    12     def on_response(self, ch, method, props, body):
    13         if self.corr_id == props.correlation_id: #任务标识符
    14             self.response = body
    15             print(body)
    16     def call(self, n):
    17         self.response = None
    18         self.corr_id = str(uuid.uuid4()) #唯一标识符
    19         self.channel.basic_publish(exchange='',
    20                                    routing_key='rpc_queue3',
    21                                    properties=pika.BasicProperties(
    22                                        reply_to=self.callback_queue,
    23                                        correlation_id=self.corr_id,
    24                                    ),
    25                                    body=str(n))
    26         print("start waiting for cmd result ")
    27         #self.channel.start_consuming()
    28         count = 0
    29         while self.response is None: #如果命令没返回结果
    30             print("loop ",count)
    31             count +=1
    32             self.connection.process_data_events() #以不阻塞的形式去检测有没有新事件
    33             #如果没事件,那就什么也不做, 如果有事件,就触发on_response事件
    34         return self.response
    35 ssh_rpc = SSHRpcClient()
    36 print(" [x] sending cmd")
    37 response = ssh_rpc.call("ipconfig")
    38 
    39 
    40 print(" [.] Got result ")
    41 print(response.decode("gbk"))
    View Code

    RabbitMQ_RPC_recv:

     1 import pika
     2 import time
     3 import subprocess
     4 #credentials = pika.PlainCredentials('alex', 'alex3714')
     5 connection = pika.BlockingConnection(pika.ConnectionParameters(
     6     host='127.0.0.1'))
     7 channel = connection.channel()
     8 channel.queue_declare(queue='rpc_queue3')
     9 def SSHRPCServer(cmd):
    10     # if n == 0:
    11     #     return 0
    12     # elif n == 1:
    13     #     return 1
    14     # else:
    15     #     return fib(n - 1) + fib(n - 2)
    16     print("recv cmd:",cmd)
    17     cmd_obj = subprocess.Popen(cmd.decode(),shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    18     result = cmd_obj.stdout.read() or cmd_obj.stderr.read()
    19     return result
    20 def on_request(ch, method, props, body):
    21     #n = int(body)
    22     print(" [.] fib(%s)" % body)
    23     response = SSHRPCServer(body)
    24     ch.basic_publish(exchange='',
    25                      routing_key=props.reply_to,
    26                      properties=pika.BasicProperties(correlation_id= 
    27                                                          props.correlation_id),
    28                      body=response)
    29 channel.basic_consume(on_request, queue='rpc_queue3')
    30 print(" [x] Awaiting RPC requests")
    31 channel.start_consuming()
    View Code

     

  • 相关阅读:
    kafka 项目实战
    7.DHCP的相关命令
    3.centos 7执行service iptables save报错问题
    39.NFS(网络文件系统)
    37.Samba 文件共享服务1--配置共享资源
    36.Samba 文件共享服务1--安装及配置参数解释
    35.简单文件传输协议
    34.vsftpd服务程序--虚拟用户模式
    33.vsftpd服务程序--本地用户模式
    32.vsftpd服务程序--匿名开放模式
  • 原文地址:https://www.cnblogs.com/liumj0305/p/6197611.html
Copyright © 2011-2022 走看看