不使用maven及spring,创建简单的JMS点对点示例
1.准备工作:下载activeMQ的jar包,本文使用最新稳定版5.14.2;创建java工程,导入jar包
2.在工程下创建jndi.properties配置文件
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url = tcp://localhost:61616 java.naming.security.principal=system java.naming.security.credentials=manager connectionFactoryNames=QueueCF queue.LoanRequestQ=jms.LoanRequestQ queue.LoanResponseQ=jms.LoanResponseQ
3.在active的config目录下,修改activemq.xml,在<broker>下添加,启动activemq.bat
<destinations> <queue name="LoanRequestQ" physicalName="jms.LoanRequestQ"/> <queue name="LoanResponseQ" physicalName="jms.LoanResponseQ"/> </destinations>
4.借贷方:
public class QBorrower { private QueueConnection qConnect = null; private QueueSession qSession = null; private Queue responseQ = null; private Queue requestQ = null; public QBorrower(String queuecf, String requestQueue, String responseQueue) { try { // Connect to the provider and get the JMS connection Context ctx = new InitialContext(); QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queuecf); qConnect = qFactory.createQueueConnection(); // Create the JMS Session qSession = qConnect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE); // Lookup the request and response queues requestQ = (Queue)ctx.lookup(requestQueue); responseQ = (Queue)ctx.lookup(responseQueue); // Now that setup is complete, start the Connection qConnect.start(); } catch (JMSException jmse) { jmse.printStackTrace( ); System.exit(1); } catch (NamingException jne) { jne.printStackTrace( ); System.exit(1); } } private void sendLoanRequest(double salary, double loanAmt) { try { // Create JMS message MapMessage类型的消息 MapMessage msg = qSession.createMapMessage(); msg.setDouble("Salary", salary); msg.setDouble("LoanAmount", loanAmt); msg.setJMSReplyTo(responseQ);//标明消费者应该应答的地址 //set the message expiration to 30 seconds msg.setJMSExpiration(new Date().getTime() + 30000); // Create the sender and send the message 创建Sender,指出将消息要发送到哪个队列 QueueSender qSender = qSession.createSender(requestQ); qSender.send(msg); // Wait to see if the loan request was accepted or declined String filter = "JMSCorrelationID = '" + msg.getJMSMessageID() + "'"; QueueReceiver qReceiver = qSession.createReceiver(responseQ, filter);//消息关联,指定要接收的消息 TextMessage tmsg = (TextMessage)qReceiver.receive(30000);//同步接收,接收到响应之前会阻塞,设置无响应的过期时间 if (tmsg == null) { System.out.println("Lender not responding"); } else { System.out.println("Loan request was " + tmsg.getText()); } } catch (JMSException jmse) { jmse.printStackTrace( ); System.exit(1); } } private void exit() { try { qConnect.close( ); } catch (JMSException jmse) { jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { QBorrower borrower = new QBorrower("QueueCF", "LoanRequestQ", "LoanResponseQ"); try { // Read all standard input and send it as a message BufferedReader stdin = new BufferedReader (new InputStreamReader(System.in)); System.out.println ("QBorrower Application Started"); System.out.println ("Press enter to quit application"); System.out.println ("Enter: Salary, Loan_Amount"); System.out.println(" e.g. 50000, 120000"); while (true) { System.out.print("> "); String loanRequest = stdin.readLine(); if (loanRequest == null || loanRequest.trim().length( ) <= 0) { borrower.exit(); } // Parse the deal description StringTokenizer st = new StringTokenizer(loanRequest, ",") ; double salary = Double.valueOf(st.nextToken().trim()).doubleValue( ); double loanAmt = Double.valueOf(st.nextToken().trim()).doubleValue( ); borrower.sendLoanRequest(salary, loanAmt); } } catch (IOException ioe) { ioe.printStackTrace( ); } } }
5.放款方:
public class QLender implements MessageListener { private QueueConnection qConnect = null; private QueueSession qSession = null; private Queue requestQ = null; public QLender(String queuecf, String requestQueue) { try { // Connect to the provider and get the JMS connection Context ctx = new InitialContext(); QueueConnectionFactory qFactory = (QueueConnectionFactory) ctx.lookup(queuecf); qConnect = qFactory.createQueueConnection(); // Create the JMS Session qSession = qConnect.createQueueSession( false, Session.AUTO_ACKNOWLEDGE); // Lookup the request queue requestQ = (Queue)ctx.lookup(requestQueue); // Now that setup is complete, start the Connection qConnect.start(); // Create the message listener QueueReceiver qReceiver = qSession.createReceiver(requestQ); qReceiver.setMessageListener(this);//监听请求队列 System.out.println("Waiting for loan requests..."); } catch (JMSException jmse) { jmse.printStackTrace( ); System.exit(1); } catch (NamingException jne) { jne.printStackTrace( ); System.exit(1); } }
//异步消息监听器,等待消息时不会被阻塞 public void onMessage(Message message) { try { boolean accepted = false; // Get the data from the message MapMessage msg = (MapMessage)message; double salary = msg.getDouble("Salary"); double loanAmt = msg.getDouble("LoanAmount"); // Determine whether to accept or decline the loan if (loanAmt < 200000) { accepted = (salary / loanAmt) > .25; } else { accepted = (salary / loanAmt) > .33; } System.out.println("" + "Percent = " + (salary / loanAmt) + ", loan is " + (accepted ? "Accepted!" : "Declined")); // Send the results back to the borrower TextMessage tmsg = qSession.createTextMessage(); tmsg.setText(accepted ? "Accepted!" : "Declined"); tmsg.setJMSCorrelationID(message.getJMSMessageID());//消息关联 // Create the sender and send the message QueueSender qSender = qSession.createSender((Queue)message.getJMSReplyTo());//从message中获得要将消息放到哪个队列 qSender.send(tmsg); System.out.println(" Waiting for loan requests..."); } catch (JMSException jmse) { jmse.printStackTrace( ); System.exit(1); } catch (Exception jmse) { jmse.printStackTrace( ); System.exit(1); } } private void exit() { try { qConnect.close( ); } catch (JMSException jmse) { jmse.printStackTrace( ); } System.exit(0); } public static void main(String argv[]) { QLender lender = new QLender("QueueCF", "LoanRequestQ"); try { // Run until enter is pressed BufferedReader stdin = new BufferedReader (new InputStreamReader(System.in)); System.out.println ("QLender application started"); System.out.println ("Press enter to quit application"); stdin.readLine(); lender.exit(); } catch (IOException ioe) { ioe.printStackTrace( ); } } }
6.打开两个console,分别运行代码,监视借贷方及放款方的输入输出即可
7.运行两个放款方,即可看到多个消息接收者的负载均衡,一个消息会传给两个放款方之一,而不是向两个程序同时传送。策略可能会使用轮询或第一个可用模式,具体需查询Jms提供者文档