zoukankan      html  css  js  c++  java
  • ActiveMQ初步学习

    本文主要参考张丰哲大神的简书文章,链接 https://www.jianshu.com/p/ecdc6eab554c

    JMS,即Java Message Service,通过面向消息中间件(MOM:Message Oriented Middleware)的方式很好的解决了上面的问题。大致的过程是这样的:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;在pub/sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

     
    JMS

    需要注意的是,JMS只是定义了Java访问消息中间件的接口,其实就是在包javax.jms中,你会发现这个包下除了异常定义,其他都是interface。我们可以扫一眼,比如Message:

     
    Message接口

    JMS只给出接口,然后由具体的中间件去实现,比如ActiveMQ就是实现了JMS的一种Provider,JMS规范定义的一些术语:

    Provider/MessageProvider:生产者

    Consumer/MessageConsumer:消费者

    PTP:Point To Point,点对点通信消息模型

    Pub/Sub:Publish/Subscribe,发布订阅消息模型

    Queue:队列,目标类型之一,和PTP结合

    Topic:主题,目标类型之一,和Pub/Sub结合

    ConnectionFactory:连接工厂,JMS用它创建连接

    Connnection:JMS Client到JMS Provider的连接

    Destination:消息目的地,由Session创建

    Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的


    首先将active的压缩包上传至服务器并解压

    在保证JDK已经安装的情况下可以直接通过bin目录下的activemq文件启动activemq服务。

    使用命令:./apache-activemq-5.12.0/bin/activemq启动服务

    控制台登录的用户名密码可以在apache-activemq-5.12.0/conf/jetty-realm.properties文件中配置:

    activemq的默认端口是8161,可以在apache-activemq-5.12.0/conf/jetty.xml文件中配置

    现在,我们可以直接访问activemq的控制台,在输入用户名密码登录之后,如下:

    下面我们可以试着在JAVA中写一个P2P类型生产者用于发送消息,一个消费者用于接收消息:

            @Test
        public void testQueueProducer() throws Exception{
            //1、创建一个连接工厂对象,需要指定服务的IP及端口
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
            //2、使用工厂对象创建一个connection对象
            Connection connection = factory.createConnection();
            //3、调用connection对象的start方法开启连接
            connection.start();
            //4、创建一个session对象:第一个参数是是否开启事务,一般不开启,如果为true,第二个参数无意义;第二个参数是应答模式,包括自动应答和手动应答,一般为自动
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用session创建一个destination(消息的目的地)对象
            Queue queue = session.createQueue("test-queue");
            //6、使用session对象创建一个produce对象
            MessageProducer producer = session.createProducer(queue);
            //7、创建一个message对象,可以使用TestMessage
            /*TextMessage TextMessage = new ActiveMQTextMessage();
            TextMessage.setText("hello ActiveMQ");*/
            TextMessage textMessage = session.createTextMessage("hello ActiveMQ");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }        
    消息生产者发送消息

    我们这里创建连接工厂使用的是tcp协议的61616端口,其实activemq还对其他协议开放了多个端口,我们在conf/activemq.xml文件中可以查看

    @Test
        public void testQueueConsumer() throws Exception{
            //1、创建一个ConnectionFactory
            ConnectionFactory factory=new ActiveMQConnectionFactory("tcp://192.168.1.102:61616");
            //2、创建连接对象connection
            Connection connection = factory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用connection创建Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、创建一个destination对象,queue对象
            Queue queue = session.createQueue("test-queue");
            //6、使用session创建一个consumer对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7、接受消息打印结果
            consumer.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage) message;
                    String text;
                    try {
                        text=textMessage.getText();
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            
            //等待接受消息
            System.in.read();
            //8、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    消息接收者接收消息

    下面我们到控制台查看:

    Messages Enqueued:表示生产了多少条消息,记做P

    Messages Dequeued:表示消费了多少条消息,记做C

    Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息

    Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C



  • 相关阅读:
    PAT 解题报告 1009. Product of Polynomials (25)
    PAT 解题报告 1007. Maximum Subsequence Sum (25)
    PAT 解题报告 1003. Emergency (25)
    PAT 解题报告 1004. Counting Leaves (30)
    【转】DataSource高级应用
    tomcat下jndi配置
    java中DriverManager跟DataSource获取getConnection有什么不同?
    理解JDBC和JNDI
    JDBC
    Dive into python 实例学python (2) —— 自省,apihelper
  • 原文地址:https://www.cnblogs.com/qingo00o/p/9186817.html
Copyright © 2011-2022 走看看