zoukankan      html  css  js  c++  java
  • 记录一次含有特定注解的方法后面添加指定业务的过程

    1.现在有个业务需求:车管后台和saas系统的数据要同步添加到钉钉后台。车管后台和saas系统中对加盟商、车队、班组的任何修改,都要同步到钉钉里面。

         刚开始是想着,在车管或者saas里面有修改的地方,直接做业务处理。但是问题是,只有业务处理成功才能做后续的同步操作,于是想到了使用aop的后置通知(@AfterReturning)处理。另外由于只有含有特定注解的方法才需要进入到后置通知里面,需要再加上自定义注解。然后把要同步的业务数据作为消息发送到mq里面(生产),再起一个服务进行mq的消费。嗯,大体上是这样。

       1.自定义注解

        

    package com.zhuanche.common.dingdingsync;
    
    import java.lang.annotation.*;
    
    /**
     * Marker annotation for controller methods whose data should be stored to MQ.
     * It is retained at runtime and may be placed on methods and types.
     *
     * @Author fanht
     * @Description Controller methods carrying this annotation are stored to MQ
     * @Date 2019/2/28 11:26 AM
     * @Version 1.0
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD,ElementType.TYPE})
    @Documented
    public @interface DingdingAnno {
        String cityId() default "";  // city id
        String supplierId() default ""; // supplier id
        String teamId() default ""; // teamId
        String method() default ""; // operation: add / update / delete
        String level() default "";  // level: 0 = city, 1 = supplier, 2 = fleet / team group
    }

    2.定义切面

        

    package com.zhuanche.common.dingdingsync;
    
    import com.zhuanche.common.rocketmq.CommonRocketProducer;
    import com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer;
    import com.zhuanche.controller.supplier.SupplierController;
    import com.zhuanche.dto.CarDriverTeamDTO;
    import com.zhuanche.entity.rentcar.CarBizSupplierVo;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.Signature;
    import org.aspectj.lang.annotation.*;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Aspect that publishes a RocketMQ message after a {@link DingdingAnno}-annotated
     * method of {@code DriverTeamController} or {@code SupplierController} has returned
     * normally.
     *
     * <p>Annotation level {@code "2"} publishes every {@link CarDriverTeamDTO} argument to
     * topic {@code car_driver_team}; level {@code "1"} publishes every
     * {@link CarBizSupplierVo} argument to topic {@code car_driver_supplier}. Other levels
     * publish nothing.
     *
     * @Author fanht
     * @Date 2019/2/28 11:59 AM
     * @Version 1.0
     */
    @Component
    @Aspect
    public class DingdingAspect {

        /** Annotation level whose supplier arguments are published. */
        private static final String LEVEL_SUPPLIER = "1";

        /** Annotation level whose team arguments are published. */
        private static final String LEVEL_TEAM = "2";

        private Logger logger = LoggerFactory.getLogger(this.getClass());

        /**
         * Pointcut signature matching every method of the two controllers. The advices
         * below further narrow it to methods carrying {@link DingdingAnno}.
         */
        @Pointcut("execution(* com.zhuanche.controller.driverteam.DriverTeamController.*(..)) ||" +
                " execution(* com.zhuanche.controller.supplier.SupplierController.*(..)) ")
        public void pointCut(){
            logger.info("含有自定义注解dingdingAnno的方法...");
        }

        /**
         * Logs the method name and its arguments before an annotated method runs.
         *
         * @param joinPoint    the intercepted call
         * @param dingdingAnno the annotation bound from the intercepted method
         */
        @Before("pointCut()  && @annotation(dingdingAnno) ")
        public void dingdingVerify(JoinPoint joinPoint,DingdingAnno dingdingAnno){
            // Arrays.toString renders the argument values; the closing brace is appended
            // to the rendered text rather than to the array reference.
            logger.info(joinPoint.getSignature().getName() + ",入参:{" + Arrays.toString(joinPoint.getArgs()) + "}");
        }

        /**
         * Publishes the change carried by the method arguments once the annotated method
         * has returned without throwing.
         *
         * @param jointPoint   the intercepted call
         * @param dingdingAnno the annotation bound from the intercepted method; it is used
         *                     directly instead of being looked up again by reflection
         */
        @AfterReturning("pointCut() && @annotation(dingdingAnno)")
        public void finish(JoinPoint jointPoint,DingdingAnno dingdingAnno){
            if (dingdingAnno != null) {
                Object[] args = jointPoint.getArgs();
                if (LEVEL_TEAM.equals(dingdingAnno.level())) {
                    publishTeamChanges(args, dingdingAnno.method());
                } else if (LEVEL_SUPPLIER.equals(dingdingAnno.level())) {
                    publishSupplierChanges(args, dingdingAnno.method());
                }
            }
            logger.info(jointPoint.getSignature().getName());
        }

        /** Publishes one {@code car_driver_team} message per {@link CarDriverTeamDTO} argument. */
        private void publishTeamChanges(Object[] args, String annotatedMethod) {
            for (Object arg : args) {
                if (!(arg instanceof CarDriverTeamDTO)) {
                    continue;
                }
                CarDriverTeamDTO teamDTO = (CarDriverTeamDTO) arg;
                String teamId = teamDTO.getId() == null ? null : teamDTO.getId().toString();
                String openFlag = teamDTO.getOpenCloseFlag() == null ? null : teamDTO.getOpenCloseFlag().toString();

                // A fresh payload per argument so one message never carries another's data.
                Map<String,Object> payload = new HashMap<>();
                payload.put("city", teamDTO.getCity());
                payload.put("cityName", teamDTO.getCityName());
                payload.put("supplier", teamDTO.getSupplier());
                payload.put("teamName", teamDTO.getTeamName());
                payload.put("teamId", teamDTO.getId());
                payload.put("pId", teamDTO.getpId());
                payload.put("openCloseFlag", openFlag);
                payload.put("id", teamDTO.getId());

                DingdingSupplierAndTeamProducer.publishMessage(
                        "car_driver_team", resolveTeamTag(annotatedMethod, openFlag), teamId, payload);
            }
        }

        /**
         * Chooses the message tag for a team change: an annotated {@code "insert"} always
         * yields {@code "insert"}; otherwise open/close flag {@code "1"} means insert,
         * {@code "2"} means delete and anything else means update.
         */
        private static String resolveTeamTag(String annotatedMethod, String openFlag) {
            if ("insert".equals(annotatedMethod) || "1".equals(openFlag)) {
                return "insert";
            }
            if ("2".equals(openFlag)) {
                return "delete";
            }
            return "update";
        }

        /** Publishes one {@code car_driver_supplier} message per {@link CarBizSupplierVo} argument. */
        private void publishSupplierChanges(Object[] args, String annotatedMethod) {
            for (Object arg : args) {
                if (!(arg instanceof CarBizSupplierVo)) {
                    continue;
                }
                CarBizSupplierVo supplierVo = (CarBizSupplierVo) arg;
                Object supplierId = supplierVo.getSupplierId();
                // A missing id must not throw out of the advice after the business call
                // already succeeded; the producer substitutes a default key for null.
                String messageKey = supplierId == null ? null : supplierId.toString();

                Map<String,Object> payload = new HashMap<>();
                payload.put("cityId", supplierVo.getSupplierCity());
                payload.put("cityName", supplierVo.getSupplierCityName());
                payload.put("supplierId", supplierId);
                payload.put("supplierName", supplierVo.getSupplierFullName());
                payload.put("cooperationType", supplierVo.getCooperationType());

                DingdingSupplierAndTeamProducer.publishMessage(
                        "car_driver_supplier", annotatedMethod, messageKey, payload);
            }
        }
    }

    3.mq 生产者

       

    package com.zhuanche.common.rocketmq;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.client.producer.SendStatus;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.common.message.MessageQueue;
    import com.alibaba.rocketmq.remoting.common.RemotingUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    
    /**
     * Holder of a single static RocketMQ producer (group {@code mp-manage}) used to
     * publish Dingding synchronisation messages to any topic.
     *
     * <p>The instance methods {@link #init()} and {@link #destroy()} are the bean
     * lifecycle hooks; the static publish methods return {@code false} instead of
     * throwing when a message cannot be sent.
     *
     * @Author fanht
     * @Date 2019/3/14 11:54 AM
     * @Version 1.0
     */
    public class DingdingSupplierAndTeamProducer {

        private static final Logger logger = LoggerFactory.getLogger(DingdingSupplierAndTeamProducer.class);

        /**
         * Routes messages with the same key to the same queue. The remainder is taken
         * before the absolute value so a hash code of {@code Integer.MIN_VALUE} cannot
         * produce a negative index; for every other hash code the chosen queue is the same
         * as with {@code Math.abs(hash) % size}.
         */
        private static final MessageQueueSelector KEY_HASH_SELECTOR = new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> messageQueues, Message msg, Object obj) {
                if(obj==null) {
                    return messageQueues.get(0);
                }
                int index = Math.abs(obj.hashCode() % messageQueues.size());
                return messageQueues.get(index);
            }
        };

        private static DefaultMQProducer producer; // the shared static producer

        private String namesrvAddr;// RocketMQ name server address
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }

        /**
         * Starts the shared producer against the configured name server.
         *
         * @throws MQClientException if the producer fails to start
         */
        public void init() throws MQClientException {
            DingdingSupplierAndTeamProducer.startProducer( this.namesrvAddr );
        }


        private synchronized static void startProducer(String namesrvAddr) throws MQClientException {
            producer = new DefaultMQProducer("mp-manage");
            producer.setInstanceName( RemotingUtil.getLocalAddress() +"@"+System.nanoTime()  );
            producer.setNamesrvAddr(namesrvAddr);
            producer.setRetryAnotherBrokerWhenNotStoreOK(true);// retry on another broker when the message was not stored
            producer.setRetryTimesWhenSendFailed(4); // retry count (default is 2)
            producer.setSendMsgTimeout(6000); // send timeout in ms (default is 3000)
            producer.start();
            logger.info(">>>>>>>>>>>通用的RocketMQ生产者初始化成功!");
        }

        /** Shuts the shared producer down. */
        public void destroy() {
            DingdingSupplierAndTeamProducer.stopProducer();
        }
        private synchronized static void stopProducer() {
            if (producer == null) {
                // Never started (e.g. init failed), so there is nothing to shut down.
                return;
            }
            producer.shutdown();
            logger.info(">>>>>>>>>>>通用的RocketMQ生产者销毁成功!");
        }

        /**
         * Sends a normal (unordered) message.
         *
         * @param topic   target topic; {@code null} makes the call return {@code false}
         * @param tags    message tag; {@code null} is replaced by {@code "default"}
         * @param keys    message key; {@code null} is replaced by {@code "default"}
         * @param message payload, serialised to JSON; {@code null} makes the call return {@code false}
         * @return {@code true} only when the broker reported {@code SEND_OK}
         */
        public static boolean publishMessage(String topic, String tags, String keys, Object message){
            return doPublish("普通", false, topic, tags, keys, message);
        }

        /**
         * Sends an ordered message: messages with the same {@code keys} value go to the
         * same queue.
         *
         * @param topic   target topic; {@code null} makes the call return {@code false}
         * @param tags    message tag; {@code null} is replaced by {@code "default"}
         * @param keys    message key, also used to pick the queue; {@code null} is replaced by {@code "default"}
         * @param message payload, serialised to JSON; {@code null} makes the call return {@code false}
         * @return {@code true} only when the broker reported {@code SEND_OK}
         */
        public static boolean publishMessageOrderly(String topic, String tags, String keys, Object message){
            return doPublish("顺序", true, topic, tags, keys, message);
        }

        /** Shared send path for both public methods; {@code kind} only affects log text. */
        private static boolean doPublish(String kind, boolean orderly, String topic, String tags, String keys, Object message){
            if(topic==null || message==null) {
                return false;
            }
            if(tags==null) {
                tags = "default";
            }
            if(keys==null) {
                keys = "default";
            }
            String logPrefix = "[" + kind + "MQ: topic:"+topic+",tags:"+tags+",keys:"+keys+"]: ";
            DefaultMQProducer current = producer;
            if (current == null) {
                // Report an unstarted producer explicitly rather than via a caught NPE.
                logger.error(logPrefix + "Send failed! producer is not started" );
                return false;
            }
            try{
                // Serialisation is inside the try so a failure yields false, not an exception.
                String msg = JSON.toJSONString(message);
                logger.info(logPrefix + msg );
                Message rocketMsg = new Message( topic, tags, keys,  msg.getBytes("UTF-8") );
                SendResult sendResult = orderly
                        ? current.send(rocketMsg, KEY_HASH_SELECTOR, keys)
                        : current.send(rocketMsg);

                if (sendResult == null || sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    logger.error(logPrefix + "Send failed!" );
                    return false;
                }
                logger.info(logPrefix + "Send successful!" );
                return true;
            } catch (Exception e) {
                logger.error(logPrefix + "发送" + kind + "消息异常!",e );
                return false;
            }
        }
    }

    4.rocketmq配置

      

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:cache="http://www.springframework.org/schema/cache"
           xmlns:util="http://www.springframework.org/schema/util"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
            http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
    ">
    <!-- New configuration style BEGIN (note: only one global producer is initialised per JVM; messages for any topic can be published through it) -->
    
        <!-- Singleton producer bean: init() runs on startup, destroy() on shutdown; the name server address comes from the rocketmq.dingdingNamesrvAddr property -->
        <bean id="dingdingSupplierAndTeamProducer"  class="com.zhuanche.common.rocketmq.DingdingSupplierAndTeamProducer" init-method="init" destroy-method="destroy" scope="singleton">
            <property name="namesrvAddr" value="${rocketmq.dingdingNamesrvAddr}"/>
        </bean>
    <!-- New configuration style END -->
    </beans>

    5.在特定的方法里面添加自定义注解,成功后使用后置通知

      

        /**
         * Handles {@code /updateSupplier}: passes the bound supplier data to
         * {@code supplierService.saveSupplierInfo} and returns that call's response.
         * The method is tagged with {@code @DingdingAnno(level = "1", method = "update")}.
         *
         * @param supplier the supplier data bound from the request
         * @return the response produced by the supplier service
         */
        @RequestMapping("/updateSupplier")
        @ResponseBody
        @RequestFunction(menu = SUPPLIER_UPDATE)
        @DingdingAnno(level = "1",method = "update")
        public AjaxResponse updateSupplier(CarBizSupplierVo supplier){
            final AjaxResponse saveResult = supplierService.saveSupplierInfo(supplier);
            return saveResult;
        }

    6.rocketMq消费

    package com.zhuanche.message.listener;
    
    import com.alibaba.fastjson.JSONObject;
    import com.zhuanche.message.common.DingdingConstants;
    import com.zhuanche.message.dao.manage.CarDriverTeamDao;
    import com.zhuanche.message.dao.manage.RelationSqycDingdingMapper;
    import com.zhuanche.message.dingding.DingdingHelper;
    import com.zhuanche.message.model.RelationSqycDingding;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Orderly RocketMQ listener that mirrors fleet / team-group changes into Dingding
     * departments.
     *
     * <p>Each message body is a JSON object; the message tag selects the operation
     * (insert, update or delete, compared against {@code DingdingConstants}). Failures
     * while calling Dingding or the mapping table are logged and the message is skipped,
     * except for a failed delete of the parent department, which suspends the queue so
     * the message is retried.
     *
     * @Author fanht
     * @Date 2019/3/2 3:24 PM
     * @Version 1.0
     */
    public class DingdingDriverTeamListener implements MessageListenerOrderly{

        private Logger logger = LoggerFactory.getLogger(this.getClass());


        private static final int ZIYING = 1;// "ziying" organisation type

        private static final int JIAMENG = 2;// "jiameng" organisation type

        /** After this many redeliveries a failing delete is given up instead of retried. */
        private static final int MAX_DELETE_RECONSUME_TIMES = 10;

        @Autowired
        private CarDriverTeamDao carDriverTeamDao;

        @Autowired
        private RelationSqycDingdingMapper relationSqycDingdingMapper;

        /**
         * Processes a batch of team messages in order.
         *
         * @param msgs    the delivered messages
         * @param context the orderly consume context (not used)
         * @return {@code SUCCESS} when the batch is done, or
         *         {@code SUSPEND_CURRENT_QUEUE_A_MOMENT} when a delete must be retried or a
         *         {@link NumberFormatException} occurred
         */
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            try {
                for(MessageExt ext : msgs){
                    logger.info("修改钉钉班组信息,msgId:" + ext.getMsgId());
                    if(StringUtils.isEmpty(ext.getMsgId())){
                        logger.info("msgId is null");
                        continue;
                    }

                    byte[] body = ext.getBody();
                    if(body == null){
                        continue;
                    }
                    // The producer encodes the JSON as UTF-8, so decode with the same charset
                    // instead of the platform default.
                    String str = new String(body, StandardCharsets.UTF_8);
                    if(StringUtils.isEmpty(str)){
                        continue;
                    }

                    JSONObject jsonObject = JSONObject.parseObject(str);
                    String tag = ext.getTags();
                    // for a delete the id is what gets passed
                    String departId = stringValue(jsonObject, "teamId");
                    String teamName = stringValue(jsonObject, "teamName");

                    if(DingdingConstants.INSERT.equals(tag)){
                        handleInsert(jsonObject, departId, teamName);
                    }else if(DingdingConstants.UPDATE.equals(tag)){
                        handleUpdate(jsonObject, departId, teamName);
                    }else if(DingdingConstants.DELETE.equals(tag)){
                        if(!handleDelete(ext, departId)){
                            // Explicit status instead of returning null.
                            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }
                    }
                }
            } catch (NumberFormatException e) {
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }

        /** Returns the string form of a JSON field, or {@code null} when the field is absent. */
        private static String stringValue(JSONObject json, String key){
            Object value = json.get(key);
            return value == null ? null : value.toString();
        }

        /**
         * Creates a Dingding department for a new fleet (no {@code pId}; parent is the
         * supplier's department) or a new team group (parent is the fleet's department)
         * and records the mapping. Errors are logged and swallowed.
         */
        private void handleInsert(JSONObject jsonObject, String departId, String teamName){
            try {
                String pId = stringValue(jsonObject, "pId");
                if(StringUtils.isEmpty(pId)){ // only a fleet
                    String supplier = stringValue(jsonObject, "supplier");
                    RelationSqycDingding fatherDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.SUPPLIER,supplier,null);
                    if(fatherDingding != null && fatherDingding.getDingdingId() > 0) {
                        int dingdingId =  DingdingHelper.createDepartment(teamName, fatherDingding.getDingdingId().toString(), null, false, false, true,
                                this.orgType(fatherDingding.getCooperationType()));
                        if(dingdingId > 0){
                            RelationSqycDingding dingDui = new RelationSqycDingding();
                            dingDui.setDingdingId(dingdingId);
                            dingDui.setName(teamName);
                            dingDui.setSourceId(departId);
                            dingDui.setType(DingdingConstants.DUI);
                            dingDui.setCooperationType(fatherDingding.getCooperationType());
                            int code = relationSqycDingdingMapper.insertDingding(dingDui);
                            if(code > 0){
                                logger.info("添加小队到dingding表数据成功");
                            }
                        }
                    }
                }else {// add a team group
                    RelationSqycDingding relationSqycDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI,pId,null);
                    if(relationSqycDingding !=null && relationSqycDingding.getDingdingId() > 0){
                        int dingdingId =  DingdingHelper.createDepartment(teamName,relationSqycDingding.getDingdingId().toString(),null,false,false,true,orgType(relationSqycDingding.getCooperationType()));
                        if(dingdingId > 0){
                            RelationSqycDingding dingDui = new RelationSqycDingding();
                            dingDui.setDingdingId(dingdingId);
                            dingDui.setName(teamName);
                            dingDui.setSourceId(departId);
                            dingDui.setType(DingdingConstants.TEAM);
                            dingDui.setCooperationType(relationSqycDingding.getCooperationType());
                            int code = relationSqycDingdingMapper.insertDingding(dingDui);
                            if(code > 0){
                                logger.info("添加班组到dingding数据成功");
                            }
                        }
                    }
                }
            } catch (Exception e) {
                // Log with the stack trace; the message is skipped, not retried.
                logger.error("调用dingding接口异常", e);
            }
        }

        /**
         * Updates (or, when {@code openCloseFlag} is {@code "2"}, deletes) the Dingding
         * department mapped to the given id. Errors are logged and swallowed.
         */
        private void handleUpdate(JSONObject jsonObject, String departId, String teamName){
            try {
                RelationSqycDingding relationSqycDingding =  relationSqycDingdingMapper.getDingdingId(DingdingConstants.DUI,departId,null);
                if(relationSqycDingding == null){// an update does not say whether the id is a team group or a fleet
                    relationSqycDingding = relationSqycDingdingMapper.getDingdingId(DingdingConstants.TEAM,departId,null);
                }
                String openCloseFlag = stringValue(jsonObject, "openCloseFlag");
                if(relationSqycDingding != null && relationSqycDingding.getDingdingId() > 0 ){
                    if("2".equals(openCloseFlag)){
                        DingdingHelper.deleteDepartment(relationSqycDingding.getDingdingId().toString(),orgType(relationSqycDingding.getCooperationType()));
                    }else {
                        DingdingHelper.updateDepartment(teamName,relationSqycDingding.getDingdingId().toString(),null,false,true,orgType(relationSqycDingding.getCooperationType()));
                    }
                }
            } catch (Exception e) {
                logger.error("调用dingding接口异常", e);
            }
        }

        /**
         * Deletes the mapped Dingding department together with its sub-departments.
         *
         * @return {@code false} when the parent delete failed and the message should be
         *         retried; {@code true} when this message is finished (deleted, nothing to
         *         delete, retry limit reached, or an exception was logged)
         */
        private boolean handleDelete(MessageExt ext, String teamId){
            try {
                RelationSqycDingding fatherDingding = relationSqycDingdingMapper.getDingdingByBanZu(teamId,null);
                if(fatherDingding != null && fatherDingding.getDingdingId() > 0) {
                    int org = orgType(fatherDingding.getCooperationType());
                    // fetch the list of sub-departments under this Dingding department
                    String subList = DingdingHelper.listIds(fatherDingding.getDingdingId(),org);
                    if(StringUtils.isNotEmpty(subList) && subList.length()>2){
                        String[] sub = subList.substring(1,subList.length()-1).split(",");
                        for(String dep : sub){
                            DingdingHelper.deleteDepartment(dep,org);
                            relationSqycDingdingMapper.deleteByDingDingId(dep);
                        }
                    }

                    Integer code = DingdingHelper.deleteDepartment(fatherDingding.getDingdingId().toString(),org);
                    if(code == null || code != 0){
                        logger.info("失败次数:" + ext.getReconsumeTimes());
                        // ">=" so the limit still applies if the counter ever passes it;
                        // giving up only finishes this message, the batch continues.
                        return ext.getReconsumeTimes() >= MAX_DELETE_RECONSUME_TIMES;
                    }
                    relationSqycDingdingMapper.deleteByTeamId(teamId);
                }
            } catch (Exception e) {
                logger.error("调用dingding接口异常", e);
            }
            return true;
        }

        /**
         * Maps a cooperation type to an organisation type: 5, 18 and 20 map to
         * {@code ZIYING}, every other value to {@code JIAMENG}.
         *
         * @throws IllegalArgumentException if {@code cooperationType} is {@code null}
         */
        private int orgType(Integer cooperationType){
            if(cooperationType == null){
                throw new IllegalArgumentException("cooperationType must not be null");
            }
            int type = cooperationType;
            if(type == 5 || type == 18 || type == 20){
                return ZIYING;
            }
            return JIAMENG;
        }
    }

    7.rocketmq配置 

     <!-- Push consumer for topic car_driver_team (all tags); messages are handed to dingdingTeamListener -->
     <bean id="dingdingTeamConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start"
              destroy-method="shutdown">
            <property name="consumerGroup" value="${mq.dingdingteam.consumergroup}"/>
            <property name="namesrvAddr" value="${mq.dingdingteam.nameservAdd}"/>
            <property name="instanceName" value="dingdingTeamMessage"/>
            <property name="messageModel" value="CLUSTERING"/>
            <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
            <property name="vipChannelEnabled" value="false"/>
            <property name="subscription">
                <map>
                    <entry key="car_driver_team" value="*"/>
                </map>
            </property>
            <property name="messageListener" ref="dingdingTeamListener"/>
        </bean>
    
        <!-- Listener that handles the car_driver_team messages -->
        <bean id="dingdingTeamListener" class="com.zhuanche.message.listener.DingdingDriverTeamListener"/>
    
    
    
        <!-- Push consumer for topic car_driver_supplier (all tags); it uses its own consumerGroup and instanceName properties, separate from the team consumer -->
        <bean id="dingdingSupplierConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" init-method="start"
              destroy-method="shutdown">
            <property name="consumerGroup" value="${mq.dingdingSupplier.consumergroup}"/>
            <property name="namesrvAddr" value="${mq.dingdingSupplier.nameservAdd}"/>
            <property name="instanceName" value="dingdingSupplierMessage"/>
            <property name="messageModel" value="CLUSTERING"/>
            <property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET"/>
            <property name="vipChannelEnabled" value="false"/>
            <property name="subscription">
                <map>
                    <entry key="car_driver_supplier" value="*"/>
                </map>
            </property>
            <property name="messageListener" ref="dingdingSupplierListener"/>
        </bean>
    
        <!-- Listener that handles the car_driver_supplier messages -->
        <bean id="dingdingSupplierListener" class="com.zhuanche.message.listener.DingdingSupplierListener"/>

    遇到的问题:mq无法消费。原因:consumerGroup 配置成了和已有消费者相同的值。另外,instanceName 在每台机器上必须唯一,同一个 instanceName 只能有一个。

  • 相关阅读:
    如何在获取celery中的任务执行情况
    python flask接口开发和入参的三种方式args、values、json
    python3 redis数据库写入方法和json格式的坑
    python3 封装好的时间戳转换函数,可直接使用
    python3 *args 、 **args 在函数定义和调用中的应用
    python中剔除字典重复项,可以使用集合(set)。
    python3 文件读写操作中的文件指针seek()使用
    jmeter数据库连接配置
    Python自动化培训第一周学习总结
    jmeter分布式压测
  • 原文地址:https://www.cnblogs.com/thinkingandworkinghard/p/10621263.html
Copyright © 2011-2022 走看看