zoukankan      html  css  js  c++  java
  • 消息系统之Apache ActiveMQ

    一、下载运行MQ服务

    1、下载ActiveMQ :http://activemq.apache.org/

    2、解压缩:

    进入bin目录 win32和win64对应不同位的操作系统,选择进入 点击activemq.bat 运行即可启动ActiveMQ服务。

    在浏览器输入ActiveMQ 服务地址:http://127.0.0.1:8161/admin/         默认用户名/密码 admin/admin

     

    二、开发

    jar:activemq-all-5.11.1.jar   在ActiveMQ安装目录下面就有  拷贝到工程即可

    1、点对点模式

    package com.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息生产者
     * @author Administrator
     */
    public class JMSProducer 
    {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args)
        {
            ConnectionFactory connfactory;//连接工厂
            Connection conn = null;//连接
            Session session;//接收或者发送消息的线程
            Destination dest;//消息的目的地
            MessageProducer producer;//消息的生产者
            
            //创建连接工厂
            connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);
            try 
            {
                conn = connfactory.createConnection();//获取连接
                conn.start();//启动连接
                session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
                dest = session.createQueue("FirstQueue1");//创建消息队列
                producer = session.createProducer(dest);//创建消息生产者
                sendMessage(session, producer);//生产并发送消息
                session.commit();
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
            finally
            {
                if (conn != null)
                {
                    try 
                    {
                        conn.close();
                    } 
                    catch (JMSException e) 
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        /**
         * 发现哦那个消息
         * @param session
         * @param messageProducer
         * @throws JMSException 
         */
        private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException
        {
            for(int i=1;i<=10;i++)
            {
                TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息
                messageProducer.send(text);//MessageProducer用来发送消息
            }
        }
        
    
    }
    package com.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者
     * @author Administrator
     *
     */
    public class JMSConsumer 
    {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) 
        {
            ConnectionFactory connfactory;//连接工厂
            Connection conn = null;//连接
            Session session;//接收或者发送消息的线程
            Destination dest;//消息的目的地
            MessageConsumer messageConsumer;//消息消费者
            //创建连接工厂
            connfactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.BROKEURL);
            
            try 
            {
                conn = connfactory.createConnection();//获取连接
                conn.start();//启动连接
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
                dest = session.createQueue("FirstQueue1");//创建消息队列
                messageConsumer = session.createConsumer(dest);
                //receive模式
    //            while(true)
    //            {
    //                TextMessage text = (TextMessage)messageConsumer.receive(100000);
    //                if (text != null)
    //                {
    //                    System.out.println("receive模式接收:"+text.getText());
    //                }
    //                else
    //                {
    //                    break;
    //                }
    //            }
                //监听模式
                messageConsumer.setMessageListener(new Listener());// 注册消息监听  
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
            //后期不能关闭  要一直处于监听模式  需要conn一直开启
        }
    
    }
    package com.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class Listener implements MessageListener
    {
    
        @Override
        public void onMessage(Message message) 
        {
            try {
                System.out.println("监听模式接收:"+ ((TextMessage)message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    2、发布订阅模式

    package com.activemq2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JMSProducer 
    {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args)
        {
            ConnectionFactory connfactory;//连接工厂
            Connection conn = null;//连接
            Session session;//接收或者发送消息的线程
            Destination dest;//消息的目的地
            MessageProducer producer;//消息的生产者
            
            //创建连接工厂
            connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);
            try 
            {
                conn = connfactory.createConnection();//获取连接
                conn.start();//启动连接
                session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
                dest = session.createTopic("FirstTopic1");//创建主题  与队列的区别
                producer = session.createProducer(dest);//创建消息生产者
                sendMessage(session, producer);//生产并发送消息
                session.commit();
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
            finally
            {
                if (conn != null)
                {
                    try 
                    {
                        conn.close();
                    } 
                    catch (JMSException e) 
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        /**
         * 发现哦那个消息
         * @param session
         * @param messageProducer
         * @throws JMSException 
         */
        private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException
        {
            for(int i=1;i<=10;i++)
            {
                TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息
                messageProducer.send(text);//MessageProducer用来发送消息
            }
        }
    
    }
    package com.activemq2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import com.activemq.Listener;
    
    public class JMSConsumer1 
    {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
    
        public static void main(String[] args) 
        {
            ConnectionFactory connfactory;//连接工厂
            Connection conn = null;//连接
            Session session;//接收或者发送消息的线程
            Destination dest;//消息的目的地
            MessageConsumer messageConsumer;//消息消费者
            //创建连接工厂
            connfactory = new ActiveMQConnectionFactory(JMSConsumer1.USERNAME,JMSConsumer1.PASSWORD,JMSConsumer1.BROKEURL);
            
            try 
            {
                conn = connfactory.createConnection();//获取连接
                conn.start();//启动连接
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
                dest = session.createTopic("FirstTopic1");//创建消息主题
                messageConsumer = session.createConsumer(dest);
                //监听模式
                messageConsumer.setMessageListener(new Listener1());// 注册消息监听  
            } 
            catch (Exception e) 
            {
                e.printStackTrace();
            }
            //后期不能关闭  要一直处于监听模式  需要conn一直开启
        }
    }
    package com.activemq2;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class Listener1 implements MessageListener {
    
        @Override
        public void onMessage(Message message) 
        {
            try {
                System.out.println("监听模式1接收:"+ ((TextMessage)message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }

    注意:

    1、点对点和发布订阅模式的主要区别就是

    dest = session.createQueue("FirstQueue1");//创建消息队列
    dest = session.createTopic("FirstTopic1");//创建消息主题

    2、发布订阅模式必须先订阅 再发布才能接收到。

    
    

    参考

    常见开源消息系统

  • 相关阅读:
    关于SNMP的MIB文件的语法简述
    windows 与 Centos7 共享文件方法
    虚拟机 与 host主机,无法ping通的问题
    能ping通虚拟机,但snmp报文 Destination unreachable(Host administratively prohibited
    qt 打包发布 获取dll
    qt5.7.1 (create4.2.0)+msvc2015 安装后无法编译 & 缺少h文件
    windows 为qt5.7.1 安装openssl
    windows下为qt msvc版本配置调试器
    Rust中的类型大小总结
    substrate 为什么有的调用的地方用<T::Lookup as StaticLookup>::Source 代替 T::AccountId
  • 原文地址:https://www.cnblogs.com/cac2020/p/6042380.html
Copyright © 2011-2022 走看看