zoukankan      html  css  js  c++  java
  • ActiveMQ点对点模式

    1.安装ActiveMQ服务器(略)

    2.启动ActiveMQ,浏览器访问8161端口,默认账号admin/admin

    3. 生产者代码

    package test001;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;

    /**
     * @Created by zhengqinfeng
     * @Description :
     * @Date : created in 23:44 2018/5/14
     */
    public class ProducerMQ {
        public static void main(String[] args) throws JMSException {
            //创建MQ工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            //创建连接
            Connection connection = factory.createConnection();
            //启动连接
            connection.start();
            //创建会话工厂, false:表示不以事务方式进行提交; Session.AUTO_ACKNOWLEDGE 表示自动签收
    //        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //Session.CLIENT_ACKNOWLEDGE:手动签收
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建队列
            Queue testMQ = session.createQueue("testMQ1");
            //创建生产者
            MessageProducer producer = session.createProducer(testMQ);
            //消息不持久化
    //        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //持久化消息
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i <= 10; i++) {
                System.out.println("###########我是生产者:"+i+"###############");
                sendMsg(session, producer, "我是生产者:" + i);
            }
                System.out.println("###########发送消息完毕###############");

        }

        private static void  sendMsg(Session session,MessageProducer producer,String i) throws JMSException {
            TextMessage textMessage = session.createTextMessage("hello activeMQ " + i);
            producer.send(textMessage);
        }
    }

    4.消费者代码

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;

    /**
     * @Created by zhengqinfeng
     * @Description : 消费者
     * @Date : created in 12:47 2018/5/15
     */
    public class ConsumerMQ {
        public static void main(String[] args) throws JMSException {
            System.out.println("消费者1");
            //创建MQ工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            //创建连接
            Connection connection = factory.createConnection();
            //启动连接
            connection.start();
            //创建会话工厂  AUTO_ACKNOWLEDGE:自动签收
    //        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 手动签收
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建队列
            Queue testMQ = session.createQueue("testMQ1");
            //创建生产者
            MessageConsumer consumer = session.createConsumer(testMQ);

            //第一种消费消息的方式
    //        while (true) {
    //            //获取消息
    //            TextMessage message = (TextMessage) consumer.receive();
    //            if (message != null) {
    //                String text = message.getText();
    //                System.out.println("消费者获取消息,text:" + text);
    //                //手动签收,如果没有手动签收,消息还是会存在于队列中的(当然这是在Session.CLIENT_ACKNOWLEDGE模式下)
    //                message.acknowledge();
    //            } else {
    //                break;
    //            }
    //        }
    //        System.out.println("消费者获取消息完毕>>>>>>>>>>>>>>>>>>>..");


            //第二种消费消息的方式
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage m = (TextMessage) message;
                    try {
                        System.out.println("消费者获取消息:"+m.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

      启动消费者代码,然后修改System.out.println("消费者1")为System.out.println("消费者2"),再次启动。模拟测试两个消费者

     

    启动生产者

    再查看消费者1:

    查看消费者2:

     结论:消费1和消费者均摊消费testMQ1这个队列中的消息。

    日拱一卒无有尽,功不唐捐终入海
  • 相关阅读:
    C——联合体(共同体)总结
    JMX操作ActiveMQ(1)
    使用xml和java代码混合控制UI界面
    Hive Metastore ObjectStore PersistenceManager自动关闭bug解析
    (算法课大报告)大数据的查找与排序
    编程珠玑---读书笔记---使用后缀数组查找最长重复子串
    VMware vSphere服务器虚拟化实验十一高可用性之三Fault Tolerance
    签名应用例子
    fopen()惹的祸
    Bigcommerce: 给已完成购买的客户发送一封产品评论邮件,让客户直接进行产品评论
  • 原文地址:https://www.cnblogs.com/z-qinfeng/p/9704092.html
Copyright © 2011-2022 走看看