zoukankan      html  css  js  c++  java
  • rabbitmq消息队列

    rabbitmaq的安装使用

    • 通过阿里云yum源, 在epel园中有这个rabbitmq
      • yum install rabbitmq-server erlang -y
    • 启动rabbitmq-server
      • systemclt start rabbitmq-server
    • 开启后台管理界面
      • rabbitmq-plugins enable rabbitmq_management
    • 创建rabbitmq的账号密码
      • rabbitmqctl add_user zhangjian 123
    • 设置用户为管理员
      • sudorrabbitmqctl set_user_tags zhangjian  administrator
    • 设置用户有权限访问所有队列
      • 语法:rabbitmqctl set_permissions -p "/" zhangjian ".*" ".*" ".*"
    • 重启rabbitmq服务端, 让用户生效
      • systemctl restart rabbitmq-server
    • 访问web管理界面, 登录, 查看队列消息
      • http://localhost:15672//#queues
    • 用python操作rabbitmq, 实现生产消费这模型
      • 安装皮卡模块,m模块版本需要指定,y引文代码参数发生了变化
        • pip3 install -i https://pypi.douban.com/simple pika=0.13.1

     python操作rabbitmq队列python代码:

    1.生产者的代码
            #!/usr/bin/env python3
            import pika
            # 创建凭证,使用rabbitmq用户密码登录
            # 去邮局取邮件,必须得验证身份
            credentials = pika.PlainCredentials("selfju","cxk")
            # 新建连接,这里localhost可以更换为服务器ip
            # 找到这个邮局,等于连接上服务器
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
            # 创建频道
            # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个连接
            channel = connection.channel()
            # 声明一个队列,用于接收消息,队列名字叫“水许传”
            channel.queue_declare(queue='水许传')
            # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''),它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
            channel.basic_publish(exchange='',
                              routing_key='水许传',
                              body='武大郎出摊卖烧饼了')
            print("已经发送了消息")
            # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
            connection.close()
    
    
        2.消费者的代码
            import pika
            # 建立与rabbitmq的连接
            credentials = pika.PlainCredentials("selfju","cxk")
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
            channel = connection.channel()
            channel.queue_declare(queue="水许传")
    
            def callbak(ch,method,properties,body):
                print("消费者接收到了数据:%r"%body.decode("utf8"))
            # 有消息来临,立即执行callbak,没有消息则夯住,等待消息
            # 老百姓开始去邮箱取邮件啦,队列名字是水许传
            channel.basic_consume(callbak,queue="水许传",no_ack=True)
            # 开始消费,接收消息
            channel.start_consuming()
    
    
        
        3.消息确认机制的生产者代码
            #!/usr/bin/env python3
            import pika
            # 创建凭证,使用rabbitmq用户密码登录
            # 去邮局取邮件,必须得验证身份
            credentials = pika.PlainCredentials("selfju","cxk")
            # 新建连接,这里localhost可以更换为服务器ip
            # 找到这个邮局,等于连接上服务器
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
            # 创建频道
            # 建造一个大邮箱,隶属于这家邮局的邮箱,就是个连接
            channel = connection.channel()
            # 新建一个hello队列,用于接收消息
            # 这个邮箱可以收发各个班级的邮件,通过
    
            channel.queue_declare(queue='西游记')
            # 注意在rabbitmq中,消息想要发送给队列,必须经过交换(exchange),初学可以使用空字符串交换(exchange=''),它允许我们精确的指定发送给哪个队列(routing_key=''),参数body值发送的数据
            channel.basic_publish(exchange='',
                                  routing_key='西游记',
                                  body='大师兄,师傅被蔡许坤抓走了')
            print("已经发送了消息")
            # 程序退出前,确保刷新网络缓冲以及消息发送给rabbitmq,需要关闭本次连接
            connection.close()
    
        4.消息确认机制的消费者代码
            import pika
    
            credentials = pika.PlainCredentials("selfju","cxk")
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
            channel = connection.channel()
    
            # 声明一个队列(创建一个队列)
            channel.queue_declare(queue='西游记')
    
            def callback(ch, method, properties, body):
                print("消费者接受到了任务: %r" % body.decode("utf-8"))
                # int('asdfasdf')
                # 我告诉rabbitmq服务端,我已经取走了消息
                # 回复方式在这
                ch.basic_ack(delivery_tag=method.delivery_tag)
                
            # 关闭no_ack,代表给与服务端ack回复,确认给与回复
            channel.basic_consume(callback,queue='西游记',no_ack=False)
    
            channel.start_consuming()
    
        5.支持持久化的队列和消息
            1.生产者的代码
            
            import pika
            # 有密码
            credentials = pika.PlainCredentials("selfju","cxk")
            connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
            channel = connection.channel()
    
            # 声明一个队列(创建一个队列)
            # 默认此队列不支持持久化,如果服务挂掉,数据丢失
            # durable=True 开启持久化,必须新开启一个队列,原本的队列已经不支持持久化了
            '''
            实现rabbitmq持久化条件
             delivery_mode=2
            使用durable=True声明queue是持久化
             
            '''
            channel.queue_declare(queue='LOL',durable=True)
            channel.basic_publish(exchange='',
                                  routing_key='LOL', # 消息队列名称
                                  body='我用双手成就你的梦想',
                                  # 支持数据持久化
                                  properties=pika.BasicProperties(
                                      delivery_mode=2,#代表消息是持久的  2
                                  )
                                  )
            connection.close()
        
        6.持久化的消费者代码
        
    import pika
    credentials = pika.PlainCredentials("selfju","cxk")
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.16.142',credentials=credentials))
    channel = connection.channel()
    # 确保队列持久化
    channel.queue_declare(queue='LOL',durable=True)
    
    '''
    必须确保给与服务端消息回复,代表我已经消费了数据,否则数据一直持久化,不会消失
    '''
    def callback(ch, method, properties, body):
        print("消费者接受到了任务: %r" % body.decode("utf-8"))
        # 模拟代码报错
        # int('asdfasdf')    # 此处报错,没有给予回复,保证客户端挂掉,数据不丢失
       
        # 告诉服务端,我已经取走了数据,否则数据一直存在
        ch.basic_ack(delivery_tag=method.delivery_tag)
    # 关闭no_ack,代表给与回复确认
    channel.basic_consume(callback,queue='LOL',no_ack=False)
    channel.start_consuming()

            

  • 相关阅读:
    第六章 实验报告(函数与宏定义)
    第三次实验报告
    第五章 循环结构课后反思
    第二次实验报告
    第一次实验报告
    第一次作业
    第九章 结构体与共用体
    第八章 指针实验
    第7章 数组实验
    第六章 函数和宏定义实验(2)
  • 原文地址:https://www.cnblogs.com/zhangjian0092/p/11704392.html
Copyright © 2011-2022 走看看