zoukankan      html  css  js  c++  java
  • rabbitmq 工作模式、简单模式

    1.依赖

            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
    
            <!-- 消息队列 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.5.1</version>
            </dependency>

    2.消息提供者

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.junit.Test;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class RabbitmqTest {
        @Test
        public void sendMsg() throws IOException{
            //1 获取连接
            ConnectionFactory conFactory = 
                    new ConnectionFactory();
            conFactory.setVirtualHost("/jt");
            conFactory.setHost("192.168.174.141");
            conFactory.setPort(5672);
            conFactory.setUsername("jtadmin");
            conFactory.setPassword("jtadmin");
            Connection con = conFactory.newConnection();
            //2 得到channel
            Channel channle = con.createChannel();
    
            //3 创建队列
            String queue="orderQueue4";
            boolean durable=true;//队列会保存到硬盘
            boolean exclusive=false;//别的程序也能访问
            boolean autoDelete=false;//队列中的消息处理完了,不删除
            HashMap<String, Object> arguments=null;//队列的配置数据
            channle.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
    
            //4 写数据
            String msg="order4";
            String exchange="";//默认使用default exchange
            String routingKey=queue;//使用队列名当做key
            //        BasicProperties props=null;//消息是否序列到硬盘上
            BasicProperties.Builder builder = new BasicProperties().builder();
            //值为2,把消息保存到硬盘上
            for(int i=1;i<=10;i++){
                builder.deliveryMode(2);
                BasicProperties props = builder.build();
                channle.basicPublish(exchange, routingKey, props, (msg+" "+i).getBytes());
            }
    
    
            //5关闭连接
            con.close();
            //        while (true) {
            //            
            //            
            //        }
        }
    }

    3.消息使用者

    package com.jt.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    import org.junit.Test;
    
    import com.rabbitmq.client.AMQP.Queue.DeclareOk;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.client.ShutdownSignalException;
    
    public class RabbitmqConsumer {
    
        @Test
        public void getMsg() throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException{
            //获取连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setVirtualHost("/jt");
            factory.setHost("192.168.174.141");
            factory.setPort(5672);
            factory.setUsername("jtadmin");
            factory.setPassword("jtadmin");
            Connection connection = factory.newConnection();
            //获取通道
            Channel channle = connection.createChannel();
            //创建队列
            String queue="orderQueue4";
            boolean durable=true;//队列保存到硬盘
            boolean exclusive=false;//别的程序也能访问
            boolean autoDelete=false;//队列中的消息处理完了,不删除
            HashMap<String, Object> arguments=null;
            channle.queueDeclare(queue,
                    durable, exclusive, autoDelete, arguments);
    
            //读数据
            QueueingConsumer consumer=new QueueingConsumer(channle);
            //自动确认,消费者收到消息,就自动给消息队列服务器发确认信息
            //消息队列服务器就会删除信息。
            boolean autoAck=false;
            //从orderQueue1队列取消息,消息放到consumser
            channle.basicConsume(queue, autoAck, consumer);
            
            //z启动了消费者
            System.out.println("启动了消费者");
            boolean isRunning=true;
            while(isRunning){
    //            Thread.currentThread().sleep(1500);
                
                Delivery delivery = consumer.nextDelivery();
                //手动确认
                long tag=delivery.getEnvelope().getDeliveryTag();
                System.out.println(tag);
                
                byte[] body = delivery.getBody();
                String reuslt=new String(body);
                //告诉队列已取到了数据
                channle.basicAck(tag, true);
                System.out.println("result :"+reuslt);
            }
                
    
        }
    }

    4.总结

    在此模式下,消息未确认仍能接收消息,但服务器会显示消息未确认

  • 相关阅读:
    加入创业公司有什么利弊
    Find Minimum in Rotated Sorted Array II
    Search in Rotated Sorted Array II
    Search in Rotated Sorted Array
    Find Minimum in Rotated Sorted Array
    Remove Duplicates from Sorted Array
    Spiral Matrix
    Spiral Matrix II
    Symmetric Tree
    Rotate Image
  • 原文地址:https://www.cnblogs.com/gxlaqj/p/11630575.html
Copyright © 2011-2022 走看看