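A simple JMS publish/subscribe example without Maven or Spring
1. Preparation: download the ActiveMQ JAR (this article uses 5.14.2, the latest stable release at the time of writing), create a Java project, and add the JAR to the project's build path.
2. Create a jndi.properties configuration file in the project. It must end up on the classpath so that new InitialContext() can find it.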
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
java.naming.security.principal = system
java.naming.security.credentials = manager
connectionFactoryNames = TopicCF
topic.topic1 = jms.topic1
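The last line follows ActiveMQ's JNDI naming convention topic.<jndiName> = <physicalName>, so looking up "topic1" yields the physical topic jms.topic1. The sketch below only illustrates that convention: it parses the same file content with java.util.Properties and lists the topic bindings, without contacting a broker. The class and method names (JndiTopicNames, topicBindings) are invented for this illustration and are not part of ActiveMQ.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class JndiTopicNames {
    // Same entries as the jndi.properties file above, inlined to keep the sketch self-contained.
    public static final String SAMPLE =
          "java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory\n"
        + "java.naming.provider.url = tcp://localhost:61616\n"
        + "connectionFactoryNames = TopicCF\n"
        + "topic.topic1 = jms.topic1\n";

    // Returns a map of JNDI name -> physical topic name for every "topic.<name>" entry.
    public static Map<String, String> topicBindings(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = "topic.";
        Map<String, String> bindings = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                bindings.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return bindings;
    }

    public static void main(String[] args) {
        // Prints the JNDI-name-to-physical-name bindings found in SAMPLE.
        System.out.println(topicBindings(SAMPLE));
    }
}
```

The JNDI name on the left (topic1) is what the Chat class passes to ctx.lookup; the physical name on the right (jms.topic1) is what the broker knows the topic as.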
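3. In the conf directory of the ActiveMQ installation, edit activemq.xml and add the following inside the <broker> element, then start the broker with activemq.bat.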
<destinations>
    <topic name="topic1" physicalName="jms.topic1" />
</destinations>
4. Create the Java class
import java.io.BufferedReader;
import java.io.InputStreamReader;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;

public class Chat implements javax.jms.MessageListener {
    private TopicSession pubSession;
    private TopicPublisher publisher;
    private TopicConnection connection;
    private String username;

    /* Constructor used to initialize Chat */
    public Chat(String topicFactory, String topicName, String username) throws Exception {
        // Obtain a JNDI connection using the jndi.properties file
        InitialContext ctx = new InitialContext();

        // Look up a JMS connection factory
        TopicConnectionFactory conFactory = (TopicConnectionFactory) ctx.lookup(topicFactory);

        // Create a JMS connection
        TopicConnection connection = conFactory.createTopicConnection();

        // Create two JMS session objects: one for publishing, one for subscribing
        TopicSession pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        TopicSession subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

        // Look up a JMS topic
        Topic chatTopic = (Topic) ctx.lookup(topicName);

        // Create a JMS publisher and subscriber
        // (no message selector; noLocal=false, so this client also receives its own messages)
        TopicPublisher publisher = pubSession.createPublisher(chatTopic);
        TopicSubscriber subscriber = subSession.createSubscriber(chatTopic, null, false);

        // Set a JMS message listener
        subscriber.setMessageListener(this);

        // Initialize the Chat application variables
        this.connection = connection;
        this.pubSession = pubSession;
        this.publisher = publisher;
        this.username = username;

        // Start the JMS connection; allows messages to be delivered
        connection.start();
    }

    /* Receive messages from the topic subscriber */
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /* Create and send a message using the publisher */
    protected void writeMessage(String text) throws JMSException {
        TextMessage message = pubSession.createTextMessage();
        message.setText(username + ": " + text);
        publisher.publish(message);
    }

    /* Close the JMS connection */
    public void close() throws JMSException {
        connection.close();
    }

    /* Run the Chat client */
    public static void main(String[] args) {
        try {
            Chat chat = new Chat("TopicCF", "topic1", "userName1");

            // Read from command line
            BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));

            // Loop until the word "exit" is typed (or standard input is closed)
            while (true) {
                String s = commandLine.readLine();
                if (s == null || s.equalsIgnoreCase("exit")) {
                    chat.close();   // close down connection
                    System.exit(0); // exit program
                } else {
                    chat.writeMessage(s);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
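5. Open several consoles and run the main method in each one. A line typed into any console is published to the topic and printed by every running instance, including the sender, so you can watch the input and output of each process.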
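6. Durable and non-durable subscribers: a durable subscriber receives every message published to the topic whether or not it is active when the message is sent. A store-and-forward mechanism guarantees this: messages are kept while the subscriber is offline and delivered once it reconnects. Durable subscribers are created with the createDurableSubscriber method, which takes the topic and a subscription name. When creating a durable subscriber, consider how long messages stay relevant (an expired message is of little use) and how much storage the retained messages consume.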
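The difference between the two kinds of subscriber can be illustrated with a toy in-memory model. This is only a sketch of the store-and-forward idea: it uses no JMS API and does not reflect how ActiveMQ is implemented, and the ToyTopic class and all of its methods are hypothetical names invented for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class ToyTopic {
    // Backlog per durable subscription name; kept from subscribe until unsubscribe.
    private final Map<String, Queue<String>> durableBacklog = new HashMap<>();
    // Inbox per currently connected non-durable subscriber; discarded on disconnect.
    private final Map<String, List<String>> liveInbox = new HashMap<>();

    public void subscribeDurable(String name) { durableBacklog.putIfAbsent(name, new ArrayDeque<>()); }
    public void unsubscribe(String name)      { durableBacklog.remove(name); }
    public void connect(String id)            { liveInbox.put(id, new ArrayList<>()); }
    public void disconnect(String id)         { liveInbox.remove(id); }

    // Every durable subscription stores the message; only connected non-durable subscribers see it.
    public void publish(String msg) {
        for (Queue<String> backlog : durableBacklog.values()) backlog.add(msg);
        for (List<String> inbox : liveInbox.values()) inbox.add(msg);
    }

    // Drains everything stored for a durable subscription, in publication order.
    public List<String> receiveDurable(String name) {
        List<String> out = new ArrayList<>();
        Queue<String> backlog = durableBacklog.get(name);
        while (backlog != null && !backlog.isEmpty()) out.add(backlog.poll());
        return out;
    }

    // Drains what a connected non-durable subscriber has received so far.
    public List<String> receiveLive(String id) {
        List<String> inbox = liveInbox.get(id);
        if (inbox == null) return new ArrayList<>();
        List<String> out = new ArrayList<>(inbox);
        inbox.clear();
        return out;
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribeDurable("alice");   // durable subscription, consumer currently offline
        topic.connect("bob");              // non-durable subscriber, online
        topic.publish("m1");
        System.out.println("bob (online): " + topic.receiveLive("bob"));
        topic.disconnect("bob");
        topic.publish("m2");               // published while both consumers are offline
        topic.connect("bob");
        System.out.println("bob (after reconnect): " + topic.receiveLive("bob"));
        System.out.println("alice (durable): " + topic.receiveDurable("alice"));
    }
}
```

In this model the durable backlog grows until it is consumed or the subscription is removed, which is why message expiry and storage capacity matter for durable subscriptions.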
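7. Dynamic and administered subscribers: a durable subscriber defined in a configuration file or in the administration console is called an administered subscriber; one defined at runtime is called a dynamic durable subscriber.
8. Removing a durable subscription: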
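subscriber.close();          // close the durable subscriber first
subSession.unsubscribe(s);   // s is the subscription name given to createDurableSubscriber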