zoukankan      html  css  js  c++  java
  • java之消息队列ActiveMQ实践

    原创论文:https://www.cnblogs.com/goujh/p/8510239.html

    消息队列的应用场景:

    消息队列应用场景
        异步处理,应用解耦,流量削锋和消息通讯四个场景
    
    异步处理:
        场景说明:用户注册后,需要发注册邮件和注册短信
    
    应用解耦:
        场景说明:用户下单后,订单系统需要通知库存系统
    
    流量削锋:
        应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用容易挂掉.
            1、用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
            2、秒杀业务根据消息队列中的请求信息,再做后续处理
    
    日志处理:
        应用场景:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题

    1、在安装ActiveMQ

    http://activemq.apache.org/activemq-5158-release.html

    2、解压启动服务

    tar -zxvf apache-activemq-5.15.8-bin.tar.gz
    进入目录,运行./bin/activemq start

    3、网页查看

    网址:http://139.199.64.189:8161/
    点击:
    Manage ActiveMQ broker
    输入默认用户和密码都为:admin

    4、创建maven工程,在pom.xml文件中添加

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>

    /****************************************************消息队列***************************************************************/

    注意:消息队列模式,当生产者生产消息后,会将消息放入消息队列中,一旦消费者启动了,立马回读取到。

    5、创建JMSConsumer.java文件,这是一个消费者

    package com.activemq.demo.method1;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class JMSConsumer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话,接收或者发送消息的线程
            Session session;
            //消息目的地
            Destination destination;
            //消息的消费者
            MessageConsumer messageConsumer;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
            try {
                //通过工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接Hello World!的消息队列
                destination = session.createQueue("Hello World");
                //创建消息的消费者
                messageConsumer = session.createConsumer(destination);
    
                while(true){
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if (textMessage != null) {
                        System.err.println("收到的消息:" + textMessage.getText());
                    } else {
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    6、创建JMSProducer.java,这是一个生产者

    package com.activemq.demo.method1;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    public class JMSProducer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址,默认端口为61616
        private static final String BROKERURL = "tcp://ip:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        //发送的消息数量
        private static final int SENDNUM = 10;
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话,接收或者发送消息的线程
            Session session;
            //消息的目的地
            Destination destination;
            //消息生产者
            MessageProducer messageProducer;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个名称为Hello World!的消息队列
                destination = session.createQueue("Hello World");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session,messageProducer);
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
                if(connection != null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        /**
         * 发送消息
         * @param session
         * @param messageProducer 消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                //创建一条文本消息
                TextMessage message = session.createTextMessage("activemq 发送消息:" + i);
                System.err.println("发送消息:activemq 发送消息:" + i);
                //通过消息生产者发出消息
                messageProducer.send(message);
            }
        }
    }

    /**********************************************************发布订阅模式************************************************************/

    注意:发布订阅者模式,生产者创建了消息,如果消费者没有启动是获取不到消息;只有等消费者启动后,生产者再生产消息,消费者才会有获取到消息

    7、创建JMSConsumer.java文件

    package com.activemq.demo.method2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JMSConsumer {
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer consumer; //创建消费者
    
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
    
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                /**
                 * 这里的最好使用Boolean.FALSE,如果是用true则必须commit才能生效,且http://127.0.0.1:8161/admin管理页面才会更新消息队列的变化情况。
                 */
                session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    //            destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("firstTopic");
                consumer=session.createConsumer(destination);
                consumer.setMessageListener(new MyListener()); // 注册消息监听
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    View Code

    8、创建JMSProducer.java文件

    package com.activemq.demo.method2;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息生产者
     * @author Administrator
     *
     */
    public class JMSProducer {
    
        //默认连接用户名
        private static final String USERNAME = "admin";//ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = "admin";//ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKERURL = "tcp://139.199.64.189:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;
        private static final int SENDNUM=10; // 发送的消息数量
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
    
            // 实例化连接工厂
            connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
    
            try {
                connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
    //            destination=session.createQueue("FirstQueue1"); // 创建消息队列
                destination=session.createTopic("firstTopic");
                messageProducer=session.createProducer(destination); // 创建消息生产者
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally{
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 发送消息
         * @param session
         * @param messageProducer
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
            for(int i=0;i<JMSProducer.SENDNUM;i++){
                TextMessage message=session.createTextMessage("ActiveMQ 发布的消息"+i);
                System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
                messageProducer.send(message);
            }
        }
    }
    View Code

    9、创建监听文件MyListener.java

    package com.activemq.demo.method2;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class MyListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    7、运行两个实例,可在web端查看

  • 相关阅读:
    Verilog非阻塞赋值的仿真/综合问题 (Nonblocking Assignments in Verilog Synthesis)上
    异步FIFO结构及FPGA设计 跨时钟域设计
    FPGA管脚分配需要考虑的因素
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 上篇)
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 下篇)
    中国通信简史 (下)
    谈谈德国大学的电子专业
    中国通信简史 (上)
    Verilog学习笔记
    Verilog非阻塞赋值的仿真/综合问题(Nonblocking Assignments in Verilog Synthesis) 下
  • 原文地址:https://www.cnblogs.com/ywjfx/p/10439039.html
Copyright © 2011-2022 走看看