zoukankan      html  css  js  c++  java
  • Python开发【第十篇】:RabbitMQ队列

    简介

    RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。

    安装

    首先安装erlang环境。

    官网:http://www.erlang.org/

    Windows版下载地址:http://erlang.org/download/otp_win64_20.0.exe

    Linux版:yum安装

    Windows安装步骤

    第一步运行

    第二步

    第三步

    第四步

    第五步

    Erlang安装完成。

    然后安装RabbitMQ,首先下载RabbitMQ的Windows版本。

    官网:http://www.rabbitmq.com/

    Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

    打开安装程序,按照下面步骤安装。

    RabbitMQ安装完成。

    开始菜单中进入管理工具。

    运行命令

    1. rabbitmq-plugins enable rabbitmq_management

    查看RabbitMQ服务是否启动。

    至此全部安装完成。

    Linux安装步骤

    安装erlang。

    1. yum -y install erlang

    安装RabbitMQ。

    1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq_v3_6_10.tar.gz
    2. rpm -ivh rabbitmq-server-3.6.10-1.el6.noarch.rpm

    RabbitMQ安装失败,报错如下。

    1. warning: rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY
    2. error: Failed dependencies:
    3.         erlang >= R16B-03 is needed by rabbitmq-server-3.6.10-1.el6.noarch
    4.         socat is needed by rabbitmq-server-3.6.10-1.el6.noarch

    原因是yum安装的erlang版本太低,这里提供的RabbitMQ是最新版3.6.10,所需的erlang版本最低为R16B-03,否则编译时将失败,也就是上述错误。

    重新安装erlang。

    1. wget http://erlang.org/download/otp_src_20.0.tar.gz
    2. tar xvzf otp_src_20.0.tar.gz
    3. cd otp_src_20.0
    4. ./configure
    5. make && make install

    重新安装erlang完毕。

    运行erlang。

    1. erl
    2. Erlang/OTP 20 [erts-9.0] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe] [kernel-poll:false]
    3.  
    4. Eshell V9.0 (abort with ^G)

    安装socat。

    1. yum install -y socat

    再次安装RabbitMQ。

    1. rpm -ivh rabbitmq-server-3.6.10-1.el6.noarch.rpm
    2. warning: rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY
    3. error: Failed dependencies:
    4.         erlang >= R16B-03 is needed by rabbitmq-server-3.6.10-1.el6.noarch

    上述错误信息显示安装失败,因为rabbitMQ的依赖关系所导致,所以要忽略依赖,执行以下命令。

    1. rpm -ivh --nodeps rabbitmq-server-3.6.10-1.el6.noarch.rpm

    安装成功。

    启动、停止RabbitMQ。

    1. rabbitmq-server start     #启动
    2. rabbitmq-server stop     #停止
    3. rabbitmq-server restart    #重启

     

    RabbitMQ使用

    实现最简单的队列通信

    send端(producer)

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    8. channel = connection.channel()
    9.  
    10. # 声明queue
    11. channel.queue_declare(queue='hello')
    12.  
    13. channel.basic_publish(exchange='',
    14.                       routing_key='hello',
    15.                       body='hello word')
    16. print("[x] Sent 'hello word!'")
    17. connection.close()

    receive端(consumer)

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,time
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.queue_declare(queue='hello')
    11.  
    12. def callback(ch,method,properties,body):
    13.     print('-->',ch,method,properties)
    14.     print("[x] Received %s" % body)
    15.  
    16. channel.basic_consume(callback,
    17.                       queue='hello',
    18.                       no_ack=True
    19.                       )
    20.  
    21. print('[*] waiting for messages.To exit press CTRL+C')
    22. channel.start_consuming()

    no_ack分析

    no_ack属性是在调用Basic.Consume方法时可以设置的一个重要参数。no_ack的用途是确保message被consumer成功处理了。这里成功的意识是,在设置了no_ack=false的情况下,只要consumer手动应答了Basic.Ack,就算其成功处理了。

    no_ack=true(此时为自动应答)

    在这种情况下,consumer会在接收到Basic.Deliver+Content-Header+Content-Body之后,立即回复Ack,而这个Ack是TCP协议中的Ack。此Ack的回复不关心consumer是否对接收到的数据进行了处理,当然也不关心处理数据所需要的耗时。

    no_ack=False(此时为手动应答)

    在这种情况下,要求consumer在处理完接收到的Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而这个Ack是AMQP协议中的Basic.Ack。此Ack的回复与业务处理相关,所以具体的回复时间应该要取决于业务处理的耗时。

    总结

    Basic.Ack发给RabbitMQ以告知,可以将相应message从RabbitMQ的消息从缓存中移除。

    Basic.Ack未被consumer发给RabbitMQ前出现了异常,RabbitMQ发现与该consumer对应的连接被断开,将该该message以轮询方式发送给其他consumer(需要存在多个consumer订阅同一个queue)。

    在no_ack=true的情况下,RabbitMQ认为message一旦被deliver出去后就已被确认了,所以会立即将缓存中的message删除,因此在consumer异常时会导致消息丢失。

    来自consumer的Basic.Ack与发送给Producer的Basic.Ack没有直接关系。

    消息持久化

    acknowledgment消息持久化

    no-ack=False,如果consumer挂掉了,那么RabbitMQ会重新将该任务添加到队列中。

    回调函数中

    1. ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_consume中

    1. no_ack=False

    receive端(consumer)

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,time
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.queue_declare(queue='hello')
    11.  
    12. # 定义回调函数
    13. def callback(ch,method,properties,body):
    14.     print('-->',ch,method,properties)
    15.     print("[x] Received %s" % body)
    16.     ch.basic_ack(delivery_tag=method.delivery_tag)
    17.  
    18. # no_ack=False表示消费完以后不主动把状态通知RabbitMQ
    19. channel.basic_consume(callback,
    20.                       queue='hello',
    21.                       no_ack=False
    22.                       )
    23.  
    24. print('[*] waiting for messages.To exit press CTRL+C')
    25. channel.start_consuming()

    durable消息持久化

    producer发送消息时挂掉了,consumer接收消息时挂掉了,以下方法会让RabbitMQ重新将该消息添加到队列中。

    回调函数中

    1. ch.basic_ack(delivery_tag=method.delivery_tag)

    basic_consume中

    1. no_ack=False

    basic_publish中添加参数

    1. properties=pika.BasicProperties(delivery_mode=2)

    channel.queue_declare中添加参数

    1. channel.queue_declare(queue='hello',durable=True)

    send端(producer)

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    8. channel = connection.channel()
    9.  
    10. # 声明queue
    11. channel.queue_declare(queue='hello',durable=True)
    12.  
    13. channel.basic_publish(exchange='',
    14.                       routing_key='hello',
    15.                       body='hello word',
    16.                       properties=pika.BasicProperties(delivery_mode=2))
    17. print("[x] Sent 'hello word!'")
    18. connection.close()

    receive端(consumer)与acknowledgment消息持久化中receive端(consumer)相同。

    消息分发

    默认消息队列里的数据是按照顺序分发到各个消费者,但是大部分情况下,消息队列后端的消费者服务器的处理能力是不相同的,这就会出现有的服务器闲置时间较长,资源浪费的情况。那么,我们就需要改变默认的消息队列获取顺序。可以在各个消费者端配置prefetch_count=1,意思就是告诉RabbitMQ在这个消费者当前消息还没有处理完的时候就不要再发新消息了。

    消费者端

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4. __author__ = 'Golden'
    5. #!/usr/bin/env python3
    6. # -*- coding:utf-8 -*-
    7.  
    8. import pika,time
    9.  
    10. connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    11. channel = connection.channel()
    12.  
    13. channel.queue_declare(queue='hello2',durable=True)
    14.  
    15. def callback(ch,method,properties,body):
    16.     print('-->',ch,method,properties)
    17.     print("[x] Received %s" % body)
    18.     time.sleep(30)
    19.     ch.basic_ack(delivery_tag=method.delivery_tag)
    20.  
    21. channel.basic_qos(prefetch_count=1)
    22. channel.basic_consume(callback,
    23.                       queue='hello2',
    24.                       no_ack=False
    25.                       )
    26.  
    27. print('[*] waiting for messages.To exit press CTRL+C')
    28. channel.start_consuming()

    生产者端不变。

    消息发布和订阅(publishsubscribe)

    发布和订阅与简单的消息队列区别在于,发布和订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。类似广播的效果,这时候就要用到exchange。Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。

    fanout:所有bind到此exchange的queue都可以接收消息。

    direct:通过routingKey和exchange决定的哪个唯一的queue可以接收消息。

    topic:所有符合routingKey(可以是一个表达式)的routingKey所bind的queue可以接收消息。

    表达式符号说明

    #:一个或多个字符

    *:任何字符

    例如:#.a会匹配a.a,aa.a,aaa.a等。

    *.a会匹配a.a,b.a,c.a等。

    注意:使用RoutingKey为#,Exchange Type为topic的时候相对于使用fanout。

    heaers:通过headers来决定把消息发给哪些queue。

    publisher

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,sys
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.exchange_declare(exchange='logs',type='fanout')
    11.  
    12. message = ''.join(sys.argv[1:]) or 'info:Hello World!'
    13. channel.basic_publish(exchange='logs',
    14.                       routing_key='',
    15.                       body=message)
    16.  
    17. print('[x] Send %r' % message)
    18. connection.close()

    subscriber

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9. channel.exchange_declare(exchange='logs',type='fanout')
    10. # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    11. result = channel.queue_declare(exclusive=True)
    12. queue_name = result.method.queue
    13. channel.queue_bind(exchange='logs',queue=queue_name)
    14. print('[*]Waiting for logs.To exit press CTRL+C')
    15. def callback(ch,method,properties,body):
    16.     print('[*] %s'%body)
    17.  
    18. channel.basic_consume(callback,
    19.                       queue=queue_name,
    20.                       no_ack=True)
    21.  
    22. channel.start_consuming()

    关键字发送(echange type=direct)

    发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至哪个队列。

    publisher

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,sys
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.exchange_declare(exchange='direct_logs',
    11.                          type='direct')
    12.  
    13. # severity = 'error'
    14. severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    15. # message = 'Hello World!'
    16. message = ''.join(sys.argv[2:]) or 'Hello World!'
    17.  
    18. channel.basic_publish(exchange='direct_logs',
    19.                       routing_key=severity,
    20.                       body=message)
    21. print('[x] Send %r:%r' % (severity,message))
    22. connection.close()

    subscriber

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,sys
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.exchange_declare(exchange='direct_logs',
    11.                          type='direct')
    12.  
    13. result = channel.queue_declare(exclusive=True)
    14. queue_name = result.method.queue
    15.  
    16. severities = sys.argv[1:]
    17. if not severities:
    18.     sys.stderr.write('Usage: %s [info] [warning] [error] ' % sys.argv[0])
    19.     sys.exit(1)
    20.  
    21. for severity in severities:
    22.     channel.queue_bind(exchange='direct_logs',
    23.                        queue=queue_name,
    24.                        routing_key=severity)
    25.  
    26. print('[*] Waiting for logs.To exit press CTRL+C')
    27.  
    28. def callback(ch,method,properties,body):
    29.     print('[*] %r:%r' % (method.routing_key,body))
    30.  
    31. channel.basic_consume(callback,
    32.                       queue=queue_name,
    33.                       no_ack=True)
    34.  
    35. channel.start_consuming()

    启动subscriber1

    1. python3 direct_subscriber.py warning

    启动subscriber2

    1. python3 direct_subscriber.py error

    启动publisher1

    1. python3 direct_publisher.py info

    启动publisher2

    1. python3 direct_publisher.py warning

    启动publisher3

    1. python3 direct_publisher.py error

    结果

    模糊匹配(exchange type=topic)

    在topic类型下,可以让队列绑定几个模糊的关键字,发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功则将数据发送到指定队列。

    *:匹配任意一个字符

    #:匹配任意个字符

    publisher

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,sys
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.exchange_declare(exchange='topic_logs',
    11.                          type='topic')
    12.  
    13. routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    14. message = ''.join(sys.argv[2:]) or 'Hello World!'
    15. channel.basic_publish(exchange='topic_logs',
    16.                       routing_key=routing_key,
    17.                       body=message)
    18.  
    19. print('[x] Sent %r:%r' % (routing_key,message))
    20. connection.close()

    subscriber

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,sys
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.exchange_declare(exchange='topic_logs',
    11.                          type='topic')
    12.  
    13. result = channel.queue_declare(exclusive=True)
    14. queue_name = result.method.queue
    15.  
    16. binding_keys = sys.argv[1:]
    17. if not binding_keys:
    18.     sys.stderr.write('Usage: %s [binding_key]... ' % sys.argv[0])
    19.     sys.exit(1)
    20.  
    21. for binding_key in binding_keys:
    22.     channel.queue_bind(exchange='topic_logs',
    23.                        queue=queue_name,
    24.                        routing_key=binding_key)
    25.  
    26. print('[*] Waiting for logs.To exit press CTRL+C')
    27.  
    28. def callback(ch,method,properties,body):
    29.     print('[x] %r:%r' % (method.routing_key,body))
    30.  
    31. channel.basic_consume(callback,
    32.                       queue=queue_name,
    33.                       no_ack=True)
    34.  
    35. channel.start_consuming()

    测试

    远程过程调用(RPC)

    RPC(Remote Procedure Call Protocol)远程过程调用协议。在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的服务器。但是在做开发的时候往往要用到其他团队的方法,因为已经有了实现。但是这些服务部署在不同的服务器,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会很低效。PRC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及WebApi。

    在RabbitMQ中RPC的实现是很简单高效的,现在客户端、服务端都是消息发布者与消息接受者。

    首先客户端通过RPC向服务端发生请求。correlation_id:请求标识,erply_to:结果返回队列。(我这里有一些数据需要你给我处理一下,correlation_id是我请求标识,你处理完成之后把结果返回到erply_to队列)

    服务端拿到请求,开始处理并返回。correlation_id:客户端请求标识。(correlation_id这是你的请求标识,还给你。这时候客户端用自己的correlation_id与服务端返回的correlation_id进行对比,相同则接收。)

    rpc_server

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,time
    6.  
    7. connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    8. channel = connection.channel()
    9.  
    10. channel.queue_declare(queue='rpc_queue')
    11. def fib(n):
    12.     if n == 0:
    13.         return 0
    14.     elif n == 1:
    15.         return 1
    16.     else:
    17.         return fib(n-1) + fib(n-2)
    18.  
    19. def on_request(ch,method,props,body):
    20.     n = int(body)
    21.     print('[.] fib(%s)' % n)
    22.     response = fib(n)
    23.     ch.basic_publish(exchange='',
    24.                      routing_key=props.reply_to,
    25.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
    26.                      body = str(response))
    27.     ch.basic_ack(delivery_tag=method.delivery_tag)
    28.  
    29. channel.basic_qos(prefetch_count=1)
    30. channel.basic_consume(on_request,queue='rpc_queue')
    31.  
    32. print('[x] Awaiting RPC requests')
    33. channel.start_consuming()

    rpc_client

    1. __author__ = 'Golden'
    2. #!/usr/bin/env python3
    3. # -*- coding:utf-8 -*-
    4.  
    5. import pika,uuid
    6.  
    7. class FibonacciRpcClient(object):
    8.     def __init__(self):
    9.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    10.         self.channel = self.connection.channel()
    11.         result = self.channel.queue_declare(exclusive=True)
    12.         self.callback_queue = result.method.queue
    13.         self.channel.basic_consume(self.on_response,no_ack=True,
    14.                                    queue=self.callback_queue)
    15.  
    16.     def on_response(self,ch,method,props,body):
    17.         if self.corr_id == props.correlation_id:
    18.             self.response = body
    19.  
    20.     def call(self,n):
    21.         self.response = None
    22.         self.corr_id = str(uuid.uuid4())
    23.         self.channel.basic_publish(exchange='',
    24.                                    routing_key='rpc_queue',
    25.                                    properties=pika.BasicProperties(
    26.                                        reply_to=self.callback_queue,
    27.                                        correlation_id=self.corr_id,),
    28.                                    body=str(n))
    29.         while self.response is None:
    30.             self.connection.process_data_events()
    31.         return int(self.response)
    32.  
    33. fibonacci_rpc = FibonacciRpcClient()
    34.  
    35. print('[x] Requesting fib(10)')
    36. response = fibonacci_rpc.call(10)
    37. print('[.] Got %r ' % response)

     

  • 相关阅读:
    线程+IO流
    jiava trim()
    statement 的延伸 ----》PreparedStatement
    java中math的用法
    java中获取所有文件--(递归调用)
    编写一个JAVA类,用于计算两个日期之间的周数。
    java中数组排序.知识点
    javascript 常用功能总结
    jquery
    创建 HTML内容
  • 原文地址:https://www.cnblogs.com/yinshoucheng-golden/p/7384686.html
Copyright © 2011-2022 走看看