zoukankan      html  css  js  c++  java
  • SpringBoot+activeMq

    整合SpringBoot

    • Maven
    <!--消息队列连接池-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.9</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    
    • application.yml
    spring:
      activemq:
        broker-url: tcp://192.168.100.155:61616
        #true 表示使用内置的MQ,false则连接服务器
        in-memory: false
        pool:
          #true表示使用连接池;false时,每发送一条数据创建一个连接
          enabled: true
          #连接池最大连接数
          max-connections: 10
          #空闲的连接过期时间,默认为30秒
          idle-timeout: 30000
    
    • 启动入口文件
    @SpringBootApplication
    @EnableJms //启动消息队列
    public class ActiveMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(ActiveMQApplication.class, args);
        }
    }
    
    • 配置文件
    @Configuration
    public class conf {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;
        //点对点
        @Bean
        public Queue queue() {
            ActiveMQQueue springBootQueue = new ActiveMQQueue("springBootQueue");
            return springBootQueue;
        }
    
        //发布订阅
        @Bean
        public Topic topic() {
            ActiveMQTopic springBootTopic = new ActiveMQTopic("springBootTopic");
            return springBootTopic;
        }
    
        //连接工厂
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(brokerUrl);
        }
    
        //Queue模式
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        //Topic模式
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory  = new DefaultJmsListenerContainerFactory();
            //设置为发布订阅方式, 默认情况下使用的生产消费者方式
            factory .setPubSubDomain(true);
            factory .setConnectionFactory(connectionFactory);
            factory.setSessionTransacted(true);
            factory.setAutoStartup(true);
            //开启持久化订阅
            factory.setSubscriptionDurable(true);
            //重连间隔时间
            factory.setRecoveryInterval(1000L);
            factory.setClientId("springBootTopicId");
            return factory ;
        }
    }
    
    • 生产者
    @RestController()
    public class Producer {
        //Queue模式
        @Autowired
        private Queue queue;
        //Topic
        @Autowired
        private Topic topic;
        //注入springboot封装的工具类
        @Autowired
        private JmsMessagingTemplate jms;
        @GetMapping("sendMessage")
        public String sendMessage() {
            //发送消息至消息中间件代理(Broker)
            jms.convertAndSend(queue, "testQueue");
            return "success";
        }
        /**
         * 订阅模式(topic)发送消息
         *
         * @return
         */
        @GetMapping("/topicSend")
        public String topicSend() {
            jms.convertAndSend(topic, "testTopic");
            return "topic 发送成功";
        }
    }
    
    • QueueConsumer
    @Component
    public class QueueConsumer {
        @JmsListener(destination = "springBootQueue",containerFactory = "jmsListenerContainerQueue")
        public void receiveQueue(String text){
            System.out.println("ConsumerQueue接收到的消息为:"+text);
        }
    }
    
    • TopicConsumer
    @Component
    public class TopicConsumer {
        @JmsListener(destination = "springBootTopic",containerFactory = "jmsListenerContainerTopic")
        public void receiveTopic(String text){
            System.out.println("ConsumerTopic接收到的消息为:"+text);
        }
    }
    

    使用java构建一个broker

    • 单机创建
    BrokerService brokerService = new BrokerService();
            //设置属性
            brokerService.setUseJmx(true);
            //连接地址
            brokerService.addConnector("tcp://localhost:61616");
            brokerService.start();
    

    将之前单机版点对点的连接地址修改就可以看到效果了

    • BrokerFactory创建
      需要在src下创建broker.properties,内容为:
    userJmx=true
    prrsistent=false
    brokerName=test
    
    
     String uri="properties:broker.properties";
            BrokerService broker = BrokerFactory.createBroker(new URI(uri));
            broker.addConnector("tcp://localhost:61616");
            broker.start();
    
    
  • 相关阅读:
    07-selenium、PhantomJS(无头浏览器)
    06爬虫-异步协程
    Numpy数值类型与数值运算-03
    05爬虫-requests模块基础(2)
    初识Matplotlib-01
    03爬虫-requests模块基础(1)
    Django安装与简单事例-02
    JavaWeb学习总结(二):Http协议
    Tomcat学习总结(一):目录简介
    Web服务器学习总结(一):web服务器简介
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12663417.html
Copyright © 2011-2022 走看看