using Quartz;
using Quartz.Impl;
using Quartz.Impl.AdoJobStore;
using Quartz.Impl.AdoJobStore.Common;
using Quartz.Impl.Matchers;
using Quartz.Impl.Triggers;
using Quartz.Simpl;
using Quartz.Util;
using Serilog;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc.RazorPages;
using Utility.Job;
using System.Linq;
using Utility.Entity;
using Talk.Extensions;

namespace Utility
{
    /// <summary>
    /// Wraps one process-wide Quartz scheduler that persists jobs through an ADO job store.
    /// The database provider and connection string are read from the
    /// "Quartz:dbProviderName" and "Quartz:connectionString" settings.
    /// </summary>
    /// <remarks>
    /// Within this class the static scheduler is only assigned by
    /// <see cref="StartScheduleAsync"/>; every other public method dereferences it
    /// directly, so <see cref="StartScheduleAsync"/> has to run first.
    /// </remarks>
    public class SchedulerCenter
    {
        /// <summary>Name under which the scheduler is registered and looked up.</summary>
        private const string SchedulerName = "bennyScheduler";

        /// <summary>Name of the connection provider / job-store data source.</summary>
        private const string DataSourceName = "default";

        /// <summary>Database connection provider handed to Quartz.</summary>
        private readonly IDbProvider dbProvider;

        /// <summary>Assembly-qualified name of the ADO driver delegate matching the provider.</summary>
        private readonly string driverDelegateType;

        /// <summary>The scheduler instance shared by every <see cref="SchedulerCenter"/>.</summary>
        private static IScheduler scheduler;

        /// <summary>
        /// Reads the Quartz database settings and prepares the provider and driver delegate.
        /// </summary>
        /// <exception cref="NotSupportedException">The configured provider name is not recognised.</exception>
        public SchedulerCenter()
        {
            var providerName = ConfigurationManager.GetTryConfig("Quartz:dbProviderName");

            // Resolve the delegate first so an unknown provider fails before a DbProvider is built.
            driverDelegateType = ResolveDriverDelegateType(providerName);
            dbProvider = new DbProvider(providerName, ConfigurationManager.GetTryConfig("Quartz:connectionString"));
        }

        /// <summary>
        /// Maps a Quartz provider name to the assembly-qualified name of its driver delegate.
        /// </summary>
        /// <param name="providerName">Value of the "Quartz:dbProviderName" setting.</param>
        /// <returns>The assembly-qualified type name of the matching delegate.</returns>
        /// <exception cref="NotSupportedException">No delegate is mapped for <paramref name="providerName"/>.</exception>
        private static string ResolveDriverDelegateType(string providerName)
        {
            switch (providerName)
            {
                case "SQLite-Microsoft":
                case "SQLite":
                    return typeof(SQLiteDelegate).AssemblyQualifiedName;
                case "MySql":
                    return typeof(MySQLDelegate).AssemblyQualifiedName;
                case "OracleODPManaged":
                    return typeof(OracleDelegate).AssemblyQualifiedName;
                case "SqlServer":
                case "SQLServerMOT":
                    return typeof(SqlServerDelegate).AssemblyQualifiedName;
                case "Npgsql":
                    return typeof(PostgreSQLDelegate).AssemblyQualifiedName;
                case "Firebird":
                    return typeof(FirebirdDelegate).AssemblyQualifiedName;
                default:
                    // Derived from Exception, so existing catch (Exception) handlers still apply.
                    throw new NotSupportedException("错误数据库类型");
            }
        }

        /// <summary>
        /// Creates and registers the scheduler on first use; does nothing when it already exists.
        /// </summary>
        private async Task InitSchedulerAsync()
        {
            if (scheduler != null)
            {
                return;
            }

            DBConnectionManager.Instance.AddConnectionProvider(DataSourceName, dbProvider);

            var serializer = new JsonObjectSerializer();
            serializer.Initialize();

            var jobStore = new JobStoreTX
            {
                DataSource = DataSourceName,
                TablePrefix = "QRTZ_",
                InstanceId = "AUTO",
                DriverDelegateType = driverDelegateType,
                ObjectSerializer = serializer,
            };

            DirectSchedulerFactory.Instance.CreateScheduler(SchedulerName, "AUTO", new DefaultThreadPool(), jobStore);
            scheduler = await SchedulerRepository.Instance.Lookup(SchedulerName);
        }

        /// <summary>
        /// Initialises the scheduler if necessary and starts it when it is in standby mode.
        /// </summary>
        /// <returns>The scheduler's <c>InStandbyMode</c> value read after the start attempt.</returns>
        public async Task<bool> StartScheduleAsync()
        {
            await InitSchedulerAsync();

            if (scheduler.InStandbyMode)
            {
                await scheduler.Start();
                Log.Information("任务调度启动!");
            }

            // NOTE(review): this is false once the scheduler is running, i.e. "false" means
            // started. Left unchanged because callers may already rely on the value.
            return scheduler.InStandbyMode;
        }

        /// <summary>
        /// Registers a new job together with its trigger.
        /// </summary>
        /// <param name="entity">Job definition: identity, job type, payload and schedule.</param>
        /// <param name="runNumber">Optional initial run counter stored in the job data.</param>
        /// <returns>The text "添加成功" once the job has been scheduled.</returns>
        /// <remarks>
        /// Throws the exception produced by <c>Oops.Bah</c> when the job key already exists
        /// or when <c>entity.JobType</c> is neither Url nor BuiltIn.
        /// </remarks>
        public async Task<string> AddScheduleJobAsync(ScheduleEntity entity, long? runNumber = null)
        {
            // Refuse to overwrite an existing job.
            var jobKey = new JobKey(entity.JobName, entity.JobGroup);
            if (await scheduler.CheckExists(jobKey))
            {
                throw Oops.Bah("任务已存在");
            }

            // Job data is stored as strings; the query methods read it back via GetString/GetLong.
            var httpDir = new Dictionary<string, string>()
            {
                { Constant.EndAt, entity.EndTime.ToString() },
                { Constant.JobTypeEnum, ((int)entity.JobType).ToString() },
            };
            if (runNumber.HasValue)
            {
                httpDir.Add(Constant.RUNNUMBER, runNumber.ToString());
            }

            // NOTE(review): the generic type arguments of JobBuilder.Create were missing from
            // the reviewed text. HttpJob and BuiltInJob are assumed names for the IJob
            // implementations in Utility.Job - confirm before merging.
            IJobConfigurator jobConfigurator;
            if (entity.JobType == JobTypeEnum.Url)
            {
                jobConfigurator = JobBuilder.Create<HttpJob>();
                httpDir.Add(Constant.REQUESTURL, entity.RequestUrl);
                httpDir.Add(Constant.HEADERS, entity.Headers);
                httpDir.Add(Constant.REQUESTPARAMETERS, entity.RequestParameters);
                httpDir.Add(Constant.REQUESTTYPE, ((int)entity.RequestType).ToString());
            }
            else if (entity.JobType == JobTypeEnum.BuiltIn)
            {
                jobConfigurator = JobBuilder.Create<BuiltInJob>();
                httpDir.Add(Constant.CLASSNAME, entity.ClassName);
                httpDir.Add(Constant.METHODNAME, entity.MethodName);
                httpDir.Add(Constant.METHODPARAMETERS, entity.MethodParameters);
            }
            else
            {
                // Previously this path left the configurator null and failed later with a
                // NullReferenceException; fail early with a readable message instead.
                throw Oops.Bah("不支持的任务类型");
            }

            // Bind the job data, description and identity to the job implementation.
            IJobDetail job = jobConfigurator
                .SetJobData(new JobDataMap(httpDir))
                .WithDescription(entity.Description)
                .WithIdentity(entity.JobName, entity.JobGroup)
                .Build();

            // The trigger kind is taken from the entity; the cron expression itself is not
            // validated here (a CronExpression.IsValidExpression check was disabled).
            ITrigger trigger = entity.TriggerType == TriggerTypeEnum.Cron
                ? CreateCronTrigger(entity)
                : CreateSimpleTrigger(entity);

            await scheduler.ScheduleJob(job, trigger);
            return "添加成功";
        }

        /// <summary>
        /// Pauses a job and, optionally, deletes it afterwards.
        /// </summary>
        /// <param name="jobGroup">Group of the job.</param>
        /// <param name="jobName">Name of the job.</param>
        /// <param name="isDelete">When true the job is deleted after being paused.</param>
        /// <returns>A status message; failures are logged and reported in the message rather than thrown.</returns>
        public async Task<string> StopOrDelScheduleJobAsync(string jobGroup, string jobName, bool isDelete = false)
        {
            try
            {
                var jobKey = new JobKey(jobName, jobGroup);
                await scheduler.PauseJob(jobKey);

                if (!isDelete)
                {
                    return "停止任务计划成功!";
                }

                await scheduler.DeleteJob(jobKey);
                return "删除任务计划成功!";
            }
            catch (Exception ex)
            {
                Log.Error($"停止任务计划失败!{ex}");
                return "停止任务计划失败" + ex.Message;
            }
        }

        /// <summary>
        /// Resumes a paused job unless the end time stored in its job data has already passed.
        /// </summary>
        /// <param name="jobGroup">Group of the job.</param>
        /// <param name="jobName">Name of the job.</param>
        /// <returns>A status message; failures are logged and reported in the message rather than thrown.</returns>
        public async Task<string> ResumeJobAsync(string jobGroup, string jobName)
        {
            try
            {
                var jobKey = new JobKey(jobName, jobGroup);
                if (!await scheduler.CheckExists(jobKey))
                {
                    return "任务不存在";
                }

                // Read the end time under the same key AddScheduleJobAsync writes it with.
                var jobDetail = await scheduler.GetJobDetail(jobKey);
                var endTime = jobDetail.JobDataMap.GetString(Constant.EndAt);
                if (!string.IsNullOrWhiteSpace(endTime) && DateTime.Parse(endTime) <= DateTime.Now)
                {
                    return "Job的结束时间已过期。";
                }

                // The job exists and has not expired: resume it.
                await scheduler.ResumeJob(jobKey);
                Log.Information($"任务“{jobName}”恢复运行");
                return "恢复任务计划成功!";
            }
            catch (Exception ex)
            {
                Log.Error($"恢复任务失败!{ex}");
                return "恢复任务计划失败!";
            }
        }

        /// <summary>
        /// Rebuilds a <see cref="ScheduleEntity"/> from a stored job and its first trigger.
        /// </summary>
        /// <param name="jobGroup">Group of the job.</param>
        /// <param name="jobName">Name of the job.</param>
        /// <returns>The reconstructed job definition.</returns>
        /// <remarks>
        /// Throws the exception produced by <c>Oops.Bah</c> when the job does not exist.
        /// When the job has no trigger, the trigger-derived fields (begin time, interval,
        /// cron expression, run times) are left unset.
        /// </remarks>
        public async Task<ScheduleEntity> QueryJobAsync(string jobGroup, string jobName)
        {
            var jobKey = new JobKey(jobName, jobGroup);
            var jobDetail = await scheduler.GetJobDetail(jobKey);
            if (jobDetail == null)
            {
                throw Oops.Bah("任务不存在");
            }

            var trigger = (await scheduler.GetTriggersOfJob(jobKey)).FirstOrDefault();
            var simpleTrigger = trigger as SimpleTriggerImpl;
            var cronTrigger = trigger as CronTriggerImpl;
            var dataMap = jobDetail.JobDataMap;

            var entity = new ScheduleEntity();
            entity.JobGroup = jobGroup;
            entity.JobName = jobName;
            entity.Description = jobDetail.Description;
            entity.Cron = cronTrigger?.CronExpressionString;
            entity.RunTimes = simpleTrigger?.RepeatCount;
            entity.TriggerType = simpleTrigger != null ? TriggerTypeEnum.Simple : TriggerTypeEnum.Cron;

            if (trigger != null)
            {
                entity.BeginTime = trigger.StartTimeUtc.LocalDateTime;
            }

            var endTime = dataMap.GetString(Constant.EndAt);
            if (!string.IsNullOrWhiteSpace(endTime))
            {
                entity.EndTime = DateTime.Parse(endTime);
            }

            if (simpleTrigger != null)
            {
                // CreateSimpleTrigger interprets IntervalMilliseconds as milliseconds, so
                // report the same unit (the former code stored TotalSeconds here).
                entity.IntervalMilliseconds = Convert.ToInt32(simpleTrigger.RepeatInterval.TotalMilliseconds);
            }

            // Jobs saved by older code carry no job type; "1" is the fallback used for them
            // (per the original note, such jobs are treated as Url jobs).
            entity.JobType = (JobTypeEnum)int.Parse(dataMap.GetString(Constant.JobTypeEnum) ?? "1");

            switch (entity.JobType)
            {
                case JobTypeEnum.Url:
                    entity.RequestUrl = dataMap.GetString(Constant.REQUESTURL);
                    entity.RequestType = (RequestTypeEnum)int.Parse(dataMap.GetString(Constant.REQUESTTYPE));
                    entity.RequestParameters = dataMap.GetString(Constant.REQUESTPARAMETERS);
                    entity.Headers = dataMap.GetString(Constant.HEADERS);
                    break;
                case JobTypeEnum.BuiltIn:
                    entity.ClassName = dataMap.GetString(Constant.CLASSNAME);
                    entity.MethodName = dataMap.GetString(Constant.METHODNAME);
                    entity.MethodParameters = dataMap.GetString(Constant.METHODPARAMETERS);
                    break;
                default:
                    // None, Hotreload and unknown values carry no extra payload here.
                    break;
            }

            return entity;
        }

        /// <summary>
        /// Fires a job immediately.
        /// </summary>
        /// <param name="jobKey">Key of the job to trigger.</param>
        /// <returns>Always true once the trigger request has been issued.</returns>
        public async Task<bool> TriggerJobAsync(JobKey jobKey)
        {
            await scheduler.TriggerJob(jobKey);
            return true;
        }

        /// <summary>
        /// Returns the log list stored in a job's data map.
        /// </summary>
        /// <param name="jobKey">Key of the job.</param>
        /// <returns>The stored list, or null when the entry is absent or of another type.</returns>
        public async Task<List<string>> GetJobLogsAsync(JobKey jobKey)
        {
            var jobDetail = await scheduler.GetJobDetail(jobKey);

            // NOTE(review): the element type was missing from the reviewed text; string is assumed.
            return jobDetail.JobDataMap[Constant.LOGLIST] as List<string>;
        }

        /// <summary>
        /// Returns the run counter stored in a job's data map.
        /// </summary>
        /// <param name="jobKey">Key of the job.</param>
        /// <returns>The value stored under the run-number key.</returns>
        public async Task<long> GetRunNumberAsync(JobKey jobKey)
        {
            var jobDetail = await scheduler.GetJobDetail(jobKey);
            return jobDetail.JobDataMap.GetLong(Constant.RUNNUMBER);
        }

        /// <summary>
        /// Lists every job with full details, grouped by job group (used for the initial page load).
        /// </summary>
        /// <returns>
        /// One entry per job group, ordered by group name; jobs inside a group are ordered by name.
        /// Jobs whose detail or trigger cannot be loaded are logged and skipped.
        /// </returns>
        public async Task<List<JobInfoEntity>> GetAllJobAsync()
        {
            var jobKeys = new List<JobKey>();
            var jobInfoList = new List<JobInfoEntity>();
            var groupsByName = new Dictionary<string, JobInfoEntity>();

            var groupNames = await scheduler.GetJobGroupNames();
            foreach (var groupName in groupNames.OrderBy(t => t))
            {
                jobKeys.AddRange(await scheduler.GetJobKeys(GroupMatcher<JobKey>.GroupEquals(groupName)));

                var group = new JobInfoEntity() { GroupName = groupName };
                jobInfoList.Add(group);
                groupsByName[groupName] = group;
            }

            foreach (var jobKey in jobKeys.OrderBy(t => t.Name))
            {
                var jobDetail = await scheduler.GetJobDetail(jobKey);
                var trigger = (await scheduler.GetTriggersOfJob(jobKey)).FirstOrDefault();

                // A job can vanish or lose its trigger between the key query and this point;
                // skip it instead of failing the whole listing.
                if (jobDetail == null || trigger == null)
                {
                    Log.Warning($"任务“{jobKey}”缺少详情或触发器,已跳过");
                    continue;
                }

                if (!groupsByName.TryGetValue(jobKey.Group, out var jobInfo))
                {
                    continue;
                }

                string interval;
                if (trigger is SimpleTriggerImpl simpleTrigger)
                {
                    interval = simpleTrigger.RepeatInterval.ToString();
                }
                else
                {
                    interval = (trigger as CronTriggerImpl)?.CronExpressionString;
                }

                var dataMap = jobDetail.JobDataMap;

                // Jobs saved by older code carry no job type, so None is treated as Url.
                var jobType = (JobTypeEnum)dataMap.GetLong(Constant.JobTypeEnum);
                if (jobType == JobTypeEnum.None)
                {
                    jobType = JobTypeEnum.Url;
                }

                var triggerAddress = string.Empty;
                if (jobType == JobTypeEnum.Url)
                {
                    triggerAddress = dataMap.GetString(Constant.REQUESTURL);
                }
                else if (jobType == JobTypeEnum.BuiltIn)
                {
                    triggerAddress = dataMap.GetString(Constant.CLASSNAME);
                }

                jobInfo.JobInfoList.Add(new JobInfo()
                {
                    Name = jobKey.Name,
                    LastErrMsg = ReadStringOrEmpty(dataMap, Constant.EXCEPTION),
                    TriggerAddress = triggerAddress,
                    TriggerState = await scheduler.GetTriggerState(trigger.Key),
                    PreviousFireTime = trigger.GetPreviousFireTimeUtc()?.LocalDateTime,
                    NextFireTime = trigger.GetNextFireTimeUtc()?.LocalDateTime,
                    BeginTime = trigger.StartTimeUtc.LocalDateTime,
                    Interval = interval,
                    EndTime = trigger.EndTimeUtc?.LocalDateTime,
                    Description = jobDetail.Description,
                    RequestType = ReadStringOrEmpty(dataMap, Constant.REQUESTTYPE),
                    // Taken from the stored run counter rather than the trigger's
                    // TimesTriggered, which cron triggers do not provide.
                    RunNumber = ReadLongOrZero(dataMap, Constant.RUNNUMBER),
                    JobType = (long)jobType
                });
            }

            return jobInfoList;
        }

        // Disabled: removal of the stored error message. The original note says the job data
        // map can only be persisted from inside an IJob, hence the direct database access.
        //public async Task<bool> RemoveErrLog(string jobGroup, string jobName)
        //{
        //    IRepositorie logRepositorie = RepositorieFactory.CreateRepositorie(driverDelegateType, dbProvider);
        //    if (logRepositorie == null) return false;
        //    await logRepositorie.RemoveErrLogAsync(jobGroup, jobName);
        //    var jobKey = new JobKey(jobName, jobGroup);
        //    var jobDetail = await scheduler.GetJobDetail(jobKey);
        //    jobDetail.JobDataMap[Constant.EXCEPTION] = string.Empty;
        //    return true;
        //}

        /// <summary>
        /// Lists every job with brief status information, grouped by job group (used when refreshing).
        /// </summary>
        /// <returns>
        /// One entry per job group, ordered by group name; jobs inside a group are ordered by name.
        /// Jobs without a trigger are logged and skipped.
        /// </returns>
        public async Task<List<JobBriefInfoEntity>> GetAllJobBriefInfoAsync()
        {
            var jobKeys = new List<JobKey>();
            var jobInfoList = new List<JobBriefInfoEntity>();
            var groupsByName = new Dictionary<string, JobBriefInfoEntity>();

            var groupNames = await scheduler.GetJobGroupNames();
            foreach (var groupName in groupNames.OrderBy(t => t))
            {
                jobKeys.AddRange(await scheduler.GetJobKeys(GroupMatcher<JobKey>.GroupEquals(groupName)));

                var group = new JobBriefInfoEntity() { GroupName = groupName };
                jobInfoList.Add(group);
                groupsByName[groupName] = group;
            }

            foreach (var jobKey in jobKeys.OrderBy(t => t.Name))
            {
                var jobDetail = await scheduler.GetJobDetail(jobKey);
                var trigger = (await scheduler.GetTriggersOfJob(jobKey)).FirstOrDefault();

                // The state and fire times all come from the trigger; without one there is
                // nothing to report, so skip the job instead of failing the whole listing.
                if (trigger == null)
                {
                    Log.Warning($"任务“{jobKey}”缺少触发器,已跳过");
                    continue;
                }

                if (!groupsByName.TryGetValue(jobKey.Group, out var jobInfo))
                {
                    continue;
                }

                jobInfo.JobInfoList.Add(new JobBriefInfo()
                {
                    Name = jobKey.Name,
                    LastErrMsg = jobDetail?.JobDataMap.GetString(Constant.EXCEPTION),
                    TriggerState = await scheduler.GetTriggerState(trigger.Key),
                    PreviousFireTime = trigger.GetPreviousFireTimeUtc()?.LocalDateTime,
                    NextFireTime = trigger.GetNextFireTimeUtc()?.LocalDateTime,
                    RunNumber = jobDetail?.JobDataMap.GetLong(Constant.RUNNUMBER) ?? 0
                });
            }

            return jobInfoList;
        }

        /// <summary>
        /// Puts the scheduler into standby mode when it is currently running.
        /// </summary>
        /// <returns>The negation of the scheduler's <c>InStandbyMode</c> value read after the attempt.</returns>
        public async Task<bool> StopScheduleAsync()
        {
            if (!scheduler.InStandbyMode)
            {
                // Standby is used instead of Shutdown: per the original note, calling Start
                // after Shutdown fails.
                await scheduler.Standby();
                Log.Information("任务调度暂停!");
            }

            // NOTE(review): this is false once the scheduler is in standby, i.e. "false" means
            // paused. Left unchanged because callers may already rely on the value.
            return !scheduler.InStandbyMode;
        }

        /// <summary>
        /// Builds a simple (fixed-interval) trigger for the given job definition.
        /// </summary>
        /// <param name="entity">Job definition; <c>IntervalMilliseconds</c> must have a value.</param>
        /// <returns>
        /// A trigger that repeats <c>RunTimes</c> times when that value is positive and
        /// repeats forever otherwise.
        /// </returns>
        private ITrigger CreateSimpleTrigger(ScheduleEntity entity)
        {
            var hasRepeatLimit = entity.RunTimes.HasValue && entity.RunTimes > 0;

            // No EndAt is set on the trigger (it was disabled in the original code); the end
            // time is carried in the job data instead.
            return TriggerBuilder.Create()
                .WithIdentity(entity.JobName, entity.JobGroup)
                .StartAt(entity.BeginTime)
                .WithSimpleSchedule(schedule =>
                {
                    // Interval between runs, in milliseconds.
                    schedule.WithInterval(TimeSpan.FromMilliseconds(entity.IntervalMilliseconds.Value));

                    if (hasRepeatLimit)
                    {
                        // Repeat count as supplied by the caller (per the original note it counts from 0).
                        schedule.WithRepeatCount(entity.RunTimes.Value);
                    }
                    else
                    {
                        schedule.RepeatForever();
                    }

                    schedule.WithMisfireHandlingInstructionFireNow();
                })
                .ForJob(entity.JobName, entity.JobGroup)
                .Build();
        }

        /// <summary>
        /// Builds a cron trigger for the given job definition.
        /// </summary>
        /// <param name="entity">Job definition; <c>Cron</c> holds the cron expression.</param>
        /// <returns>A cron trigger using the fire-and-proceed misfire instruction.</returns>
        private ITrigger CreateCronTrigger(ScheduleEntity entity)
        {
            // No EndAt is set on the trigger (it was disabled in the original code).
            return TriggerBuilder.Create()
                .WithIdentity(entity.JobName, entity.JobGroup)
                .StartAt(entity.BeginTime)
                .WithCronSchedule(
                    entity.Cron,
                    cronSchedule => cronSchedule.WithMisfireHandlingInstructionFireAndProceed())
                .ForJob(entity.JobName, entity.JobGroup)
                .Build();
        }

        /// <summary>
        /// Best-effort string read for the job listing: returns the stored value (which may
        /// be null), or an empty string when reading it throws.
        /// </summary>
        private static string ReadStringOrEmpty(JobDataMap dataMap, string key)
        {
            try
            {
                return dataMap.GetString(key);
            }
            catch (Exception)
            {
                // Deliberately tolerant: one unreadable entry must not break the listing.
                return string.Empty;
            }
        }

        /// <summary>
        /// Best-effort numeric read for the job listing: returns the stored value, or 0 when
        /// reading it throws.
        /// </summary>
        private static long ReadLongOrZero(JobDataMap dataMap, string key)
        {
            try
            {
                return dataMap.GetLong(key);
            }
            catch (Exception)
            {
                // Deliberately tolerant: one unreadable entry must not break the listing.
                return 0;
            }
        }
    }
}