package com.sankuai.qcs.regulation.nanjing.util; import com.dianping.squirrel.client.StoreKey; import com.dianping.squirrel.client.impl.redis.RedisStoreClient; import com.dianping.zebra.util.StringUtils; import com.google.common.collect.Maps; import com.sankuai.meituan.config.MtConfigClient; import com.sankuai.qcs.regulation.nanjing.Conf; import org.apache.http.HttpException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; /** * Describe: * Created by tanxiaolei * Date: 2018/4/20 11:50 */ @Component public class TokenUtils { private static final Logger LOGGER = LoggerFactory.getLogger(TokenUtils.class); @Resource private RedisStoreClient redisStoreClient; @Resource private MtConfigClient mtConfigClient; private static final String KEY_CATEGORY = "regulation_traffic"; private static final String TOKEN_KEY_PARAMS = "nanjing_token_key"; //缓存失效时间 11个小时 private static final int TOKEN_EXPIRE_SECONDS = 39600; private static final String LOCK_KEY_PARAMS = "nanjing_lock_key"; //分布式锁失效时间2秒 private static final int LOCK_EXPIRE_SECONDS = 2; private static final String NJ_TOKEN_USERID = "NJ_TOKEN_USERID"; private static final Map<String, String> headers = Maps.newHashMap(); static { headers.put("Connection", "keep-alive"); headers.put("Accept-Charset", Conf.DEFAULT_CHARSET); headers.put("Content-Type", Conf.ContentType.JSON.getMimeType()); } /** * 判断token是否在redis存在 * * @return */ public boolean tokenExists() { StoreKey key = new StoreKey(KEY_CATEGORY, TOKEN_KEY_PARAMS); return redisStoreClient.exists(key); } /** * 删除指定token * * @return */ public void delToken() { StoreKey key = new StoreKey(KEY_CATEGORY, TOKEN_KEY_PARAMS); LOGGER.info("key : {} delete {}", key, redisStoreClient.delete(key)); } /** * 获取token * * @return */ public String getToken() { StoreKey key = new StoreKey(KEY_CATEGORY, TOKEN_KEY_PARAMS); String token = redisStoreClient.get(key); LOGGER.info("get 
token :{} from redis", token); if (token == null) { StoreKey lock = new StoreKey(KEY_CATEGORY, LOCK_KEY_PARAMS); //分布式锁,如果没拿到锁,则直接放弃,防止南京侧服务出现问题,影响MQ消费 if (redisStoreClient.setnx(lock, "lock", LOCK_EXPIRE_SECONDS)) { //双重检验,防止重复获取token token = redisStoreClient.get(key); if (token == null) { try { String userId = mtConfigClient.getValue(NJ_TOKEN_USERID); LOGGER.info("mtConfigClient get userId : {}", userId); token = HttpClientUtils.post("http://" + Conf.GET_TOKEN_URL + userId, "320100", headers); LOGGER.info("get token : {} from http", token); if (StringUtils.isNotBlank(token)) { redisStoreClient.set(key, token, TOKEN_EXPIRE_SECONDS); } } catch (HttpException e) { LOGGER.error("get token errer", e); } } //将分布式锁直接过期 redisStoreClient.expire(lock, 0); } } return token; } }
package com.sankuai.qcs.regulation.nanjing.util; import com.sankuai.qcs.regulation.nanjing.Conf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.ClientEndpoint; import javax.websocket.CloseReason; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; /** * Describe: * Created by tanxiaolei * Date: 2018/4/18 14:26 */ @ClientEndpoint @Component public class WebSocketClientUtils { // @Autowired // private TokenUtils tokenUtils; private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketClientUtils.class); private static ApplicationContext applicationContext; public static ApplicationContext getApplicationContext() { return applicationContext; } public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketClientUtils.applicationContext = applicationContext; } @OnOpen public void onOpen(Session session) { //经过试验,客户端设置 buffer size时并不生效 session.setMaxBinaryMessageBufferSize(Conf.BINARY_MESSAGE_BUFFER_SIZE); session.setMaxTextMessageBufferSize(Conf.BINARY_MESSAGE_BUFFER_SIZE); LOGGER.info("Session {}, {} Connected", session.getId(), session.getRequestParameterMap()); } @OnMessage public void onMessage(String message, Session session) { LOGGER.info("Session receive message : {}", message); //如果是403,表示token失效 if ("403".equals(message)) { delAndGetNewToken(); } } @OnClose public void onClose(Session session, CloseReason closeReason) { LOGGER.info("Session max buffer size {} {} close because of {}", session.getMaxBinaryMessageBufferSize(), session.getRequestParameterMap(), closeReason); } @OnError public void onError(Session session, Throwable throwable) { if (session != null) { LOGGER.error("Session {} error", session.getRequestParameterMap(), throwable); } else { LOGGER.error("error", throwable); 
} } private void delAndGetNewToken() { TokenUtils tokenUtils = (TokenUtils) applicationContext.getBean(TokenUtils.class); LOGGER.info("toeknUtils : {}", tokenUtils); tokenUtils.delToken(); LOGGER.info("again get token : {}", tokenUtils.getToken()); } }
/**
 * Sets the value for the key only when the key does not exist yet; if the key
 * already exists, the existing value is left unchanged.
 * {@link RedisStoreClient#add(StoreKey, Object, int)}
 *
 * @param key the key to add
 * @param value the value to add
 * @param expireInSeconds expiration time
 * @return true if the key did not exist and was added successfully<br>
 *         false if the key already exists
 * @throws StoreException all exceptions are subclasses of StoreException and are
 *         RuntimeExceptions; catch the specific one as needed.
 *         For example, to handle timeouts, catch StoreTimeoutException
 */
public Boolean setnx(StoreKey key, Object value, int expireInSeconds);
问题的关键是:方法:
getToken
使用了加锁方法:
if (redisStoreClient.setnx(lock, "lock", LOCK_EXPIRE_SECONDS)) {
这个方法:如果 Key 不存在且添加成功,返回 true;如果 Key 已经存在,返回 false。
也就是说:只有 Key 添加成功(即拿到锁)时才去获取 token,否则直接放弃(getToken 返回 null),以防止南京侧服务出现问题时影响 MQ 消费。