前 言 -解决问题
一、RabbitMQ安装
1、安装erlang 环境
a.下载erlang 版本,注意这里需要和安装的rabbitMq版本相配对,rabbitMQ官方网站上可以查到:https://www.rabbitmq.com/which-erlang.html , 先下载erlang安装rpm包,地址:https://www.rabbitmq.com/releases/erlang/ ,这里下载的是:erlang-18.3-1.el6.x86_64.rpm
b.将安装包拷到虚拟机中进行安装
输入安装命令:rpm -ivh erlang-18.3-1.el6.x86_64.rpm
查看erl版本号:erl -version
2、安装socat环境
a.下载socat包,socat是一个多功能的网络工具,netcat++ ,这里下载centos 64位版本:http://repo.iotti.biz/CentOS/6/x86_64/ ,这里选的是:socat-1.7.3.2-1.el6.lux.x86_64.rpm
b.将安装包拷到虚拟机中进行安装
输入安装命令:rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
查看erl版本号:socat -Version
3、安装rabbitMQ
a.下载rabbitMQ官方安装包:https://www.rabbitmq.com/releases/
rabbitmq-server/,这里下载的是:rabbitmq-server-3.6.5-1.noarch.rpm
b.将安装包拷到虚拟机中进行安装
输入安装命令:rpm -ivh socat-1.7.3.2-1.el6.lux.x86_64.rpm
c.启用rabbitMQ浏览器管理端
d.启动rabbitMQ ,查到rabbitMQ 安装路径,输入命令启动
./rabbitmq-server start
输入ip:port 可以看到如下界面:
输入guest/guest 账号/密码 登录管理端,如下界面:
解决用户名不能登录问题:
在rabbitmq的配置文件目录( /etc/rabbitmq/)下创建一个rabbitmq.config文件。文件中添加如下配置(请不要忘记那个“.”):[{rabbit, [{loopback_users, []}]}].
4.安装rabbitMQ参考相关博客
https://www.cnblogs.com/rmxd/p/11583932.html
https://blog.csdn.net/doubleqinyan/article/details/81081673
二、RabbitMQ模式学习
RabbtMQ 官网有RabbitMQ各模式详细资料,笔记有不清楚的地方,请直接参考官网:
https://www.rabbitmq.com/tutorials/tutorial-one-java.html
1、简单队列模式
P:消息生产者,可以业务,代码等产生的消息或通知,需要处理或存储
C:消息消费者,需要处理消息或根据业务存储完成相关逻辑
RabbitMq:红色部分消息队列,生产者将消息发送给队列,生产者继续往下处理自已的逻辑,消息者获取该消息,完成自身逻辑,这些逻辑与生产者相关却不影响生产者程序逻辑
a.spring boot 集成rabbitMQ ,主要步骤如下图:
配置文件如下:
server.port=8090 rabbitmq.host=192.168.137.3 rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest rabbitmq.virtualHost=/
b. 编写java生产者代码
/** * 生产者发送消息方法 * * @return */ @ApiOperation("生产者发送消息方法") @GetMapping("/SendMsg") public String SendMsg() { try { //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //定义队列 channel.queueDeclare(queueName, false, false, false, null); String msg = "this is a message"; //第一个参数为交换机,为空,说明是匿名转发 channel.basicPublish("", queueName, null, msg.getBytes("Utf-8")); //关闭通道与连接 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return "ok"; }
c. 编写java消费者代码
/** * 消费者接收消息方法 * * @return */ @ApiOperation("消费者接收消息方法") @GetMapping("/ReciveMsg") public String ReciveMsg() { try { //创建连接 Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //声明队列 第二个参数durable为是否可持久化,若为true 则支持持久化 channel.queueDeclare(queueName, false, false, false, null); //定义消费者 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String reciveMsg = new String(body, "utf-8"); System.out.println("Simple Rec: " + reciveMsg); } }; //监听队列 channel.basicConsume(queueName, true, defaultConsumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return "ok"; }
验证代码:
调用生产者发送消息,再使用消费者读取消息 ,通过swagger 调用发送消息,并查看rabbitMQ中是否收到消息,如下图:
RabbitMQ 情况:
调用消费者处理消息:
此时再查看RabbitMQ,消息已被处理 :
d.RabbitMQ 注入类
@Configuration @ConfigurationProperties(prefix = "rabbitmq") public class RabbitMQConfig { private String host; private int port; private String username; private String password; private String virtualHost; @Bean public ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(host); //设置mq地址 connectionFactory.setPort(port); //设置端口号 connectionFactory.setUsername(username); //设置用户名 connectionFactory.setPassword(password); //设置密码 connectionFactory.setVirtualHost(virtualHost); return connectionFactory; } public String getVirtualHost() { return virtualHost; } public void setVirtualHost(String virtualHost) { this.virtualHost = virtualHost; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } }
更多参考官网:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
2、工作队列
工作队列:生产者生产消息发送给队列,队列轮询被多个消费者消费,一般情况下每个消费者处理的消息是一样的
a.简单修改简单队列代码,完成工作队列java案例,循环发送50次:
/** * 二、work_queue 工作模型 * 1、与简单模型编写一样,只是多了一个消费者 * 2、一个生产者被二个消费者消费,并且均分处理消息,是一种轮询分发方式 * 3、round-robin 方式 * 4、不足之处在于,处理能力强的消费者会有空隙时间 */ @Api(value = "轮询分发", description = "轮询分发") @RestController public class WorkQueueController { /** * 连接工厂对象 */ @Autowired ConnectionFactory connectionFactory; /** * 队列名称 */ String queueName = "queue_work_learn"; @ApiOperation("生产者发送消息方法") @GetMapping(value = "/sendWorkQueue") public String sendWorkQueue() { try { //创建连接 Connection connection = connectionFactory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(queueName, false, false, false, null); //消息体 String bodyMsg = "this is a message NO:"; for (int i = 0; i < 50; i++) { channel.basicPublish("", queueName, null, (bodyMsg + i).getBytes("utf-8")); } //关闭队列 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return "ok"; } @ApiOperation("消费者消息方法一") @GetMapping(value = "/reciveWorkOne") public String reciveWorkOne() { try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); boolean autoAck=true; channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recive 1 :" + new String(body, "utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return "ok"; } @ApiOperation("消费者消息方法二") @GetMapping(value = "/reciveWorkTwo") public String reciveWorkTwo() { try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); boolean autoAck=true; channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recive 2 :" + new String(body, "utf-8")); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return "ok"; } }
b. swagger显示如下:
可以看出,消费者一处理偶数,消费者二处理基数,实现了轮询分发消息,但这样会出现一个消费者处理能力强,一个消费者处理能力弱,RabbitMQ 本身不知道谁的处理能力更强,公平分发就是能者多劳的模式,
3、公平分发工作队列
消费者需要手动通知队列,我已经处理完毕了,请再来一个,把自动应答ACK设为手动,并限定每次发送一个
a.简单修改java代码完成公平分发案例:
b.更多学习参考官网
https://www.rabbitmq.com/tutorials/tutorial-two-java.html