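// The copyright, trademarks, patents, and other related rights of the Admin.NET project are protected by applicable laws and regulations. Use of this project must comply with the relevant laws, regulations, and license requirements.
//
// This project is distributed and used primarily under the MIT License and the Apache License (Version 2.0). The licenses are in the LICENSE-MIT and LICENSE-APACHE files in the root directory of the source tree.
//
// This project must not be used for activities prohibited by laws and regulations, such as endangering national security, disrupting public order, or infringing on the lawful rights and interests of others. We accept no liability for any legal disputes or responsibilities arising from secondary development based on this project.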
using NewLife.Caching.Queues;
namespace Admin.NET.Core;
/// <summary>
/// Redis message queues
/// </summary>
public static class RedisQueue
{
private static ICacheProvider _cacheProvider = App.GetRequiredService<ICacheProvider>();
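/// <summary>Creates a Redis message queue. By default each message is consumed once; when a consumer group is specified, the STREAM structure is used so that multiple consumer groups can share messages.</summary>
/// <remarks>
/// Whether a consumer group is set determines whether a simple queue or a full queue is used.
/// A simple queue (such as RedisQueue) suits command queues: many topics, but almost no messages.
/// A full queue (such as RedisStream) suits message queues: few topics, but many messages, with support for multiple consumer groups.
/// </remarks>
/// <typeparam name="T">Message type.</typeparam>
/// <param name="topic">Topic.</param>
/// <param name="group">Consumer group. When omitted, a simple queue (such as RedisQueue) is used; when specified, a full queue (such as RedisStream) is used.</param>
/// <returns>The queue for the given topic.</returns>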
public static IProducerConsumer<T> GetQueue<T>(String topic, String group = null)
{
// The queue must be a singleton
var key = $"myStream:{topic}";
if (_cacheProvider.InnerCache.TryGetValue<IProducerConsumer<T>>(key, out var queue)) return queue;
queue = _cacheProvider.GetQueue<T>(topic, group);
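// Store the singleton in the in-process cache so that the InnerCache lookup above can find it on the next call.
_cacheProvider.InnerCache.Set(key, queue);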
return queue;
}
/// <summary>
/// Gets a reliable queue; consumed messages must be acknowledged.
/// </summary>
/// <typeparam name="T">Message type.</typeparam>
/// <param name="topic">Topic.</param>
/// <returns>The reliable queue for the given topic.</returns>
public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
{
// The queue must be a singleton
var key = $"myQueue:{topic}";
if (_cacheProvider.InnerCache.TryGetValue<RedisReliableQueue<T>>(key, out var queue)) return queue;
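// Reliable queues are only available when the cache is backed by Redis.
if (_cacheProvider.Cache is not FullRedis redis) throw new InvalidOperationException("Reliable queues require the cache to be a FullRedis instance.");
queue = redis.GetReliableQueue<T>(topic);
// Store the singleton in the in-process cache so that the InnerCache lookup above can find it on the next call.
_cacheProvider.InnerCache.Set(key, queue);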
return queue;
}
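/// <summary>
/// Rolls back all unacknowledged messages in the reliable queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="retryInterval">Value assigned to the queue's RetryInterval before rolling back. Defaults to 60.</param>
/// <returns>The value returned by the queue's RollbackAllAck call.</returns>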
public static int RollbackAllAck(string topic, int retryInterval = 60)
{
var queue = GetRedisReliableQueue<string>(topic);
queue.RetryInterval = retryInterval;
return queue.RollbackAllAck();
}
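/// <summary>
/// Sends a list of items to the reliable queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="value">Items to send.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The value returned by Add minus the queue length read just before the call. This equals the number of items added only if no other client modifies the queue in between.</returns>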
public static int AddReliableQueueList<T>(string topic, List<T> value)
{
var queue = GetRedisReliableQueue<T>(topic);
var count = queue.Count;
var result = queue.Add(value.ToArray());
return result - count;
}
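/// <summary>
/// Sends a single item to the reliable queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="value">Item to send.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The value returned by Add minus the queue length read just before the call. This equals the number of items added only if no other client modifies the queue in between.</returns>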
public static int AddReliableQueue<T>(string topic, T value)
{
var queue = GetRedisReliableQueue<T>(topic);
var count = queue.Count;
var result = queue.Add(value);
return result - count;
}
/// <summary>
/// Gets a delay queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The delay queue for the given topic.</returns>
public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
{
// The queue must be a singleton
var key = $"myDelay:{topic}";
if (_cacheProvider.InnerCache.TryGetValue<RedisDelayQueue<T>>(key, out var queue)) return queue;
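// Delay queues are only available when the cache is backed by Redis.
if (_cacheProvider.Cache is not FullRedis redis) throw new InvalidOperationException("Delay queues require the cache to be a FullRedis instance.");
queue = redis.GetDelayQueue<T>(topic);
// Store the singleton in the in-process cache so that the InnerCache lookup above can find it on the next call.
_cacheProvider.InnerCache.Set(key, queue);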
return queue;
}
/// <summary>
/// Sends a single item to the delay queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="value">Item to send.</param>
/// <param name="delay">Delay, in seconds.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The value returned by the queue's Add call.</returns>
public static int AddDelayQueue<T>(string topic, T value, int delay)
{
var queue = GetDelayQueue<T>(topic);
return queue.Add(value, delay);
}
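/// <summary>
/// Sends a list of items to the delay queue.
/// </summary>
/// <remarks>
/// This sets Delay on the cached queue instance, so the value also applies to later calls that rely on the queue's default delay.
/// </remarks>
/// <param name="topic">Topic.</param>
/// <param name="value">Items to send.</param>
/// <param name="delay">Delay, in seconds.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The value returned by the queue's Add call.</returns>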
public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
{
var queue = GetDelayQueue<T>(topic);
queue.Delay = delay;
return queue.Add(value.ToArray());
}
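/// <summary>
/// Takes one item from the reliable queue, passing a timeout of 1 (seconds) to TakeOne.
/// </summary>
/// <param name="topic">Topic.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The item taken from the queue.</returns>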
public static T ReliableTakeOne<T>(string topic)
{
var queue = GetRedisReliableQueue<T>(topic);
return queue.TakeOne(1);
}
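/// <summary>
/// Asynchronously takes one item from the reliable queue, passing a timeout of 1 (seconds) to TakeOneAsync.
/// </summary>
/// <param name="topic">Topic.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>A task that yields the item taken from the queue.</returns>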
public static async Task<T> ReliableTakeOneAsync<T>(string topic)
{
var queue = GetRedisReliableQueue<T>(topic);
return await queue.TakeOneAsync(1);
}
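/// <summary>
/// Takes multiple items from the reliable queue.
/// </summary>
/// <param name="topic">Topic.</param>
/// <param name="count">Maximum number of items to take.</param>
/// <typeparam name="T">Message type.</typeparam>
/// <returns>The items taken from the queue.</returns>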
public static List<T> ReliableTake<T>(string topic, int count)
{
var queue = GetRedisReliableQueue<T>(topic);
return queue.Take(count).ToList();
}
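/// <summary>
/// Acquires a distributed lock.
/// </summary>
/// <param name="key">The key to lock.</param>
/// <param name="msTimeout">How long to wait for the lock, in milliseconds.</param>
/// <param name="msExpire">Lock expiry, in milliseconds; must be a whole number of seconds. A lock that is not released explicitly within this time is released automatically.</param>
/// <param name="throwOnFailure">Whether the underlying cache should throw when the lock cannot be acquired. This method catches any such exception, so the caller sees a null return in either case.</param>
/// <returns>A handle that releases the lock when disposed, or null if the lock could not be acquired.</returns>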
public static IDisposable? BeginCacheLock(string key, int msTimeout = 500, int msExpire = 10000, bool throwOnFailure = true)
{
try
{
return _cacheProvider.Cache.AcquireLock(key, msTimeout, msExpire, throwOnFailure);
}
catch
{
return null;
}
}
}
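
// Usage sketch, for illustration only. It calls just the helpers defined above; the topic
// "order", the payload "order-1001", and the lock key "lock:order" are made-up values,
// not names used by this project.
//
//   // Producer: push one message onto the reliable queue.
//   RedisQueue.AddReliableQueue("order", "order-1001");
//
//   // Consumer: take one message from the same topic.
//   var message = RedisQueue.ReliableTakeOne<string>("order");
//
//   // Guard a critical section; a null return means the lock was not acquired.
//   using var locker = RedisQueue.BeginCacheLock("lock:order");
//   if (locker == null) return;
//
// A reliable queue expects consumed messages to be acknowledged. This wrapper does not
// expose that step, so it would go through the queue returned by GetRedisReliableQueue.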