zoukankan      html  css  js  c++  java
  • RabbitMQ学习整理笔记一

    前 言 -解决问题

      一、RabbitMQ安装  

    1、安装erlang 环境 

       a.下载erlang 版本,注意这里需要和安装的rabbitMq版本相配对,rabbitMQ官方网站上可以查到:https://www.rabbitmq.com/which-erlang.html , 先下载erlang安装rpm包,地址:https://www.rabbitmq.com/releases/erlang/ ,这里下载的是:erlang-18.3-1.el6.x86_64.rpm 

       b.将安装包拷到虚拟机中进行安装 

       输入安装命令:rpm -ivh erlang-18.3-1.el6.x86_64.rpm 

       查看erl版本号:erl -version 

     2、安装socat环境

      a.下载socat包,socat是一个多功能的网络工具,netcat++ ,这里下载centos 64位版本:http://repo.iotti.biz/CentOS/6/x86_64/ ,这里选的是:socat-1.7.3.2-1.el6.lux.x86_64.rpm

      b.将安装包拷到虚拟机中进行安装

       输入安装命令:rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm

       查看erl版本号:socat -Version 

     

     3、安装rabbitMQ

      a.下载rabbitMQ官方安装包:https://www.rabbitmq.com/releases/

    rabbitmq-server/,这里下载的是:rabbitmq-server-3.6.5-1.noarch.rpm

      b.将安装包拷到虚拟机中进行安装

       输入安装命令:rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm

     

     c.启用rabbitMQ浏览器管理端

     

      d.启动rabbitMQ ,查到rabbitMQ 安装路径,输入命令启动

       ./rabbitmq-server start  

     输入ip:port 可以看到如下界面:

     输入guest/guest 账号/密码 登录管理端,如下界面:

     解决用户名不能登录问题:

    rabbitmq的配置文件目录 /etc/rabbitmq/下创建一个rabbitmq.config文件。文件中添加如下配置(请不要忘记那个“.”):[{rabbit, [{loopback_users, []}]}].

    4.安装rabbitMQ参考相关博客

       https://www.cnblogs.com/rmxd/p/11583932.html 

       https://blog.csdn.net/doubleqinyan/article/details/81081673  

    二、RabbitMQ模式学习

      RabbtMQ 官网有RabbitMQ各模式详细资料,笔记有不清楚的地方,请直接参考官网:

    https://www.rabbitmq.com/tutorials/tutorial-one-java.html 

    1、简单队列模式 

    P:消息生产者,可以业务,代码等产生的消息或通知,需要处理或存储

    C:消息消费者,需要处理消息或根据业务存储完成相关逻辑

    RabbitMq:红色部分消息队列,生产者将消息发送给队列,生产者继续往下处理自已的逻辑,消息者获取该消息,完成自身逻辑,这些逻辑与生产者相关却不影响生产者程序逻辑

       a.spring boot 集成rabbitMQ ,主要步骤如下图:

     配置文件如下: 

    server.port=8090
    rabbitmq.host=192.168.137.3
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtualHost=/
    

    b. 编写java生产者代码 

       /**
         * 生产者发送消息方法
         *
         * @return
         */
        @ApiOperation("生产者发送消息方法")
        @GetMapping("/SendMsg")
        public String SendMsg() {
            try {
                //创建连接
                Connection connection = connectionFactory.newConnection();
                //创建通道
                Channel channel = connection.createChannel();
                //定义队列
                channel.queueDeclare(queueName, false, false, false, null);
                String msg = "this is a message";
                //第一个参数为交换机,为空,说明是匿名转发
                channel.basicPublish("", queueName, null, msg.getBytes("Utf-8"));
                //关闭通道与连接
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "ok";
    }
    

      c. 编写java消费者代码 

       /**
         * 消费者接收消息方法
         *
         * @return
         */
        @ApiOperation("消费者接收消息方法")
        @GetMapping("/ReciveMsg")
        public String ReciveMsg() {
            try {
                //创建连接
                Connection connection = connectionFactory.newConnection();
                //创建通道
                Channel channel = connection.createChannel();
                //声明队列 第二个参数durable为是否可持久化,若为true 则支持持久化
                channel.queueDeclare(queueName, false, false, false, null);
                //定义消费者
                DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String reciveMsg = new String(body, "utf-8");
                        System.out.println("Simple Rec: " + reciveMsg);
                    }
                };
                //监听队列
                channel.basicConsume(queueName, true, defaultConsumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "ok";
        }
    

     验证代码:

          调用生产者发送消息,再使用消费者读取消息 ,通过swagger 调用发送消息,并查看rabbitMQ中是否收到消息,如下图: 

     RabbitMQ 情况: 

     调用消费者处理消息: 

     此时再查看RabbitMQ,消息已被处理

     d.RabbitMQ 注入类 

    @Configuration
    @ConfigurationProperties(prefix = "rabbitmq")
    public class RabbitMQConfig {
        private String host;
        private int port;
        private String username;
        private String password;
        private String virtualHost;
        @Bean
        public ConnectionFactory getConnectionFactory() {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(host); //设置mq地址
            connectionFactory.setPort(port); //设置端口号
            connectionFactory.setUsername(username); //设置用户名
            connectionFactory.setPassword(password); //设置密码
            connectionFactory.setVirtualHost(virtualHost);
            return connectionFactory;
        }
        public String getVirtualHost() {
            return virtualHost;
        }
        public void setVirtualHost(String virtualHost) {
            this.virtualHost = virtualHost;
        }
        public String getHost() {
            return host;
        }
        public void setHost(String host) {
            this.host = host;
        }
        public int getPort() {
            return port;
        }
        public void setPort(int port) {
            this.port = port;
        }
        public String getUsername() {
            return username;
        }
        public void setUsername(String username) {
            this.username = username;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
    } 
    

        更多参考官网:https://www.rabbitmq.com/tutorials/tutorial-one-java.html  

    2、工作队列 

     工作队列:生产者生产消息发送给队列,队列轮询被多个消费者消费,一般情况下每个消费者处理的消息是一样的   

      a.简单修改简单队列代码,完成工作队列java案例,循环发送50次: 

    /**
     * 二、work_queue 工作模型
     *  1、与简单模型编写一样,只是多了一个消费者
     *  2、一个生产者被二个消费者消费,并且均分处理消息,是一种轮询分发方式
     *  3、round-robin 方式
     *  4、不足之处在于,处理能力强的消费者会有空隙时间
     */
    @Api(value = "轮询分发", description = "轮询分发")
    @RestController
    public class WorkQueueController {
        /**
         * 连接工厂对象
         */
        @Autowired
        ConnectionFactory connectionFactory;
        /**
         * 队列名称
         */
        String queueName = "queue_work_learn";
    
        @ApiOperation("生产者发送消息方法")
        @GetMapping(value = "/sendWorkQueue")
        public String sendWorkQueue() {
            try {
                //创建连接
                Connection connection = connectionFactory.newConnection();
                //创建一个通道
                Channel channel = connection.createChannel();
                //声明队列
                channel.queueDeclare(queueName, false, false, false, null);
                //消息体
                String bodyMsg = "this is a message NO:";
                for (int i = 0; i < 50; i++) {
                    channel.basicPublish("", queueName, null, (bodyMsg + i).getBytes("utf-8"));
                }
                //关闭队列
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "ok";
        }
    
        @ApiOperation("消费者消息方法一")
        @GetMapping(value = "/reciveWorkOne")
        public String reciveWorkOne() {
            try {
                Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                boolean autoAck=true;
                channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("recive 1 :" + new String(body, "utf-8"));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "ok";
        }
    
        @ApiOperation("消费者消息方法二")
        @GetMapping(value = "/reciveWorkTwo")
        public String reciveWorkTwo() {
            try {
                Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(queueName, false, false, false, null);
                boolean autoAck=true;
                channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("recive 2 :" + new String(body, "utf-8"));
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            return "ok";
        }
    }

     b. swagger显示如下: 

     

     可以看出,消费者一处理偶数,消费者二处理基数,实现了轮询分发消息,但这样会出现一个消费者处理能力强,一个消费者处理能力弱,RabbitMQ 本身不知道谁的处理能力更强,公平分发就是能者多劳的模式,

    3、公平分发工作队列 

     消费者需要手动通知队列,我已经处理完毕了,请再来一个,把自动应答ACK设为手动,并限定每次发送一个

     a.简单修改java代码完成公平分发案例: 

    b.更多学习参考官网

        https://www.rabbitmq.com/tutorials/tutorial-two-java.html 

  • 相关阅读:
    Oracle死锁只会回滚跟死锁有关的那条SQL,而不会回滚整个事务
    Mysql安装过程(linux:2.6.18-194.el5,Mysql:)
    格式化分区,报/dev/sdb1 is apparently in use by the system; will not make a filesystem here!
    安装Oracle 11gR2,报错:[INS-06101] IP address of localhost could not be determined
    mysql-5.5.25-winx64在win7 x64 免安装配置
    insert遭遇阻塞
    linux中轻松使用backspace和上下按键
    排序
    MySQL
    基于Boost的同步TCP通信
  • 原文地址:https://www.cnblogs.com/xin_ny/p/12993739.html
Copyright © 2011-2022 走看看