添加jar包
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.0.1</version> </dependency>
简单操作
public class Myredis { public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.set("v5","k5"); //事务 Transaction multi = jedis.multi(); multi.set("k1","k2"); multi.exec(); } }
首先不使用watch
public class Myredis { public static Boolean transfer(int transfernum){ Jedis jedis = new Jedis("127.0.0.1", 6379); Transaction multi = jedis.multi(); multi.decrBy("num",transfernum); multi.exec(); return true; } public static void main(String[] args) { Myredis.transfer(20); } }
测试
1、首先debug代码,停留在了exec()之前,还没有提交事务
2、查询到redis中的num为80
3、手动修改num为200
4、放行代码,执行java全部的代码
5、发现num是按照200来有重新计算的
6、总结,当执行exec()时,消息队列中的代码才真正被执行,注意redis不保证原子性,进入队列的代码分开执行,不同队列的代码执行报错对其他队列中的代码不影响。
所以在执行exec()过程中,如果出现num被修改了,就会发生数据不对的问题,所以我们需要使用watch
乐观锁
使用watch来监听key
public class Myredis { private static int x = 10; private static Jedis jedis = new Jedis("127.0.0.1", 6379); public static Boolean transfer(Long transfernum){ jedis.watch("num"); Transaction multi = jedis.multi(); multi.decrBy("num",transfernum); List<Object> exec = multi.exec(); //如果返回值是null,表示事务被终止 System.out.println(exec); jedis.close(); return true; } public static void main(String[] args) { Myredis.transfer(20L); } }
测试结果,如果外部对num进行修改,watch就会监听到,直接终止事务提交.
可以采用循环来重复获取新的值(可以设置可以循环次数,如果在循环次数内没有成功,就退出)
public class Myredis { private static int x = 10; private static Jedis jedis = new Jedis("127.0.0.1", 6379); public static Boolean transfer(Long transfernum){ jedis.watch("num"); List<Object> exec = null; while (exec==null||exec.size()==0){ Transaction multi = jedis.multi(); multi.decrBy("num",transfernum); exec = multi.exec(); } return true; } public static void main(String[] args) { Myredis.transfer(20L); } }
读写分离
public class Myredis { public static void main(String[] args) { Jedis jedis_Master = new Jedis("127.0.0.1", 6379); Jedis jedis_Slave = new Jedis("127.0.0.1", 6379); jedis_Slave.slaveof("127.0.0.1",6379); //主机用来写 jedis_Master.set("k1","v1"); //从机用来读 String k1 = jedis_Slave.get("k1"); } }
JedisPool
JedisPool的配置参数大部分是由JedisPoolConfig的对应项来赋值的。
maxActive:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了maxActiye个jedis实例,则此时pool的状态为exhausted。
maxIdle:控制一个pool最少有多少个状态为idle(空闲)的jedis实例;whenExhaustedAction:表示当pool中的jedis实例都被allocated完时,pool要采取的操作;默认有三种。
WHEN_EXHAUSTED_FAIL-->表示无jedis实例时,直接抛出NoSuchElementException;WHEN_EXHAUSTED_BLOCK-->则表示阻塞住,或者达到maxWait时抛出JedisConnectionException;WHEN_EXHAUSTED_GROW-->则表示新建一个jedis实例,也就说设置的maxActive无用;
maxwait:表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛jedisConnectionException;
testOnBorrow:获得一个jedis实例的时候是否检查连接可用性(ping());如果为true,则得到的jedis实例均是可用的;
testonReturn:return 一个jedis实例给pool时,是否检查连接可用性(ping());
简单版
public class Myredis { public static void main(String[] args) { JedisPool jedisPoll = MyJedisPoll.getJedisPoll(); Jedis jedis = jedisPoll.getResource(); MyJedisPoll.release(jedis); } } class MyJedisPoll{ private static volatile JedisPool jedisPool=null; //获取连接池 public static JedisPool getJedisPoll(){ if (jedisPool==null){ synchronized (MyJedisPoll.class){ if (jedisPool==null){ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(1000); jedisPoolConfig.setMaxIdle(4); jedisPoolConfig.setMaxWaitMillis(100*1000); jedisPoolConfig.setTestOnBorrow(true); jedisPool = new JedisPool(jedisPoolConfig, "127.0.0.1", 6379); } } return jedisPool; }else { return jedisPool; } } //释放连接 public static void release(Jedis jedis){ //如果连接池已经关闭了,则返回-1,最大活跃数不会超过MAX_ACTIVE,最大空闲数不会超过MAX_OLDE System.out.println(jedisPool.getNumWaiters()+"链接归还前活跃数:"+jedisPool.getNumActive()+"空闲连接数:"+jedisPool.getNumIdle()); jedis.close(); System.out.println(jedisPool.getNumWaiters()+"链接归还后活跃数:"+jedisPool.getNumActive()+"空闲连接数:"+jedisPool.getNumIdle()); } }
复杂版本
下面的版本我觉得有一点需要改进getJedis的时候锁加的位置不对!只需要锁住实例化initpoll,注意双重判断(性能问题)
public class RedisConnectPollUtil{ private static final Log LOG = LogFactory.getLog(RedisConnectPollUtil.class); //redis获取链接的并发锁 private static ReentrantLock redisPollLock= new ReentrantLock(); //连接redis实例的ip private static final String REDIS_ADDRESS = "localhost"; //连接redis实例的端口 private static final int PORT = 6379; //多线程环境中,连接实例的最大数,如果设为-1则无上线,建议设置,否则有可能导致资源耗尽 private static final int MAX_ACTIVE = 8; //在多线程环境中,连接池中最大空闲连接数,单线程环境没有实际意义 private static final int MAX_OLDE = 4; //在多线程环境中,连接池中最小空闲连接数 private static final int MIN_OLDE = 1; //多长时间将空闲线程进行回收,单位毫秒 private static final int METM = 2000; //对象空闲多久后逐出, 当空闲时间>该值 且 空闲连接>最大空闲数 时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略) private static final int SMETM = 2000; //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1,只有运行了此线程,MIN_OLDE METM/SMETM才会起作用 private static final int TBERM = 1000; //当连接池中连接不够用时,等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException; private static final int MAX_WAIT = 1000; //超时时间,单位毫秒 private static final int TIME_OUT = 5000; //在借用一个jedis连接实例时,是否提前进行有效性确认操作;如果为true,则得到的jedis实例均是可用的; private static final boolean TEST_ON_BORROW = false; //连接池实例 private static JedisPool jedisPool = null; //初始化连接池,有好多重载的构造函数,根据自己业务实际需要来实例化JedisPoll private static void initPoll() { try { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(MAX_ACTIVE); config.setMaxIdle(MAX_OLDE); config.setMaxWaitMillis(MAX_WAIT); config.setTestOnBorrow(TEST_ON_BORROW); config.setMinIdle(MIN_OLDE); // config.setMinEvictableIdleTimeMillis(METM); config.setSoftMinEvictableIdleTimeMillis(SMETM); config.setTimeBetweenEvictionRunsMillis(TBERM); jedisPool = new JedisPool(config, REDIS_ADDRESS, PORT, TIME_OUT); } catch (Exception e) { LOG.error("initial JedisPoll fail:",e); } } //获取jedis连接实例 public static Jedis getJedis() { redisPollLock.lock(); if(jedisPool == null) { initPoll(); } Jedis jedis = null; try { if(jedisPool != null) { jedis = jedisPool.getResource(); } } catch (Exception e) { LOG.error("get jedis fail:",e); }finally { redisPollLock.unlock(); } return jedis; } //归还jedis实例,2.9版本后jedisPool.returnResource(jedis);过期,被close替代,源码如下 /* @Override public void close() { if (dataSource != null) { if (client.isBroken()) { this.dataSource.returnBrokenResource(this); } else { this.dataSource.returnResource(this); } } else { client.close(); } } */ //如果每次获取了jedis连接后不进行归还,redis不会自动回收,那么获取的最多连接数量为MAX_ACTIVE //超出数量则会抛出异常redis.clients.jedis.exceptions.JedisException: Could not get a resource from the pool public static void returnSource(Jedis jedis) { if(jedis != null) { //如果连接池已经关闭了,则返回-1,最大活跃数不会超过MAX_ACTIVE,最大空闲数不会超过MAX_OLDE System.out.println(jedisPool.getNumWaiters()+"链接归还前活跃数:"+jedisPool.getNumActive()+"空闲连接数:"+jedisPool.getNumIdle()); jedis.close(); System.out.println(jedisPool.getNumWaiters()+"链接归还后活跃数:"+jedisPool.getNumActive()+"空闲连接数:"+jedisPool.getNumIdle()); } } private static int k = 0; public static void main(String[] args) { for(int i=0;i<20;i++) { new Thread(new Runnable() { @Override public void run() { Jedis jedis = getJedis(); System.out.println("第"+(k++)+"次"+jedis.lpop("pageList")); returnSource(jedis); //判断此连接是否还有效,有效返回true,否则返回false //连接归还后,将不可用,会抛出redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. if(!jedis.isConnected()) { jedis.lpop("pageList"); } // jedisPool.close(); jedisPoll关闭后将导致池不可用 // System.out.println("jedispoll是否关闭了?"+jedisPool.isClosed()); } }).start(); } try { //主线程等待一定时间,否则会发生线程执行时效错乱问题 Thread.currentThread().sleep(15000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(jedisPool.getNumWaiters()+"最终链接归还后活跃数:"+jedisPool.getNumActive()+"空闲连接数:"+jedisPool.getNumIdle()); destroy(); } //在运用正常运行时,通常是不会手动调用jedisPool.close();池内将保持最大空闲数的连接,如果设置了逐出策略 //那么池内就会保留最小空闲连接,如果应用突然关闭,我们需要在bean销毁时将连接池销毁. public static void destroy(){ if(jedisPool != null) { try { jedisPool.destroy(); } catch (Exception e) { LOG.error("jedisPool destroy fail ",e); } } } }
最后任务
之后有时间需要看源码了解watch是如何实现监控的,和exec如果实现执行队列中的代码的......