zoukankan      html  css  js  c++  java
  • RabbitMq 使用

    记录rabbitmq的简单使用。其中记录了消息的确认与拒绝。

    1.产生消息:

    public void produce() {
        String queueName = "test";
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
    
        Connection con = null;
        Channel channel = null;
        try { 
            con = factory.newConnection();
            channel = con.createChannel();
    
            channel.queueDeclare(queueName, false, false, false, null);
    
            String msg = "hello rabbit2!";
            channel.basicPublish("", queueName, null, msg.getBytes());
            System.out.println("send msg : " + msg);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(channel)
        }
    }    
    

    2.消费消息:

    public void consume() {
        String queueName = "test";
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
    
        Connection con = null;
        Channel channel = null;
    
        try {
            con = factory.newConnection();
            channel = con.createChannel();
    
            channel.queueDeclare(queueName, false, false, false, null);
    
            QueueingConsumer cons = new QueueingConsumer(channel);
            //RabbitMQ客户端接受消息最大数量;如果没有设置会一次消费所有消息
            channel.basicQos(0,1,false); 
            // 向channel中写入属性需要在一下这句程序之前才能起效
            // 当要设置一次消费的最大数量和消息回置时,需要将第二个参数设置为false--代表是否自动ack
            channel.basicConsume(queueName, false, cons);
    
            while (true) {
                Delivery deliver = cons.nextDelivery();
                String msg = new String(deliver.getBody());
    //                if (msg.equals("hello")) {
    //                    //拒绝消息(如果最后一个设置为true,此消息还会存在)
    //                    channel.basicNack(deliver.getEnvelope().getDeliveryTag() , false, true);
    //                }
    //                //确认消息,已经收到  
    //                else {
    //                    channel.basicAck(deliver.getEnvelope().getDeliveryTag()  
    //                            , false);  
    //                }
                // 当autoAck为false时,就需要手动ack
                channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);  
                System.out.println("recive: " + msg);
            }
    
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ShutdownSignalException e) {
            e.printStackTrace();
        } catch (ConsumerCancelledException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            close(channel);
        }
    }
    

    3.产生于消费公共方法,关闭链接:

    private void close(Channel channel) {
        Connection connection = null;
        try {
            connection = channel.getConnection();
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    

      

  • 相关阅读:
    Lambda表达式、解决端口占用问题
    springboot初始化报错: Failed to instantiate [XXX]: Specified class is an interface
    Spring声明式事务配置
    Springboot集成jsp
    点击redisserver.exe闪退
    Spring学习笔记
    Mybatis中 <![CDATA[ ]]> 的使用
    Mybatis学习笔记
    context:annotationconfig与context:componentscan的作用
    Spring学习笔记
  • 原文地址:https://www.cnblogs.com/dengding/p/5900111.html
Copyright © 2011-2022 走看看