zoukankan      html  css  js  c++  java
  • linux:安装并使用activeMQ

    1.下载安装包:curl -O https://archive.apache.org/dist/activemq/5.14.0/apache-activemq-5.14.0-bin.tar.gz

    2.解压:tar -zxvf apache-activemq-5.14.0-bin.tar.gz

    3.重命名:mv apache-activemq-5.14.0-bin.tar.gz activemq

    4.进入bin目录:cd .../activemq/bin ,运行权限命令:chmod 755 activemq ,启动activemq: ./activemq

    5.查询端口:61616 和8161  命令:netstat -lntp (centos 7若找不到命令,运行命令安装:sudo yum install net-tools ,sudo yum provides ifconfig ,都装一下)

    java模拟场景,代码如下:

    导入包:activemq-all-5.8.0.jar

    Bean:

    public class MqBean implements Serializable{
        private Integer age;
        private String name;
        public Integer getAge() {
            return age;
        }
        public void setAge(Integer age) {
            this.age = age;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }

    队列消息的发送:

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            //第一个参数是是否是事务型消息,设置为true,第二个参数无效
            //第二个参数是
            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的
            //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者)
            //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。
            //待测试
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue("test-queue");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
            MqBean bean = new MqBean();
            bean.setAge(13);
            for(int i=0;i<100;i++){
                bean.setName("小黄"+i);
                producer.send(session.createObjectMessage(bean));
            }
            producer.close();
            System.out.println("呵呵");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

    1. 消息从生成方客户端传送到消息服务器。
    2. 消息服务器读取消息。
    3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
    4. 消息服务器确认收到消息(出于可靠性的考虑)。
    5. 消息服务器确定消息的路由。
    6. 消息服务器写出消息。
    7. 消息从消息服务器传送到使用方客户端。
    8. 使用方客户端确认收到消息(出于可靠性的考虑)。
    9. 消息服务器处理客户端确认(出于可靠性的考虑)。
    10. 消息服务器确定已经处理客户端确认。

    队列消息的接收:

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection = null;
        // Session: 一个发送或接收消息的线程  
        Session session;
        // Destination :消息的目的地;消息发送给谁.  
        Destination destination;
        // 消费者,消息接收者  
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
        try {
            // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();
            // 启动  
            connection.start();
            // 获取操作连接  
            //这个最好还是有事务
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createQueue("test-queue");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                        System.out.println(bean);
                        if (null != message) {
                            System.out.println("收到消息" + bean.getName());
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    订阅消息的发送:

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        Connection connection;
        Session session;
        Destination destination;
        MessageProducer producer;
        connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createTopic("test-topic");
            producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢
            MqBean bean = new MqBean();
            bean.setAge(13);
            for(int i=0;i<100;i++){
                Thread.sleep(1000);
                bean.setName("小黄"+i);
                producer.send(session.createObjectMessage(bean));
            }
            producer.close();
            System.out.println("呵呵");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    订阅消息的接收:

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接  
        Connection connection = null;
        // Session: 一个发送或接收消息的线程  
        Session session;
        // Destination :消息的目的地;消息发送给谁.  
        Destination destination;
        // 消费者,消息接收者  
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
        try {
            // 构造从工厂得到连接对象  
            connection = connectionFactory.createConnection();
            // 启动  
            connection.start();
            // 获取操作连接  
            //这个最好还是有事务
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
            destination = session.createTopic("test-topic");
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
                        System.out.println(bean);
                        if (null != message) {
                            System.out.println("收到消息" + bean.getName());
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面: http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

    Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数

    Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减

    Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

  • 相关阅读:
    Codeforces Round #652 (Div. 2) A. FashionabLee(几何)
    轻量应用服务器如何通过修改apache配置文件实现非https的访问多域名到不同子目录程序?
    在Windows环境下使用hexo搭建博客以及部署到gitee / github
    使用WordPress搭建个人手机博客(阿里云)
    访问自己服务器的ip地址
    php环境无法上传文件的解决方法
    SSRF漏洞
    CSRF全家桶(含义,防御,攻击)
    JS实现HTML实体与字符的相互转换
    CentOS系统下载及应用部署
  • 原文地址:https://www.cnblogs.com/incognitor/p/8311734.html
Copyright © 2011-2022 走看看