// Admin.NET 项目的版权、商标、专利和其他相关权利均受相应法律法规的保护。使用本项目应遵守相关法律法规和许可证的要求。
//
// 本项目主要遵循 MIT 许可证和 Apache 许可证(版本 2.0)进行分发和使用。许可证位于源代码树根目录中的 LICENSE-MIT 和 LICENSE-APACHE 文件。
//
// 不得利用本项目从事危害国家安全、扰乱社会秩序、侵犯他人合法权益等法律法规禁止的活动!任何基于本项目二次开发而产生的一切法律纠纷和责任,我们不承担任何责任!
namespace Admin.NET.Core;
/// <summary>
/// Event-style wrapper around an <see cref="IProducerConsumer{T}"/> queue: a background task
/// repeatedly takes messages from the queue and raises <see cref="Received"/> for each one.
/// (Original summary: "Redis message extension".)
/// </summary>
/// <typeparam name="T">Type of the messages taken from the queue.</typeparam>
public class EventConsumer<T> : IDisposable
{
    /// <summary>
    /// Background polling task; <c>null</c> while the consumer is not running.
    /// </summary>
    private Task _consumerTask;

    /// <summary>
    /// Cancellation source used to end the polling loop; <c>null</c> while the consumer is not running.
    /// </summary>
    private CancellationTokenSource _consumerCts;

    /// <summary>
    /// The underlying queue that messages are taken from.
    /// </summary>
    public IProducerConsumer<T> Consumer { get; }

    /// <summary>
    /// Raised on the background polling task for every non-null message taken from <see cref="Consumer"/>.
    /// </summary>
    public event EventHandler<T> Received;

    /// <summary>
    /// Creates a consumer that polls the given queue once <see cref="Start"/> is called.
    /// </summary>
    /// <param name="consumer">Queue to take messages from. A null value is only rejected later, by <see cref="Start"/>.</param>
    public EventConsumer(IProducerConsumer<T> consumer) => Consumer = consumer;

    /// <summary>
    /// Starts the background polling loop. Does nothing if a polling task already exists.
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when <see cref="Consumer"/> is null.</exception>
    public void Start()
    {
        if (Consumer is null)
        {
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
        }

        if (_consumerTask != null)
        {
            return;
        }

        _consumerCts = new CancellationTokenSource();
        var ct = _consumerCts.Token;
        _consumerTask = Task.Factory.StartNew(() =>
        {
            while (!ct.IsCancellationRequested)
            {
                // NOTE(review): 10 is presumably a timeout for the blocking take — confirm the unit
                // against the IProducerConsumer<T> implementation; it bounds how long Stop() may wait.
                var cr = Consumer.TakeOne(10);
                if (cr == null) continue;

                // An exception thrown by a handler ends the loop and faults the task;
                // it is surfaced to the caller by Stop().
                Received?.Invoke(this, cr);
            }
        }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    /// <summary>
    /// Requests cancellation of the polling loop, waits for it to finish and releases the
    /// cancellation source. Does nothing if the consumer is not running.
    /// </summary>
    /// <returns>A task that completes once the polling task has ended.</returns>
    /// <remarks>
    /// An exception that faulted the polling task (for example one thrown by a
    /// <see cref="Received"/> handler) is rethrown from this method.
    /// </remarks>
    public async Task Stop()
    {
        var cts = _consumerCts;
        var task = _consumerTask;
        if (cts == null || task == null) return;

        cts.Cancel();
        try
        {
            // ConfigureAwait(false): Dispose blocks synchronously on this method, so the
            // continuation must not need the caller's synchronization context.
            await task.ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (task.IsCanceled)
        {
            // The token was cancelled before the loop delegate started running, so the task
            // ended in the Canceled state. That is the outcome Stop() asked for, not an error.
        }
        finally
        {
            _consumerTask = null;
            _consumerCts = null;
            // The polling task has ended, so nothing observes the token any more.
            cts.Dispose();
        }
    }

    /// <summary>
    /// Stops the polling loop (if running) and releases the resources held by this instance.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the polling task
    /// stopped, blocking until it has ended.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        if (_consumerTask != null)
        {
            Stop().Wait();
        }
    }
}