在RabbitMQ的环境搭建成功后,创建SpringBoot项目,通过一个简单的案例来详细的说明下RabbitMQ
的生产者消费者的模式。下面结合SpringBoot项目,来具体的说明下这部分的具体应用。
一、pom引入RabbitMQ
创建项目成功后,我们需要在pom.xml的文件里面来引入rabbitmq的jar,pom.xml文件详细的内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>mq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
二、生产者思路
RabbitMQ是MQ中核心的组件技术栈,生产者消费者的模型中也是非常重要的部分,在RabbitMQ中生产者
的应用程序并不关心队列,生产者的任务只需要把MQ的消息发送到Exchange,而不关心并且也不知道Queue的
存在,至于Queue这部分是需要消费者来进行关心的。所以在操作RabbitMQ上,它的步骤具体总结如下:
- 先创建连接工厂的对象,也就是ConnectionFactory
- 然后配置连接MQ的地址,端口,账户以及密码,和虚拟主机这部分
- 连接工厂接着创建连接对象,也就是Connection
- 接着使用连接对象来创建信道,也就是Channel
- 最后通过生产者的模式把MQ的消息发送到Exchange
生产者涉及到的源代码具体如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer
{
public static void main(String[] args) throws Exception {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//配置连接mq的地址信息
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
//连接工厂创建连接
Connection connection = connectionFactory.newConnection();
//通过connection来创建Channel
Channel channel = connection.createChannel();
//通过channel来发送具体的数据信息
String msg = "Hello RabbitMQ";
for (int i = 0; i < 5; i++) {
channel.basicPublish("saas", "", null, msg.getBytes());
System.out.println("发送数据成功:"+i);
}
//发送消息成功后,关闭具体的连接
channel.close();
connection.close();
}
}
三、消费者思路
在消费者中,和生产者前面的代码基本是一致的,但是这部分需要特别强调的是在消费者的设计中,
它不需要关心Exchange,而消费者核心需要关注的是Queue这部分,也就是消费者的应用程序主要是通过
Queue里面读取到MQ的数据,这部分代码具体如下:
package com.example.rabbitmq.quickstart;
import com.rabbitmq.client.*;
public class Consumer
{
//定义exchange
private static final String EXCHANGE = "saas";
//定义队列
private static final String queueName="saas";
public static void main(String[] args) throws Exception
{
try{
//创建连接工厂
ConnectionFactory connectionFactory=new ConnectionFactory();
//配置连接mq的地址信息
connectionFactory.setHost("101.43.158.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
// connectionFactory.setVirtualHost("/");
//连接工厂创建连接
Connection connection=connectionFactory.newConnection();
//通过connection来创建Channel
Channel channel=connection.createChannel();
//设置exchange类型为fanout
channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);
/*
定义一个队列
* 一个队列来接收数据后,消费端才可以从队列里面来接收具体的数据
* param1:队列名称
* param2:是否持久化
* param3:队列是否独占此连接
* param4:队列不再使用时是否自动删除此队列
* param5:队列参数
* */
channel.queueDeclare(queueName,true,false,true,null);
channel.queueBind(queueName,EXCHANGE,"");
//创建一个消费者来消费数据
// QueueingConsumer queueingConsumer=new QueueingConsumer(channel);
//2.0以后的版本修改为DefaultConsumer
DefaultConsumer consumer=new DefaultConsumer(channel)
{
@Override
public void handleDelivery(
String consumerTag,
com.rabbitmq.client.Envelope envelope,
AMQP.BasicProperties properties,
byte [] body) throws java.io.IOException
{
String message=new String(body);
System.out.println("接收到的消息为:"+message);
};
};
// 监听队列,从队列中获取数据
System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
channel.basicConsume(queueName,consumer);
}catch (Exception e){
e.printStackTrace();
}
}
}
四、程序验证
完成代码成功后,下来执行具体的程序来进行验证。这部分需要特别的强调下,在执行消费者程序前
不需要单独的再创建exchange和queue,要不就会出现消费者的程序无法启动,这点需要特别的注意,另外
一个就是被操作的账户必须就具备管理员的权限。如上代码执行后,消费者以及生产者输出结果信息如下:
如上,就可以看到消费者接收到生产者发送的数据。程序执行后,可以在RabbitMQ的WEB控制台可以看到具体
的数据,主要是Queue的部分,具体如下:
如上,可以看到控制台中消费者的数据。感谢您的阅读,或许会持续更新RabbitMQ的技术栈的体系文章。