zoukankan      html  css  js  c++  java
  • rabbitmq 的hello world程序

    一、引入rabbitmq依赖

    首先在pom中引入rabbitmq的依赖,如下所示:

     <!--引入rabbitmq依赖-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.1.0</version>
    </dependency>
    

    二、创建消息的生产者和消费者

    创建消息的生产者,如下代码所示:

    package com;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 消息生产者
     */
    public class Send {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException,TimeoutException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            //由连接工厂创建连接
            Connection conn = factory.newConnection();
            //通过连接创建通道
            Channel channel = conn.createChannel();
            //声明交换器以及exchange类型为direct
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
            //设置路由键
            String routingKey = "jasonKey";
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //要发送到queue的消息
            String message = "jason";
            //发布消息
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
    
            System.out.println("生产者生产了消息:" + message );
    
            //关闭通道和连接
            channel.close();
            conn.close();
        }
    }
    

    创建消息的消费者,如下代码所示:

    package com;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 消息消费者
     *
     */
    public class Recive {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            //建立到代理服务器到连接
            final Connection conn = factory.newConnection();
            //获得信道
            final Channel channel = conn.createChannel();
            //声明交换器
            String exchangeName = "hello-exchange";
            channel.exchangeDeclare(exchangeName, "direct", true);
            String routingKey = "jasonKey";
            //绑定队列,通过键 hola 将队列和交换器绑定起来
            channel.queueBind(QUEUE_NAME, exchangeName, routingKey);
    
            //消费消息
            channel.basicConsume(QUEUE_NAME, false, "", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // properties 这是消息的一些相关属性,例如内容编码、内容类型等
                    String routingKey = envelope.getRoutingKey();
                    System.out.println("消费的路由键:" + routingKey);
                    long deliveryTag = envelope.getDeliveryTag();
                    //确认消息
                    channel.basicAck(deliveryTag, false);
                    String bodyStr = new String(body, "UTF-8");
                    if("jason".equals(bodyStr)){
                        System.out.println("消费者从队列中获得了消息,并且可以根据这个消息进行一些业务操作:" + bodyStr);
                    }
    
                    try {
                        channel.close();
                        conn.close();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
    
                }
            });
    
        }
    }
    

    三、运行结果

    先执行生产者程序,后执行消费者程序,运行程序结果如下:

    四、总结

    消费者和生产者,关键在于消息的传递以及消费。
    生产者把消息发送到队列,消费者从队列中取出消息并进行消费,这其中还需要一个监听器,来实时监听这个队列,如果里面有了消息,消费者就进行消费。
    生产者和queue之间通过exchange来进行关联,消费者也是一样的。
    这就需要非常清楚 routing key、exchange、binding key、queue 的概念以及它们之间的关系。还需要清楚的知道消息的分发策略,即exchange的四种类型(direct、fanout、topic、headers)以及它们的区别,各自有什么不同。

  • 相关阅读:
    gradient函数
    matlab函数
    二进制中1的个数
    豆瓣电影数据分析
    豆瓣电影Top250数据爬取学习
    numpy_将nan替换为均值
    多次条形图
    Gym
    Educational Codeforces Round 59 (Rated for Div. 2)
    Codeforces Round #535 (Div. 3)
  • 原文地址:https://www.cnblogs.com/jasonboren/p/13300833.html
Copyright © 2011-2022 走看看