示例代码是基于博客 https://blog.csdn.net/qq1013598664/article/details/70183908的错误案例修改而来,如果有问题望多多指点,错误代码可以去原文查阅,本文将会指出错误之处。
不多废话,直接上代码:
package com.javartisan.concurrent; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.List; import java.util.UUID; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Transaction; /** * redis并发抢购测试 * * @author javartisan */ public class RedisTest { public static void main(String[] args) { final String watchkeys = "watchkeys"; ExecutorService executor = Executors.newFixedThreadPool(20); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 10000, "root"); final Jedis jedis = jedisPool.getResource(); jedis.auth("root"); jedis.set(watchkeys, "0");// 重置watchkeys为0 jedis.del("setsucc", "setfail");// 清空抢成功的,与没有成功的 jedis.close(); for (int i = 0; i < 10000; i++) {// 测试一万人同时访问 executor.execute(new MyRunnable(jedisPool)); } executor.shutdown(); } } class MyRunnable implements Runnable { String watchkeys = "watchkeys";// 监视keys JedisPool jedisPool = null; public MyRunnable(JedisPool jedisPool) { this.jedisPool = jedisPool; } @Override public void run() { Jedis jedis = jedisPool.getResource(); try { jedis.watch(watchkeys);//代码块1 watchkeys String val = jedis.get(watchkeys); int valint = Integer.valueOf(val); String userifo = UUID.randomUUID().toString(); if (valint < 10) { Transaction tx = jedis.multi();//代码块2 开启事务 tx.incr("watchkeys"); List<Object> list = tx.exec();//代码块3 提交事务,如果此时watchkeys被改动了,则返回emptyList if (list.size() > 0) { System.out.println("用户:" + userifo + "抢购成功,当前抢购成功人数:" + (valint + 1)); /* 抢购成功业务逻辑 */ jedis.sadd("setsucc", userifo); return; } } System.out.println("用户:" + userifo + "抢购失败"); jedis.sadd("setfail", userifo); return; } catch (Exception e) { e.printStackTrace(); } finally { jedis.close(); } } }
原文错误之处在于对exec方法的处理,当事务被打断或者执行失败的话返回的是一个emptyList,并不是一个null,因此需要使用list.size判断事务状态。
Jedis中的exec方法实现源码:
public List<Object> exec() { client.exec(); client.getAll(1); // Discard all but the last reply inTransaction = false; List<Object> unformatted = client.getObjectMultiBulkReply(); if (unformatted == null) { //事务失败 return Collections.emptyList(); } List<Object> formatted = new ArrayList<Object>(); for (Object o : unformatted) { try { formatted.add(generateResponse(o).get()); } catch (JedisDataException e) { formatted.add(e); } } return formatted; }
Redis抢购成功数据:
抢购失败用户个数:
备注说明:
代码块1表示监督key,以准备执行事务,如果在事务执行期间该key对应的value被修改的话事务进行回滚。
代码块2到代码块3之间是开启事务执行命令并提交事务,在这个代码期间必须使用事务对象执行命令,否者会报错。即使用tx操作。
在事务中判断元素是否存在时候,返回的是一个Response对象,例如方法get:
redis.clients.jedis.RedisPipeline#get
public Response<String> get(String key) { getClient(key).get(key); return getResponse(BuilderFactory.STRING); }
只能在事务执行完毕或者事务discard之后才可以get,不能再事务期间进行get。
秒杀系统结合限流系统一起处理效果会更好