zoukankan      html  css  js  c++  java
  • JMS基础篇

      首先我们需要下载 ActiveMQ:http://activemq.apache.org/。

      启动 ActiveMQ 服务:解包下载的 ActiveMQ 》进去其bin 目录》双击 activemq.bat。

        ActiveMQ 默认使用的是端口61616,可以在cmd中查看61616端口是否被占用,以确定ActiveMQ 服务是否正常启动。查看的命令如下:

    netstat -nao | find "61616",如果服务启动则可以看到ActiveMQ所对应的的进程。

          下面将以三种形式体验ActiveMQ 发送,接收消息的功能。

          第一种是ActiveMQ 所支持的,但不是基于JNDI的,不是JMS标准所建议的。

          发送消息的类如下,基于的端口是默认的61616,连续发送了5了消息,消息队列是xiaoyunduo。

    import java.util.Date;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Sender {
    
    	public static void main(String[] srgs) throws JMSException, InterruptedException {
    		factory;
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		MessageProducer producer = null;
    
    		// 创建连接工厂
    		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    		// 创建连接
    		connection = factory.createConnection();
    		// 建立连接
    		connection.start();
    		// 建立session
    		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    		// 指定消息队列
    		destination = session.createQueue("xiaoyunduo");
    		// 创建消息发生器
    		producer = session.createProducer(destination);
    
    		for (int i = 0; i < 5; i++) {
    			MapMessage message = session.createMapMessage();
    			message.setLong("mess", new Date().getTime());
    			Thread.sleep(1000);
    			// 发送消息
    			producer.send(message);
    		}
    
    		session.commit();
    		session.close();
    		connection.close();
    
    	}
    }
    

      接收消息的类如下,基于的端口是默认的61616,消息队列是xiaoyunduo。

    import java.util.Date;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Receiver {
    
    	public static void main(String[] args) throws JMSException {
    
    		ConnectionFactory factory = null;
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		MessageConsumer consumer = null;
    
    		// 创建连接工厂
    		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
    		// 创建连接
    		connection = factory.createConnection();
    		// 建立连接
    		connection.start();
    		// 建立session
    		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    		// 指定消息队列
    		destination = session.createQueue("xiaoyunduo");
    		// 产生消费者
    		consumer = session.createConsumer(destination);
    
    		for (int i = 0; i < 5; i++) {
    			//获取jms server中的消息
    			MapMessage message = (MapMessage) consumer.receive(1000);
    			session.commit();
    			System.out.println("收到消息:" + new Date(message.getLong("mess")));
    		}
    
    		session.close();
    		connection.close();
    
    	}
    }
    

      先启动发送消息的类,再启动接收消息的类,可以看到消息内容打印在消息接收端。

          第二种是基于文件形式的JNDI,使用Sun自带的RefFSContextFactory来存储JNDI信息。需要引入fscontext.jar和providerutil.jar,这是进行测试的前提。      

      ConnectionFactory改从Context中读取,其余部分保持不变。发送消息和接收消息的类都需要进行对应的修改。

            // 创建连接工厂
            //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            Hashtable env = new Hashtable(5);
            env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");
            env.put(Context.PROVIDER_URL, "file:JNDI_REF");
                    
            try {
                Context ctx = new InitialContext(env);
                ActiveMQConnectionFactory mqFactory = new ActiveMQConnectionFactory();
                mqFactory.setBrokerURL("tcp://localhost:61616");
                mqFactory.setUserName(null);
                mqFactory.setPassword(null);
                
                //设置的参数少了某一项,只有sender发送等几秒后再去启动receive才没问题,否则接收不到消息
                //ctx.bind("mqFactory", mqFactory);//只需要绑定一次
                factory = (ConnectionFactory) ctx.lookup("mqFactory");
            } catch (NamingException e) {
                e.printStackTrace();
            }

            第三种基于配置文件的方式,需要先有下面的一个配置文件jndi.properties,其中包含了ActiveMQ 的基本配置信息。

    java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
    java.naming.provider.url=tcp://localhost:61616
    java.naming.security.principal=system
    java.naming.security.credentials=manager 
    connectionFactoryNames=con1,con2
    ##queue.MyQueue=MyQueue
    topic.MyTopic=MyTopic
    topic.topic1=jms.topic1
    

         还需要一个读取配置文件的工厂类,InitialContext利用properties信息进行初始化,将直接利用JNDI从InitialContext中读取信息。

    import java.util.Properties;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;
    public class JndiFactoryForJMS {
         protected Context context = null;  
          
        public void initalize() throws NamingException  
        {         
            Properties props = new Properties();  
            try{  
                org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory();
                props.load(this.getClass().getResourceAsStream("jndi.properties"));
                context = new InitialContext(props);  
            }catch(Exception ex){  
                ex.printStackTrace();
            }  
                  
        }  
      
        public Context getJndiContext() throws NamingException {  
            if(context == null){  
                initalize();  
            }  
            return context;  
        }     
      
    }  

            和第二种方法类似,只需要改变第一个类的部分代码即可,发送端和接收端要同时修改。

          // 创建连接工厂
          //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp:/      /localhost:61616");
    				
         try {
    	  JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();          	          
    	    Context ctx = factoryForJMS.getJndiContext();  
    	       
    	    //获取连接工厂。  
    	    factory = (ConnectionFactory)ctx.lookup("con1");  
    	    } catch (NamingException e) {
    	    e.printStackTrace();
    	   }
    

      

  • 相关阅读:
    算法之递归(4) 应用
    算法之递归(1)
    [Async] [Series #1] 初识Async异步编程模型。
    CVE202142287/CVE202142278 复现
    易读文库下载器1.2版发布
    Sqlite.net 读取DateTime异常的解决方案
    QZFL 2.0.5 源代码
    Sqlite 管理工具 SQLiteDeveloper 及破解
    visio2010数据库正向工程生成数据库脚本
    什么是高内聚、低耦合?
  • 原文地址:https://www.cnblogs.com/lnlvinso/p/3903520.html
Copyright © 2011-2022 走看看