zoukankan      html  css  js  c++  java
  • RabbitMQ(二) Java使用RabbitMQ

    2-1 RabbitMQ 生产者消息发送

    创建 Maven 项目 Send

    加入依赖

     <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.1.1</version>
      </dependency>

    send代码贴图:

    SendTest.java

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class SendTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建链接工厂对象
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.171.143");//设置RabbitMQ的主机IP
            factory.setPort(5672);//设置RabbitMQ的端口号
            factory.setUsername("root");//设置访问用户名
            factory.setPassword("root");//设置访问密码
            Connection connection=null;//定义链接对象
            Channel channel=null;//定义通道对象
            connection=factory.newConnection();//实例化链接对象
            channel=connection.createChannel();//实例化通道对象
            String message ="Hello World!3";
            //发送消息到RabbitMQ
            //参数1 我们自定义的交换机名称
            //参数2 自定义的RoutingKey值
            //参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的
            //参数4 具体要发送的消息信息
            //定义消息属性对象 设置deliveryMode(1) 表示消息不需要持久化  deliveryMode(2)需要持久化
            //在性能持久化消息要低于非持久化消息,在安全上持久化消息在服务器宕机之后还可以保留因此安全性相对要高一些
            AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder().deliveryMode(2).build();
            channel.basicPublish("myExchange","myRoutingKey",properties,message.getBytes("UTF-8"));
            System.out.println("消息发送成功: "+message);
            channel.close();
            connection.close();
        }
    }

    2-2 RabbitMQ 消费者接收消息
    创建 Maven 项目 Receive
    ReceiveTest.ja

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建链接工厂对象
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.171.143");
            factory.setPort(5672);
            factory.setUsername("root");
            factory.setPassword("root");
            Connection connection=null;
            Channel channel=null;
            connection=factory.newConnection();
            channel=connection.createChannel();
            //创建消费者对象
            Consumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body,"UTF-8");
                    System.out.println("接收到的消息:"+message);
                }
            };
            //将消费者对象设置到指定通道中,并监听某个队列,如果队列有新的消息则直接调用consumer对象中的handleDelivery方法获取消息
            //参数1 消息队列的名称
            //参数2 接收消息以后是否将消息从队列中移除 建议使用true 当消息处理完成后会自动的清空被处理过的消息,
            // 可以实现解决消息被重复消费的问题
            //参数3 设定由哪个消费监听消息
            channel.basicConsume("myQueue",true,consumer);
        }
    }

    2-3 消息持久化

    2-4 消息确认
    ReceiveTest.java

  • 相关阅读:
    Linux之文件处理命令
    Linux基础命令
    rip实验
    Linux基础之磁盘分区
    mysql安装
    centos Apache、php、mysql默认安装路径
    You probably tried to upload too large file. Please refer to documentation for ways to workaround this limit.
    Wrong permissions on configuration file, should not be world writable!
    机器会学习么 学习总结
    实验 5 Spark SQL 编程初级实践
  • 原文地址:https://www.cnblogs.com/shenlailai/p/10806610.html
Copyright © 2011-2022 走看看