1 /** 2 * Модуль планировщика задач 3 * 4 * Copyright: (c) 2015-2020, Milofon Project. 5 * License: Subject to the terms of the BSD 3-Clause License, as written in the included LICENSE.md file. 6 * Author: <m.galanin@milofon.pro> Maksim Galanin 7 * Date: 2020-04-21 8 */ 9 10 module dango.system.scheduler; 11 12 public 13 { 14 import uniconf.core : UniConf; 15 } 16 17 private 18 { 19 import core.time : Duration; 20 import core.thread : Thread; 21 import core.sync.condition : Condition; 22 import core.sync.mutex : Mutex; 23 24 import std.algorithm.iteration : filter; 25 import std.datetime : DateTime, Clock; 26 import std.exception : assumeWontThrow; 27 import std.format : fmt = format; 28 import std.traits : hasMember; 29 import std.uni : toUpper; 30 31 import vibe.core.core : Timer, setTimer; 32 import vibe.core.log : logException; 33 import cronexp : CronExpr, CronException; 34 import dango.inject; 35 36 import dango.system.logging : logWarn, logInfo, logDebug; 37 import dango.system.application : Application; 38 import dango.system.exception : DangoConfigException; 39 import dango.system.properties; 40 import dango.system.plugin; 41 } 42 43 44 /** 45 * Интерфес задачи 46 */ 47 interface Job 48 { 49 /** 50 * Запуск задачи 51 */ 52 void execute() shared; 53 } 54 55 56 /** 57 * Фабрика задачи планировщика 58 */ 59 alias JobFactory = ComponentFactory!(Job, DependencyContainer, UniConf); 60 61 /** 62 * Контекст регистрации задач планировщика 63 */ 64 alias SchedulerContext = PluginContext!(SchedulerPlugin); 65 66 67 /** 68 * Плагин управления задач планировщика 69 */ 70 class SchedulerPlugin : DaemonPlugin 71 { 72 private @safe 73 { 74 DependencyContainer _container; 75 UniConf _config; 76 77 SchedulerFactory[string] _schedulerFactorys; 78 SysSchedulerFactory[] _sysSchedulerFactorys; 79 JobScheduler[] _schedulers; 80 } 81 82 /** 83 * Main constructor 84 */ 85 @Inject 86 this(Application application) 87 { 88 this._container = application.getContainer(); 89 this._config = application.getConfig(); 90 } 91 92 /** 93 * Свойство возвращает наименование плагина 94 */ 95 string name() pure @safe nothrow 96 { 97 return "Scheduler"; 98 } 99 100 /** 101 * Свойство возвращает версию приложения 102 */ 103 SemVer release() pure @safe nothrow 104 { 105 return SemVer(0, 0, 1); 106 } 107 108 /** 109 * Запуск процесса 110 */ 111 int startDaemon() 112 { 113 foreach (sysFactory; _sysSchedulerFactorys) 114 _schedulers ~= sysFactory(_container); 115 116 auto schedulerConf = _config.opt!UniConf("scheduler"); 117 if (!schedulerConf.isNull) 118 { 119 auto jobsConfig = schedulerConf.get.opt!UniConf("job"); 120 if (!jobsConfig.isNull) 121 { 122 foreach (UniConf jobConf; jobsConfig.get.toSequence.filter!( 123 c => c.getOrElse!bool("enabled", false))) 124 { 125 string jobName = getNameOrEnforce(jobConf, "Not defined job name"); 126 if (auto factory = jobName.toUpper in _schedulerFactorys) 127 _schedulers ~= (*factory)(_container, jobConf); 128 else 129 throw new DangoConfigException("Job '" ~ jobName ~ "' not register"); 130 } 131 } 132 else 133 logWarn("Not found jobs configuration"); 134 } 135 else 136 logWarn("Not found scheduler configuration"); 137 138 logInfo("Start scheduler"); 139 140 foreach (JobScheduler scheduler; _schedulers) 141 { 142 logInfo(" Start job [%s]", scheduler.name); 143 scheduler.start(); 144 } 145 146 return 0; 147 } 148 149 /** 150 * Остановка процесса 151 * 152 * Params: 153 * exitStatus = Код завершения приложения 154 */ 155 int stopDaemon(int exitStatus) 156 { 157 logInfo("Stop scheduler"); 158 foreach (JobScheduler scheduler; _schedulers) 159 { 160 scheduler.stop(); 161 logInfo(" Stop job [%s]", scheduler.name); 162 } 163 return 0; 164 } 165 166 /** 167 * Регистрация задачи 168 */ 169 void registerJob(J : Job)(string name) @safe 170 { 171 alias JF = ComponentFactoryCtor!(Job, J, UniConf); 172 registerJob!(JF)(name); 173 } 174 175 /** 176 * Регистрация задачи с использованием фабрики 177 */ 178 void registerJob(JF : JobFactory)(string name) @safe 179 { 180 auto factory = new WrapDependencyFactory!(JF)(); 181 registerJob!JF(name, factory); 182 } 183 184 /** 185 * Регистрация задачи с использованием существующей фабрики 186 */ 187 void registerJob(JF : JobFactory)(string name, JobFactory factory) @safe 188 { 189 string uName = name.toUpper; 190 JobScheduler creator(DependencyContainer container, UniConf config) @safe 191 { 192 auto expStr = config.getOrEnforce!string("cron", 193 fmt!"Not defined cron expression in task '%s'"(name)); 194 195 CronExpr cronExp = () @trusted { 196 try 197 return CronExpr(expStr); 198 catch (CronException e) 199 throw new DangoConfigException( 200 fmt!"Incorrect cron expression in task '%s'"(name)); 201 }(); 202 203 Job job = factory.createComponent(container, config); 204 return new TimerJobScheduler(name, job, cronExp); 205 } 206 _schedulerFactorys[uName] = &creator; 207 } 208 209 /** 210 * Регистрация системной задачи 211 */ 212 void registerSystemJob(J : Job)(string cronStr) @safe 213 { 214 alias JF = ComponentFactoryCtor!(Job, J, UniConf); 215 registerSystemJob!(JF)(cronStr); 216 } 217 218 /** 219 * Регистрация системной задачи с использованием фабрики 220 */ 221 void registerSystemJob(JF : JobFactory)(string cronStr) @safe 222 { 223 auto factory = new WrapDependencyFactory!(JF)(); 224 registerSystemJob!JF(cronStr, factory); 225 } 226 227 /** 228 * Регистрация системной задачи с использованием существующей фабрики 229 */ 230 void registerSystemJob(JF : ComponentFactory!(C, A), C, A...)(string cronStr, JobFactory factory) @safe 231 { 232 static if (hasMember!(JF, "ConcreteType")) 233 string title = typeid(JF.ConcreteType).toString; 234 else static if (hasMember!(JF, "ComponentType")) 235 string title = typeid(JF.ComponentType).toString; 236 else 237 string title = typeid(JF).toString; 238 239 JobScheduler creator(DependencyContainer container) @safe 240 { 241 UniConf config = UniConf([ 242 "__name": UniConf(title), 243 "enabled": UniConf(true), 244 "cron": UniConf(cronStr) 245 ]); 246 247 CronExpr cronExp = () @trusted { 248 try 249 return CronExpr(cronStr); 250 catch (CronException e) 251 throw new DangoConfigException( 252 fmt!"Incorrect cron expression in task '%s'"(title)); 253 }(); 254 255 Job job = factory.createComponent(container, config); 256 return new TimerJobScheduler(title, job, cronExp); 257 } 258 _sysSchedulerFactorys ~= &creator; 259 } 260 } 261 262 263 /** 264 * Задача по выводу информации в консоль о GC 265 */ 266 class GarbageCollectorStatJob : Job 267 { 268 /** 269 * Запуск задачи 270 */ 271 void execute() shared 272 { 273 import core.memory : GC; 274 auto profileStats = GC.profileStats; 275 auto stats = GC.stats; 276 logInfo("== GC stats =="); 277 logInfo(" Use memory in heap: %s", humanizeSize(stats.usedSize)); 278 logInfo(" Free memory in heap: %s", humanizeSize(stats.freeSize)); 279 auto total = stats.freeSize + stats.usedSize; 280 logInfo(" Total memory in heap: %s", humanizeSize(total)); 281 logInfo(" GC runs: %s", profileStats.numCollections); 282 } 283 284 285 private: 286 287 288 string humanizeSize(size_t size) shared 289 { 290 enum SUFFIXES = ["", "K", "M", "G"]; 291 double dsize = size; 292 ubyte suffix = 0; 293 while (dsize > 1024 && suffix < 4) 294 { 295 dsize /= 1024; 296 suffix++; 297 } 298 return fmt!"%.02f%s"(dsize, SUFFIXES[suffix]); 299 } 300 } 301 302 303 /** 304 * Задача по запуску минимизации используемой GC памяти 305 */ 306 class GarbageCollectorMinimizeJob : Job 307 { 308 /** 309 * Запуск задачи 310 */ 311 void execute() shared 312 { 313 import core.memory : GC; 314 GC.collect(); 315 GC.minimize(); 316 } 317 } 318 319 320 private: 321 322 323 /** 324 * Интерфейс планировщика задач 325 */ 326 interface JobScheduler 327 { 328 /** 329 * Возвращает наименование задачи 330 */ 331 string name() @safe nothrow; 332 333 /** 334 * Запуск задачи 335 */ 336 void start(); 337 338 /** 339 * Остановка задачи 340 */ 341 void stop(); 342 } 343 344 345 alias SchedulerFactory = JobScheduler delegate(DependencyContainer container, UniConf config) @safe; 346 alias SysSchedulerFactory = JobScheduler delegate(DependencyContainer container) @safe; 347 348 349 /** 350 * Реализация планировщика задач на основе таймера vibed 351 */ 352 class TimerJobScheduler : Thread, JobScheduler 353 { 354 private 355 { 356 CronExpr _expression; 357 bool _isRunning; 358 string _name; 359 Mutex _mutex; 360 Condition _condition; 361 shared(Job) _job; 362 } 363 364 /** 365 * Main constructor 366 */ 367 this(string name, Job job, CronExpr cronExp) @trusted 368 { 369 super(&worker); 370 this._expression = cronExp; 371 this._name = name; 372 this._job = cast(shared)job; 373 this._mutex = new Mutex(); 374 this._condition = new Condition(this._mutex); 375 } 376 377 /** 378 * Возвращает наименование задачи 379 */ 380 string name() @safe nothrow 381 { 382 return _name; 383 } 384 385 /** 386 * Запуск задачи 387 */ 388 override void start() nothrow 389 { 390 _isRunning = true; 391 392 Timer timer; 393 void execute() 394 { 395 synchronized(_mutex) 396 _condition.notify(); 397 timer.rearm(getNextDuration(), true); 398 } 399 timer = setTimer(getNextDuration(), &execute, true); 400 401 super.start(); 402 } 403 404 /** 405 * Остановка задачи 406 */ 407 void stop() 408 { 409 _isRunning = false; 410 synchronized(_mutex) 411 _condition.notify(); 412 } 413 414 415 private: 416 417 418 void worker() 419 { 420 while (_isRunning) 421 { 422 synchronized(_mutex) 423 _condition.wait(); 424 if (!_isRunning) 425 return; 426 logDebug("Starting job '%s'", _name); 427 auto interval = getNextDuration(); 428 try 429 _job.execute(); 430 catch (Exception e) 431 logException(e, fmt!"Job error '%s'"(_name)); 432 logDebug("Done '%s' next run in '%s'", _name, interval); 433 } 434 } 435 436 /** 437 * Вычисление интервала 438 */ 439 Duration getNextDuration() nothrow 440 { 441 auto now = cast(DateTime)Clock.currTime(); 442 return assumeWontThrow(_expression.getNext(now)).get - now; 443 } 444 } 445