一 redis key的设计
越短,而且要完整表达含义,可以缩写,但必须文档留存好说明
user:001
tm:order:001 order:1
一般以业务,功能模块或者表名开头,后跟主键(或能表示数据唯一性的值)
二 客户端连接redis
普通实现
import redis.clients.jedis.Jedis; public class jedistest { public static void main(String[] args) { try { String host = "xx.kvstore.aliyuncs.com";//控制台显示访问地址 int port = 6379; //直接new一个jedis对象 Jedis jedis = new Jedis(host, port); //鉴权信息 jedis.auth("password");//password String key = "redis"; String value = "aliyun-redis"; //select db默认为0 jedis.select(1); //set一个key jedis.set(key, value); System.out.println("Set Key " + key + " Value: " + value); //get 设置进去的key String getvalue = jedis.get(key); System.out.println("Get Key " + key + " ReturnValue: " + getvalue); jedis.quit(); jedis.close(); } catch (Exception e) { e.printStackTrace(); } } }
使用了连接池
package com.springboot.demo.base.config; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Configuration @PropertySource("classpath:redis.properties") @Slf4j public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.timeout}") private int timeout; @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.jedis.pool.max-wait}") private long maxWaitMillis; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.block-when-exhausted}") private boolean blockWhenExhausted; @Bean public JedisPool redisPoolFactory() throws Exception{ log.info("JedisPool注入成功!!"); log.info("redis地址:" + host + ":" + port); JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMaxWaitMillis(maxWaitMillis); // 连接耗尽时是否阻塞, false报异常,ture阻塞直到超时, 默认true jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted); // 是否启用pool的jmx管理功能, 默认true jedisPoolConfig.setJmxEnabled(true); JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, password); return jedisPool; } }
从连接池中获取jedis对象
/** * 通过key获取储存在redis中的value * 并释放连接 * * @param key * @param indexdb 选择redis库 0-15 * @return 成功返回value 失败返回null */ public String get(String key,int indexdb) { Jedis jedis = null; String value = null; try { jedis = jedisPool.getResource(); jedis.select(indexdb); value = jedis.get(key); log.info(value); } catch (Exception e) { log.error(e.getMessage()); } finally { //归还redis对象 returnResource(jedisPool, jedis); //或者 jedis.close() } return value; }
redis的resp协议,无论是传输还是存储,都是按这个数据格式
如 set name james ----> resp协议包 -----> redis 服务端
resp协议包的格式如下
*3 //组数 $3 //字段长度 set // 字段名 $4 //key长度 name //key的字段名 $5 //值的长度 james //值
手写redis
//基于tcp协议 socket实现 redis客户端 public static String set(Socket socket, String key, String value) throws Exception { StringBuilder str = new StringBuilder(); //1个resp协议格式的数据 str.append("*3").append(" "); //组数 str.append("$3").append(" ");//字段长度 str.append("set").append(" ");//字段 str.append("$").append(key.getBytes().length).append(" ");//key长度 str.append(key).append(" "); //key 字段名 str.append("$").append(value.getBytes().length).append(" ");//value长度 str.append(value).append(" "); //value的值 socket.getOutputStream().write(str.toString().getBytes()); byte[] response = new byte[2048]; socket.getInputStream().read(response); return new String(response); }
三 Pipeline详解
1 pipeline出现的背景:
redis客户端执行一条命令分4个过程:
发送命令--->命令排队--->命令执行--->返回结果
这个过程称为Round trip time(简称RTT, 往返时间),mget mset等批量此操作有效节约了RTT,但大部分命令(如hgetall,并没有mhgetall)不支持批量操作,需要消耗N次RTT ,这个时候需要pipeline来解决这个问题。
2 pepeline的性能
1) 未使用pipeline执行N条命令
特别频繁的redis操作,大部分时间都是花在了网络上,网络时间比redis执行时间要长很多,特别是异地机房更加明显
2)、使用了pipeline执行N条命令
3)、两者性能对比
4 )具体实现
@Test public void pipeCompare() { Jedis redis = new Jedis("192.168.1.111", 6379); redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码 Map<String, String> data = new HashMap<String, String>(); redis.select(8);//使用第8个库 redis.flushDB();//清空第8个库所有数据 // hmset long start = System.currentTimeMillis(); // 直接hmset 循环10000次往redis里写数据 for (int i = 0; i < 10000; i++) { data.clear(); //清空map data.put("k_" + i, "v_" + i); redis.hmset("key_" + i, data); //循环执行10000条数据插入redis } long end = System.currentTimeMillis(); System.out.println(" 共插入:[" + redis.dbSize() + "]条 .. "); System.out.println("1,未使用PIPE批量设值耗时" + (end - start) / 1000 + "秒.."); redis.select(8); redis.flushDB(); // 使用pipeline Pipeline pipe = redis.pipelined(); start = System.currentTimeMillis(); //循环10000次,值都放到了Pileline里了 for (int i = 0; i < 10000; i++) { data.clear(); data.put("k_" + i, "v_" + i); pipe.hmset("key_" + i, data); //将值封装到PIPE对象,此时并未执行,还停留在客户端 } pipe.sync(); //将封装后的PIPE一次性发给redis end = System.currentTimeMillis(); System.out.println(" PIPE共插入:[" + redis.dbSize() + "]条 .. "); System.out.println("2,使用PIPE批量设值耗时" + (end - start) / 1000 + "秒 ..");
性能对比
使用了Pipeline性能高效了很多倍,可以优化吞吐量
四、原生批命令(mset, mget)与Pipeline对比
1 原生批命令是原子性,pipeline是非原子性
(原子性概念:一个事务是一个不可分割的最小工作单位,要么都成功要么都失败。原子操作是指你的一个业务逻辑必须是不可拆分的. 处理一件事情要么都成功,要么都失败,原子不可拆分)
2 原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性
3 原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成
五、Pipeline正确使用方式
使用pipeline组装的命令个数不能太多,不然数据量过大,增加客户端的等待时间,还可能造成网络阻塞,可以将大量命令的拆分多个小的pipeline命令完成。
六 具体实现
/** * 删除多个字符串key 并释放连接 * * @param keys* * @return 成功返回value 失败返回null */ public boolean mdel(List<String> keys) { Jedis jedis = null; boolean flag = false; try { jedis = pool.getResource();//从连接借用Jedis对象 Pipeline pipe = jedis.pipelined();//获取jedis对象的pipeline对象 for(String key:keys){ pipe.del(key); //将多个key放入pipe删除指令中 } pipe.sync(); //执行命令,完全此时pipeline对象的远程调用 flag = true; } catch (Exception e) { pool.returnBrokenResource(jedis); e.printStackTrace(); } finally { returnResource(pool, jedis); } return flag; }
七 Redis事务
Redis事务的概念:
Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
总结说:redis事务就是一次性、顺序性、排他性的执行一个队列中的一系列命令。
Redis事务没有隔离级别的概念:
批量操作在发送 EXEC 命令前被放入队列缓存,并不会被实际执行,也就不存在事务内的查询要看到事务里的更新,事务外查询不能看到。
Redis不保证原子性:
Redis中,单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis事务的三个阶段:
- 开始事务
- 命令入队
- 执行事务
Redis事务相关命令:
watch key1 key2 ... : 监视一或多个key,如果在事务执行之前,被监视的key被其他命令改动,则事务被打断 ( 类似乐观锁 )
multi : 标记一个事务块的开始( queued )
exec : 执行所有事务块的命令 ( 一旦执行exec后,之前加的监控锁都会被取消掉 )
discard : 取消事务,放弃事务块中的所有命令
unwatch : 取消watch对所有key的监控
Redis事务使用案例:
(1)正常执行
(2)放弃事务
(3)若在事务队列中存在命令性错误(类似于java编译性错误),则执行EXEC命令时,所有命令都不会执行
(4)若在事务队列中存在语法性错误(类似于java的1/0的运行时异常),则执行EXEC命令时,其他正确命令会被执行,错误命令抛出异常。
(5)使用watch
案例一:使用watch检测balance,事务期间balance数据未变动,事务执行成功
案例二:使用watch检测balance,在开启事务后(标注1处),在新窗口执行标注2中的操作,更改balance的值,模拟其他客户端在事务执行期间更改watch监控的数据,然后再执行标注1后命令,执行EXEC后,事务未成功执行。
一但执行 EXEC 开启事务的执行后,无论事务使用执行成功, WARCH 对变量的监控都将被取消。
故当事务执行失败后,需重新执行WATCH命令对变量进行监控,并开启新的事务进行操作。
总结:
watch指令类似于乐观锁,在事务提交时,如果watch监控的多个KEY中任何KEY的值已经被其他客户端更改,则使用EXEC执行事务时,事务队列将不会被执行,同时返回Nullmulti-bulk应答以通知调用者事务执行失败。