zoukankan      html  css  js  c++  java
  • java之消息队列ActiveMQ实践

    原创论文:https://www.cnblogs.com/goujh/p/8510239.html

    消息队列的应用场景:

    消息队列应用场景
        异步处理,应用解耦,流量削锋和消息通讯四个场景
    
    异步处理:
        场景说明:用户注册后,需要发注册邮件和注册短信
    
    应用解耦:
        场景说明:用户下单后,订单系统需要通知库存系统
    
    流量削锋:
        应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用容易挂掉.
            1、用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
            2、秒杀业务根据消息队列中的请求信息,再做后续处理
    
    日志处理:
        应用场景:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题

    1、在安装ActiveMQ

    http://activemq.apache.org/activemq-5158-release.html

    2、解压启动服务

    tar -zxvf apache-activemq-5.15.8-bin.tar.gz
    进入目录,运行./bin/activemq start

    3、网页查看

    网址:http://139.199.64.189:8161/
    点击:
    Manage ActiveMQ broker
    输入默认用户和密码都为:admin

    4、创建maven工程,在pom.xml文件中添加

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>

    /****************************************************消息队列***************************************************************/

    注意:消息队列模式,当生产者生产消息后,会将消息放入消息队列中,一旦消费者启动了,立马回读取到。

    5、创建JMSConsumer.java文件,这是一个消费者

    package com.activemq.demo.method1;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class JMSConsumer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话,接收或者发送消息的线程
            Session session;
            //消息目的地
            Destination destination;
            //消息的消费者
            MessageConsumer messageConsumer;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
            try {
                //通过工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接Hello World!的消息队列
                destination = session.createQueue("Hello World");
                //创建消息的消费者
                messageConsumer = session.createConsumer(destination);
    
                while(true){
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if (textMessage != null) {
                        System.err.println("收到的消息:" + textMessage.getText());
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    6、创建JMSProducer.java,这是一个生产者

    package com.activemq.demo.method1;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class JMSProducer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        //发送的消息数量
        private static final int SENDNUM = 10;
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话,接收或者发送消息的线程
            Session session;
            //消息的目的地
            Destination destination;
            //消息生产者
            MessageProducer messageProducer;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个名称为Hello World!的消息队列
                destination = session.createQueue("Hello World");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session,messageProducer);
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
                if(connection != null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        /**
         * 发送消息
         * @param session
         * @param messageProducer 消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                //创建一条文本消息
                TextMessage message = session.createTextMessage("activemq 发送消息:" + i);
                System.err.println("发送消息:activemq 发送消息:" + i);
                //通过消息生产者发出消息
                messageProducer.send(message);
            }
        }
    }

    /**********************************************************发布订阅模式************************************************************/

    注意:发布订阅者模式,生产者创建了消息,如果消费者没有启动是获取不到消息;只有等消费者启动后,生产者再生产消息,消费者才会有获取到消息

    7、创建JMSConsumer.java文件

    package com.activemq.demo.method2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JMSConsumer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer consumer; //创建消费者
    
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
    
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                /**
                 * 这里的最好使用Boolean.FALSE,如果是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化情况。
                 */
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    //            destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("firstTopic");
                consumer=session.createConsumer(destination);
                consumer.setMessageListener(new MyListener()); // 注册消息监听
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    View Code

    8、创建JMSProducer.java文件

    package com.activemq.demo.method2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息生产者
     * @author Administrator
     *
     */
    public class JMSProducer {
    
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        private static final int SENDNUM=10; // 发送的消息数量
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
    
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
    
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    //            destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("firstTopic");
                messageProducer=session.createProducer(destination); // 创建消息生产者
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 发送消息
         * @param session
         * @param messageProducer
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
            for(int i=0;i<JMSProducer.SENDNUM;i++){
                TextMessage message=session.createTextMessage("ActiveMQ 发布的消息"+i);
                System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
                messageProducer.send(message);
            }
        }
    }
    View Code

    9、创建监听文件MyListener.java

    package com.activemq.demo.method2;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class MyListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    7、运行两个实例,可在web端查看

  • 相关阅读:
    深入了解SQLServer系统数据库工作原理(转)
    什么是动态语言(转)
    ASP.NET 2.0客户端回调的实现分析
    什么是“分布式应用系统”
    SQLServer数据库安全管理机制详解
    什么是 CLR(转)
    docker容器下的asp.net core项目发布运维
    VLAN技术
    用getDrawingCache方法获取ImageView中的图像需要注意的问题
    交换机的工作原理
  • 原文地址:https://www.cnblogs.com/ywjfx/p/10439039.html
Copyright © 2011-2022 走看看