zoukankan      html  css  js  c++  java
  • RabbitMQ Java实例

    引入RabbitMQ的jar包

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.1.2</version>
    </dependency>

    创建消息生产者

     1 import com.rabbitmq.client.Channel;
     2 import com.rabbitmq.client.Connection;
     3 import com.rabbitmq.client.ConnectionFactory;
     4 
     5 import java.io.IOException;
     6 import java.util.concurrent.TimeoutException;
     7 
     8 public class Producer {
     9     private static final String QUEUE_NAME ="queue.test";
    10 
    11     public static void main(String[] args) throws IOException, TimeoutException {
    12         //创建连接工厂
    13         ConnectionFactory connectionFactory = new ConnectionFactory();
    14         connectionFactory.setHost("localhost");
    15         //创建一个连接
    16         Connection connection = connectionFactory.newConnection();
    17         //创建一个通道
    18         Channel channel = connection.createChannel();
    19         //声明队列
    20         //queueDeclare第一个参数表示队列名称、
    21         // 第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、
    22         // 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、
    23         // 第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
    24         channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    25         String msg = "hello rabbit";
    26         //发送消息到队列
    27         //basicPublish第一个参数为交换机名称、
    28         // 第二个参数为队列映射的路由key、
    29         // 第三个参数为消息的其他属性、
    30         // 第四个参数为发送信息的主体
    31         channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
    32         System.out.println("Producer Send +'" + msg + "'");
    33         channel.close();
    34         connection.close();
    35     }
    36 }

    创建消费者

    package com.ysl.rabbit;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Customer {
    
        private static final String QUEUE_NAME ="queue.test";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            //创建一个连接
            Connection connection = connectionFactory.newConnection();
            //创建一个通道
            Channel channel = connection.createChannel();
            //声明队列
            //queueDeclare第一个参数表示队列名称、
            // 第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、
            // 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、
            // 第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            System.out.println("Customer Waiting Received messages");
            Consumer consumer = new DefaultConsumer(channel){
                /**
                 * envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Customer Received '" + message + "'");
                }
            };
            //自动回复队列应答 -- RabbitMQ中的消息确认机制
            channel.basicConsume(QUEUE_NAME,true, consumer);
        }
    }
  • 相关阅读:
    linux 虚拟机web服务接入互联网
    golang操作数据库
    开启提示:press esc in 5 seconds to skip 如何操作
    如何将qcow2转为vhd
    统信UOS如何分卷压缩
    统信UOS欧拉版本如何制作启动盘
    UOS输错密码导致长时间锁定怎么办?
    在UOS中使用WPS编辑文件,忘记保存关闭了文件,怎么找回?
    uos server版一开始没有安装桌面,后面客户需要加装DDE桌面如何实现
    uos系统升级时,我不想升级相关软件包,应该如何去做
  • 原文地址:https://www.cnblogs.com/senlinyang/p/8433196.html
Copyright © 2011-2022 走看看