zoukankan      html  css  js  c++  java
  • RabbitMQ

    消息队列RabbitMQ

    消息队列是应用程序之间的一种通信方式:把无需即时返回结果且耗时的操作交给消息队列异步处理,从而提高系统的吞吐量,并实现程序之间的解耦。

    1. 安装软件:注意RabbitMQ和erlang的版本需要对应,要以管理员的身份运行。

    2. 创建工程:添加amqp-client客户端依赖。

    3. 生产者发送消息到RabbitMQ队列,消费者从消息队列中获取消息,步骤如下:

      • 将创建连接工厂的方法抽取为一个工具类

        /**
         * Helper that builds the RabbitMQ connection used by the examples.
         */
        public class ConnectionUntil {

            /**
             * Opens a new connection using the fixed broker settings below.
             *
             * @return the connection returned by {@code ConnectionFactory.newConnection()}
             * @throws Exception if the connection cannot be created
             */
            public static Connection getConnection() throws Exception {
                ConnectionFactory connectionFactory = new ConnectionFactory();

                // Broker address: host and port.
                connectionFactory.setHost("localhost");
                connectionFactory.setPort(5672);

                // Virtual host to connect to.
                connectionFactory.setVirtualHost("/simple_test");

                // Credentials.
                connectionFactory.setUsername("admin");
                connectionFactory.setPassword("123456");

                Connection connection = connectionFactory.newConnection();
                return connection;
            }
        }
        
      • simple简单模式

        • 生产者
        /**
         * Simple-mode producer: publishes a single message to {@link #QUEUE_NAME}
         * through the default exchange ("").
         */
        public class producter {
            /** Queue name; also used as the routing key when publishing. */
            public static final String QUEUE_NAME = "simple_queue";

            /**
             * Declares the queue, publishes one message and releases the channel and connection.
             *
             * @param args unused
             * @throws Exception if connecting, declaring, publishing or closing fails
             */
            public static void main(String[] args) throws Exception {
                // try-with-resources closes the channel and then the connection,
                // even when declaring or publishing throws.
                try (Connection connection = ConnectionUntil.getConnection();
                     Channel channel = connection.createChannel()) {
                    // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);

                    String message = "helle,rabbitma";
                    // Arguments: exchange name, routing key, message properties, message body.
                    // Encode explicitly as UTF-8: the consumer decodes the body as UTF-8.
                    channel.basicPublish("", QUEUE_NAME, null,
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            }

        }
        
        • 消费者
        /**
         * Simple-mode consumer: listens on {@code producter.QUEUE_NAME} and prints
         * the routing key, delivery tag, exchange and body of every delivery.
         */
        public class Consoumer {
            public static void main(String[] args) throws Exception {
                Channel channel = ConnectionUntil.getConnection().createChannel();

                // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
                channel.queueDeclare(producter.QUEUE_NAME, true, false, false, null);

                // Subscribe with autoAck=true; the callback only prints what it receives.
                channel.basicConsume(producter.QUEUE_NAME, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        final String marker = "==========";
                        System.out.println(envelope.getRoutingKey() + marker);
                        System.out.println(envelope.getDeliveryTag() + marker);
                        System.out.println(envelope.getExchange() + marker);
                        String text = new String(body, "utf-8");
                        System.out.println(text + marker);
                    }
                });
            }
        }
        
      • 工作模式

        当同一个队列有两个或两个以上的消费者时,它们之间是竞争关系:队列中的每条消息只会被其中一个消费者接收。

        • 生产者
        /**
         * Work-queue producer: publishes ten messages to {@link #WORKE_QUEUE}
         * through the default exchange ("").
         */
        public class Producter {
            /** Name of the work queue shared with the consumers. */
            public static final String WORKE_QUEUE = "worke_queue";

            /**
             * Declares the queue, publishes the messages and releases the channel and connection.
             *
             * @param args unused
             * @throws Exception if connecting, declaring, publishing or closing fails
             */
            public static void main(String[] args) throws Exception {
                // Close the connection as well as the channel: an open connection
                // keeps the client's threads running after main returns.
                try (Connection connection = ConnectionUntil.getConnection();
                     Channel channel = connection.createChannel()) {
                    channel.queueDeclare(WORKE_QUEUE, true, false, false, null);

                    for (int i = 0; i < 10; i++) {
                        String message = "发送消息了" + i;
                        // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
                        // and the platform default charset may differ.
                        channel.basicPublish("", WORKE_QUEUE, null,
                                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        System.out.println(message);
                    }
                }
            }
        }
        
        • 消费者1
        /**
         * Work-queue consumer 1: receives messages from {@code Producter.WORKE_QUEUE}
         * one at a time and acknowledges each of them manually.
         */
        public class Consoumer1 {
            public static void main(String[] args) throws Exception {
                Connection connection = ConnectionUntil.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(Producter.WORKE_QUEUE, true, false, false, null);
                // Prefetch limit: at most one unacknowledged message is delivered at a time.
                // This only takes effect with manual acknowledgements (autoAck=false below).
                channel.basicQos(1);
                DefaultConsumer consoumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(envelope.getDeliveryTag());
                        System.out.println(new String(body, "utf-8"));
                        // multiple=false: acknowledge only the delivery that was just processed.
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                };

                // autoAck=false: the handler acknowledges explicitly. Acknowledging a delivery
                // that was consumed with autoAck=true is a protocol error that closes the channel.
                channel.basicConsume(Producter.WORKE_QUEUE, false, consoumer);
            }
        }
        
        • 消费者2
        /**
         * Work-queue consumer 2: listens on {@code Producter.WORKE_QUEUE} with
         * automatic acknowledgement and prints each delivery tag and body.
         */
        public class Consoumer2 {
            public static void main(String[] args) throws Exception {
                final String queue = Producter.WORKE_QUEUE;
                Channel channel = ConnectionUntil.getConnection().createChannel();
                channel.queueDeclare(queue, true, false, false, null);

                System.out.println("kaishi ");
                DefaultConsumer printer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long tag = envelope.getDeliveryTag();
                        System.out.println(tag);
                        String text = new String(body, "utf-8");
                        System.out.println(text);
                    }
                };

                System.out.println("监听");
                // autoAck=true: deliveries are acknowledged automatically.
                channel.basicConsume(queue, true, printer);
            }
        }
        

        消息交换机Exchange常见的有3种类型。交换机本身不存储消息,只负责把消息转发到与之绑定的队列;如果没有任何队列与交换机绑定,或者没有符合路由规则的队列,消息就会丢失。进入同一个队列的消息,对于监听该队列的多个消费者来说依旧属于竞争关系:
        1. Fanout:广播,将消息交给所有绑定到交换机的队列。
        2. Direct:定向,将消息交给指定 routing key的队列。
        3. Topic:通配符,把消息交给符合 routing pattern的队列。
        1) * 匹配恰好一个单词
        2) # 匹配零个或多个单词

      • 发布与订阅模式:Fanout

        • 生产者
        /**
         * Publish/subscribe producer: declares a fanout exchange and two queues,
         * binds both queues to the exchange and publishes ten messages to it.
         */
        public class FanoutProducter {
            // The fields below are read by the fanout consumers; they are left
            // non-final so the existing public interface is unchanged.

            /** Name of the fanout exchange. */
            public static String FANOUT_EXCHANGE = "fanout_exchange";
            /** Names of the two queues bound to the exchange. */
            public static String FANOUT_QUEUE_1 = "fanout_queue_1";
            public static String FANOUT_QUEUE_2 = "fanout_queue_2";

            /**
             * Sets up the exchange, queues and bindings, publishes the messages and
             * releases the channel and connection.
             *
             * @param args unused
             * @throws Exception if connecting, declaring, binding, publishing or closing fails
             */
            public static void main(String[] args) throws Exception {
                // Close the connection as well as the channel: an open connection
                // keeps the client's threads running after main returns.
                try (Connection connection = ConnectionUntil.getConnection();
                     Channel channel = connection.createChannel()) {
                    // Declare the exchange.
                    channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
                    // Declare the queues (durable, non-exclusive, no auto-delete).
                    channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
                    channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
                    // Bind the queues to the exchange with an empty routing key.
                    channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, "");
                    channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, "");

                    for (int i = 0; i < 10; i++) {
                        String message = "fanout" + i;
                        // Encode explicitly as UTF-8 to match the consumers' decoding.
                        channel.basicPublish(FANOUT_EXCHANGE, "", null,
                                message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    }
                }
            }

        }
        
        • 消费者1
        /**
         * Fanout consumer 1: listens on {@code FanoutProducter.FANOUT_QUEUE_1} and
         * prints the delivery tag, exchange and body of every delivery.
         */
        public class FanoutConsoumer1 {
            public static void main(String[] args) throws Exception {
                String exchange = FanoutProducter.FANOUT_EXCHANGE;
                String queue = FanoutProducter.FANOUT_QUEUE_1;

                Channel channel = ConnectionUntil.getConnection().createChannel();

                // Declare the exchange and the queue, then bind them with an empty routing key.
                channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
                channel.queueDeclare(queue, true, false, false, null);
                channel.queueBind(queue, exchange, "");

                // autoAck=true: deliveries are acknowledged automatically.
                channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(envelope.getDeliveryTag());
                        System.out.println(envelope.getExchange());
                        String text = new String(body, "utf-8");
                        System.out.println(text);
                    }
                });
            }
        }
        
        • 消费者2
        /**
         * Fanout consumer 2: listens on {@code FanoutProducter.FANOUT_QUEUE_2} and
         * prints the delivery tag, exchange and body of every delivery.
         */
        public class FanoutConsoumer2 {
            public static void main(String[] args) throws Exception {
                String exchange = FanoutProducter.FANOUT_EXCHANGE;
                String queue = FanoutProducter.FANOUT_QUEUE_2;

                Channel channel = ConnectionUntil.getConnection().createChannel();

                // Declare the exchange and the queue, then bind them with an empty routing key.
                channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
                channel.queueDeclare(queue, true, false, false, null);
                channel.queueBind(queue, exchange, "");

                // autoAck=true: deliveries are acknowledged automatically.
                channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(envelope.getDeliveryTag());
                        System.out.println(envelope.getExchange());
                        String text = new String(body, "utf-8");
                        System.out.println(text);
                    }
                });
            }
        }
        
      • 路由模式:Direct

        生产者发送消息时携带路由key(routing key),队列绑定到交换机时指定绑定key(binding key);只有当消息的路由key与队列的绑定key完全一致时,消息才会被转发到该队列,监听该队列的消费者才可以接收到消息。

        • 生产者
        /**
         * Routing-mode producer: declares a direct exchange and two queues, binds
         * each queue with its own routing key and publishes one message per key.
         */
        public class DirectPro {
            // The fields below are read by the direct consumers; they are left
            // non-final so the existing public interface is unchanged.

            /** Name of the direct exchange. */
            public static String direct_exchange="direct_exchange";
            /** Names of the two queues bound to the exchange. */
            public static String direct_queue1="direct_queue_1";
            public static String direct_queue2="direct_queue_2";

            /**
             * Sets up the exchange, queues and bindings, publishes the messages and
             * releases the channel and connection.
             *
             * @param args unused
             * @throws Exception if connecting, declaring, binding, publishing or closing fails
             */
            public static void main(String[] args) throws Exception {
                // Release the channel and connection when done; otherwise the
                // client's threads keep running after main returns.
                try (Connection connection = ConnectionUntil.getConnection();
                     Channel channel = connection.createChannel()) {
                    channel.exchangeDeclare(direct_exchange, BuiltinExchangeType.DIRECT);
                    channel.queueDeclare(direct_queue1, true, false, false, null);
                    channel.queueDeclare(direct_queue2, true, false, false, null);

                    // Use the same bindings as the consumers (queue 1 -> "insert",
                    // queue 2 -> "update"). Bindings are additive, so swapping the keys
                    // here would leave both queues bound to both keys and every
                    // consumer would receive every message.
                    channel.queueBind(direct_queue1, direct_exchange, "insert");
                    channel.queueBind(direct_queue2, direct_exchange, "update");

                    String message = "2222222222";
                    // Encode explicitly as UTF-8 to match the consumers' decoding.
                    channel.basicPublish(direct_exchange, "insert", null,
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    message = "111111111111111";
                    channel.basicPublish(direct_exchange, "update", null,
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        }
        
        • 消费者1
        /**
         * Routing-mode consumer 1: binds {@code DirectPro.direct_queue1} with the
         * routing key "insert" and prints the details of every delivery.
         */
        public class DirectCon1 {
            public static void main(String[] args) throws Exception {
                String exchange = DirectPro.direct_exchange;
                String queue = DirectPro.direct_queue1;

                Channel channel = ConnectionUntil.getConnection().createChannel();

                // Declare the exchange and the queue, then bind the queue with its routing key.
                channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
                channel.queueDeclare(queue, true, false, false, null);
                channel.queueBind(queue, exchange, "insert");

                // autoAck=true: deliveries are acknowledged automatically.
                channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(consumerTag);
                        System.out.println(envelope.getExchange());
                        System.out.println(envelope.getRoutingKey());
                        System.out.println(envelope.getDeliveryTag());
                        String text = new String(body, "utf-8");
                        System.out.println(text);
                    }
                });
            }
        }
        
        • 消费者2
        /**
         * Routing-mode consumer 2: binds {@code DirectPro.direct_queue2} with the
         * routing key "update" and prints the details of every delivery.
         */
        public class DirectCon2 {
            public static void main(String[] args) throws Exception {
                String exchange = DirectPro.direct_exchange;
                String queue = DirectPro.direct_queue2;

                Channel channel = ConnectionUntil.getConnection().createChannel();

                // Declare the exchange and the queue, then bind the queue with its routing key.
                channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
                channel.queueDeclare(queue, true, false, false, null);
                channel.queueBind(queue, exchange, "update");

                // autoAck=true: deliveries are acknowledged automatically.
                channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(consumerTag);
                        System.out.println(envelope.getExchange());
                        System.out.println(envelope.getRoutingKey());
                        System.out.println(envelope.getDeliveryTag());
                        String text = new String(body, "utf-8");
                        System.out.println(text);
                    }
                });
            }
        }
        
      • 通配符模式:Topic

        队列绑定到交换机时,绑定key中可以使用通配符 * 和 # 来匹配消息的路由key,匹配成功的队列即可收到消息。

  • 相关阅读:
    准备改进回复功能
    今天的任务
    日历已加上
    web.config中globalization设置的问题
    Request获取url信息的各种方法比较
    增加了高级评论功能
    如何修改日历的CSS
    推荐有关MasterPages的三篇文章
    如何定制日历控件显示的星期文字
    FreeTextBox的问题终于解决了
  • 原文地址:https://www.cnblogs.com/mengzhao/p/14150989.html
Copyright © 2011-2022 走看看