一、docker下安装
1.下载镜像
docker pull rabbitmq:management
2.运行容器
docker run -di --name=rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
3.浏览器访问
http://192.168.25.129:15672/#/
默认用户名和密码都是guest
4.修改密码(rabbitmqctl需在容器内执行,可先运行 docker exec -it rabbitmq /bin/bash 进入容器)
rabbitmqctl change_password guest '123456'
二、用java代码操作rabbitmq
1.直接模式
1)继承springboot并添加相关依赖
<!-- Parent POM of this module. -->
<parent>
    <groupId>com.tenpower</groupId>
    <artifactId>tenpower-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
</parent>

<!-- No versions are declared here; presumably they are managed by the parent POM (verify). -->
<dependencies>
    <!-- Spring Boot starter for AMQP / RabbitMQ messaging. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Test support, only on the test classpath. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
2)在application.yml中配置rabbitmq
# Address of the RabbitMQ broker the application connects to.
# The keys must be nested: without indentation they become three unrelated
# top-level keys and the host setting is never bound.
spring:
  rabbitmq:
    host: 192.168.25.129
3)在main目录下编写springboot启动类
4)在test目录下编写发送消息测试类
@RunWith(SpringRunner.class) //替代junit原生的运行器 @SpringBootTest(classes = RabbitmqApplication.class) public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendMsgDirectly() { rabbitTemplate.convertAndSend("red","直接模式测试"); } }
5)在main目录下写消息消费类
@Component //加入spring容器 @RabbitListener(queues = "red") public class Customer1 { @RabbitHandler public void showMsg(String msg) { //方法名称随意 System.out.println("red接收到的消息:" + msg); } }
6)在http://192.168.25.129:15672/#/里创建一个队列,取名为red
7)运行测试方法发现浏览器rabbitmq页面出现消息,运行启动类后消息消失,且控制台打印出消息
2.分列模式(不常用,略)
3.主题模式
1)新建1个Exchange,name为color,类型为topic
2)新建3个queue,name分别为red、blue、green
3)点击新建的交换器color,binding新建的3个队列,并写上routing key(例如:red绑定goods.#,blue绑定#.log,green绑定goods.log)
4)分别写3个消费者类
@Component //加入spring容器 @RabbitListener(queues = "red") public class Customer1 { @RabbitHandler public void showMsg(String msg) { //方法名称随意 System.out.println("red接收到的消息:" + msg); } }
/**
 * Consumer for the {@code "blue"} queue; prints every received text message
 * to standard output.
 */
@Component
@RabbitListener(queues = "blue")
public class Customer2 {

    /**
     * Handles a message whose payload was converted to a {@link String}.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void showMsg(String msg) {
        String line = "blue接收到的消息:" + msg;
        System.out.println(line);
    }
}
/**
 * Consumer for the {@code "green"} queue; prints every received text message
 * to standard output.
 */
@Component
@RabbitListener(queues = "green")
public class Customer3 {

    /**
     * Handles a message whose payload was converted to a {@link String}.
     *
     * @param msg the message payload
     */
    @RabbitHandler
    public void showMsg(String msg) {
        String line = "green接收到的消息:" + msg;
        System.out.println(line);
    }
}
5)以主题模式生产消息
/** Publishes to exchange {@code "color"} with routing key {@code "goods.aaa"}. */
@Test
public void sendMsgTopic1() {
    String routingKey = "goods.aaa";
    rabbitTemplate.convertAndSend("color", routingKey, "主题模式测试1");
}

/** Publishes to exchange {@code "color"} with routing key {@code "goods.bbb.log"}. */
@Test
public void sendMsgTopic2() {
    String routingKey = "goods.bbb.log";
    rabbitTemplate.convertAndSend("color", routingKey, "主题模式测试2");
}

/** Publishes to exchange {@code "color"} with routing key {@code "goods.log"}. */
@Test
public void sendMsgTopic3() {
    String routingKey = "goods.log";
    rabbitTemplate.convertAndSend("color", routingKey, "主题模式测试3");
}
运行测试方法和启动类,发现topic1方法输出red,topic2输出red和blue,topic3输出red、blue、green
三、利用rabbitmq执行延时任务
1.安装插件
1)下载
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
2)复制到rabbitmq插件目录
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
3)进入容器(docker exec -it rabbitmq /bin/bash)后使插件生效
cd /sbin
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
4)重启rabbitmq
docker restart rabbitmq
2.代码使用
1)添加配置类,绑定延时队列(可以绑定多个)
package com.lbh360.order.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { public static final String EXCHANGE_DELAY = "delay.exchange"; public static final String FINISH_COMPLETE = "finish.complete"; @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_DELAY, "x-delayed-message", true, false, args); } @Bean public Queue finishCompleteQueue() { return new Queue(FINISH_COMPLETE); } @Bean public Binding finishCompleteBinding(@Qualifier("finishCompleteQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(FINISH_COMPLETE).noargs(); } }
2)生产者
package com.lbh360.order;

import com.lbh360.OrderApp;
import com.lbh360.order.config.RabbitConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * Producer-side test: publishes one delayed message to the delayed exchange.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = OrderApp.class)
public class OrderTest {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends an order number to {@link RabbitConfig#EXCHANGE_DELAY} with routing key
     * {@link RabbitConfig#FINISH_COMPLETE}, setting the message delay to 3000.
     */
    @Test
    public void test() {
        String orderNo = "1270618586310995968";
        rabbitTemplate.convertAndSend(
                RabbitConfig.EXCHANGE_DELAY,
                RabbitConfig.FINISH_COMPLETE,
                orderNo,
                outgoing -> {
                    // The delay is carried as a message property.
                    outgoing.getMessageProperties().setDelay(3 * 1000);
                    return outgoing;
                });
    }
}
3)消费者
package com.lbh360.order.listener; import com.lbh360.order.config.RabbitConfig; import com.lbh360.order.model.po.Order; import com.lbh360.order.model.response.OrderResp; import com.lbh360.order.service.OrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Description rabbitmq延时任务监听类 * @Author bofeng * @Date 2019/4/8 21:50 * @Version 1.0 */ @Component public class AmqDelayListener { private static final Logger logger = LoggerFactory.getLogger(AmqDelayListener.class); @Resource private OrderService orderService; /** * 完成订单后超时结单 * * @param message */ @RabbitListener(queues = RabbitConfig.FINISH_COMPLETE) public void completeOrder(Message message) { String orderNo = new String(message.getBody()); logger.info("completeOrder, orderNo:{}", orderNo); orderService.completeOrder(orderNo); } }
4)若消费端抛出异常导致消息无法被消费,rabbitmq会不停地重新投递该消息。可在application.yml中配置最大重试次数和间隔时间
# Listener retry settings. The keys must be nested so that they resolve to
# spring.rabbitmq.listener.simple.retry.*; flattened, they are ignored.
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          # maximum number of delivery attempts
          max-attempts: 3
          # wait before the first retry
          initial-interval: 1000ms