zoukankan      html  css  js  c++  java
  • 初识ActiveMQ

      博主之前的一个高并发需求:Java并发(三):实例引出并发应用场景中所提到的,后来经过初步测试发现多线程并不能完全满足需求,特别是性能上的需求,或者说多线程不是比较好的解决方案,真实需求是:将商品库存(第三方数据库上)"及时"通知第三方的网购平台,达到同步商品余量信息的目的,本地是存儲了相应的阈值,在第三方数据库上的库存一旦少于库存,我们就认为这件商品已经售罄,因为要防止线上线下同一时间段销售引起的库存紧张,甚至订单已经发出但库存实际不足的情况...之前多线程定时访问库存并同步数据显然非常低效,主管老哥推荐我使用消息队列来解决问题,顿时一脸懵,消息队列是啥??

    消息队列的基本概念:

      消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。 ——维基百科

    博主使用的消息队列中间件是ActiveMQ,为什么用它呢?

    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    4. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    5. 支持通过JDBC和journal提供高速的消息持久化
    6. 从设计上保证了高性能的集群,客户端-服务器,点对点等等

    以上的总结比较官方,概括来说,ActiveMQ的优势在于它是Java语言开发,在基于Spring的项目上容易内嵌,很大程度的减少耦合,提供可靠的任务异步处理.

    ActiveMQ的通信模式:

    1.点对点(queue)

    • 一个消息只能被一个服务接收
    • 消息一旦被消费,就会消失
    • 如果没有被消费,就会一直等待,直到被消费
    • 多个服务监听同一个消费空间,先到先得

    2.发布/订阅模式(topic)

    • 一个消息可以被多个服务接收
    • 订阅一个主题的消费者,只能消费自它订阅之后发布的消息
    • 消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)

    如何实现?

    业务需求是用发布-订阅模式完成,我负责消费者部分的代码,一开始是这样实现的,五步走:

    1. 通过连接工厂获取连接对象
    2. 启动连接
    3. 创建session
    4. 创建队列或topic
    5. 注册消息监听
    public class RoundRobinConfig1 {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Resource
        private InventoryService inventoryService;
    
        @Scheduled(cron = "0 53 * * * ?")//每2分钟调度一次任务
        public void operation(){
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer consumer; //创建消费者
    
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                /**
                 * 这里的最好使用Boolean.FALSE,如果是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化情况。
                 */
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    //            destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("firstTopic");
                consumer=session.createConsumer(destination);
                consumer.setMessageListener(new MyListener()); // 注册消息监听
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    具体业务逻辑写在listener里,大家使用时别忘了引入maven依赖

    <!-- ActiveMQ -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.9.1</version>
            </dependency>

    然后就进行初步测试,喜闻乐见地遇到问题了:

    博主经过一通资料的查阅,依旧没有搞懂问题所在,最后问同事要来了生产者的代码,发现了问题可能出在这里:

    StompJmsConnectionFactory factory = new StompJmsConnectionFactory();

    生产者用了这个连接工厂获取连接,随即百度了一下Stomp,了解到这其实是一种消息格式协议,另外还有AMQP,OPENWIRE,MQTT等,几种消息协议的概述可以戳我,我便换成了StompJmsConnection对象来获取连接,结果成功获取到消息体:

    由于需要让订阅消息队列的程序一直运行,我采取官方推荐的死循环方式处理,并且使其在模块启动时运行,后来考虑了一下,万一死循环出现异常,那整个模块不就宕了吗,于是我给模块创建了一个子进程用来轮询消息队列,这样子进程就算挂了,整个模块也不受影响了:

    import com.google.gson.Gson;
    import com.ycyz.framework.task.domain.Inventorycache;
    import com.ycyz.framework.task.service.InventorycacheService;
    import org.fusesource.hawtbuf.Buffer;
    import org.fusesource.stomp.jms.StompJmsConnectionFactory;
    import org.fusesource.stomp.jms.StompJmsDestination;
    import org.fusesource.stomp.jms.message.StompJmsBytesMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.core.annotation.Order;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * @author YHW
     * @ClassName: AutoRunner
     * @Description:
     * @date 2019/1/7 8:27
     */
    
    @Order(value = 1)
    @Component
    public class AutoRunner implements ApplicationRunner {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        Map map = new HashMap(16);
        String result = null;
        String user = env("ACTIVEMQ_USER", "");
        String password = env("ACTIVEMQ_PASSWORD", "");
        String host = env("ACTIVEMQ_HOST", "域名");
        String destination = "/topic/bn.stock.prod";
        int port = Integer.parseInt(env("ACTIVEMQ_PORT", "端口号"));
        Destination dest = new StompJmsDestination(destination);
    
        @Resource
        private InventorycacheService inventorycacheService;
    
        Gson gson = new Gson();
    
        StompJmsConnectionFactory factory = new StompJmsConnectionFactory();
    
        @Override
        public void run(ApplicationArguments args) throws Exception{
            logger.info("开始运行了...");
            Thread thread = new Thread(){
                @Override
                public void run() {
                    MessageConsumer consumer = null;
                    Connection connection = null;
                    long start = 0L;
                    long count = 0L;
                    try {
                        factory.setBrokerURI("tcp://" + host + ":" + port);
                        connection = factory.createConnection(user, password);
                        connection.start();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        consumer = session.createConsumer(dest);
                        start = System.currentTimeMillis();
                        count = 1;
                        System.out.println("Waiting for messages...");
    
                        while (true) {
                            System.out.println("轮询消息队列..");
                            Message message = null;
                            try {
                                message = consumer.receive();
                                if (message instanceof StompJmsBytesMessage) {
                                    StompJmsBytesMessage sm = (StompJmsBytesMessage) message;
                                    Buffer buffer = sm.getContent();
                                    byte[] a = buffer.getData();
                                    result = new String(a);if (result.contains("SHUTDOWN")) {
                                        long diff = System.currentTimeMillis() - start;
                                        System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
                                        break;
                                    }
                                    //result是获取到的消息字符串,这里开始处理它
                                }
                            }catch(Exception e) {
                                e.printStackTrace();
                                continue;
                            }
                        }
                        connection.close();
                    }catch(JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            thread.start();
    
        }
    
        private static String env(String key, String defaultValue) {
            String rc = System.getenv(key);
            if( rc== null ){
                return defaultValue;
            }
            return rc;
        }
    
        private static void flagTrigger(Inventorycache inventorycache){
            if(new Integer(1).equals(inventorycache.getFlag())){
                inventorycache.setFlag(0);
            }else{
                inventorycache.setFlag(1);
            }
        }
    
        private static String getResult(String theWholeMessage){
            int startFlag = 0;
            int endFlag = 0;
            for (int i = 0; i < theWholeMessage.length(); i++) {
                if (theWholeMessage.charAt(i) == '{') {
                    startFlag = i;
                } else if (theWholeMessage.charAt(i) == '}') {
                    endFlag = i;
                }
            }
            return theWholeMessage.substring(startFlag, endFlag + 1);
        }
    }

    这样就初步完成,生产者只管往队列里"塞"待处理消息,消费者只管"拿"消息来处理,做到了有效的应用程序解耦.

    当然我也不确定还有没有更好的方案,博主才疏学浅,懂得太少,希望有看到的大牛能够不吝赐教,谢谢了

  • 相关阅读:
    线程
    unix架构
    Unix命令
    可重入函数reentrant function
    Eclipse 中 program arguments 与 VM arguments 的区别
    Java中Generics的使用
    Java的Reflection机制
    Java按值传递、按引用传递
    Java label
    LeetCode Merge Intervals
  • 原文地址:https://www.cnblogs.com/Joey44/p/10236997.html
Copyright © 2011-2022 走看看