zoukankan      html  css  js  c++  java
  • 4.activeMQ订阅者

    稍微改一下代码就好啦

    package activeMQ2;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /*
     * 消息生产者-发布者
     */
    public class JMSProducer {
        static String userName=ActiveMQConnection.DEFAULT_USER;
        static String password=ActiveMQConnection.DEFAULT_PASSWORD;
        static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL;
        static int sendnum=10;
        public static void main(String[] args) {
        
            
            // TODO Auto-generated method stub
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL);
            Connection connection = null ;
            Session session;// 会话,接受或发送消息的线程
            Destination destination;
            MessageProducer messageProducer;
            try {
                connection=connectionFactory.createConnection();
                connection.start();
                /*
                 * Session.AUTO_ACKNOWLEDGE 用户成功从receive方法返回的时候,或从MessageListener.onMessage方法
                 * 成功返回时,会话会自动确认客户收到消息
                 * Session.CLIENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认,注意一个被消费的消息将自动确认所
                 * 有已被会话消费的消息。
                 * Session.SESSION_TRANSACTED 会话迟钝的确认消息的提交
                 */
                session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                destination=session.createTopic("FirstTopic1");//创建消息
                messageProducer=session.createProducer(destination);
                sendMessage(session, messageProducer);//发送消息
                session.commit();//提交
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally{
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        
        }
        /*
         * 产生消息并发消息
         */
        public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException {
            for(int i=0;i<sendnum;i++){
                TextMessage textMessage=session.createTextMessage("Message"+i);
                messageProducer.send(textMessage);
                System.out.println("开始发布消息Message"+i);
            }
            // TODO Auto-generated method stub
    
        }
    }
    package activeMQ2;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    /*
     * 消息消费-订阅者1
     */
    public class JMSConsumer {
    	static String userName=ActiveMQConnection.DEFAULT_USER;
    	static String password=ActiveMQConnection.DEFAULT_PASSWORD;
    	static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL;
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL);
    		Connection connection = null ;
    		Session session;// 会话,接受或发送消息的线程
    		Destination destination;
    		MessageConsumer messageConsumer;
    		try {
    			connection=connectionFactory.createConnection();
    			connection.start();
    			session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			destination=session.createTopic("FirstTopic1");//创建连接的消息队列
    			messageConsumer=session.createConsumer(destination);
    			messageConsumer.setMessageListener(new Listener());//注册消息监听
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    //		while(true){
    //			
    //		}
    	}
    
    }
    

      

    package activeMQ2;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /*
     * 消息监听-订阅者1
     */
    public class Listener implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者1收到消息"+((TextMessage) message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
        }
    
    }

    再来一个

    package activeMQ2;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    /*
     * 消息消费-订阅者2
     */
    public class JMSConsumer2 {
        static String userName=ActiveMQConnection.DEFAULT_USER;
        static String password=ActiveMQConnection.DEFAULT_PASSWORD;
        static String brokerURL=ActiveMQConnection.DEFAULT_BROKER_URL;
        public static void main(String[] args) {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(userName, password, brokerURL);
            Connection connection = null ;
            Session session;// 会话,接受或发送消息的线程
            Destination destination;
            MessageConsumer messageConsumer;
            try {
                connection=connectionFactory.createConnection();
                connection.start();
                session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination=session.createTopic("FirstTopic1");//创建连接的消息队列
                messageConsumer=session.createConsumer(destination);
                messageConsumer.setMessageListener(new Listener2());//注册消息监听
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    //        while(true){
    //            
    //        }
        }
    
    }
    package activeMQ2;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    /*
     * 消息监听-订阅者2
     */
    public class Listener2 implements MessageListener{
    
    	@Override
    	public void onMessage(Message message) {
    		try {
    			System.out.println("订阅者2收到消息"+((TextMessage) message).getText());
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		
    	}
    
    }
    

      

    先订阅

    在发布消息

  • 相关阅读:
    实现JDK代理
    Dictionary字典类介绍
    在服务器上新建虚拟机及安装系统
    开启远程桌面设置
    Windows Server 2008 R2远程协助选项灰色
    server 2012系统更改电脑密码
    eclipse + maven + scala+spark环境搭建
    C#数据路接口中获取SQL数据的用法
    C#常用快捷键
    oracle常用的快捷键
  • 原文地址:https://www.cnblogs.com/chen20135517/p/7761633.html
Copyright © 2011-2022 走看看