zoukankan      html  css  js  c++  java
  • 一个简单的jms点对点示例程序

    不使用maven及spring,创建简单的JMS点对点示例

    1.准备工作:下载activeMQ的jar包,本文使用最新稳定版5.14.2;创建java工程,导入jar包

    2.在工程下创建jndi.properties配置文件

    # JNDI bootstrap settings read by `new InitialContext()` in the Java code:
    # the initial-context factory class and the URL of the broker to connect to.
    java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
    java.naming.provider.url = tcp://localhost:61616
    # Credentials handed to the provider.
    # NOTE(review): presumably these must match a user configured on the broker - confirm.
    java.naming.security.principal=system
    java.naming.security.credentials=manager
    
    # JNDI name of the connection factory; the Java code looks it up as "QueueCF".
    connectionFactoryNames=QueueCF
    # queue.<JNDI name>=<physical queue name>. The Java code looks up "LoanRequestQ" and
    # "LoanResponseQ"; the right-hand sides match the physicalName values in activemq.xml.
    queue.LoanRequestQ=jms.LoanRequestQ
    queue.LoanResponseQ=jms.LoanResponseQ

    3.在 ActiveMQ 的 conf 目录下修改 activemq.xml,在 <broker> 元素内添加以下配置,然后运行 activemq.bat 启动 ActiveMQ

    <!-- Declares the request and response queues used by the loan example.
         The physicalName values are the same names that jndi.properties maps its
         queue.* entries to.
         NOTE(review): presumably this makes the broker create both queues at startup - confirm
         against the ActiveMQ documentation. -->
    <destinations>
      <queue name="LoanRequestQ" physicalName="jms.LoanRequestQ"/>
      <queue name="LoanResponseQ" physicalName="jms.LoanResponseQ"/>
    </destinations>

    4.借贷方:

    public class QBorrower {
        
        private QueueConnection qConnect = null;    
        private QueueSession qSession = null;
        private Queue responseQ = null;
        private Queue requestQ = null;
    
        public QBorrower(String queuecf, String requestQueue, 
                        String responseQueue) {        
            try {
                // Connect to the provider and get the JMS connection
                Context ctx = new InitialContext();
                QueueConnectionFactory qFactory = (QueueConnectionFactory)
                    ctx.lookup(queuecf);
                qConnect = qFactory.createQueueConnection();
                
                // Create the JMS Session
                qSession = qConnect.createQueueSession(
                    false, Session.AUTO_ACKNOWLEDGE);
    
                // Lookup the request and response queues
                requestQ = (Queue)ctx.lookup(requestQueue);
                responseQ = (Queue)ctx.lookup(responseQueue);
    
                // Now that setup is complete, start the Connection
                qConnect.start();
                
            } catch (JMSException jmse) {
                jmse.printStackTrace( ); 
                System.exit(1);
            } catch (NamingException jne) {
                jne.printStackTrace( ); 
                System.exit(1);
            }
        }
    
        private void sendLoanRequest(double salary, double loanAmt) {
            try {
                // Create JMS message MapMessage类型的消息
                MapMessage msg = qSession.createMapMessage();
                msg.setDouble("Salary", salary);
                msg.setDouble("LoanAmount", loanAmt);
                msg.setJMSReplyTo(responseQ);//标明消费者应该应答的地址
    
                //set the message expiration to 30 seconds
                msg.setJMSExpiration(new Date().getTime() + 30000);
                
                // Create the sender and send the message 创建Sender,指出将消息要发送到哪个队列
                QueueSender qSender = qSession.createSender(requestQ);
                qSender.send(msg);
                
                // Wait to see if the loan request was accepted or declined 
                String filter = 
                    "JMSCorrelationID = '" + msg.getJMSMessageID() + "'";
                QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);//消息关联,指定要接收的消息
                TextMessage tmsg = (TextMessage)qReceiver.receive(30000);//同步接收,接收到响应之前会阻塞,设置无响应的过期时间
                if (tmsg == null) {
                    System.out.println("Lender not responding");
                } else {
                    System.out.println("Loan request was " + tmsg.getText());
                }
                
            } catch (JMSException jmse) {
                jmse.printStackTrace( ); 
                System.exit(1);
            }
        }
        
        private void exit() {
            try {
                qConnect.close( );
            } catch (JMSException jmse) {
                jmse.printStackTrace( );
            }
            System.exit(0);
        }
    
        public static void main(String argv[]) {
          
            QBorrower borrower = new QBorrower("QueueCF", "LoanRequestQ", "LoanResponseQ");
          
            try {
                // Read all standard input and send it as a message
                BufferedReader stdin = new BufferedReader
                    (new InputStreamReader(System.in));
                System.out.println ("QBorrower Application Started");
                System.out.println ("Press enter to quit application");
                System.out.println ("Enter: Salary, Loan_Amount");
                System.out.println("
    e.g. 50000, 120000");
    
                while (true) {
                    System.out.print("> ");
                    
                    String loanRequest = stdin.readLine();
                    if (loanRequest == null || loanRequest.trim().length( ) <= 0) {
                        borrower.exit();
                    }
                
                    // Parse the deal description
                    StringTokenizer st = new StringTokenizer(loanRequest, ",") ;
                    double salary = 
                        Double.valueOf(st.nextToken().trim()).doubleValue( );
                    double loanAmt = 
                        Double.valueOf(st.nextToken().trim()).doubleValue( );
    
                    borrower.sendLoanRequest(salary, loanAmt);
                }
            } catch (IOException ioe) {
              ioe.printStackTrace( );
            }
        }
    }

    5.放款方:

    public class QLender implements MessageListener {
        
        private QueueConnection qConnect = null;    
        private QueueSession qSession = null;
        private Queue requestQ = null;
    
        public QLender(String queuecf, String requestQueue) {        
            try {
                // Connect to the provider and get the JMS connection
                Context ctx = new InitialContext();
                QueueConnectionFactory qFactory = (QueueConnectionFactory)
                    ctx.lookup(queuecf);
                qConnect = qFactory.createQueueConnection();
                
                // Create the JMS Session
                qSession = qConnect.createQueueSession(
                    false, Session.AUTO_ACKNOWLEDGE);
    
                // Lookup the request queue
                requestQ = (Queue)ctx.lookup(requestQueue);
    
                // Now that setup is complete, start the Connection
                qConnect.start();
    
                // Create the message listener
                QueueReceiver qReceiver = qSession.createReceiver(requestQ);
                qReceiver.setMessageListener(this);//监听请求队列
                
                System.out.println("Waiting for loan requests...");
                
            } catch (JMSException jmse) {
                jmse.printStackTrace( ); 
                System.exit(1);
            } catch (NamingException jne) {
                jne.printStackTrace( ); 
                System.exit(1);
            }
        }
    
      //异步消息监听器,等待消息时不会被阻塞
    public void onMessage(Message message) { try { boolean accepted = false; // Get the data from the message MapMessage msg = (MapMessage)message; double salary = msg.getDouble("Salary"); double loanAmt = msg.getDouble("LoanAmount"); // Determine whether to accept or decline the loan if (loanAmt < 200000) { accepted = (salary / loanAmt) > .25; } else { accepted = (salary / loanAmt) > .33; } System.out.println("" + "Percent = " + (salary / loanAmt) + ", loan is " + (accepted ? "Accepted!" : "Declined")); // Send the results back to the borrower TextMessage tmsg = qSession.createTextMessage(); tmsg.setText(accepted ? "Accepted!" : "Declined"); tmsg.setJMSCorrelationID(message.getJMSMessageID());//消息关联 // Create the sender and send the message QueueSender qSender = qSession.createSender((Queue)message.getJMSReplyTo());//从message中获得要将消息放到哪个队列 qSender.send(tmsg); System.out.println(" Waiting for loan requests..."); } catch (JMSException jmse) { jmse.printStackTrace( ); System.exit(1); } catch (Exception jmse) { jmse.printStackTrace( ); System.exit(1); } } private void exit() { try { qConnect.close( ); } catch (JMSException jmse) { jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { QLender lender = new QLender("QueueCF", "LoanRequestQ"); try { // Run until enter is pressed BufferedReader stdin = new BufferedReader (new InputStreamReader(System.in)); System.out.println ("QLender application started"); System.out.println ("Press enter to quit application"); stdin.readLine(); lender.exit(); } catch (IOException ioe) { ioe.printStackTrace( ); } } }

    6.打开两个console,分别运行代码,监视借贷方及放款方的输入输出即可

    7.同时运行两个放款方,即可看到多个消息接收者之间的负载均衡:每条消息只会传给两个放款方中的一个,而不会同时传给两者。分发策略可能是轮询或"第一个可用"模式,具体需查询 JMS 提供者的文档

  • 相关阅读:
    Random 种子问题
    Matrix issue
    Two sum.
    Best Time to Buy and Sell Stock
    Maximum difference between two elements
    二分查找法的实现和应用汇总
    Why you want to be restrictive with shrink of database files [From karaszi]
    Palindrome
    NetBeans vs Eclipse 之性能参数对比 [java060515]
    国内各大互联网公司相关技术站点不完全收录[转]
  • 原文地址:https://www.cnblogs.com/qilong853/p/6554790.html
Copyright © 2011-2022 走看看