zoukankan      html  css  js  c++  java
  • ActiveMQ点对点模式

    1.安装ActiveMQ服务器(略)

    2.启动ActiveMQ,浏览器访问8161端口,默认账号admin/admin

    3. 生产者代码

    package test001;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;

    /**
     * @Created by zhengqinfeng
     * @Description :
     * @Date : created in 23:44 2018/5/14
     */
    public class ProducerMQ {
        public static void main(String[] args) throws JMSException {
            //创建MQ工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            //创建连接
            Connection connection = factory.createConnection();
            //启动连接
            connection.start();
            //创建会话工厂, false:表示不以事务方式进行提交; Session.AUTO_ACKNOWLEDGE 表示自动签收
    //        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //Session.CLIENT_ACKNOWLEDGE:手动签收
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建队列
            Queue testMQ = session.createQueue("testMQ1");
            //创建生产者
            MessageProducer producer = session.createProducer(testMQ);
            //消息不持久化
    //        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //持久化消息
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i <= 10; i++) {
                System.out.println("###########我是生产者:"+i+"###############");
                sendMsg(session, producer, "我是生产者:" + i);
            }
                System.out.println("###########发送消息完毕###############");

        }

        private static void  sendMsg(Session session,MessageProducer producer,String i) throws JMSException {
            TextMessage textMessage = session.createTextMessage("hello activeMQ " + i);
            producer.send(textMessage);
        }
    }

    4.消费者代码

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;

    /**
     * @Created by zhengqinfeng
     * @Description : 消费者
     * @Date : created in 12:47 2018/5/15
     */
    public class ConsumerMQ {
        public static void main(String[] args) throws JMSException {
            System.out.println("消费者1");
            //创建MQ工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
            //创建连接
            Connection connection = factory.createConnection();
            //启动连接
            connection.start();
            //创建会话工厂  AUTO_ACKNOWLEDGE:自动签收
    //        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 手动签收
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            //创建队列
            Queue testMQ = session.createQueue("testMQ1");
            //创建生产者
            MessageConsumer consumer = session.createConsumer(testMQ);

            //第一种消费消息的方式
    //        while (true) {
    //            //获取消息
    //            TextMessage message = (TextMessage) consumer.receive();
    //            if (message != null) {
    //                String text = message.getText();
    //                System.out.println("消费者获取消息,text:" + text);
    //                //手动签收,如果没有手动签收,消息还是会存在于队列中的(当然这是在Session.CLIENT_ACKNOWLEDGE模式下)
    //                message.acknowledge();
    //            } else {
    //                break;
    //            }
    //        }
    //        System.out.println("消费者获取消息完毕>>>>>>>>>>>>>>>>>>>..");


            //第二种消费消息的方式
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage m = (TextMessage) message;
                    try {
                        System.out.println("消费者获取消息:"+m.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

      启动消费者代码,然后修改System.out.println("消费者1")为System.out.println("消费者2"),再次启动。模拟测试两个消费者

     

    启动生产者

    再查看消费者1:

    查看消费者2:

     结论:消费1和消费者均摊消费testMQ1这个队列中的消息。

    日拱一卒无有尽,功不唐捐终入海
  • 相关阅读:
    程序返回插入数据库成功,但是数据库内却没有数据
    C++ 使用动态二维数组参数
    深入理解.Net中的内存释放,以及有关的注意事项
    用数据集时,错误:未能启用约束。一行或多行中包含违反非空、唯一或外键约束的值
    关于堆和栈
    C#加密方法总汇
    const与readonly
    struts 将上传文件保存到数据库中
    java Annotation注解的运用
    转:获取汉字的拼音(包括一级和二级)
  • 原文地址:https://www.cnblogs.com/z-qinfeng/p/9704092.html
Copyright © 2011-2022 走看看