zoukankan      html  css  js  c++  java
  • activeMQ 学习

    1.下载activeMQ
    2.解压activeMQ
    3.启动
      对于5.10版本以及之后用:bin\activemq start
      对于5.9版本以及更早的用:bin\activemq
    4.消息类型
      4.1点对点
      消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费,其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费者消费
      4.2发布/订阅
      消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。当生产者发布消息时,不管是否有消费者,都不会保存消息

    5.示例代码

      5.1点对点

      1)生产者

    package org.tonny.junior.queue;
    
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Point-to-point example: sends map messages to the queue "TOOL.DEFAULT".
     *
     * <p>Connects with the ActiveMQ default user, password and broker URL, then
     * sends 1000 {@link MapMessage}s, pausing two seconds before each send and
     * committing the transacted session after every message.
     */
    public class Producer
    {
        /**
         * Runs the producer. Failures are reported on standard error and the
         * connection is always closed before the method returns.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
            String user = ActiveMQConnection.DEFAULT_USER;
            String password = ActiveMQConnection.DEFAULT_PASSWORD;
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
            String subject = "TOOL.DEFAULT";

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

            // Declared outside the try so the finally block can release it.
            Connection conn = null;
            try
            {
                conn = connectionFactory.createConnection();
                conn.start();
                // Transacted session: nothing is delivered until commit() is called.
                Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(subject);
                MessageProducer producer = session.createProducer(destination);
                for (int i = 0; i < 1000; i++)
                {
                    MapMessage message = session.createMapMessage();
                    Date date = new Date();
                    message.setInt("count", i + 1);
                    message.setLong("time", date.getTime());
                    Thread.sleep(2000);
                    producer.send(message);
                    System.out.println("--发送消息:" + date);
                    // One transaction per message.
                    session.commit();
                }
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Closing the connection also closes the session and producer
                // created from it.
                if (conn != null)
                {
                    try
                    {
                        conn.close();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

      2)消费者

    package org.tonny.junior.queue;
    
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Point-to-point example: consumes map messages from the queue "TOOL.DEFAULT".
     *
     * <p>Connects with the ActiveMQ default user, password and broker URL,
     * registers a {@link MessageListener}, lets it run for three seconds and
     * then closes the connection.
     */
    public class Consumer
    {
        /**
         * Runs the consumer. Failures are reported on standard error and the
         * connection is always closed before the method returns.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
            String user = ActiveMQConnection.DEFAULT_USER;
            String password = ActiveMQConnection.DEFAULT_PASSWORD;
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
            String subject = "TOOL.DEFAULT";

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

            // Declared outside the try so the finally block can release it.
            Connection conn = null;
            try
            {
                conn = connectionFactory.createConnection();
                final Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(subject);
                MessageConsumer consumer = session.createConsumer(destination);
                consumer.setMessageListener(new MessageListener()
                {
                    @Override
                    public void onMessage(Message message)
                    {
                        MapMessage msg = (MapMessage) message;
                        try
                        {
                            System.out.println("--收到消息" + msg.getInt("count"));
                            System.out.println("--收到消息" + new Date(msg.getLong("time")));
                            // Commit on the delivery thread: once a listener is
                            // registered the session must not be used from any
                            // other thread (only close() is allowed).
                            session.commit();
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }

                    }
                });
                // Start delivery only after the listener is in place.
                conn.start();
                // Listening window: messages arriving in the next three seconds
                // are handled by the listener above.
                Thread.sleep(3000);
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Closing the connection also closes the session and consumer
                // created from it.
                if (conn != null)
                {
                    try
                    {
                        conn.close();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

      5.2发布/订阅

      1)订阅者

    package org.tonny.junior.topic;
    
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Publish/subscribe example: subscribes to the topic "TOOL.DEFAULT".
     *
     * <p>Connects with the ActiveMQ default user, password and broker URL and
     * registers a {@link MessageListener} that prints each received
     * {@link MapMessage} and commits the transacted session.
     */
    public class SubscriberFirst
    {

        /**
         * Sets up the subscription. On success the connection is left open so
         * the listener can keep receiving; on failure the error is reported on
         * standard error and the connection is closed.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
            String user = ActiveMQConnection.DEFAULT_USER;
            String password = ActiveMQConnection.DEFAULT_PASSWORD;
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
            String subject = "TOOL.DEFAULT";

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

            // Declared outside the try so the failure path can release it.
            Connection conn = null;
            try
            {
                conn = connectionFactory.createConnection();
                final Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic(subject);
                MessageConsumer consumer = session.createConsumer(topic);

                consumer.setMessageListener(new MessageListener()
                {

                    @Override
                    public void onMessage(Message msg)
                    {
                        MapMessage message = (MapMessage) msg;
                        try
                        {
                            System.out.println("--收到消息" + message.getInt("count"));
                            System.out.println("--收到消息" + new Date(message.getLong("time")));
                            session.commit();
                        }
                        catch (Exception e)
                        {
                            e.printStackTrace();
                        }

                    }
                });

                // Start delivery only after the listener is in place.
                conn.start();

                // The session and connection are deliberately NOT closed here:
                // closing them would end the subscription.
            }
            catch (Exception e)
            {
                e.printStackTrace();
                // Setup failed part-way: release the connection instead of
                // leaving it half-initialised.
                if (conn != null)
                {
                    try
                    {
                        conn.close();
                    }
                    catch (Exception closeError)
                    {
                        closeError.printStackTrace();
                    }
                }
            }
        }

    }

      2)发布者

    package org.tonny.junior.topic;
    
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * Publish/subscribe example: publishes map messages to the topic "TOOL.DEFAULT".
     *
     * <p>Connects with the ActiveMQ default user, password and broker URL, then
     * sends 20 non-persistent {@link MapMessage}s, pausing 200 ms before each
     * send, and commits them in a single transaction after the loop.
     */
    public class Publisher
    {

        /**
         * Runs the publisher. Failures are reported on standard error and the
         * connection is always closed before the method returns.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
            String user = ActiveMQConnection.DEFAULT_USER;
            String password = ActiveMQConnection.DEFAULT_PASSWORD;
            String url = ActiveMQConnection.DEFAULT_BROKER_URL;
            String subject = "TOOL.DEFAULT";

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

            // Declared outside the try so the finally block can release it.
            Connection conn = null;
            try
            {
                conn = connectionFactory.createConnection();
                conn.start();
                // Transacted session: nothing is delivered until commit() is called.
                Session session = conn.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Topic topic = session.createTopic(subject);
                MessageProducer producer = session.createProducer(topic);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                for (int i = 0; i < 20; i++)
                {
                    MapMessage message = session.createMapMessage();
                    Date date = new Date();
                    message.setInt("count", i + 1);
                    message.setLong("time", date.getTime());
                    Thread.sleep(200);
                    producer.send(message);
                    System.out.println("--发送消息:" + date);
                }
                // All 20 messages are released together by this single commit.
                session.commit();
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag so callers can still observe it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
            finally
            {
                // Closing the connection also closes the session and producer
                // created from it.
                if (conn != null)
                {
                    try
                    {
                        conn.close();
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }

    }

    6.消息查看
    登录 http://localhost:8161/admin/index.jsp 用户名/密码: admin/admin

  • 相关阅读:
    回车换行解释
    二,php的错误处理
    2017年计划
    postgresql无法安装pldbgapi的问题
    在tmux中的vi 上下左右键变为了ABCD等字符
    查看某表有没有语句被锁住
    ubuntu 常见错误--Could not get lock /var/lib/dpkg/lock
    PostgreSQL杀掉死锁的链接
    实现从Oracle增量同步数据到GreenPlum
    终于将rsync-3.1.2配置成功,之外还挖掘了一些新的用法
  • 原文地址:https://www.cnblogs.com/supertonny/p/8277003.html
Copyright © 2011-2022 走看看