zoukankan      html  css  js  c++  java
  • ActiveMQ_4SpringBoot整合

    SpringBoot实现

    引入jar包

    <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-activemq</artifactId>

    </dependency>

    配置application.properties

    spring.activemq.broker-url=tcp://192.168.114.129:61616

    spring.activemq.in-memory=true

    spring.activemq.enabled=false

    spring.jms.pub-sub-domain=true

    创建activemq配置文件类

    @EnableJms

    @Configuration

    public class ActiveMQConfig {

        @Bean

        public Queue queue(){

           return new ActiveMQQueue("queue1");

        }

       

        @Bean

        public RedeliveryPolicy redeliveryPolicy(){

           RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

           //是否在每次尝试重新发送失败后,增长这个等待时间

           redeliveryPolicy.setUseExponentialBackOff(true);

           //重发次数,默认为6次   这里设置为10次

           redeliveryPolicy.setMaximumRedeliveries(6);

           //重发时间间隔,默认为1秒

           redeliveryPolicy.setInitialRedeliveryDelay(1);

           //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value

           redeliveryPolicy.setBackOffMultiplier(1);

           //是否避免消息碰撞

           redeliveryPolicy.setUseCollisionAvoidance(false);

           //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效

           redeliveryPolicy.setMaximumRedeliveryDelay(1000);

           return redeliveryPolicy;

        }

       

        @Bean

        ActiveMQConnectionFactory activeMQConnectFactory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){

           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",url);

           activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

           return activeMQConnectionFactory;

        }

       

        @Bean

        public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){

           JmsTemplate jmsTemplate = new JmsTemplate();

           jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化

           jmsTemplate.setConnectionFactory(activeMQConnectionFactory);

           jmsTemplate.setDefaultDestination(queue);//此处可不设置默认,在发送消息时也可设置队列

           jmsTemplate.setSessionAcknowledgeMode(1);//客户端签收模式

           return jmsTemplate;

        }

       

        @Bean

        public JmsTransactionManager jmsTransactionManager(ActiveMQConnectionFactory activeMQConnectionFactory){

           JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();

           jmsTransactionManager.setConnectionFactory(activeMQConnectionFactory);

           return jmsTransactionManager;

        }

       

        //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂

        @Bean(name = "jmsQueueListener")

        public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory, JmsTransactionManager jmsTransactionManager){

           DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

           factory.setConnectionFactory(activeMQConnectionFactory);

           //设置连接数

           factory.setConcurrency("1-10");

           //重连间隔时间

           //factory.setRecoveryInterval(1000L);

           //factory.setSessionAcknowledgeMode(1);

           factory.setTransactionManager(jmsTransactionManager);

          

           return factory;

          

        }

    }

    创建生产者类

    @Service("producer")

    public class Producer {

        @Autowired

        private JmsMessagingTemplate jMessagingTemplate;

       

        public void sendMessage(Destination destination, final String message){

           jMessagingTemplate.convertAndSend(destination, message);

        }

    }

    创建消费者类

    @Component

    public class Consumer {

       

        @JmsListener(destination = "mytest.queue", containerFactory="jmsQueueListener")

        public void receiveQueue(TextMessage textMessage) throws JMSException{

           System.out.println("Consumer收到的报文为:"+textMessage.getText());

        }

    }

    测试

    @Autowired

    private Producer producer;

    @Test

    public void test01(){

        Destination destination = new ActiveMQQueue("mytest.queue");

        for(int i=0; i<10; i++){

           producer.sendMessage(destination, "my name laowang");

        }

    }

  • 相关阅读:
    PHP开发者必须养成的十大优良习惯
    Centos7下编译安装PHP
    linux 强制删除yum安装的php7.2
    php和go的web性通对比
    最好的编程语言及其它
    管理哲学新解
    继甲骨文裁员、Java服软Python后,国产原创IT技术已经成熟,让中国科技不再受制于人!
    雷军:我是个失败的创业者,因为我是劳模
    当95后进入大厂
    如何避免自high式分享
  • 原文地址:https://www.cnblogs.com/zhiboluo/p/10114763.html
Copyright © 2011-2022 走看看