// Admin.NET 项目的版æƒã€å•†æ ‡ã€ä¸“利和其他相关æƒåˆ©å‡å—ç›¸åº”æ³•å¾‹æ³•è§„çš„ä¿æŠ¤ã€‚ä½¿ç”¨æœ¬é¡¹ç›®åº”éµå®ˆç›¸å…³æ³•律法规和许å¯è¯çš„è¦æ±‚。 // // 本项目主è¦éµå¾ª MIT 许å¯è¯å’Œ Apache 许å¯è¯ï¼ˆç‰ˆæœ¬ 2.0)进行分å‘和使用。许å¯è¯ä½äºŽæºä»£ç æ ‘æ ¹ç›®å½•ä¸çš„ LICENSE-MIT å’Œ LICENSE-APACHE 文件。 // // ä¸å¾—利用本项目从事å±å®³å›½å®¶å®‰å…¨ã€æ‰°ä¹±ç¤¾ä¼šç§©åºã€ä¾µçŠ¯ä»–äººåˆæ³•æƒç›Šç‰æ³•å¾‹æ³•è§„ç¦æ¢çš„æ´»åЍï¼ä»»ä½•基于本项目二次开å‘è€Œäº§ç”Ÿçš„ä¸€åˆ‡æ³•å¾‹çº çº·å’Œè´£ä»»ï¼Œæˆ‘ä»¬ä¸æ‰¿æ‹…ä»»ä½•è´£ä»»ï¼ using NewLife.Caching.Queues; using Newtonsoft.Json; using System.Threading.Channels; namespace Admin.NET.Core; /// <summary> /// Redis自定义事件æºå˜å‚¨å™¨ /// </summary> /// <remarks> /// 在集群部署时,一般æ¯ä¸€ä¸ªæ¶ˆæ¯åªç”±ä¸€ä¸ªæœåŠ¡èŠ‚ç‚¹æ¶ˆè´¹ä¸€æ¬¡ã€‚ /// 有些特殊情情è¦é€šçŸ¥åˆ°æœåŠ¡å™¨ç¾¤ä¸çš„æ¯ä¸€ä¸ªèŠ‚ç‚¹(比如需è¦å¼ºåˆ¶åŠ è½½æŸäº›é…ç½®ã€é‡ç‚¹æœåŠ¡ç‰), /// åœ¨è¿™ç§æƒ…况下就è¦ä»¥â€œbroadcast:â€å¼€å¤´æ¥å®šä¹‰EventId, /// 本系统会把“broadcast:â€å¼€å¤´çš„äº‹ä»¶è§†ä¸ºâ€œå¹¿æ’æ¶ˆæ¯â€ä¿è¯é›†ç¾¤ä¸çš„æ¯ä¸€ä¸ªæœåŠ¡èŠ‚ç‚¹éƒ½èƒ½æ¶ˆè´¹å¾—åˆ°è¿™ä¸ªæ¶ˆæ¯ /// </remarks> public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable { /// <summary> /// 消费者 /// </summary> private readonly EventConsumer<ChannelEventSource> _eventConsumer; /// <summary> /// 内å˜é€šé“事件æºå˜å‚¨å™¨ /// </summary> private readonly Channel<IEventSource> _channel; private IProducerConsumer<ChannelEventSource> _queueSingle; private RedisStream<string> _queueBroadcast; private ILogger<RedisEventSourceStorer> _logger; /// <summary> /// æž„é€ å‡½æ•° /// </summary> /// <param name="cacheProvider">Redis 连接对象</param> /// <param name="routeKey">路由键</param> /// <param name="capacity">å˜å‚¨å™¨æœ€å¤šèƒ½å¤Ÿå¤„ç†å¤šå°‘消æ¯ï¼Œè¶…过该容é‡è¿›å…¥ç‰å¾…写入</param> public RedisEventSourceStorer(ICacheProvider cacheProvider, string routeKey, int capacity) { _logger = App.GetRequiredService<ILogger<RedisEventSourceStorer>>(); // é…置通é“,设置超出默认容é‡åŽè¿›å…¥ç‰å¾… var boundedChannelOptions = new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait }; // 创建有é™å®¹é‡é€šé“ _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions); //_redis = redis as FullRedis; // åˆ›å»ºå¹¿æ’æ¶ˆæ¯è®¢é˜…è€…ï¼Œå³æ‰€æœ‰æœåŠ¡å™¨èŠ‚ç‚¹éƒ½èƒ½æ”¶åˆ°æ¶ˆæ¯ï¼ˆç”¨æ¥å‘布é‡å¯ã€Reloadé…ç½®ç‰æ¶ˆæ¯ï¼‰ FullRedis redis = (FullRedis)cacheProvider.Cache; var clusterOpt = App.GetConfig<ClusterOptions>("Cluster", true); _queueBroadcast = redis.GetStream<string>(routeKey + ":broadcast"); _queueBroadcast.Group = clusterOpt.ServerId;//æ ¹æ®æœåŠ¡å™¨æ ‡è¯†åˆ†é…到ä¸åŒçš„分组里 _queueBroadcast.Expire = TimeSpan.FromSeconds(10);//消æ¯10秒过期() _queueBroadcast.ConsumeAsync(OnConsumeBroadcast); // 创建队列消æ¯è®¢é˜…者,åªè¦æœ‰ä¸€ä¸ªæœåŠ¡èŠ‚ç‚¹æ¶ˆè´¹äº†æ¶ˆæ¯å³å¯ _queueSingle = redis.GetQueue<ChannelEventSource>(routeKey + ":single"); _eventConsumer = new EventConsumer<ChannelEventSource>(_queueSingle); // 订阅消æ¯å†™å…¥ Channel _eventConsumer.Received += async (send, cr) => { // var oriColor = Console.ForegroundColor; try { ChannelEventSource ces = (ChannelEventSource)cr; await ConsumeChannelEventSourceAsync(ces, ces.CancellationToken); } catch (Exception e) { _logger.LogError(e, "处ç†Receivedä¸çš„æ¶ˆæ¯äº§ç”Ÿé”™è¯¯ï¼"); } }; _eventConsumer.Start(); } private async Task OnConsumeBroadcast(string source, Message message, CancellationToken token) { ChannelEventSource ces = JsonConvert.DeserializeObject<ChannelEventSource>(source); await ConsumeChannelEventSourceAsync(ces, token); } private async Task ConsumeChannelEventSourceAsync(ChannelEventSource ces, CancellationToken cancel = default) { // æ‰“å°æµ‹è¯•事件 if (ces.EventId != null && ces.EventId.IndexOf(":Test") > 0) { var oriColor = Console.ForegroundColor; Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"有消æ¯è¦å¤„ç†{ces.EventId},{ces.Payload}"); Console.ForegroundColor = oriColor; } await _channel.Writer.WriteAsync(ces, cancel); } /// <summary> /// 将事件æºå†™å…¥å˜å‚¨å™¨ /// </summary> /// <param name="eventSource">事件æºå¯¹è±¡</param> /// <param name="cancellationToken">å–æ¶ˆä»»åŠ¡ Token</param> /// <returns><see cref="ValueTask"/></returns> public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken) { // 空检查 if (eventSource == default) throw new ArgumentNullException(nameof(eventSource)); // è¿™é‡Œåˆ¤æ–æ˜¯å¦æ˜¯ ChannelEventSource 或者 自定义的 EventSource if (eventSource is ChannelEventSource source) { // 异æ¥å‘布 await Task.Factory.StartNew(() => { if (source.EventId != null && source.EventId.StartsWith("broadcast:")) { string str = JsonConvert.SerializeObject(source); _queueBroadcast.Add(str); } else { _queueSingle.Add(source); } }, cancellationToken, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default); } else { // 处ç†åЍæ€è®¢é˜…问题 await _channel.Writer.WriteAsync(eventSource, cancellationToken); } } /// <summary> /// 从å˜å‚¨å™¨ä¸è¯»å–一æ¡äº‹ä»¶æº /// </summary> /// <param name="cancellationToken">å–æ¶ˆä»»åŠ¡ Token</param> /// <returns>事件æºå¯¹è±¡</returns> public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken) { // 读å–一æ¡äº‹ä»¶æº var eventSource = await _channel.Reader.ReadAsync(cancellationToken); return eventSource; } /// <summary> /// é‡Šæ”¾éžæ‰˜ç®¡èµ„æº /// </summary> public async void Dispose() { await _eventConsumer.Stop(); GC.SuppressFinalize(this); } }