zoukankan      html  css  js  c++  java
  • rabbitmq

    1. rabbitmq安装

      • 使用docker搜索、拉取镜像、运行为容器

        docker search rabbitmq
        docker pull rabbitmq   若不指定版本,默认拉取最新的版本
        docker run -d --name rabbit -p 5672:5672 -p 15672:15672 --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 458123c67b79 最后为rabbit的镜像ID
        
    2. RabbitMQ概述

      • 有一些业务逻辑是不能立刻完成,会阻塞程序,类似于发送短信、邮件,这些都需要服务器给第三方发起请求,不能立刻得到结果。这就会造成用户操作过程停滞,所以解决办法就是将这些耗时操作,放到队列中去异步执行。
      • 有三个重要的组成部分,生产者、队列、消费者。生产者就是web后端程序,队列使用rabbitmq,消费者就是向第三方发起请求。
    3. RabbitMQ整体架构

      • rabbitmq-server内部包括两部分:
        • 交换机 将生产者传递过来的消息,根据自身的要求,转发给队列
        • 队列(先进先出) 队列与消费者事先有协议,消费者会从固定的队列中取得要执行的任务。
    4. 生产者的实现

      • 整个过程需要5步
      import pika
      
      # 1. 获得与rabbitmq代理连接对象
      connection_host = '192.168.1.38'
      connection_credentials = pika.PlainCredentials('root', '123')
      connection = pika.BlockingConnection(
          pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
      
      # 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
      channel = connection.channel()
      
      # 3. 创建名字my_queue的消息队列,如果存在不创建
      channel.queue_declare('my_queue')
      
      # 4. 发送消息到rabbitmq中名字为my_queue的队列中
      channel.basic_publish(exchange='', routing_key='my_queue', body='hello word')
      
      # 5. 关闭和rabbitmq代理的连接
      connection.close()
      
      print('消息发送完毕!')
      
    5. 消费者的实现

      • 最后一步开启监听后,队列中一旦加入任务,就会被消费者取走,并执行处理函数。
      import pika
      
      # 1. 获取与rabbitmq的连接对象
      connection_host = '192.168.1.38'
      connection_credentials = pika.PlainCredentials('root', '123')
      connection = pika.BlockingConnection(pika.ConnectionParameters(
          host=connection_host, credentials=connection_credentials))
      
      # 2. 通过连接对象获得channel对象,用于操作rabbitmq
      channel = connection.channel()
      
      # 3. 创建名字为my_queue的消息队列,如果不存在就创建
      channel.queue_declare(queue='my_queue')
      
      
      # 4. 按照rabbitmq要求定义消息处理函数
      def callback(ch, method, properties, body):
          print('接收到的消息是:', body)
          # 此刻接收到的消息是二进制格式
      
      # 5. 关联队列,并设置队列中的消息处理函数
      # channel.basic_consume(callback, queue='my_queue', no_ack=True)  以前版本的写法,后改为下面方式
      channel.basic_consume('my_queue', callback, False)
      
      # 6. 启动并开始处理消息,该程序会一直运行,进行监听
      channel.start_consuming()
      
      
    6. 任务队列

      task.py文件中内容
      
      import time
      
      def send_email():
          print('开始发送邮件')
          time.sleep(3)
          print('邮件发送完毕')
      
      
      def send_message():
          print('开始发送短信')
          time.sleep(3)
          print('短信发送完毕')
          
          
      consumer.py文件中内容
      task_list = {
          'email': task.send_email,
          'message': task.send_message
      }
      # 任务后不可以加(),否则会立即执行
      # 4. 按照rabbitmq要求定义消息处理函数
      def callback(ch, method, properties, body):
          task_name = body.decode()
          # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
          if task_name not in task_list:
              print('error:{}任务没有注册'.format(task_name))
              return
          task_list[task_name]()
      
      producter.py文件中的内容
      # 4. 发送消息到rabbitmq中名字为my_queue的队列中
      channel.basic_publish(exchange='', routing_key='work_queue', body='message')
      只需要改变body中的内容,就可以生产出不同的任务到队列中去
      
    7. 消息确认机制

      1. 原因:会出现的意外情况:消费者取到任务以后,并未执行完成任务,就死了。

      2. rabbitmq默认会在将消息发送给消费者以后,会将任务从队列中删掉。

      3. 使用消息确认机制,若消费者意外死亡,则不能给队列反馈,队列就不会删除被该消费者取走的任务。

      4. 实现方法:

        def callback(ch, method, properties, body):
            task_name = body.decode()
            # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
            if task_name not in task_list:
                print('error:{}任务没有注册'.format(task_name))
                return
            task_list[task_name]()
            # 消息处理完成后,确认消息
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
           
        channel.basic_consume('work_queue', callback, True)
        
    8. 循环调度机制

      1. 默认是循环调度机制,就是消费之轮流去队列中取任务
    9. 公平调度机制---在consumer文件中设置公平调度

      1. 消息确认机制打开

      2. 设置公平调度机制,消费者不确认,就不要再给该消费之任务了,因为他目前的耗时任务还在执行,可以把任务给其他已经确认了的消费者。

        在消费者中设置公平调度机制
        channel.basic_qos(prefetch_count=1)
        
    10. 队列及其中的消息持久化---在product文件中设置队列及消息持久化

      1. 重启rabbitmq服务器会使队列和任务消失

      2. 解决方法:

        • 在product文件中,生成队列的代码中加参数

          # 3. 创建名字my_queue的消息队列,如果存在不创建
          channel.queue_declare('work_queue', durable=True)
          durable=True就可以保持,队列的持久化
          
          # 4. 发送消息到rabbitmq中名字为my_queue的队列中
          channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2))
          # properties=pika.BasicProperties(delivery_mode=2)可以保持消息持久化
          
          
    11. 交换机的三种模式

      生产者
      import pika
      
      # 1. 获得与rabbitmq代理连接对象
      connection_host = '192.168.1.38'
      connection_credentials = pika.PlainCredentials('root', '123')
      connection = pika.BlockingConnection(
          pika.ConnectionParameters(host=connection_host, credentials=connection_credentials))
      
      # 2. 通过连接对象,获得channel(管道)对象,用于操作rabbitmq
      channel = connection.channel()
      
      # 3. 创建名字my_queue的消息队列,如果存在不创建
      channel.queue_declare('work_queue', durable=True)
      
      # 4. 发送消息到rabbitmq中名字为my_queue的队列中
      channel.basic_publish(exchange='', routing_key='work_queue', body='message', properties=pika.BasicProperties(delivery_mode=2))
      # 只要生产者,发送不同的body,就会消费者中的处理函数,就会调用不同的task
      
      # 5. 关闭和rabbitmq代理的连接
      connection.close()
      
      print('消息发送完毕!')
      
      
      
      消费者
      import pika
      import task
      
      task_list = {
          'email': task.send_email,
          'message': task.send_message
      }
      # 任务后不可以加(),否则会立即执行
      
      # 1. 获取与rabbitmq的连接对象
      connection_host = '192.168.1.38'
      connection_credentials = pika.PlainCredentials('root', '123')
      connection = pika.BlockingConnection(pika.ConnectionParameters(
          host=connection_host, credentials=connection_credentials))
      
      # 2. 通过连接对象获得channel对象,用于操作rabbitmq
      channel = connection.channel()
      
      # 3. 创建名字为my_queue的消息队列,如果不存在就创建
      channel.queue_declare(queue='work_queue')
      
      
      # 4. 按照rabbitmq要求定义消息处理函数
      def callback(ch, method, properties, body):
          task_name = body.decode()
          # 判断生产者发送的消息是否在消费者中注册过,如果没有注册过就提示错误
          if task_name not in task_list:
              print('error:{}任务没有注册'.format(task_name))
              return
          task_list[task_name]()
          # 消息处理完成后,确认消息
          ch.basic_ack(delivery_tag=method.delivery_tag)
      
      # 设置公平调度机制
      channel.basic_qos(prefetch_count=1)
      
      # 5. 关联队列,并设置队列中的消息处理函数
      # channel.basic_consume(callback, queue='my_queue', no_ack=True)  以前版本的写法,后改为下面方式
      channel.basic_consume('work_queue', callback, False)
      
      # 6. 启动并开始处理消息
      channel.start_consuming()
      
      
      任务
      import time
      
      def send_email():
          print('开始发送邮件')
          time.sleep(1)
          print('邮件发送完毕')
      
      
      def send_message():
          print('开始发送短信')
          time.sleep(1)
          print('短信发送完毕')
      
  • 相关阅读:
    通过16道练习学习Linq和Lambda
    sql server 2000,一个数据库最多能建多少张表,每张表最多能建多少个字段?
    通过EPPlus导出Excel文件
    SQLServer找出执行慢的SQL语句
    C# where用法
    双击桌面Internet Explorer图标时创建快捷方式
    xxxx.accessor: The reference to 'xxxx' was not found in the list of this projects references
    .NET设计模式(3): 抽象工厂模式
    关于Windows Installer的一些知识点
    详细介绍"使用DB Attach的方式来升级MOSS2007中SSP的user profile和mysite至SharePoint 2010"的文章
  • 原文地址:https://www.cnblogs.com/hui-code/p/12048775.html
Copyright © 2011-2022 走看看