zoukankan      html  css  js  c++  java
  • ActiveMQ_4SpringBoot整合

    SpringBoot实现

    引入jar包

    <!-- Spring Boot starter dependency for ActiveMQ (JMS) support. -->
    <!-- NOTE(review): no <version> is given; presumably it is inherited from the Spring Boot parent/BOM - confirm. -->
    <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-activemq</artifactId>

    </dependency>

    配置application.properties

    # URL of the ActiveMQ broker; also injected into ActiveMQConfig via ${spring.activemq.broker-url}.
    spring.activemq.broker-url=tcp://192.168.114.129:61616

    # NOTE(review): Spring Boot documents in-memory as ignored once an explicit broker URL is set - confirm this line is needed.
    spring.activemq.in-memory=true

    # NOTE(review): could not match this key to a documented spring.activemq.* property (spring.activemq.pool.enabled exists) - verify.
    spring.activemq.enabled=false

    # Switches Spring Boot's auto-configured JMS beans to topic (publish/subscribe) destinations.
    # NOTE(review): the Java code in this article sends to and listens on queues through its own beans - confirm this is intended.
    spring.jms.pub-sub-domain=true

    创建activemq配置文件类

    @EnableJms

    @Configuration

    public class ActiveMQConfig {

        @Bean

        public Queue queue(){

           return new ActiveMQQueue("queue1");

        }

       

        @Bean

        public RedeliveryPolicy redeliveryPolicy(){

           RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();

           //是否在每次尝试重新发送失败后,增长这个等待时间

           redeliveryPolicy.setUseExponentialBackOff(true);

           //重发次数,默认为6次   这里设置为10次

           redeliveryPolicy.setMaximumRedeliveries(6);

           //重发时间间隔,默认为1秒

           redeliveryPolicy.setInitialRedeliveryDelay(1);

           //第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value

           redeliveryPolicy.setBackOffMultiplier(1);

           //是否避免消息碰撞

           redeliveryPolicy.setUseCollisionAvoidance(false);

           //设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效

           redeliveryPolicy.setMaximumRedeliveryDelay(1000);

           return redeliveryPolicy;

        }

       

        @Bean

        ActiveMQConnectionFactory activeMQConnectFactory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){

           ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",url);

           activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

           return activeMQConnectionFactory;

        }

       

        @Bean

        public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){

           JmsTemplate jmsTemplate = new JmsTemplate();

           jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化

           jmsTemplate.setConnectionFactory(activeMQConnectionFactory);

           jmsTemplate.setDefaultDestination(queue);//此处可不设置默认,在发送消息时也可设置队列

           jmsTemplate.setSessionAcknowledgeMode(1);//客户端签收模式

           return jmsTemplate;

        }

       

        @Bean

        public JmsTransactionManager jmsTransactionManager(ActiveMQConnectionFactory activeMQConnectionFactory){

           JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();

           jmsTransactionManager.setConnectionFactory(activeMQConnectionFactory);

           return jmsTransactionManager;

        }

       

        //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂

        @Bean(name = "jmsQueueListener")

        public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory, JmsTransactionManager jmsTransactionManager){

           DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

           factory.setConnectionFactory(activeMQConnectionFactory);

           //设置连接数

           factory.setConcurrency("1-10");

           //重连间隔时间

           //factory.setRecoveryInterval(1000L);

           //factory.setSessionAcknowledgeMode(1);

           factory.setTransactionManager(jmsTransactionManager);

          

           return factory;

          

        }

    }

    创建生产者类

    @Service("producer")

    public class Producer {

        @Autowired

        private JmsMessagingTemplate jMessagingTemplate;

       

        public void sendMessage(Destination destination, final String message){

           jMessagingTemplate.convertAndSend(destination, message);

        }

    }

    创建消费者类

    @Component

    public class Consumer {

       

        @JmsListener(destination = "mytest.queue", containerFactory="jmsQueueListener")

        public void receiveQueue(TextMessage textMessage) throws JMSException{

           System.out.println("Consumer收到的报文为:"+textMessage.getText());

        }

    }

    测试

    @Autowired

    private Producer producer;

    @Test

    public void test01(){

        Destination destination = new ActiveMQQueue("mytest.queue");

        for(int i=0; i<10; i++){

           producer.sendMessage(destination, "my name laowang");

        }

    }

  • 相关阅读:
    [ZJOI2019]语言
    浅谈javascript关键字"this"
    浅谈javascript实现图片预加载
    How browsers workBehind the scenes of modern web browsers
    javascript设计模式之单例模式
    浅谈javascript关键字"call"
    (转)DirectX 中的 Device>BeginScene(), Device>EndScene() 和 Present
    今天买了美味方面包,哭的很难吃。
    (转2)点是否在三角形内
    完成地形拼接系统
  • 原文地址:https://www.cnblogs.com/zhiboluo/p/10114763.html
Copyright © 2011-2022 走看看