zoukankan      html  css  js  c++  java
  • RabbitMQ学习总结(2)-API的使用

     

    1. 引用依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.1.2</version>
    </dependency>

    2. 连接MQ

        public static Connection getConnection() throws Exception {
            // 创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.11");         // 设置rabbitmq服务器地址
            factory.setPort(5672);
            factory.setUsername("***");
            factory.setPassword("***");
            factory.setVirtualHost("testhost"); // 在后台配置的虚拟地址,类似于库,默认是/
            return factory.newConnection();
        }

    3. 发送消息

      实现方式1:最基础的方法消息方法,只需要指定队列,不填写交换机名称会使用MQ默认的。

        public static void sendMessage(String message) throws Exception {
            // 创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 创建队列
            // 参数1: ConnectionUtil.QUEUE_NAME,队列名称
            // 参数2:true  队列是否需要持久化(队列持久化和消息持久化是两回事)
            // 参数3:false 是否(排他队列)单一队列,这里表示此队列是否只给这个连接使用
            // 参数4:false 是否自动删除,表示在没有人使用时是否自动删除
            // 参数5:null 是个Map类型,可以传一些队列参数
            channel.queueDeclare(ConnectionUtil.QUEUE_NAME,true,false,false,null);
            // 参数1:交换机名称,为空时,使用默认交换机AMQP default
            // 参数2:队列名称,在绑定队列之后,会把队列名称作为交换机的路由键使用
            // 参数3:。。
            // 参数4:消息内容,Byte类型
            channel.basicPublish("", ConnectionUtil.QUEUE_NAME, null, message.getBytes());
            // 关闭通道
            channel.close();
            // 关闭连接
            connection.close();
        }

      实现方式2:指定交换机类型fanout

        public static void sendMessage(String message) throws Exception {
            // 创建连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            Channel channel = connection.createChannel();
            // 删除交换机,因为之前测试可能申请过这个名字的交换机,更改类型会报错
            channel.exchangeDelete("exchangeTest");
            // 创建交换机exchange
            // 参数1:交换机名称
            // 参数2:交换机类型
            channel.exchangeDeclare("exchangeTest", BuiltinExchangeType.FANOUT);
            // 交换机与队列绑定
            // 参数1:队列名称
            // 参数2:交换机名称
            // 参数3:routingKey路由键,fanout类型交换机是不需要路由键的
            channel.queueBind("queue1", "exchangeTest", "");
            // 发送消息
            // 参数1:交换机名称,为空时,使用默认交换机AMQP default
            // 参数2:routingKey路由键,fanout类型交换机是不需要路由键的
            // 参数3:。。
            // 参数4:消息内容,Byte类型
            channel.basicPublish("exchangeTest", "", null, message.getBytes());
            // 关闭通道
            channel.close();
            // 关闭连接
            connection.close();
        }

      实现方式3:指定交换机类型direct,其他代码跟上面一样,只是BuiltinExchangeType类型不同,绑定队列时需要填写routingKey

            channel.exchangeDeclare("exchangeTest2", BuiltinExchangeType.DIRECT);
            // 交换机与队列绑定
            // 参数1:队列名称
            // 参数2:交换机名称
            // 参数3:绑定队列的routingKey路由键,direct,routingKey是必填的
            channel.queueBind("queue1", "exchangeTest2", "info.user");
            channel.queueBind("queue2", "exchangeTest2", "error.user");

      实现方式4:执行交换机类型topic,routingKey是模糊匹配的

            channel.exchangeDeclare("exchangeTest3", BuiltinExchangeType.TOPIC);
            // 交换机与队列绑定
            // 匹配debug开头,user结尾,中间随机的路由键
            channel.queueBind("queue1", "exchangeTest3", "debug.*.user");
            // 匹配所有error开头的所有路由键
            channel.queueBind("queue2", "exchangeTest3", "error.#");
            // 匹配中断是email,前后段随机的路由键
            channel.queueBind("queue3", "exchangeTest3", "*.email.*");

    2.4 接收消息

      默认接收方式

    public static void getMessage() throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 第一次连接时还要确认是否创建通道,没有的话使用channel.queueDeclare()方法创建
            // 定义消费者,传入channel
            DefaultConsumer consumer = new DefaultConsumer(channel){
                /**
                 * 服务的监听器
                 * @param consumerTag 消费者标识
                 * @param envelope    消息详细信息:包括交换机,路由键,消息标识
                 * @param properties  消息配置
                 * @param body   消息内容
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 打印消息
                    System.out.println(new String(body, "UTF-8"));
                    // 手动确认消息
                    // 参数1:消费的唯一标识,从envelope内获取
                    // 参数2:声明是否批量确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
    
            };
            // 开始消费。指定消费队列的名称,绑定消费者
            // 参数1-queue:消费通道名称
            // 参数2-autoAck:自动消息确认开关,默认是false,false状态需要手动确认消费消息
            // 参数3-callback:消费对象
            channel.basicConsume("queue1", false, consumer); // 消费者是不需要关闭连接的,因为要一直监听 
    }
  • 相关阅读:
    centos7搭建kvm
    python脚本与shell脚本的结合应用
    shell脚本常用命令组合
    服务器硬盘损坏恢复过程
    linux grup引导进入单用户下
    mysql 数据库设计查询规范
    mysql 8.0 远程连接问题
    linux新加硬盘的操作
    linux 添加硬盘到/home 目录
    linux设置最大连接数
  • 原文地址:https://www.cnblogs.com/huanshilang/p/12603782.html
Copyright © 2011-2022 走看看