zoukankan      html  css  js  c++  java
  • ActiveMq简单整合

    ActiveMq

    1. 原生JMS API操作ActiveMQ

    PTP 模式(生产者)
    /**
     * 演示点对点模式 -- 消息生产者
     */
    public class PTP_Producer {
    
        public static void main(String[] args) throws JMSException {
    
            //1.创建连接工厂
            ConnectionFactory factory
                     = new ActiveMQConnectionFactory("tcp://192.168.1.144:61616");
    
            //2.创建连接
            Connection connection = factory.createConnection();
    
            //3.打开连接
            connection.start();
    
            //4.创建session
            /**
             * 参数一:是否开启事务操作
             * 参数二:消息确认机制
             */
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
            Queue queue = session.createQueue("queue01");
    
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(queue);
    
            //7.创建消息
            //createTextMessage: 文本类型
            TextMessage textMessage = session.createTextMessage("test message");
    
            //8.发送消息
            producer.send(textMessage);
    
            System.out.println("消息发送完成");
    
            //9.释放资源
            session.close();
            connection.close();
        }
    
    }
    
    PTP模式(消费者)
    /**
     * 演示点对点模式- 消息消费者(第二种方案) -- 更加推荐
     */
    public class PTP_Consumer2 {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory factory
                     = new ActiveMQConnectionFactory("tcp://192.168.1.144:61616");
    
            //2.创建连接
            Connection connection = factory.createConnection();
    
            //3.打开连接
            connection.start();
    
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //5.指定目标地址
            Queue queue = session.createQueue("queue01");
    
            //6.创建消息的消费者
            MessageConsumer consumer = session.createConsumer(queue);
    
            //7.设置消息监听器来接收消息
            consumer.setMessageListener( message ->{
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage)message;
    
                    try {
                        System.out.println("接收的消息(2):"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
    //        //7.设置消息监听器来接收消息
    //        consumer.setMessageListener(new MessageListener() {
    //            //处理消息
    //            @Override
    //            public void onMessage(Message message) {
    //                if(message instanceof TextMessage){
    //                    TextMessage textMessage = (TextMessage)message;
    //
    //                    try {
    //                        System.out.println("接收的消息(2):"+textMessage.getText());
    //                    } catch (JMSException e) {
    //                        e.printStackTrace();
    //                    }
    //                }
    //            }
    //        });
    
            //注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
        }
    
    }
    
    
    Pub/Sub模式(生成者)
    /**
     * 演示发布订阅模式 -- 消息生产者
     */
    public class PS_Producer {
    
        public static void main(String[] args) throws JMSException {
    
            //1.创建连接工厂
            ConnectionFactory factory
                     = new ActiveMQConnectionFactory("tcp://192.168.1.144:61616");
    
            //2.创建连接
            Connection connection = factory.createConnection();
    
            //3.打开连接
            connection.start();
    
            //4.创建session
            /**
             * 参数一:是否开启事务操作
             * 参数二:消息确认机制
             */
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //5.创建目标地址(Queue:点对点消息,Topic:发布订阅消息)
            Topic topic = session.createTopic("topic01");
    
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(topic);
    
            //7.创建消息
            //createTextMessage: 文本类型
            TextMessage textMessage = session.createTextMessage("test message--topic");
    
            //8.发送消息
            producer.send(textMessage);
    
            System.out.println("消息发送完成");
    
            //9.释放资源
            session.close();
            connection.close();
        }
    
    }
    
    
    Pub/Sub模式(消费者)
    /**
     * 演示发布订阅模式- 消息消费者
     */
    public class PS_Consumer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory factory
                     = new ActiveMQConnectionFactory("tcp://192.168.1.144:61616");
    
            //2.创建连接
            Connection connection = factory.createConnection();
    
            //3.打开连接
            connection.start();
    
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //5.指定目标地址
            Topic topic = session.createTopic("topic01");
    
            //6.创建消息的消费者
            MessageConsumer consumer = session.createConsumer(topic);
    
            //7.设置消息监听器来接收消息
            consumer.setMessageListener(message ->{
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
    
                        try {
                            System.out.println("接收的消息--topic:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
            });
    
            //注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
        }
    
    }
    

    Spring整合Activemq

    pom.xml

    /**
     * 演示发布订阅模式- 消息消费者
     */
    public class PS_Consumer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory factory
                     = new ActiveMQConnectionFactory("tcp://192.168.1.144:61616");
    
            //2.创建连接
            Connection connection = factory.createConnection();
    
            //3.打开连接
            connection.start();
    
            //4.创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //5.指定目标地址
            Topic topic = session.createTopic("topic01");
    
            //6.创建消息的消费者
            MessageConsumer consumer = session.createConsumer(topic);
    
            //7.设置消息监听器来接收消息
            consumer.setMessageListener(message ->{
                    if(message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
    
                        try {
                            System.out.println("接收的消息--topic:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
            });
    
            //注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收
        }
    
    }
    
    生产者
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!--1.创建连接工厂对象-->
        <amp:connectionFactory
            id="connetionFactory"
            brokerURL="tcp://192.168.1.144:61616"
            userName="admin"
            password="admin"
        />
    
        <!--2.创建缓存连接工厂-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <!--注入连接工厂-->
            <property name="targetConnectionFactory" ref="connetionFactory"/>
            <!--缓存消息数据-->
            <property name="sessionCacheSize" value="5"/>
        </bean>
    
        <!--3.创建用于点对点发送的JmsTemplate-->
        <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--注入缓存连接工厂-->
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <!--指定是否为发布订阅模式-->
            <property name="pubSubDomain" value="false"/>
        </bean>
    
        <!--4.创建用于发布订阅发送的JmsTemplate-->
        <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!--注入缓存连接工厂-->
            <property name="connectionFactory" ref="cachingConnectionFactory"/>
            <!--指定是否为发布订阅模式-->
            <property name="pubSubDomain" value="true"/>
        </bean>
    </beans>
    

    生产者:

    import com.dalianpai.activemq.bo.People;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.command.ActiveMQDestination;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.spring.ActiveMQConnectionFactory;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import org.springframework.util.FileCopyUtils;
    import org.springframework.util.FileSystemUtils;
    
    import javax.jms.*;
    import java.io.File;
    
    /**
     * 演示Spring与ActiveMQ整合
     */
    @RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
    @ContextConfiguration("classpath:applicationContext-producer.xml") // 加载spring配置文件
    public class SpringProducer {
    
        //点对点模式
        @Autowired
        @Qualifier("jmsQueueTemplate")
        private JmsTemplate jmsQueueTemplate;
    
        //发布订阅模式
        @Autowired
        @Qualifier("jmsTopicTemplate")
        private JmsTemplate jmsTopicTemplate;
    
    
        /**
         * 点对点发送
         */
        @Test
        public void ptpSender(){
    
    
    
            /**
             * 参数一:指定队列的名称
             * 参数二:MessageCreator接口,我们需要提供该接口的匿名内部实现
             */
            jmsQueueTemplate.send("spring_queue", new MessageCreator() {
                //我们只需要返回发送的消息内容即可
                @Override
                public Message createMessage(Session session) throws JMSException {
                    People p = new People();
                    p.setAge(20);
                    p.setName("wgr");
                    ObjectMessage message = session.createObjectMessage(p);
                    //创建文本消息
                 //   TextMessage textMessage = session.createTextMessage("spring test message");
                    return message;
                }
            });
            System.out.println("消息发送已完成");
        }
    
        /**
         * 发布订阅发送
         */
        @Test
        public void psSender(){
            jmsTopicTemplate.send("spring_topic", new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    //创建文本消息
                    TextMessage textMessage = session.createTextMessage("spring test message--topic");
                    return textMessage;
                }
            });
            System.out.println("消息发送已完成");
        }
    }
    
    
    消费者
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amp="http://activemq.apache.org/schema/core"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    
    
        <!--1.连接工厂-->
        <amp:connectionFactory
            id="connectionFactory"
            brokerURL="tcp://192.168.1.144:61616"
            userName="admin"
            password="admin"
        />
    
    
        <!--2.缓存连接工厂-->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="connectionFactory"/>
            <property name="sessionCacheSize" value="100"/>
        </bean>
    
    
        <!--3.配置消息监听组件扫描-->
        <context:component-scan base-package="com.dalianpai.activemq.listener"/>
    
       <!--4.配置监听器(点对点)-->
        <!--
             destination-type: 目标的类型(queue:点对点,topic:发布订阅)
        -->
        <jms:listener-container connection-factory="cachingConnectionFactory" destination-type="queue">
            <jms:listener destination="spring_queue" ref="queueListener"/>
        </jms:listener-container>
    
    
        <!--5.配置监听器(发布订阅)-->
        <jms:listener-container connection-factory="cachingConnectionFactory" destination-type="topic">
            <jms:listener destination="spring_topic" ref="topicListener"/>
        </jms:listener-container>
    
    
    
    
    </beans>
    
    /**
     * @author WGR
     * @create 2020/10/13 -- 11:27
     */
    public class SpringConsuer {
        public static void main(String[] args) throws IOException {
            //1.加载spring配置
            ClassPathXmlApplicationContext
                    cxt = new ClassPathXmlApplicationContext("classpath:applicationContext-consumer.xml");
            //2.启动
            cxt.start();
    
            //3.阻塞方法,让程序一直处于等待状态
            System.in.read();
    
        }
    }
    
    /**
     * 点对点
     */
    @Component // 放入SpringIIOC容器,名称queueListener
    public class QueueListener implements MessageListener{
    
        //用于接收消息
        @Override
        public void onMessage(Message message) {
    //        if(message instanceof TextMessage){
    //            TextMessage textMessage = (TextMessage) message;
    //            try {
    //                System.out.println("queue接口消息:"+textMessage.getText());
    //            } catch (JMSException e) {
    //                e.printStackTrace();
    //            }
    //
    //        }
    
            if(message instanceof ObjectMessage){
                ObjectMessage objectMessage =   (ObjectMessage)message;
    
                try {
                    People object = (People)objectMessage.getObject();
                    System.out.println(object.getAge());
                    System.out.println(object.getName());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
        }
    }
    
    /**
     * 发布订阅
     */
    @Component // 放入SpringIIOC容器,名称queueListener
    public class TopicListener implements MessageListener{
    
        //用于接收消息
        @Override
        public void onMessage(Message message) {
            if(message instanceof TextMessage){
    
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("topic接口消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }
    
        }
    }
    

    个人感觉实用就是动态的加队列加监听删除队列等,新增队列的话,给该队列有消息就会新建,删除队列可以参考原生JMS,也能实现,但是新增监听暂时没有什么好的实现,再研究研究。

    SpringBoot整合Activemq
    server:
      port: 9002 #端口
    spring:
      application:
        name: activemq # 服务名称
    
    # springboot与activemq整合配置
      activemq:
        broker-url: tcp://192.168.1.144:61616 # 连接地址
        user: admin # activemq用户名
        password: admin  # activemq密码
    
    # 指定发送模式 (点对点 false , 发布订阅 true)
      jms:
        pub-sub-domain: false
    
    activemq:
      name: springboot_queue
    
    /**
     * 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
     */
    @Component // 放入IOC容器
    public class MsgListener {
    
        /**
         * 用于接收消息的方法
         *  destination: 队列的名称或主题的名称
         */
        @JmsListener(destination = "${activemq.name}")
        public void receiveMessage(Message message){
            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
    }
    
    

    测试类:

    @SpringBootTest
    class ActivemqApplicationTests {
    
        //JmsMessagingTemplate: 用于工具类发送消息
        @Autowired
        private JmsMessagingTemplate jmsMessagingTemplate;
    
        @Value("${activemq.name}")
        private String name;
    
        @Test
        public void ptpSender(){
            /**
             * 参数一:队列的名称或主题名称
             * 参数二:消息内容
             */
            jmsMessagingTemplate.convertAndSend(name,"spring boot message--queue");
    
        }
    }
    
    

    image-20201014110759941

  • 相关阅读:
    彻底理解同步 异步 阻塞 非阻塞
    Vue2+Hbuilder 开发 H5+App 优雅调试
    Vue2+Hbuilderx打包移动端App的常见问题
    题解 loj 6102 斐波那契的最小公倍数
    题解 hdu 4336 Card Collector
    题解 luogu P3715 [HAOI2015]按位或
    python+appium【第二章-adb命令的使用】
    python+appium【第一章-环境搭建】
    python封装上传图片方法执行时有告警【ResourceWarning: Enable tracemalloc to get the object allocation traceback5】
    python需要上传图片或者上传文件的方法【autoit3】
  • 原文地址:https://www.cnblogs.com/dalianpai/p/13813782.html
Copyright © 2011-2022 走看看