zoukankan      html  css  js  c++  java
  • 二.java下使用RabbitMQ实现hello world

      上一篇文章介绍了windows环境下的安装和配置rabbitMQ,具体戳这边,一.windows环境下rabbit的的安装和配置。

      现在我们可以着手编写hello world程序了,一窥RabbitMQ的效用,从rabbitmq的官网的get start进入rabbitMQ文档学习区,即这个页面https://www.rabbitmq.com/getstarted.html。

      由于网上关于rabbitMQ的中文材料和教程不是很多,所以只好硬着头皮看官网文档了。

      可以看到官网主要从6个步骤来介绍学习轨迹,并且每个步骤均有多种编程语言的版本。由于本人采用的是java语言,所以就从一个java版本的hello world开始rabbitMQ的学习吧。

      一.Introduction(简介)

      1.可以将RabbitMQ理解为一个消息代理,它接收、存储、和分发数据信息。

      2.RabbitMQ主要由三个元素组成,producer(生产者),队列(queue),和消费者(Consumer).

      3.生产者生产消息,队列存储消息,消费者接收消息。他们之间的关系是多对多的,即多个生产者可以向一个队列中存放消息,多个消费者可以从一个队列中获取消息。

      4.值得注意的是,RabbitMQ代理器和生产者、消费者并不需要在同一个服务器上,他们可以是分布式的。

      二.hello world

      现在我们可以进入正题,用RabbitMQ来写一个hello world 的demo,以对RabbitMQ这个中间件有个直观的认识。

      在这个demo中,我们将编写两个类,一个是生产者类,一个是消费者类,生产者类负责发送一个简单的message,而消费者类负责接收这个消息并且打印出来。

      1.首先添加maven依赖包,如下。 

        <!-- rabbitMQ -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>4.0.2</version>
            </dependency>

      2.新建Send类,如下所示。

      

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Send {
        private final static String QUEUE_NAME="hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message="hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message+"'");
            channel.close();
            connection.close();
        }
    
    }

      运行上述代码,报错如下。

      这是因为我照抄官网的代码,官网的demo是基于本地的连接,而我是远程连接,所以必须显式地指定连接端口,用户名,密码之类的信息,修改上述代码,修改后如下。

      

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Send {
        private final static String QUEUE_NAME="hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxxxx");//密码
            Connection connection=factory.newConnection();
            Channel channel=connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message="hello world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message+"'");
            channel.close();
            connection.close();
        }
    }

      然后再运行,这次可以运行成功了。

      然后我们去RabbitMQ的管理后台,就可以看到队列中有一个queue了,名字就叫做hello。如下图所示。

      

      如果我再执行以下刚才那段代码,就会发现messages的数量又多了一个,如下所示。

      

      3.接下来是Recv.java类,用于接收消息,不同意发布消息的类,接收消息的类必须一直保持运行的状态,以便监听消息的到来。

      

    package com.xdx.learn;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Envelope;
    
    public class Recv {
        private final static String QUEUE_NAME="hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //下面的配置与生产者相对应
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.1.195");//服务器ip
            factory.setPort(5672);//端口
            factory.setUsername("xdx");//登录名
            factory.setPassword("xxxx");//密码
            Connection connection=factory.newConnection();//连接
            Channel channel=connection.createChannel();//频道
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);//队列
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //defaultConsumer实现了Consumer,我们将使用它来缓存生产者发送过来储存在队列中的消息。当我们可以接收消息的时候,从中获取。
            Consumer consumer=new DefaultConsumer(channel){
                 @Override
                  public void handleDelivery(String consumerTag, Envelope envelope,
                                             AMQP.BasicProperties properties, byte[] body)
                      throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                  }
            };
            //接收到消息以后,推送给RabbitMQ,确认收到了消息。
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    
    }

    运行结果如下:

      此时我们再去RabbitMQ的控制台查看,发现hello队列中已经没有message了。

       注意到消费者的代码,有一个实现了DefaultConsumer接口的Consumer对象,去查看Consumer的源码,我们可以知道它的handleDelivery方法被一个一直存在的线程(该线程不是Connection所在的线程)调用,当有消息的时候,就会被执行。

    以上就是一个简单的生产者和消费者的例子,其实RabbitMQ在这个过程中充当了一个消息存储器的角色,它负责接收,分配消息,而发送,接收消息的工作由我们编程来实现。经过这个例子,我们对RabbitMQ有了一个直观的简单的理解。更多的细节将在下面的文章中来学习。

  • 相关阅读:
    【LeetCode】306. Additive Number
    【LeetCode】49. Group Anagrams
    【LeetCode】233. Number of Digit One
    【LeetCode】73. Set Matrix Zeroes
    【LeetCode】284. Peeking Iterator
    【LeetCode】241. Different Ways to Add Parentheses
    【LeetCode】289. Game of Life
    新版Java为什么要修改substring的实现
    计算机中整数加法满足结合律吗
    双色球中奖概率分析
  • 原文地址:https://www.cnblogs.com/roy-blog/p/8023791.html
Copyright © 2011-2022 走看看