一、基本概念
-
redis单机使用不同的数据库保存多个键值对,单机redis默认有16个db,但是redis集群中节点只使用第一个数据库db[0]。 redis集群使用分片的方式保存数据库中的键值对,集群的整个数据库被分成16384个槽slot,数据库中每个键都属于这16384个槽的其中一个,集群中每个节点可以处理0个或最多16384个槽。
-
redis集群把节点分为master和slave,master负责处理客户端的命令(处理负责的槽的命令请求),slave负责复制某个master,并在被复制的master下线的时候,代替master继续处理命令请求。
在一个集群中,超过半数以上的master认为某个master下线了或者是疑似下线,那么就把这个master标记为已下线(FAIL),广播FAIL消息。这个下线master的所有slave开始故障转移。 -
故障转移
(1)基于Raft算法的领头选举方法实现选举新的主节点(只有master具有投票权,IO速度快的slave就成了新master)。
(2)新节点把已下线master的所有槽指派给自己负责。
(3)新节点广播PONG消息,宣布自己成为master并接管了槽。
(4)新节点开始接收和处理自己负责的槽有关的命令请求,故障转移完成。 -
通道pipeline
(1)通道是接收客户端的命令请求,并直接执行了,但是返回结果是在调用syncAndReturnAll()时,按照命令执行的顺序的返回结果存储的列表。调用sync()没有返回结果,也表示通道结束,需要最后close()。
(2)pipeline适用于批处理,当有大量的操作需要一次性执行的时候,可以用管道。
(3)redis集群暂时没有可以执行使用的集群通道,可以在每个master上生成一个通道,并执行这个master上的key,参考下面附件中的代码code1或者自己写程序构建一个redis集群的pipeline,参考下面附件中的代码code2. -
事务transaction
(1)Redis通过MULTI、EXEC、WATCH等命令来实现事务(transaction)功能。事务提供了
一种将多个命令请求打包,然后一次性、按顺序地执行多个命令的机制,并且在事务执行期
间,服务器不会中断事务而改去执行其他客户端的命令请求,它会将事务中的所有命令都执
行完毕,然后才去处理其他客户端的命令请求。
(2)事务中的命令要不全部执行,要不就是全部不执行。
(3)redis集群对于事务的支持只能在一个slot上完成,所以必须对每个节点上每个槽构建一个transaction,对这个槽上的key进行操作。
(4)mutli和exec必须成对出现,不然会出现:(error) ERR MULTI calls can not be nested.即不允许在存在一个mutli的情况下,再生成新的multi。代码可参考下面附件中的代码code3. -
redis集群上的正则表达模式匹配,现在只能在每个节点上单独执行提取符合模式匹配的key。
获取redis集群上所有符合给定模式pattern(正则表达式)的key
正则表达模式:
h?llo 匹配 hello, hallo 和 hxllo
h*llo 匹配 hllo 和 heeeello
h[ae]llo 匹配 hello 和 hallo, 但是不匹配 hillo
h[^e]llo 匹配 hallo, hbllo, … 但是不匹配 hello
h[a-b]llo 匹配 hallo 和 hbllo
val keys = new util.TreeSet[java.lang.String]()
// 1.获得cluster中所有节点
val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
// 3. 对集群中每个节点都执行keys(pattern)
for(node <- nodes_master){
val jedisPool:JedisPool = clusterNodes.get(node)
val jedisConn:Jedis = jedisPool.getResource
try{
keys.addAll(jedisConn.keys(pattern))
}catch{
case e:Exception => { println("error getting keys: "+e+" !!!") }
}finally{
jedisConn.close()
}
}
二、性能
- Redis在最新的硬件上可以每秒执行100 000个操作,而在高端的硬件上甚至可以每秒执行将近225 000个操作。(来源《Redis实战》-6.2.2)
附件
** 代码1:在每个master上生成一个通道,并执行这个master上的key**
// 从redis上获取全部的Rec5_Pic_<pid>的key
val pattern = "User_Label_Act_-u[0123456789]*"
// println("key pattern: "+pattern)
var keys:List[AnyRef] = List()
// 1.获得cluster中所有节点
val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
// 2. 对集群中每个节点都执行keys(pattern)
for(node <- nodes_master){
val time_start_key:Long = System.currentTimeMillis()
println("node: "+node)
val jedisPool:JedisPool = clusterNodes.get(node)
val jedisConn:Jedis = jedisPool.getResource
// 获得集群上每个节点上符合pattern的所有的key
keys = jedisConn.keys(pattern).toArray().toList
if(keys.isEmpty){
...
}else{
val jcp:Pipeline = jedisConn.pipelined()
for(key <- keys){
jcp操作(set,rpush等)
}
}
jcp.sync()
jcp.close()
}
jedisConn.close()
}
** 代码2:构建redis集群的pipeline**
import java.io.Closeable
import java.io.IOException
import java.lang.reflect.Field
import scala.util.control.Breaks._
import redis.clients.jedis.{Client, Jedis, JedisCluster, JedisClusterInfoCache, JedisPool, JedisSlotBasedConnectionHandler, PipelineBase}
import redis.clients.jedis.exceptions.{JedisMovedDataException, JedisRedirectionException}
import redis.clients.jedis.util.JedisClusterCRC16
import redis.clients.jedis.util.SafeEncoder
class JedisClusterPipeline extends PipelineBase with Closeable{
private var connectionHandler:JedisSlotBasedConnectionHandler = _
private val clients:collection.mutable.Queue[Client] = collection.mutable.Queue[Client]() // 根据顺序存储每个命令对应的Client
private val jedisMap:collection.mutable.Map[JedisPool, Jedis] = collection.mutable.Map[JedisPool, Jedis]() // 用于缓存连接
val FIELD_CONNECTION_HANDLER:Field = getField(Class.forName("redis.clients.jedis.BinaryJedisCluster"), "connectionHandler") // 构建属性反射对象
val FIELD_CACHE:Field = getField(Class.forName("redis.clients.jedis.JedisClusterConnectionHandler"), "cache")
println("JedisClusterPipeline clients: "+clients.length)
/* @Description: 根据jedisCluster实例生成对应的JedisClusterPipeline
* @Param: [jedisCluster]
* @return: _root_.com.boe.recommend.JedisClusterPipeline
*/
def pipelined(jedisCluster:JedisCluster):JedisClusterPipeline={
// println("pipelined(jedisCluster:JedisCluster)!!!")
val pipeline:JedisClusterPipeline = new JedisClusterPipeline()
pipeline.setJedisCluster(jedisCluster)
pipeline
}
/* @Description: 获得JedisCluster实例的connectionHandler属性值
* @Param: [jedis]
* @return: Unit
*/
def setJedisCluster(jedis:JedisCluster): Unit ={
// println("setJedisCluster(jedis:JedisCluster)!!!")
val ch:JedisSlotBasedConnectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER).asInstanceOf[JedisSlotBasedConnectionHandler]
if(ch == null){
throw new RuntimeException("error: cannot get JedisSlotBasedConnectionHandler from JedisCluster!!!")
}
connectionHandler = ch
}
/* @Description: 获得JedisSlotBasedConnectionHandler实例的cache属性值JedisClusterInfoCache,
再根据JedisClusterInfoCache获得处理特定槽slot的JedisPool,
再根据JedisPool,获得对应的Jedis
* @Param: [slot]
* @return: _root_.redis.clients.jedis.Jedis
*/
def getJedis(slot:Int):Jedis={
// println("getJedis(slot:Int)!!!")
val cache:JedisClusterInfoCache = getValue(connectionHandler, FIELD_CACHE).asInstanceOf[JedisClusterInfoCache]
val pool:JedisPool = cache.getSlotPool(slot)
// 根据pool从缓存中获取Jedis
// var jedis:Jedis = jedisMap(pool)
// if(jedis == null){
// jedis = pool.getResource
// jedisMap += (pool -> jedis)
// }
var jedis:Jedis = null
if(jedisMap.keys.exists(x=>x==pool)){
jedis = jedisMap(pool)
}else{
jedis = pool.getResource
jedisMap += (pool -> jedis)
}
jedis
}
// 必须覆盖父类PipelineBase的抽象方法
def getClient(key: String):Client={
// println("getClient(key: String)!!!")
val bKey:Array[Byte] = SafeEncoder.encode(key)
getClient(bKey)
}
// 必须覆盖父类PipelineBase的抽象方法
def getClient(key: Array[Byte]): Client={
// println("getClient(key: Array[Byte])!!!")
val jedis:Jedis = getJedis(JedisClusterCRC16.getSlot(key)) // 求出key对应的槽slot,再获取处理这个槽的Jedis
val client = jedis.getClient // 获得Jedis的client
clients += client
client
}
// 必须实现Closeable接口的方法
@throws[IOException]
override def close(): Unit={
println("close()!!!")
clean()
for(client <- clients){
client.close()
}
clients.clear()
for(jedis <- jedisMap.values){ // collection.mutable.Map[JedisPool, Jedis]
jedis.close()
}
jedisMap.clear()
}
/* @Description: 构建cls类的fieldName属性的反射对象
val cluster = RedisUtil.createJedisCluster()
val FIELD_CONNECTION_HANDLER:Field = Class.forName("redis.clients.jedis.BinaryJedisCluster").getDeclaredField("connectionHandler")
FIELD_CONNECTION_HANDLER.setAccessible(true) // 成员变量为procted,故必须进行此操作
val ch:JedisSlotBasedConnectionHandler = FIELD_CONNECTION_HANDLER.get(cluster).asInstanceOf[JedisSlotBasedConnectionHandler]
println("ch: "+ch) // ch: redis.clients.jedis.JedisSlotBasedConnectionHandler@1d082e88
* @Param: [cls, fieldName]
* @return: _root_.java.lang.reflect.Field
*/
def getField(cls:Class[_], fieldName:String): Field ={
try{
val field:Field = cls.getDeclaredField(fieldName)
field.setAccessible(true)
field
}catch {
case e1:NoSuchFieldException =>
throw new RuntimeException("error1: cannot find or access field '" + fieldName + "' from " + cls.getName, e1)
case e2:SecurityException =>
throw new RuntimeException("error2: cannot find or access field '" + fieldName + "' from " + cls.getName, e2)
case e:Exception =>
throw new RuntimeException("error: getField, ", e)
}
}
/* @Description: 获得obj对象中定义的发射field的属性值
* @Param: [obj, field]
* @return: Any
*/
def getValue(obj:Object, field:Field): Any ={
try {
field.get(obj)
}catch {
case e1:IllegalArgumentException =>
println("error1: get value fail "+e1)
throw new RuntimeException(e1)
case e2:IllegalAccessException =>
println("error2: get value fail "+e2)
throw new RuntimeException(e2)
case e:Exception =>
println("error: get value fail "+e)
throw new RuntimeException(e)
}
}
// 刷新集群信息,当集群信息发生变更时调用
def refreshCluster(): Unit ={
connectionHandler.renewSlotCache()
}
// Pipeline.java源码
// public void sync() {
// 判断消息队列是否为空,是否发出请求
// if (getPipelinedResponseLength() > 0) {
// 从InputStream中获取回复消息,逐个将消息塞回消息队列的Response中
// List<Object> unformatted = client.getMany(getPipelinedResponseLength());
// for (Object o : unformatted) {
// generateResponse(o);
// }
// }
// }
// public List<Object> syncAndReturnAll() {
// if (getPipelinedResponseLength() > 0) {
// List<Object> unformatted = client.getMany(getPipelinedResponseLength());
// List<Object> formatted = new ArrayList<Object>();
// for (Object o : unformatted) {
// try {
// formatted.add(generateResponse(o).get());
// } catch (JedisDataException e) {
// formatted.add(e);
// }
// }
// return formatted;
// } else {
// return java.util.Collections.<Object> emptyList();
// }
// }
// 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
def sync(): Unit ={
// println("sync()")
innerSync(null)
}
// 同步读取所有数据 并按命令顺序返回一个列表
def syncAndReturnAll(): collection.mutable.ListBuffer[Any] ={
println("syncAndReturnAll(): collection.mutable.ListBuffer[Any]!!!")
val responseList:collection.mutable.ListBuffer[Any] = collection.mutable.ListBuffer[Any]()
innerSync(responseList)
responseList
}
// sync()
// innerSync(formatted:collection.mutable.ListBuffer[Any])!!!
// innerSync client: redis.clients.jedis.Client@2c767a52
// getPipelinedResponseLength: 101
// client get: ()
// data: OK
// innerSync client: redis.clients.jedis.Client@708f5957
// getPipelinedResponseLength: 1
// client get: ()
// data: OK
// close()!!!
def innerSync(formatted:collection.mutable.ListBuffer[Any]): Unit ={
println("innerSync(formatted:collection.mutable.ListBuffer[Any])!!!")
val clientSet:collection.mutable.HashSet[Client] = collection.mutable.HashSet[Client]()
try{
println("clients innerSync: "+clients.size)
// println(clients)
breakable{
for(client <- clients) { // 队列
if (getPipelinedResponseLength > 0) {
// println("innerSync client: " + client)
// println("getPipelinedResponseLength: " + getPipelinedResponseLength)
// println("client get: "+client.get("testk1"))
val data: Any = generateResponse(client.getOne).get
// println("data: " + data)
// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
// val unformatted: util.List[AnyRef] = client.getMany(getPipelinedResponseLength)
// println("unformatted: "+unformatted)
// 从inputStream中读取所有回复
// println("client.getObjectMultiBulkReply: "+client.getObjectMultiBulkReply)
// val unformatted: Array[AnyRef] = client.getObjectMultiBulkReply.toArray
if (formatted != null) {
formatted += data
// for (o <- unformatted.toArray) {
// try{
// val rep = generateResponse(o).get
// formatted += rep
// }
// catch {
// case e: JedisDataException =>
// formatted += e
// }
// }
}
// size相同说明所有的client都已经添加,就不用再调用add方法了
if (clientSet.size != jedisMap.size) {
clientSet.add(client)
}
}else{break}
}
}
}catch {
case e1:JedisRedirectionException =>{
if(e1.isInstanceOf[JedisMovedDataException]){
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
refreshCluster()
}
println("error: JedisRedirectionException "+e1)
throw e1
}
case e:Exception => println("error: generateResponse "+e)
}finally {
if(clientSet.size != jedisMap.size){
// 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
for(jedis <- jedisMap.values){
breakable{
if(clientSet.contains(jedis.getClient)) {break}
try {
// 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
jedis.getClient.getMany(getPipelinedResponseLength)
}catch {
case ex:RuntimeException => println("error: RuntimeException "+ex)
}
}
}
}
close()
}
}
//查看数据类型方法-manOf(data)
def manOf[T:Manifest](t:T):Manifest[T]=manifest[T]
}
** 代码3:在每个主节点上每个槽构建一个transaction,对这个槽上的key进行操作.**
// 从redis上获取全部符合正则匹配的key
val pattern = "User_Label_Act_-u[0123456789]*"
var keys:List[AnyRef] = List()
// 1.获得cluster中所有节点
val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
// 2. 对集群中每个节点都执行keys(pattern)
for(node <- nodes_master){
val time_start_key:Long = System.currentTimeMillis()
println("node: "+node)
val jedisPool:JedisPool = clusterNodes.get(node)
val jedisConn:Jedis = jedisPool.getResource
// 获得集群上每个节点上符合pattern的所有的key
keys = jedisConn.keys(pattern).toArray().toList
if(keys.isEmpty){
...
}else{
// redis cluster对于事务的支持只能在也一个slot上完成
val slot_multi:collection.mutable.HashMap[String,collection.mutable.ListBuffer[String]] = collection.mutable.HashMap[String,collection.mutable.ListBuffer[String]]()
for(key <- keys){
val bKey:Array[Byte] = SafeEncoder.encode(key.toString)
val slot:String = JedisClusterCRC16.getSlot(bKey).toString
if(slot_multi.keys.exists(x=>x==slot)){
slot_multi(slot).append(key.toString)
}else{
slot_multi.put(slot,collection.mutable.ListBuffer(key.toString))
}
}
println("slot_multi: "+slot_multi.size+" "+slot_multi.keys)
for(slot_key <- slot_multi.keys){
println("slot_key keys.size: "+slot_key+" "+slot_multi(slot_key).length)
val tx:Transaction = jedisConn.multi()
for(key <- slot_multi(slot_key)){
tx操作(set,rpush等)
}
tx.exec()
tx.clear()
tx.close()
}
}
jedisConn.close()
}