zoukankan      html  css  js  c++  java
  • 并发请求的重复插入问题

      最近被一个并发问题折腾的很惨,特意拿出来分享。把我不开心的事,发出来给大家开心开心。

      业务背景:邀请活动,一个用户可以邀请多个用户,比如我可以邀请你,也可以邀请他。但一个用户只能被另一个用户邀请,不允许重复邀请。比如你邀请了我,他就不能再邀请我了。

      问题背景:根据业务背景设计了一张被邀请人的表来存储被邀请人记录。重复邀请的判断是拿活动ID和被邀请人查表,存在说明被邀请人重复了。但如果是并发重复请求,会突破这一层校验,因为那时数据未入库,根本就查不到。所以在表加了唯一索引:邀请人号码、被邀请人号码和活动ID,这样一来,同一活动、相同的邀请人和被邀请人将无法同时入库,确保了被邀请人在并发重复请求时只有一条记录插入。

      问题:需求变更,现在允许重复邀请了,比如你邀请了我,他也能再次邀请我。很明显唯一索引必须要修改,否则需求无法实现。为了继续使用唯一索引来限制并发重复请求,我们可以给它加一个邀请时间字段,这样同一个时间点的并发重复请求会被限制。那么现在问题来了,虽然限制住了同一秒(邀请时间字段精确到秒)的并发重复请求,但并不能限制住不同秒级的并发。比如两条并发,第一条是2018-9-10 17:24:00入库的,第二条是2018-9-10 17:24:01入库的。假如是100条并发,那么跨秒的可能性更大。

      解决方案:

      1、前端限制:点击按钮触发事件后把按钮属性设置为disable,限制重复点击。或者点击按钮后播放一个3秒倒计时,这3秒内用户也无法重复请求。遗憾的是这个业务场景是二维码扫码触发的,所以拿两个手机对着同一个二维码扫就可能并发了。

      2、后端限制:插入前先查,查不到插,代码加锁。这样能限制住单点的并发,但生产环境部署了好几台机子做负载均衡,也就是并发请求可能同时到达两台不同的机子。这种分布式的情况下,得加分布式锁才行。遗憾的是这个项目并未使用redis。

      消息队列,把并发请求放进队列里,然后一个一个处理,如果是重复请求就过滤掉。基本原理还是把并发变成同步。遗憾的是该项目未使用kafka或其他mq。

      3、数据库限制:先考虑了事务,该项目数据库是Oracle,采用了myBatis作为ORM框架,采用默认的事务隔离级别READ COMMITTED,又试了串行化的SERIALIZABLE,结果都不行。目前仍不清楚是否为myBatis造成的,它的事务是由spring的切面切进来的。先通过注解@Service注册到spring的容器,再由切面expression匹配,不知道是否在insertInviteeRecord(插入)调用了getInviteeCountForOneCampaign(查询)造成的。贴上代码:

    import java.sql.SQLException;
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.wlf.dao.InviteeMapper;
    import com.wlf.domain.vcode.Invitee;
    import com.wlf.domain.vcode.InviterBase;
    import com.wlf.service.inviteVcode.InviteeService;
    
    @Service("inviteeService")
    public class InviteeServiceImpl implements InviteeService
    {
        @Autowired
        private InviteeMapper inviteeMapper;
        
        @Override
        public Integer getInviteeCountForOneCampaign(String campaignId, String inviteeIdentityId)
        {
            return inviteeMapper.getInviteeCountForOneCampaign(campaignId, inviteeIdentityId);
        }
        
        
        @Override
        public void insertInviteeRecord(Invitee invitee)
        {
            if (inviteeMapper.getInviteeCountForOneCampaign(invitee.getActivityId(), invitee.getInviteeMsisdn()) > 0)
            {
                throw new RuntimeException("并发了并发了");
            }
            else
            {
                inviteeMapper.insertInviteeRecord(invitee);
            }
        }
        
    }
        <!-- 拦截器方式配置事物 -->
        <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
            <tx:attributes>
                <tx:method name="insertInviteeRecord" propagation="REQUIRED" isolation="SERIALIZABLE"/>
            </tx:attributes>
        </tx:advice>
        <aop:config>
            <aop:pointcut id="transactionPointcut" expression="execution(* com.wlf.service..*Impl.*(..))" />
            <aop:advisor pointcut-ref="transactionPointcut" advice-ref="transactionAdvice" />
        </aop:config>

      又考虑了悲观锁和乐观锁。遗憾的是这里是插入的并发,而不是修改。并发请求还未到来前,表里并无数据,所以无法使用for update来锁住记录,也无法加版本或者时间戳字段来标志记录。

      存储过程和触发器太麻烦,pass了。最后采用了merge into:

        <!-- 插入一条被邀请记录 -->
        <insert id="insertInviteeRecord" parameterType="com.wlf.domain.vcode.Invitee">
            merge into t_invitee_record t1 using
            (select #{inviteeMsisdn,jdbcType=VARCHAR} inviteeMsisdn,#{inviterMsisdn,jdbcType=VARCHAR} inviterMsisdn,#{activityId,jdbcType=VARCHAR} activityId from dual) t2
            on (t1.inviteeMsisdn = t2.inviteeMsisdn and t1.inviterMsisdn = t2.inviterMsisdn and t1.activityId = t2.activityId)
            when not matched then 
            INSERT (inviteeMsisdn,inviterMsisdn,activityId,acceptInviteTime)
            VALUES(
            #{inviteeMsisdn,jdbcType=VARCHAR},
            #{inviterMsisdn,jdbcType=VARCHAR},
            #{activityId,jdbcType=VARCHAR},
            #{acceptInviteTime,jdbcType=TIMESTAMP}
            )
        </insert>

      先select一把,把select到的数据放在dual里,再跟要插入的数据匹配。如果能匹配上,说明表里已经有其他并发请求捷足先登了,匹配不上说明我先来,直接插入。这种语句应该算会话级别的防并发控制,可以过滤掉大部分并发请求,但不能识别出并发时间很短的请求,这种并发就需要唯一索引发挥威力了。

      最后看下测试结果:

    import java.nio.charset.Charset;
    import org.asynchttpclient.AsyncHttpClient;
    import org.asynchttpclient.AsyncHttpClientConfig;
    import org.asynchttpclient.BoundRequestBuilder;
    import org.asynchttpclient.DefaultAsyncHttpClient;
    import org.asynchttpclient.DefaultAsyncHttpClientConfig;
    import org.asynchttpclient.Response;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.HttpStatus;
    
    public class HttpTools
    {
        
        /**
         * http的header中的content-type属性的名字
         */
        private static final String CONTENT_TYPE_NAME = "content-type";
        
        /**
         * http的header中的content-type属性的内容
         */
        private static final String CONTENT_TYPE_VALUE_XML_UTF_8 = "application/json; charset=UTF-8";
        
        /**
         * http的header中的content-type属性的字符编码
         */
        private static final String UTF_8 = "UTF-8";
        
        /**
         * HTTP 成功响应结果码
         */
        private static final int HTTP_STATUS_OK = 200;
        
        /**
         * HttpUtil类的实例
         */
        private static HttpTools instance = new HttpTools();
        
        /**
         * 日志对象
         */
        private static final Logger LOGGER = LoggerFactory.getLogger(HttpTools.class);
        
        /**
         * server 其他错误错误码
         */
        private final static int SERVER_OTHER_ERROR_CODE = 20000;
        
        /**
         * HttpUtil类构造函数
         */
        public HttpTools()
        {
            
        }
        
        public static HttpTools getInstance()
        {
            return instance;
        }
        
        private static AsyncHttpClient asynHttpClient = getAsyncHttpClient();
        
        /**
         * 获取请求类的客户端
         */
        public static AsyncHttpClient getAsyncHttpClient()
        {
            AsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().setFollowRedirect(false)
                .setConnectTimeout(PropertiesConfig.getInt("asynHttp.connectTimeout", 500))
                .setRequestTimeout(PropertiesConfig.getInt("asynHttp.requestTimeout", 10000))
                .setReadTimeout(PropertiesConfig.getInt("asynHttp.readTimeout", 10000))
                .build();
            AsyncHttpClient client = new DefaultAsyncHttpClient(config);
            return client;
        }
        
        /**
         * @param url
         * @param xml
         */
        public static String sendRequestByAsync(String url, String xml)
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Enter sendRequestByAsync()! url=" + url + "and xml=" + xml);
            }
            // 默认响应结果码
            int resultCode = HTTP_STATUS_OK;
            Response response = null;
            String responseXml = null;
            
            BoundRequestBuilder builder = asynHttpClient.preparePost(url);
            try
            {
                // 把参数放入请求头header中
                builder.setHeader(CONTENT_TYPE_NAME, CONTENT_TYPE_VALUE_XML_UTF_8);
                
                // 请求消息体
                builder.setBody(xml);
                
                // 发送http请求
                response = asynHttpClient.executeRequest(builder.build()).get();
                
                if (null == response)
                {
                    LOGGER.error("The response code is error! response is null and url=" + url + "and xml=" + xml);
                    return null;
                }
                
                resultCode = response.getStatusCode();
                
                if (HTTP_STATUS_OK != resultCode)
                {
                    if (SERVER_OTHER_ERROR_CODE == resultCode)
                    {
                        LOGGER.error("The response code is error!and url=" + url + "and xml=" + xml + "and resuleCode="
                            + resultCode);
                    }
                    else
                    {
                        if (LOGGER.isInfoEnabled())
                        {
                            LOGGER.info("The response code is error!and url=" + url + "and xml=" + xml + "and resuleCode="
                                + resultCode);
                        }
                    }
                    
                }
                responseXml = response.getResponseBody(Charset.forName(UTF_8));
                
            }
            catch (Exception ex)
            {
                LOGGER.error(
                    "send http request error in BaseHttpTools.sendHttpRequestByAsync(String url, String xml)!errorMessage="
                        + ex.getMessage() + "||url=" + url + "||xml=" + xml,
                    ex);
            }
            return responseXml;
        }
        public static void main(String[] args)
        {
            HttpTools ht = new HttpTools();
            try
            {
                int nThreads = 100;
                String url = "http://127.0.0.1:8088/wlf/invite";
                String xml = createXml();
                ht.httpPost(url, xml, nThreads);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
        
        /**
         * 构造请求xml报文
         *
         * @author wulinfeng
         * @return
         */
        private static String createXml()
        {
            StringBuilder strBuf = new StringBuilder();
            strBuf.append("<Request>");
            strBuf.append("<activityId>").append("4001").append("</activityId>");
            strBuf.append("<inviteeId>").append("13824384878").append("</inviteeId>");
            strBuf.append("<inviterId>").append("40000580417").append("</inviterId>");        
            strBuf.append("<acceptTime>").append("20180904094912").append("</acceptTime>");
            strBuf.append("</Request>");
            return strBuf.toString();
        }
        
        /**
         * 开始新增线程调用http
         *
         * @author wulinfeng
         * @param url
         * @param xml
         * @param nThreads 启用多少个线程
         */
        private void httpPost(String url, String xml, int nThreads)
        {
            HttpPostClient hp = new HttpPostClient(url, xml);
            
            for (int i = 0; i < nThreads; i++)
            {
                new Thread(hp).start();
            }
        }
        
        /**
         * 异步调用post请求
         *
         * @author wulinfeng
         * @version C10 2018年9月4日
         * @since SDP V300R003C10
         */
        class HttpPostClient implements Runnable
        {
            private String url;
            
            private String xml;
            
            public HttpPostClient(String url, String xml)
            {
                this.url = url;
                this.xml = xml;
            }
            
            @Override
            public void run()
            {
                String result = sendRequestByAsync(url, xml);
                System.out.println(result);
            }
        }
    }

      控制台输出:

    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
    <?xml version="1.0" encoding="UTF-8" ?>
    <Response>
      <resultCode>20000</resultCode>
      <resultMsg>其他错误</resultMsg>
    </Response>
    <?xml version="1.0" encoding="UTF-8" ?>
    <Response>
      <resultCode>200</resultCode>
      <resultMsg>成功</resultMsg>
    </Response>
    <?xml version="1.0" encoding="UTF-8" ?>
    <Response>
      <resultCode>200</resultCode>
      <resultMsg>成功</resultMsg>
    </Response>

      数据库查了下,只有一条入库了。第一个请求报错是因为唯一索引导致的,其他99个查到库里已经有数据直接返回成功了,我这里就没全部贴出来了。

  • 相关阅读:
    poj 1562 Oil Deposits
    poj 1650 Integer Approximation
    snmp4j 编程
    ubuntu 13.04 163源(亲测可用)
    c语言中static 用法总结(转)
    Spring入门
    Hibernate入门
    Struts2入门教程
    素数距离问题
    ASCII码排序
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/9621693.html
Copyright © 2011-2022 走看看