zoukankan      html  css  js  c++  java
  • 消息中间件——activeMQ

    Activemq使用教程

     

    解压activmq进入binwin64 启动activemq.bat

    启动成功

    浏览器访问http://127.0.0.1:8161

    创建maven工程

    pom.xml中添加依赖

     <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.8</version>
      </dependency>
    </dependencies>

    创建创建者和发布者

     Producer 代码

    package com.td.active;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class producer {
        public static void main(String[] args) throws JMSException {
            //1、创建工厂连接对象,需要指定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7、使用会话对象创建一个消息对象
            TextMessage textMessage = session.createTextMessage("hello!test-queue");
            //8、发送消息
            producer.send(textMessage);
            //9、关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }

    consumer 代码

    public class consumer {
        public static void main(String[] args) throws JMSException, IOException {
            //1、创建工厂连接对象,需要制定ip和端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
            //2、使用连接工厂创建一个连接对象
            Connection connection = connectionFactory.createConnection();
            //3、开启连接
            connection.start();
            //4、使用连接对象创建会话(session)对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
            Queue queue = session.createQueue("test-queue");
            //6、使用会话对象创建生产者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7、向consumer对象中设置一个messageListener对象,用来接收消息
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    // TODO Auto-generated method stub
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            });
            //8、程序等待接收用户消息
            System.in.read();
            //9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

    开启生产者生产消息

    如果出现以下路径错误

     

    生产者启动成功生成一条消息在浏览器中可以看到

    启动消费者

    ActiveMQ整合spring及项目中运用

    activeMQ与spring看一整合到一起使用,除了添加ActiveMQ相关的jar包外,还需要添加spring的jar包:

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </dependency>

    然后编写applicationContext-activemq.xml文件, 

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" 
           xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" 
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
    
        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring使用JMS工具类,可以用来发送和接收消息 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这里是配置的spring用来管理connectionfactory的connectionfactory -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
    </beans>

    在使用的类中注入模板来使用

    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Resource(name="itemAddTopic")
    private Destination destination;

    发送消息的示例

    发送消息列表

    public void  addUser(){
        //第一个参数目的地  可以是队列的名称spring-queue 也可以是ip
        //第二个参数是发送消息的对象
        jmsTemplate.send("spring-queue", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("要发送的消息");
            }
        });
    }

    发送主题

    try {
        Topic topic = jmsTemplate.getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE).createTopic("item-add-topic");
        jmsTemplate.send(topic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("要发送的消息");
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
    }

    消费者项目

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!-- 配置能够产生connection的connectionfactory,由JMS对应的服务厂商提供 -->
        <bean id="tagertConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <constructor-arg name="brokerURL" value="tcp://127.0.0.1:61616"/>
        </bean>
        <!-- 配置spring管理真正connectionfactory的connectionfactory,相当于spring对connectionfactory的一层封装 -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="tagertConnectionFactory"/>
        </bean>
        <!-- 配置destination -->
        <!-- 队列目的地 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="spring-queue"/>
        </bean>
        <!-- 话题目的地 -->
        <bean id="itemAddTopic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="item-add-topic"/>
        </bean>
        <!-- 配置监听器 -->
        <bean id="myListener" class="com.td.active.MyListener"/>
        <bean id="itemAddListener" class="com.td.active.ItemAddListener"/>
        <!-- 配置系统监听器 消息列表 -->
        <!--    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <property name="connectionFactory" ref="connectionFactory"/>
                <property name="destination" ref="queueDestination"/>
                <property name="messageListener" ref="myListener"/>
            </bean> -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="itemAddTopic"/>
            <property name="messageListener" ref="itemAddListener"/>
        </bean>
    </beans>

    通过配置监听器实现接收消息

    列表监听器

    public class MyListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
    
            try {
                TextMessage testMessage = (TextMessage) message;
                String text = testMessage.getText();
                System.out.println("接收到消息 = " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    主题监听器

    public class ItemAddListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
    
            try {
                TextMessage testMessage = (TextMessage) message;
                String text = testMessage.getText();
                System.out.println("接收到消息 = " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    LeetCode Missing Number (简单题)
    LeetCode Valid Anagram (简单题)
    LeetCode Single Number III (xor)
    LeetCode Best Time to Buy and Sell Stock II (简单题)
    LeetCode Move Zeroes (简单题)
    LeetCode Add Digits (规律题)
    DependencyProperty深入浅出
    SQL Server存储机制二
    WPF自定义RoutedEvent事件示例代码
    ViewModel命令ICommand对象定义
  • 原文地址:https://www.cnblogs.com/lldsgj/p/10765129.html
Copyright © 2011-2022 走看看