// The copyrights, trademarks, patents, and other related rights of the Admin.NET project are protected by applicable laws and regulations. Use of this project must comply with the relevant laws, regulations, and license requirements.
//
// This project is distributed and used primarily under the MIT License and the Apache License (Version 2.0). The licenses are in the LICENSE-MIT and LICENSE-APACHE files in the root directory of the source tree.
//
// This project must not be used for activities prohibited by laws and regulations, such as endangering national security, disrupting social order, or infringing on the lawful rights and interests of others! We accept no liability for any legal disputes or liabilities arising from secondary development based on this project!

namespace Admin.NET.Core.Service;

/// <summary>
/// Job cluster control. Records the state of each scheduler node in the
/// <c>SysJobCluster</c> table and uses that table to decide when a node may start working.
/// </summary>
public class JobClusterServer : IJobClusterServer
{
    public JobClusterServer()
    {
    }

    /// <summary>
    /// Notification that the current job scheduler has started.
    /// </summary>
    /// <remarks>
    /// The interface requires a <c>void</c> return, so this is an <c>async void</c> method.
    /// Failures are caught and reported here because nothing can await or observe them.
    /// </remarks>
    /// <param name="context">Job cluster service context.</param>
    public async void Start(JobClusterContext context)
    {
        try
        {
            var repository = App.GetRequiredService<SqlSugarRepository<SysJobCluster>>();

            // Upsert the row for this clusterId and set its status to ClusterStatus.Waiting.
            if (await repository.IsAnyAsync(u => u.ClusterId == context.ClusterId))
            {
                await repository.AsUpdateable()
                    .SetColumns(u => u.Status == ClusterStatus.Waiting)
                    .Where(u => u.ClusterId == context.ClusterId)
                    .ExecuteCommandAsync();
            }
            else
            {
                await repository
                    .AsInsertable(new SysJobCluster { ClusterId = context.ClusterId, Status = ClusterStatus.Waiting })
                    .ExecuteCommandAsync();
            }
        }
        catch (Exception ex)
        {
            ReportFailure(nameof(Start), ex);
        }
    }

    /// <summary>
    /// Waits until this node is allowed to work.
    /// </summary>
    /// <remarks>
    /// Loops until no row in the cluster table has status <see cref="ClusterStatus.Working"/>,
    /// then marks this cluster as working and returns. Any exception raised inside an attempt
    /// is reported and the loop retries on the next heartbeat.
    /// </remarks>
    /// <param name="context">Job cluster service context.</param>
    /// <returns><see cref="Task"/></returns>
    public async Task WaitingForAsync(JobClusterContext context)
    {
        var clusterId = context.ClusterId;

        while (true)
        {
            // Heartbeat pacing. The delay sits at the top of the loop so that the `continue`
            // below can never spin without sleeping. The random part staggers nodes that start
            // together; Random.Shared is used because a millisecond-based seed gives nodes that
            // start on the same millisecond value identical jitter.
            await Task.Delay(3000 + Random.Shared.Next(500, 1000));

            try
            {
                ICache cache = App.GetRequiredService<ICacheProvider>().Cache;

                // Distributed lock around the check-then-update below.
                // NOTE(review): the second argument is presumably a timeout in milliseconds - confirm against ICache.
                using (cache.AcquireLock("lock:JobClusterServer:WaitingForAsync", 1000))
                {
                    var repository = App.GetRequiredService<SqlSugarRepository<SysJobCluster>>();

                    // 1) Some cluster already has status Working: keep waiting.
                    // 2) Otherwise (no other service, only this one, or none Working):
                    //    mark this cluster as Working and stop waiting.
                    if (await repository.IsAnyAsync(u => u.Status == ClusterStatus.Working))
                    {
                        continue;
                    }

                    await WorkNowAsync(clusterId);
                    return;
                }
            }
            catch (Exception ex)
            {
                // Best effort: stay in the loop, but do not hide the failure.
                ReportFailure(nameof(WaitingForAsync), ex);
            }
        }
    }

    /// <summary>
    /// Notification that the current job scheduler has stopped.
    /// Records <see cref="ClusterStatus.Crashed"/> for this cluster, the same status written by <see cref="Crash"/>.
    /// </summary>
    /// <param name="context">Job cluster service context.</param>
    public async void Stop(JobClusterContext context)
    {
        try
        {
            await MarkCrashedAsync(context.ClusterId);
        }
        catch (Exception ex)
        {
            // async void: nothing can observe this exception, so report it here.
            ReportFailure(nameof(Stop), ex);
        }
    }

    /// <summary>
    /// Notification that the current job scheduler has crashed.
    /// Records <see cref="ClusterStatus.Crashed"/> for this cluster.
    /// </summary>
    /// <param name="context">Job cluster service context.</param>
    public async void Crash(JobClusterContext context)
    {
        try
        {
            await MarkCrashedAsync(context.ClusterId);
        }
        catch (Exception ex)
        {
            // async void: nothing can observe this exception, so report it here.
            ReportFailure(nameof(Crash), ex);
        }
    }

    /// <summary>
    /// Marks the cluster as allowed to work.
    /// </summary>
    /// <param name="clusterId">Cluster id.</param>
    /// <returns><see cref="Task"/></returns>
    private static async Task WorkNowAsync(string clusterId)
    {
        var repository = App.GetRequiredService<SqlSugarRepository<SysJobCluster>>();

        // Update the status of the row for clusterId to ClusterStatus.Working.
        await repository.UpdateAsync(u => new SysJobCluster { Status = ClusterStatus.Working }, u => u.ClusterId == clusterId);
    }

    /// <summary>
    /// Updates the status of the row for <paramref name="clusterId"/> to <see cref="ClusterStatus.Crashed"/>.
    /// Shared by <see cref="Stop"/> and <see cref="Crash"/>.
    /// </summary>
    /// <param name="clusterId">Cluster id.</param>
    /// <returns><see cref="Task"/></returns>
    private static async Task MarkCrashedAsync(string clusterId)
    {
        var repository = App.GetRequiredService<SqlSugarRepository<SysJobCluster>>();
        await repository.UpdateAsync(u => new SysJobCluster { Status = ClusterStatus.Crashed }, u => u.ClusterId == clusterId);
    }

    /// <summary>
    /// Writes a failure of one of the cluster callbacks to the trace listeners.
    /// </summary>
    /// <param name="operation">Name of the method that failed.</param>
    /// <param name="exception">The exception that was caught.</param>
    private static void ReportFailure(string operation, Exception exception)
    {
        System.Diagnostics.Trace.TraceError($"{nameof(JobClusterServer)}.{operation} failed: {exception}");
    }
}