zoukankan      html  css  js  c++  java
  • RabbitMQ使用详解

    1.RabbitMQ是什么?
    消息队列,应用程序写和读消息队列来实现通信。

    2.使用示例

    1)生产者send.py发送消息到队列

    import pika
    # RabbitMQ用户名,密码
    user_name = 'guest'
    password = 'guest'
    
    credential = pika.PlainCredentials(user_name, password)
    # 建立连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credential))
    # 消息读写管道
    channel = connection.channel()
    # 创建队列
    channel.queue_declare(queue='queue')
    # 发送消息到队列,exchange:路由器(根据类型,路由到相应的队列),routing_key:队列名称,body:消息体
    channel.basic_publish(
        exchange='',
        routing_key='queue',
        body='hello world'
    )
    print('send....')
    # 关闭连接
    connection.close()
    

    2)消费者receive.py从队列获取消息

    import pika
    
    user_name = 'guest'
    password = 'guest'
    
    credential = pika.PlainCredentials(user_name, password)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', credentials=credential))
    channel = connection.channel()
    # 队列不存在则创建,只有一个队列会创建
    channel.queue_declare(queue='queue')
    # 回调处理消息
    def callback(ch, method, properties, body):
        print('receive.py...%r'%body)
    # 从队列接收消息,callback:消息接收后回调处理函数
    channel.basic_consume(
        callback,
        queue='queue',
        no_ack=True
    )
    
    print('waiting...')
    # 开始监听消息队列
    channel.start_consuming()
    

    运行结果:

    $ python receive.py 
    receive.py: Received message 'hello world'
     [*] Waiting for messages. To exit press CTRL+C
    

    3.RabbitMQ基本概念
    1)收发消息过程

    Broker:消息队列服务器实体
    消息:就是一个简单的字符串,每个消息都有一个路由键(routing key)属性
    connection:应用程序和broker的网络连接
    channel:进行消息读写的管道
    exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
    绑定:交换机(路由表)和队列绑定起来
    队列:消息的容器,一个消息可投入一个或多个队列,消费者从队列取走消息
    2)exchange
    接收消息,根据路由键转发消息到绑定的队列
    exchange有四种类型:direct,topic,headers,fanout
    每种规则匹配队列时,CPU的开销是不同的,可以根据不同需求,选择不同类型的交换机。
    Direct交换机:完全匹配,单播
    routing key和对列名完全匹配

    Topic交换机:正则匹配,组播
    根据binding-key匹配符合的routing-key,
    匹配规则:有两种通配符"#"和"",#表示0或多个单词,表示一个单词
    如:binging-key:*.sock.#匹配routing-key: usd.stock和eur.stock.db,但是不匹配stock.ma

    Fanout交换机:消息转发所有绑定对列,最快,广播
    fanout不处理路由键,简单将对列绑定到交换机,消息将转发到所有绑定的队列
    3)注意事项
    没有队列绑定到交换机,则发送到该交换机的消息会丢失
    一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定
    不能更改交换机类型

    4.RabbmitMQ属性
    1)持久性
    如果启用,队列再Server重启前都有效
    2)自动删除
    如果启用,那么队列将在所有的消费者停止使用之后自动删除自身
    3)惰性
    如果没有声明队列,那么在执行到使用的时候回导致异常,并不会主动声明
    4)排他性
    如果启用,队列只能被声明它的消费者使用

    5.RabbitMQ持久化
    1)客户端丢失
    RabbitMQ分发完消息后,就会从内存中把消息删除掉。如果客户端连接断开了,那么客户端正在处理的消息和等待处理的消息,都将丢失。
    因此,RabbitMQ引入了消息确认机制

    no_ack=False
    

    将receive.py中channel.basic_consume消息接收事件中,no_ack属性设为False,客户端消息处理完毕后会发送确认给RabbitMQ服务器。
    需要注意的是,一定要确保在任何情况下,都会发送确认给RabbitMQ,否则将引起内存泄漏。
    2)RabbitMQ服务端丢失
    RabbitMQ重启后,消息将丢失。不过,RabbitMQ提供了持久化机制,可以将消息持久化到磁盘。

    channel.queue_declare(queue='queue', durable=True)
    

    6.常用命令
    1)启动

    rabbitmq-server &
    

    2)队列重置

    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmq stop
    

    3)关闭

    rabbitmqctl stop
    

    4)列举所有用户

    rabbitmqctl list_users
    

    5)列举所有队列

    rabbitmqctl list_queues
    

    6)添加用户

    rabbitmqctl add_user user_name user_passwd
    

    7)设置用户角色为管理员

    rabbitmqctl set_user_tags user administrator
    

    8)权限设置

    rabbitmqctl set_permissions -p / user ".*" ".*" ".*"
    

    9)查看状态

    rabbitmqctl status
    

    10)安装web管理插件

    rabbitmq-plugins enable rabbitmq_management
    

    可以通过http://localhost:15672查看服务器状态

  • 相关阅读:
    java内存回收机制
    scala学习
    [java实现]找一个数组的最大和的连续子数组(时间复杂度 O(n))
    linux 进程的创建
    linux中的进程和线程
    linux 文件系统
    gdb 调试程序
    makefile
    linux下的gcc编译器
    socket 网络编程
  • 原文地址:https://www.cnblogs.com/shijingjing07/p/7623258.html
Copyright © 2011-2022 走看看