zoukankan      html  css  js  c++  java
  • python学习之rabbitmq

    0、讲述rabbit中各部分的含义及作用

    https://www.jb51.net/article/75647.htm

    1、rabbitMQ的安装

    1)在安装rabbitmq之前需要先安装erlang,下载地址如下:

    http://www.erlang.org/downloads根据系统选择,安装按提示一直下一步就OK,安装完后,再安装rabbitmq

    2、rabbitmq的下载地址:http://www.rabbitmq.com/download.html

    3、rabbitmq队列

    假设现在需要从武汉到北京去见一个网友,哈哈哈,先的打电话约下,然后确定路线和交通工具吧,这就是下边这段代码的实际模型

    import pika
    import random
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  #先打电话约下,看能否找到人
    channel = connection.channel()                                               #确定路线
    channel.queue_declare(queue='task_queue', durable=True)                      #确定交通工具,而且交通工具的名称叫‘task_queue',durable = True表示就是你到了北京以后交通工具依然存在
    
    number = random.randint(1, 1000)
    message = 'hello world:{num}'.format(num = number)
    
    channel.basic_publish(exchange='',      #交换机,此时没有交换机参与,所以参数为空,
                           routing_key='task_queue',    #交通工具的名称
                           body=message,         #要发送给的内容
                           properties=pika.BasicProperties(
                               delivery_mode=2,)   #表示不管路通不通,你携带的消息都不会因为外界情况而消失
                          )
    print(" [x] Sent %r" % (message,))
    connection.close()
    import pika
    import time
    
    hostname = 'localhost'
    parameters = pika.ConnectionParameters(hostname)
    connection = pika.BlockingConnection(parameters)
    
    channel = connection.channel()
    channel.queue_declare(queue='task_queue', durable=True)
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))
        # time.sleep(5)
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)   #回调函数中需要给发布者发送的消息
    
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(callback, queue='task_queue', no_ack=False)  #no_ack=False当消费者接到消息后,需要调用回掉函数告诉发布者,消息的接受情况
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    下边的代码是通过交换机来实现消息的发送的,具体如下:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',      #交换机的名称为logs
                             exchange_type='fanout')
    
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',     #声明交换机的名称以及交换机发送消息的模式,这的名称要和publishor的交换机的名称相同,相当于publishor和consumor同时向exchange寻找,
    #否则publishor和consumor相互找不到,会迷路!! exchange_type
    ='fanout') #这不同的版本有可能会出错,有的说的是可以写为type = 'fanout',在我的电脑上运行会出错,
    #改为exchange_type = 'fanout'仍然会出错,后来运行cmd->service.msc->找到rabbitmq关闭后再启动就OK了,
    #至于为啥,如果你找到了,跟我说一声,先谢谢啦 result
    = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除 queue_name = result.method.queue #得到队列的名字 channel.queue_bind(exchange='logs', queue=queue_name) #将交换机和队列的名字绑定,也就是说exchange = ‘logs'的交换机只能公国queue = queue_name来发送消息 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

    4、rabbitmq有选择的接受消息,模型如下

    更过相关内容详见:http://www.cnblogs.com/alex3714/articles/5248247.html

     5、client发送给指令,server根据指令运行完毕后再将结果返回给client

    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response,   #类中定义的方法__init__()是为了建立链接,同时定义一个callback_queue,再basic_publish中传递给server端,用于存放运行的结果
                                       no_ack=True,
                                       queue=self.callback_queue)     #这里的basic_consume只是声明,如果要取消息应该取callback__queue中取
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:    #确定发的指令和收到的结果是相互对应的
                self.response = body
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events() #去队列中取数据,不停的循环,非阻塞版的start_consuming(),去调用basic_consume()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    def on_request(ch, method, props, body):  #method存放的是bsic_consume中读取数据routing_key,props中存放的是从basic_consume中读取的protperties的数据,body存放的是从发送过来的消息。
        n = int(body)
        print(" [.] fib(%s)" % n)
        response = fib(n)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id= props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

  • 相关阅读:
    Vuex基本用法
    vue ui 创建项目报错: Failed to get response from https://registry.yarnpkg.com/vue-cli-version-marker
    vue+elementui {{ }} 的使用
    vue+elementui项目去除html标签
    vue+elementui 将后端传来数据编程tree结构、同一级别数据对象
    为某个table项添加slot
    vue+elementui项目,table项内容超过确定行以“...”显示剩余部分的实现
    前端对后台数据的处理
    vue+elementUI项目获取数组最后一位,防止深拷贝问题
    thinkPHP 控制器定义
  • 原文地址:https://www.cnblogs.com/zhouzhe-blog/p/9445662.html
Copyright © 2011-2022 走看看