// The copyrights, trademarks, patents and other related rights of the Admin.NET project are protected by the applicable laws and regulations. Use of this project must comply with the relevant laws, regulations and license requirements.
//
// This project is distributed and used mainly under the MIT license and the Apache license (version 2.0). The licenses are located in the LICENSE-MIT and LICENSE-APACHE files in the root directory of the source tree.
//
// This project must not be used for activities prohibited by laws and regulations, such as endangering national security, disturbing social order, or infringing the lawful rights and interests of others! We accept no responsibility for any legal disputes or liabilities arising from secondary development based on this project!

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Threading.Channels;

namespace Admin.NET.Core;

/// <summary>
/// Custom event source store backed by RabbitMQ.
/// </summary>
/// <remarks>
/// <see cref="ChannelEventSource"/> instances are serialized and published to a RabbitMQ queue;
/// a consumer on the same queue deserializes them and feeds a bounded in-memory channel that
/// <see cref="ReadAsync"/> drains. Any other <see cref="IEventSource"/> implementation bypasses
/// RabbitMQ and is written to the in-memory channel directly.
/// <see cref="IDisposable"/> is declared explicitly so that owners which dispose through that
/// interface (for example a DI container) actually release the connection; the declaration is
/// redundant but harmless if <see cref="IEventSourceStorer"/> already inherits it.
/// </remarks>
public class RabbitMQEventSourceStore : IEventSourceStorer, IDisposable
{
    /// <summary>
    /// In-memory channel that buffers event sources until they are read.
    /// </summary>
    private readonly Channel<IEventSource> _channel;

    /// <summary>
    /// RabbitMQ channel (model) used to declare the queue, publish, consume and acknowledge.
    /// </summary>
    private readonly IModel _model;

    /// <summary>
    /// RabbitMQ connection that owns <see cref="_model"/>.
    /// </summary>
    private readonly IConnection _connection;

    /// <summary>
    /// Routing key; also used as the queue name.
    /// </summary>
    private readonly string _routeKey;

    /// <summary>
    /// Creates the store: opens the RabbitMQ connection, declares the queue and starts consuming.
    /// </summary>
    /// <param name="factory">Connection factory used to open the RabbitMQ connection.</param>
    /// <param name="routeKey">Routing key, also used as the name of the declared queue.</param>
    /// <param name="capacity">
    /// Maximum number of event sources the in-memory channel holds; once it is full,
    /// further writes wait until space becomes available.
    /// </param>
    public RabbitMQEventSourceStore(ConnectionFactory factory, string routeKey, int capacity)
    {
        // Bounded in-memory channel: writers wait (rather than drop) when capacity is reached.
        var boundedChannelOptions = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);

        // Open the connection and remember the routing key.
        _connection = factory.CreateConnection();
        _routeKey = routeKey;

        // Create the RabbitMQ channel and declare the queue named after the routing key.
        _model = _connection.CreateModel();
        _model.QueueDeclare(routeKey, false, false, false, null);

        // Subscribe to the queue and forward every delivery into the in-memory channel.
        var consumer = new EventingBasicConsumer(_model);
        consumer.Received += (ch, ea) =>
        {
            // Read the raw message payload.
            var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());

            // Convert to an IEventSource. If a custom event source type is used instead,
            // make sure its properties are both readable and writable.
            var eventSource = JSON.Deserialize<ChannelEventSource>(stringEventSource);

            // Wait until the in-memory channel has really accepted the event before
            // acknowledging it. Previously the returned ValueTask was dropped, so when the
            // channel was full the message was acked while the write was still pending:
            // the capacity bound provided no back-pressure and write failures were lost.
            // The write completes synchronously whenever the channel has free space.
            _channel.Writer.WriteAsync(eventSource).AsTask().GetAwaiter().GetResult();

            // Confirm that the message has been consumed.
            _model.BasicAck(ea.DeliveryTag, false);
        };

        // Start the consumer with manual acknowledgement (autoAck = false).
        _model.BasicConsume(routeKey, false, consumer);
    }

    /// <summary>
    /// Writes an event source to the store.
    /// </summary>
    /// <param name="eventSource">Event source to write.</param>
    /// <param name="cancellationToken">
    /// Cancellation token; only observed when the event is written to the in-memory channel.
    /// </param>
    /// <returns><see cref="ValueTask"/></returns>
    /// <exception cref="ArgumentNullException"><paramref name="eventSource"/> is null.</exception>
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)
    {
        if (eventSource is null)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }

        // ChannelEventSource goes through RabbitMQ; anything else is handled locally.
        if (eventSource is ChannelEventSource source)
        {
            // Serialize and publish to the default exchange, routed by the routing key.
            // NOTE(review): _model is shared with the consumer and with concurrent callers;
            // confirm whether publishing needs to be serialized for the client version in use.
            var data = Encoding.UTF8.GetBytes(JSON.Serialize(source));
            _model.BasicPublish("", _routeKey, null, data);
        }
        else
        {
            // Dynamic subscription handling: write straight to the in-memory channel.
            await _channel.Writer.WriteAsync(eventSource, cancellationToken);
        }
    }

    /// <summary>
    /// Reads one event source from the store, waiting until one is available.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The next event source.</returns>
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
    {
        return await _channel.Reader.ReadAsync(cancellationToken);
    }

    /// <summary>
    /// Releases the RabbitMQ channel and connection.
    /// </summary>
    /// <remarks>
    /// The in-memory channel is completed first so that a consumer callback waiting for free
    /// space is released instead of blocking shutdown; its message stays unacknowledged.
    /// After disposal, <see cref="ReadAsync"/> drains any buffered events and then throws
    /// <see cref="ChannelClosedException"/>.
    /// </remarks>
    public void Dispose()
    {
        _channel.Writer.TryComplete();
        _model.Dispose();
        _connection.Dispose();
    }
}