zoukankan      html  css  js  c++  java
  • 【RabbitMQ消息中间件】4.简单队列

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
    本文链接:https://blog.csdn.net/u013517797/article/details/79438196
    安装和启动好了RabbitMQ后,我们下面来通过编写一个Java程序来学习第一个队列知识。

    第一篇我们提到过,RabbitMQ一共有六种队列模式:

    分别是“HelloWorld”简单队列模式、“Work queues”工作队列模式、“Publish/Subscribe”发布/订阅模式、“Routing”路由模式、“Topics”通配符模式、“RPC”远程服务调用。

    那么我们首先从最基本的“HelloWorld”简单队列讲解。关于“HelloWorld”简单队列的队列模式图如下所示:

    其中“P”代表消息的生产者(Producter),由它来产生消息。而中间红色的框就是队列,即是生产者产生消息之后,将消息发送到队列。后面的“C”代表消息的消费者(consumer),即是将生产者发送至消息队列中的消息进行消费(取出)。
    这就是最简单的一个队列模式。

    我们下面使用Java连接RabbitMQ,并实现简单队列。
    首先创建一个maven工程:

     

     

     

     


    首先需要在maven工程中的pom.xml加入RabbitMQ的依赖和相关其它依赖:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.jack.rabbitmq</groupId>
    <artifactId>RabbitMQ_Test_project</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build/>

    <dependencies>
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.4.1</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    </dependency>
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.3.2</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.4.0.RELEASE</version>
    </dependency>
    </dependencies>
    </project>
    然后在src/mian/java中创建一个连接工厂ConnectionUtil,用于连接RabbitMQ:

    package cn.jack.rabbitmq.connection;
    import java.io.IOException;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class ConnectionUtil {
    public static Connection getConnection() throws IOException{
    //定义连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //定义连接地址
    factory.setHost("localHost");
    //定义端口
    factory.setPort(5672);
    //设置账号信息,用户名、密码、vhost
    factory.setVirtualHost("/jack");
    factory.setUsername("jack");
    factory.setPassword("jack");
    // 通过工厂获取连接
    Connection connection = factory.newConnection();
    return connection;
    }
    }
    在该类中首先定义了一个连接工厂“ConnectionFactory”,然后设置其服务地址、端口号、账号信息(用户名、密码、vhost),最后通过连接工厂获取一个RabbitMQ的连接对象。
    上面我们的账号信息使用了一开始创建的jack账号的信息。

    然后创建消息的生产者和消费者的模拟类。新建名为“Send”的类,作为生产者:

    package cn.jack.rabbitmq.simple;
    import java.io.IOException;
    import cn.jack.rabbitmq.connection.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    public class Send {

    private final static String QUEUE_NAME="test_queue";

    public static void main(String[] args) throws IOException {
    //获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    //从连接中创建通道
    Channel channel = connection.createChannel();

    //声明(创建)队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    //消息内容
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println("[product] Send '"+ message +"'");

    //关闭通道和连接
    channel.close();
    connection.close();
    }
    }
    该类首先通过连接工具类获取到了RabbitMQ的连接对象,类似JDBC连接数据库的连接对象。然后通过连接对象获取Channel通道,这个就相当于获取JDBC的Statement对象。通过连接对象创建Channel对象,相当于创建了一个与RabbitMQ的“通道”,通过“通道”可以做一系列与RabbitMQ交互的操作。
    然后使用channel对象的“queueDeclare”方法声明(创建)了一个“队列”,然后在参数中指定队列的名称(这里指定为“test_queue”)。
    然后创建一个消息,通过channel对象的“basicPublish”方法将消息发送至“test_queue”队列。
    最后关闭通道和连接。

    运行该生产者:

    然后我们可以在RabbitMQ的管理工具的Queues模块中查看队列信息:

    点击队列名称,然后选择“Get messages”选择,点击按钮,可以查看队列中信息的详细内容:

    然后创建一个名为“Recv”的类,作为消息消费者:

    package cn.jack.rabbitmq.simple;
    import cn.jack.rabbitmq.connection.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;

    public class Recv {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws Exception {
    //获取到连接以及mq通道
    Connection connection = ConnectionUtil.getConnection();
    Channel channel = connection.createChannel();

    //声明队列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    //定义队列的消费者
    QueueingConsumer consumer = new QueueingConsumer(channel);
    //监听队列
    channel.basicConsume(QUEUE_NAME, true,consumer);

    //获取消息
    while(true){
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println("[Consumer] Received '"+ message +"'");
    }
    }
    }
    与生产者一样,首先获取连接对象和创建channel通道对象。我们这个消费者消费的是生产者的队列,上面的生产者已经创建好了改队列,该理说不需要再创建队列,但是为了防止消费者消费的队列不存在这种异常的发送,还是同样声明生产者一样的队列,以防万一(如果已存在,该次创建无效,不会影响已存在的相同的队列)。如果开发者明确改队列百分之百存在,则可以在消费者代码中忽略声明队列这一步。
    然后定义队列的消费者QueueingConsumer,然后使用“basicConsume”来监听队列。后面我们写了一个死循环,用于一致监听队列并获取消息。在循环块中使用consumer对象的nextDelivery()方法来进行消息的接收和消费。

    这里要注意的是,在该模式下,消费者接收的消息一旦被消费,则队列中就不再有此消息(相当于“阅后即焚”)。在运行消费者之前,观察队列的情况:

    运行消费者之后,在控制台可以看到消费者接收到的信息:

    并且在管理工具中可以看到队列中的消息已经被消费而不存在:

    ————————————————
    版权声明:本文为CSDN博主「光仔December」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/acmman/article/details/79438196

  • 相关阅读:
    寒假学习进度九
    寒假学习进度八
    RestTemplate-记录
    Axure licensee key 8~9-转
    Mysql数据库引擎介绍--转载
    Mysql外键约束--转载
    IDEA快捷建使用
    MySQL在windows上多次安装失败
    五款优秀的端口扫描工具
    java 图片处理 base64编码和图片二进制编码相互转换-转载
  • 原文地址:https://www.cnblogs.com/laosunlaiye/p/11671389.html
Copyright © 2011-2022 走看看