zoukankan      html  css  js  c++  java
  • RabbitMQ安装以及java使用(二)

    上一篇记录了rabbitmq的安装,这一篇记录一下rabbitmq的java客户端的简单使用,当然在项目中我们有更为复杂的应用场景,这里只有最简单的点对点生产者与消费者模式。

    1、建立工程

    首先建立一个简单的maven工程,我这边使用了平时使用的demo工程

    pom.xml配置,本次案例中只需要两个包即可,是用commons包的序列化,amqp则是rabbitmq的java包。


    2、新建点对点抽象类

    因为这个例子只讲述非常简单的点对点生产者与消费者关系,在某种程度上两者有很多共性,所以这里干脆抽象成一个类了。具体代码如下:

    package ucs_test.rabbitmq;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /** 
    * @author  作者 ucs_fuqing
    * @date 创建时间:2017年8月11日 下午2:21:27 
    * @version 1.0 
    * @parameter  
    * @since  
    * @return  
    */
    public abstract class PointToPoint {
        protected Channel channel;
        protected Connection connection;
        protected String pointName;
        
        /**
         * 获取一个队列的连接
         * @param pointName
         * @throws IOException
         */
        public PointToPoint(String pointName) throws IOException{
            this.pointName = pointName;
            
            //创建连接工厂
            ConnectionFactory cf = new ConnectionFactory();
            
            //设置rabbitmq服务器地址
            cf.setHost("192.168.149.133");
            
            //设置rabbitmq服务器用户名
            cf.setUsername("hxb");
            
            //设置rabbitmq服务器密码
            cf.setPassword("hxb");
            
            //获取一个新的连接
            connection = cf.newConnection();
            
            //创建一个通道
            channel = connection.createChannel();
            
            //申明一个队列,如果这个队列不存在,将会被创建
            channel.queueDeclare(pointName, false, false, false, null);
        }
        
        
        /**
         * 
        * @Title: close 
        * @Description: 其实在程序完成时一般会自动关闭连接,但是这里提供手动操作的入口, 
        * @param @throws IOException    设定文件 
        * @return void    返回类型 
        * @throws
         */
        public void close() throws IOException{
            this.channel.close();
            this.connection.close();
        }
    }

    在上面代码中,实现的是创建一个队列或者关闭它,在默认的情况下channel和connection会自动关闭,但是我觉得还是提供手动关闭的入口更好一些。


    3、生产者

    这个例子中的生产者其实非常简单,我们创建了一个连接,并且获取了通道,接下来就可以直接往我们指定的队列(queue)中发送消息了,如果这个队列不存在,则会被程序自动创建。

    package ucs_test.rabbitmq;
    
    import java.io.IOException;
    import java.io.Serializable;
    
    import org.apache.commons.lang.SerializationUtils;
    
    import com.mchange.io.SerializableUtils;
    
    /** 
    * @author  作者 ucs_fuqing
    * @date 创建时间:2017年8月11日 下午2:33:13 
    * @version 1.0 
    * @parameter  
    * @since  
    * @return  
    */
    public class Producer extends PointToPoint{
    
        public Producer(String pointName) throws IOException {
            super(pointName);
            // TODO Auto-generated constructor stub
        }
        
        /**
         * 
        * @Title: sendMessage 
        * @Description: 生产消息
        * @param @param Object
        * @param @throws IOException    设定文件 
        * @return void    返回类型 
        * @throws
         */
        public void sendMessage(Serializable Object) throws IOException{
            channel.basicPublish("", pointName, null, SerializationUtils.serialize(Object));
        }
    }

    上面代码看到,我们只是简单的向pointName的队列发送了一个对象。


    4、消费者

    我们这里的消费者也非常简单,仅仅只是拿到并打印出消息即可

    package ucs_test.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.SerializationUtils;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.ShutdownSignalException;
    
    /** 
    * @author  作者 ucs_fuqing
    * @date 创建时间:2017年8月11日 下午2:39:51 
    * @version 1.0 
    * @parameter  
    * @since  
    * @return  
    */
    public class QueueConsumer extends PointToPoint implements Runnable,Consumer{
    
        public QueueConsumer(String pointName) throws IOException {
            super(pointName);
            // TODO Auto-generated constructor stub
        }
        
        public void run(){
            try {
                channel.basicConsume(pointName,true,this);
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        @Override
        public void handleConsumeOk(String consumerTag) {
            // TODO Auto-generated method stub
            System.out.println("Consumer "+consumerTag +" registered");
            
        }
    
        @Override
        public void handleCancelOk(String consumerTag) {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public void handleCancel(String consumerTag) throws IOException {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException {
            // TODO Auto-generated method stub
            Map map = (HashMap)SerializationUtils.deserialize(body);
            System.out.println("Message Number "+ map.get("tagId") + " received.");
            //channel.basicAck(env.getDeliveryTag(), false);
        }
    
        @Override
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public void handleRecoverOk(String consumerTag) {
            // TODO Auto-generated method stub
            
        }
    
    }

    以上代码中,我们指定了消费的队列,并从中拿到消息,打印出来。


    5、测试类

    至此我们的生产者与消费者都写完了,接着写个测试类来验证一下

    package ucs_test.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    /** 
    * @author  作者 ucs_fuqing
    * @date 创建时间:2017年8月11日 下午2:44:59 
    * @version 1.0 
    * @parameter  
    * @since  
    * @return  
    */
    public class MainTest {
        public MainTest() throws IOException{
            QueueConsumer consumer = new QueueConsumer("testqueue");
            Thread cuThread = new Thread(consumer);
            
            cuThread.start();
            
            Producer producer = new Producer("testqueue");
            int i = 0;
            while (i<10000) {
                HashMap<String, Object> hm = new HashMap<>();
                hm.put("tagId", i);
                producer.sendMessage(hm);
                System.out.println("发送第"+i+"消息");
                i++;
            }
        }
        
        public static void main(String[] args) throws IOException {
            new MainTest();
        }
    }

    在这里我们的生产者生产10000条消息,消费者拿到并打印出来。看看运行结果:

    可以看到虽然有点乱序,但是10000条消息全部被消费完毕。


    6、消息应答

    在上面的例子中,我们的生产者只管发送消息,消费者只管消费消息,而RabbitMQ在上面的例子中,将消息交付给消费者之后,会从内存中移除掉这个消息。在正式的项目中,消费消息可能需要那么几秒钟,那么问题来了:如果我们拿到消息后需要进行更为复杂的业务处理,而这个业务处理失败或者中断了,那么意味着这条消息代表的工作并未完成,但是消息已经不存在了,我们会丢失掉正在处理的信息,也会丢失掉发给消费者但是并未被消费的消息。
    现在我们使用两个消费者来接受同一个队列的消息,测试类如下:

    package ucs_test.rabbitmq;
    
    import java.io.IOException;
    import java.util.HashMap;
    
    /** 
    * @author  作者 ucs_fuqing
    * @date 创建时间:2017年8月11日 下午2:44:59 
    * @version 1.0 
    * @parameter  
    * @since  
    * @return  
    */
    public class MainTest {
        public MainTest() throws IOException{
            QueueConsumer consumer = new QueueConsumer("testqueue");
            Thread cuThread = new Thread(consumer);
            QueueConsumer consumer2 = new QueueConsumer("testqueue");
            Thread cuThread2 = new Thread(consumer2);
            cuThread.start();
            cuThread2.start();
            Producer producer = new Producer("testqueue");
            int i = 0;
            while (i<10000) {
                HashMap<String, Object> hm = new HashMap<>();
                hm.put("tagId", i);
                producer.sendMessage(hm);
                //System.out.println("发送第"+i+"消息");
                i++;
            }
        }
        
        public static void main(String[] args) throws IOException {
            new MainTest();
        }
    }

    在这种情况下,MQ将会均匀的将消息发送给两个消费者消费,但是如果consumer2半路终止或者异常,那么将会导致我们的测试结果显示接受到的消息少于10000条,消失的消息被异常的消费者吃掉了,而我们没有任何办法。。。

    为了保证消息不会丢失,或者说肯定被消费,RabbitMQ支持消息应答模式,简单的只需要修改两个位置:

    消费者类QueueConsumer中

    设置basicConsume方法参数为false,打开消息应答

    消费完成之后,向mq返回应答消息。

    这样,当消费者异常时,MQ没有收到消费者消息应答,将会把消息发送给其他消费者,保证这条消息被消费掉。

    OK,简单的RabbitMQ服务器Java端例子就这样了。下一篇会在此基础上增加一些高级的应用。

  • 相关阅读:
    面向对象、构造函数的区别
    写一个function,清除字符串前后的空格。(兼容所有浏览器)
    两个DIV高度自适应方法(左右两个DIV高度一样)
    js数组去重
    input框处理删除小图标的功能
    查找显示高亮
    JSON.parse()和JSON.stringify()
    jquery封装
    怎么理解HTML语义化
    html5语义化标签
  • 原文地址:https://www.cnblogs.com/mirakel/p/7346595.html
Copyright © 2011-2022 走看看