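An In-Depth Look at Multi-Level Cache Architecture Design and Implementation in C#
Introduction
In modern distributed application architectures, caching has become a key component for improving system performance and user experience. As business scale grows and concurrency rises, a single cache tier often cannot meet complex performance requirements. A multi-level cache architecture builds caches at several layers, which can significantly improve data access efficiency, reduce database load, and improve system scalability.
This article examines the design and implementation of multi-level caching in C#. It focuses on how an in-process memory cache (Memory Cache) and a Redis distributed cache work together, and on how Redis publish/subscribe (Pub/Sub) can synchronize cache state across nodes.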
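1. Theoretical Foundations of Multi-Level Caching
1.1 Cache Hierarchy Theory
The essence of caching is to exploit temporal locality and spatial locality by keeping frequently accessed data on a faster storage medium. Computer systems follow this layered storage design throughout: from CPU caches to main memory, and from main memory to disk.
1.1.1 Cache Access Patterns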
CPU Cache (L1/L2/L3)  →  Memory           →  Disk Storage
Fast access              Medium speed        Slow access
Small capacity           Medium capacity     Large capacity
Expensive                Moderate cost       Cheap
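At the application level, multi-level caching follows a similar principle:
- L1 cache (in-process memory cache): fastest access, relatively small capacity, valid only within the current process
- L2 cache (distributed cache): medium access speed, larger capacity, shared across multiple nodes
- L3 cache (database query cache): slowest access, but backed by persistent storage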
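The lookup order implied by these tiers can be sketched as follows. This is a minimal illustration under stated assumptions, not the implementation developed later in the article: two plain dictionaries stand in for the L1 memory cache and the L2 distributed cache, and `LoadFromDatabase` is a hypothetical placeholder for the real data source.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins (assumptions for illustration): plain dictionaries play the role of
// the in-process L1 cache and the shared L2 cache.
var l1 = new Dictionary<string, string>();
var l2 = new Dictionary<string, string>();
var databaseReads = 0;

// Hypothetical data source; counts how often the slowest tier is reached.
string LoadFromDatabase(string key)
{
    databaseReads++;
    return $"value-of-{key}";
}

// Check L1, then L2, then the database, back-filling the faster tiers on the way out.
string Get(string key)
{
    if (l1.TryGetValue(key, out var value)) return value;

    if (l2.TryGetValue(key, out value))
    {
        l1[key] = value;          // promote the entry to L1
        return value;
    }

    value = LoadFromDatabase(key);
    l2[key] = value;              // populate L2 first, then L1
    l1[key] = value;
    return value;
}

Console.WriteLine(Get("user:1")); // first call reaches the database
Console.WriteLine(Get("user:1")); // second call is served from L1
Console.WriteLine($"database reads: {databaseReads}");
```

Back-filling on the way out is what makes the faster tiers useful: after the first read, later reads of the same key stop at L1.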
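1.2 Cache Consistency Theory
1.2.1 Applying the CAP Theorem to Cache Systems
According to the CAP theorem (Consistency, Availability, Partition tolerance), a distributed cache system cannot guarantee all three of the following at the same time:
- Consistency: all nodes see the same data at the same time
- Availability: the system stays available even when some nodes fail
- Partition tolerance: the system keeps working despite network partitions
In practice, we usually adopt an eventual consistency model and balance performance against consistency through appropriate synchronization strategies and expiration mechanisms.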
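1.2.2 Cache Penetration, Breakdown, and Avalanche
Cache penetration:
- Symptom: queries for data that does not exist bypass the cache and hit the database directly
- Solutions: Bloom filters, caching null (not-found) results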
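Caching null results can be sketched as below. This is an illustrative sketch only: the dictionaries stand in for the cache and the database, and the two TTL values are assumptions chosen for the example, not recommendations from the article.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Assumed backing store for illustration: only "user:1" exists.
var database = new Dictionary<string, string> { ["user:1"] = "Alice" };
var databaseQueries = 0;

// Cache entries hold a nullable value: null records "known to be absent".
var cache = new Dictionary<string, (string? Value, DateTime ExpiresAt)>();
var negativeTtl = TimeSpan.FromSeconds(30);   // short TTL for "not found" markers
var positiveTtl = TimeSpan.FromMinutes(10);

string? GetUser(string key)
{
    if (cache.TryGetValue(key, out var entry) && entry.ExpiresAt > DateTime.UtcNow)
        return entry.Value;                   // may be a cached "not found" (null)

    databaseQueries++;
    database.TryGetValue(key, out var value);
    var ttl = value is null ? negativeTtl : positiveTtl;
    cache[key] = (value, DateTime.UtcNow + ttl);
    return value;
}

GetUser("user:404");
GetUser("user:404");   // served from the cached null marker
Console.WriteLine($"database queries: {databaseQueries}");
```

The "not found" marker gets a shorter TTL so that a key created later becomes visible quickly, while repeated lookups for a missing key stop reaching the database.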
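Cache breakdown:
- Symptom: when a hot key expires, a large number of concurrent requests hit the database at the same time
- Solutions: distributed locks, never expiring hot data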
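The idea behind the locking approach, letting only one caller reload an expired hot key, can be illustrated inside a single process. Note that this sketch is an in-process analogue ("single flight"), not a distributed lock: it uses a `ConcurrentDictionary` of `Lazy<Task<T>>`, and `LoadHotItemAsync` is a hypothetical loader.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var inFlight = new ConcurrentDictionary<string, Lazy<Task<string>>>();
var loads = 0;

// Hypothetical slow loader standing in for the database query.
async Task<string> LoadHotItemAsync(string key)
{
    Interlocked.Increment(ref loads);
    await Task.Delay(50);
    return $"value-of-{key}";
}

// Concurrent callers for the same key share one Lazy<Task>, so the loader runs once.
async Task<string> GetOnceAsync(string key)
{
    var lazy = inFlight.GetOrAdd(key, k => new Lazy<Task<string>>(() => LoadHotItemAsync(k)));
    try
    {
        return await lazy.Value;
    }
    finally
    {
        // Remove only this exact entry so a later miss can trigger a fresh load.
        inFlight.TryRemove(new KeyValuePair<string, Lazy<Task<string>>>(key, lazy));
    }
}

var results = await Task.WhenAll(Enumerable.Range(0, 20).Select(_ => GetOnceAsync("hot:item")));
Console.WriteLine($"callers: {results.Length}, loads: {loads}");
```

Across several application nodes the same effect needs a shared lock (for example one held in Redis), since each process has its own in-flight dictionary.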
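Cache avalanche:
- Symptom: many cache entries expire at the same time, causing a sudden spike in database load
- Solutions: randomized expiration times, multi-level caching, circuit breaking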
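Randomized expiration can be as simple as adding jitter to a base TTL. The sketch below assumes a jitter of plus or minus 10 percent; the helper name `WithJitter` and the percentage are illustrative choices, not values taken from the article.

```csharp
using System;
using System.Linq;

// Spread expirations by scaling a base TTL by a random factor around 1.0.
TimeSpan WithJitter(TimeSpan duration, double jitterFraction, Random random)
{
    // NextDouble() is in [0, 1); map it to [-jitterFraction, +jitterFraction).
    var factor = 1.0 + (random.NextDouble() * 2.0 - 1.0) * jitterFraction;
    return TimeSpan.FromSeconds(duration.TotalSeconds * factor);
}

var rng = new Random(42);                      // fixed seed only to make the demo repeatable
var baseTtl = TimeSpan.FromMinutes(30);
var ttls = Enumerable.Range(0, 5).Select(_ => WithJitter(baseTtl, 0.10, rng)).ToList();

foreach (var ttl in ttls)
    Console.WriteLine(ttl);
```

Entries written in the same batch then expire over a window of several minutes instead of all at once.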
2. Architecture Design and Technology Selection
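2.0 System Architecture Flowcharts
2.0.1 Overall multi-level cache architecture (diagram not reproduced)
2.0.2 Cache operation flow (diagram not reproduced)
2.0.3 Redis publish/subscribe synchronization mechanism (diagram not reproduced)
2.0.4 Cache degradation strategy flow (diagram not reproduced)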
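2.1 Overall Architecture
The multi-level cache architecture uses a layered design in which each layer has clear responsibilities and boundaries: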
┌─────────────────────────────────────────────────────┐
│                  Application layer                  │
├─────────────────────────────────────────────────────┤
│              Multi-level cache manager              │
├─────────────────┬───────────────────────────────────┤
│ L1 memory cache │          L2 Redis cache           │
│  (MemoryCache)  │       (StackExchange.Redis)       │
├─────────────────┴───────────────────────────────────┤
│            Redis Pub/Sub synchronization            │
├─────────────────────────────────────────────────────┤
│               Data persistence layer                │
└─────────────────────────────────────────────────────┘
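2.2 Technology Selection
2.2.1 In-Memory Cache Options
Microsoft.Extensions.Caching.Memory:
- Advantages: officially supported by .NET, integrates with the DI container, supports expiration policies and size-limit-based compaction (SizeLimit, CompactionPercentage)
- Use cases: monolithic applications, per-instance caching in microservices
- Characteristics: thread-safe, generic extension methods such as Get<T> and Set<T>; entries are stored as in-process object references, so there is no built-in compression or serialization
System.Runtime.Caching.MemoryCache:
- Advantages: the traditional .NET Framework solution, mature and full-featured
- Disadvantages: on .NET Core and later it is available only through the System.Runtime.Caching NuGet package, mainly for compatibility; the API is comparatively dated
2.2.2 Distributed Cache Options
StackExchange.Redis:
- Advantages: high performance, comprehensive feature set, cluster support, active community
- Characteristics: asynchronous operations, connection multiplexing, failover, Lua scripting
- Version: 2.6 or later is recommended, which supports newer .NET 6+ features
ServiceStack.Redis:
- Advantages: easy to use, well documented
- Disadvantages: commercial licensing restrictions, comparatively lower performance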
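2.3 Choosing a Caching Pattern
2.3.1 Cache-Aside Pattern
This is the most commonly used caching pattern. The application itself is responsible for reading from and updating the cache:
Read flow:
1. The application tries to read the data from the cache
2. On a cache hit, the data is returned directly
3. On a cache miss, the data is read from the database
4. The data is written to the cache and then returned to the application
Update flow:
1. Update the database
2. Delete or update the corresponding entry in the cache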
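The read and update flows above can be sketched as follows. The dictionaries are stand-ins for the cache and the database, and `ReadProduct` and `UpdateProduct` are hypothetical names used only for this illustration; the update path here takes the "delete" option from step 2.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Stand-ins for illustration: one dictionary as the database, one as the cache.
var database = new Dictionary<int, string> { [1] = "Widget" };
var cache = new Dictionary<int, string>();

// Read flow: cache first, then database, then populate the cache.
string? ReadProduct(int id)
{
    if (cache.TryGetValue(id, out var cached)) return cached;     // steps 1-2
    if (!database.TryGetValue(id, out var fromDb)) return null;   // step 3
    cache[id] = fromDb;                                           // step 4
    return fromDb;
}

// Update flow: write the database first, then invalidate the cached copy.
void UpdateProduct(int id, string name)
{
    database[id] = name;
    cache.Remove(id);
}

Console.WriteLine(ReadProduct(1));
UpdateProduct(1, "Widget v2");
Console.WriteLine(ReadProduct(1));
```

Deleting rather than overwriting the cached entry keeps the update path simple: the next read repopulates the cache from the database.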
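2.3.2 Write-Through Pattern
Write flow:
1. The application writes to the cache
2. The cache service synchronously writes to the database
3. Success is returned once the write is confirmed
2.3.3 Write-Behind Pattern
Write flow:
1. The application writes to the cache
2. Success is returned immediately
3. The cache service writes to the database asynchronously, in batches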
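The difference between the two write patterns can be sketched side by side. This is an illustration under simplifying assumptions: dictionaries stand in for the cache and the database, and an in-memory queue plus an explicit `FlushPendingWrites` call stand in for the asynchronous batch writer.

```csharp
using System;
using System.Collections.Generic;

var cache = new Dictionary<string, string>();
var database = new Dictionary<string, string>();
var pendingWrites = new Queue<(string Key, string Value)>();

// Write-through: the cache and the database are updated in the same call.
void WriteThrough(string key, string value)
{
    cache[key] = value;
    database[key] = value;      // synchronous write before returning
}

// Write-behind: update the cache, queue the database write, return immediately.
void WriteBehind(string key, string value)
{
    cache[key] = value;
    pendingWrites.Enqueue((key, value));
}

// In a real system a background worker would call this periodically.
int FlushPendingWrites()
{
    var flushed = 0;
    while (pendingWrites.Count > 0)
    {
        var (key, value) = pendingWrites.Dequeue();
        database[key] = value;
        flushed++;
    }
    return flushed;
}

WriteThrough("a", "1");
WriteBehind("b", "2");
Console.WriteLine(database.ContainsKey("b"));  // checked before the flush
FlushPendingWrites();
Console.WriteLine(database.ContainsKey("b"));  // checked after the flush
```

Write-behind trades durability for latency: anything still in the queue is lost if the process stops before the flush.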
3. In-Memory Cache Layer Implementation
3.1 The IMemoryCache Core Interface
The Microsoft.Extensions.Caching.Memory.IMemoryCache interface provides the core cache operations:
public interface IMemoryCache : IDisposable
{
bool TryGetValue(object key, out object value);
ICacheEntry CreateEntry(object key);
void Remove(object key);
}
3.2 An Advanced In-Memory Cache Wrapper
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.RegularExpressions;
using System.Runtime.Serialization;
/// <summary>
/// Base class for cache exceptions.
/// Not abstract, with public constructors, because the code below also throws it directly.
/// </summary>
public class CacheException : Exception
{
public CacheException(string message) : base(message) { }
public CacheException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Cache connection exception
/// </summary>
public class CacheConnectionException : CacheException
{
public CacheConnectionException(string message) : base(message) { }
public CacheConnectionException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Cache serialization exception
/// </summary>
public class CacheSerializationException : CacheException
{
public CacheSerializationException(string message) : base(message) { }
public CacheSerializationException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Cache timeout exception
/// </summary>
public class CacheTimeoutException : CacheException
{
public CacheTimeoutException(string message) : base(message) { }
public CacheTimeoutException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Cache validation exception
/// </summary>
public class CacheValidationException : CacheException
{
public CacheValidationException(string message) : base(message) { }
public CacheValidationException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Thread-safe cache statistics tracker
/// </summary>
public class CacheStatisticsTracker
{
private long _totalOperations = 0;
private long _l1Hits = 0;
private long _l2Hits = 0;
private long _totalMisses = 0;
private readonly object _lock = new object();
public void RecordOperation()
{
Interlocked.Increment(ref _totalOperations);
}
public void RecordHit(CacheLevel level)
{
switch (level)
{
case CacheLevel.L1:
Interlocked.Increment(ref _l1Hits);
break;
case CacheLevel.L2:
Interlocked.Increment(ref _l2Hits);
break;
}
}
public void RecordMiss()
{
Interlocked.Increment(ref _totalMisses);
}
public CacheStatisticsSnapshot GetSnapshot()
{
return new CacheStatisticsSnapshot
{
TotalOperations = Interlocked.Read(ref _totalOperations),
L1Hits = Interlocked.Read(ref _l1Hits),
L2Hits = Interlocked.Read(ref _l2Hits),
TotalMisses = Interlocked.Read(ref _totalMisses)
};
}
public void Reset()
{
lock (_lock)
{
Interlocked.Exchange(ref _totalOperations, 0);
Interlocked.Exchange(ref _l1Hits, 0);
Interlocked.Exchange(ref _l2Hits, 0);
Interlocked.Exchange(ref _totalMisses, 0);
}
}
}
/// <summary>
/// Cache statistics snapshot
/// </summary>
public class CacheStatisticsSnapshot
{
public long TotalOperations { get; init; }
public long L1Hits { get; init; }
public long L2Hits { get; init; }
public long TotalMisses { get; init; }
public long TotalHits => L1Hits + L2Hits;
public double OverallHitRatio => TotalOperations == 0 ? 0 : (double)TotalHits / TotalOperations;
public double L1HitRatio => TotalOperations == 0 ? 0 : (double)L1Hits / TotalOperations;
public double L2HitRatio => TotalOperations == 0 ? 0 : (double)L2Hits / TotalOperations;
}
/// <summary>
/// Cache data validator interface
/// </summary>
public interface ICacheDataValidator
{
bool IsValid<T>(T value);
void ValidateKey(string key);
bool IsSafeForSerialization<T>(T value);
}
/// <summary>
/// Default cache data validator
/// </summary>
public class DefaultCacheDataValidator : ICacheDataValidator
{
private readonly ILogger<DefaultCacheDataValidator> _logger;
private readonly HashSet<Type> _forbiddenTypes;
private readonly Regex _keyValidationRegex;
public DefaultCacheDataValidator(ILogger<DefaultCacheDataValidator> logger)
{
_logger = logger;
_forbiddenTypes = new HashSet<Type>
{
typeof(System.IO.FileStream),
typeof(System.Net.Sockets.Socket),
typeof(System.Threading.Thread),
typeof(System.Threading.Tasks.Task)
};
// Restrict the key format: only letters, digits, underscore, colon, dot, and hyphen
_keyValidationRegex = new Regex(@"^[a-zA-Z0-9_:.-]+$", RegexOptions.Compiled);
}
public bool IsValid<T>(T value)
{
if (value == null) return true;
var valueType = value.GetType();
// Check forbidden types, including derived types such as Task<T>
if (_forbiddenTypes.Any(t => t.IsAssignableFrom(valueType)))
{
_logger.LogWarning("Forbidden type in cache: {Type}", valueType.Name);
return false;
}
// Check for circular references (simplified)
if (HasCircularReference(value))
{
_logger.LogWarning("Circular reference detected in cache value");
return false;
}
return true;
}
public void ValidateKey(string key)
{
if (string.IsNullOrWhiteSpace(key))
throw new CacheValidationException("Cache key cannot be null or empty");
if (key.Length > 250)
throw new CacheValidationException($"Cache key too long: {key.Length} characters");
if (!_keyValidationRegex.IsMatch(key))
throw new CacheValidationException($"Invalid characters in cache key: {key}");
}
public bool IsSafeForSerialization<T>(T value)
{
if (value == null) return true;
var valueType = value.GetType();
// Check for serialization attributes ([Serializable] or [DataContract])
if (valueType.IsSerializable ||
valueType.GetCustomAttributes(typeof(DataContractAttribute), false).Length > 0)
{
return true;
}
// Primitive types, strings, and DateTime are generally safe
return valueType.IsPrimitive || valueType == typeof(string) || valueType == typeof(DateTime);
}
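// Placeholder: this simplified check does not traverse the object's members,
// so when called with a fresh visited set it always returns false. A full
// implementation would walk fields and properties recursively.
private bool HasCircularReference(object obj, HashSet<object> visited = null)
{
if (obj == null) return false;
// Compare by reference so that Equals overrides cannot cause false positives
visited ??= new HashSet<object>(ReferenceEqualityComparer.Instance);
if (visited.Contains(obj))
return true;
visited.Add(obj);
// No members are inspected here, so no cycle can be detected yet
var type = obj.GetType();
if (type.IsPrimitive || type == typeof(string))
return false;
visited.Remove(obj);
return false;
}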
}
/// <summary>
/// Decorator that adds key and value validation to a cache manager
/// </summary>
public class SecureCacheManagerDecorator : IAdvancedMemoryCache
{
private readonly IAdvancedMemoryCache _innerCache;
private readonly ICacheDataValidator _validator;
private readonly ILogger<SecureCacheManagerDecorator> _logger;
public SecureCacheManagerDecorator(
IAdvancedMemoryCache innerCache,
ICacheDataValidator validator,
ILogger<SecureCacheManagerDecorator> logger)
{
_innerCache = innerCache ?? throw new ArgumentNullException(nameof(innerCache));
_validator = validator ?? throw new ArgumentNullException(nameof(validator));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
_validator.ValidateKey(key);
return await _innerCache.GetOrSetAsync(key, async () =>
{
var value = await factory();
if (!_validator.IsValid(value))
{
throw new CacheValidationException($"Invalid cache value for key: {key}");
}
return value;
}, expiry);
}
public async Task<T> GetAsync<T>(string key)
{
_validator.ValidateKey(key);
return await _innerCache.GetAsync<T>(key);
}
public async Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
_validator.ValidateKey(key);
if (!_validator.IsValid(value))
{
throw new CacheValidationException($"Invalid cache value for key: {key}");
}
if (!_validator.IsSafeForSerialization(value))
{
_logger.LogWarning("Potentially unsafe serialization for key: {Key}, type: {Type}",
key, value?.GetType().Name);
}
await _innerCache.SetAsync(key, value, expiry);
}
public async Task RemoveAsync(string key)
{
_validator.ValidateKey(key);
await _innerCache.RemoveAsync(key);
}
public async Task RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrWhiteSpace(pattern))
throw new CacheValidationException("Pattern cannot be null or empty");
await _innerCache.RemoveByPatternAsync(pattern);
}
public CacheStatistics GetStatistics() => _innerCache.GetStatistics();
public void ClearStatistics() => _innerCache.ClearStatistics();
}
/// <summary>
/// Serializer interface
/// </summary>
public interface ICacheSerializer
{
byte[] Serialize<T>(T value);
T Deserialize<T>(byte[] data);
string SerializerName { get; }
bool SupportsType(Type type);
}
/// <summary>
/// JSON serializer (default)
/// </summary>
public class JsonCacheSerializer : ICacheSerializer
{
private readonly JsonSerializerOptions _options;
public string SerializerName => "JSON";
public JsonCacheSerializer()
{
_options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNameCaseInsensitive = true
};
}
public byte[] Serialize<T>(T value)
{
if (value == null) return null;
if (typeof(T) == typeof(string)) return System.Text.Encoding.UTF8.GetBytes(value.ToString());
var json = JsonSerializer.Serialize(value, _options);
return System.Text.Encoding.UTF8.GetBytes(json);
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length == 0) return default(T);
if (typeof(T) == typeof(string)) return (T)(object)System.Text.Encoding.UTF8.GetString(data);
var json = System.Text.Encoding.UTF8.GetString(data);
return JsonSerializer.Deserialize<T>(json, _options);
}
public bool SupportsType(Type type)
{
return true; // JSON is the catch-all fallback; some types (e.g. cyclic graphs) may still fail at serialization time
}
}
/// <summary>
/// Binary serializer (for simple types)
/// </summary>
public class BinaryCacheSerializer : ICacheSerializer
{
public string SerializerName => "Binary";
// Only types that Serialize/Deserialize below actually handle are listed,
// so SupportsType never advertises a type that would throw NotSupportedException
private static readonly HashSet<Type> SupportedTypes = new()
{
typeof(int), typeof(long), typeof(double), typeof(float),
typeof(bool), typeof(DateTime), typeof(Guid)
};
public byte[] Serialize<T>(T value)
{
if (value == null) return null;
var type = typeof(T);
// Handle common types directly for better performance (BitConverter uses the machine's byte order)
return type switch
{
_ when type == typeof(int) => BitConverter.GetBytes((int)(object)value),
_ when type == typeof(long) => BitConverter.GetBytes((long)(object)value),
_ when type == typeof(double) => BitConverter.GetBytes((double)(object)value),
_ when type == typeof(float) => BitConverter.GetBytes((float)(object)value),
_ when type == typeof(bool) => BitConverter.GetBytes((bool)(object)value),
_ when type == typeof(DateTime) => BitConverter.GetBytes(((DateTime)(object)value).ToBinary()),
_ when type == typeof(Guid) => ((Guid)(object)value).ToByteArray(),
_ when type == typeof(string) => System.Text.Encoding.UTF8.GetBytes(value.ToString()),
_ => throw new NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
};
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length == 0) return default(T);
var type = typeof(T);
object result = type switch
{
_ when type == typeof(int) => BitConverter.ToInt32(data, 0),
_ when type == typeof(long) => BitConverter.ToInt64(data, 0),
_ when type == typeof(double) => BitConverter.ToDouble(data, 0),
_ when type == typeof(float) => BitConverter.ToSingle(data, 0),
_ when type == typeof(bool) => BitConverter.ToBoolean(data, 0),
_ when type == typeof(DateTime) => DateTime.FromBinary(BitConverter.ToInt64(data, 0)),
_ when type == typeof(Guid) => new Guid(data),
_ when type == typeof(string) => System.Text.Encoding.UTF8.GetString(data),
_ => throw new NotSupportedException($"Type {type.Name} is not supported by BinaryCacheSerializer")
};
return (T)result;
}
public bool SupportsType(Type type)
{
return SupportedTypes.Contains(type) || type == typeof(string);
}
}
/// <summary>
/// Smart serializer that picks a suitable serializer per type
/// </summary>
public class SmartCacheSerializer : ICacheSerializer
{
private readonly ICacheSerializer[] _serializers;
private readonly ILogger<SmartCacheSerializer> _logger;
public string SerializerName => "Smart";
public SmartCacheSerializer(ILogger<SmartCacheSerializer> logger)
{
_logger = logger;
_serializers = new ICacheSerializer[]
{
new BinaryCacheSerializer(), // Prefer binary serialization
new JsonCacheSerializer() // Fall back to JSON serialization
};
}
public byte[] Serialize<T>(T value)
{
if (value == null) return null;
var type = typeof(T);
foreach (var serializer in _serializers)
{
if (serializer.SupportsType(type))
{
try
{
var data = serializer.Serialize(value);
// Prefix the payload with an 8-byte serializer identifier
var header = System.Text.Encoding.UTF8.GetBytes(serializer.SerializerName.PadRight(8));
var result = new byte[header.Length + data.Length];
Array.Copy(header, 0, result, 0, header.Length);
Array.Copy(data, 0, result, header.Length, data.Length);
return result;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Serializer {SerializerName} failed for type {TypeName}",
serializer.SerializerName, type.Name);
continue;
}
}
}
throw new CacheSerializationException($"No suitable serializer found for type: {type.Name}");
}
public T Deserialize<T>(byte[] data)
{
if (data == null || data.Length < 8) return default(T);
// Read the serializer identifier
var headerBytes = new byte[8];
Array.Copy(data, 0, headerBytes, 0, 8);
var serializerName = System.Text.Encoding.UTF8.GetString(headerBytes).Trim();
// Extract the actual payload
var actualData = new byte[data.Length - 8];
Array.Copy(data, 8, actualData, 0, actualData.Length);
// Find the matching serializer
var serializer = _serializers.FirstOrDefault(s => s.SerializerName == serializerName);
if (serializer == null)
{
_logger.LogWarning("Unknown serializer: {SerializerName}, falling back to JSON", serializerName);
serializer = new JsonCacheSerializer();
}
try
{
return serializer.Deserialize<T>(actualData);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to deserialize with {SerializerName}", serializerName);
throw new CacheSerializationException($"Deserialization failed with {serializerName}", ex);
}
}
public bool SupportsType(Type type)
{
return _serializers.Any(s => s.SupportsType(type));
}
}
/// <summary>
/// Circuit breaker state
/// </summary>
public enum CircuitBreakerState
{
Closed, // Normal operation
Open, // Breaker is open; requests are rejected
HalfOpen // Half-open; requests are let through to probe for recovery
}
/// <summary>
/// Cache circuit breaker options
/// </summary>
public class CacheCircuitBreakerOptions
{
public int FailureThreshold { get; set; } = 5; // Failure count that opens the breaker
public TimeSpan OpenTimeout { get; set; } = TimeSpan.FromMinutes(1); // How long the breaker stays open
public int SuccessThreshold { get; set; } = 2; // Successes required in half-open state to close
public TimeSpan SamplingDuration { get; set; } = TimeSpan.FromMinutes(2); // Sampling time window
}
/// <summary>
/// Cache circuit breaker
/// </summary>
public class CacheCircuitBreaker
{
private readonly CacheCircuitBreakerOptions _options;
private readonly ILogger<CacheCircuitBreaker> _logger;
private readonly object _lock = new object();
private CircuitBreakerState _state = CircuitBreakerState.Closed;
private int _failureCount = 0;
private int _successCount = 0;
private DateTime _lastFailureTime = DateTime.MinValue;
private DateTime _lastStateChangeTime = DateTime.UtcNow;
public CacheCircuitBreaker(
CacheCircuitBreakerOptions options,
ILogger<CacheCircuitBreaker> logger)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public CircuitBreakerState State => _state;
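/// <summary>
/// Executes an operation under circuit breaker protection
/// </summary>
public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation, string operationName = null)
{
if (!CanExecute())
{
throw new CacheException($"Circuit breaker is OPEN for operation: {operationName}");
}
try
{
var result = await operation();
OnSuccess();
return result;
}
catch (Exception ex)
{
OnFailure(ex, operationName);
throw;
}
}
/// <summary>
/// Overload for operations that return no value (for example SetAsync),
/// which cannot bind to the generic Func<Task<T>> overload
/// </summary>
public Task ExecuteAsync(Func<Task> operation, string operationName = null)
{
return ExecuteAsync<bool>(async () =>
{
await operation();
return true;
}, operationName);
}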
/// <summary>
/// Checks whether an operation may be executed
/// </summary>
private bool CanExecute()
{
lock (_lock)
{
switch (_state)
{
case CircuitBreakerState.Closed:
return true;
case CircuitBreakerState.Open:
// Check whether the breaker can move to the half-open state
if (DateTime.UtcNow - _lastStateChangeTime >= _options.OpenTimeout)
{
_state = CircuitBreakerState.HalfOpen;
_successCount = 0;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogInformation("Circuit breaker entering HALF_OPEN state");
return true;
}
return false;
case CircuitBreakerState.HalfOpen:
return true;
default:
return false;
}
}
}
/// <summary>
/// Called when an operation succeeds
/// </summary>
private void OnSuccess()
{
lock (_lock)
{
if (_state == CircuitBreakerState.HalfOpen)
{
_successCount++;
if (_successCount >= _options.SuccessThreshold)
{
_state = CircuitBreakerState.Closed;
_failureCount = 0;
_successCount = 0;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogInformation("Circuit breaker entering CLOSED state");
}
}
else if (_state == CircuitBreakerState.Closed)
{
// Reset the failure count once the last failure is older than the sampling window
if (DateTime.UtcNow - _lastFailureTime > _options.SamplingDuration)
{
_failureCount = 0;
}
}
}
}
/// <summary>
/// Called when an operation fails
/// </summary>
private void OnFailure(Exception ex, string operationName)
{
lock (_lock)
{
_failureCount++;
_lastFailureTime = DateTime.UtcNow;
_logger.LogWarning(ex, "Circuit breaker recorded failure #{FailureCount} for operation: {Operation}",
_failureCount, operationName);
if (_state == CircuitBreakerState.Closed && _failureCount >= _options.FailureThreshold)
{
_state = CircuitBreakerState.Open;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogError("Circuit breaker entering OPEN state after {FailureCount} failures", _failureCount);
}
else if (_state == CircuitBreakerState.HalfOpen)
{
_state = CircuitBreakerState.Open;
_lastStateChangeTime = DateTime.UtcNow;
_logger.LogWarning("Circuit breaker returning to OPEN state from HALF_OPEN due to failure");
}
}
}
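/// <summary>
/// Gets a snapshot of the current state
/// </summary>
public object GetState()
{
lock (_lock)
{
// Computed inline rather than via CanExecute(), which would move the
// breaker from Open to HalfOpen as a side effect of a read
var canExecute = _state != CircuitBreakerState.Open
|| DateTime.UtcNow - _lastStateChangeTime >= _options.OpenTimeout;
return new
{
State = _state.ToString(),
FailureCount = _failureCount,
SuccessCount = _successCount,
LastFailureTime = _lastFailureTime,
LastStateChangeTime = _lastStateChangeTime,
CanExecute = canExecute
};
}
}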
}
/// <summary>
/// Redis cache decorator with circuit breaker protection
/// </summary>
public class CircuitBreakerRedisCache : IRedisDistributedCache
{
private readonly IRedisDistributedCache _innerCache;
private readonly CacheCircuitBreaker _circuitBreaker;
private readonly ILogger<CircuitBreakerRedisCache> _logger;
public CircuitBreakerRedisCache(
IRedisDistributedCache innerCache,
CacheCircuitBreaker circuitBreaker,
ILogger<CircuitBreakerRedisCache> logger)
{
_innerCache = innerCache ?? throw new ArgumentNullException(nameof(innerCache));
_circuitBreaker = circuitBreaker ?? throw new ArgumentNullException(nameof(circuitBreaker));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<T> GetAsync<T>(string key)
{
try
{
return await _circuitBreaker.ExecuteAsync(() => _innerCache.GetAsync<T>(key), $"GET:{key}");
}
catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
{
_logger.LogWarning("Circuit breaker open, returning default for key: {Key}", key);
return default(T);
}
}
public async Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
try
{
await _circuitBreaker.ExecuteAsync(() => _innerCache.SetAsync(key, value, expiry), $"SET:{key}");
}
catch (CacheException) when (_circuitBreaker.State == CircuitBreakerState.Open)
{
_logger.LogWarning("Circuit breaker open, skipping cache set for key: {Key}", key);
// Do not rethrow, so the application can keep running
}
}
// Remaining interface methods...
public Task<bool> ExistsAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.ExistsAsync(key), $"EXISTS:{key}");
public Task<bool> RemoveAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.RemoveAsync(key), $"REMOVE:{key}");
public Task<long> RemoveByPatternAsync(string pattern) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.RemoveByPatternAsync(pattern), $"REMOVE_PATTERN:{pattern}");
public Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetOrSetAsync(key, factory, expiry), $"GET_OR_SET:{key}");
public Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetMultipleAsync<T>(keys), "GET_MULTIPLE");
public Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.SetMultipleAsync(keyValuePairs, expiry), "SET_MULTIPLE");
public Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT:{key}");
public Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.IncrementAsync(key, value, expiry), $"INCREMENT_DOUBLE:{key}");
public Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.SetIfNotExistsAsync(key, value, expiry), $"SET_IF_NOT_EXISTS:{key}");
public Task<TimeSpan?> GetExpiryAsync(string key) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.GetExpiryAsync(key), $"GET_EXPIRY:{key}");
public Task<bool> ExpireAsync(string key, TimeSpan expiry) =>
_circuitBreaker.ExecuteAsync(() => _innerCache.ExpireAsync(key, expiry), $"EXPIRE:{key}");
}
/// <summary>
/// LRU cache container, used to bound memory growth
/// </summary>
public class LRUCache<TKey, TValue>
{
private readonly int _maxSize;
private readonly Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>> _cache;
private readonly LinkedList<CacheItem<TKey, TValue>> _lruList;
private readonly object _lock = new object();
public LRUCache(int maxSize)
{
if (maxSize <= 0)
throw new ArgumentException("Max size must be greater than 0", nameof(maxSize));
_maxSize = maxSize;
_cache = new Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>>(maxSize);
_lruList = new LinkedList<CacheItem<TKey, TValue>>();
}
public int Count
{
get
{
lock (_lock)
{
return _cache.Count;
}
}
}
public bool TryGet(TKey key, out TValue value)
{
lock (_lock)
{
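if (_cache.TryGetValue(key, out var node))
{
// Move to the head of the list (most recently used) and refresh the
// timestamp so CleanupExpired does not drop entries that are still read
_lruList.Remove(node);
_lruList.AddFirst(node);
node.Value.LastAccessed = DateTime.UtcNow;
value = node.Value.Value;
return true;
}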
value = default(TValue);
return false;
}
}
public void Add(TKey key, TValue value)
{
lock (_lock)
{
if (_cache.TryGetValue(key, out var existingNode))
{
// Update the existing item
existingNode.Value.Value = value;
existingNode.Value.LastAccessed = DateTime.UtcNow;
// Move to the head of the list
_lruList.Remove(existingNode);
_lruList.AddFirst(existingNode);
}
else
{
// Enforce the capacity limit
if (_cache.Count >= _maxSize)
{
// Evict the least recently used item
var lastNode = _lruList.Last;
if (lastNode != null)
{
_cache.Remove(lastNode.Value.Key);
_lruList.RemoveLast();
}
}
// Add the new item
var newItem = new CacheItem<TKey, TValue>
{
Key = key,
Value = value,
LastAccessed = DateTime.UtcNow
};
var newNode = _lruList.AddFirst(newItem);
_cache[key] = newNode;
}
}
}
public bool Remove(TKey key)
{
lock (_lock)
{
if (_cache.TryGetValue(key, out var node))
{
_cache.Remove(key);
_lruList.Remove(node);
return true;
}
return false;
}
}
public void Clear()
{
lock (_lock)
{
_cache.Clear();
_lruList.Clear();
}
}
public IEnumerable<TKey> Keys
{
get
{
lock (_lock)
{
return _cache.Keys.ToList();
}
}
}
/// <summary>
/// Removes items that have not been accessed within maxAge
/// </summary>
public int CleanupExpired(TimeSpan maxAge)
{
var cutoffTime = DateTime.UtcNow - maxAge;
var expiredKeys = new List<TKey>();
lock (_lock)
{
foreach (var item in _lruList)
{
if (item.LastAccessed < cutoffTime)
{
expiredKeys.Add(item.Key);
}
}
foreach (var key in expiredKeys)
{
Remove(key);
}
}
return expiredKeys.Count;
}
}
/// <summary>
/// LRU cache item
/// </summary>
class CacheItem<TKey, TValue>
{
public TKey Key { get; set; }
public TValue Value { get; set; }
public DateTime LastAccessed { get; set; }
}
/// <summary>
/// Advanced in-memory cache manager
/// Provides generic access, statistics, and performance monitoring
/// </summary>
public interface IAdvancedMemoryCache
{
Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
Task<T> GetAsync<T>(string key);
Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
Task RemoveAsync(string key);
Task RemoveByPatternAsync(string pattern);
CacheStatistics GetStatistics();
void ClearStatistics();
}
/// <summary>
/// Cache statistics
/// </summary>
public class CacheStatistics
{
// Counters are public fields rather than properties so they can be passed
// by reference to Interlocked.Increment and Interlocked.Exchange
public long HitCount;
public long MissCount;
public long SetCount;
public long RemoveCount;
public double HitRatio => HitCount + MissCount == 0 ? 0 : (double)HitCount / (HitCount + MissCount);
public DateTime StartTime { get; set; }
public TimeSpan Duration => DateTime.UtcNow - StartTime;
}
/// <summary>
/// Cache configuration options
/// </summary>
public class AdvancedMemoryCacheOptions
{
public int SizeLimit { get; set; } = 1000;
public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromMinutes(30);
public bool EnableStatistics { get; set; } = true;
public bool EnablePatternRemoval { get; set; } = true;
public double CompactionPercentage { get; set; } = 0.1;
}
/// <summary>
/// Advanced in-memory cache implementation
/// An enhanced wrapper built on top of IMemoryCache
/// </summary>
public class AdvancedMemoryCache : IAdvancedMemoryCache, IDisposable
{
private readonly IMemoryCache _cache;
private readonly ILogger<AdvancedMemoryCache> _logger;
private readonly AdvancedMemoryCacheOptions _options;
private readonly CacheStatistics _statistics;
private readonly ConcurrentDictionary<string, byte> _keyTracker;
private readonly SemaphoreSlim _semaphore;
private readonly Timer _cleanupTimer;
public AdvancedMemoryCache(
IMemoryCache cache,
ILogger<AdvancedMemoryCache> logger,
IOptions<AdvancedMemoryCacheOptions> options)
{
_cache = cache ?? throw new ArgumentNullException(nameof(cache));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_options = options?.Value ?? new AdvancedMemoryCacheOptions();
_statistics = new CacheStatistics { StartTime = DateTime.UtcNow };
_keyTracker = new ConcurrentDictionary<string, byte>();
_semaphore = new SemaphoreSlim(1, 1);
// Periodically clean up stale key-tracking records
_cleanupTimer = new Timer(CleanupKeyTracker, null,
TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5));
}
/// <summary>
/// Gets a cache entry, or creates it with the factory on a miss.
/// This is the most commonly used method and implements the Cache-Aside pattern.
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <param name="factory">Factory that loads the data on a cache miss</param>
/// <param name="expiry">Expiration time; DefaultExpiry is used when null</param>
/// <returns>The cached or newly loaded value</returns>
public async Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
if (factory == null)
throw new ArgumentNullException(nameof(factory));
// Try the cache first. TryGetTyped reports hit or miss explicitly, so value
// types (whose default is never null) are not mistaken for hits.
if (TryGetTyped<T>(key, out var cachedValue))
{
_logger.LogDebug("Cache hit for key: {Key}", key);
return cachedValue;
}
// A single semaphore serializes factory execution across all keys, which
// also prevents concurrent execution of the factory for the same key
await _semaphore.WaitAsync();
try
{
// Double-checked locking: another caller may have populated the entry meanwhile
if (TryGetTyped<T>(key, out cachedValue))
{
_logger.LogDebug("Cache hit on second check for key: {Key}", key);
return cachedValue;
}
// Run the factory exactly once to load the data
_logger.LogDebug("Cache miss for key: {Key}, executing factory method", key);
T value;
try
{
value = await factory();
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error occurred while executing factory method for key: {Key}", key);
throw new CacheException($"Cache operation failed for key: {key}", ex);
}
// Store the result; connection and timeout failures fall back to returning
// the value uncached instead of invoking the factory a second time
try
{
await SetAsync(key, value, expiry);
}
catch (CacheSerializationException ex)
{
_logger.LogError(ex, "Serialization failed for key: {Key}", key);
throw;
}
catch (Exception ex) when (ex is CacheConnectionException or CacheTimeoutException)
{
_logger.LogWarning(ex, "Cache write failed for key: {Key}, returning uncached value", key);
}
return value;
}
finally
{
_semaphore.Release();
}
}
/// <summary>
/// Gets a cache entry asynchronously
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <returns>The cached value, or the default value if the key is absent</returns>
public Task<T> GetAsync<T>(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
return Task.FromResult(TryGetTyped<T>(key, out var value) ? value : default(T));
}
/// <summary>
/// Looks up a key, records hit/miss statistics, and reports whether a value of type T was found
/// </summary>
private bool TryGetTyped<T>(string key, out T value)
{
var found = _cache.TryGetValue(key, out var raw);
if (_options.EnableStatistics)
{
if (found)
Interlocked.Increment(ref _statistics.HitCount);
else
Interlocked.Increment(ref _statistics.MissCount);
}
if (found && raw is T typedValue)
{
value = typedValue;
return true;
}
value = default(T);
return false;
}
/// <summary>
/// Sets a cache entry asynchronously
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <param name="value">Value to cache</param>
/// <param name="expiry">Expiration time</param>
public Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
var cacheExpiry = expiry ?? _options.DefaultExpiry;
using var entry = _cache.CreateEntry(key);
entry.Value = value;
entry.AbsoluteExpirationRelativeToNow = cacheExpiry;
entry.Size = 1; // Simplified size accounting; real applications can set this from the object's size
// Register the eviction callback
entry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
{
EvictionCallback = OnCacheEntryEvicted,
State = key
});
// Track the cache key
if (_options.EnablePatternRemoval)
{
_keyTracker.TryAdd(key, 0);
}
if (_options.EnableStatistics)
{
Interlocked.Increment(ref _statistics.SetCount);
}
_logger.LogDebug("Set cache entry for key: {Key}, expiry: {Expiry}", key, cacheExpiry);
return Task.CompletedTask;
}
/// <summary>
/// Removes a cache entry asynchronously
/// </summary>
/// <param name="key">Cache key</param>
public Task RemoveAsync(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
_cache.Remove(key);
_keyTracker.TryRemove(key, out _);
if (_options.EnableStatistics)
{
Interlocked.Increment(ref _statistics.RemoveCount);
}
_logger.LogDebug("Removed cache entry for key: {Key}", key);
return Task.CompletedTask;
}
/// <summary>
/// Removes cache entries matching a pattern asynchronously
/// Supports wildcard matching, e.g. "user:*", "*:settings"
/// </summary>
/// <param name="pattern">Match pattern</param>
public async Task RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrEmpty(pattern))
throw new ArgumentException("Pattern cannot be null or empty", nameof(pattern));
if (!_options.EnablePatternRemoval)
{
_logger.LogWarning("Pattern removal is disabled");
return;
}
var keysToRemove = new List<string>();
var regexPattern = ConvertWildcardToRegex(pattern);
var regex = new System.Text.RegularExpressions.Regex(regexPattern,
System.Text.RegularExpressions.RegexOptions.IgnoreCase);
foreach (var key in _keyTracker.Keys)
{
if (regex.IsMatch(key))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
await RemoveAsync(key);
}
_logger.LogInformation("Removed {Count} cache entries matching pattern: {Pattern}",
keysToRemove.Count, pattern);
}
/// <summary>
/// Gets a snapshot of the cache statistics.
/// </summary>
/// <returns>Statistics object (empty when statistics are disabled)</returns>
public CacheStatistics GetStatistics()
{
if (!_options.EnableStatistics)
{
return new CacheStatistics();
}
return new CacheStatistics
{
HitCount = _statistics.HitCount,
MissCount = _statistics.MissCount,
SetCount = _statistics.SetCount,
RemoveCount = _statistics.RemoveCount,
StartTime = _statistics.StartTime
};
}
/// <summary>
/// Resets the cache statistics.
/// </summary>
public void ClearStatistics()
{
if (_options.EnableStatistics)
{
Interlocked.Exchange(ref _statistics.HitCount, 0);
Interlocked.Exchange(ref _statistics.MissCount, 0);
Interlocked.Exchange(ref _statistics.SetCount, 0);
Interlocked.Exchange(ref _statistics.RemoveCount, 0);
_statistics.StartTime = DateTime.UtcNow;
}
}
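/// <summary>
/// Callback invoked when a cache entry is evicted.
/// </summary>
/// <param name="key">Cache key</param>
/// <param name="value">Cached value</param>
/// <param name="reason">Reason for the eviction</param>
/// <param name="state">State object (the cache key registered in SetAsync)</param>
private void OnCacheEntryEvicted(object key, object value, EvictionReason reason, object state)
{
_logger.LogDebug("Cache entry evicted - Key: {Key}, Reason: {Reason}", key, reason);
// A replaced entry means a newer value now lives under the same key,
// so the key must stay in the tracker for pattern-based removal to find it.
if (reason == EvictionReason.Replaced)
{
return;
}
var cacheKey = state?.ToString();
if (!string.IsNullOrEmpty(cacheKey))
{
_keyTracker.TryRemove(cacheKey, out _);
}
}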
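/// <summary>
/// Converts a wildcard pattern into an anchored regular expression.
/// </summary>
/// <param name="wildcardPattern">Wildcard pattern ('*' matches any run of characters, '?' matches one character)</param>
/// <returns>Regular expression string</returns>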
private static string ConvertWildcardToRegex(string wildcardPattern)
{
return "^" + System.Text.RegularExpressions.Regex.Escape(wildcardPattern)
.Replace("\\*", ".*")
.Replace("\\?", ".") + "$";
}
/// <summary>
/// Periodically removes keys from the tracker whose cache entries no longer exist.
/// </summary>
/// <param name="state">Timer state (unused)</param>
private void CleanupKeyTracker(object state)
{
var keysToRemove = new List<string>();
foreach (var key in _keyTracker.Keys)
{
if (!_cache.TryGetValue(key, out _))
{
keysToRemove.Add(key);
}
}
foreach (var key in keysToRemove)
{
_keyTracker.TryRemove(key, out _);
}
if (keysToRemove.Count > 0)
{
_logger.LogDebug("Cleaned up {Count} expired keys from tracker", keysToRemove.Count);
}
}
/// <summary>
/// Releases resources.
/// </summary>
public void Dispose()
{
_cleanupTimer?.Dispose();
_semaphore?.Dispose();
_cache?.Dispose();
}
}
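The wildcard conversion used by RemoveByPatternAsync is easy to check in isolation. The following is a minimal standalone sketch, not part of the cache class: `WildcardDemo` and the sample keys are hypothetical names introduced for illustration, and the conversion simply mirrors ConvertWildcardToRegex above.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical sample keys used only for this demonstration.
string[] keys = { "user:1", "user:profile:2", "superuser:1", "order:1:settings" };

foreach (var pattern in new[] { "user:*", "*:settings", "user:?" })
{
    // Same options as RemoveByPatternAsync: case-insensitive matching.
    var regex = new Regex(WildcardDemo.ToRegex(pattern), RegexOptions.IgnoreCase);
    var matches = Array.FindAll(keys, k => regex.IsMatch(k));
    Console.WriteLine($"{pattern} => {string.Join(", ", matches)}");
}
// Prints:
// user:* => user:1, user:profile:2
// *:settings => order:1:settings
// user:? => user:1

public static class WildcardDemo
{
    // Escape every regex metacharacter, then re-enable only '*' and '?'.
    // The '^' and '$' anchors force a whole-key match, so "user:*" does not match "superuser:1".
    public static string ToRegex(string wildcard) =>
        "^" + Regex.Escape(wildcard).Replace("\\*", ".*").Replace("\\?", ".") + "$";
}
```

Because the expression is anchored at both ends, a pattern only removes keys that match in full, which keeps "user:*" from touching keys that merely contain "user:".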
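3.3 Memory Cache Configuration and Dependency Injection
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// Service registration extensions for the memory cache.
/// </summary>
public static class MemoryCacheServiceExtensions
{
/// <summary>
/// Registers the advanced memory cache services.
/// </summary>
/// <param name="services">Service collection</param>
/// <param name="setupAction">Optional configuration delegate</param>
/// <returns>The service collection</returns>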
public static IServiceCollection AddAdvancedMemoryCache(
this IServiceCollection services,
Action<AdvancedMemoryCacheOptions> setupAction = null)
{
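// Register the underlying memory cache
services.AddMemoryCache(options =>
{
options.SizeLimit = 1000; // Upper bound on the sum of entry Size values (units, not bytes)
options.CompactionPercentage = 0.1; // Fraction of the cache to evict when the size limit is exceeded
options.ExpirationScanFrequency = TimeSpan.FromMinutes(1); // How often expired entries are scanned for
});
// Configure options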
if (setupAction != null)
{
services.Configure(setupAction);
}
else
{
services.Configure<AdvancedMemoryCacheOptions>(options =>
{
// Default configuration
options.SizeLimit = 1000;
options.DefaultExpiry = TimeSpan.FromMinutes(30);
options.EnableStatistics = true;
options.EnablePatternRemoval = true;
options.CompactionPercentage = 0.1;
});
}
// Register the advanced memory cache service (wrapped in the security decorator)
services.AddSingleton<AdvancedMemoryCache>();
services.AddSingleton<IAdvancedMemoryCache>(provider =>
{
var innerCache = provider.GetRequiredService<AdvancedMemoryCache>();
var validator = provider.GetRequiredService<ICacheDataValidator>();
var logger = provider.GetRequiredService<ILogger<SecureCacheManagerDecorator>>();
return new SecureCacheManagerDecorator(innerCache, validator, logger);
});
return services;
}
}
/// <summary>
/// Example: configuration in Program.cs
/// </summary>
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
// Register the supporting components
builder.Services.AddSingleton<ICacheDataValidator, DefaultCacheDataValidator>();
builder.Services.AddSingleton<ICacheSerializer, SmartCacheSerializer>();
// Configure the circuit breaker
builder.Services.Configure<CacheCircuitBreakerOptions>(options =>
{
options.FailureThreshold = 5;
options.OpenTimeout = TimeSpan.FromMinutes(1);
options.SuccessThreshold = 2;
});
builder.Services.AddSingleton<CacheCircuitBreaker>();
// Register the advanced memory cache (with security validation)
builder.Services.AddAdvancedMemoryCache(options =>
{
options.SizeLimit = 2000;
options.DefaultExpiry = TimeSpan.FromHours(1);
options.EnableStatistics = true;
options.EnablePatternRemoval = true;
options.CompactionPercentage = 0.15;
});
var app = builder.Build();
app.Run();
}
}
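The SizeLimit configured above interacts with the `entry.Size = 1` assignment in SetAsync: once a limit is set, Microsoft.Extensions.Caching.Memory requires every entry to declare a size, and the limit counts those declared units rather than bytes. The sketch below illustrates this with a bare MemoryCache; it assumes the Microsoft.Extensions.Caching.Memory package is referenced, and the keys and the limit of 2 are arbitrary demo values.

```csharp
using System;
using Microsoft.Extensions.Caching.Memory;

// A tiny cache whose limit is two "units" (demo value, not a recommendation).
using var cache = new MemoryCache(new MemoryCacheOptions { SizeLimit = 2 });

// With SizeLimit set, an entry that does not declare Size is rejected.
try
{
    cache.Set("no-size", "value");
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Rejected entry without Size: {ex.Message}");
}

// Each entry declares its own weight; here every entry counts as one unit,
// which is the same simplification SetAsync makes.
var oneUnit = new MemoryCacheEntryOptions { Size = 1 };
cache.Set("a", "first", oneUnit);
cache.Set("b", "second", oneUnit);

Console.WriteLine(cache.TryGetValue("a", out string first) ? first : "(miss)");
Console.WriteLine(cache.TryGetValue("b", out string second) ? second : "(miss)");
```

With one unit per entry, a SizeLimit of 1000 effectively means "at most 1000 entries". Weighting entries by an estimated byte size is an alternative when cached objects vary widely in size.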
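4. Redis Distributed Cache Layer Implementation
4.1 Redis Connection Management and Configuration
using System;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using System.Text.Json;
/// <summary>
/// Redis cache configuration options.
/// </summary>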
public class RedisCacheOptions
{
public string ConnectionString { get; set; } = "localhost:6379";
public int Database { get; set; } = 0;
public string KeyPrefix { get; set; } = "app:";
public TimeSpan DefaultExpiry { get; set; } = TimeSpan.FromHours(1);
public int ConnectTimeout { get; set; } = 5000;
public int SyncTimeout { get; set; } = 1000;
public bool AllowAdmin { get; set; } = false;
public string Password { get; set; }
public bool Ssl { get; set; } = false;
public int ConnectRetry { get; set; } = 3;
public bool AbortOnConnectFail { get; set; } = false;
public string ClientName { get; set; } = "MultiLevelCache";
}
/// <summary>
/// Redis connection manager.
/// Wraps a single shared ConnectionMultiplexer and logs connection failures and recoveries.
/// </summary>
public interface IRedisConnectionManager : IDisposable
{
IDatabase GetDatabase();
ISubscriber GetSubscriber();
IServer GetServer();
bool IsConnected { get; }
Task<bool> TestConnectionAsync();
}
/// <summary>
/// Default implementation of the Redis connection manager.
/// </summary>
public class RedisConnectionManager : IRedisConnectionManager
{
private readonly RedisCacheOptions _options;
private readonly ILogger<RedisConnectionManager> _logger;
private readonly Lazy<ConnectionMultiplexer> _connectionMultiplexer;
private bool _disposed = false;
public RedisConnectionManager(
IOptions<RedisCacheOptions> options,
ILogger<RedisConnectionManager> logger)
{
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_connectionMultiplexer = new Lazy<ConnectionMultiplexer>(CreateConnection);
}
/// <summary>
/// Creates the Redis connection.
/// </summary>
/// <returns>ConnectionMultiplexer instance</returns>
private ConnectionMultiplexer CreateConnection()
{
var configurationOptions = new ConfigurationOptions
{
EndPoints = { _options.ConnectionString },
ConnectTimeout = _options.ConnectTimeout,
SyncTimeout = _options.SyncTimeout,
AllowAdmin = _options.AllowAdmin,
ConnectRetry = _options.ConnectRetry,
AbortOnConnectFail = _options.AbortOnConnectFail,
ClientName = _options.ClientName,
Ssl = _options.Ssl
};
if (!string.IsNullOrEmpty(_options.Password))
{
configurationOptions.Password = _options.Password;
}
try
{
var connection = ConnectionMultiplexer.Connect(configurationOptions);
// Register connection event handlers
connection.ConnectionFailed += OnConnectionFailed;
connection.ConnectionRestored += OnConnectionRestored;
connection.ErrorMessage += OnErrorMessage;
connection.InternalError += OnInternalError;
_logger.LogInformation("Redis connection established successfully");
return connection;
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to establish Redis connection");
throw;
}
}
/// <summary>
/// Gets the database instance.
/// </summary>
/// <returns>IDatabase instance</returns>
public IDatabase GetDatabase()
{
return _connectionMultiplexer.Value.GetDatabase(_options.Database);
}
/// <summary>
/// Gets the subscriber instance.
/// </summary>
/// <returns>ISubscriber instance</returns>
public ISubscriber GetSubscriber()
{
return _connectionMultiplexer.Value.GetSubscriber();
}
/// <summary>
/// Gets the server instance for the first configured endpoint.
/// </summary>
/// <returns>IServer instance</returns>
public IServer GetServer()
{
var endpoints = _connectionMultiplexer.Value.GetEndPoints();
return _connectionMultiplexer.Value.GetServer(endpoints.First());
}
/// <summary>
/// Whether the connection is established (false until the multiplexer has been created).
/// </summary>
public bool IsConnected => _connectionMultiplexer.IsValueCreated &&
_connectionMultiplexer.Value.IsConnected;
/// <summary>
/// Tests the connection with a PING.
/// </summary>
/// <returns>True if the connection succeeded</returns>
public async Task<bool> TestConnectionAsync()
{
try
{
var database = GetDatabase();
await database.PingAsync();
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Redis connection test failed");
return false;
}
}
#region Event handlers
private void OnConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
_logger.LogError(e.Exception, "Redis connection failed: {EndPoint}", e.EndPoint);
}
private void OnConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
_logger.LogInformation("Redis connection restored: {EndPoint}", e.EndPoint);
}
private void OnErrorMessage(object sender, RedisErrorEventArgs e)
{
_logger.LogError("Redis error: {Message}", e.Message);
}
private void OnInternalError(object sender, InternalErrorEventArgs e)
{
_logger.LogError(e.Exception, "Redis internal error");
}
#endregion
/// <summary>
/// Releases resources.
/// </summary>
public void Dispose()
{
if (!_disposed)
{
if (_connectionMultiplexer.IsValueCreated)
{
_connectionMultiplexer.Value.Close();
_connectionMultiplexer.Value.Dispose();
}
_disposed = true;
}
}
}
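CreateConnection above adds the ConnectionString value as a single endpoint, which fits the plain "host:port" form used as the default. If the setting is meant to carry a full StackExchange.Redis configuration string, `ConfigurationOptions.Parse` can read it instead. The sketch below is a hedged illustration of that alternative, assuming the StackExchange.Redis package is referenced; the connection string is a made-up example and no connection is opened.

```csharp
using System;
using StackExchange.Redis;

// Hypothetical configuration string; Parse only builds the options object.
var parsed = ConfigurationOptions.Parse(
    "localhost:6379,abortConnect=false,connectTimeout=5000,syncTimeout=1000,name=MultiLevelCache");

// Values from the string are now available as typed properties.
Console.WriteLine($"Endpoints:          {parsed.EndPoints.Count}");
Console.WriteLine($"AbortOnConnectFail: {parsed.AbortOnConnectFail}");
Console.WriteLine($"ConnectTimeout:     {parsed.ConnectTimeout}");
Console.WriteLine($"SyncTimeout:        {parsed.SyncTimeout}");
Console.WriteLine($"ClientName:         {parsed.ClientName}");

// Individual settings can still be overridden in code before connecting.
parsed.ConnectRetry = 3;
```

The resulting options object can be passed to `ConnectionMultiplexer.Connect` in the same way as the hand-built one. Which form to accept is a design choice: separate typed settings are easier to validate, while a single string matches how Redis endpoints are usually handed out by hosting platforms.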
4.2 Redis Distributed Cache Service Implementation
using StackExchange.Redis;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
/// <summary>
/// Redis distributed cache interface.
/// </summary>
public interface IRedisDistributedCache
{
Task<T> GetAsync<T>(string key);
Task SetAsync<T>(string key, T value, TimeSpan? expiry = null);
Task<bool> ExistsAsync(string key);
Task<bool> RemoveAsync(string key);
Task<long> RemoveByPatternAsync(string pattern);
Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null);
Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys);
Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null);
Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null);
Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null);
Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null);
Task<TimeSpan?> GetExpiryAsync(string key);
Task<bool> ExpireAsync(string key, TimeSpan expiry);
}
/// <summary>
/// Redis distributed cache implementation.
/// </summary>
public class RedisDistributedCache : IRedisDistributedCache
{
private readonly IRedisConnectionManager _connectionManager;
private readonly RedisCacheOptions _options;
private readonly ILogger<RedisDistributedCache> _logger;
private readonly ICacheSerializer _serializer;
public RedisDistributedCache(
IRedisConnectionManager connectionManager,
IOptions<RedisCacheOptions> options,
ILogger<RedisDistributedCache> logger,
ICacheSerializer serializer = null)
{
_connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
// Use the serializer registered in DI (for example SmartCacheSerializer);
// fall back to the JSON serializer when none is supplied
_serializer = serializer ?? new JsonCacheSerializer();
}
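/// <summary>
/// Asynchronously gets a cache entry.
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <returns>The cached value, or default(T) on a miss or a Redis error</returns>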
public async Task<T> GetAsync<T>(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var value = await database.StringGetAsync(fullKey);
if (!value.HasValue)
{
_logger.LogDebug("Cache miss for key: {Key}", key);
return default(T);
}
_logger.LogDebug("Cache hit for key: {Key}", key);
return DeserializeValue<T>(value);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting value from Redis for key: {Key}", key);
return default(T);
}
}
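/// <summary>
/// Asynchronously sets a cache entry.
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <param name="value">Value to cache</param>
/// <param name="expiry">Expiration time; the configured default is used when null</param>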
public async Task SetAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var serializedValue = SerializeValue(value);
var expiration = expiry ?? _options.DefaultExpiry;
await database.StringSetAsync(fullKey, serializedValue, expiration);
_logger.LogDebug("Set cache value for key: {Key}, expiry: {Expiry}", key, expiration);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting value in Redis for key: {Key}", key);
throw;
}
}
/// <summary>
/// Checks whether a key exists.
/// </summary>
/// <param name="key">Cache key</param>
/// <returns>True if the key exists; false if it does not or Redis is unreachable</returns>
public async Task<bool> ExistsAsync(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
return await database.KeyExistsAsync(fullKey);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking existence in Redis for key: {Key}", key);
return false;
}
}
/// <summary>
/// Asynchronously removes a cache entry.
/// </summary>
/// <param name="key">Cache key</param>
/// <returns>True if the key was removed</returns>
public async Task<bool> RemoveAsync(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.KeyDeleteAsync(fullKey);
_logger.LogDebug("Remove cache key: {Key}, success: {Success}", key, result);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing key from Redis: {Key}", key);
return false;
}
}
/// <summary>
/// Removes all cache entries whose keys match a pattern.
/// </summary>
/// <param name="pattern">Glob-style pattern</param>
/// <returns>Number of entries deleted</returns>
public async Task<long> RemoveByPatternAsync(string pattern)
{
if (string.IsNullOrEmpty(pattern))
throw new ArgumentException("Pattern cannot be null or empty", nameof(pattern));
try
{
var server = _connectionManager.GetServer();
var database = _connectionManager.GetDatabase();
var fullPattern = GetFullKey(pattern);
var keys = server.Keys(database.Database, fullPattern).ToArray();
if (keys.Length == 0)
{
return 0;
}
var deletedCount = await database.KeyDeleteAsync(keys);
_logger.LogInformation("Deleted {Count} keys matching pattern: {Pattern}", deletedCount, pattern);
return deletedCount;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing keys by pattern from Redis: {Pattern}", pattern);
return 0;
}
}
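/// <summary>
/// Gets a cache entry, or creates it with the factory under a distributed lock.
/// A cached value equal to default(T) is treated as a miss, because GetAsync
/// returns default(T) for absent keys.
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <param name="factory">Factory that loads the value on a miss</param>
/// <param name="expiry">Expiration time</param>
/// <returns>The cached or freshly loaded value</returns>
public async Task<T> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
if (factory == null)
throw new ArgumentNullException(nameof(factory));
// Try the cache first
var cachedValue = await GetAsync<T>(key);
if (!IsCacheMiss(cachedValue))
{
return cachedValue;
}
// Use a distributed lock to prevent cache breakdown on hot keys
var lockKey = GetFullKey($"{key}:lock");
var lockValue = Guid.NewGuid().ToString();
IDatabase database = null;
var lockAcquired = false;
try
{
database = _connectionManager.GetDatabase();
lockAcquired = await database.StringSetAsync(
lockKey,
lockValue,
TimeSpan.FromMinutes(1),
When.NotExists);
}
catch (Exception ex)
{
// Redis is unavailable: degrade to loading the value directly
_logger.LogError(ex, "Error acquiring lock in GetOrSetAsync for key: {Key}", key);
}
if (!lockAcquired)
{
// Another caller is probably loading the value: wait briefly, then re-check the cache
_logger.LogDebug("Waiting for lock to be released for key: {Key}", key);
await Task.Delay(50);
cachedValue = await GetAsync<T>(key);
if (!IsCacheMiss(cachedValue))
{
return cachedValue;
}
// Still a miss: degrade to the factory without caching the result
_logger.LogWarning("Failed to acquire lock and cache miss for key: {Key}, executing factory method", key);
return await factory();
}
try
{
// Double-check the cache now that the lock is held
cachedValue = await GetAsync<T>(key);
if (!IsCacheMiss(cachedValue))
{
return cachedValue;
}
// The factory runs exactly once here; its exceptions propagate to the caller
_logger.LogDebug("Executing factory method for key: {Key}", key);
var value = await factory();
try
{
await SetAsync(key, value, expiry);
}
catch (Exception ex)
{
// The value was loaded successfully, so a failed cache write is not fatal
_logger.LogWarning(ex, "Loaded value for key: {Key} but failed to cache it", key);
}
return value;
}
finally
{
// Release the lock only if this caller still owns it (the Lua script keeps check-and-delete atomic)
const string releaseLockScript = @"
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end";
try
{
await database.ScriptEvaluateAsync(
releaseLockScript,
new RedisKey[] { lockKey },
new RedisValue[] { lockValue });
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to release lock for key: {Key}", key);
}
}
}
/// <summary>
/// Returns true when a value is indistinguishable from a cache miss.
/// Comparing against default(T) also works for value types, where a null check never fails.
/// </summary>
private static bool IsCacheMiss<T>(T value)
{
return EqualityComparer<T>.Default.Equals(value, default(T));
}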
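/// <summary>
/// Gets multiple cache entries in one round trip.
/// </summary>
/// <typeparam name="T">Type of the cached items</typeparam>
/// <param name="keys">Cache keys</param>
/// <returns>Dictionary of the keys that were found and their values</returns>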
public async Task<Dictionary<string, T>> GetMultipleAsync<T>(IEnumerable<string> keys)
{
if (keys == null)
throw new ArgumentNullException(nameof(keys));
var keyList = keys.ToList();
if (!keyList.Any())
{
return new Dictionary<string, T>();
}
try
{
var database = _connectionManager.GetDatabase();
var fullKeys = keyList.Select(k => (RedisKey)GetFullKey(k)).ToArray();
var values = await database.StringGetAsync(fullKeys);
var result = new Dictionary<string, T>();
for (int i = 0; i < keyList.Count; i++)
{
if (values[i].HasValue)
{
result[keyList[i]] = DeserializeValue<T>(values[i]);
}
}
_logger.LogDebug("Retrieved {Count} out of {Total} keys from Redis",
result.Count, keyList.Count);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting multiple values from Redis");
return new Dictionary<string, T>();
}
}
/// <summary>
/// Sets multiple cache entries.
/// </summary>
/// <typeparam name="T">Type of the cached items</typeparam>
/// <param name="keyValuePairs">Keys and values to store</param>
/// <param name="expiry">Expiration time applied to every entry</param>
public async Task SetMultipleAsync<T>(Dictionary<string, T> keyValuePairs, TimeSpan? expiry = null)
{
if (keyValuePairs == null || !keyValuePairs.Any())
return;
try
{
var database = _connectionManager.GetDatabase();
var expiration = expiry ?? _options.DefaultExpiry;
var tasks = keyValuePairs.Select(async kvp =>
{
var fullKey = GetFullKey(kvp.Key);
var serializedValue = SerializeValue(kvp.Value);
await database.StringSetAsync(fullKey, serializedValue, expiration);
});
await Task.WhenAll(tasks);
_logger.LogDebug("Set {Count} cache values with expiry: {Expiry}",
keyValuePairs.Count, expiration);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting multiple values in Redis");
throw;
}
}
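/// <summary>
/// Atomically increments an integer value.
/// The expiry, when given, is applied by a separate command after the increment.
/// </summary>
/// <param name="key">Cache key</param>
/// <param name="value">Amount to add</param>
/// <param name="expiry">Expiration time</param>
/// <returns>The value after the increment</returns>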
public async Task<long> IncrementAsync(string key, long value = 1, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.StringIncrementAsync(fullKey, value);
if (expiry.HasValue)
{
await database.KeyExpireAsync(fullKey, expiry.Value);
}
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error incrementing value in Redis for key: {Key}", key);
throw;
}
}
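/// <summary>
/// Atomically increments a floating-point value.
/// The expiry, when given, is applied by a separate command after the increment.
/// </summary>
/// <param name="key">Cache key</param>
/// <param name="value">Amount to add</param>
/// <param name="expiry">Expiration time</param>
/// <returns>The value after the increment</returns>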
public async Task<double> IncrementAsync(string key, double value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var result = await database.StringIncrementAsync(fullKey, value);
if (expiry.HasValue)
{
await database.KeyExpireAsync(fullKey, expiry.Value);
}
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error incrementing double value in Redis for key: {Key}", key);
throw;
}
}
/// <summary>
/// Sets a value only if the key does not already exist.
/// </summary>
/// <typeparam name="T">Type of the cached item</typeparam>
/// <param name="key">Cache key</param>
/// <param name="value">Value to cache</param>
/// <param name="expiry">Expiration time</param>
/// <returns>True if the value was set</returns>
public async Task<bool> SetIfNotExistsAsync<T>(string key, T value, TimeSpan? expiry = null)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
var serializedValue = SerializeValue(value);
var expiration = expiry ?? _options.DefaultExpiry;
var result = await database.StringSetAsync(fullKey, serializedValue, expiration, When.NotExists);
_logger.LogDebug("SetIfNotExists for key: {Key}, success: {Success}", key, result);
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in SetIfNotExists for key: {Key}", key);
return false;
}
}
/// <summary>
/// Gets the remaining time to live of a key.
/// </summary>
/// <param name="key">Cache key</param>
/// <returns>Remaining time to live, or null if the key does not exist or has no expiry</returns>
public async Task<TimeSpan?> GetExpiryAsync(string key)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
return await database.KeyTimeToLiveAsync(fullKey);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting expiry for key: {Key}", key);
return null;
}
}
/// <summary>
/// Sets the expiration time of a key.
/// </summary>
/// <param name="key">Cache key</param>
/// <param name="expiry">Expiration time</param>
/// <returns>True if the expiry was set</returns>
public async Task<bool> ExpireAsync(string key, TimeSpan expiry)
{
if (string.IsNullOrEmpty(key))
throw new ArgumentException("Key cannot be null or empty", nameof(key));
try
{
var database = _connectionManager.GetDatabase();
var fullKey = GetFullKey(key);
return await database.KeyExpireAsync(fullKey, expiry);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting expiry for key: {Key}", key);
return false;
}
}
#region Helper methods
/// <summary>
/// Builds the full cache key by prepending the configured prefix.
/// </summary>
/// <param name="key">Original key</param>
/// <returns>Full key</returns>
private string GetFullKey(string key)
{
return $"{_options.KeyPrefix}{key}";
}
/// <summary>
/// Serializes a value.
/// </summary>
/// <typeparam name="T">Type of the value</typeparam>
/// <param name="value">Value to serialize</param>
/// <returns>Base64-encoded serialized value</returns>
private string SerializeValue<T>(T value)
{
if (value == null) return null;
try
{
var serializedBytes = _serializer.Serialize(value);
return Convert.ToBase64String(serializedBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error serializing value of type: {Type}", typeof(T).Name);
throw new CacheSerializationException($"Failed to serialize value of type: {typeof(T).Name}", ex);
}
}
/// <summary>
/// Deserializes a value.
/// </summary>
/// <typeparam name="T">Target type</typeparam>
/// <param name="value">Base64-encoded value to deserialize</param>
/// <returns>The deserialized object</returns>
private T DeserializeValue<T>(string value)
{
if (string.IsNullOrEmpty(value)) return default(T);
try
{
var serializedBytes = Convert.FromBase64String(value);
return _serializer.Deserialize<T>(serializedBytes);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error deserializing value to type: {Type}", typeof(T).Name);
throw new CacheSerializationException($"Failed to deserialize value to type: {typeof(T).Name}", ex);
}
}
#endregion
}
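SerializeValue above Base64-encodes the serializer's bytes before storing them as a Redis string. That keeps the stored value printable, at the cost of extra size. The sketch below measures that cost for a small payload; it uses only the base class library, and the anonymous object is a hypothetical stand-in for a real cached value.

```csharp
using System;
using System.Linq;
using System.Text;
using System.Text.Json;

// Hypothetical payload standing in for the output of ICacheSerializer.Serialize.
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(
    new { Id = 42, Name = "demo", Tags = new[] { "a", "b", "c" } });

string base64 = Convert.ToBase64String(payload);
int base64Bytes = Encoding.UTF8.GetByteCount(base64);

Console.WriteLine($"raw bytes:    {payload.Length}");
Console.WriteLine($"base64 bytes: {base64Bytes}");

// Base64 emits 4 characters for every 3 input bytes, rounded up,
// so the stored value is roughly one third larger than the raw bytes.
int expected = 4 * ((payload.Length + 2) / 3);
Console.WriteLine(base64Bytes == expected); // True

// The round trip used by DeserializeValue recovers the original bytes.
byte[] roundTripped = Convert.FromBase64String(base64);
Console.WriteLine(roundTripped.SequenceEqual(payload)); // True
```

StackExchange.Redis can also store a `byte[]` directly as a RedisValue, which would avoid this overhead. Whether that trade is worthwhile depends on how often values are inspected by hand with tools such as redis-cli.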
5. Redis Pub/Sub Synchronization Mechanism Implementation
5.1 Cache Synchronization Event Model
using System.Text.Json;
/// <summary>
/// Cache synchronization event types.
/// </summary>
public enum CacheSyncEventType
{
/// <summary>
/// A cache entry was set.
/// </summary>
Set,
/// <summary>
/// A cache entry was removed.
/// </summary>
Remove,
/// <summary>
/// A cache entry expired.
/// </summary>
Expire,
/// <summary>
/// Entries were removed in bulk by pattern.
/// </summary>
RemovePattern,
/// <summary>
/// The whole cache was cleared.
/// </summary>
Clear
}
/// <summary>
/// Cache synchronization event.
/// </summary>
public class CacheSyncEvent
{
/// <summary>
/// Event ID (used for idempotency control).
/// </summary>
public string EventId { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// Event type.
/// </summary>
public CacheSyncEventType EventType { get; set; }
/// <summary>
/// Cache key.
/// </summary>
public string Key { get; set; }
/// <summary>
/// Pattern (used for bulk removal).
/// </summary>
public string Pattern { get; set; }
/// <summary>
/// Time at which the event occurred (UTC).
/// </summary>
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
/// <summary>
/// Identifier of the node that raised the event.
/// </summary>
public string NodeId { get; set; }
/// <summary>
/// Additional data.
/// </summary>
public Dictionary<string, object> Metadata { get; set; } = new();
// Shared options: JsonSerializerOptions caches type metadata, so one reused
// instance avoids rebuilding that cache on every call
private static readonly JsonSerializerOptions JsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
/// <summary>
/// Serializes the event to JSON.
/// </summary>
/// <returns>JSON string</returns>
public string ToJson()
{
return JsonSerializer.Serialize(this, JsonOptions);
}
/// <summary>
/// Deserializes an event from JSON.
/// </summary>
/// <param name="json">JSON string</param>
/// <returns>Cache synchronization event</returns>
public static CacheSyncEvent FromJson(string json)
{
return JsonSerializer.Deserialize<CacheSyncEvent>(json, JsonOptions);
}
}
/// <summary>
/// Cache synchronization configuration options.
/// </summary>
public class CacheSyncOptions
{
/// <summary>
/// Prefix of the Redis pub/sub channel.
/// </summary>
public string ChannelPrefix { get; set; } = "cache_sync";
/// <summary>
/// ID of the current node.
/// </summary>
public string NodeId { get; set; } = Environment.MachineName;
/// <summary>
/// Whether cache synchronization is enabled.
/// </summary>
public bool EnableSync { get; set; } = true;
/// <summary>
/// Time window within which duplicate events are ignored.
/// </summary>
public TimeSpan DeduplicationWindow { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Maximum number of retry attempts.
/// </summary>
public int MaxRetryAttempts { get; set; } = 3;
/// <summary>
/// Delay between retries.
/// </summary>
public TimeSpan RetryDelay { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// Maximum delay before a batch is processed.
/// </summary>
public TimeSpan BatchMaxDelay { get; set; } = TimeSpan.FromMilliseconds(100);
/// <summary>
/// Maximum number of events in a batch.
/// </summary>
public int BatchMaxSize { get; set; } = 50;
}
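Two details of the event model's JSON round trip are worth seeing concretely: enum values are written as numbers by default, and values stored in a `Dictionary<string, object>` come back as `JsonElement` rather than their original CLR types. The sketch below shows both with System.Text.Json. `DemoSyncEvent` and `DemoEventType` are trimmed-down, hypothetical stand-ins for CacheSyncEvent and CacheSyncEventType, and the "ttlSeconds" metadata key is invented for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

var original = new DemoSyncEvent
{
    EventType = DemoEventType.Remove,
    Key = "user:42",
    NodeId = "node-a",
    Metadata = { ["ttlSeconds"] = 30 }
};

// The enum is written as its numeric value unless a string enum converter is added.
string json = JsonSerializer.Serialize(original, options);
Console.WriteLine(json);

var restored = JsonSerializer.Deserialize<DemoSyncEvent>(json, options)!;
Console.WriteLine(restored.EventType); // Remove

// Values typed as object are materialized as JsonElement, not as int.
object ttl = restored.Metadata["ttlSeconds"];
Console.WriteLine(ttl.GetType().Name);            // JsonElement
Console.WriteLine(((JsonElement)ttl).GetInt32()); // 30

// Hypothetical stand-ins for the article's event types.
public enum DemoEventType { Set, Remove }

public class DemoSyncEvent
{
    public string EventId { get; set; } = Guid.NewGuid().ToString();
    public DemoEventType EventType { get; set; }
    public string Key { get; set; } = "";
    public string NodeId { get; set; } = "";
    public Dictionary<string, object> Metadata { get; set; } = new();
}
```

Handlers that read Metadata on the receiving node therefore need to unwrap `JsonElement` values (for example with `GetInt32` or `GetString`) instead of casting directly to the original type.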
5.2 Redis Pub/Sub Synchronization Service
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Channels;
/// <summary>
/// Cache synchronization service interface.
/// </summary>
public interface ICacheSyncService
{
/// <summary>
/// Publishes a cache synchronization event.
/// </summary>
/// <param name="syncEvent">Synchronization event</param>
Task PublishAsync(CacheSyncEvent syncEvent);
/// <summary>
/// Subscribes to cache synchronization events.
/// </summary>
/// <param name="handler">Event handler</param>
Task SubscribeAsync(Func<CacheSyncEvent, Task> handler);
/// <summary>
/// Unsubscribes from cache synchronization events.
/// </summary>
Task UnsubscribeAsync();
/// <summary>
/// Whether the service is healthy.
/// </summary>
bool IsHealthy { get; }
}
/// <summary>
/// Cache synchronization service based on Redis pub/sub.
/// </summary>
public class RedisCacheSyncService : ICacheSyncService, IHostedService, IDisposable
{
private readonly IRedisConnectionManager _connectionManager;
private readonly CacheSyncOptions _options;
private readonly ILogger<RedisCacheSyncService> _logger;
// Registered event handlers
private readonly ConcurrentBag<Func<CacheSyncEvent, Task>> _handlers;
// Recently processed event IDs, used for deduplication
private readonly ConcurrentDictionary<string, DateTime> _processedEvents;
// Channel used for batch processing
private readonly Channel<CacheSyncEvent> _eventChannel;
private readonly ChannelWriter<CacheSyncEvent> _eventWriter;
private readonly ChannelReader<CacheSyncEvent> _eventReader;
// Subscription and processing tasks
private Task _subscriptionTask;
private Task _processingTask;
private CancellationTokenSource _cancellationTokenSource;
// Service state
private volatile bool _isHealthy = false;
private bool _disposed = false;
public RedisCacheSyncService(
IRedisConnectionManager connectionManager,
IOptions<CacheSyncOptions> options,
ILogger<RedisCacheSyncService> logger)
{
_connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_handlers = new ConcurrentBag<Func<CacheSyncEvent, Task>>();
_processedEvents = new ConcurrentDictionary<string, DateTime>();
// Create a bounded channel for batch processing
var channelOptions = new BoundedChannelOptions(_options.BatchMaxSize * 2)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};
_eventChannel = Channel.CreateBounded<CacheSyncEvent>(channelOptions);
_eventWriter = _eventChannel.Writer;
_eventReader = _eventChannel.Reader;
}
/// <summary>
/// Whether the service is healthy.
/// </summary>
public bool IsHealthy => _isHealthy;
/// <summary>
/// Publishes a cache synchronization event.
/// </summary>
/// <param name="syncEvent">Synchronization event</param>
public async Task PublishAsync(CacheSyncEvent syncEvent)
{
if (!_options.EnableSync || syncEvent == null)
return;
try
{
// Stamp the event with this node's ID
syncEvent.NodeId = _options.NodeId;
var subscriber = _connectionManager.GetSubscriber();
var channel = GetChannelName();
var message = syncEvent.ToJson();
await subscriber.PublishAsync(channel, message);
_logger.LogDebug("Published sync event: {EventType} for key: {Key}",
syncEvent.EventType, syncEvent.Key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to publish sync event: {EventType} for key: {Key}",
syncEvent.EventType, syncEvent.Key);
}
}
/// <summary>
/// Subscribes to cache synchronization events.
/// </summary>
/// <param name="handler">Event handler</param>
public Task SubscribeAsync(Func<CacheSyncEvent, Task> handler)
{
if (handler == null)
throw new ArgumentNullException(nameof(handler));
_handlers.Add(handler);
_logger.LogDebug("Added sync event handler");
return Task.CompletedTask;
}
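/// <summary>
/// Unsubscribes from the Redis channel and stops the subscription loop.
/// </summary>
public async Task UnsubscribeAsync()
{
try
{
if (_subscriptionTask == null)
{
return;
}
// Detach the Redis handler so no further messages are delivered
var subscriber = _connectionManager.GetSubscriber();
await subscriber.UnsubscribeAsync(GetChannelName(), OnMessageReceived);
if (!_subscriptionTask.IsCompleted)
{
_cancellationTokenSource?.Cancel();
await _subscriptionTask;
}
_isHealthy = false;
_logger.LogDebug("Unsubscribed from sync events");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during unsubscribe");
}
}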
/// <summary>
/// Starts the service.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
public Task StartAsync(CancellationToken cancellationToken)
{
if (!_options.EnableSync)
{
_logger.LogInformation("Cache sync is disabled");
return Task.CompletedTask;
}
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// Start the Redis subscription task
_subscriptionTask = StartSubscriptionAsync(_cancellationTokenSource.Token);
// Start the event processing task
_processingTask = StartProcessingAsync(_cancellationTokenSource.Token);
// Start the cleanup task
_ = Task.Run(() => StartCleanupAsync(_cancellationTokenSource.Token), cancellationToken);
_logger.LogInformation("Cache sync service started with NodeId: {NodeId}", _options.NodeId);
return Task.CompletedTask;
}
/// <summary>
/// Stops the service.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping cache sync service");
_cancellationTokenSource?.Cancel();
// Mark the event channel as complete so no more events are accepted
_eventWriter.TryComplete();
try
{
// Wait for the subscription task to finish
if (_subscriptionTask != null)
{
await _subscriptionTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
}
// Wait for the processing task to finish
if (_processingTask != null)
{
await _processingTask.WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
}
}
catch (TimeoutException)
{
_logger.LogWarning("Cache sync service stop timed out");
}
catch (Exception ex)
{
_logger.LogError(ex, "Error stopping cache sync service");
}
_isHealthy = false;
_logger.LogInformation("Cache sync service stopped");
}
/// <summary>
/// Starts the Redis subscription, retrying on failure.
/// </summary>
/// <param name="cancellationToken">Cancellation token</param>
private async Task StartSubscriptionAsync(CancellationToken cancellationToken)
{
var retryCount = 0;
while (!cancellationToken.IsCancellationRequested && retryCount < _options.MaxRetryAttempts)
{
try
{
var subscriber = _connectionManager.GetSubscriber();
var channel = GetChannelName();
await subscriber.SubscribeAsync(channel, OnMessageReceived);
_isHealthy = true;
_logger.LogInformation("Successfully subscribed to Redis channel: {Channel}", channel);
// Stay subscribed until cancellation is requested (no polling needed)
await Task.Delay(Timeout.Infinite, cancellationToken);
break;
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
retryCount++;
_isHealthy = false;
_logger.LogError(ex, "Redis subscription failed, retry {RetryCount}/{MaxRetries}",
retryCount, _options.MaxRetryAttempts);
if (retryCount < _options.MaxRetryAttempts)
{
await Task.Delay(_options.RetryDelay, cancellationToken);
}
}
}
}
/// <summary>
/// 啟動事件批量處理
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
private async Task StartProcessingAsync(CancellationToken cancellationToken)
{
var eventBatch = new List<CacheSyncEvent>();
var batchTimer = Stopwatch.StartNew();
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Wait until at least one event is available (no timeout; returns false once the channel is completed)
var hasEvent = await _eventReader.WaitToReadAsync(cancellationToken);
if (!hasEvent)
break;
// 收集批量事件
while (_eventReader.TryRead(out var syncEvent) &&
eventBatch.Count < _options.BatchMaxSize)
{
eventBatch.Add(syncEvent);
}
// 檢查是否應該處理批次
var shouldProcess = eventBatch.Count >= _options.BatchMaxSize ||
batchTimer.Elapsed >= _options.BatchMaxDelay ||
!_eventReader.TryPeek(out _); // 沒有更多事件
if (shouldProcess && eventBatch.Count > 0)
{
await ProcessEventBatchAsync(eventBatch, cancellationToken);
eventBatch.Clear();
batchTimer.Restart();
}
else if (eventBatch.Count > 0)
{
// 短暫等待以收集更多事件
await Task.Delay(10, cancellationToken);
}
}
}
catch (OperationCanceledException)
{
// 正常取消
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in event processing loop");
}
finally
{
// 處理剩余的事件
if (eventBatch.Count > 0)
{
try
{
await ProcessEventBatchAsync(eventBatch, CancellationToken.None);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing final event batch");
}
}
}
}
/// <summary>
/// 處理事件批次
/// </summary>
/// <param name="events">事件列表</param>
/// <param name="cancellationToken">取消令牌</param>
private async Task ProcessEventBatchAsync(List<CacheSyncEvent> events, CancellationToken cancellationToken)
{
if (!events.Any())
return;
try
{
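var tasks = events
// TryAdd checks and marks in one atomic step, so two concurrent batches cannot both claim the same event
.Where(e => _processedEvents.TryAdd(e.EventId, DateTime.UtcNow))
.Select(async e =>
{
try
{
// Invoke all handlers in parallel
var handlerTasks = _handlers.Select(handler => handler(e));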
await Task.WhenAll(handlerTasks);
_logger.LogDebug("Processed sync event: {EventType} for key: {Key}",
e.EventType, e.Key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing sync event: {EventId}", e.EventId);
}
});
await Task.WhenAll(tasks);
_logger.LogDebug("Processed batch of {Count} sync events", events.Count);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing event batch");
}
}
/// <summary>
/// 處理接收到的Redis消息
/// </summary>
/// <param name="channel">頻道</param>
/// <param name="message">消息</param>
private async void OnMessageReceived(RedisChannel channel, RedisValue message)
{
try
{
if (!message.HasValue)
return;
var syncEvent = CacheSyncEvent.FromJson(message);
// 忽略自己發送的事件
if (syncEvent.NodeId == _options.NodeId)
{
_logger.LogTrace("Ignoring self-generated event: {EventId}", syncEvent.EventId);
return;
}
// 將事件加入處理隊列
if (!await _eventWriter.WaitToWriteAsync())
{
_logger.LogWarning("Event channel is closed, dropping event: {EventId}", syncEvent.EventId);
return;
}
if (!_eventWriter.TryWrite(syncEvent))
{
_logger.LogWarning("Failed to queue sync event: {EventId}", syncEvent.EventId);
}
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialize sync event message: {Message}", message.ToString());
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing received message");
}
}
/// <summary>
/// 檢查事件是否已處理(防止重復處理)
/// </summary>
/// <param name="eventId">事件ID</param>
/// <returns>是否已處理</returns>
private bool IsEventProcessed(string eventId)
{
return _processedEvents.ContainsKey(eventId);
}
/// <summary>
/// 標記事件為已處理
/// </summary>
/// <param name="eventId">事件ID</param>
private void MarkEventAsProcessed(string eventId)
{
_processedEvents.TryAdd(eventId, DateTime.UtcNow);
}
/// <summary>
/// 啟動定期清理已處理事件記錄
/// </summary>
/// <param name="cancellationToken">取消令牌</param>
private async Task StartCleanupAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromMinutes(5), cancellationToken);
var cutoffTime = DateTime.UtcNow - _options.DeduplicationWindow;
var keysToRemove = _processedEvents
.Where(kvp => kvp.Value < cutoffTime)
.Select(kvp => kvp.Key)
.ToList();
foreach (var key in keysToRemove)
{
_processedEvents.TryRemove(key, out _);
}
if (keysToRemove.Count > 0)
{
_logger.LogDebug("Cleaned up {Count} processed event records", keysToRemove.Count);
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in cleanup task");
}
}
}
/// <summary>
/// 獲取Redis頻道名稱
/// </summary>
/// <returns>頻道名稱</returns>
private string GetChannelName()
{
return $"{_options.ChannelPrefix}:events";
}
/// <summary>
/// 釋放資源
/// </summary>
public void Dispose()
{
if (!_disposed)
{
_cancellationTokenSource?.Cancel();
_eventWriter?.TryComplete();
try
{
Task.WhenAll(
_subscriptionTask ?? Task.CompletedTask,
_processingTask ?? Task.CompletedTask
).Wait(TimeSpan.FromSeconds(5));
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during disposal");
}
_cancellationTokenSource?.Dispose();
_disposed = true;
}
}
}
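The duplicate-event handling in the sync service comes down to "claim an event ID once, forget it after a window". The sketch below illustrates that idea in isolation; it is not the service's actual code. `EventDeduplicator`, `TryClaim`, and `Cleanup` are hypothetical names, and the sketch assumes a `ConcurrentDictionary` keyed by event ID, which is what `_processedEvents` appears to be.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of the original service): remembers which
// event IDs have been seen inside a deduplication window.
public sealed class EventDeduplicator
{
    private readonly ConcurrentDictionary<string, DateTime> _seen = new();
    private readonly TimeSpan _window;

    public EventDeduplicator(TimeSpan window) => _window = window;

    // Returns true only for the first caller that presents this ID.
    // TryAdd performs the check and the mark as one atomic operation.
    public bool TryClaim(string eventId, DateTime utcNow) =>
        _seen.TryAdd(eventId, utcNow);

    // Drops records older than the window and returns how many were removed.
    public int Cleanup(DateTime utcNow)
    {
        var cutoff = utcNow - _window;
        var removed = 0;
        foreach (var pair in _seen)
        {
            // TryRemove(pair) removes the entry only if it still holds the
            // stale timestamp that was inspected, so a fresh claim is never lost.
            if (pair.Value < cutoff && _seen.TryRemove(pair))
            {
                removed++;
            }
        }
        return removed;
    }
}
```

Passing the current time in as a parameter keeps the helper deterministic in tests; a service would typically pass `DateTime.UtcNow`.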
5.3 Cache Sync Extension Methods
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// 緩存同步服務擴展方法
/// </summary>
public static class CacheSyncServiceExtensions
{
/// <summary>
/// 添加Redis緩存同步服務
/// </summary>
/// <param name="services">服務集合</param>
/// <param name="setupAction">配置委托</param>
/// <returns>服務集合</returns>
public static IServiceCollection AddRedisCacheSync(
this IServiceCollection services,
Action<CacheSyncOptions> setupAction = null)
{
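// Register options. AddOptions keeps IOptions<CacheSyncOptions> resolvable even when no setup delegate is supplied
var optionsBuilder = services.AddOptions<CacheSyncOptions>();
if (setupAction != null)
{
optionsBuilder.Configure(setupAction);
}
// Register the Redis cache behind a circuit-breaker decorator.
// CacheCircuitBreaker must already be registered in the container; otherwise GetRequiredService throws at resolution time.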
services.AddSingleton<RedisDistributedCache>();
services.AddSingleton<IRedisDistributedCache>(provider =>
{
var innerCache = provider.GetRequiredService<RedisDistributedCache>();
var circuitBreaker = provider.GetRequiredService<CacheCircuitBreaker>();
var logger = provider.GetRequiredService<ILogger<CircuitBreakerRedisCache>>();
return new CircuitBreakerRedisCache(innerCache, circuitBreaker, logger);
});
// 注冊緩存同步服務
services.AddSingleton<ICacheSyncService, RedisCacheSyncService>();
services.AddHostedService<RedisCacheSyncService>(provider =>
(RedisCacheSyncService)provider.GetRequiredService<ICacheSyncService>());
return services;
}
}
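6. Complete Multi-Level Cache Manager Implementation
The multi-level cache manager is the core component of the whole system. It coordinates the L1 memory cache, the L2 Redis cache, and the sync service:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;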
/// <summary>
/// 多級緩存配置選項
/// </summary>
public class MultiLevelCacheOptions
{
/// <summary>
/// L1緩存(內存緩存)配置
/// </summary>
public AdvancedMemoryCacheOptions L1Options { get; set; } = new();
/// <summary>
/// L2緩存(Redis緩存)配置
/// </summary>
public RedisCacheOptions L2Options { get; set; } = new();
/// <summary>
/// 緩存同步配置
/// </summary>
public CacheSyncOptions SyncOptions { get; set; } = new();
/// <summary>
/// 是否啟用L1緩存
/// </summary>
public bool EnableL1Cache { get; set; } = true;
/// <summary>
/// 是否啟用L2緩存
/// </summary>
public bool EnableL2Cache { get; set; } = true;
/// <summary>
/// 是否啟用緩存同步
/// </summary>
public bool EnableCacheSync { get; set; } = true;
/// <summary>
/// L1緩存與L2緩存的一致性策略
/// </summary>
public CacheConsistencyStrategy ConsistencyStrategy { get; set; } = CacheConsistencyStrategy.EventualConsistency;
/// <summary>
/// L2緩存回寫延遲(用于Write-Behind模式)
/// </summary>
public TimeSpan L2WriteDelay { get; set; } = TimeSpan.FromSeconds(1);
/// <summary>
/// 降級策略:L2緩存不可用時的行為
/// </summary>
public CacheDegradationStrategy DegradationStrategy { get; set; } = CacheDegradationStrategy.L1Only;
/// <summary>
/// 健康檢查間隔
/// </summary>
public TimeSpan HealthCheckInterval { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// 是否啟用性能指標收集
/// </summary>
public bool EnableMetrics { get; set; } = true;
}
/// <summary>
/// 緩存一致性策略
/// </summary>
public enum CacheConsistencyStrategy
{
/// <summary>
/// 強一致性:所有操作同步到所有層級
/// </summary>
StrongConsistency,
/// <summary>
/// 最終一致性:異步同步,容忍短期不一致
/// </summary>
EventualConsistency,
/// <summary>
/// 會話一致性:同一會話內保證一致性
/// </summary>
SessionConsistency
}
/// <summary>
/// 緩存降級策略
/// </summary>
public enum CacheDegradationStrategy
{
/// <summary>
/// 僅使用L1緩存
/// </summary>
L1Only,
/// <summary>
/// 直接訪問數據源
/// </summary>
DirectAccess,
/// <summary>
/// 拋出異常
/// </summary>
ThrowException
}
/// <summary>
/// 緩存操作上下文
/// </summary>
public class CacheOperationContext
{
public string Key { get; set; }
public string SessionId { get; set; }
public bool ForceRefresh { get; set; }
public TimeSpan? CustomExpiry { get; set; }
public CacheLevel TargetLevel { get; set; } = CacheLevel.All;
public Dictionary<string, object> Metadata { get; set; } = new();
}
/// <summary>
/// 緩存級別
/// </summary>
[Flags]
public enum CacheLevel
{
None = 0,
L1 = 1,
L2 = 2,
All = L1 | L2
}
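Because `CacheLevel` is a `[Flags]` enum, a target such as `All` is tested bit by bit, which is what the `(context.TargetLevel & CacheLevel.L1) != 0` checks in the manager do. The sketch below shows the same test as a small extension method. `Includes` is a hypothetical helper name, and the enum is repeated only so the example compiles on its own.

```csharp
using System;

[Flags]
public enum CacheLevel
{
    None = 0,
    L1 = 1,
    L2 = 2,
    All = L1 | L2
}

public static class CacheLevelExtensions
{
    // Hypothetical helper: true when every bit of 'level' is set in 'target'.
    // None is rejected explicitly so that "no level" never matches.
    public static bool Includes(this CacheLevel target, CacheLevel level) =>
        level != CacheLevel.None && (target & level) == level;
}

public static class CacheLevelDemo
{
    public static void Main()
    {
        Console.WriteLine(CacheLevel.All.Includes(CacheLevel.L1));   // True
        Console.WriteLine(CacheLevel.L2.Includes(CacheLevel.L1));    // False
        Console.WriteLine(CacheLevel.None.Includes(CacheLevel.L1));  // False
        Console.WriteLine(CacheLevel.L1 | CacheLevel.L2);            // All
    }
}
```

The built-in `Enum.HasFlag` gives the same result for non-zero flags, but `HasFlag(CacheLevel.None)` always returns true, which is why the helper treats `None` as a special case.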
/// <summary>
/// 緩存操作結果
/// </summary>
public class CacheOperationResult<T>
{
public T Value { get; set; }
public bool Success { get; set; }
public CacheLevel HitLevel { get; set; }
public TimeSpan Duration { get; set; }
public string Error { get; set; }
public CacheStatistics Statistics { get; set; }
}
/// <summary>
/// 多級緩存管理器接口
/// </summary>
public interface IMultiLevelCacheManager
{
Task<CacheOperationResult<T>> GetAsync<T>(string key, CacheOperationContext context = null);
Task<CacheOperationResult<T>> GetOrSetAsync<T>(string key, Func<Task<T>> factory, TimeSpan? expiry = null, CacheOperationContext context = null);
Task<CacheOperationResult<bool>> SetAsync<T>(string key, T value, TimeSpan? expiry = null, CacheOperationContext context = null);
Task<CacheOperationResult<bool>> RemoveAsync(string key, CacheOperationContext context = null);
Task<CacheOperationResult<long>> RemoveByPatternAsync(string pattern, CacheOperationContext context = null);
Task<CacheOperationResult<bool>> ExistsAsync(string key, CacheLevel level = CacheLevel.All);
Task<MultiLevelCacheStatistics> GetStatisticsAsync();
Task<bool> IsHealthyAsync();
Task ClearAsync(CacheLevel level = CacheLevel.All);
}
/// <summary>
/// 多級緩存統計信息
/// </summary>
public class MultiLevelCacheStatistics
{
public CacheStatistics L1Statistics { get; set; } = new();
public CacheStatistics L2Statistics { get; set; } = new();
public long TotalOperations { get; set; }
public double OverallHitRatio { get; set; }
public Dictionary<string, object> PerformanceMetrics { get; set; } = new();
public DateTime CollectionTime { get; set; } = DateTime.UtcNow;
}
/// <summary>
/// 多級緩存管理器實現
/// </summary>
public class MultiLevelCacheManager : IMultiLevelCacheManager, IDisposable
{
private readonly IAdvancedMemoryCache _l1Cache;
private readonly IRedisDistributedCache _l2Cache;
private readonly ICacheSyncService _syncService;
private readonly MultiLevelCacheOptions _options;
private readonly ILogger<MultiLevelCacheManager> _logger;
// 性能計數器 - 線程安全的統計記錄
private readonly CacheStatisticsTracker _statisticsTracker = new();
private readonly object _statsLock = new object();
// 健康狀態監控
private volatile bool _l2HealthStatus = true;
private readonly Timer _healthCheckTimer;
// 同步狀態管理 - 使用LRU防止內存泄漏
private readonly LRUCache<string, DateTime> _recentUpdates = new(10000);
// 降級狀態
private volatile bool _isDegraded = false;
private DateTime _degradationStartTime;
// 資源釋放標識
private bool _disposed = false;
public MultiLevelCacheManager(
IAdvancedMemoryCache l1Cache,
IRedisDistributedCache l2Cache,
ICacheSyncService syncService,
IOptions<MultiLevelCacheOptions> options,
ILogger<MultiLevelCacheManager> logger)
{
_l1Cache = l1Cache ?? throw new ArgumentNullException(nameof(l1Cache));
_l2Cache = l2Cache ?? throw new ArgumentNullException(nameof(l2Cache));
_syncService = syncService ?? throw new ArgumentNullException(nameof(syncService));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
// 訂閱緩存同步事件
if (_options.EnableCacheSync)
{
_ = _syncService.SubscribeAsync(OnCacheSyncEventReceived);
}
// 啟動健康檢查定時器
_healthCheckTimer = new Timer(PerformHealthCheck, null,
TimeSpan.Zero, _options.HealthCheckInterval);
_logger.LogInformation("MultiLevel cache manager initialized with L1: {L1Enabled}, L2: {L2Enabled}, Sync: {SyncEnabled}",
_options.EnableL1Cache, _options.EnableL2Cache, _options.EnableCacheSync);
}
/// <summary>
/// 異步獲取緩存項
/// </summary>
public async Task<CacheOperationResult<T>> GetAsync<T>(string key, CacheOperationContext context = null)
{
var stopwatch = Stopwatch.StartNew();
context ??= new CacheOperationContext { Key = key };
_statisticsTracker.RecordOperation();
try
{
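// L1 cache lookup.
// Note: assuming the cache returns default(T) on a miss, this null check only detects a miss
// for reference types and Nullable<T>. For a non-nullable value type T the comparison is
// always true, so a miss would surface as a hit carrying default(T); cache such values
// inside a wrapper class instead.
if (_options.EnableL1Cache && (context.TargetLevel & CacheLevel.L1) != 0)
{
var l1Result = await _l1Cache.GetAsync<T>(key);
if (l1Result != null)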
{
_statisticsTracker.RecordHit(CacheLevel.L1);
_logger.LogDebug("L1 cache hit for key: {Key}", key);
return new CacheOperationResult<T>
{
Value = l1Result,
Success = true,
HitLevel = CacheLevel.L1,
Duration = stopwatch.Elapsed
};
}
}
// L2緩存查找
if (_options.EnableL2Cache && (context.TargetLevel & CacheLevel.L2) != 0 && _l2HealthStatus)
{
var l2Result = await _l2Cache.GetAsync<T>(key);
if (l2Result != null)
{
_statisticsTracker.RecordHit(CacheLevel.L2);
_logger.LogDebug("L2 cache hit for key: {Key}", key);
// 將L2結果提升到L1緩存
if (_options.EnableL1Cache)
{
_ = Task.Run(async () =>
{
try
{
await _l1Cache.SetAsync(key, l2Result, context.CustomExpiry);
_logger.LogTrace("Promoted key to L1 cache: {Key}", key);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to promote key to L1 cache: {Key}", key);
}
});
}
return new CacheOperationResult<T>
{
Value = l2Result,
Success = true,
HitLevel = CacheLevel.L2,
Duration = stopwatch.Elapsed
};
}
}
// 緩存完全未命中
_statisticsTracker.RecordMiss();
_logger.LogDebug("Cache miss for key: {Key}", key);
return new CacheOperationResult<T>
{
Success = false,
HitLevel = CacheLevel.None,
Duration = stopwatch.Elapsed
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during cache get operation for key: {Key}", key);
return new CacheOperationResult<T>
{
Success = false,
Error = ex.Message,
Duration = stopwatch.Elapsed
};
}
}
/// <summary>
/// 獲取或設置緩存項(Cache-Aside模式)
/// </summary>
public async Task<CacheOperationResult<T>> GetOrSetAsync<T>(
string key,
Func<Task<T>> factory,
TimeSpan? expiry = null,
CacheOperationContext context = null)
{
if (factory == null)
throw new ArgumentNullException(nameof(factory));
var stopwatch = Stopwatch.StartNew();
context ??= new CacheOperationContext { Key = key };
try
{
// 如果不強制刷新,先嘗試獲取緩存
if (!context.ForceRefresh)
{
var getCacheResult = await GetAsync<T>(key, context);
if (getCacheResult.Success)
{
return getCacheResult;
}
}
// 使用分布式鎖防止緩存擊穿
var lockKey = $"{key}:getorset_lock";
var lockAcquired = false;
if (_options.EnableL2Cache && _l2HealthStatus)
{
try
{
lockAcquired = await _l2Cache.SetIfNotExistsAsync(lockKey, "locked", TimeSpan.FromMinutes(1));
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to acquire distributed lock for key: {Key}", key);
}
}
if (lockAcquired || !_options.EnableL2Cache || !_l2HealthStatus)
{
try
{
// 再次檢查緩存(雙重檢查鎖定)
if (!context.ForceRefresh)
{
var doubleCheckResult = await GetAsync<T>(key, context);
if (doubleCheckResult.Success)
{
return doubleCheckResult;
}
}
// 執行工廠方法
_logger.LogDebug("Executing factory method for key: {Key}", key);
var value = await factory();
// 設置到所有緩存層級
var setResult = await SetAsync(key, value, expiry, context);
return new CacheOperationResult<T>
{
Value = value,
Success = setResult.Success,
HitLevel = CacheLevel.None, // 表示從數據源獲取
Duration = stopwatch.Elapsed
};
}
finally
{
// 釋放分布式鎖
if (lockAcquired)
{
try
{
await _l2Cache.RemoveAsync(lockKey);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to release distributed lock for key: {Key}", key);
}
}
}
}
else
{
// 等待鎖釋放并重試獲取緩存
await Task.Delay(Random.Shared.Next(50, 200)); // 隨機退避
var retryResult = await GetAsync<T>(key, context);
if (retryResult.Success)
{
return retryResult;
}
// 降級:直接執行工廠方法
_logger.LogWarning("Failed to acquire lock and cache miss for key: {Key}, executing factory method", key);
var fallbackValue = await factory();
// 嘗試異步設置緩存
_ = Task.Run(async () =>
{
try
{
await SetAsync(key, fallbackValue, expiry, context);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to set cache in fallback scenario for key: {Key}", key);
}
});
return new CacheOperationResult<T>
{
Value = fallbackValue,
Success = true,
HitLevel = CacheLevel.None,
Duration = stopwatch.Elapsed
};
}
}
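catch (Exception ex)
{
_logger.LogError(ex, "Error in GetOrSetAsync for key: {Key}", key);
// Final fallback: invoke the factory directly.
// Caveat: this catch also sees exceptions thrown by the factory itself in the block above,
// in which case the factory runs a second time here, so factories should be idempotent.
try
{
var fallbackValue = await factory();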
return new CacheOperationResult<T>
{
Value = fallbackValue,
Success = true,
HitLevel = CacheLevel.None,
Duration = stopwatch.Elapsed,
Error = $"Cache operation failed, used fallback: {ex.Message}"
};
}
catch (Exception factoryEx)
{
_logger.LogError(factoryEx, "Factory method also failed for key: {Key}", key);
return new CacheOperationResult<T>
{
Success = false,
Error = $"Both cache and factory failed: {ex.Message}, {factoryEx.Message}",
Duration = stopwatch.Elapsed
};
}
}
}
/// <summary>
/// 異步設置緩存項
/// </summary>
public async Task<CacheOperationResult<bool>> SetAsync<T>(
string key,
T value,
TimeSpan? expiry = null,
CacheOperationContext context = null)
{
var stopwatch = Stopwatch.StartNew();
context ??= new CacheOperationContext { Key = key };
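// Per-level outcomes and error messages for this call.
// Note: List<T> is not thread-safe, and errors can be appended by level writes that run concurrently;
// a production version should guard these lists with a lock or use a concurrent collection.
var results = new List<bool>();
var errors = new List<string>();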
try
{
// 記錄更新時間(用于同步控制)
_recentUpdates.Add(key, DateTime.UtcNow);
// 根據一致性策略決定同步還是異步設置
if (_options.ConsistencyStrategy == CacheConsistencyStrategy.StrongConsistency)
{
// 強一致性:同步設置所有層級
await SetAllLevelsAsync();
}
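else
{
// Eventual consistency: write L1 synchronously, then write L2 in the background.
// Caveat: the sync event published below can reach other nodes before the delayed L2 write lands,
// so a node that reloads from L2 immediately may briefly observe the previous value.
await SetCriticalLevelAsync();
_ = Task.Run(SetNonCriticalLevelsAsync);
}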
// 發送同步事件
if (_options.EnableCacheSync)
{
var syncEvent = new CacheSyncEvent
{
EventType = CacheSyncEventType.Set,
Key = key,
Timestamp = DateTime.UtcNow
};
_ = Task.Run(async () =>
{
try
{
await _syncService.PublishAsync(syncEvent);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to publish sync event for key: {Key}", key);
}
});
}
var success = results.Count > 0 && results.Any(r => r);
return new CacheOperationResult<bool>
{
Value = success,
Success = success,
Duration = stopwatch.Elapsed,
Error = errors.Count > 0 ? string.Join("; ", errors) : null
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error setting cache for key: {Key}", key);
return new CacheOperationResult<bool>
{
Success = false,
Error = ex.Message,
Duration = stopwatch.Elapsed
};
}
// 本地方法:設置所有層級(同步)
async Task SetAllLevelsAsync()
{
var tasks = new List<Task<bool>>();
if (_options.EnableL1Cache && (context.TargetLevel & CacheLevel.L1) != 0)
{
tasks.Add(SetL1CacheAsync());
}
if (_options.EnableL2Cache && (context.TargetLevel & CacheLevel.L2) != 0 && _l2HealthStatus)
{
tasks.Add(SetL2CacheAsync());
}
var taskResults = await Task.WhenAll(tasks);
results.AddRange(taskResults);
}
// 本地方法:設置關鍵層級(通常是L1)
async Task SetCriticalLevelAsync()
{
if (_options.EnableL1Cache && (context.TargetLevel & CacheLevel.L1) != 0)
{
var result = await SetL1CacheAsync();
results.Add(result);
}
}
// 本地方法:異步設置非關鍵層級(通常是L2)
async Task SetNonCriticalLevelsAsync()
{
if (_options.EnableL2Cache && (context.TargetLevel & CacheLevel.L2) != 0 && _l2HealthStatus)
{
try
{
await Task.Delay(_options.L2WriteDelay); // 可選的寫延遲
await SetL2CacheAsync();
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to set L2 cache for key: {Key}", key);
}
}
}
// 本地方法:設置L1緩存
async Task<bool> SetL1CacheAsync()
{
try
{
await _l1Cache.SetAsync(key, value, expiry ?? context.CustomExpiry);
_logger.LogTrace("Set L1 cache for key: {Key}", key);
return true;
}
catch (Exception ex)
{
var error = $"L1 set failed: {ex.Message}";
errors.Add(error);
_logger.LogWarning(ex, "Failed to set L1 cache for key: {Key}", key);
return false;
}
}
// 本地方法:設置L2緩存
async Task<bool> SetL2CacheAsync()
{
try
{
await _l2Cache.SetAsync(key, value, expiry ?? context.CustomExpiry);
_logger.LogTrace("Set L2 cache for key: {Key}", key);
return true;
}
catch (Exception ex)
{
var error = $"L2 set failed: {ex.Message}";
errors.Add(error);
_logger.LogWarning(ex, "Failed to set L2 cache for key: {Key}", key);
// L2緩存失敗時標記不健康
_l2HealthStatus = false;
return false;
}
}
}
/// <summary>
/// 異步移除緩存項
/// </summary>
public async Task<CacheOperationResult<bool>> RemoveAsync(string key, CacheOperationContext context = null)
{
var stopwatch = Stopwatch.StartNew();
context ??= new CacheOperationContext { Key = key };
var results = new List<bool>();
var errors = new List<string>();
try
{
// 并行移除所有層級
var tasks = new List<Task<bool>>();
if (_options.EnableL1Cache && (context.TargetLevel & CacheLevel.L1) != 0)
{
tasks.Add(RemoveL1CacheAsync());
}
if (_options.EnableL2Cache && (context.TargetLevel & CacheLevel.L2) != 0 && _l2HealthStatus)
{
tasks.Add(RemoveL2CacheAsync());
}
if (tasks.Count > 0)
{
var taskResults = await Task.WhenAll(tasks);
results.AddRange(taskResults);
}
// 發送同步事件
if (_options.EnableCacheSync)
{
var syncEvent = new CacheSyncEvent
{
EventType = CacheSyncEventType.Remove,
Key = key,
Timestamp = DateTime.UtcNow
};
_ = Task.Run(async () =>
{
try
{
await _syncService.PublishAsync(syncEvent);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to publish sync event for key: {Key}", key);
}
});
}
// 清理更新記錄
_recentUpdates.Remove(key);
var success = results.Count > 0 && results.Any(r => r);
return new CacheOperationResult<bool>
{
Value = success,
Success = true, // 移除操作總是被認為是成功的
Duration = stopwatch.Elapsed,
Error = errors.Count > 0 ? string.Join("; ", errors) : null
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing cache for key: {Key}", key);
return new CacheOperationResult<bool>
{
Success = false,
Error = ex.Message,
Duration = stopwatch.Elapsed
};
}
// 本地方法:移除L1緩存
async Task<bool> RemoveL1CacheAsync()
{
try
{
await _l1Cache.RemoveAsync(key);
_logger.LogTrace("Removed L1 cache for key: {Key}", key);
return true;
}
catch (Exception ex)
{
var error = $"L1 remove failed: {ex.Message}";
errors.Add(error);
_logger.LogWarning(ex, "Failed to remove L1 cache for key: {Key}", key);
return false;
}
}
// 本地方法:移除L2緩存
async Task<bool> RemoveL2CacheAsync()
{
try
{
var result = await _l2Cache.RemoveAsync(key);
_logger.LogTrace("Removed L2 cache for key: {Key}, success: {Success}", key, result);
return result;
}
catch (Exception ex)
{
var error = $"L2 remove failed: {ex.Message}";
errors.Add(error);
_logger.LogWarning(ex, "Failed to remove L2 cache for key: {Key}", key);
return false;
}
}
}
/// <summary>
/// 根據模式批量移除緩存項
/// </summary>
public async Task<CacheOperationResult<long>> RemoveByPatternAsync(string pattern, CacheOperationContext context = null)
{
var stopwatch = Stopwatch.StartNew();
context ??= new CacheOperationContext();
long totalRemoved = 0;
var errors = new List<string>();
try
{
// L1緩存模式刪除
if (_options.EnableL1Cache && (context.TargetLevel & CacheLevel.L1) != 0)
{
try
{
await _l1Cache.RemoveByPatternAsync(pattern);
_logger.LogDebug("Removed L1 cache entries by pattern: {Pattern}", pattern);
}
catch (Exception ex)
{
errors.Add($"L1 pattern remove failed: {ex.Message}");
_logger.LogWarning(ex, "Failed to remove L1 cache by pattern: {Pattern}", pattern);
}
}
// L2緩存模式刪除
if (_options.EnableL2Cache && (context.TargetLevel & CacheLevel.L2) != 0 && _l2HealthStatus)
{
try
{
var removedCount = await _l2Cache.RemoveByPatternAsync(pattern);
totalRemoved += removedCount;
_logger.LogDebug("Removed {Count} L2 cache entries by pattern: {Pattern}", removedCount, pattern);
}
catch (Exception ex)
{
errors.Add($"L2 pattern remove failed: {ex.Message}");
_logger.LogWarning(ex, "Failed to remove L2 cache by pattern: {Pattern}", pattern);
}
}
// 發送同步事件
if (_options.EnableCacheSync)
{
var syncEvent = new CacheSyncEvent
{
EventType = CacheSyncEventType.RemovePattern,
Pattern = pattern,
Timestamp = DateTime.UtcNow
};
_ = Task.Run(async () =>
{
try
{
await _syncService.PublishAsync(syncEvent);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to publish sync event for pattern: {Pattern}", pattern);
}
});
}
return new CacheOperationResult<long>
{
Value = totalRemoved,
Success = true,
Duration = stopwatch.Elapsed,
Error = errors.Count > 0 ? string.Join("; ", errors) : null
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error removing cache by pattern: {Pattern}", pattern);
return new CacheOperationResult<long>
{
Success = false,
Error = ex.Message,
Duration = stopwatch.Elapsed
};
}
}
/// <summary>
/// 檢查緩存項是否存在
/// </summary>
public async Task<CacheOperationResult<bool>> ExistsAsync(string key, CacheLevel level = CacheLevel.All)
{
var stopwatch = Stopwatch.StartNew();
try
{
// 檢查L1緩存
if (_options.EnableL1Cache && (level & CacheLevel.L1) != 0)
{
var l1Result = await _l1Cache.GetAsync<object>(key);
if (l1Result != null)
{
return new CacheOperationResult<bool>
{
Value = true,
Success = true,
HitLevel = CacheLevel.L1,
Duration = stopwatch.Elapsed
};
}
}
// 檢查L2緩存
if (_options.EnableL2Cache && (level & CacheLevel.L2) != 0 && _l2HealthStatus)
{
var l2Exists = await _l2Cache.ExistsAsync(key);
if (l2Exists)
{
return new CacheOperationResult<bool>
{
Value = true,
Success = true,
HitLevel = CacheLevel.L2,
Duration = stopwatch.Elapsed
};
}
}
return new CacheOperationResult<bool>
{
Value = false,
Success = true,
HitLevel = CacheLevel.None,
Duration = stopwatch.Elapsed
};
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking cache existence for key: {Key}", key);
return new CacheOperationResult<bool>
{
Success = false,
Error = ex.Message,
Duration = stopwatch.Elapsed
};
}
}
/// <summary>
/// Gets cache statistics
/// </summary>
public Task<MultiLevelCacheStatistics> GetStatisticsAsync()
{
try
{
var l1Stats = _l1Cache.GetStatistics();
var l2Stats = new CacheStatistics(); // Redis-side statistics require a custom implementation
// Assumption: CacheStatisticsTracker exposes GetSnapshot(), returning a copy of its counters
// (TotalOperations, L1Hits, L2Hits) and derived ratios (L1HitRatio, L2HitRatio).
var stats = _statisticsTracker.GetSnapshot();
var totalHits = stats.L1Hits + stats.L2Hits;
return Task.FromResult(new MultiLevelCacheStatistics
{
L1Statistics = l1Stats,
L2Statistics = l2Stats,
TotalOperations = stats.TotalOperations,
OverallHitRatio = stats.TotalOperations == 0 ? 0 : (double)totalHits / stats.TotalOperations,
PerformanceMetrics = new Dictionary<string, object>
{
["L1HitRatio"] = stats.L1HitRatio,
["L2HitRatio"] = stats.L2HitRatio,
["L2HealthStatus"] = _l2HealthStatus,
["IsDegraded"] = _isDegraded,
["DegradationDuration"] = _isDegraded ? DateTime.UtcNow - _degradationStartTime : TimeSpan.Zero
}
});
}
catch (Exception ex)
{
_logger.LogError(ex, "Error getting cache statistics");
return Task.FromResult(new MultiLevelCacheStatistics());
}
}
/// <summary>
/// 檢查緩存服務健康狀態
/// </summary>
public async Task<bool> IsHealthyAsync()
{
try
{
var l1Healthy = true; // 內存緩存通常總是健康的
var l2Healthy = true;
var syncHealthy = true;
// 檢查L2緩存健康狀態
if (_options.EnableL2Cache)
{
try
{
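// Simple round-trip test; a unique key avoids races between nodes that probe at the same time
var testKey = $"health_check_{Guid.NewGuid():N}";
await _l2Cache.SetAsync(testKey, "ok", TimeSpan.FromSeconds(10));
l2Healthy = await _l2Cache.ExistsAsync(testKey);
await _l2Cache.RemoveAsync(testKey);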
}
catch
{
l2Healthy = false;
}
}
// 檢查同步服務健康狀態
if (_options.EnableCacheSync)
{
syncHealthy = _syncService.IsHealthy;
}
var overallHealthy = l1Healthy && (!_options.EnableL2Cache || l2Healthy) && (!_options.EnableCacheSync || syncHealthy);
// 更新降級狀態
if (!overallHealthy && !_isDegraded)
{
_isDegraded = true;
_degradationStartTime = DateTime.UtcNow;
_logger.LogWarning("Cache service entered degraded mode");
}
else if (overallHealthy && _isDegraded)
{
_isDegraded = false;
_logger.LogInformation("Cache service recovered from degraded mode after {Duration}",
DateTime.UtcNow - _degradationStartTime);
}
return overallHealthy;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error checking cache health");
return false;
}
}
/// <summary>
/// 清空緩存
/// </summary>
public async Task ClearAsync(CacheLevel level = CacheLevel.All)
{
try
{
var tasks = new List<Task>();
// 清空L1緩存(通過模式刪除)
if (_options.EnableL1Cache && (level & CacheLevel.L1) != 0)
{
tasks.Add(Task.Run(async () =>
{
try
{
await _l1Cache.RemoveByPatternAsync("*");
_logger.LogInformation("Cleared L1 cache");
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to clear L1 cache");
}
}));
}
// 清空L2緩存
if (_options.EnableL2Cache && (level & CacheLevel.L2) != 0 && _l2HealthStatus)
{
tasks.Add(Task.Run(async () =>
{
try
{
await _l2Cache.RemoveByPatternAsync("*");
_logger.LogInformation("Cleared L2 cache");
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to clear L2 cache");
}
}));
}
await Task.WhenAll(tasks);
// 發送清空同步事件
if (_options.EnableCacheSync)
{
var syncEvent = new CacheSyncEvent
{
EventType = CacheSyncEventType.Clear,
Timestamp = DateTime.UtcNow
};
_ = Task.Run(async () =>
{
try
{
await _syncService.PublishAsync(syncEvent);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to publish clear sync event");
}
});
}
// 重置統計計數器
_statisticsTracker.Reset();
// 清空更新記錄
_recentUpdates.Clear();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error clearing cache");
throw;
}
}
#region 私有方法
/// <summary>
/// 處理接收到的緩存同步事件
/// </summary>
/// <param name="syncEvent">同步事件</param>
private async Task OnCacheSyncEventReceived(CacheSyncEvent syncEvent)
{
try
{
_logger.LogDebug("Received sync event: {EventType} for key: {Key}", syncEvent.EventType, syncEvent.Key);
// 檢查是否為最近的本地更新,避免循環同步
if (!string.IsNullOrEmpty(syncEvent.Key) &&
_recentUpdates.TryGet(syncEvent.Key, out var updateTime) &&
(DateTime.UtcNow - updateTime).TotalSeconds < 5)
{
_logger.LogTrace("Skipping sync for recent local update: {Key}", syncEvent.Key);
return;
}
switch (syncEvent.EventType)
{
case CacheSyncEventType.Remove:
await _l1Cache.RemoveAsync(syncEvent.Key);
break;
case CacheSyncEventType.RemovePattern:
await _l1Cache.RemoveByPatternAsync(syncEvent.Pattern);
break;
case CacheSyncEventType.Clear:
await _l1Cache.RemoveByPatternAsync("*");
break;
case CacheSyncEventType.Expire:
case CacheSyncEventType.Set:
// 對于設置操作,直接刪除L1緩存項,讓下次訪問時從L2緩存重新加載
await _l1Cache.RemoveAsync(syncEvent.Key);
break;
}
_logger.LogTrace("Processed sync event: {EventType} for key: {Key}", syncEvent.EventType, syncEvent.Key);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing sync event: {EventType} for key: {Key}",
syncEvent.EventType, syncEvent.Key);
}
}
/// <summary>
/// 執行定期健康檢查
/// </summary>
/// <param name="state">定時器狀態</param>
private async void PerformHealthCheck(object state)
{
try
{
var previousL2Status = _l2HealthStatus;
// 更新L2緩存健康狀態
if (_options.EnableL2Cache)
{
try
{
var testKey = $"health_check_{Guid.NewGuid():N}";
await _l2Cache.SetAsync(testKey, "test", TimeSpan.FromSeconds(5));
_l2HealthStatus = await _l2Cache.ExistsAsync(testKey);
await _l2Cache.RemoveAsync(testKey);
}
catch (Exception ex)
{
_l2HealthStatus = false;
_logger.LogWarning(ex, "L2 cache health check failed");
}
}
// 記錄狀態變化
if (previousL2Status != _l2HealthStatus)
{
if (_l2HealthStatus)
{
_logger.LogInformation("L2 cache health recovered");
}
else
{
_logger.LogWarning("L2 cache health degraded");
}
}
// Stale entries in _recentUpdates are evicted by the LRU cache itself, so no manual cleanup is needed here
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in health check");
}
}
#endregion
/// <summary>
/// 釋放資源
/// </summary>
public void Dispose()
{
if (!_disposed)
{
_healthCheckTimer?.Dispose();
_syncService?.Dispose();
_disposed = true;
}
}
}
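The manager records operations, hits, and misses through `CacheStatisticsTracker`, whose definition is not shown in this section. The sketch below is one possible shape for it, built on `Interlocked` counters. Everything beyond the four methods the manager already calls (`RecordOperation`, `RecordHit`, `RecordMiss`, `Reset`) is an assumption, in particular `GetSnapshot` and the `CacheStatsSnapshot` record. `CacheLevel` is repeated only so the example compiles on its own.

```csharp
using System;
using System.Threading;

[Flags]
public enum CacheLevel { None = 0, L1 = 1, L2 = 2, All = L1 | L2 }

// Hypothetical immutable copy of the counters at roughly one point in time.
public sealed record CacheStatsSnapshot(long TotalOperations, long L1Hits, long L2Hits, long Misses)
{
    public double L1HitRatio => TotalOperations == 0 ? 0 : (double)L1Hits / TotalOperations;
    public double L2HitRatio => TotalOperations == 0 ? 0 : (double)L2Hits / TotalOperations;
}

// Assumed shape of the tracker used by the manager; the real class may differ.
public sealed class CacheStatisticsTracker
{
    private long _totalOperations;
    private long _l1Hits;
    private long _l2Hits;
    private long _misses;

    public void RecordOperation() => Interlocked.Increment(ref _totalOperations);

    public void RecordHit(CacheLevel level)
    {
        if (level == CacheLevel.L1) Interlocked.Increment(ref _l1Hits);
        else if (level == CacheLevel.L2) Interlocked.Increment(ref _l2Hits);
    }

    public void RecordMiss() => Interlocked.Increment(ref _misses);

    public void Reset()
    {
        Interlocked.Exchange(ref _totalOperations, 0);
        Interlocked.Exchange(ref _l1Hits, 0);
        Interlocked.Exchange(ref _l2Hits, 0);
        Interlocked.Exchange(ref _misses, 0);
    }

    // Each counter is read atomically, but the four reads are not one atomic
    // unit, so a snapshot taken under load can be off by a few operations.
    public CacheStatsSnapshot GetSnapshot() => new(
        Interlocked.Read(ref _totalOperations),
        Interlocked.Read(ref _l1Hits),
        Interlocked.Read(ref _l2Hits),
        Interlocked.Read(ref _misses));
}
```

Lock-free counters keep the hot path cheap; if exact cross-counter consistency matters more than throughput, a single lock around all four fields would be the simpler choice.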
7. Best Practices and Performance Optimization
7.1 Cache Design Best Practices
7.1.1 Cache Key Design Principles
Hierarchical naming convention:
// Recommended key naming patterns
public static class CacheKeyPatterns
{
// Base pattern: {app}:{domain}:{entity}:{identifier}
public const string UserProfile = "myapp:user:profile:{0}";
public const string UserPermissions = "myapp:user:permissions:{0}";
public const string ProductList = "myapp:product:list:page:{0}:size:{1}";
// Session-scoped: {app}:{domain}:session:{sessionId}:{entity}
public const string UserSession = "myapp:user:session:{0}:settings";
// Temporary data: {app}:temp:{scenario}:{identifier}
public const string TempData = "myapp:temp:upload:{0}";
// Configuration data: {app}:config:{configType}
public const string SystemConfig = "myapp:config:system";
// Statistics: {app}:stats:{timeGranularity}:{identifier}
public const string DailyStats = "myapp:stats:daily:{0:yyyyMMdd}";
}
/// <summary>
/// Fluent builder for hierarchical cache keys.
/// </summary>
public class CacheKeyBuilder
{
private readonly string _applicationName;
private readonly List<string> _segments;
public CacheKeyBuilder(string applicationName)
{
_applicationName = applicationName ?? throw new ArgumentNullException(nameof(applicationName));
_segments = new List<string> { _applicationName };
}
public CacheKeyBuilder Domain(string domain)
{
_segments.Add(domain);
return this;
}
public CacheKeyBuilder Entity(string entity)
{
_segments.Add(entity);
return this;
}
public CacheKeyBuilder Id(object id)
{
_segments.Add(id?.ToString() ?? "null");
return this;
}
public CacheKeyBuilder Attribute(string attribute)
{
_segments.Add(attribute);
return this;
}
public CacheKeyBuilder Session(string sessionId)
{
_segments.Add("session");
_segments.Add(sessionId);
return this;
}
public CacheKeyBuilder WithParameters(params object[] parameters)
{
foreach (var param in parameters)
{
_segments.Add(param?.ToString() ?? "null");
}
return this;
}
public string Build()
{
return string.Join(":", _segments);
}
public override string ToString() => Build();
}
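One detail of the `DailyStats` pattern is worth illustrating: plain `string.Format` formats `{0:yyyyMMdd}` with the current culture, so a node whose default calendar is not Gregorian could produce a different key than its peers. The following is a minimal sketch of filling the patterns with `CultureInfo.InvariantCulture`. The `CacheKeyFormatter` class and its method names are hypothetical helpers, not part of the original code.

```csharp
using System;
using System.Globalization;

/// <summary>Hypothetical helper that fills key patterns independently of the machine's culture.</summary>
public static class CacheKeyFormatter
{
    // Same pattern strings as in CacheKeyPatterns.
    private const string UserProfile = "myapp:user:profile:{0}";
    private const string ProductList = "myapp:product:list:page:{0}:size:{1}";
    private const string DailyStats = "myapp:stats:daily:{0:yyyyMMdd}";

    public static string ForUserProfile(long userId) =>
        string.Format(CultureInfo.InvariantCulture, UserProfile, userId);

    public static string ForProductList(int page, int size) =>
        string.Format(CultureInfo.InvariantCulture, ProductList, page, size);

    // The invariant culture always uses the Gregorian calendar, so every node builds the same key.
    public static string ForDailyStats(DateTime day) =>
        string.Format(CultureInfo.InvariantCulture, DailyStats, day);
}
```

For comparison, `new CacheKeyBuilder("myapp").Domain("user").Entity("profile").Id(42).Build()` joins its segments with `:` and yields the same `myapp:user:profile:42` key as `ForUserProfile(42)`.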
7.1.2 Expiration Strategy Optimization
Smart expiration time calculation:
/// <summary>
/// Smart expiration strategy
/// </summary>
public class SmartExpirationStrategy
{
private readonly ILogger<SmartExpirationStrategy> _logger;
private readonly Random _random = new();
public SmartExpirationStrategy(ILogger<SmartExpirationStrategy> logger)
{
_logger = logger;
}
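/// <summary>
/// Calculates an expiration time from the data type and access pattern.
/// </summary>
/// <param name="dataType">Kind of data being cached.</param>
/// <param name="accessFrequency">How often the data is read.</param>
/// <param name="dataVolatility">How often the data changes.</param>
/// <param name="businessCritical">Whether the data is business-critical.</param>
/// <returns>The recommended expiration time.</returns>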
public TimeSpan CalculateExpiry(
CacheDataType dataType,
AccessFrequency accessFrequency,
DataVolatility dataVolatility,
bool businessCritical = false)
{
// Base expiration time per data type
var baseExpiry = dataType switch
{
CacheDataType.UserProfile => TimeSpan.FromHours(4),
CacheDataType.SystemConfiguration => TimeSpan.FromHours(12),
CacheDataType.ProductCatalog => TimeSpan.FromHours(2),
CacheDataType.UserPermissions => TimeSpan.FromHours(1),
CacheDataType.SessionData => TimeSpan.FromMinutes(30),
CacheDataType.TemporaryData => TimeSpan.FromMinutes(5),
CacheDataType.StatisticsData => TimeSpan.FromMinutes(15),
_ => TimeSpan.FromHours(1)
};
// Scale by access frequency: hotter data stays cached longer
var frequencyMultiplier = accessFrequency switch
{
AccessFrequency.VeryHigh => 2.0,
AccessFrequency.High => 1.5,
AccessFrequency.Medium => 1.0,
AccessFrequency.Low => 0.7,
AccessFrequency.VeryLow => 0.5,
_ => 1.0
};
// Scale by volatility: data that changes often expires sooner
var volatilityMultiplier = dataVolatility switch
{
DataVolatility.VeryHigh => 0.3,
DataVolatility.High => 0.5,
DataVolatility.Medium => 0.8,
DataVolatility.Low => 1.2,
DataVolatility.VeryLow => 1.5,
_ => 1.0
};
// Business-critical data gets a shorter expiry to tighten consistency
var criticalMultiplier = businessCritical ? 0.8 : 1.0;
// Compute the final expiration time
var finalExpiry = TimeSpan.FromMilliseconds(
baseExpiry.TotalMilliseconds *
frequencyMultiplier *
volatilityMultiplier *
criticalMultiplier);
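// Add random jitter (±10%) so that entries do not all expire together (cache avalanche).
// Random.Shared (.NET 6+) is thread-safe; a Random instance shared across threads is not.
var jitterPercentage = Random.Shared.NextDouble() * 0.2 - 0.1; // -10% to +10%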
finalExpiry = TimeSpan.FromMilliseconds(
finalExpiry.TotalMilliseconds * (1 + jitterPercentage));
// Clamp the result to the minimum and maximum bounds
var minExpiry = TimeSpan.FromMinutes(1);
var maxExpiry = TimeSpan.FromDays(1);
if (finalExpiry < minExpiry) finalExpiry = minExpiry;
if (finalExpiry > maxExpiry) finalExpiry = maxExpiry;
_logger.LogDebug("Calculated expiry for {DataType}: {Expiry} " +
"(base: {BaseExpiry}, freq: {FreqMultiplier:F1}x, vol: {VolMultiplier:F1}x, critical: {CriticalMultiplier:F1}x)",
dataType, finalExpiry, baseExpiry, frequencyMultiplier, volatilityMultiplier, criticalMultiplier);
return finalExpiry;
}
}
public enum CacheDataType
{
UserProfile,
SystemConfiguration,
ProductCatalog,
UserPermissions,
SessionData,
TemporaryData,
StatisticsData
}
public enum AccessFrequency
{
VeryLow,
Low,
Medium,
High,
VeryHigh
}
public enum DataVolatility
{
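VeryLow, // Almost never changes, e.g. system configuration
Low, // Rarely changes, e.g. user profiles
Medium, // Changes periodically, e.g. product information
High, // Changes frequently, e.g. inventory data
VeryHigh // Changes in real time, e.g. online user status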
}
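To make the multipliers concrete, the sketch below reproduces only the deterministic part of `CalculateExpiry` for one combination taken from the tables above: `UserPermissions` (1 hour) with `AccessFrequency.High` (1.5), `DataVolatility.High` (0.5) and `businessCritical = true` (0.8). The `ExpiryExample` class and its method names are hypothetical and exist only for this illustration.

```csharp
using System;

/// <summary>Hypothetical helper mirroring the arithmetic of CalculateExpiry without logging or randomness.</summary>
public static class ExpiryExample
{
    /// <summary>Deterministic part: base expiry multiplied by the three factors.</summary>
    public static TimeSpan WithoutJitter(TimeSpan baseExpiry, double frequency, double volatility, double critical) =>
        TimeSpan.FromMilliseconds(baseExpiry.TotalMilliseconds * frequency * volatility * critical);

    /// <summary>Range that the ±10% jitter can produce for a given expiry.</summary>
    public static (TimeSpan Min, TimeSpan Max) JitterBounds(TimeSpan expiry) =>
        (TimeSpan.FromMilliseconds(expiry.TotalMilliseconds * 0.9),
         TimeSpan.FromMilliseconds(expiry.TotalMilliseconds * 1.1));
}
```

For the combination above the product is 1 h × 1.5 × 0.5 × 0.8 = 0.6 h, about 36 minutes, and the jitter spreads it over roughly 32.4 to 39.6 minutes. Note that the clamp runs after the jitter: a combination such as `SystemConfiguration` (12 h) × 2.0 × 1.5 = 36 h exceeds the one-day maximum even at -10%, so every such entry ends up with exactly one day and the jitter has no effect on it.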
7.1.3 Cache Warmup Strategy
/// <summary>
/// Cache warmup service
/// </summary>
public interface ICacheWarmupService
{
Task WarmupAsync(CancellationToken cancellationToken = default);
Task WarmupSpecificDataAsync(string dataType, CancellationToken cancellationToken = default);
}
/// <summary>
/// Cache warmup service implementation
/// </summary>
public class CacheWarmupService : ICacheWarmupService
{
private readonly IMultiLevelCacheManager _cacheManager;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<CacheWarmupService> _logger;
private readonly CacheWarmupOptions _options;
public CacheWarmupService(
IMultiLevelCacheManager cacheManager,
IServiceProvider serviceProvider,
ILogger<CacheWarmupService> logger,
IOptions<CacheWarmupOptions> options)
{
_cacheManager = cacheManager;
_serviceProvider = serviceProvider;
_logger = logger;
_options = options.Value;
}
/// <summary>
/// Runs the full cache warmup.
/// </summary>
public async Task WarmupAsync(CancellationToken cancellationToken = default)
{
_logger.LogInformation("Starting cache warmup process");
var stopwatch = Stopwatch.StartNew();
try
{
var warmupTasks = new List<Task>
{
WarmupSystemConfigurationAsync(cancellationToken),
WarmupHotUserDataAsync(cancellationToken),
WarmupProductCatalogAsync(cancellationToken),
WarmupFrequentlyAccessedDataAsync(cancellationToken)
};
await Task.WhenAll(warmupTasks);
_logger.LogInformation("Cache warmup completed in {Duration:F2}s", stopwatch.Elapsed.TotalSeconds);
}
catch (Exception ex)
{
_logger.LogError(ex, "Cache warmup failed after {Duration:F2}s", stopwatch.Elapsed.TotalSeconds);
throw;
}
}
/// <summary>
/// Warms up system configuration data.
/// </summary>
private async Task WarmupSystemConfigurationAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Warming up system configuration");
using var scope = _serviceProvider.CreateScope();
var configService = scope.ServiceProvider.GetRequiredService<IConfigurationService>();
var configKeys = new[]
{
"app_settings",
"feature_flags",
"business_rules",
"system_parameters"
};
var tasks = configKeys.Select(async key =>
{
var cacheKey = new CacheKeyBuilder("myapp")
.Domain("config")
.Entity(key)
.Build();
await _cacheManager.GetOrSetAsync(
cacheKey,
async () => await configService.GetConfigurationAsync(key),
TimeSpan.FromHours(12));
});
await Task.WhenAll(tasks);
_logger.LogDebug("System configuration warmup completed");
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to warmup system configuration");
}
}
/// <summary>
/// Warms up data for hot (recently active) users.
/// </summary>
private async Task WarmupHotUserDataAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Warming up hot user data");
using var scope = _serviceProvider.CreateScope();
var userService = scope.ServiceProvider.GetRequiredService<IUserService>();
var analyticsService = scope.ServiceProvider.GetRequiredService<IAnalyticsService>();
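// Fetch the list of recently active users
var activeUserIds = await analyticsService.GetRecentlyActiveUsersAsync(
TimeSpan.FromDays(7),
_options.TopUsersToWarmup);
// Limit concurrent loads; SemaphoreSlim is IDisposable, so dispose it once all tasks have finished
using var semaphore = new SemaphoreSlim(_options.MaxConcurrency);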
var tasks = activeUserIds.Select(async userId =>
{
await semaphore.WaitAsync(cancellationToken);
try
{
// Warm up the user's basic profile
var userCacheKey = new CacheKeyBuilder("myapp")
.Domain("user")
.Entity("profile")
.Id(userId)
.Build();
await _cacheManager.GetOrSetAsync(
userCacheKey,
async () => await userService.GetUserByIdAsync(userId),
TimeSpan.FromHours(4));
// Warm up the user's permissions
var permissionsCacheKey = new CacheKeyBuilder("myapp")
.Domain("user")
.Entity("permissions")
.Id(userId)
.Build();
await _cacheManager.GetOrSetAsync(
permissionsCacheKey,
async () => await userService.GetUserPermissionsAsync(userId),
TimeSpan.FromHours(2));
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(tasks);
_logger.LogDebug("Hot user data warmup completed for {Count} users", activeUserIds.Count);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to warmup hot user data");
}
}
/// <summary>
/// Warms up product catalog data.
/// </summary>
private async Task WarmupProductCatalogAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Warming up product catalog");
using var scope = _serviceProvider.CreateScope();
var productService = scope.ServiceProvider.GetRequiredService<IProductService>();
// Warm up the most popular product categories
var popularCategories = await productService.GetPopularCategoriesAsync(_options.TopCategoriesToWarmup);
var tasks = popularCategories.Select(async category =>
{
var cacheKey = new CacheKeyBuilder("myapp")
.Domain("product")
.Entity("category")
.Id(category.Id)
.Build();
await _cacheManager.GetOrSetAsync(
cacheKey,
async () => await productService.GetCategoryProductsAsync(category.Id, 1, 20),
TimeSpan.FromHours(2));
});
await Task.WhenAll(tasks);
_logger.LogDebug("Product catalog warmup completed for {Count} categories", popularCategories.Count);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to warmup product catalog");
}
}
/// <summary>
/// Warms up frequently accessed data.
/// </summary>
private async Task WarmupFrequentlyAccessedDataAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Warming up frequently accessed data");
// The items to warm up can be chosen from real access logs or analytics data.
// Examples: home page data, popular search results.
var commonQueries = new[]
{
("homepage_banner", TimeSpan.FromHours(6)),
("popular_products", TimeSpan.FromHours(1)),
("trending_categories", TimeSpan.FromMinutes(30)),
("system_announcements", TimeSpan.FromHours(4))
};
using var scope = _serviceProvider.CreateScope();
var contentService = scope.ServiceProvider.GetRequiredService<IContentService>();
var tasks = commonQueries.Select(async query =>
{
var (queryType, expiry) = query;
var cacheKey = new CacheKeyBuilder("myapp")
.Domain("content")
.Entity(queryType)
.Build();
await _cacheManager.GetOrSetAsync(
cacheKey,
async () => await contentService.GetContentAsync(queryType),
expiry);
});
await Task.WhenAll(tasks);
_logger.LogDebug("Frequently accessed data warmup completed");
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to warmup frequently accessed data");
}
}
/// <summary>
/// Warms up one specific type of data.
/// </summary>
public async Task WarmupSpecificDataAsync(string dataType, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Starting specific cache warmup for data type: {DataType}", dataType);
try
{
switch (dataType.ToLowerInvariant())
{
case "config":
case "configuration":
await WarmupSystemConfigurationAsync(cancellationToken);
break;
case "user":
case "users":
await WarmupHotUserDataAsync(cancellationToken);
break;
case "product":
case "products":
await WarmupProductCatalogAsync(cancellationToken);
break;
case "content":
await WarmupFrequentlyAccessedDataAsync(cancellationToken);
break;
default:
_logger.LogWarning("Unknown data type for warmup: {DataType}", dataType);
break;
}
_logger.LogInformation("Specific cache warmup completed for data type: {DataType}", dataType);
}
catch (Exception ex)
{
_logger.LogError(ex, "Specific cache warmup failed for data type: {DataType}", dataType);
throw;
}
}
}
/// <summary>
/// Cache warmup configuration options
/// </summary>
public class CacheWarmupOptions
{
public int TopUsersToWarmup { get; set; } = 1000;
public int TopCategoriesToWarmup { get; set; } = 50;
public int MaxConcurrency { get; set; } = Environment.ProcessorCount * 2;
public bool EnableScheduledWarmup { get; set; } = true;
public TimeSpan WarmupInterval { get; set; } = TimeSpan.FromHours(6);
public List<string> WarmupDataTypes { get; set; } = new() { "config", "users", "products", "content" };
}
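`CacheWarmupOptions` declares `EnableScheduledWarmup` and `WarmupInterval`, but the listing above does not show the code that consumes them. One possible shape for that loop is sketched below using only the base class library. `WarmupScheduler`, its parameters and the `onError` callback are hypothetical names introduced for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>Hypothetical scheduler: runs a warmup delegate now and then once per interval.</summary>
public static class WarmupScheduler
{
    public static async Task RunAsync(
        Func<CancellationToken, Task> warmup,
        TimeSpan interval,
        Action<Exception> onError,
        CancellationToken cancellationToken)
    {
        using var timer = new PeriodicTimer(interval); // requires .NET 6 or later
        try
        {
            do
            {
                try
                {
                    await warmup(cancellationToken);
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    // A failed run is reported but must not stop later runs.
                    onError(ex);
                }
            }
            while (await timer.WaitForNextTickAsync(cancellationToken));
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown path: the caller cancelled the token.
        }
    }
}
```

In an ASP.NET Core application this loop would typically form the body of `BackgroundService.ExecuteAsync`, with `warmup` bound to `ICacheWarmupService.WarmupAsync`, `interval` set from `WarmupInterval`, and the loop skipped after the first run when `EnableScheduledWarmup` is false.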
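7.4 Security and Reliability Enhancements
Based on an in-depth technical analysis, we made several important security and reliability improvements to the original architecture:
7.4.1 Enhanced Exception Handling
We introduced a layered exception hierarchy so that each kind of cache failure can be handled differently:
// Fine-grained exception types allow more precise error handling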
public class CacheConnectionException : CacheException { }
public class CacheSerializationException : CacheException { }
public class CacheTimeoutException : CacheException { }
public class CacheValidationException : CacheException { }
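// Layered exception handling around a cache operation.
// Assumes cacheKey, factory and expiry are in scope at the call site.
try
{
return await _cacheManager.GetOrSetAsync(cacheKey, factory, expiry);
}
catch (CacheConnectionException ex)
{
_logger.LogWarning(ex, "Cache connection failed, using fallback");
return await factory(); // Graceful degradation: load directly from the data source
}
catch (CacheSerializationException ex)
{
_logger.LogError(ex, "Serialization failed");
throw; // Serialization errors must be dealt with immediately, so let them propagate
}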
7.4.2 Thread-Safe Statistics
The original statistics counters were not thread-safe, so we introduced a dedicated statistics tracker:
public class CacheStatisticsTracker
{
private long _totalOperations = 0;
private long _l1Hits = 0;
private long _l2Hits = 0;
private long _totalMisses = 0;
public void RecordOperation() => Interlocked.Increment(ref _totalOperations);
public void RecordHit(CacheLevel level) { /* atomic increment of the counter for the given level */ }
public CacheStatisticsSnapshot GetSnapshot() { /* thread-safe snapshot of all counters */ }
}
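The two elided method bodies could be filled in along the following lines. This is a sketch under assumptions: the members of `CacheLevel` and the shape of `CacheStatisticsSnapshot` are not shown in this section, so the definitions below (including `RecordMiss` and `HitRatio`) are stand-ins chosen for the example.

```csharp
using System.Threading;

public enum CacheLevel { L1, L2 } // assumed members

// Assumed shape of the snapshot type (record struct requires C# 10).
public readonly record struct CacheStatisticsSnapshot(
    long TotalOperations, long L1Hits, long L2Hits, long TotalMisses)
{
    public double HitRatio =>
        TotalOperations == 0 ? 0 : (double)(L1Hits + L2Hits) / TotalOperations;
}

public sealed class CacheStatisticsTracker
{
    private long _totalOperations;
    private long _l1Hits;
    private long _l2Hits;
    private long _totalMisses;

    public void RecordOperation() => Interlocked.Increment(ref _totalOperations);

    public void RecordHit(CacheLevel level)
    {
        if (level == CacheLevel.L1) Interlocked.Increment(ref _l1Hits);
        else Interlocked.Increment(ref _l2Hits);
    }

    public void RecordMiss() => Interlocked.Increment(ref _totalMisses);

    // Each counter is read atomically, but the four reads are not one atomic unit:
    // under concurrent writes the snapshot may mix values from slightly different moments.
    public CacheStatisticsSnapshot GetSnapshot() => new(
        Interlocked.Read(ref _totalOperations),
        Interlocked.Read(ref _l1Hits),
        Interlocked.Read(ref _l2Hits),
        Interlocked.Read(ref _totalMisses));
}
```

`Interlocked` keeps the hot path lock-free, which matters because these counters are touched on every cache access. If a strictly consistent snapshot were required, a lock around both writes and reads would be the simpler trade-off.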
7.4.3 Cache Data Validation and Security
To guard against cache poisoning and other data-safety problems, we implemented multi-layer validation:
public class DefaultCacheDataValidator : ICacheDataValidator
{
public bool IsValid<T>(T value)
{
// Reject forbidden types
if (_forbiddenTypes.Contains(value.GetType()))
return false;
// Reject object graphs that contain circular references
if (HasCircularReference(value))
return false;
return true;
}
public void ValidateKey(string key)
{
// Validate the key's format and length
if (!_keyValidationRegex.IsMatch(key))
throw new CacheValidationException($"Invalid key: {key}");
}
}
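The validator above relies on a `_keyValidationRegex` field whose pattern is not shown. As an illustration only, the sketch below assumes a policy of 1 to 256 characters drawn from letters, digits and `: _ - .`, which matches the key patterns in section 7.1.1. The `CacheKeyValidator` class and the policy itself are assumptions.

```csharp
using System;
using System.Text.RegularExpressions;

/// <summary>Hypothetical key validator with an assumed character and length policy.</summary>
public static class CacheKeyValidator
{
    // \z anchors at the true end of input; $ would also accept a key with a trailing newline.
    private static readonly Regex KeyPattern = new Regex(
        @"^[A-Za-z0-9:_\-.]{1,256}\z",
        RegexOptions.Compiled | RegexOptions.CultureInvariant);

    public static bool IsValidKey(string key) =>
        key is not null && KeyPattern.IsMatch(key);

    public static void ValidateKey(string key)
    {
        if (!IsValidKey(key))
        {
            // The rejected key is deliberately not echoed: it is untrusted input
            // and could carry control characters into log output.
            throw new ArgumentException("Invalid cache key", nameof(key));
        }
    }
}
```

Rejecting whitespace and control characters up front also keeps keys safe to embed in Redis pattern operations and in log lines.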
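7.4.4 Smart Serialization for Performance
We added support for multiple serializers and select the most suitable one for each data type automatically: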
public class SmartCacheSerializer : ICacheSerializer
{
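// The element type must be explicit: new[] cannot infer a common type from two unrelated classes
private readonly ICacheSerializer[] _serializers = new ICacheSerializer[]
{
new BinaryCacheSerializer(), // Preferred: high-performance binary serialization
new JsonCacheSerializer() // Fallback: JSON serialization
};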
public byte[] Serialize<T>(T value)
{
foreach (var serializer in _serializers)
{
if (serializer.SupportsType(typeof(T)))
{
return serializer.Serialize(value);
}
}
throw new CacheSerializationException("No suitable serializer found");
}
}
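The selection logic above can be exercised end to end with a small self-contained sketch. The interface shape below (`SupportsType`, `Serialize`, `Deserialize`) is assumed from the calls in the listing, and `Utf8StringSerializer` and `FallbackCacheSerializer` are hypothetical stand-ins: the original `BinaryCacheSerializer` is not shown, so a raw UTF-8 string serializer plays the role of the specialized fast path here.

```csharp
using System;
using System.Text;
using System.Text.Json;

public interface ICacheSerializer // assumed shape
{
    bool SupportsType(Type type);
    byte[] Serialize<T>(T value);
    T Deserialize<T>(byte[] data);
}

/// <summary>Fast path: stores strings as raw UTF-8, avoiding JSON quoting and escaping.</summary>
public sealed class Utf8StringSerializer : ICacheSerializer
{
    public bool SupportsType(Type type) => type == typeof(string);
    public byte[] Serialize<T>(T value) => Encoding.UTF8.GetBytes((string)(object)value!);
    public T Deserialize<T>(byte[] data) => (T)(object)Encoding.UTF8.GetString(data);
}

/// <summary>General fallback based on System.Text.Json.</summary>
public sealed class JsonCacheSerializer : ICacheSerializer
{
    public bool SupportsType(Type type) => true;
    public byte[] Serialize<T>(T value) => JsonSerializer.SerializeToUtf8Bytes(value);
    public T Deserialize<T>(byte[] data) => JsonSerializer.Deserialize<T>(data)!;
}

public sealed class FallbackCacheSerializer
{
    // Order matters: the first serializer that supports the type wins.
    private readonly ICacheSerializer[] _serializers = new ICacheSerializer[]
    {
        new Utf8StringSerializer(),
        new JsonCacheSerializer()
    };

    private ICacheSerializer Pick(Type type) =>
        Array.Find(_serializers, s => s.SupportsType(type))
        ?? throw new NotSupportedException($"No serializer for {type}");

    public byte[] Serialize<T>(T value) => Pick(typeof(T)).Serialize(value);

    // Reads must pick the same serializer as writes, so callers must use the same T on both sides.
    public T Deserialize<T>(byte[] data) => Pick(typeof(T)).Deserialize<T>(data);
}
```

Because selection depends only on `typeof(T)`, a value written as `string` and read back as `object` would be routed to a different serializer. Storing a format marker alongside the payload is one way to remove that coupling. If a binary serializer were built on `BinaryFormatter`, note that Microsoft has marked that type obsolete for security reasons, so a different binary format would be the safer choice.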
7.4.5 Circuit Breaker Pattern
A circuit breaker handles Redis connection failures and improves the overall reliability of the system:
public class CacheCircuitBreaker
{
private CircuitBreakerState _state = CircuitBreakerState.Closed;
public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
{
if (!CanExecute())
{
throw new CacheException("Circuit breaker is OPEN");
}
try
{
var result = await operation();
OnSuccess();
return result;
}
catch (Exception ex)
{
OnFailure(ex);
throw;
}
}
}
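The listing above leaves out `CanExecute`, `OnSuccess` and `OnFailure`. A minimal sketch of how they might fit together follows. The failure threshold, the open duration, the injectable clock and the class name `SimpleCircuitBreaker` are all assumptions made for this example, and `InvalidOperationException` stands in for the article's `CacheException` to keep the block self-contained.

```csharp
using System;
using System.Threading.Tasks;

public enum CircuitBreakerState { Closed, Open, HalfOpen }

/// <summary>Hypothetical minimal breaker: opens after N consecutive failures, retries after a cool-down.</summary>
public sealed class SimpleCircuitBreaker
{
    private readonly object _lock = new object();
    private readonly int _failureThreshold;
    private readonly TimeSpan _openDuration;
    private readonly Func<DateTime> _utcNow; // injectable clock, useful in tests
    private int _consecutiveFailures;
    private DateTime _openedAtUtc;

    public CircuitBreakerState State { get; private set; } = CircuitBreakerState.Closed;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan openDuration, Func<DateTime> utcNow)
    {
        _failureThreshold = failureThreshold;
        _openDuration = openDuration;
        _utcNow = utcNow;
    }

    public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation)
    {
        if (!CanExecute())
            throw new InvalidOperationException("Circuit breaker is open");
        try
        {
            var result = await operation();
            OnSuccess();
            return result;
        }
        catch
        {
            OnFailure();
            throw;
        }
    }

    private bool CanExecute()
    {
        lock (_lock)
        {
            // After the cool-down, move to HalfOpen and let trial calls through.
            if (State == CircuitBreakerState.Open && _utcNow() - _openedAtUtc >= _openDuration)
                State = CircuitBreakerState.HalfOpen;
            return State != CircuitBreakerState.Open;
        }
    }

    private void OnSuccess()
    {
        lock (_lock)
        {
            _consecutiveFailures = 0;
            State = CircuitBreakerState.Closed;
        }
    }

    private void OnFailure()
    {
        lock (_lock)
        {
            _consecutiveFailures++;
            // A failed trial call in HalfOpen reopens the circuit immediately.
            if (State == CircuitBreakerState.HalfOpen || _consecutiveFailures >= _failureThreshold)
            {
                State = CircuitBreakerState.Open;
                _openedAtUtc = _utcNow();
            }
        }
    }
}
```

This version lets any number of concurrent callers through while half-open; a production breaker would usually admit a single probe. Callers of the cache manager would catch the "open" exception and fall back to L1 or to the data source.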
7.4.6 LRU Memory Management
To prevent memory leaks, we replaced the original ConcurrentDictionary with an LRU cache:
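public class LRUCache<TKey, TValue>
{
private readonly int _maxSize;
private readonly Dictionary<TKey, LinkedListNode<CacheItem<TKey, TValue>>> _cache = new();
private readonly LinkedList<CacheItem<TKey, TValue>> _lruList = new();
// Dictionary and LinkedList are not thread-safe, so every access is serialized
private readonly object _lock = new();
public LRUCache(int maxSize)
{
if (maxSize <= 0) throw new ArgumentOutOfRangeException(nameof(maxSize));
_maxSize = maxSize;
}
public void Add(TKey key, TValue value)
{
lock (_lock)
{
// Updating an existing key: drop its old node so the list never holds duplicates
if (_cache.TryGetValue(key, out var existing))
{
_lruList.Remove(existing);
_cache.Remove(key);
}
// At capacity: evict the least recently used item (the tail of the list)
else if (_cache.Count >= _maxSize)
{
var lastNode = _lruList.Last;
_cache.Remove(lastNode.Value.Key);
_lruList.RemoveLast();
}
// Insert the new item at the head of the list (most recently used)
var newNode = _lruList.AddFirst(new CacheItem<TKey, TValue> { Key = key, Value = value });
_cache[key] = newNode;
}
}
}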
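The `Add` method covers only the write path. For the eviction order to reflect actual use, reads also have to move an entry to the head of the list. The sketch below shows both paths in one self-contained class. `LruMap` is a hypothetical name, and it uses `KeyValuePair` in place of the article's `CacheItem` type, which is not shown in this section.

```csharp
using System;
using System.Collections.Generic;

/// <summary>Hypothetical thread-safe LRU map with read-path recency tracking.</summary>
public sealed class LruMap<TKey, TValue> where TKey : notnull
{
    private readonly int _capacity;
    private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>> _map = new();
    private readonly LinkedList<KeyValuePair<TKey, TValue>> _order = new(); // head = most recent
    private readonly object _lock = new();

    public LruMap(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    public int Count { get { lock (_lock) { return _map.Count; } } }

    public void Set(TKey key, TValue value)
    {
        lock (_lock)
        {
            if (_map.TryGetValue(key, out var existing))
            {
                _order.Remove(existing); // replace in place, no eviction needed
            }
            else if (_map.Count >= _capacity)
            {
                var oldest = _order.Last!; // tail = least recently used
                _map.Remove(oldest.Value.Key);
                _order.RemoveLast();
            }
            _map[key] = _order.AddFirst(new KeyValuePair<TKey, TValue>(key, value));
        }
    }

    public bool TryGet(TKey key, out TValue value)
    {
        lock (_lock)
        {
            if (_map.TryGetValue(key, out var node))
            {
                // A read counts as a use: move the node to the head.
                _order.Remove(node);
                _order.AddFirst(node);
                value = node.Value.Value;
                return true;
            }
            value = default!;
            return false;
        }
    }
}
```

With a capacity of 2, setting `a` and `b`, reading `a`, and then setting `c` evicts `b` rather than `a`, because the read refreshed `a`.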
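8.1 Learning Resources and References
8.1.1 Official Documentation
8.1.2 Recommended Books
- 《高性能MySQL》 (High Performance MySQL): theoretical foundations of cache design
- 《Redis設計與實現》 (Redis Design and Implementation): an in-depth look at Redis internals
- 《.NET性能優化》 (.NET Performance Optimization): performance tuning on the .NET platform
8.1.3 Open-Source Projects for Reference
- EasyCaching: a caching framework for .NET
- FusionCache: an advanced caching library
- CacheManager: a multi-level cache manager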