zoukankan      html  css  js  c++  java
  • RabbitMQ知识点整理1-生产和消费消息

    演示如何使用RabbitMQ Java 客户端生产和消费消息

    目前最新的RabbitMQ Java 客户端版本为5.9.0, 相应的maven 构建文件如下:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>x.x.x</version
    </dependency>

    版本号根据项目的实际情况进行选择.

    通过 http://serverip:15672 访问 RabbitMQ 的 Web 管理界面,默认用户名密码都是 guest。

    默认情况下,访问RabbitMQ 服务的用户名和密码都是"guest " , 这个账户有限制,默认只能通过本地网络(如localhost) 访问,远程网络访问受限,所以在实现生产和消费消息之前,
    需要另外添加一个用户,并设置相应的访问权限。添加新的用户可以登录到管理界面之后进行手动添加, 没有难度

    计算机的世界是从" Hello World!" 开始的,这里我们也沿用惯例, 首先生产者发送一条消息、"Hello World ! "至RabbitMQ 中, 之后由消费者消费。下面先演示生产者客户端的代码,接着再演示消费者客户端的代码。

    生产者客户端代码:

    package demo.java.web.amqp.rabbitmq.demo1;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    /**
     * 生产者客户端代码
     * 
     * @author jiangkd
     * @date 2020/10/11
     */
    public class RabbitProducer {
    
        final private static String EXCHANGE_NAME = "exchange_demo";
        final private static String ROUTING_KEY = "routingkey_demo";
        final private static String QUEUE_NAME = "queue_demo";
        final private static String IP_ADDRESS = "172.16.176.40";
        /**
         * rabbitmq服务端默认端口号为5672
         */
        final private static int PORT = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 和服务器建立连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(IP_ADDRESS);
            factory.setPort(PORT);
            factory.setUsername("root");
            factory.setPassword("root123");
    
            // 创建连接
            Connection connection = factory.newConnection();
    
            // 创建信道
            Channel channel = connection.createChannel();
    
            // 创建一个type为direct、持久化的、非自动删除的交换机
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 创建一个持久化的、非排它的、非自动删除的队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 将交换机与队列通过路由绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    
            // 发送一条持久化的消息
            String message = "Hello World!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    
            // 关闭资源
            channel.close();
            connection.close();
        }
    }
    View Code

    上面的生产者客户端的代码首先和RabbitMQ 服务器建立一个连接Connection, 然后在这个连接之上创建一个信道Channel。之后创建一个交换器Exchange 和一个队列Queue ,
    并通过路由键进行绑定,然后发送一条消息, 最后关闭资源。

    消费者客户端代码:

    package demo.java.web.amqp.rabbitmq.demo1;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Address;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    /**
     * 消费者客户端代码
     * 
     * @author jiangkd
     * @date 2020/10/11
     */
    public class RabbitComsumer {
    
        final private static String QUEUE_NAME = "queue_demo";
        final private static String IP_ADDRESS = "172.16.176.40";
        /**
         * rabbitmq服务端默认端口号为5672
         */
        final private static int PORT = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root123");
    
            Address[] addresses = {new Address(IP_ADDRESS, PORT)};
    
            // 注意, 这里获取连接的方式和生产者客户端不同
            Connection connection = connectionFactory.newConnection(addresses);
    
            // 创建信道
            Channel channel = connection.createChannel();
            channel.basicQos(64); // 设置消费者客户端最多接收未被ack的消息的个数
    
            // 定义consumer
            Consumer consumer = new DefaultConsumer(channel) {
    
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
    
                    System.out.println("接收到的消息: " + new String(body));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);
            // 等待回调函数执行完毕之后, 关闭资源
            TimeUnit.SECONDS.sleep(5);
            channel.close();
            connection.close();
        }
    }
    View Code

    注意这里采用的是继承DefaultConsumer 的方式来实现消费....不推荐使用QueueingConsumer

  • 相关阅读:
    Docker数据卷
    Hyperloglog算法
    Greenplum6.9集群安装文档
    Java实现线程间通信方式
    计算机存储管理方式
    greenplum6.9踩坑总结
    Linux 内核参数Overcommit_memory(最近生产中Airflow和Greenplum有被这个参数坑到......)
    Airflow概念
    airflow安装文档
    基于Docker进行Zookeeper集群的安装
  • 原文地址:https://www.cnblogs.com/no-celery/p/13797631.html
Copyright © 2011-2022 走看看