zoukankan      html  css  js  c++  java
  • RabbitMq

    1、MQ介绍

    1.1 什么是MQ

      消息队列是一种通过典型的生产者和消费者模型,生产者向消息队列发送消息,消费者从消息队列获取消息。进行异步的发送和接受,实现系统解耦的方案。也较消息中间件。
    


    1.2 常见MQ

      常见的mq如下:
      * ActiveMq
      * Rabbitmq
      * Kafka
      * RocketMq
    

    1.3 不同MQ的特点


    2、RabbitMq介绍

    2.1 RabbitMq

      RabbitMq是基于AMQP协议,erlang语言开发,是部署最广泛的消息中间件之一。
    

    AMQP协议

    2.2 RabbitMq安装


    也可以使用yum安装


    当使用3.8时,没有config文件,我们可以新建一个用户赋予administrator权限,可以在非localhost地方访问。
    步骤4:执行下面的命令创建一个用户
    rabbitmqctl add_user 用户名 密码

    步骤5:执行下面的命令设置用户为超级管理员。
    rabbitmqctl set_user_tags 用户名 administrator

    以上操作完成重启 service rabbitmq-server restart
    用新账号登录即可.

    2.3 RabbitMq管理命令行

    2、rabbitMq消息模型


    rabbitmq中的虚拟主机相当于数据库的database,web应用的/,是一个server下提供的不同区域,相互不干扰。

    2.1 基础的发送接受编码

    • 直连型
      发送
      ` @Test
      public void SendMessage() throws IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接的主机
        connectionFactory.setHost("192.168.1.104");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置访问的虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置用户、密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
      
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定消息队列
        //参数: 1 队列名称,没有则创建 ;
        // 2 定义是否持久化,true为需要持久化;
        // 3 是否为独占队列,true代表不可以被其余连接使用,
        // 4 是否在消费完成后删除,true为是
        // 5 附加参数
        channel.queueDeclare("hello",false,false,false,null);
      
      
        //发布消息
        //1 交换机名称
        //2 队列名称
        //3 额外设置
        //4 消息具体内容
        channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
      
        channel.close();
        connection.close();`
      

    接受
    ` public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    //创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    //设置连接的主机
    connectionFactory.setHost("192.168.1.104");
    //设置端口号
    connectionFactory.setPort(5672);
    //设置访问的虚拟主机
    connectionFactory.setVirtualHost("/ems");
    //设置用户、密码
    connectionFactory.setUsername("ems");
    connectionFactory.setPassword("ems");

        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        //通道绑定消息队列
        //参数: 1 队列名称,没有则创建 ;
        // 2 定义是否持久化,true为需要持久化;
        // 3 是否为独占队列,true代表不可以被其余连接使用,
        // 4 是否在消费完成后删除,true为是
        // 5 附加参数
        channel.queueDeclare("hello",false,false,false,null);
    
        //消费消息
        //1 消费的队列名称
        //2 消息的自动确认机制
        //3 消费时的回调接口
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body"+new String(body));
    
            }
        });
    
        Thread.sleep(3000);
        channel.close();
        connection.close();
    }`
    


    持久化改为true后队列没有丢失,消息丢失了?

    • work-queue模型

      一个生产者,多个消费者的模式
      ` public static void main(String[] args) throws Exception {
      Connection connection = RabbitMqUtils.getConnection();
      Channel channel = connection.createChannel();

        //一次消费一条
        channel.basicQos(1);
        channel.queueDeclare("work",true,false,false,null);
      
        channel.basicConsume("work",false,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1====" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);
               /* try {
                    Thread.sleep(1000);
                }catch (Exception e){
                    e.printStackTrace();
                }*/
            }
        });
      

      }`
      消息手动确认

    • Fanout模式、广播订阅模式

      生产者
      ` public static void main(String[] args) throws IOException, TimeoutException {
      Connection connection = RabbitMqUtils.getConnection();
      Channel channel = connection.createChannel();

        //定义交换机
        channel.exchangeDeclare("logs","fanout");
      
        //发布消息
        channel.basicPublish("logs","",null,"fanout message".getBytes());
        channel.close();
        connection.close();
      

      }消费者 public static void main(String[] args) throws IOException {
      Connection connection = RabbitMqUtils.getConnection();
      Channel channel = connection.createChannel();

        //绑定交换机,不是必要的
        channel.exchangeDeclare("logs","fanout");
      
        String queue = channel.queueDeclare().getQueue();
        //绑定队列
        channel.queueBind(queue,"logs","");
      
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2====" + new String(body));
            }
        });
      

      }`

    • 路由模型
      1、直连模型

      2、Topic模型

    发送
    ` public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
    
        //申明交换机为topic类型
        channel.exchangeDeclare("topics","topic");
    
        String key = "user.haha.xjtu";
        channel.basicPublish("topics",key,null,"routingKey gag Message".getBytes());
    
        channel.close();
        connection.close();
    }
    

    接受 public static void main(String[] args) throws Exception {

        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
    
        channel.exchangeDeclare("topics","topic");
    
        String queue = channel.queueDeclare().getQueue();
    
        channel.queueBind(queue,"topics","user.*");
    
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1====" + new String(body));
            }
        });
    }`
    

    3 springboot整合rabbitmq

      ![](https://img2020.cnblogs.com/blog/1614406/202102/1614406-20210201201735719-1793305655.png)
    

    4 MQ应用场景

    1 异步

    2 解耦

    3 削峰

    5 rabbitMQ集群

    5.1 普通集群


    当集群中某一时刻master节点宕机,可以对queue中的信息进行备份。从节点不能自动升级为主节点

    5.2 镜像集群

  • 相关阅读:
    每日一篇文献:Robotic pick-and-place of novel objects in clutter with multi-affordance grasping and cross-domain image matching
    每日一篇文献:Intuitive Bare-Hand Teleoperation of a Robotic Manipulator Using Virtual Reality and Leap Motion
    每日一篇文献:Virtual Kinesthetic Teaching for Bimanual Telemanipulation
    HEBI Robotic Arm VR Teleoperation
    「iQuotient Case」AR device teleoperated robotic arm
    VR and Digital Twin Based Teleoperation of Robotic Arm
    HEBI Robotic Arm VR Teleoperation
    Human Robot Interaction
    Immersive Teleoperation Project
    机器人演示学习
  • 原文地址:https://www.cnblogs.com/baldprogrammer/p/14354207.html
Copyright © 2011-2022 走看看