zoukankan      html  css  js  c++  java
  • ActiveMQ 快速入门教程系列 第一章 点对点消息实现

    ActiveMQ 开发包下载及运行环境搭建


    主页:http://activemq.apache.org/
    目前最新版本:5.11.1
    开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html

    ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码admin/admin


    ActiveMQ下载完毕后,点击以下文件(64位和32位分别点击对应文件夹下的该文件):



    ActiveMQ 服务器启动完毕后,打开http://127.0.0.1:8161/admin/ 用户名/密码admin/admin,将会见到如下界面:


    ActiveMq 点对点消息实现

    第一种方式:直接Receive 方式

    1.Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage
    方法成功返回的时候,会话自动确认客户收到的消息。
    2.Session.CLIENT_ACKNOWLEDGE。客户通过消息的acknowledge 方法确认消息。需要注意的是,在这种模
    式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一
    个消息消费者消费了10 个消息,然后确认第5 个消息,那么所有10 个消息都被确认。
    3.Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可
    能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的JMSRedelivered 字段设置
    为true。

    首先我们创建Java工程,导入activemq-all-5.11.1.jar,如下


    然后我们建立一个生产者类,负责发送消息,并创建一个名为MyQueue1的队列
    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JmsProducer {
    
    	private static String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	private static Integer SENDNUM = 10;
    
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory;
    		Connection connection = null;
    		Session session;
    		Destination destination;
    		MessageProducer messageProducer;
    
    		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			session = connection.createSession(Boolean.TRUE,
    					Session.AUTO_ACKNOWLEDGE);
    			destination = session.createQueue("MyQueue1");// Create message queue
    			messageProducer = session.createProducer(destination);// Create producer
    			sendMessage(session,messageProducer);
    			session.commit();
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}finally{
    			if(connection!=null){
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    
    	}
    	
    	public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException{
    		for(int i=0;i<JmsProducer.SENDNUM;i++){
    			TextMessage message = session.createTextMessage("ActiveMQ"+i);
    			messageProducer.send(message);
    			System.out.println("发送消息:"+message.getText());
    		}
    	}
    
    }
    

    运行程序,可以看到控制台已经发送了10条信息

    我们打开activeMq控制台http://127.0.0.1:8161/admin/,查看消息发送的情况,可以看到MyQueue1里已经存放着10条消息,但还没有被消费

    接着我们创建一个消费者,负责从消息队列MyQueue1里接受消息,这里我们用一个死循环去不断监听消息
    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JmsComsumer {
    	private static String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	private static Integer SENDNUM = 10;
    
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory;
    		Connection connection = null;
    		Session session;
    		Destination destination;
    		MessageConsumer messageConsumer;
    
    		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			session = connection.createSession(Boolean.FALSE,
    					Session.AUTO_ACKNOWLEDGE);
    			destination = session.createQueue("MyQueue1");// Create message
    															// queue
    			messageConsumer = session.createConsumer(destination);// Create
    																	// producer
    
    			while (true) {
    				TextMessage message = (TextMessage) messageConsumer
    						.receive(100000);
    				if (message != null) {
    					System.out.println("接收信息:" + message.getText());
    
    				} else
    					break;
    			}
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    运行程序可以看到消费者已经接受到10条消息

    我们再查看activeMq控制台,可以看到消息已经被消费了10条

    第二种方式:使用监听器去监听消息

    首先我们创建一个监听器,实现MessageListener 借口
    package cn.com.evan.Jms.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class MyMessageListener implements MessageListener{
    
    	@Override
    	public void onMessage(Message msg) {
    		
    		try {
    			System.out.println("接受消息:"+((TextMessage)msg).getText());
    		} catch (JMSException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		
    	}
    	
    	
    
    }
    

    我们把上面的JmsConsumer 修改下,加入我们自定义的监听器 
    package cn.com.evan.Jms.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JmsComsumer {
    	private static String USERNAME = ActiveMQConnection.DEFAULT_USER;
    	private static String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    	private static String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    	private static Integer SENDNUM = 10;
    
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory;
    		Connection connection = null;
    		Session session;
    		Destination destination;
    		MessageConsumer messageConsumer;
    
    		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,
    				BROKERURL);
    		try {
    			connection = connectionFactory.createConnection();
    			connection.start();
    			session = connection.createSession(Boolean.FALSE,
    					Session.AUTO_ACKNOWLEDGE);
    			destination = session.createQueue("MyQueue1");// Create message
    															// queue
    			messageConsumer = session.createConsumer(destination);// Create
    																	// producer
    
    			messageConsumer.setMessageListener(new MyMessageListener());// set listener which we created before
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    
    	}
    
    }
    
    我们先运行JmsConsumer,让它监听MyQueue1队列里的消息,再运行JmsProducer

    此时,我们在控制台可以看到消费者也接受到10条信息
    打开activeMq控制台,可以看到消费者跟上面一样同样也消费了10条消息 
    以上两种消息处理方式介绍完毕,下一章会介绍一个发布者对多个消费者的发布订阅模式实例

  • 相关阅读:
    mysql 1449 : The user specified as a definer ('root'@'%') does not exist 解决方法
    java中使用正则表达式
    Timer与ScheduledThreadPoolExecutor的比较
    Java同步块
    java中的浅拷贝与深拷贝
    java的关闭钩子(Shutdown Hook)
    JVM系列三:JVM参数设置、分析
    java虚拟机参数设置
    UTF-8编码规则(转)
    过滤3个字节以上的utf-8字符
  • 原文地址:https://www.cnblogs.com/evan-liang/p/9189637.html
Copyright © 2011-2022 走看看