.Net Core 缓存方式(二)StackExchangeRedisCache实现(3)
Distributed Redis Cache 是什么
Redis是一种开源的内存中数据存储,通常用作分布式缓存
使用方式
- Startup.ConfigureServices
nuget Microsoft.Extensions.Caching.StackExchangeRedis
services.AddStackExchangeRedisCache(options =>
{
options.Configuration = "localhost";
options.InstanceName = "SampleInstance";
});
源码以及实现
依赖注入 StackExchangeRedisCacheServiceCollectionExtensions.cs
public static IServiceCollection AddStackExchangeRedisCache(this IServiceCollection services, Action<RedisCacheOptions> setupAction)
{
if (services == null)
{
throw new ArgumentNullException(nameof(services));
}
if (setupAction == null)
{
throw new ArgumentNullException(nameof(setupAction));
}
services.AddOptions();
services.Configure(setupAction);
services.Add(ServiceDescriptor.Singleton<IDistributedCache, RedisCache>());
return services;
}
实现IDistributedCache RedisCache.cs
RedisCache 为单例模式
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
namespace Microsoft.Extensions.Caching.StackExchangeRedis
{
public class RedisCache : IDistributedCache, IDisposable
{
// KEYS[1] = = key
// ARGV[1] = absolute-expiration - ticks as long (-1 for none)
// ARGV[2] = sliding-expiration - ticks as long (-1 for none)
// ARGV[3] = relative-expiration (long, in seconds, -1 for none) - Min(absolute-expiration - Now, sliding-expiration)
// ARGV[4] = data - byte[]
// this order should not change LUA script depends on it
private const string SetScript = (@"
redis.call('HMSET', KEYS[1], 'absexp', ARGV[1], 'sldexp', ARGV[2], 'data', ARGV[4])
if ARGV[3] ~= '-1' then
redis.call('EXPIRE', KEYS[1], ARGV[3])
end
return 1");
private const string AbsoluteExpirationKey = "absexp";
private const string SlidingExpirationKey = "sldexp";
private const string DataKey = "data";
private const long NotPresent = -1;
private volatile ConnectionMultiplexer _connection;
private IDatabase _cache;
private readonly RedisCacheOptions _options;
private readonly string _instance;
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
public RedisCache(IOptions<RedisCacheOptions> optionsAccessor)
{
if (optionsAccessor == null)
{
throw new ArgumentNullException(nameof(optionsAccessor));
}
_options = optionsAccessor.Value;
// This allows partitioning a single backend cache for use with multiple apps/services.
_instance = _options.InstanceName ?? string.Empty;
}
public byte[] Get(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
return GetAndRefresh(key, getData: true);
}
public async Task<byte[]> GetAsync(string key, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
return await GetAndRefreshAsync(key, getData: true, token: token).ConfigureAwait(false);
}
public void Set(string key, byte[] value, DistributedCacheEntryOptions options)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
Connect();
var creationTime = DateTimeOffset.UtcNow;
var absoluteExpiration = GetAbsoluteExpiration(creationTime, options);
var result = _cache.ScriptEvaluate(SetScript, new RedisKey[] { _instance + key },
new RedisValue[]
{
absoluteExpiration?.Ticks ?? NotPresent,
options.SlidingExpiration?.Ticks ?? NotPresent,
GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent,
value
});
}
public async Task SetAsync(string key, byte[] value, DistributedCacheEntryOptions options, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
if (value == null)
{
throw new ArgumentNullException(nameof(value));
}
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}
token.ThrowIfCancellationRequested();
await ConnectAsync(token).ConfigureAwait(false);
var creationTime = DateTimeOffset.UtcNow;
var absoluteExpiration = GetAbsoluteExpiration(creationTime, options);
await _cache.ScriptEvaluateAsync(SetScript, new RedisKey[] { _instance + key },
new RedisValue[]
{
absoluteExpiration?.Ticks ?? NotPresent,
options.SlidingExpiration?.Ticks ?? NotPresent,
GetExpirationInSeconds(creationTime, absoluteExpiration, options) ?? NotPresent,
value
}).ConfigureAwait(false);
}
public void Refresh(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
GetAndRefresh(key, getData: false);
}
public async Task RefreshAsync(string key, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
await GetAndRefreshAsync(key, getData: false, token: token).ConfigureAwait(false);
}
private void Connect()
{
if (_cache != null)
{
return;
}
_connectionLock.Wait();
try
{
if (_cache == null)
{
if (_options.ConfigurationOptions != null)
{
_connection = ConnectionMultiplexer.Connect(_options.ConfigurationOptions);
}
else
{
_connection = ConnectionMultiplexer.Connect(_options.Configuration);
}
_cache = _connection.GetDatabase();
}
}
finally
{
_connectionLock.Release();
}
}
private async Task ConnectAsync(CancellationToken token = default(CancellationToken))
{
token.ThrowIfCancellationRequested();
if (_cache != null)
{
return;
}
await _connectionLock.WaitAsync(token).ConfigureAwait(false);
try
{
if (_cache == null)
{
if (_options.ConfigurationOptions != null)
{
_connection = await ConnectionMultiplexer.ConnectAsync(_options.ConfigurationOptions).ConfigureAwait(false);
}
else
{
_connection = await ConnectionMultiplexer.ConnectAsync(_options.Configuration).ConfigureAwait(false);
}
_cache = _connection.GetDatabase();
}
}
finally
{
_connectionLock.Release();
}
}
private byte[] GetAndRefresh(string key, bool getData)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
Connect();
// This also resets the LRU status as desired.
// TODO: Can this be done in one operation on the server side? Probably, the trick would just be the DateTimeOffset math.
RedisValue[] results;
if (getData)
{
results = _cache.HashMemberGet(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey, DataKey);
}
else
{
results = _cache.HashMemberGet(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey);
}
// TODO: Error handling
if (results.Length >= 2)
{
MapMetadata(results, out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
Refresh(key, absExpr, sldExpr);
}
if (results.Length >= 3 && results[2].HasValue)
{
return results[2];
}
return null;
}
private async Task<byte[]> GetAndRefreshAsync(string key, bool getData, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
await ConnectAsync(token).ConfigureAwait(false);
// This also resets the LRU status as desired.
// TODO: Can this be done in one operation on the server side? Probably, the trick would just be the DateTimeOffset math.
RedisValue[] results;
if (getData)
{
results = await _cache.HashMemberGetAsync(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey, DataKey).ConfigureAwait(false);
}
else
{
results = await _cache.HashMemberGetAsync(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey).ConfigureAwait(false);
}
// TODO: Error handling
if (results.Length >= 2)
{
MapMetadata(results, out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
await RefreshAsync(key, absExpr, sldExpr, token).ConfigureAwait(false);
}
if (results.Length >= 3 && results[2].HasValue)
{
return results[2];
}
return null;
}
public void Remove(string key)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
Connect();
_cache.KeyDelete(_instance + key);
// TODO: Error handling
}
public async Task RemoveAsync(string key, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
await ConnectAsync(token).ConfigureAwait(false);
await _cache.KeyDeleteAsync(_instance + key).ConfigureAwait(false);
// TODO: Error handling
}
private void MapMetadata(RedisValue[] results, out DateTimeOffset? absoluteExpiration, out TimeSpan? slidingExpiration)
{
absoluteExpiration = null;
slidingExpiration = null;
var absoluteExpirationTicks = (long?)results[0];
if (absoluteExpirationTicks.HasValue && absoluteExpirationTicks.Value != NotPresent)
{
absoluteExpiration = new DateTimeOffset(absoluteExpirationTicks.Value, TimeSpan.Zero);
}
var slidingExpirationTicks = (long?)results[1];
if (slidingExpirationTicks.HasValue && slidingExpirationTicks.Value != NotPresent)
{
slidingExpiration = new TimeSpan(slidingExpirationTicks.Value);
}
}
private void Refresh(string key, DateTimeOffset? absExpr, TimeSpan? sldExpr)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
// Note Refresh has no effect if there is just an absolute expiration (or neither).
TimeSpan? expr = null;
if (sldExpr.HasValue)
{
if (absExpr.HasValue)
{
var relExpr = absExpr.Value - DateTimeOffset.Now;
expr = relExpr <= sldExpr.Value ? relExpr : sldExpr;
}
else
{
expr = sldExpr;
}
_cache.KeyExpire(_instance + key, expr);
// TODO: Error handling
}
}
private async Task RefreshAsync(string key, DateTimeOffset? absExpr, TimeSpan? sldExpr, CancellationToken token = default(CancellationToken))
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
token.ThrowIfCancellationRequested();
// Note Refresh has no effect if there is just an absolute expiration (or neither).
TimeSpan? expr = null;
if (sldExpr.HasValue)
{
if (absExpr.HasValue)
{
var relExpr = absExpr.Value - DateTimeOffset.Now;
expr = relExpr <= sldExpr.Value ? relExpr : sldExpr;
}
else
{
expr = sldExpr;
}
await _cache.KeyExpireAsync(_instance + key, expr).ConfigureAwait(false);
// TODO: Error handling
}
}
private static long? GetExpirationInSeconds(DateTimeOffset creationTime, DateTimeOffset? absoluteExpiration, DistributedCacheEntryOptions options)
{
if (absoluteExpiration.HasValue && options.SlidingExpiration.HasValue)
{
return (long)Math.Min(
(absoluteExpiration.Value - creationTime).TotalSeconds,
options.SlidingExpiration.Value.TotalSeconds);
}
else if (absoluteExpiration.HasValue)
{
return (long)(absoluteExpiration.Value - creationTime).TotalSeconds;
}
else if (options.SlidingExpiration.HasValue)
{
return (long)options.SlidingExpiration.Value.TotalSeconds;
}
return null;
}
private static DateTimeOffset? GetAbsoluteExpiration(DateTimeOffset creationTime, DistributedCacheEntryOptions options)
{
if (options.AbsoluteExpiration.HasValue && options.AbsoluteExpiration <= creationTime)
{
throw new ArgumentOutOfRangeException(
nameof(DistributedCacheEntryOptions.AbsoluteExpiration),
options.AbsoluteExpiration.Value,
"The absolute expiration value must be in the future.");
}
var absoluteExpiration = options.AbsoluteExpiration;
if (options.AbsoluteExpirationRelativeToNow.HasValue)
{
absoluteExpiration = creationTime + options.AbsoluteExpirationRelativeToNow;
}
return absoluteExpiration;
}
public void Dispose()
{
if (_connection != null)
{
_connection.Close();
}
}
}
}
Get =》 GetAndRefresh
private byte[] GetAndRefresh(string key, bool getData)
{
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
// 连接
Connect();
// This also resets the LRU status as desired.
// TODO: Can this be done in one operation on the server side? Probably, the trick would just be the DateTimeOffset math.
RedisValue[] results;
if (getData)
{
results = _cache.HashMemberGet(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey, DataKey);
}
else
{
results = _cache.HashMemberGet(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey);
}
// TODO: Error handling
if (results.Length >= 2)
{
MapMetadata(results, out DateTimeOffset? absExpr, out TimeSpan? sldExpr);
Refresh(key, absExpr, sldExpr);
}
if (results.Length >= 3 && results[2].HasValue)
{
return results[2];
}
return null;
}
Connect()
private void Connect()
{
if (_cache != null)
{
return;
}
_connectionLock.Wait();
try
{
if (_cache == null)
{
if (_options.ConfigurationOptions != null)
{
_connection = ConnectionMultiplexer.Connect(_options.ConfigurationOptions);
}
else
{
_connection = ConnectionMultiplexer.Connect(_options.Configuration);
}
_cache = _connection.GetDatabase();
}
}
finally
{
_connectionLock.Release();
}
}
private IDatabase _cache 来自扩展方法 RedisExtensions.cs 实现
_cache.HashMemberGet(_instance + key, AbsoluteExpirationKey, SlidingExpirationKey)
internal static class RedisExtensions
{
private const string HmGetScript = (@"return redis.call('HMGET', KEYS[1], unpack(ARGV))");
internal static RedisValue[] HashMemberGet(this IDatabase cache, string key, params string[] members)
{
var result = cache.ScriptEvaluate(
HmGetScript,
new RedisKey[] { key },
GetRedisMembers(members));
// TODO: Error checking?
return (RedisValue[])result;
}
internal static async Task<RedisValue[]> HashMemberGetAsync(
this IDatabase cache,
string key,
params string[] members)
{
var result = await cache.ScriptEvaluateAsync(
HmGetScript,
new RedisKey[] { key },
GetRedisMembers(members)).ConfigureAwait(false);
// TODO: Error checking?
return (RedisValue[])result;
}
private static RedisValue[] GetRedisMembers(params string[] members)
{
var redisMembers = new RedisValue[members.Length];
for (int i = 0; i < members.Length; i++)
{
redisMembers[i] = (RedisValue)members[i];
}
return redisMembers;
}
}
Connect() 实现StackExchange.Redis
stackexchange.github.io/StackExchange.Redis/Basics
_connection = ConnectionMultiplexer.Connect(_options.Configuration);
using StackExchange.Redis;
...
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost");
// ^^^ store and re-use this!!!
IDatabase db = redis.GetDatabase();
string value = "abcdefg";
db.StringSet("mykey", value);
...
string value = db.StringGet("mykey");
Console.WriteLine(value); // writes: "abcdefg"
private readonly RedisCacheOptions _options;
private volatile ConnectionMultiplexer _connection;
->
if (_options.ConfigurationOptions != null)
{
_connection = ConnectionMultiplexer.Connect(_options.ConfigurationOptions);
}
else
{
_connection = ConnectionMultiplexer.Connect(_options.Configuration);
}
/// <summary>
/// Configuration options for <see cref="RedisCache"/>.
/// </summary>
public class RedisCacheOptions : IOptions<RedisCacheOptions>
{
/// <summary>
/// The configuration used to connect to Redis.
/// </summary>
public string Configuration { get; set; }
/// <summary>
/// The configuration used to connect to Redis.
/// This is preferred over Configuration.
/// </summary>
public ConfigurationOptions ConfigurationOptions { get; set; }
/// <summary>
/// The Redis instance name.
/// </summary>
public string InstanceName { get; set; }
RedisCacheOptions IOptions<RedisCacheOptions>.Value
{
get { return this; }
}
}
C# volatile 关键字
volatile 关键字指示一个字段可以由多个同时执行的线程修改。 出于性能原因,编译器,运行时系统甚至硬件都可能重新排列对存储器位置的读取和写入。 声明了 volatile 的字段不进行这些优化。 添加 volatile 修饰符可确保所有线程观察易失性写入操作(由任何其他线程执行)时的观察顺序与写入操作的执行顺序一致。 不确保从所有执行线程整体来看时所有易失性写入操作均按执行顺序排序。
https://docs.microsoft.com/zh-cn/dotnet/csharp/language-reference/keywords/volatile
SemaphoreSlim 信号量
private readonly SemaphoreSlim _connectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
使用 信号量 实现多线程不会产生多个 IDatabase 对象
_connectionLock.Wait();
try
{
......
}
finally
{
_connectionLock.Release();
}
StackExchange.Redis Configuration
StackExchange.Redis有两种配置模式:
- var conn = ConnectionMultiplexer.Connect("localhost");
- ConfigurationOptions options = ConfigurationOptions.Parse(configString);