zoukankan      html  css  js  c++  java
  • JMS生产者+单线程发送-我们到底能走多远系列(29)

    我们到底能走多远系列(29)

    扯淡:

      然后我俩各自一端/望着大河弯弯/终于敢放胆/嘻皮笑脸/面对/人生的难”      --- 《山丘》

      迎着风/迎向远方的天空/路上也有艰难/也有那解脱/都走得从容”                    --- 《与你到永久

      “遇上冷风雨休太认真/自信满心里休理会讽刺与质问/笑骂由人洒脱地做人/少年人洒脱地做人/继续行洒脱地做人”      ---《沉默是金》

      

     主题:

      使用JMS将共通模块分离出去,比如发短信模块,可以在远程的机器上跑customer,然后各个应用使用发短信功能是只要向远程机器发送msg即可。

      类似于下图:

      对于图中的Producer的实现都差不多,主要是选择什么样的Jms第三方实现。对于Customer我们不必关心.

      比如下面的代码是HornetQ的Producer的样例代码:

    public class JmsProducer implements ExceptionListener,FailureListener{
    
        private final Logger logger = LoggerFactory.getLogger(JmsProducer.class);
        private String queueName;
        private String jmsHost;
        private int jmsPort;
        private ConnectionFactory cf;
        private Queue queue;
        private Connection queueConnection;
        private Session queueSession;
        private MessageProducer queueProducer;
    
        public void init() throws Exception {
            queue = HornetQJMSClient.createQueue(queueName);
            Map<String, Object> connectionParams = new HashMap<String, Object>();
            connectionParams.put(TransportConstants.PORT_PROP_NAME, jmsPort);
            connectionParams.put(TransportConstants.HOST_PROP_NAME, jmsHost);
            TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),
                                                                                       connectionParams);
            HornetQConnectionFactory hornetQConnectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
            hornetQConnectionFactory.setClientFailureCheckPeriod(60000);
            hornetQConnectionFactory.setRetryInterval(2000);          // 2 seconds for first retry
            hornetQConnectionFactory.setRetryIntervalMultiplier(1.5); // 1.5 times loner betrween retrys
            hornetQConnectionFactory.setMaxRetryInterval(20000);      // Wait max 20 secs between retrys
            hornetQConnectionFactory.setReconnectAttempts(-1);        // Retry forever
            hornetQConnectionFactory.setConnectionTTL(60000);         //The default value for connection ttl is 60000ms
            hornetQConnectionFactory.setClientFailureCheckPeriod(30000);//The default value for client failure check period is 30000ms
            cf = (ConnectionFactory)hornetQConnectionFactory;
            queueConnection = cf.createConnection();
            queueSession = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            queueProducer = queueSession.createProducer(queue);
            queueProducer.setTimeToLive(6000000);//100分钟失效
            queueProducer.setDisableMessageID(true);//关闭消息id
            queueProducer.setDisableMessageTimestamp(true);//关闭消息的时间戳
            logger.info("init JmsProducer of "+queueName);
            //queueConnection.start();
        }
    
        public void reConnect(){
            logger.info(queueName+" reConnect");
        }
    
        public void destroy() throws Exception {
            logger.info("destroy JmsProducer of "+queueName);
            if(queueSession != null){
                queueSession.close();
                queueSession = null;
            }
            if(queueConnection != null){
                queueConnection.close();
                queueConnection = null;
            }
        }
    
        public String getQueueName() {
            return queueName;
        }
    
        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
    
        public String getJmsHost() {
            return jmsHost;
        }
    
        public void setJmsHost(String jmsHost) {
            this.jmsHost = jmsHost;
        }
    
        public int getJmsPort() {
            return jmsPort;
        }
    
        public void setJmsPort(int jmsPort) {
            this.jmsPort = jmsPort;
        }
    
        public Session getQueueSession() {
            return queueSession;
        }
    
        public void sendTextMessage(final TextMessage textMessage) throws JMSException {
            try {
                queueProducer.send(textMessage);
            } catch (Exception e) {
                // TODO: handle exception
                logger.error("on sendTextMessage Exception="+e.getMessage());
            }
        }
    
        public void onException(JMSException jmsex) {
            // TODO Auto-generated method stub
            logger.warn("on JmsProducer Exception="+jmsex.getMessage());
        }
    
        public void connectionFailed(HornetQException hqex, boolean arg1) {
            // TODO Auto-generated method stub
            logger.error("on JmsProducer connectionFailed,arg1="+arg1+",Exception="+hqex.getMessage());
        }
    }

       一般性的,我们利用Spring 把这个JmsProducer 注入进自己的业务类里去使用即可:

      spring的bean配置:

        <bean id="jmsCodeProducer" class="com.sz.lvban.biz.bo.util.JmsProducer" init-method="init">
            <property name="jmsHost" value="${jms.send.code.host}" />
            <property name="jmsPort" value="${jms.send.code.port}" />
            <property name="queueName" value="${jms.send.code.queueName}" />
        </bean>
        @Autowired
        private JmsProducer                  jmsCodeProducer;

      某方法直接调用:

                    textMsg = jmsCodeProducer.getQueueSession().createTextMessage();
                    textMsg.setText(smsJson.toJSONString());
                    jmsCodeProducer.sendTextMessage(textMsg);

      这样就实现了让服务器上的customer干活的工作了。

    然后,我们发现spring默认注入jmsCodeProducer使用了单例的模式,这样一来我们就可能考虑多线程调用冲突的问题。然而我们不能避免jmsCodeProducer的单例,毕竟init-method="init" 的init方法有点消耗的。

    所以就搞了下面的方案:(上图的题部分)

    使用一个queue做中间站,只要保证单线程从queue中取数据,就能实现一条条向远程服务器发送jms消息。

    下面是一个实现的例子:

    我们先配置一个ApnsMsgSender,有他来控制queue的行为,包括插入,取出数据,以及发送jms消息。

        <bean id="ApnsMsgSender" class="com.sz.wxassistant.biz.bo.util.ApnsMsgSender" init-method="sendMsg">
            <property name="jmsApnsProducer" ref="jmsApnsProducer"></property>
        </bean>    

    注意init-method="sendMsg" 启动时,我们就需要启动一个线程来监控queue。

    结合下代码:在这里我们使用了LinkedBlockingQueue,关于ArrayBlockingQueue和LinkedBlockingQueue之间的取舍,我没有实际测试过。

    public class ApnsMsgSender {
    
        // private ArrayBlockingQueue<TextMessage> queue = new
        // ArrayBlockingQueue<TextMessage>(1024);
        private LinkedBlockingQueue<TextMessage> jmsQueue = new LinkedBlockingQueue<TextMessage>();
        private JmsProducer                      jmsApnsProducer;
        private ExecutorService                  pool;
        private Logger                           log      = LoggerFactory.getLogger("ApnsMsgSender");
    
        /**
         * 启动入口
         */
        public void sendMsg() {
            pool = Executors.newCachedThreadPool(new MyThreadFactory());
            pool.submit(new JmsSender());
        }
    
        public boolean addJms(TextMessage msg) {
            return jmsQueue.offer(msg);
        }
    
        public TextMessage getMsg() {
            TextMessage msg = null;
            try {
                // 取msg 10秒超时设置
                msg = jmsQueue.poll(10, TimeUnit.SECONDS);
            } catch (InterruptedException interuptedE) {
                log.warn("poll jms error" + interuptedE);
            } catch (Exception e) {
                log.error("poll jms get unknown error: ", e);
            }
            return msg;
        }
    
        public JmsProducer getJmsApnsProducer() {
            return jmsApnsProducer;
        }
    
        public void setJmsApnsProducer(JmsProducer jmsApnsProducer) {
            this.jmsApnsProducer = jmsApnsProducer;
        }
    
        public TextMessage genTextMessage() throws JMSException {
            return jmsApnsProducer.getQueueSession().createTextMessage();
        }
    
        private class JmsSender implements Runnable {
            public void run() {
                while (true) {
                    try {
                        // 从queue中取msg
                        TextMessage msg = getMsg();
                        if (msg != null && msg instanceof TextMessage) {
                            // 发送
                            jmsApnsProducer.sendTextMessage(msg);
                        }
                    } catch (JMSException jmsE) {
                        log.error("send jms error: " + jmsE);
                    } catch (Exception e) {
                        log.error("get unknown error: ", e);
                    }
                }
            }
        }
    
        class MyThreadFactory implements ThreadFactory {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                // 线程设置为后台进程
                thread.setDaemon(true);
                thread.setName("ApnsMsgSender");
                return thread;
            }
        }
    }

    解释:

      我们的这个线程做了什么?

        1,  getMsg()

        2,  sendTextMessage(msg)

      很明了的实现....

    注:代码中还使用了ThreadFactory 来封装了一下线程。

      外界代码调用怎么搞?

        1,addJms 就可以了 

    只负责向queue里放,这样再多的线程都没有关系了。

    TextMessage msg = apnsMsgSender.genTextMessage();
                    msg.setText("I love los angeles !");
                    apnsMsgSender.addJms(msg);


    ok。到这里就实现了单线程发送jms消息的功能。

    让我们继续前行

    ----------------------------------------------------------------------

    努力不一定成功,但不努力肯定不会成功。
    共勉。

  • 相关阅读:
    OpenCV 脸部跟踪(2)
    OpenCV 脸部跟踪(1)
    opencv2.4中SVD分解的几种调用方法
    OpenCV教程(42) xml/yaml文件的读写
    OpenCV教程(41) 人脸特征检测
    OpenCV学习(40) 人脸识别(4)
    OpenCV学习(39) OpenCV中的LBP图像
    目标检测的图像特征提取之(三)Haar特征
    目标检测的图像特征提取之(二)LBP特征
    目标检测的图像特征提取之(一)HOG特征
  • 原文地址:https://www.cnblogs.com/killbug/p/3180827.html
Copyright © 2011-2022 走看看