zoukankan      html  css  js  c++  java
  • rabbitMQ基础应用

    1、安装erlang

    [root@localhost ~]#yum -y install erlang

    2、安装rabbitMQ

    [root@localhost ~]#yum -y install rabbitmq-server

    3、添加用户

    [root@localhost ~]# rabbitmqctl add_user rabbit_user 123.com   // 添加admin的用户密码为123.com

    4、将角色添加到管理员组

    [root@localhost ~]# rabbitmqctl set_user_tags rabbit_user administrator

    5、设置用户权限

    [root@localhost ~]# rabbitmqctl set_permissions -p "/" rabbit_user  ".*" ".*" ".*"   //set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    
    

    6、启用web插件

    [root@localhost ~]# rabbitmq-plugins enable rabbitmq_management

    7、启动rabbitMQ服务

    [root@localhost ~]# systemctl start rabbitmq-server

    8、访问http://192.168.10.10:15672/,如果一切顺利的话你会看到如下界面

    9、输入用户名密码进入rabbitMQ后台,你会看到像下面这个样子。

    到此rabbitMQ已经可以正常运行了,下面我们使用Python来操作队列。


    1、安装pika模块

    [root@localhost ~]# pip3 install pika

    2、创建生产者模型

    [root@localhost rabbitMQ]# vim producer.py 
    #!/usr/bin/env python
    import pika
    # 创建凭证,使用rabbitmq用户密码登录
    credentials = pika.PlainCredentials("rabbit_user","123.com")
    # 新建连接,这里localhost可以更换为服务器ip
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
    # 创建通道
    channel = connection.channel()
    # 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue”
    channel.queue_declare(queue='test_queue')
    # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''),
    # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
    while True:
        content = input("生产者输入数据>>>")
        if content.upper() == "Q":
            break
        channel.basic_publish(exchange='',
                          routing_key='test_queue',
                          body=content)
        print("已经发送了消息")
    # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
    connection.close()

    3、运行生产者模型

    [root@localhost rabbitMQ]# python3 producer.py

    4、创建消费者模型

    [root@localhost rabbitMQ]# vim consumer.py 
    #!/usr/bin/env python
    import pika
    # 建立与rabbitmq的连接
    credentials = pika.PlainCredentials("rabbit_user","123.com")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
    # 创建通道
    channel = connection.channel()
    # 在通道中创建队列
    channel.queue_declare(queue="test_queue")
    
    def callbak(ch,method,properties,body):
        print("消费者接收了消息:%r"%body.decode("utf8"))
    # 有消息来时,立即执行callbak。
    channel.basic_consume("test_queue",callbak,auto_ack=True)
    # 等待接收消息
    channel.start_consuming()

    auto_ack=True:表示不确认机制也就是说每次消费者接收到数据后,不管是否处理完毕,rabbitmq-server都会把这个消息标记完成,从队列中删除,这样会有个缺陷,就是当我们从队列中获取到消息后,碰巧程序崩溃,或者什么其它原因导致程序终,此时消息已经从消息队列中删除,造成数据的不安全。

    5、运行消费者模型

    [root@localhost rabbitMQ]# python3 consumer.py 

    6、验证消息队列的轮询机制

    7、下面演示auto_ack=True的不安全机制,我们在消费者模型中主动抛出异常模拟程序非正常终止,然后查看消息队列。

    7.1修改消费者模型(主动抛出异常)如下:

    [root@localhost rabbitMQ]# vim consumer.py 
    #!/usr/bin/env python
    import pika
    # 建立与rabbitmq的连接
    credentials = pika.PlainCredentials("rabbit_user","123.com")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
    # 创建通道
    channel = connection.channel()
    # 在通道中创建队列
    channel.queue_declare(queue="test_queue")
    
    def callbak(ch,method,properties,body):
        # 主动抛出异常
        raise TypeError
        print("消费者接收了消息:%r"%body.decode("utf8"))
    # 有消息来时,立即执行callbak。
    channel.basic_consume("test_queue",callbak,auto_ack=True)
    # 等待接收消息
    channel.start_consuming()

    7.2运行生产者模型

    [root@localhost rabbitMQ]# python3 producer.py 

    7.3运行消费之模型,查看队列变化

    8、使用相对可靠的消息机制确认来保证数据安全

    修改消费者模型的配置文件如下;

    [root@localhost rabbitMQ]# vim consumer.py
    #!/usr/bin/env python
    import pika
    # 建立与rabbitmq的连接
    credentials = pika.PlainCredentials("rabbit_user","123.com")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials))
    # 创建通道
    channel = connection.channel()
    # 在通道中创建队列
    channel.queue_declare(queue="test_queue")
    
    def callbak(ch,method,properties,body):
        # 主动抛出异常
        raise TypeError
        print("消费者接收了消息:%r"%body.decode("utf8"))
        # 告诉消息队列,我已经确认收到消息了
        ch.basic_ack(delivery_tag=method.delivery_tag)
    # 有消息来时,立即执行callbak。
    channel.basic_consume("test_queue",callbak,auto_ack=False)
    # 等待接收消息
    channel.start_consuming()

    这样在消费者没有确认的情况下,消息队列中的消息是不会被删除的。如下;

    至于是选择auto_ack确认机制还是使用auto_ack不确认机制,还需要你根据实际情况来定。

    9、消息队列的持久化

    我们上面的配置如果rabbitMQ重启服务,或者系统重启都会导致队列里的消息被清除掉。下面我们修改生产者的配置文件使消息队列持久化,即使重启服务,消息队列里的消息也不会被清除。

    [root@localhost rabbitMQ]# vim producer.py 
    #!/usr/bin/env python import pika # 创建凭证,使用rabbitmq用户密码登录 credentials = pika.PlainCredentials("rabbit_user","123.com") # 新建连接,这里localhost可以更换为服务器ip connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.10.10',credentials=credentials)) # 创建通道 channel = connection.channel() # 在通道内声明一个队列,用于接收消息,队列名字叫“test_queue”,durable=True表示持久化队列 channel.queue_declare(queue='delivery_queue',durable=True) # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),我们暂且将(exchange=''), # 它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据 while True: content = input("生产者输入数据>>>") if content.upper() == "Q": break channel.basic_publish(exchange='', routing_key='delivery_queue', body=content, properties=pika.BasicProperties(delivery_mode=2)) # 2代表的是持久化队列 print("已经发送了消息") # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接 connection.close()

    常用命令

    // 新建用户
    rabbitmqctl add_user {用户名} {密码}
    ​
    // 设置权限
    rabbitmqctl set_user_tags {用户名} {权限}
    ​
    // 查看用户列表
    rabbitmqctl list_users
    ​
    // 为用户授权
    添加 Virtual Hosts :    
    rabbitmqctl add_vhost <vhost>// 删除用户
    rabbitmqctl delete_user Username
    ​
    // 修改用户的密码
    rabbitmqctl change_password Username Newpassword
        
    // 删除 Virtual Hosts :    
    rabbitmqctl delete_vhost <vhost>    
        
    // 添加 Users :    
    rabbitmqctl add_user <username> <password>    
    rabbitmqctl set_user_tags <username> <tag> ...    
    rabbitmqctl set_permissions [-p <vhost>] <user> <conf> <write> <read>    
        
    // 删除 Users :    
    delete_user <username>// 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
    rabbitmqctl  set_permissions -p vhost1 user1 '.*' '.*' '.*'// 查看权限
    rabbitmqctl list_user_permissions user1
    ​
    rabbitmqctl list_permissions -p vhost1
    ​
    // 清除权限
    rabbitmqctl clear_permissions [-p VHostPath] User
    ​
    rabbitmqctl reset           // 重启应用
    rabbitmqctl stop_app        // 关闭应用
    rabbitmqctl start_app       // 启动应用
    rabbitmqctl list_queues     // 查看队列
    ​rabbitmqctl list_exchanges  // 查看exchangelist
    rabbitmqctl list_queues     // 查看所有queue
    rabbitmqctl list_users      // 查看所有用户
    rabbitmqctl list_bindings   // 查看所有绑定exchange和queued 消息
    rabbitmqctl list_queues name messages_ready messages_unacknowledged  // 查看消息确认
    rabbitmqctl status   // 查看rabbitMQ的状态信息。
    
    
  • 相关阅读:
    02-30 线性可分支持向量机
    02-28 scikit-learn库之线朴素贝叶斯
    02-27 朴素贝叶斯
    02-26 决策树(鸢尾花分类)
    047 选项模式
    第二节:师傅延伸的一些方法(复习_总结)
    第一节:登录流程
    第一节:对应拼音编码查询(后续更新)
    前端对象
    Form表单
  • 原文地址:https://www.cnblogs.com/caesar-id/p/10864258.html
Copyright © 2011-2022 走看看