zoukankan      html  css  js  c++  java
  • 消息中间件——activeMQ

    Activemq使用教程

     

    解压activmq进入binwin64 启动activemq.bat

    启动成功

    浏览器访问http://127.0.0.1:8161

    创建maven工程

    pom.xml中添加依赖

     <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.8</version>
      </dependency>
    </dependencies>

    创建创建者和发布者

     Producer 代码

    package com.td.active;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class producer {
        public static void main(String[] args) throws JMSException {
            //1、创建工厂连接对象,需要指定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7、使用会话对象创建一个消息对象
            TextMessage textMessage = session.createTextMessage("hello!test-queue");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

    consumer 代码

    public class consumer {
        public static void main(String[] args) throws JMSException, IOException {
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7、向consumer对象中设置一个messageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    // TODO Auto-generated method stub
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            });
            //8、程序等待接收用户消息
            System.in.read();
            //9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    开启生产者生产消息

    如果出现以下路径错误

     

    生产者启动成功生成一条消息在浏览器中可以看到

    启动消费者

    ActiveMQ整合spring及项目中运用

    activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </dependency>

    然后编写applicationContext-activemq.xml文件, 

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" 
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" 
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
    
        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring使用JMS工具类,可以用来发送和接收消息 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
    </beans>

    在使用的类中注入模板来使用

    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Resource(name="itemAddTopic")
    private Destination destination;

    发送消息的示例

    发送消息列表

    public void  addUser(){
        //第一个参数目的地  可以是队列的名称spring-queue 也可以是ip
        //第二个参数是发送消息的对象
        jmsTemplate.send("spring-queue", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("要发送的消息");
            }
        });
    }

    发送主题

    try {
        Topic topic = jmsTemplate.getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createTopic("item-add-topic");
        jmsTemplate.send(topic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("要发送的消息");
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }

    消费者项目

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
        <!-- 配置监听器 -->
        <bean id="myListener" class="com.td.active.MyListener"/>
        <bean id="itemAddListener" class="com.td.active.ItemAddListener"/>
        <!-- 配置系统监听器 消息列表 -->
        <!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" ref="connectionFactory"/>
                <property name="destination" ref="queueDestination"/>
                <property name="messageListener" ref="myListener"/>
            </bean> -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="itemAddTopic"/>
            <property name="messageListener" ref="itemAddListener"/>
        </bean>
    </beans>

    通过配置监听器实现接收消息

    列表监听器

    public class MyListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
    
            try {
                TextMessage testMessage = (TextMessage) message;
                String text = testMessage.getText();
                System.out.println("接收到消息 = " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    主题监听器

    public class ItemAddListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
    
            try {
                TextMessage testMessage = (TextMessage) message;
                String text = testMessage.getText();
                System.out.println("接收到消息 = " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    js 冒泡排序 与非冒泡排序
    js Base64加密与解密
    复杂的时间调度器
    js计算阶乘与斐波切纳数列
    js按需加载兼容性写法
    关于JavaScript的宏任务与微任务
    caller与callee的区别
    如何实现一个parseInt
    nodejs 保存 payload 发送过来的文件
    angularjs:[1] ui-router 权限控制
  • 原文地址:https://www.cnblogs.com/lldsgj/p/10765129.html
Copyright © 2011-2022 走看看