zoukankan      html  css  js  c++  java
  • java操作RabbitMQ添加队列、消费队列和三个交换机

    假设已经在服务器上安装完RabbitMQ。我写的教程

    一、发送消息到队列(生产者)

    新建一个maven项目,在pom.xml文件加入以下依赖


    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
    </dependencies>
    新建一个P1类
    package com.rabbitMQ.test;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author mowen
     * @create 2019/11/20-11:23
     */
    public class P1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //消息队列名字
            String queueName="queue";
            //实例连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //设置地址
            connectionFactory.setHost("192.168.128.233");
            //设置端口
            connectionFactory.setPort(5672);
            //设置用户名
            connectionFactory.setUsername("mowen");
            //设置密码
            connectionFactory.setPassword("123456");
            //获取连接(跟jdbc很像)
            Connection connection = connectionFactory.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //声明队列。
            //参数1:队列名
            //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
            //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
            //参数4:当所有消费者客户端连接断开时是否自动删除队列
            //参数5:队列的其他参数
            channel.queueDeclare(queueName,true,false,false,null);
    
            for (int i = 0; i < 10; i++) {
                String msg="msg"+i;
                // 基本发布消息
                // 第一个参数为交换机名称、
                // 第二个参数为队列映射的路由key、
                // 第三个参数为消息的其他属性、
                // 第四个参数为发送信息的主体
                channel.basicPublish("",queueName,null,msg.getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
    

    运行后再浏览器进入RabbitMQ的控制台,切换到queue看到

    image

    二、获取队列消息(消费者)

    新建一个C1类

    package com.rabbitMQ.test;
    
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author mowen
     * @create 2019/11/20-13:12
     */
    public class C1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //消息队列名字
            String queueName="queue";
            //实例连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //设置地址
            connectionFactory.setHost("192.168.128.233");
            //设置端口
            connectionFactory.setPort(5672);
            //设置用户名
            connectionFactory.setUsername("mowen");
            //设置密码
            connectionFactory.setPassword("123456");
            //获取连接(跟jdbc很像)
            Connection connection = connectionFactory.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
    
            // 创建一个消费者
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 消费收到消息的时候调用的回调
                    System.out.println("C3接收到:" + new String(body));
                }
            };
    
    
            //把消费着绑定到指定队列
            //第一个是队列名
            //第二个是 是否自动确认
            //第三个是消费者
            channel.basicConsume(queueName,true,consumer);
    
        }
    }
    

    运行后输出为

    image消费者一般都不会关闭,会一直等待队列消息,可以手动关闭程序。

    channel.basicConsume(queueName,true,consumer);中的true为收到消息后自动确认,改为false取消自动确认。

    在handleDelivery方法最后面用

    // 手动确认
    // 确认收到消息
    channel.basicAck(envelope.getDeliveryTag(),false);

    来收到手动确认消息。消费者可以有多个并且可以同时消费一个队列;

    当有多个消费者同时消费同一个队列时,收到的消息是平均分配的(消费者没收到之前已经确认每个消费者受到的消息),

    但当其中一个消费者性能差的话,会影响其他的消费者,因为还要等它收完消息,这样会拖累其他消费者。

    可以设置channel 的basicQos方法

    //设置最多接受消息数量
    // 设置了这个参数之后要吧自动确认关掉
    channel.basicQos(1);
    

    三、扇形(fanout)交换机

    扇形交换机是基本的交换机类型,会把收到的消息以广播的形式发送到绑定的队列里,因为不需要经过条件筛选,所以它的速度最快。


    在生产者项目新建一个fanout类

    package com.rabbitMQ.routing;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author mowen
     * @date  2019/11/20-11:23
     */
    public class fanout {
        public static void main(String[] args) throws IOException, TimeoutException {
            //交换机名字
            String exchangeName="fanout";
            //交换机名字类型
            String exchangeType="fanout";
            //消息队列名字
            String queueName1="fanout.queue1";
            String queueName2="fanout.queue2";
            String queueName3="fanout.queue3";
            //实例连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //设置地址
            connectionFactory.setHost("192.168.128.233");
            //设置端口
            connectionFactory.setPort(5672);
            //设置用户名
            connectionFactory.setUsername("mowen");
            //设置密码
            connectionFactory.setPassword("123456");
            //获取连接(跟jdbc很像)
            Connection connection = connectionFactory.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //声明队列。
            //参数1:队列名
            //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
            //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
            //参数4:当所有消费者客户端连接断开时是否自动删除队列
            //参数5:队列的其他参数
            channel.queueDeclare(queueName1,true,false,false,null);
            channel.queueDeclare(queueName2,true,false,false,null);
            channel.queueDeclare(queueName3,true,false,false,null);
    
            //声明交换机
            channel.exchangeDeclare(exchangeName,exchangeType);
    
            //队列绑定到交换机
            channel.queueBind(queueName1,exchangeName,"");
            channel.queueBind(queueName2,exchangeName,"");
            channel.queueBind(queueName3,exchangeName,"");
    
            for (int i = 0; i < 10; i++) {
                String msg="msg"+i;
                // 基本发布消息
                // 第一个参数为交换机名称、
                // 第二个参数为队列映射的路由key、
                // 第三个参数为消息的其他属性、
                // 第四个参数为发送信息的主体
                channel.basicPublish(exchangeName,"",null,msg.getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
    

    运行后在RabbitMQ网页管理后台的queue会看到

    image

    切换到Exchanges会看到一个

    image

    就是我们声明的交换机,点击会看到我们绑定的队列

    image

    四、直连(direct)交换机

    直连交换机会带路由功能,队列通过routing_key与直连交换机绑定,发送消息需要指定routing_key,交换机收到消息时,交换机会根据routing_key发送到指定队列里,同样的routing_key可以支持多个队列。


    在生产者项目新建direct类

    package com.rabbitMQ.routing;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author mowen
     * @date  2019/11/20-11:23
     */
    public class direct {
        public static void main(String[] args) throws IOException, TimeoutException {
            String exchangeName="direct";
            String exchangeType="direct";
            //消息队列名字
            String queueName1="direct.queue1";
            String queueName2="direct.queue2";
            String queueName3="direct.queue3";
            //实例连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //设置地址
            connectionFactory.setHost("192.168.128.233");
            //设置端口
            connectionFactory.setPort(5672);
            //设置用户名
            connectionFactory.setUsername("mowen");
            //设置密码
            connectionFactory.setPassword("123456");
            //获取连接(跟jdbc很像)
            Connection connection = connectionFactory.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //声明队列。
            //参数1:队列名
            //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
            //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
            //参数4:当所有消费者客户端连接断开时是否自动删除队列
            //参数5:队列的其他参数
            channel.queueDeclare(queueName1,true,false,false,null);
            channel.queueDeclare(queueName2,true,false,false,null);
            channel.queueDeclare(queueName3,true,false,false,null);
    
            //声明交换机
            channel.exchangeDeclare(exchangeName,exchangeType);
    
            //队列绑定到交换机并指定rouing_key
            channel.queueBind(queueName1,exchangeName,"key1");
            channel.queueBind(queueName2,exchangeName,"key2");
            channel.queueBind(queueName3,exchangeName,"key1");
    
            for (int i = 0; i < 10; i++) {
                String msg="msg"+i;
                // 基本发布消息
                // 第一个参数为交换机名称、
                // 第二个参数为队列映射的路由key、
                // 第三个参数为消息的其他属性、
                // 第四个参数为发送信息的主体
                channel.basicPublish(exchangeName,"key1",null,msg.getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
    

    运行后到后台的queue会看到

    image

    切换到Exchanges会看到

    image

    点击进去

    image

    五、主题(topic)交换机

    主题交换机的routing_key可以有一定的规则,交换机和队列的routing_key需要采用*.#.*…..的格式

    每个部分用.分开

    *代表一个单词(不是字符)

    #代表任意数量(0或n个)单词


    在生产者项目新进topic类

    package com.rabbitMQ.routing;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author mowen
     * @date  2019/11/20-11:23
     */
    public class topic {
        public static void main(String[] args) throws IOException, TimeoutException {
            String exchangeName="topic";
            String exchangeType="topic";
            //消息队列名字
            String queueName1="topic.queue1";
            String queueName2="topic.queue2";
            String queueName3="topic.queue3";
            //实例连接工厂
            ConnectionFactory connectionFactory=new ConnectionFactory();
            //设置地址
            connectionFactory.setHost("192.168.128.233");
            //设置端口
            connectionFactory.setPort(5672);
            //设置用户名
            connectionFactory.setUsername("mowen");
            //设置密码
            connectionFactory.setPassword("123456");
            //获取连接(跟jdbc很像)
            Connection connection = connectionFactory.newConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //声明队列。
            //参数1:队列名
            //参数2:持久化 (true表示是,队列将在服务器重启时依旧存在)
            //参数3:独占队列(创建者可以使用的私有队列,断开后自动删除)
            //参数4:当所有消费者客户端连接断开时是否自动删除队列
            //参数5:队列的其他参数
            channel.queueDeclare(queueName1,true,false,false,null);
            channel.queueDeclare(queueName2,true,false,false,null);
            channel.queueDeclare(queueName3,true,false,false,null);
    
            //声明交换机
            channel.exchangeDeclare(exchangeName,exchangeType);
    
            //队列绑定到交换机并指定rouing_key
            channel.queueBind(queueName1,exchangeName,"com.aaa.*");
            channel.queueBind(queueName2,exchangeName,"com.*.topic");
            channel.queueBind(queueName3,exchangeName,"com.bbb.*");
    
            for (int i = 0; i < 10; i++) {
                String msg="msg"+i;
                // 基本发布消息
                // 第一个参数为交换机名称、
                // 第二个参数为队列映射的路由key、
                // 第三个参数为消息的其他属性、
                // 第四个参数为发送信息的主体
                channel.basicPublish(exchangeName,"com.aaa.topic",null,msg.getBytes());
            }
    
            channel.close();
            connection.close();
        }
    }
    

    运行后,到后台queue会看到

    image

    切换到Exchanges会看到

    image

    点击进入会看到

    image

  • 相关阅读:
    python得到今天前的七天每天日期
    python 实现元组中的的数据按照list排序, python查询mysql得到的数据是元组格式,按照list格式对他们排序
    NoReverseMatch at /salesman/zhuce/ Reverse for '/zhuce/' with arguments '()' and keyword arguments '{}' not found. 0 pattern(s) tried: []
    为什么springMVC和Mybatis逐渐流行起来了?
    图像的七个不变矩 可用于图像的匹配
    【可视化必备】大数据时代的可视化工具
    常用机器视觉工具----图像分析工具(blob分析)
    【转】七种常见阈值分割代码(Otsu、最大熵、迭代法、自适应阀值、手动、迭代法、基本全局阈值法)
    C#编写滤镜 图片色调取反效果(Invert)
    Bitmap四种属性
  • 原文地址:https://www.cnblogs.com/mowen120/p/11905211.html
Copyright © 2011-2022 走看看