zoukankan      html  css  js  c++  java
  • 将注册短信放入到消息队列里去

    将注册短信放入到消息队列里去

    参考:

    https://blog.csdn.net/weixin_42671172/article/details/106834803

    利用redis,多线程,异步消息处理

    编写redis配置类

    操作redis需要用到RedisTemplate,编写redis配置类RedisConfig.java。这里配置了多个消息监听适配器以通过不同的方法去监听、订阅不同的redis channel消息。

    /**
     * @author :RealGang
     * @description: redis自定义配置类
     * @date : 2021/11/30 11:18
     */
    @Configuration
    public class RedisConfig {
        /**
         * 返回一个RedisTemplate Bean
         * @param redisConnectionFactory    如果配置了集群版则使用集群版,否则使用单机版
         * @return
         */
        @Bean(name = "redisTemplate")
        public RedisTemplate<?, ?> getRedisTemplate(RedisConnectionFactory redisConnectionFactory){
    
            RedisTemplate<?, ?> template = new RedisTemplate<>();
            //设置key和value序列化机制
            template.setKeySerializer(new StringRedisSerializer());
            template.setValueSerializer(new Jackson2JsonRedisSerializer<Object>(Object.class));
            //设置单机或集群版连接工厂
            template.setConnectionFactory(redisConnectionFactory);
    
            return template;
        }
    
        /**
         * 系统消息适配器
         * @param receiver
         * @return
         */
        @Bean(name = "systemAdapter")
        public MessageListenerAdapter systemAdapter(Receiver receiver){
            //指定类中回调接收消息的方法
            MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "systemMessage");
            //adapter.afterPropertiesSet();
            return adapter;
        }
    
        /**
         * 短信消息适配器
         * @param receiver
         * @return
         */
        @Bean(name = "smsAdapter")
        public MessageListenerAdapter smsAdapter(Receiver receiver){
            //指定类中回调接收消息的方法
            MessageListenerAdapter adapter = new MessageListenerAdapter(receiver, "smsMessage");
            //adapter.afterPropertiesSet();
            return adapter;
        }
    
        /**
         * 构建redis消息监听器容器
         * @param connectionFactory
         * @param systemAdapter
         * @param smsAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                       MessageListenerAdapter systemAdapter, MessageListenerAdapter smsAdapter){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //指定不同的方法监听不同的频道
            container.addMessageListener(systemAdapter, new PatternTopic("system"));
            container.addMessageListener(smsAdapter, new PatternTopic("sms"));
            return container;
        }
    }
    

    这里配置不同的消息适配器,通过MessageListenerAdapter配置对应的消息接收侦听器,参数里第一个是自定义的消息接收侦听器,第二个参数是该消息接收侦听器的其中一个方法,在这个方法执行接收到的消息进行处理;

    然后在redis消息监听容器里根据不同的消息适配器指定监听的频道名称(new PatternTopic("system")),然后如果要发布消息,就要通过redis把要发送的消息以及频道名称作为参数传递,然后监听器就会根据监听到的频道名称去指定对应的执行方法;

    编写消息接收侦听器

    /**
     * @author :RealGang
     * @description: 消息接收侦听器
     * @date : 2021/11/30 11:31
     */
    @Component
    public class Receiver {
        @Autowired
        private SendAndStorageProcess sendAndStorageProcess;
    
        private AtomicInteger counter = new AtomicInteger();    //消息计数器
    
        /**
         * 接收系统消息,开启异步监听
         * @param message
         */
        @Async(value = "threadTaskExecutor")
        public void systemMessage(String message){
            int counter = this.counter.incrementAndGet();
            System.out.println("接收到第" + counter + "条消息!!频道为:system,消息内容为======:");
            //将消息内容字符串转化为对象
            Message messageObject = JSONObject.parseObject(message, Message.class);
            System.out.println(messageObject.getContent());
    
            //TODO 开启多线程调用发送并处理消息
            JSONObject result = sendAndStorageProcess.sendAndStorageMsg(messageObject);
        }
    
        /**
         * 接收短信消息,开启异步监听
         * @param message
         */
        @Async(value = "threadTaskExecutor")
        public void smsMessage(String message){
            int counter = this.counter.incrementAndGet();
            System.out.println("接收到第" + counter + "条消息!!频道为:sms,消息内容为======:");
            //将消息内容字符串转化为对象
            Message messageObject = JSONObject.parseObject(message, Message.class);
            System.out.println(messageObject.getContent());
    
            //TODO 开启多线程调用发送
            JSONObject result = sendAndStorageProcess.sendAndStorageMsg(messageObject);
        }
    }
    

    这里定义了针对不同频道名称执行的不同的方法,这里的方法通过@Async(value = "threadTaskExecutor")来开启异步监听,在定义的方法里通过sendAndStorageProcess.sendAndStorageMsg(messageObject);开启多线程去发送消息。这里的方法里的字符串参数实际是Message实体类的字符串序列化的形式的字符串。后续通过JSONObject.parseObject(message, Message.class)转化为Message类。

    编写线程池配置类

    /**
     * @author :RealGang
     * @description: 线程池配置类
     * @date : 2021/11/30 14:35
     */
    @Configuration
    public class TaskExecutorConfig {
    
        /**
         * 创建一个线程池
         * @return
         */
        @Bean(name = "threadTaskExecutor")
        public ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);   //核心线程池大小
            executor.setMaxPoolSize(50);    //最大线程池大小
            executor.setQueueCapacity(1000);    //任务队列大小
            executor.setKeepAliveSeconds(300);  //线程池中空闲线程等待工作的超时时间(单位秒)
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //线程拒绝策略,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度
    
            return executor;
        }
    
        /**
         * 创建一个固定大小的线程池
         * @return
         */
        @Bean(name = "fixedThreadPool")
        public ExecutorService executorService(){
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
            return fixedThreadPool;
        }
    
    }
    

    这里定义线程配置类,并制定bean的name是为了在上边的Receiver监听类里通过@Async(value = "threadTaskExecutor")去异步监听,并且在主程序类名上加上注解:@EnableAsync

    编写消息处理类

    类中发消息的方法使线程休眠了两秒,模拟发消息的相对耗时操作,用于验证多线程。

    /**
     * @author :RealGang
     * @description:TODO
     * @date : 2021/11/30 14:27
     */
    @Component
    public class SendAndStorageProcess {
        @Autowired
        private ThreadPoolTaskExecutor threadTaskExecutor;  //注入线程池
    
        /**
         * 多线程调用发送消息
         * @param message
         * @return
         */
        public JSONObject sendAndStorageMsg(Message message) {
    
            Future<JSONObject> future = threadTaskExecutor.submit(new Callable<JSONObject>() {  //采用带返回值的方式
                @Override
                public JSONObject call() throws Exception {
    
                    //1.调用相对比较耗时的发消息方法
                    String code = sendMessage(message);
                    message.setUpdateTime(new Date());
                    if ("200".equals(code)){    //发送成功
                        message.setStatusCode("4000");
                    }else{  //发送失败
                        message.setStatusCode("4001");
                    }
    
                    //2.存储消息
                    storageMessage(message);
    
                    JSONObject result = new JSONObject();
                    result.put("code", "200");
                    result.put("msg", "发送消息成功!");
                    return result;
                }
            });
    
            JSONObject jsonResult = new JSONObject();   //返回结果
            try{
                if (future.isDone()){   //线程调度结束时,才获取结果
                    jsonResult = future.get();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            return jsonResult;      //消息发送与存储结果
        }
    
        /**
         * 调用接口发送消息
         * @param message
         * @return
         */
        private String sendMessage(Message message) {
            try{
                //TODO 这里写一些发消息的业务逻辑
    
                Thread.sleep(2000);     //增加耗时操作,查看多线程效果
                System.out.println(Thread.currentThread().getName() + "线程发送消息成功,消息内容:" + message.getContent());
                return "200";   //发送消息结果状态码
            }catch (Exception e){
                System.out.println(Thread.currentThread().getName() + "线程发送消息失败,消息内容:" + message.getContent());
                e.printStackTrace();
            }
            return "500";   //发送消息结果状态码
        }
    
        /**
         * 存消息到数据库
         * @param message
         * @return
         */
        private void storageMessage(Message message) {
            try{
                //TODO 这里执行插入消息到数据操作
                System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库成功,消息内容:" + message.getContent());
            }catch (Exception e){
                System.out.println(Thread.currentThread().getName() + "线程插入消息到数据库失败,消息内容:" + message.getContent());
                e.printStackTrace();
            }
        }
    }
    

    编写消息发布者service实现类

    /**
     * @author :RealGang
     * @description: 消息发布者service实现类
     * @date : 2021/11/30 14:42
     */
    @Service
    public class PublishService {
        @Autowired
        private RedisTemplate<String, Message> redisTemplate;
    
        /**
         * 发送消息到redis频道供订阅。注意:使用redis客户端redis-cli登录订阅查看的中文内容是以16进制的形式
         *      表示的,若要查看中文字符,需要在连接时强制原始输出 redis-cli -h localhost -p 6379 --raw
         *      然后再使用命令订阅频道     subscribe system
         * @param message 消息对象
         * @return
         */
        public String pubMsg(Message message) {
            //返回结果
            String result = null;
            if (null != message){
                //补全消息实体
                //如果为传id则生成并返回
                if (StringUtils.isEmpty(message.getId())){
                    message.setId(UUID.randomUUID().toString());
                }
                message.setCreateTime(new Date());
                message.setUpdateTime(new Date());
                try{
                    //往指定频道发布消息
                    redisTemplate.convertAndSend(message.getType(), message);
                    //redisTemplate.opsForList().leftPush(message.getType(), message);    //采用队列的形式发布到redis
                    System.out.println("消息发布到redis队列频道:" + message.getType() + "成功!");
                    result = "消息发布到" + message.getType() + "频道成功!";
                }catch (Exception e){
                    e.printStackTrace();
                    result = "消息发布到" + message.getType() + "频道失败!";
                }
            }
            return result;
        }
    }
    

    这里的关键方法是:redisTemplate.convertAndSend(message.getType(), message);,通过该方法发布到对应的频道,第一个参数是频道名称,第二个名称是传入的消息或者需要传入的信息,参数的类型是根据第一行的代码的redistemplate的定义指定的 :private RedisTemplate<String, Message> redisTemplate;

    编写controller类

    /**
     * 发布消息到redis指定频道
     * @param message
     * @return
     */
    @PostMapping("/pubMsg")
    public String pubMsg(){
        Message message1 = new Message();
        message1.setType("sms");
        message1.setContent("fadfgsdfg");
        message1.setTitle("标题一");
        Message message2 = new Message();
        message2.setType("sms");
        message2.setContent("as风格是");
        message2.setTitle("标题二");
        Message message3 = new Message();
        message3.setType("sms");
        message3.setContent("发苟富贵");
        message3.setTitle("标题三");
        Message message4 = new Message();
        message4.setType("system");
        message4.setContent("打防结合");
        message4.setTitle("标题四");
        Message message5 = new Message();
        message5.setType("system");
        message5.setContent("规划局发过火");
        message5.setTitle("标题五");
        Message message6 = new Message();
        message6.setType("system");
        message6.setContent("好进看发过火");
        message6.setTitle("标题六");
        publishService.pubMsg(message1);
        publishService.pubMsg(message2);
        publishService.pubMsg(message3);
        String commonResult = publishService.pubMsg(message4);
        publishService.pubMsg(message5);
        publishService.pubMsg(message6);
        System.out.println("异步放入消息队列之后先返回提示信息!");
        return commonResult;
    }
    

    运行结果

    image-20211130155917277

    集成真实业务中的短信发送代码

    原来同步的发送代码核心内容如下:

    image-20211130171316337

    将字符串类型的手机号phone设置成messagecontent,然后通过message.getContent获取手机号,把该核心代码集成到sendAndStorageProcess.sendAndStorageMsg(messageObject)方法里的sendMessage(Message message)去:

    /**
     * 调用接口发送消息
     * @param message
     * @return
     */
    private String sendMessage(Message message) {
        String messageCode = messageUtil.getRandomCode(6);
        try{
            //TODO 这里写一些发消息的业务逻辑
            if (messageUtil.sendMessage(message.getContent(), messageCode)) {
                log.info(phone + "短信发送成功" + messageCode);
                responseWrapper = ResponseWrapper.markSuccess();
                String key = "MessageVerifyCode?phone=" + message.getContent();
                redisUtil.set(key, messageCode, 10 * 60);
            } else {
                log.error(phone + "短信发送失败" + messageCode);
            }
    
            Thread.sleep(2000);     //增加耗时操作,查看多线程效果
            System.out.println(Thread.currentThread().getName() + "线程发送消息成功,消息内容:" + message.getContent());
            return "200";   //发送消息结果状态码
        }catch (Exception e){
            System.out.println(Thread.currentThread().getName() + "线程发送消息失败,消息内容:" + message.getContent());
            e.printStackTrace();
        }
        return "500";   //发送消息结果状态码
    }
    
  • 相关阅读:
    海伦公式
    简单的博弈
    Hello World 代码
    Hello world
    99999999海岛帝国后传:算法大会
    判断质数
    idea plantUML配置
    测试用例评审
    如何编写有效测试用例
    测试用例设计——场景分析法
  • 原文地址:https://www.cnblogs.com/RealGang/p/15626043.html
Copyright © 2011-2022 走看看