// The copyrights, trademarks, patents, and other related rights of the Admin.NET project are protected by applicable laws and regulations. Use of this project must comply with the relevant laws, regulations, and license requirements.
//
// This project is distributed and used primarily under the MIT License and the Apache License (Version 2.0). The licenses are located in the LICENSE-MIT and LICENSE-APACHE files in the root directory of the source tree.
//
// This project must not be used for activities prohibited by laws and regulations, such as endangering national security, disrupting social order, or infringing on the lawful rights and interests of others! We accept no liability for any legal disputes or responsibilities arising from secondary development based on this project!

using NewLife.Caching.Queues;

namespace Admin.NET.Core;

/// <summary>
/// Redis message queue helpers built on top of the application's <c>ICacheProvider</c>.
/// </summary>
public static class RedisQueue
{
    // Resolved once from the application's service provider when the type is first used.
    private static readonly ICacheProvider _cacheProvider = App.GetRequiredService<ICacheProvider>();

    /// <summary>
    /// Returns the provider's cache as a <c>FullRedis</c> instance, which the reliable and
    /// delay queue helpers require.
    /// </summary>
    /// <exception cref="InvalidOperationException">The configured cache is not a <c>FullRedis</c>.</exception>
    private static FullRedis GetFullRedis()
    {
        // Fail with a descriptive message instead of a NullReferenceException on the next call.
        if (_cacheProvider.Cache is FullRedis redis)
        {
            return redis;
        }

        throw new InvalidOperationException("The configured cache is not a FullRedis instance; Redis reliable/delay queues are unavailable.");
    }

    /// <summary>
    /// Creates a Redis message queue. By default a message is consumed once; when a consumer
    /// group is specified the STREAM structure is used, which lets several consumer groups
    /// share the same messages.
    /// </summary>
    /// <remarks>
    /// Whether a consumer group is set decides between the simple queue and the full queue.
    /// The simple queue (e.g. RedisQueue) suits command queues: many topics, hardly any messages.
    /// The full queue (e.g. RedisStream) suits message queues: few topics, many messages, and
    /// support for multiple consumer groups.
    /// </remarks>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="group">
    /// Consumer group. Without a group the simple queue (e.g. RedisQueue) is used; with a group
    /// the full queue (e.g. RedisStream) is used.
    /// </param>
    /// <returns>The queue instance cached for the topic, created on first use.</returns>
    public static IProducerConsumer<T> GetQueue<T>(String topic, String group = null)
    {
        // The queue must be a singleton per topic.
        // NOTE(review): the key contains neither the group nor T, so a later call with a
        // different group receives the instance cached by the first call - confirm this is intended.
        var key = $"myStream:{topic}";
        if (_cacheProvider.InnerCache.TryGetValue<IProducerConsumer<T>>(key, out var queue))
        {
            return queue;
        }

        queue = _cacheProvider.GetQueue<T>(topic, group);

        // Store where the lookup above reads, so the next call actually finds the instance.
        _cacheProvider.InnerCache.Set(key, queue);
        return queue;
    }

    /// <summary>
    /// Gets the reliable queue for a topic; consumed messages require acknowledgement.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <returns>The reliable queue cached for the topic, created on first use.</returns>
    /// <exception cref="InvalidOperationException">The configured cache is not a <c>FullRedis</c>.</exception>
    public static RedisReliableQueue<T> GetRedisReliableQueue<T>(string topic)
    {
        // The queue must be a singleton per topic.
        var key = $"myQueue:{topic}";
        if (_cacheProvider.InnerCache.TryGetValue<RedisReliableQueue<T>>(key, out var queue))
        {
            return queue;
        }

        queue = GetFullRedis().GetReliableQueue<T>(topic);

        // Store where the lookup above reads, so the next call actually finds the instance.
        _cacheProvider.InnerCache.Set(key, queue);
        return queue;
    }

    /// <summary>
    /// Rolls back the reliable queue of a topic.
    /// </summary>
    /// <param name="topic">Topic.</param>
    /// <param name="retryInterval">
    /// Value assigned to the queue's <c>RetryInterval</c> before rolling back
    /// (presumably seconds - confirm against NewLife.Redis).
    /// </param>
    /// <returns>The value returned by the queue's <c>RollbackAllAck</c>.</returns>
    public static int RollbackAllAck(string topic, int retryInterval = 60)
    {
        // The queue is always resolved with string elements here.
        var queue = GetRedisReliableQueue<string>(topic);
        queue.RetryInterval = retryInterval;
        return queue.RollbackAllAck();
    }

    /// <summary>
    /// Sends a list of items to the reliable queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="value">Items to send.</param>
    /// <returns>
    /// 0 for an empty list; otherwise the value returned by <c>Add</c> minus the queue's
    /// <c>Count</c> read just before the call.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
    public static int AddReliableQueueList<T>(string topic, List<T> value)
    {
        if (value is null)
        {
            throw new ArgumentNullException(nameof(value));
        }

        // Nothing to send; also keeps the subtraction below from producing a negative result.
        if (value.Count == 0)
        {
            return 0;
        }

        var queue = GetRedisReliableQueue<T>(topic);
        var countBefore = queue.Count;
        var result = queue.Add(value.ToArray());
        return result - countBefore;
    }

    /// <summary>
    /// Sends a single item to the reliable queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="value">Item to send.</param>
    /// <returns>
    /// The value returned by <c>Add</c> minus the queue's <c>Count</c> read just before the call.
    /// </returns>
    public static int AddReliableQueue<T>(string topic, T value)
    {
        var queue = GetRedisReliableQueue<T>(topic);
        var countBefore = queue.Count;
        var result = queue.Add(value);
        return result - countBefore;
    }

    /// <summary>
    /// Gets the delay queue for a topic.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <returns>The delay queue cached for the topic, created on first use.</returns>
    /// <exception cref="InvalidOperationException">The configured cache is not a <c>FullRedis</c>.</exception>
    public static RedisDelayQueue<T> GetDelayQueue<T>(string topic)
    {
        // The queue must be a singleton per topic.
        var key = $"myDelay:{topic}";
        if (_cacheProvider.InnerCache.TryGetValue<RedisDelayQueue<T>>(key, out var queue))
        {
            return queue;
        }

        queue = GetFullRedis().GetDelayQueue<T>(topic);

        // Store where the lookup above reads, so the next call actually finds the instance.
        _cacheProvider.InnerCache.Set(key, queue);
        return queue;
    }

    /// <summary>
    /// Sends a single item to the delay queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="value">Item to send.</param>
    /// <param name="delay">Delay, in seconds.</param>
    /// <returns>The value returned by the queue's <c>Add</c>.</returns>
    public static int AddDelayQueue<T>(string topic, T value, int delay)
    {
        var queue = GetDelayQueue<T>(topic);
        return queue.Add(value, delay);
    }

    /// <summary>
    /// Sends a list of items to the delay queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="value">Items to send.</param>
    /// <param name="delay">Delay, in seconds.</param>
    /// <returns>0 for an empty list; otherwise the value returned by the queue's <c>Add</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
    public static int AddDelayQueue<T>(string topic, List<T> value, int delay)
    {
        if (value is null)
        {
            throw new ArgumentNullException(nameof(value));
        }

        if (value.Count == 0)
        {
            return 0;
        }

        var queue = GetDelayQueue<T>(topic);

        // The delay is set on the cached queue instance, so it stays in effect for later
        // callers of the same topic until it is overwritten.
        queue.Delay = delay;
        return queue.Add(value.ToArray());
    }

    /// <summary>
    /// Takes one item from the reliable queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <returns>The value returned by the queue's <c>TakeOne</c>.</returns>
    public static T ReliableTakeOne<T>(string topic)
    {
        var queue = GetRedisReliableQueue<T>(topic);

        // NOTE(review): the argument 1 is presumably a timeout in seconds - confirm against NewLife.Redis.
        return queue.TakeOne(1);
    }

    /// <summary>
    /// Asynchronously takes one item from the reliable queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <returns>A task producing the value returned by the queue's <c>TakeOneAsync</c>.</returns>
    public static async Task<T> ReliableTakeOneAsync<T>(string topic)
    {
        var queue = GetRedisReliableQueue<T>(topic);

        // NOTE(review): the argument 1 is presumably a timeout in seconds - confirm against NewLife.Redis.
        return await queue.TakeOneAsync(1);
    }

    /// <summary>
    /// Takes several items from the reliable queue.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="topic">Topic.</param>
    /// <param name="count">Count passed to the queue's <c>Take</c>.</param>
    /// <returns>The items returned by <c>Take</c>, materialized into a list.</returns>
    public static List<T> ReliableTake<T>(string topic, int count)
    {
        var queue = GetRedisReliableQueue<T>(topic);
        return queue.Take(count).ToList();
    }

    /// <summary>
    /// Requests a distributed lock.
    /// </summary>
    /// <param name="key">Key to lock.</param>
    /// <param name="msTimeout">Time to wait for the lock, in milliseconds.</param>
    /// <param name="msExpire">
    /// Lock expiry, in milliseconds; a lock that is not released within this time is released
    /// automatically. Must be a whole number of seconds.
    /// </param>
    /// <param name="throwOnFailure">
    /// Passed through to <c>AcquireLock</c> to say whether it should throw on failure. Any
    /// exception is caught by this method, so a failure always surfaces to the caller as null.
    /// </param>
    /// <returns>The lock handle, or null when <c>AcquireLock</c> returned null or threw.</returns>
    public static IDisposable? BeginCacheLock(string key, int msTimeout = 500, int msExpire = 10000, bool throwOnFailure = true)
    {
        try
        {
            return _cacheProvider.Cache.AcquireLock(key, msTimeout, msExpire, throwOnFailure);
        }
        catch
        {
            // Deliberate best-effort: every failure is reported as "no lock" rather than thrown.
            return null;
        }
    }
}