1 /** 2 * Модуль планировщика задач 3 * 4 * Copyright: (c) 2015-2020, Milofon Project. 5 * License: Subject to the terms of the BSD 3-Clause License, as written in the included LICENSE.md file. 6 * Author: <m.galanin@milofon.pro> Maksim Galanin 7 * Date: 2020-04-21 8 */ 9 10 module dango.system.scheduler; 11 12 private 13 { 14 import core.time : Duration; 15 import core.thread : Thread; 16 import core.sync.condition : Condition; 17 import core.sync.mutex : Mutex; 18 19 import std.algorithm.iteration : filter; 20 import std.datetime : DateTime, Clock; 21 import std.exception : assumeWontThrow; 22 import std.format : fmt = format; 23 import std.traits : hasMember; 24 import std.uni : toUpper; 25 26 import vibe.core.core : Timer, setTimer; 27 import vibe.core.log : logException; 28 import cronexp : CronExpr, CronException; 29 import dango.inject; 30 31 import dango.system.logging : logWarn, logInfo, logDebug; 32 import dango.system.application : Application, UniConf; 33 import dango.system.exception : DangoConfigException; 34 import dango.system.properties; 35 import dango.system.plugin; 36 } 37 38 39 /** 40 * Интерфес задачи 41 */ 42 interface Job 43 { 44 /** 45 * Запуск задачи 46 */ 47 void execute() shared; 48 } 49 50 51 /** 52 * Фабрика задачи планировщика 53 */ 54 alias JobFactory = ComponentFactory!(Job, DependencyContainer, UniConf); 55 56 /** 57 * Контекст регистрации задач планировщика 58 */ 59 alias SchedulerContext = PluginContext!(SchedulerPlugin); 60 61 62 /** 63 * Плагин управления задач планировщика 64 */ 65 class SchedulerPlugin : DaemonPlugin 66 { 67 private @safe 68 { 69 @Inject 70 Application _application; 71 72 SchedulerFactory[string] _schedulerFactorys; 73 SysSchedulerFactory[] _sysSchedulerFactorys; 74 JobScheduler[] _schedulers; 75 } 76 77 /** 78 * Свойство возвращает наименование плагина 79 */ 80 string name() pure @safe nothrow 81 { 82 return "Scheduler"; 83 } 84 85 /** 86 * Свойство возвращает версию приложения 87 */ 88 SemVer release() pure @safe nothrow 89 { 90 return SemVer(0, 0, 1); 91 } 92 93 /** 94 * Запуск процесса 95 */ 96 int startDaemon() 97 { 98 auto config = _application.getConfig; 99 auto container = _application.getContainer; 100 101 foreach (sysFactory; _sysSchedulerFactorys) 102 _schedulers ~= sysFactory(container); 103 104 auto schedulerConf = config.opt!UniConf("scheduler"); 105 if (schedulerConf.empty) 106 { 107 logWarn("Not found scheduler configuration"); 108 return 0; 109 } 110 111 auto jobsConfig = schedulerConf.front.opt!UniConf("job"); 112 if (jobsConfig.empty) 113 { 114 logWarn("Not found jobs configuration"); 115 return 0; 116 } 117 118 foreach (UniConf jobConf; jobsConfig.front.toSequence.filter!( 119 c => c.getOrElse!bool("enabled", false))) 120 { 121 string jobName = getNameOrEnforce(jobConf, "Not defined job name"); 122 if (auto factory = jobName.toUpper in _schedulerFactorys) 123 _schedulers ~= (*factory)(container, jobConf); 124 else 125 throw new DangoConfigException("Job '" ~ jobName ~ "' not register"); 126 } 127 128 logInfo("Start scheduler"); 129 130 foreach (JobScheduler scheduler; _schedulers) 131 { 132 logInfo(" Start job [%s]", scheduler.name); 133 scheduler.start(); 134 } 135 136 return 0; 137 } 138 139 /** 140 * Остановка процесса 141 * 142 * Params: 143 * exitStatus = Код завершения приложения 144 */ 145 int stopDaemon(int exitStatus) 146 { 147 logInfo("Stop scheduler"); 148 foreach (JobScheduler scheduler; _schedulers) 149 { 150 scheduler.stop(); 151 logInfo(" Stop job [%s]", scheduler.name); 152 } 153 return 0; 154 } 155 156 /** 157 * Регистрация задачи 158 */ 159 void registerJob(J : Job)(string name) @safe 160 { 161 alias JF = ComponentFactoryCtor!(Job, J, UniConf); 162 registerJob!(JF)(name); 163 } 164 165 /** 166 * Регистрация задачи с использованием фабрики 167 */ 168 void registerJob(JF : JobFactory)(string name) @safe 169 { 170 auto factory = new WrapDependencyFactory!(JF)(); 171 registerJob!JF(name, factory); 172 } 173 174 /** 175 * Регистрация задачи с использованием существующей фабрики 176 */ 177 void registerJob(JF : JobFactory)(string name, JobFactory factory) @safe 178 { 179 string uName = name.toUpper; 180 JobScheduler creator(DependencyContainer container, UniConf config) @safe 181 { 182 auto expStr = config.getOrEnforce!string("cron", 183 fmt!"Not defined cron expression in task '%s'"(name)); 184 185 CronExpr cronExp = () @trusted { 186 try 187 return CronExpr(expStr); 188 catch (CronException e) 189 throw new DangoConfigException( 190 fmt!"Incorrect cron expression in task '%s'"(name)); 191 }(); 192 193 Job job = factory.createComponent(container, config); 194 return new TimerJobScheduler(name, job, cronExp); 195 } 196 _schedulerFactorys[uName] = &creator; 197 } 198 199 /** 200 * Регистрация системной задачи 201 */ 202 void registerSystemJob(J : Job)(string cronStr) @safe 203 { 204 alias JF = ComponentFactoryCtor!(Job, J, UniConf); 205 registerSystemJob!(JF)(cronStr); 206 } 207 208 /** 209 * Регистрация системной задачи с использованием фабрики 210 */ 211 void registerSystemJob(JF : JobFactory)(string cronStr) @safe 212 { 213 auto factory = new WrapDependencyFactory!(JF)(); 214 registerSystemJob!JF(cronStr, factory); 215 } 216 217 /** 218 * Регистрация системной задачи с использованием существующей фабрики 219 */ 220 void registerSystemJob(JF : ComponentFactory!(C, A), C, A...)(string cronStr, JobFactory factory) @safe 221 { 222 static if (hasMember!(JF, "ConcreteType")) 223 string title = typeid(JF.ConcreteType).toString; 224 else static if (hasMember!(JF, "ComponentType")) 225 string title = typeid(JF.ComponentType).toString; 226 else 227 string title = typeid(JF).toString; 228 229 JobScheduler creator(DependencyContainer container) @safe 230 { 231 UniConf config = UniConf([ 232 "__name": UniConf(title), 233 "enabled": UniConf(true), 234 "cron": UniConf(cronStr) 235 ]); 236 237 CronExpr cronExp = () @trusted { 238 try 239 return CronExpr(cronStr); 240 catch (CronException e) 241 throw new DangoConfigException( 242 fmt!"Incorrect cron expression in task '%s'"(title)); 243 }(); 244 245 Job job = factory.createComponent(container, config); 246 return new TimerJobScheduler(title, job, cronExp); 247 } 248 _sysSchedulerFactorys ~= &creator; 249 } 250 } 251 252 253 /** 254 * Задача по выводу информации в консоль о GC 255 */ 256 class GarbageCollectorStatJob : Job 257 { 258 /** 259 * Запуск задачи 260 */ 261 void execute() shared 262 { 263 import core.memory : GC; 264 auto profileStats = GC.profileStats; 265 auto stats = GC.stats; 266 logInfo("== GC stats =="); 267 logInfo(" Use memory in heap: %s", humanizeSize(stats.usedSize)); 268 logInfo(" Free memory in heap: %s", humanizeSize(stats.freeSize)); 269 auto total = stats.freeSize + stats.usedSize; 270 logInfo(" Total memory in heap: %s", humanizeSize(total)); 271 logInfo(" GC runs: %s", profileStats.numCollections); 272 } 273 274 275 private: 276 277 278 string humanizeSize(size_t size) shared 279 { 280 enum SUFFIXES = ["", "K", "M", "G"]; 281 double dsize = size; 282 ubyte suffix = 0; 283 while (dsize > 1024 && suffix < 4) 284 { 285 dsize /= 1024; 286 suffix++; 287 } 288 return fmt!"%.02f%s"(dsize, SUFFIXES[suffix]); 289 } 290 } 291 292 293 /** 294 * Задача по запуску минимизации используемой GC памяти 295 */ 296 class GarbageCollectorMinimizeJob : Job 297 { 298 /** 299 * Запуск задачи 300 */ 301 void execute() shared 302 { 303 import core.memory : GC; 304 GC.collect(); 305 GC.minimize(); 306 } 307 } 308 309 310 private: 311 312 313 /** 314 * Интерфейс планировщика задач 315 */ 316 interface JobScheduler 317 { 318 /** 319 * Возвращает наименование задачи 320 */ 321 string name() @safe nothrow; 322 323 /** 324 * Запуск задачи 325 */ 326 void start(); 327 328 /** 329 * Остановка задачи 330 */ 331 void stop(); 332 } 333 334 335 alias SchedulerFactory = JobScheduler delegate(DependencyContainer container, UniConf config) @safe; 336 alias SysSchedulerFactory = JobScheduler delegate(DependencyContainer container) @safe; 337 338 339 /** 340 * Реализация планировщика задач на основе таймера vibed 341 */ 342 class TimerJobScheduler : Thread, JobScheduler 343 { 344 private 345 { 346 CronExpr _expression; 347 bool _isRunning; 348 string _name; 349 Mutex _mutex; 350 Condition _condition; 351 shared(Job) _job; 352 } 353 354 /** 355 * Main constructor 356 */ 357 this(string name, Job job, CronExpr cronExp) @trusted 358 { 359 super(&worker); 360 this._expression = cronExp; 361 this._name = name; 362 this._job = cast(shared)job; 363 this._mutex = new Mutex(); 364 this._condition = new Condition(this._mutex); 365 } 366 367 /** 368 * Возвращает наименование задачи 369 */ 370 string name() @safe nothrow 371 { 372 return _name; 373 } 374 375 /** 376 * Запуск задачи 377 */ 378 override void start() nothrow 379 { 380 _isRunning = true; 381 382 Timer timer; 383 void execute() 384 { 385 synchronized(_mutex) 386 _condition.notify(); 387 timer.rearm(getNextDuration(), true); 388 } 389 timer = setTimer(getNextDuration(), &execute, true); 390 391 super.start(); 392 } 393 394 /** 395 * Остановка задачи 396 */ 397 void stop() 398 { 399 _isRunning = false; 400 synchronized(_mutex) 401 _condition.notify(); 402 } 403 404 405 private: 406 407 408 void worker() 409 { 410 while (_isRunning) 411 { 412 synchronized(_mutex) 413 _condition.wait(); 414 if (!_isRunning) 415 return; 416 logDebug("Starting job '%s'", _name); 417 auto interval = getNextDuration(); 418 try 419 _job.execute(); 420 catch (Exception e) 421 logException(e, fmt!"Job error '%s'"(_name)); 422 logDebug("Done '%s' next run in '%s'", _name, interval); 423 } 424 } 425 426 /** 427 * Вычисление интервала 428 */ 429 Duration getNextDuration() nothrow 430 { 431 auto now = cast(DateTime)Clock.currTime(); 432 return assumeWontThrow(_expression.getNext(now)) - now; 433 } 434 } 435