zoukankan      html  css  js  c++  java
  • java练习生

    方案1:spring-cloud-starter-stream-rabbit

    一、添加依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>

    二、添加配置(application.yml)

    spring:
      application:
        name: demo1
      rabbitmq:
        host: 10.10.10.10
        port: 5672
        username: guest
        password: guest
        virtualHost: /dev-test

    三、创建操作类

    生产者:

    import lombok.RequiredArgsConstructor;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    @RequiredArgsConstructor
    public class MqSender {
        final AmqpTemplate template;
    
        /**
         * 发送消息的方法
         *
         * @param msg
         */
        public void send(String msg) {
            //向消息队列发送消息
            //参数一:交换器名称。
            //参数二:路由键
            //参数三:消息
            this.template.convertAndSend("test", "hzq", msg);
        }
    }
    View Code

    消费者:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "test", autoDelete = "false"),
            exchange = @Exchange(value = "test", type = ExchangeTypes.DIRECT),
            key = "test"
    ))
    public class MqListener {
        @RabbitHandler
        public void process(String msg) {
            log.info("mq接收到信息:{}", msg);
        }
    
        @RabbitHandler
        public void process(byte[] content) {
            try {
                log.info("mq接收到信息:{}", new String(content,"UTF-8"));
            }
            catch (Exception ex){
                log.error("mq接收到信息异常");
            }
        }
    }
    View Code

    四、使用

    @Slf4j
    @RestController
    @RequestMapping("/test")
    public class TestController {
    
        @Resource
        MqSender mqSender;
    
        // rabbit测试
        @GetMapping("/rabbit/set")
        public String rabbitSet(){
            log.info("添加rabbit开始。");
            String msg = String.format("发送信息,当前时间戳:{%s}", System.currentTimeMillis());
            mqSender.send(msg);
            log.info("添加rabbit结束。");
            return "添加rabbit结束。";
        }
    }
    View Code

    方案2:spring-boot-starter-amqp

    一、添加依赖

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    二、添加配置(application.yml)

    2.1 添加连接配置信息(application.yml) 允许连接多个mq服务

    rabbitmq:
      default:
        host: 10.10.10.10
        port: 5672
        username: guest
        password: guest
        vhost: /dev-test
      other:
        host: 10.10.10.11
        port: 5672
        username: guest
        password: guest
        vhost: /sit-test

    2.2 创建配置类

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;
    
    /**
     * rabbitMq 配置
     */
    @Configuration
    @Slf4j
    public class RabbitMqConfig {
    
        // 默认系统的MQ连接配置信息
        @Value("${rabbitmq.default.host}")
        private String host;
    
        @Value("${rabbitmq.default.port}")
        private int port;
    
        @Value("${rabbitmq.default.username}")
        private String username;
    
        @Value("${rabbitmq.default.password}")
        private String password;
    
        @Value("${rabbitmq.default.vhost}")
        private  String vhost;
    
        // other系统的MQ连接配置信息
        @Value("${rabbitmq.other.host}")
        private String otherHost;
    
        @Value("${rabbitmq.other.port}")
        private int otherPort;
    
        @Value("${rabbitmq.other.username}")
        private String otherUserName;
    
        @Value("${rabbitmq.other.password}")
        private String otherPassWord;
    
        @Value("${rabbitmq.other.vhost}")
        private String otherVhost;
    
        /**
         * 重试配置-初始间隔
         */
        @Value("${spring.rabbitmq.listener.simple.retry.initial-interval:6000}")
        private Long initialInterval;
        /**
         * 重试配置-最大尝试次数
         */
        @Value("${spring.rabbitmq.listener.simple.retry.max-attempts:3}")
        private Integer maxAttempts;
    
        // 默认系统的MQ工厂
    
        /**
         * 默认系统的MQ工厂
         * @param configurer
         * @param connectionFactory
         * @return
         */
        @Bean(name = "defaultFactory")
        public SimpleRabbitListenerContainerFactory defaultFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                   @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);
    //        factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
        @Bean(name="defaultRabbitTemplate")
        @Primary
        public RabbitTemplate defaultRabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(defaultConnectionFactory());
            template.setRetryTemplate(retryTemplate());
    //        template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
        /**
         * 设置成手动模式的原因, 不在使用rabbit的默认分发方式(轮询的方式), 而使用公平的方式。 使用公平的方式可以实现负载均衡。
         * 具体实现:basicQos( prefetchCount=1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
         *
         * @return ConnectionFactory
         */
        @Bean(name="defaultConnectionFactory")
        @Primary
        public ConnectionFactory defaultConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(vhost);
            //设置手动确认模式
            connectionFactory.setPublisherConfirms(true);
            return connectionFactory;
        }
    
        // vbs系统的MQ工厂
    
        /**
         * vbs系统的MQ工厂
         * @param configurer
         * @param connectionFactory
         * @return
         */
        @Bean(name = "otherFactory")
        public SimpleRabbitListenerContainerFactory vbsFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                               @Qualifier("otherConnectionFactory") ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);
            return factory;
        }
    
        @Bean(name="otherRabbitTemplate")
        public RabbitTemplate vbsrabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(otherConnectionFactory());
            template.setRetryTemplate(retryTemplate());
            return template;
        }
    
        @Bean(name="otherConnectionFactory")
        public ConnectionFactory otherConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(otherHost, otherPort);
            connectionFactory.setUsername(otherUserName);
            connectionFactory.setPassword(otherPassWord);
            connectionFactory.setVirtualHost(otherVhost);
            //设置手动确认模式
            connectionFactory.setPublisherConfirms(true);
            return connectionFactory;
        }
    
        /**
         * 构建rabbitMQ重试模板
         * @return RetryTemplate
         */
        private RetryTemplate retryTemplate(){
            RetryTemplate retryTemplate = new RetryTemplate();
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(initialInterval);
            retryTemplate.setBackOffPolicy(backOffPolicy);
            retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
            return retryTemplate;
        }
    //
    //    /**
    //     * 初始化默认系统MQ路由配置
    //     */
    //    @Bean
    //    public RabbitAdmin rabbitAdmin() {
    //        RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
    //        rabbitAdmin.setAutoStartup(true);
    //        //创建队列和交换机,并绑定
    //        creactQueueAndChange(rabbitAdmin);
    //        return rabbitAdmin;
    //    }
    //
    //    /**
    //     * 默认系统中所有的MQ路由配置信息
    //     */
    //    @Value("${rabbitInfo:{}}")
    //    private String rabbitInfo;
    //
    //    /**
    //     * 默认交换机名称
    //     */
    //    private static final String defualtExchange = "test";
    
    //    /**
    //     * 创建队列和交换机,并绑定
    //     * @param rabbitAdmin
    //     */
    //    private void creactQueueAndChange(RabbitAdmin rabbitAdmin){
    //        rabbitAdmin.declareExchange(new DirectExchange(defualtExchange,true,false));
    //        Map<String, Object> arguments = new HashMap<>();
    //        arguments.put("x-max-priority",10); // 设置队列优先级级别
    //        List<RabbitObject> rabbitObjectDTOList = JSONObject.parseArray(rabbitInfo,RabbitObject.class);
    //        for(RabbitObject rabbitObjectDTO:rabbitObjectDTOList){
    //            rabbitAdmin.declareQueue(new Queue(rabbitObjectDTO.queueName,true,false,false,arguments));
    //            rabbitAdmin.declareBinding(new Binding(rabbitObjectDTO.queueName, Binding.DestinationType.QUEUE, rabbitObjectDTO.exchangeName, rabbitObjectDTO.routingKey, null));
    //        }
    //    }
    }
    View Code

    三、创建操作类

    生产者:

    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.log4j.Log4j2;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    
    import java.util.UUID;
    
    /**
     * rabbit 发送实例
     */
    @Service
    @Log4j2
    public class RabbitSendServiceImpl implements RabbitSendService, RabbitTemplate.ConfirmCallback {
    
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 构造方法注入rabbitTemplate
         */
        @Autowired
        public RabbitSendServiceImpl(@Qualifier("defaultRabbitTemplate") RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
            rabbitTemplate.setConfirmCallback(this);
        }
    
        @Override
        public void sendMsg(String exchangeName, String routeName, Object message, String messageId) {
            CorrelationData correlationId = new CorrelationData(messageId);
            rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId);
        }
    
    //    @Override
    //    public void sendMsg(MqInfoEnum mqInfoEnum, Object message, String messageId) {
    //        CorrelationData correlationId = new CorrelationData(messageId);
    //        rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message),
    //                correlationId);
    //    }
    
        @Override
        public void sendMsg(String exchangeName, String routeName, Object message) {
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchangeName, routeName, JSONObject.toJSONString(message), correlationId);
        }
    
    //    @Override
    //    public void sendMsg(MqInfoEnum mqInfoEnum, Object message) {
    //        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
    //        rabbitTemplate.convertAndSend(mqInfoEnum.getExchangeName(), mqInfoEnum.getRouteName(), JSONObject.toJSONString(message),
    //                correlationId);
    //    }
    
    
    //    @Override
    //    public void sendDefaultMsg(Object message, String messageId) {
    //        CorrelationData correlationId = new CorrelationData(messageId);
    //        rabbitTemplate.convertAndSend(MqInfoEnum.VTS_DEFAULT.getExchangeName(), MqInfoEnum.VTS_DEFAULT.getRouteName(),
    //                JSONObject.toJSONString(message),
    //                correlationId);
    //    }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if (b) {
                log.info("回调[{}]结果:消息成功消费", correlationData);
            } else {
                log.info("回调[{}]结果:消息消费失败:{}", correlationData, s);
            }
        }
    }
    View Code

    消费者:

    公共类

    import hzq.maven.demo.service.rabbit.RabbitListenService;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.stereotype.Service;
    
    import java.io.UnsupportedEncodingException;
    
    @Service
    public abstract class RabbitListenServiceImpl implements RabbitListenService {
    
        /**
         * 监听
         *
         * @param content mq消息
         */
        @RabbitHandler
        public void process(String content) {
            try {
                execute(content);
            } catch (Exception e) {
                // 保存异常消息到消息补偿表中,根据具体要求实现
                // 必须将异常抛出去,不然是不会触发rabbit unack
                throw e;
            }
        }
    
        @RabbitHandler
        public void process(byte[] content) {
            try {
                String result = new String(content,"UTF-8");
                execute(result);
            } catch (Exception e) {
                // 保存异常消息到消息补偿表中,根据具体要求实现
                // 必须将异常抛出去,不然是不会触发rabbit unack
                try {
                    throw e;
                } catch (UnsupportedEncodingException e1) {
                    e1.printStackTrace();
                }
            }
        }
    
        /**
         * 接受消息
         * @param message 消息体
         */
        public abstract void execute(String message);
    
    }
    View Code
    子类
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import hzq.maven.demo.model.dto.TestMqDataDTO;
    import hzq.maven.demo.service.business.TestService;
    import hzq.maven.demo.service.rabbit.impl.RabbitListenServiceImpl;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    
    /**
     * 测试消息队列
     */
    @Slf4j
    @Component
    @RabbitListener(queues = "test",containerFactory="defaultFactory")
    public class TestListen extends RabbitListenServiceImpl {
        @Resource
        private TestService testService;
    
        public void execute(String message) {
            TestMqDataDTO rabbitMqDataDTO;
            if(JSON.isValid(message)){
                JSONObject jsonObject = JSON.parseObject(message);
                rabbitMqDataDTO = jsonObject.toJavaObject(TestMqDataDTO.class);
            }
            else{
                rabbitMqDataDTO = new TestMqDataDTO();
                rabbitMqDataDTO.setName(message);
            }
    
            testService.mqListen(rabbitMqDataDTO);
        }
    }
    View Code

    四、使用

    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class TestServiceImpl implements TestService {
        @Resource
        private RabbitSendService rabbitSendService;
    
        @Override
        public void mqSend(TestMqDataDTO testMqDataDTO){
            //推送信息到MQ
            rabbitSendService.sendMsg("test","hzq",testMqDataDTO);
            log.info("mq发送数据成功:" + testMqDataDTO.toString());
        }
    
        @Override
        public void mqListen(TestMqDataDTO testMqDataDTO){
            //从MQ接收信息
            log.info("mq消费数据成功:" + testMqDataDTO.toString());
            // do...
        }
    }
    View Code
  • 相关阅读:
    【Anagrams】 cpp
    【Count and Say】cpp
    【Roman To Integer】cpp
    【Integer To Roman】cpp
    【Valid Number】cpp
    重构之 实体与引用 逻辑实体 逻辑存在的形式 可引用逻辑实体 不可引用逻辑实体 散弹式修改
    Maven项目聚合 jar包锁定 依赖传递 私服
    Oracle学习2 视图 索引 sql编程 游标 存储过程 存储函数 触发器
    mysql案例~tcpdump的使用
    tidb架构~本地化安装
  • 原文地址:https://www.cnblogs.com/ariter/p/14886524.html
Copyright © 2011-2022 走看看