使用背景:
在分布式场景下,对一批数据进行处理,会先将数据存入redis缓存中,各服务器从redis中读取数据进行处理。为了避免一条数据被多个服务器同时读取,采用spop的方式获取数据。由于数据处理过程中可能因受网络或数据格式不规范等因素影响导致中断而数据丢失的问题。
使用思路:
在使用spop方式获取数据时,可通过用redis的evalsha功能实现数据的原子性操作来对数据进行获取且备份。在完成数据处理后,可使用srem删除指定数据。直到原始数据库的数据消耗完,检测备份数据库是否还有数据,如果有,则将备份数据库中的数据放回原始数据库中(直接调用rename修改数据库名)。
具体操作:
1、构建一个redis工具类
import redis.clients.jedis.Jedis; import java.util.Arrays; public class RedisUtils { static String scriptLoadId; static Jedis queueJedis; private int databaseNum; public RedisUtils(String host, String password, int databaseNum) { this.databaseNum = databaseNum; queueJedis = new Jedis("127.0.0.1", 6379); queueJedis.auth("password!"); queueJedis.select(databaseNum); } public RedisUtils spopAndSadd() { scriptLoadId = queueJedis.scriptLoad( // 把发生数据变更的命令以事务的方式做持久化和主从复制,从而允许在Lua脚本内进行随机写入 "redis.replicate_commands() " + "redis.call('select', " + databaseNum + ") " + "local ret = redis.call('spop',KEYS[1]) " + "if type(ret) == 'boolean' then " + " return ret " + "else " + " redis.call('sadd',KEYS[2],ret) " + " return ret " + "end"); return this; } public Object getDataFromRedis(String dataSource, String dataBackup) { /** * 使用redis的Evalsha 命令根据给定的 sha1 校验码,执行缓存在服务器中的脚本。 * * 执行该命令需要以下四个参数 * sha1 : 通过 SCRIPT LOAD 生成的 sha1 校验码。 * numkeys: 用于指定键名参数的个数。 * key [key ...]: 键名参数。脚本中使用方法(KEYS[1],KEYS[2]...) * arg [arg ...]: 附加参数。脚本中使用方法(ARGV[1],ARGV[2]...) * * jedis 中原始方法如下 * public Object evalsha(String sha1, int keyCount, String... params) { * 1、判断是否开启了事务或管道 * 2、将参数转化为字节数组,调用connect()构建套接字连接端口,使用RedisOutputStream写入命令给redis * 3、返回执行结果 使用RedisInputStream获取执行结果 * } * * jedis做了封装, keyCount是计算keys 数组的大小, params是由keys、args合并成新的数组 * public Object evalsha(String sha1, List<String> keys, List<String> args) {} */ return queueJedis.evalsha(scriptLoadId, Arrays.asList(dataSource, dataBackup), Arrays.asList("")); } }
2、使用案例
public class Test { public static void main(String[] args) { RedisUtils redisUtils = new RedisUtils("127.0.0.1", "password", 1); Object dataFromRedis = redisUtils.spopAndSadd().getDataFromRedis("dataSource", "dataBackup"); if (dataFromRedis instanceof String) { String text = (String) dataFromRedis; System.out.println("完成" + text + "数据消费"); } else { // 数据使用失败,将数据放回原来的redis中,等待下次使用 redisUtils.spopAndSadd().getDataFromRedis("dataBackup", "dataSource"); } } }