zoukankan      html  css  js  c++  java
  • ActiveMQ初步学习

    本文主要参考张丰哲大神的简书文章,链接 https://www.jianshu.com/p/ecdc6eab554c

    JMS,即Java Message Service,通过面向消息中间件(MOM:Message Oriented Middleware)的方式很好的解决了上面的问题。大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;在pub/sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

     
    JMS

    需要注意的是,JMS只是定义了Java访问消息中间件的接口,其实就是在包javax.jms中,你会发现这个包下除了异常定义,其他都是interface。我们可以扫一眼,比如Message:

     
    Message接口

    JMS只给出接口,然后由具体的中间件去实现,比如ActiveMQ就是实现了JMS的一种Provider,JMS规范定义的一些术语:

    Provider/MessageProvider:生产者

    Consumer/MessageConsumer:消费者

    PTP:Point To Point,点对点通信消息模型

    Pub/Sub:Publish/Subscribe,发布订阅消息模型

    Queue:队列,目标类型之一,和PTP结合

    Topic:主题,目标类型之一,和Pub/Sub结合

    ConnectionFactory:连接工厂,JMS用它创建连接

    Connnection:JMS Client到JMS Provider的连接

    Destination:消息目的地,由Session创建

    Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的


    首先将active的压缩包上传至服务器并解压

    在保证JDK已经安装的情况下可以直接通过bin目录下的activemq文件启动activemq服务。

    使用命令:./apache-activemq-5.12.0/bin/activemq启动服务

    控制台登录的用户名密码可以在apache-activemq-5.12.0/conf/jetty-realm.properties文件中配置:

    activemq的默认端口是8161,可以在apache-activemq-5.12.0/conf/jetty.xml文件中配置

    现在,我们可以直接访问activemq的控制台,在输入用户名密码登录之后,如下:

    下面我们可以试着在JAVA中写一个P2P类型生产者用于发送消息,一个消费者用于接收消息:

            @Test
        public void testQueueProducer() throws Exception{
            //1、创建一个连接工厂对象,需要指定服务的IP及端口
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
            //2、使用工厂对象创建一个connection对象
            Connection connection = factory.createConnection();
            //3、调用connection对象的start方法开启连接
            connection.start();
            //4、创建一个session对象:第一个参数是是否开启事务,一般不开启,如果为true,第二个参数无意义;第二个参数是应答模式,包括自动应答和手动应答,一般为自动
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用session创建一个destination(消息的目的地)对象
            Queue queue = session.createQueue("test-queue");
            //6、使用session对象创建一个produce对象
            MessageProducer producer = session.createProducer(queue);
            //7、创建一个message对象,可以使用TestMessage
            /*TextMessage TextMessage = new ActiveMQTextMessage();
            TextMessage.setText("hello ActiveMQ");*/
            TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }        
    消息生产者发送消息

    我们这里创建连接工厂使用的是tcp协议的61616端口,其实activemq还对其他协议开放了多个端口,我们在conf/activemq.xml文件中可以查看

    @Test
        public void testQueueConsumer() throws Exception{
            //1、创建一个ConnectionFactory
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
            //2、创建连接对象connection
            Connection connection = factory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用connection创建Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、创建一个destination对象,queue对象
            Queue queue = session.createQueue("test-queue");
            //6、使用session创建一个consumer对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7、接受消息打印结果
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage) message;
                    String text;
                    try {
                        text=textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            //等待接受消息
            System.in.read();
            //8、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    消息接收者接收消息

    下面我们到控制台查看:

    Messages Enqueued:表示生产了多少条消息,记做P

    Messages Dequeued:表示消费了多少条消息,记做C

    Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息

    Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C



  • 相关阅读:
    面试题15:链表中倒数第K个结点
    面试题31:连续子数组的最大和
    数据库索引实例
    面试题27:二叉搜索树与双向链表
    面试题28:字符串的排列
    java比较器Comparable接口和Comaprator接口
    面向对象知识汇总
    虚函数与纯虚函数
    Linux IO实时监控iostat命令详解
    hive GroupBy操作(翻译自Hive wiki)
  • 原文地址:https://www.cnblogs.com/qingo00o/p/9186817.html
Copyright © 2011-2022 走看看