zoukankan      html  css  js  c++  java
  • RabbitMQ(二) Java使用RabbitMQ

    2-1 RabbitMQ 生产者消息发送

    创建 Maven 项目 Send

    加入依赖

     <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.1.1</version>
      </dependency>

    send代码贴图:

    SendTest.java

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class SendTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            //创建链接工厂对象
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.171.143");//设置RabbitMQ的主机IP
            factory.setPort(5672);//设置RabbitMQ的端口号
            factory.setUsername("root");//设置访问用户名
            factory.setPassword("root");//设置访问密码
            Connection connection=null;//定义链接对象
            Channel channel=null;//定义通道对象
            connection=factory.newConnection();//实例化链接对象
            channel=connection.createChannel();//实例化通道对象
            String message ="Hello World!3";
            //发送消息到RabbitMQ
            //参数1 我们自定义的交换机名称
            //参数2 自定义的RoutingKey值
            //参数3 设置消息的属性,可以通过消息属性设置消息是否是持久化的
            //参数4 具体要发送的消息信息
            //定义消息属性对象 设置deliveryMode(1) 表示消息不需要持久化  deliveryMode(2)需要持久化
            //在性能持久化消息要低于非持久化消息,在安全上持久化消息在服务器宕机之后还可以保留因此安全性相对要高一些
            AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder().deliveryMode(2).build();
            channel.basicPublish("myExchange","myRoutingKey",properties,message.getBytes("UTF-8"));
            System.out.println("消息发送成功: "+message);
            channel.close();
            connection.close();
        }
    }

    2-2 RabbitMQ 消费者接收消息
    创建 Maven 项目 Receive
    ReceiveTest.ja

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ReceiveTest {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建链接工厂对象
            ConnectionFactory factory=new ConnectionFactory();
            factory.setHost("192.168.171.143");
            factory.setPort(5672);
            factory.setUsername("root");
            factory.setPassword("root");
            Connection connection=null;
            Channel channel=null;
            connection=factory.newConnection();
            channel=connection.createChannel();
            //创建消费者对象
            Consumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message=new String(body,"UTF-8");
                    System.out.println("接收到的消息:"+message);
                }
            };
            //将消费者对象设置到指定通道中,并监听某个队列,如果队列有新的消息则直接调用consumer对象中的handleDelivery方法获取消息
            //参数1 消息队列的名称
            //参数2 接收消息以后是否将消息从队列中移除 建议使用true 当消息处理完成后会自动的清空被处理过的消息,
            // 可以实现解决消息被重复消费的问题
            //参数3 设定由哪个消费监听消息
            channel.basicConsume("myQueue",true,consumer);
        }
    }

    2-3 消息持久化

    2-4 消息确认
    ReceiveTest.java

  • 相关阅读:
    Win10升级后无法删除Windows.old文件夹
    修改Window服务器虚拟内存位置
    快速修改Windows系统密码命令
    本机无法连通虚拟机但是虚拟机之间可以连通问题记录
    Windows删除文件夹下的指定格式文件(递归删除)
    Xshell连接SqlPlus无法使用退格、删除键
    Spring SpringMVC SpringBoot SpringCloud概念、关系及区别
    关于接口设计的一些思考
    SpringCloud 在Feign上使用Hystrix(断路由)
    Docker-Compose入门
  • 原文地址:https://www.cnblogs.com/shenlailai/p/10806610.html
Copyright © 2011-2022 走看看