zoukankan      html  css  js  c++  java
  • rabbitmq 工作模式、简单模式

    1.依赖

            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
            <!-- 消息队列 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.5.1</version>
            </dependency>

    2.消息提供者

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.junit.Test;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitmqTest {
        @Test
        public void sendMsg() throws IOException{
            //1 获取连接
            ConnectionFactory conFactory = 
                    new ConnectionFactory();
            conFactory.setVirtualHost("/jt");
            conFactory.setHost("192.168.174.141");
            conFactory.setPort(5672);
            conFactory.setUsername("jtadmin");
            conFactory.setPassword("jtadmin");
            Connection con = conFactory.newConnection();
            //2 得到channel
            Channel channle = con.createChannel();
    
            //3 创建队列
            String queue="orderQueue4";
            boolean durable=true;//队列会保存到硬盘
            boolean exclusive=false;//别的程序也能访问
            boolean autoDelete=false;//队列中的消息处理完了,不删除
            HashMap<String, Object> arguments=null;//队列的配置数据
            channle.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
    
            //4 写数据
            String msg="order4";
            String exchange="";//默认使用default exchange
            String routingKey=queue;//使用队列名当做key
            //        BasicProperties props=null;//消息是否序列到硬盘上
            BasicProperties.Builder builder = new BasicProperties().builder();
            //值为2,把消息保存到硬盘上
            for(int i=1;i<=10;i++){
                builder.deliveryMode(2);
                BasicProperties props = builder.build();
                channle.basicPublish(exchange, routingKey, props, (msg+" "+i).getBytes());
            }
    
    
            //5关闭连接
            con.close();
            //        while (true) {
            //            
            //            
            //        }
        }
    }

    3.消息使用者

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    import org.junit.Test;
    
    import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class RabbitmqConsumer {
    
        @Test
        public void getMsg() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            //获取连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost("/jt");
            factory.setHost("192.168.174.141");
            factory.setPort(5672);
            factory.setUsername("jtadmin");
            factory.setPassword("jtadmin");
            Connection connection = factory.newConnection();
            //获取通道
            Channel channle = connection.createChannel();
            //创建队列
            String queue="orderQueue4";
            boolean durable=true;//队列保存到硬盘
            boolean exclusive=false;//别的程序也能访问
            boolean autoDelete=false;//队列中的消息处理完了,不删除
            HashMap<String, Object> arguments=null;
            channle.queueDeclare(queue,
                    durable, exclusive, autoDelete, arguments);
    
            //读数据
            QueueingConsumer consumer=new QueueingConsumer(channle);
            //自动确认,消费者收到消息,就自动给消息队列服务器发确认信息
            //消息队列服务器就会删除信息。
            boolean autoAck=false;
            //从orderQueue1队列取消息,消息放到consumser
            channle.basicConsume(queue, autoAck, consumer);
            
            //z启动了消费者
            System.out.println("启动了消费者");
            boolean isRunning=true;
            while(isRunning){
    //            Thread.currentThread().sleep(1500);
                
                Delivery delivery = consumer.nextDelivery();
                //手动确认
                long tag=delivery.getEnvelope().getDeliveryTag();
                System.out.println(tag);
                
                byte[] body = delivery.getBody();
                String reuslt=new String(body);
                //告诉队列已取到了数据
                channle.basicAck(tag, true);
                System.out.println("result :"+reuslt);
            }
                
    
        }
    }

    4.总结

    在此模式下,消息未确认仍能接收消息,但服务器会显示消息未确认

  • 相关阅读:
    python之处理excel表格
    Django下JWT的使用
    Flask的上下文管理
    模块循环引用
    博客声明及意义
    IPTABLES--iptables
    挂载U盘
    tar压缩解压
    iconv字符编码转换
    VirtualBox设置共享文件夹
  • 原文地址:https://www.cnblogs.com/gxlaqj/p/11630575.html
Copyright © 2011-2022 走看看