zoukankan      html  css  js  c++  java
  • 高性能RabbitMQ

    1,什么是RabbitMq

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

    百度百科 ,RabbitMQ 官网  AMQP 协议

    2,几种MQ对比

    RabbitMQ 是用Erlang 语言进行开发的,一款设计之初就是抗高并发的语言

    3,RabbitMQ 安装

    1.下载并安装erlang,下载地址:http://www.erlang.org/download
    2.配置erlang环境变量信息
      新增环境变量ERLANG_HOME=erlang的安装地址
      将%ERLANG_HOME%in加入到path中
    3.下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
    
    
    注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。

      RabbitMQ 管理平台地址 http://127.0.0.1:15672

      默认账号:guest/guest 用户可以自己创建新的账号

     

     https://blog.csdn.net/qq_35098526/article/details/80009424 安装之后启动不了,可以在sbin 里面:

     输入:rabbitmq-plugins enable rabbitmq_management  (先定位到rabbitmq安装目录)命令,出现plugins安装成功的提示。

    过程:

    Microsoft Windows [Version 10.0.17134.950]
    C:Program FilesRabbitMQ Server>
    C:Program FilesRabbitMQ Server>cd rabbitmq_server-3.7.8
    C:Program FilesRabbitMQ Server
    abbitmq_server-3.7.8>cd  sbin
    C:Program FilesRabbitMQ Server
    abbitmq_server-3.7.8sbin>rabbitmq-plugins enable rabbitmq_management
    Enabling plugins on node rabbit@DESKTOP-2MDM24J:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@DESKTOP-2MDM24J...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    
    started 3 plugins.

    4,RabbitMQ 五种队列形式

        1.点对点队列,也可以叫做简单队列

          生产者投递的消息,每次只准一个消费者来消费,如果消费者集群的话,消息会被均摊。

          例如:50 个消息,2个消费者,消费者1会消费奇数,消费者2会消费偶数,两个消费者不受影响,各自消费各自的消息

          producer:

    public class Producer {
    
        private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 50; i++) {
                String msg = "Hello, World :" + i;
                System.out.println(msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            connection.close();
        }
    
    }

    consumer1:

    public class Consumer {
    
        private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("consumer1");
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自动签收的应答模式
        }
    
    }

    consumer2:

    public class Consumer2 {
    
        private static final String QUEUE_NAME = "rabbitmq_simple_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("consumer2");
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, true, consumer); //true 代表采用自动签收的应答模式
        }
    
    }

      2,工作队列模式,也可以叫做公平队列模式

           点对点简单队列弊端:消费者集群的话,消息会被均摊处理,但是不同的消费者处理消息的能力是不同的,consumer1 每秒处理1个消息,consumer2 美妙处理3个消息,如果消息均摊,consumer1的效率则被浪费。

           公平消费模式:谁处理的快,并且采用手动签收,告知RabbitMQ之后,RabbitMQ 再给分发消息。这样,谁处理的快,谁就会处理的多。

           

    producer:

    public class Producer {
        // 公平队列名称
        private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 保证消费者只能取一个/每次
            channel.basicQos(1); //每次只给消费者1条消息,等消费完成,手动ack 应答之后,再给下一条
            for (int i = 0; i < 50; i++) {
                String msg = "Hello, World: " + i;
                System.out.println(msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            connection.close();
    
        }
    
    }

    consumer1:

    public class Consumer {
    
        private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            
            System.out.println("consumer01");
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            final Channel channel = connection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(200); 
                    } catch (Exception e) {
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);    //false 代表使用手动消息应答,需要使用channel.basicAck(envelope.getDeliveryTag(),false) 告知消息中间件
        }
    
    }

    consumer2:

    public class Consumer2 {
        
        private static final String QUEUE_NAME = "rabbitmq_fair_queue_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("consumer02");
            Connection connection = MQConnectionUtils.getConnection();
            // 创建通道
            final Channel channel = connection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            channel.basicQos(1);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                         Thread.sleep(1000); //让这个消费者处理消息的能力更差一点
                    } catch (Exception e) {
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    
    }

       3,发布订阅模式,采用fanout 扇形交换机,

            高级队列模式中,有交换机,生产者将消息发给交换机,在根据交换机的类型,发给定的的队列,然后发给指定的消费者消费

      producer:

    public class Producer {
    
        // 定义交换机名称
        private static final  String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
        // 定义交换机类型
        private static final  String EXCHANGE_TYPE = "fanout";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 和rabbitmq 建立连接
            Connection connection = MQConnectionUtils.getConnection();
            // 创建channel
            Channel channel = connection.createChannel();
            // 创建交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
            
            String message = "pub/sub";
            
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            
            channel.close();
            
            connection.close();
    
        }
    
    }

    邮件消费者:

    ublic class EmailConsumer {
    
        private static final String QUEUE_NAME = "rabbitmq_pubsub_email_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            
            System.out.println("邮件消费者。。。");
    
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息  true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    短信消费者:

    // 信息消费者
    public class TextConsumer {
        private static final String QUEUE_NAME = "rabbitmq_pubsub_text_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_pubsub_exchanger_one";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            
            System.out.println("短信消费者。。。");
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息  true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    4,路由模式:采用direct 交换机

    producer:

    public class Producer {
        // 定义交换机名称
        private static final  String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
        // 定义交换机类型
        private static final  String EXCHANGE_TYPE = "direct";
        // 定义路由
        private static final String ROUTINGKEY = "info";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 和rabbitmq 建立连接
            Connection connection = MQConnectionUtils.getConnection();
            // 创建channel
            Channel channel = connection.createChannel();
            // 创建交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
            
            String message = "pub/sub";
            
            channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
            
            channel.close();
            
            connection.close();
    
        }
    
    }

    邮件消费者:

    public class EmailConsumer {
    
        private static final String QUEUE_NAME = "rabbitmq_direct_email_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
        private static final String ROUTINGKEY_INFO = "info";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            
            System.out.println("邮件消费者。。。");
    
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息  true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    短信消费者:

    public class TextConsumer {
        private static final String QUEUE_NAME = "rabbitmq_direct_text_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_direct_exchanger_one";
        // 设定路由
        private static final String ROUTINGKEY_INFO = "info";
        private static final String ROUTINGKEY_WARN = "warn";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            System.out.println("短信消费者。。。");
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定 绑定路由
            //info 和  warn 路由的都能接收到
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_INFO);
    
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY_WARN);
            
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息 true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    5,通配符模式,采用topic 交换机  # 代表任意 * 代表一个

    producer:

    public class Producer {
    
        // 定义交换机名称
        private static final  String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
        // 定义交换机类型
        private static final  String EXCHANGE_TYPE = "topic";
        // 定义路由
        private static final String ROUTINGKEY = "routingkey.info.error.warn";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 和rabbitmq 建立连接
            Connection connection = MQConnectionUtils.getConnection();
            // 创建channel
            Channel channel = connection.createChannel();
            // 创建交换机
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
            
            String message = "pub/sub";
            
            channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
            
            channel.close();
            
            connection.close();
    
        }
    
    }

    邮件消费者:

    public class EmailConsumer {
    
        private static final String QUEUE_NAME = "rabbitmq_topic_email_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
        private static final String ROUTINGKEY = "routingkey.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            
            System.out.println("邮件消费者。。。");
    
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息  true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    短信消费者:

    public class TextConsumer {
        private static final String QUEUE_NAME = "rabbitmq_topic_text_queue_one";
        private static final String EXCHANGE_NAME = "rabbitmq_topic_exchanger_one";
        private static final String ROUTINGKEY = "routingkey.info.*";
        // 设定路由
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("短信消费者。。。");
            Connection connection = MQConnectionUtils.getConnection();
    
            Channel channel = connection.createChannel();
            // 定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 将队列和交换机进行绑定 绑定路由
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTINGKEY);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取生产者消息 :" + msg);
                }
            };
            // 消费者监听队列消息 true 代表自动签收
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

         

        

  • 相关阅读:
    测试签名和验证签名
    自定义mssql的CLR函数
    关于C#的Process的内存相关属性解读
    测试C#发送邮件
    关于wmv视频格式
    练习命名管道的使用
    web中局部滚动条
    C#修改文件的安全属性时报“没有可以设置的标志”
    c#的FileSystemWatcher对象监视文件的变化的事件,无休止的触发事件的解决办法
    为什么要给自己设限?
  • 原文地址:https://www.cnblogs.com/pickKnow/p/11424006.html
Copyright © 2011-2022 走看看