zoukankan      html  css  js  c++  java
  • SpringBoot+activeMq

    整合SpringBoot

    • Maven
    <!--消息队列连接池-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.9</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
    
    • application.yml
    spring:
      activemq:
        broker-url: tcp://192.168.100.155:61616
        #true 表示使用内置的MQ,false则连接服务器
        in-memory: false
        pool:
          #true表示使用连接池;false时,每发送一条数据创建一个连接
          enabled: true
          #连接池最大连接数
          max-connections: 10
          #空闲的连接过期时间,默认为30秒
          idle-timeout: 30000
    
    • 启动入口文件
    @SpringBootApplication
    @EnableJms //启动消息队列
    public class ActiveMQApplication {
        public static void main(String[] args) {
            SpringApplication.run(ActiveMQApplication.class, args);
        }
    }
    
    • 配置文件
    @Configuration
    public class conf {
        @Value("${spring.activemq.broker-url}")
        private String brokerUrl;
        //点对点
        @Bean
        public Queue queue() {
            ActiveMQQueue springBootQueue = new ActiveMQQueue("springBootQueue");
            return springBootQueue;
        }
    
        //发布订阅
        @Bean
        public Topic topic() {
            ActiveMQTopic springBootTopic = new ActiveMQTopic("springBootTopic");
            return springBootTopic;
        }
    
        //连接工厂
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(brokerUrl);
        }
    
        //Queue模式
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        //Topic模式
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory factory  = new DefaultJmsListenerContainerFactory();
            //设置为发布订阅方式, 默认情况下使用的生产消费者方式
            factory .setPubSubDomain(true);
            factory .setConnectionFactory(connectionFactory);
            factory.setSessionTransacted(true);
            factory.setAutoStartup(true);
            //开启持久化订阅
            factory.setSubscriptionDurable(true);
            //重连间隔时间
            factory.setRecoveryInterval(1000L);
            factory.setClientId("springBootTopicId");
            return factory ;
        }
    }
    
    • 生产者
    @RestController()
    public class Producer {
        //Queue模式
        @Autowired
        private Queue queue;
        //Topic
        @Autowired
        private Topic topic;
        //注入springboot封装的工具类
        @Autowired
        private JmsMessagingTemplate jms;
        @GetMapping("sendMessage")
        public String sendMessage() {
            //发送消息至消息中间件代理(Broker)
            jms.convertAndSend(queue, "testQueue");
            return "success";
        }
        /**
         * 订阅模式(topic)发送消息
         *
         * @return
         */
        @GetMapping("/topicSend")
        public String topicSend() {
            jms.convertAndSend(topic, "testTopic");
            return "topic 发送成功";
        }
    }
    
    • QueueConsumer
    @Component
    public class QueueConsumer {
        @JmsListener(destination = "springBootQueue",containerFactory = "jmsListenerContainerQueue")
        public void receiveQueue(String text){
            System.out.println("ConsumerQueue接收到的消息为:"+text);
        }
    }
    
    • TopicConsumer
    @Component
    public class TopicConsumer {
        @JmsListener(destination = "springBootTopic",containerFactory = "jmsListenerContainerTopic")
        public void receiveTopic(String text){
            System.out.println("ConsumerTopic接收到的消息为:"+text);
        }
    }
    

    使用java构建一个broker

    • 单机创建
    BrokerService brokerService = new BrokerService();
            //设置属性
            brokerService.setUseJmx(true);
            //连接地址
            brokerService.addConnector("tcp://localhost:61616");
            brokerService.start();
    

    将之前单机版点对点的连接地址修改就可以看到效果了

    • BrokerFactory创建
      需要在src下创建broker.properties,内容为:
    userJmx=true
    prrsistent=false
    brokerName=test
    
    
     String uri="properties:broker.properties";
            BrokerService broker = BrokerFactory.createBroker(new URI(uri));
            broker.addConnector("tcp://localhost:61616");
            broker.start();
    
    
  • 相关阅读:
    kmp 算法
    jdk 和 cglib 的动态代理
    RestTemplate工具类
    bat脚本切换多个工程的分支
    字符串的左旋转
    输入一个正数s,打印出所有和为s的连续正数序列(至少含有两个数)。例如输入15,由于1+2+3+4+5=4+5+6=7+8=15,所以结果打印出3个连续序列1~5、4~6和7~8。
    枚举类型在JPA中的使用
    拾遗
    YAML DEMO
    kiali 1.26 anonymous策略修改为token
  • 原文地址:https://www.cnblogs.com/yangk1996/p/12663417.html
Copyright © 2011-2022 走看看