zoukankan      html  css  js  c++  java
  • ActiveMq性能优化

    ActiveMq运行是比较稳定的,数据的吞吐速度也很高,如果出现入队列或者出队列慢的问题,先检查一下自己的代码,是不是本身取到数据后处理过慢。

    本文的关于性能优化,其实是列举出一些需要注意的点,请确保你的项目没有一下问题:

    1. 使用spring的JmsTemplate

     JmsTemplate的send和convertAndSend会使用持久化mode,即使你设置了NON_PERSISTENT。这会导致入队列速度变得非常慢。

     解决办法,使用下面的MyJmsTemplate代替JmsTemplate。

    public class MyJmsTemplate extends JmsTemplate {
        private Session session;
    
        public MyJmsTemplate() {
            super();
        }
    
        public MyJmsTemplate(ConnectionFactory connectionFactory) {
            super(connectionFactory);
        }
    
        public void doSend(MessageProducer producer, Message message) throws JMSException {
            if (isExplicitQosEnabled()) {
                producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive());
            } else {
                producer.send(message);
            }
        }
    
        public Session getSession() {
            return session;
        }
    
        public void setSession(Session session) {
            this.session = session;
        }
    }

    2. DeliveryMode的选择,如果你入队列的数据,不考虑MQ挂掉的情况(这概率很小),使用NON_PERSISTENT会显著提高数据写入速度。

    3. 生产者使用事物会提高入队列性能,但是消费者如果启动了事物则会显著影响数据的消费速度。相关代码如下:

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    代码中的false代表不启动事物。

    4. 消费者的消息处理即onMessage方法优化,举例如下:

    public class SmsMoPool implements MessageListener {
        private final static Logger logger = LoggerFactory.getLogger(SmsMoPool.class);
        private DefaultEventPubliser moEventPublisher;
        private final EventFactory eventFactory = new DefaultEventFactory();
        private DefaultDataGather dataGather;
        private ExecutorService pool = Executors.newFixedThreadPool(5);
    
        @Override
        public void onMessage(final Message message) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    final ObjectMessage msg = (ObjectMessage) message;
                    Serializable obj = null;
                    try {
                        obj = msg.getObject();
                    } catch (JMSException e) {
                        logger.error("从消息队列获得上行信息异常{}", e);
                    }
                    if (obj != null) {
                        dataGather.incrementDateCount(MasEntityConstants.TRAFFIC_SMS_MO_IN);
                        AgentToServerReq req = (AgentToServerReq) obj;
                        if (logger.isInfoEnabled()) {
                            logger.info("驱动-->调度:{}", req.toXmlStr());
                        }
                        Event event = eventFactory.createMoEvent(req);
                        moEventPublisher.publishEvent(event);
                    }
                }
            });
        }
    }

    这段代码使用了线程池,另一点要注意的是msg.getObject();这个方法是一个比较耗时的方法,你的代码中不应该出现多次getObject()。

    5. 消费者使用预取,如何使用预取,下面以spring版本为例

      <bean class="org.apache.activemq.command.ActiveMQQueue">
                    <constructor-arg value="data.mo?consumer.prefetchSize=100"/>
                </bean>

    预取数量根据具体入队列数据而定,以上设置100,是针对2000/sec入队列速度设定的。
    另外如果是慢消费者,这里可设置为1。

    6. 检查你的MQ数据吞吐速度,保持生产和消费的平衡,不会出现大量积压。

    7. ActiveMQ使用TCP协议时 tcpNoDelay=默认是false ,设置为true可以提高性能。

    还是spring版本的:

     <bean id="mqPoolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
            <property name="connectionFactory">
                <bean id="mqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:useAsyncSend="true"
                      p:brokerURL="failover://(tcp://127.0.0.1:61616?tcpNoDelay=true)"/>
            </property>
        </bean>


     

  • 相关阅读:
    js中replace的正则替换
    ios沙盒路径
    Android开源框架
    小知识点
    __NSCFConstantString && __NSPlaceholderDictionary
    iq 格式分析
    C 函数
    Xcode报错
    XMPP Server
    H5网站借鉴
  • 原文地址:https://www.cnblogs.com/zhishan/p/2762672.html
Copyright © 2011-2022 走看看