zoukankan      html  css  js  c++  java
  • 基于Docker安装RabbitMQ及基本使用

    RabbitMQ 默认的 AMQP 端口为 5672

    默认启动方式如下(可登录容器后使用简单的命令进行管理)

    docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
    

    rabbit cli 工具

    使用带管理面板的方式启动(推荐),默认账号密码为 guest / guest,访问地址为 http://ip:port(按下面的命令映射后,管理面板的端口为 8080)

    docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
    

    常用命令

    # Declare a queue named 队列2 with durable=true
    rabbitmqadmin declare queue name=队列2 durable=true
    
    # List the queues
    rabbitmqadmin list queues
    
    # Publish one message with routing key 队列2
    rabbitmqadmin publish routing_key=队列2 payload="{name:mmc}"
    
    # Fetch the message at the head of the given queue; by default it is not consumed
    rabbitmqadmin get queue=队列2
    # Fetch the message at the head of the given queue; ackmode=ack_requeue_false consumes it
    rabbitmqadmin get queue=队列2 ackmode=ack_requeue_false
    

    Java Client 测试

    参考文档 https://www.rabbitmq.com/api-guide.html#license

    基础的功能为新增队列、删除队列、声明 Exchange、发布消息、获取指定队列的队头数据等,具体的操作代码如下所示:

    import com.rabbitmq.client.*;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Manual tests that exercise the basic RabbitMQ Java client operations
     * (declare/delete a queue, declare an exchange, publish and fetch a message)
     * against a broker reachable at {@link #BROKER_URI}.
     *
     * <p>Every test opens its own connection and channel inside a
     * try-with-resources statement, so both are closed (channel first, then
     * connection) even when a broker call throws.
     */
    public class RabbitMQ {
        /** AMQP URI used by every test in this class. */
        private static final String BROKER_URI = "amqp://guest:guest@localhost:5672";

        /** Queue that the add / delete / publish / receive tests operate on. */
        private static final String QUEUE_NAME = "测试队列新增2";

        /** Exchange declared by {@link #declareExchange()}. */
        private static final String EXCHANGE_NAME = "新增Exchange2";

        /**
         * Opens a new connection to the broker at {@link #BROKER_URI}.
         *
         * @return a freshly opened connection; the caller is responsible for closing it
         */
        private static Connection openConnection()
                throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri(BROKER_URI);
            return connectionFactory.newConnection();
        }

        /**
         * Declares the test queue and prints the broker's declare-ok response.
         */
        @Test
        public void addQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            try (Connection connection = openConnection();
                 Channel channel = connection.createChannel()) {
                // Arguments after the name: durable=true, exclusive=false, autoDelete=false, no extra arguments.
                System.out.println(channel.queueDeclare(QUEUE_NAME, true, false, false, null));
            }
        }

        /**
         * Deletes the test queue and prints the broker's delete-ok response.
         */
        @Test
        public void deleteQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            try (Connection connection = openConnection();
                 Channel channel = connection.createChannel()) {
                System.out.println(channel.queueDelete(QUEUE_NAME));
            }
        }

        /**
         * Declares a direct exchange and prints the broker's declare-ok response.
         */
        @Test
        public void declareExchange() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
            try (Connection connection = openConnection();
                 Channel channel = connection.createChannel()) {
                System.out.println(channel.exchangeDeclare(EXCHANGE_NAME, "direct"));
            }
        }

        /**
         * Publishes one text message to the test queue through the default exchange.
         */
        @Test
        public void publishMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            try (Connection connection = openConnection();
                 Channel channel = connection.createChannel()) {
                // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
                byte[] messageBodyBytes = "Hello Word".getBytes(StandardCharsets.UTF_8);
                // basicPublish(exchangeName, routingKey, messageProperties, body).
                // Pass MessageProperties.PERSISTENT_TEXT_PLAIN instead of null to mark the message persistent.
                channel.basicPublish("", QUEUE_NAME, null, messageBodyBytes);
            }
        }

        /**
         * Fetches the message at the head of the test queue, if any, and prints its body.
         */
        @Test
        public void receiveMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
            try (Connection connection = openConnection();
                 Channel channel = connection.createChannel()) {
                // false: fetch the head message without acknowledging it, so it is not removed;
                // true: fetch the head message and remove it from the queue.
                boolean autoAck = false;
                GetResponse response = channel.basicGet(QUEUE_NAME, autoAck);
                // basicGet returns null when the queue is empty.
                if (response != null) {
                    // Decode with the same charset publishMessage() encodes with.
                    System.out.println(new String(response.getBody(), StandardCharsets.UTF_8));
                }
            }
        }
    }
    
    

    SpringBoot 整合 rabbitMQ

    Demo1 :使用默认交换机

    pom.xml 中导入 spring-boot-starter-amqp 依赖(其中已包含 spring-rabbit)

    <!-- Spring Boot starter for AMQP; the starter is published under the org.springframework.boot group -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    application.properties 文件配置:以下为 RabbitMQ 的默认配置,若未修改默认值,则无需添加这些配置

    spring.rabbitmq.host=localhost
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host=/
    spring.rabbitmq.port=5672
    

    发送消息,使用下面的单元测试来发送单条数据到消息队列中

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test that sends a single text message through the default exchange.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringbootRabbitMQ {
        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends "Hello World" with routing key "q1" via the default (empty-named) exchange.
         */
        @Test
        public void sendMessage() {
            final String defaultExchange = "";
            final String routingKey = "q1";
            final Object payload = "Hello World";
            amqpTemplate.convertAndSend(defaultExchange, routingKey, payload);
        }
    }
    

    接收消息,监听队列中的数据,当队列不为空的时候自动获取队列中的数据

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * Listener component that prints every text message delivered from queue "q1".
     */
    @Component
    public class RabbitMQListener {
        /**
         * Writes the received message to standard output.
         *
         * @param in the message payload as text
         */
        @RabbitListener(queues = "q1")
        void listen(String in) {
            final String received = in;
            System.out.println(received);
        }
    }
    

    Demo2:使用交换机并通过对象来存储数据

    自定义交换机、队列的名字 Constant.java

    /**
     * Names of the exchange and queue used by the topic-exchange demo.
     *
     * <p>This is a pure constants holder, so it is final and cannot be instantiated.
     */
    public final class Constant {
        /** Name of the topic exchange. */
        public static final String TOPIC_EXCHANGE = "topic.exchange";
        /** Name of the queue bound to the topic exchange. */
        public static final String TOPIC_QUEUE = "topic.queue";

        private Constant() {
            // Not instantiable: only static constants live here.
        }
    }
    

    编写RabbitConfig.java 配置文件,配置用到的交换机、队列,并将交换机与队列进行绑定

    import cn.getcharzp.common.Constant;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Declares the topic exchange, the queue, and the binding between them as Spring beans.
     */
    @Configuration
    public class RabbitConfig {
        /**
         * @return the topic exchange named by {@code Constant.TOPIC_EXCHANGE}
         */
        @Bean
        public TopicExchange topicExchange() {
            final String exchangeName = Constant.TOPIC_EXCHANGE;
            return new TopicExchange(exchangeName);
        }

        /**
         * @return the queue named by {@code Constant.TOPIC_QUEUE}
         */
        @Bean
        public Queue topicQueue() {
            final String queueName = Constant.TOPIC_QUEUE;
            return new Queue(queueName);
        }

        /**
         * Binds the queue to the topic exchange, using the queue name as the routing key.
         *
         * @param topicQueue    the queue to bind
         * @param topicExchange the exchange to bind it to
         * @return the resulting binding
         */
        @Bean
        public Binding exchangeBindTopic(Queue topicQueue, TopicExchange topicExchange) {
            final String routingKey = Constant.TOPIC_QUEUE;
            return BindingBuilder
                    .bind(topicQueue)
                    .to(topicExchange)
                    .with(routingKey);
        }
    }
    

    编辑发送、接收消息的实体类,我这里用的是User.java

    import java.io.Serializable;
    
    /**
     * Message payload exchanged between the sender and the listener.
     *
     * <p>Instances are Java-serialized, so the class declares an explicit
     * {@code serialVersionUID} to keep sender and receiver builds compatible.
     */
    public class User implements Serializable {
        private static final long serialVersionUID = 1L;

        private String name;
        private Integer age;
        private String password;

        /**
         * @param name     the user's name
         * @param age      the user's age
         * @param password the user's password
         */
        public User(String name, Integer age, String password) {
            this.name = name;
            this.age = age;
            this.password = password;
        }

        /**
         * Returns a printable description of this user.
         *
         * <p>The password is masked so that printing or logging a user never
         * exposes the secret.
         */
        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    ", password='******'" +
                    '}';
        }

        /** @return the user's name */
        public String getName() {
            return name;
        }

        /** @param name the new name */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the user's age */
        public Integer getAge() {
            return age;
        }

        /** @param age the new age */
        public void setAge(Integer age) {
            this.age = age;
        }

        /** @return the user's password */
        public String getPassword() {
            return password;
        }

        /** @param password the new password */
        public void setPassword(String password) {
            this.password = password;
        }
    }
    

    编辑发送消息的单元测试,实际场景根据实际业务来书写,主要使用AmqpTemplate来进行消息的发送

    import cn.getcharzp.common.Constant;
    import cn.getcharzp.pojo.User;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Spring Boot test that sends a {@code User} object to the topic exchange.
     */
    @SpringBootTest
    @RunWith(SpringRunner.class)
    public class SpringbootRabbitMQ {
        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends one {@code User} to {@code Constant.TOPIC_EXCHANGE} with routing key
         * {@code Constant.TOPIC_QUEUE}.
         *
         * <p>Exceptions are deliberately not caught: a failed send must fail the
         * test instead of being printed and ignored.
         */
        @Test
        public void sendMessage() {
            amqpTemplate.convertAndSend(Constant.TOPIC_EXCHANGE, Constant.TOPIC_QUEUE, new User("GetcharZp", 20, "abcdef"));
        }
    }
    

    通过@RabbitListener实现消息的接收

    import cn.getcharzp.common.Constant;
    import cn.getcharzp.pojo.User;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Listener component that prints every {@code User} delivered from
     * {@code Constant.TOPIC_QUEUE}.
     */
    @Component
    public class RabbitMQListener {
        /**
         * Writes the received user to standard output.
         *
         * @param user the received payload
         */
        @RabbitListener(queues = Constant.TOPIC_QUEUE)
        void listen(User user) {
            // println(Object) is null-safe, so no catch-all around toString() is needed.
            System.out.println(user);
        }
    }
    
    资源分享: 腾讯云华为云
  • 相关阅读:
    JavaScript之数学对象Math
    JavaScript之数据类型转换
    JavaScript之操作符
    JavaScript之基本语句
    JavaScript之基本概念(二)
    JavaScript之基本概念(一)
    使用velero进行kubernetes灾备
    minikube配置CRI-O作为runtime并指定flannel插件
    使用thanos管理Prometheus持久化数据
    linux开启tcp_timestamps和tcp_tw_recycle引发的问题研究
  • 原文地址:https://www.cnblogs.com/GetcharZp/p/15687686.html
Copyright © 2011-2022 走看看