zoukankan      html  css  js  c++  java
  • ActiveMQ学习教程/2.简单示例

    ActiveMQ学习教程(二)——简单示例

    一。应用IDEA构建Maven项目

    File-》New-》Module...-》Maven-》勾选-》选择-》Next -》

    GroupId:com.jd.myMaven   |    ArtifactId:activeMQ    |    version:默认   -》Finish

    项目构建成功!项目结构如下所示:

    二。创建生产者类,模拟生产者发消息

    Step1:java/activemq/JMSProducer.java

    package activemq;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQMapMessage;
    
    
    import javax.jms.*;
    import java.util.Map;
    
    /**
     * 消息生产者
     */
    public class JMSProducer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码
        private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址
        private static final int SENDNUM = 2;//发送的消息数量
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;//连接工厂
            Connection connection = null;//连接
            Session session = null;//会话,接受或者发送消息的线程
            Destination destination = null;//消息的目的地
            MessageProducer messageProducer = null;//消息的生产者
            //实例化连接工厂(指定连接用户名|密码|连接地址)
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
            try {
                connection = connectionFactory.createConnection();//通过连接工厂获取连接
                connection.start();//启动连接
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
                destination = session.createQueue("TestQueue");//创建消息队列
                messageProducer = session.createProducer(destination);//创建消息生产者
                sendMessage(session, messageProducer);//发送消息
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        //发送消息
        private static void sendMessage(Session session, MessageProducer messageProducer) {
            try {
                //创建消息Map<key,value>
                MapMessage message = session.createMapMessage();
                message.setString("userName", "syf");
                message.setInt("age", 30);
                message.setDouble("salary", 1000);
                message.setBoolean("isGirl", true);
                System.out.println("Sending:" + ((ActiveMQMapMessage)message).getContentMap());
                //发送消息
                messageProducer.send(message);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    Step2:启动ActiveMQ,运行生产者类,模拟生产消息

    控制台显示:

    Sending:{isGirl=true, userName=syf, salary=1000.0, age=30}

    打开浏览器,访问activeMQ监控画面 http://127.0.0.1:8161/admin

    【1】Queues

    【2】Topics

    三。模拟消费者消耗数据

    package activemq;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQMapMessage;
    
    import javax.jms.*;
    
    /**
     * 消息消费者
     */
    public class JMSConsumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码
        private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;//连接工厂
            Connection connection = null;//连接
            Session session = null;//会话,接受或者发送消息的线程
            Destination destination = null;//消息的目的地
            MessageConsumer messageConsumer = null;//消息的消费者
            //实例化连接工厂(指定连接用户名|密码|连接地址)
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
            try {
                connection = connectionFactory.createConnection();//通过连接工厂获取连接
                connection.start();//启动连接
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session
                destination = session.createQueue("TestQueue");//创建连接的消息队列(TestQueue:生产者队列名)
                messageConsumer = session.createConsumer(destination);//创建消息消费者
                while (true) {
                    MapMessage mapMessage= (MapMessage) messageConsumer.receive(10000);
                    //TextMessage textMessage= (TextMessage) messageConsumer.receive(10000);//10秒接收
                    if (mapMessage != null) {
                        System.out.println("收到的消息:" + ((ActiveMQMapMessage)mapMessage).getContentMap());
                    } else {
                        System.out.println("没有消息:");
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    运行代码:

    控制台显示:收到的消息:{userName=syf, salary=1000.0, isGirl=true, age=30}

    打开浏览器,查看activeMQ监控画面 http://127.0.0.1:8161/admin

    【1】Queues(1条消息被消费)

    【2】Topics

    示例-----企业应用最常见方式之监听器监听方式

    增加监听类JMSListener

    package activemq;
    
    import org.apache.activemq.command.ActiveMQMapMessage;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    /**
     * 消息监听
     * */
    public class JMSListener implements MessageListener{
        public void onMessage(Message message) {
            try {
                System.out.println("receive Message:"+((ActiveMQMapMessage)message).getContentMap());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    重新模拟一个消费者,应用监听器监听消息

    package activemq;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQMapMessage;
    
    import javax.jms.*;
    
    public class JMSConsumerByListener {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接密码
        private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;//连接工厂
            Connection connection = null;//连接
            Session session = null;//会话,接受或者发送消息的线程
            Destination destination = null;//消息的目的地
            MessageConsumer messageConsumer = null;//消息的消费者
            //实例化连接工厂(指定连接用户名|密码|连接地址)
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumerByListener.USERNAME, JMSConsumerByListener.PASSWORD, JMSConsumerByListener.BROKERURL);
            try {
                connection = connectionFactory.createConnection();//通过连接工厂获取连接
                connection.start();//启动连接
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session
                destination = session.createQueue("TestQueue2");//创建连接的消息队列(TestQueue:生产者队列名)
                messageConsumer = session.createConsumer(destination);//创建消息消费者
                messageConsumer.setMessageListener(new JMSListener());//注册消息监听
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    =》先执行生成者,生产消息!

       控制台打印:Sending:{isGirl=true, userName=kaixin, salary=1000.0, age=30}

       ActiveMQ监控画面显示:

    =》再执行消费者,消费消息!

      控制台打印:receive Message:{userName=kaixin, salary=1000.0, isGirl=true, age=30}

      ActiveMQ监控画面显示:

    OK!!!大功告成!

    参考文章:

    https://www.toutiao.com/a6345805464718409986/?tt_from=weixin&utm_campaign=client_share&app=news_article&utm_source=weixin&iid=18292470304&utm_medium=toutiao_android&wxshare_count=1

    http://blog.csdn.net/xh16319/article/details/12142249

    理论博客:http://www.cnblogs.com/Survivalist/p/8094069.html

    体胖还需勤跑步,人丑就该多读书!
  • 相关阅读:
    User Get 'Access Denied' with Excel Service WebPart
    How To Search and Restore files from Site Collection Recycle Bin
    How To Collect ULS Log from SharePoint Farm
    How To Restart timer service on all servers in farm
    How to Operate SharePoint User Alerts with PowerShell
    How to get Timer Job History
    Synchronization Service Manager
    SharePoint 2007 Full Text Searching PowerShell and CS file content with SharePoint Search
    0x80040E14 Caused by Max Url Length bug
    SharePoint 2007 User Re-created in AD with new SID issue on MySite
  • 原文地址:https://www.cnblogs.com/kaixinyufeng/p/8033936.html
Copyright © 2011-2022 走看看