zoukankan      html  css  js  c++  java
  • RabbitMQ的使用(五)RabbitMQ Java Client简单生产者、消费者代码示例

     

    pom文件:

    1. <dependencies>
    2. <dependency>
    3. <groupId>com.rabbitmq</groupId>
    4. <artifactId>amqp-client</artifactId>
    5. <version>5.0.0</version>
    6. </dependency>
    7.  
    8. <dependency>
    9. <groupId>org.springframework.amqp</groupId>
    10. <artifactId>spring-rabbit</artifactId>
    11. <version>2.0.2.RELEASE</version>
    12. </dependency>
    13.  
    14. <dependency>
    15. <groupId>org.springframework.boot</groupId>
    16. <artifactId>spring-boot-starter-web</artifactId>
    17. </dependency>
    18.  
    19. <dependency>
    20. <groupId>org.springframework.boot</groupId>
    21. <artifactId>spring-boot-starter-amqp</artifactId>
    22. </dependency>
    23. </dependencies>

    连接工具类:

    1. package top.wj.rabbitmq.client.utils;
    2.  
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6.  
    7. import java.util.HashMap;
    8. import java.util.Map;
    9.  
    10. public class ChannelUtils {
    11. public static Channel getChannelInstance(String connectionDescription) {
    12. try {
    13. ConnectionFactory connectionFactory = getConnectionFactory();
    14. Connection connection = connectionFactory.newConnection(connectionDescription);
    15. return connection.createChannel();
    16. } catch (Exception e) {
    17. throw new RuntimeException("获取Channel连接失败");
    18. }
    19. }
    20.  
    21. private static ConnectionFactory getConnectionFactory() {
    22. ConnectionFactory connectionFactory = new ConnectionFactory();
    23.  
    24. // 配置连接信息
    25. connectionFactory.setHost("127.0.0.1");
    26. connectionFactory.setPort(5672);
    27. connectionFactory.setVirtualHost("/");
    28. connectionFactory.setUsername("guest");
    29. connectionFactory.setPassword("guest");
    30.  
    31. // 网络异常自动连接恢复
    32. connectionFactory.setAutomaticRecoveryEnabled(true);
    33. // 每10秒尝试重试连接一次
    34. connectionFactory.setNetworkRecoveryInterval(10000);
    35.  
    36. return connectionFactory;
    37. }
    38. }

    创建生产者:

    1. package top.wj.rabbitmq.client.producer;
    2.  
    3. import com.rabbitmq.client.AMQP;
    4. import com.rabbitmq.client.BuiltinExchangeType;
    5. import com.rabbitmq.client.Channel;
    6. import top.wj.rabbitmq.client.utils.ChannelUtils;
    7.  
    8. import java.io.IOException;
    9. import java.util.HashMap;
    10. import java.util.concurrent.TimeoutException;
    11.  
    12. public class MessageProducer {
    13. public static void main(String[] args) throws IOException, TimeoutException {
    14. Channel channel = ChannelUtils.getChannelInstance("队列消息生产者");
    15.  
    16. // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
    17. channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
    18.  
    19. // 设置消息属性 发布消息 (交换机名, Routing key, 可靠消息相关属性 后续会介绍, 消息属性, 消息体);
    20. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
    21. channel.basicPublish("rabbitmq.wj", "add", false, basicProperties, "body中的消息内容!".getBytes());
    22. }
    23. }

    创建消费者:

    1. package top.wj.rabbitmq.client.consumer;
    2.  
    3. import com.rabbitmq.client.*;
    4. import top.wj.rabbitmq.client.utils.ChannelUtils;
    5.  
    6. import java.io.IOException;
    7. import java.util.HashMap;
    8. import java.util.concurrent.TimeoutException;
    9.  
    10. public class MessageConsumer {
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. Channel channel = ChannelUtils.getChannelInstance("队列消息消费者");
    13.  
    14. // 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性);
    15. AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("rabbitmq.wj.add", true, false, false, new HashMap<>());
    16.  
    17. // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
    18. channel.exchangeDeclare("rabbitmq.wj", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
    19.  
    20. // 将队列Binding到交换机上 (队列名, 交换机名, Routing key, 绑定属性);
    21. channel.queueBind(declareOk.getQueue(), "rabbitmq.wj", "add", new HashMap<>());
    22.  
    23. // 消费者订阅消息 监听如上声明的队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
    24. DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    25. @Override
    26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    27. System.out.println(consumerTag);
    28. System.out.println(envelope.toString());
    29. System.out.println(properties.toString());
    30. System.out.println("消息内容:" + new String(body));
    31. }
    32. };
    33. channel.basicConsume(declareOk.getQueue(), true, "消费者标签",defaultConsumer );
    34. }
    35. }

    控制台打印信息:

    1.  
    2. 消费者标签
    3. Envelope(deliveryTag=1, redeliver=false, exchange=rabbitmq.wj, routingKey=add)
    4. #contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
    5. 消息内容:body中的消息内容!

    rabbitmq管理界面显示:

  • 相关阅读:
    hdfs java.io.IOException: Mkdirs failed to create
    Linux文件权限授予
    插入排序
    Oracle中怎样设置表中的主键id递增
    单链表中是否有环之java实现
    excel 单元格的锁定 以及 JXL的实现方式
    用POI的HSSF来控制EXCEL的研究
    Oracle Job 语法和时间间隔的设定(转)
    您的JAVA代码安全吗?(转)
    Jdom 解析 XML【转】
  • 原文地址:https://www.cnblogs.com/lykbk/p/sdfsdfsdfsdf454545454545.html
Copyright © 2011-2022 走看看