zoukankan      html  css  js  c++  java
  • ActiveMQ 快速入门教程系列 第二章 发布-订阅者模式实现

    第二章我们会介绍怎样实现一个发布者对多个订阅者的消息传递


    Topic和queue的最大区别在于topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。


    首先,与第一章类似,我们为订阅者1、2分别创建监听器 MyMessageListener 和 MyMessageListener2,它们都实现了 MessageListener 接口

    package cn.com.evan.Jms.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * Listener for subscriber 1: prints the text payload of every message it receives.
     */
    public class MyMessageListener implements MessageListener{
    
    	/**
    	 * Prints the body of a received text message to standard output.
    	 * Messages that are not {@link TextMessage} instances are reported and skipped
    	 * instead of being cast blindly, which would throw a ClassCastException.
    	 *
    	 * @param msg the message delivered to this listener
    	 */
    	@Override
    	public void onMessage(Message msg) {
    		// Other message kinds (bytes, map, object, ...) cannot be cast to TextMessage.
    		if (!(msg instanceof TextMessage)) {
    			System.err.println("Subscriber 1 ignored a non-text message: " + msg);
    			return;
    		}
    
    		try {
    			System.out.println("订阅者1接受消息:"+((TextMessage)msg).getText());
    		} catch (JMSException e) {
    			// Reading the payload failed; report it and keep the listener usable.
    			e.printStackTrace();
    		}
    	}
    
    }
    

    package cn.com.evan.Jms.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * Listener for subscriber 2: prints the text payload of every message it receives.
     */
    public class MyMessageListener2 implements MessageListener{
    
    	/**
    	 * Prints the body of a received text message to standard output.
    	 * Messages that are not {@link TextMessage} instances are reported and skipped
    	 * instead of being cast blindly, which would throw a ClassCastException.
    	 *
    	 * @param msg the message delivered to this listener
    	 */
    	@Override
    	public void onMessage(Message msg) {
    		// Other message kinds (bytes, map, object, ...) cannot be cast to TextMessage.
    		if (!(msg instanceof TextMessage)) {
    			System.err.println("Subscriber 2 ignored a non-text message: " + msg);
    			return;
    		}
    
    		try {
    			System.out.println("订阅者2接受消息:"+((TextMessage)msg).getText());
    		} catch (JMSException e) {
    			// Reading the payload failed; report it and keep the listener usable.
    			e.printStackTrace();
    		}
    	}
    
    }
    

    然后我们分别创建订阅者1,2,并创建一个Topic “MyTopic1”用于消息的订阅,订阅者1,2分别设置对应的监听器

    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Subscriber 1: connects to the broker, subscribes to the topic "MyTopic1" and
     * hands every delivered message to {@link MyMessageListener}.
     */
    public class JmsComsumer {
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	private static final String TOPIC_NAME = "MyTopic1";
    
    	/**
    	 * Sets up the subscription. On success the connection is intentionally left
    	 * open so the listener keeps receiving messages; on failure it is closed.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		Connection connection = null;
    		try {
    			connection = connectionFactory.createConnection();
    			Session session = connection.createSession(false,
    					Session.AUTO_ACKNOWLEDGE);
    			Destination destination = session.createTopic(TOPIC_NAME);// create topic
    			MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer
    
    			messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before
    
    			// Start delivery only once the consumer and its listener are fully set up.
    			connection.start();
    		} catch (Exception e) {
    			e.printStackTrace();
    			// Setup failed part-way: release the connection instead of leaking it.
    			closeQuietly(connection);
    		}
    
    	}
    
    	/** Closes the connection if it was opened, reporting (not propagating) close failures. */
    	private static void closeQuietly(Connection connection) {
    		if (connection == null) {
    			return;
    		}
    		try {
    			connection.close();
    		} catch (JMSException closeFailure) {
    			closeFailure.printStackTrace();
    		}
    	}
    
    }
    
    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Subscriber 2: connects to the broker, subscribes to the topic "MyTopic1" and
     * hands every delivered message to {@link MyMessageListener2}.
     */
    public class JmsComsumer2 {
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	private static final String TOPIC_NAME = "MyTopic1";
    
    	/**
    	 * Sets up the subscription. On success the connection is intentionally left
    	 * open so the listener keeps receiving messages; on failure it is closed.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		Connection connection = null;
    		try {
    			connection = connectionFactory.createConnection();
    			Session session = connection.createSession(false,
    					Session.AUTO_ACKNOWLEDGE);
    			Destination destination = session.createTopic(TOPIC_NAME);// create topic
    			MessageConsumer messageConsumer = session.createConsumer(destination);// create consumer
    
    			messageConsumer.setMessageListener(new MyMessageListener2());// set listener which we created before
    
    			// Start delivery only once the consumer and its listener are fully set up.
    			connection.start();
    		} catch (Exception e) {
    			e.printStackTrace();
    			// Setup failed part-way: release the connection instead of leaking it.
    			closeQuietly(connection);
    		}
    
    	}
    
    	/** Closes the connection if it was opened, reporting (not propagating) close failures. */
    	private static void closeQuietly(Connection connection) {
    		if (connection == null) {
    			return;
    		}
    		try {
    			connection.close();
    		} catch (JMSException closeFailure) {
    			closeFailure.printStackTrace();
    		}
    	}
    
    }
    

    接着我们创建发布者,发布10条消息

    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Publisher: sends a fixed batch of text messages to the topic "MyTopic1"
     * inside a single transaction and then closes the connection.
     */
    public class JmsProducer {
    
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	/** Number of messages published per run. */
    	private static final int SENDNUM = 10;
    
    	/**
    	 * Connects to the broker, publishes the batch, commits the transaction and
    	 * always closes the connection afterwards.
    	 *
    	 * @param args unused
    	 */
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		Connection connection = null;
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			// The acknowledge mode is ignored for a transacted session, so state the
    			// transacted intent explicitly rather than passing AUTO_ACKNOWLEDGE.
    			Session session = connection.createSession(true,
    					Session.SESSION_TRANSACTED);
    			Destination destination = session.createTopic("MyTopic1");// Create topic
    			MessageProducer messageProducer = session.createProducer(destination);// Create producer
    			sendMessage(session,messageProducer);
    			// Nothing is published to subscribers until the transaction commits.
    			session.commit();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally{
    			if(connection!=null){
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    
    	}
    	
    	/**
    	 * Sends {@code SENDNUM} text messages named "ActiveMQ0", "ActiveMQ1", ... through
    	 * the given producer and echoes each one to standard output.
    	 *
    	 * @param session         session used to create the text messages
    	 * @param messageProducer producer the messages are sent with
    	 * @throws JMSException if creating or sending a message fails
    	 */
    	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
    		for(int i=0;i<SENDNUM;i++){
    			TextMessage message = session.createTextMessage("ActiveMQ"+i);
    			messageProducer.send(message);
    			System.out.println("发布者发布消息:"+message.getText());
    		}
    	}
    
    }
    
    以上发布者和2个订阅者都已经创建完毕,我们先运行订阅者1,2,再运行发布者(就像我们平时要接收到什么新闻,都得先订阅那频道,才可以接收到相关频道发布的消息),运行结果如下





    打开 ActiveMQ 控制台,可以看到该 Topic 有2位消费者,消息共被消费了20次(每位订阅者各消费了10条消息)


  • 相关阅读:
    宾得镜头资料
    先感动自己才能感动别人
    关于单反相机中的APSC
    K10D和凤凰镜头
    Vista的新快捷键
    微软雅黑字体“演”字变“漠”字的bug
    Windows XP无线零配置服务
    剑走偏锋,用XP的启动管理来搞定Vista、XP双系统
    BCB中的目录选择对话框的实现
    MagicAjax 使用
  • 原文地址:https://www.cnblogs.com/evan-liang/p/9189636.html
Copyright © 2011-2022 走看看