1 /**
2  * Модуль планировщика задач
3  *
4  * Copyright: (c) 2015-2020, Milofon Project.
5  * License: Subject to the terms of the BSD 3-Clause License, as written in the included LICENSE.md file.
6  * Author: <m.galanin@milofon.pro> Maksim Galanin
7  * Date: 2020-04-21
8  */
9 
10 module dango.system.scheduler;
11 
12 private
13 {
14     import core.time : Duration;
15     import core.thread : Thread;
16     import core.sync.condition : Condition;
17     import core.sync.mutex : Mutex;
18 
19     import std.algorithm.iteration : filter;
20     import std.datetime : DateTime, Clock;
21     import std.exception : assumeWontThrow;
22     import std.format : fmt = format;
23     import std.traits : hasMember;
24     import std.uni : toUpper;
25 
26     import vibe.core.core : Timer, setTimer;
27     import vibe.core.log : logException;
28     import cronexp : CronExpr, CronException;
29     import dango.inject;
30 
31     import dango.system.logging : logWarn, logInfo, logDebug;
32     import dango.system.application : Application, UniConf;
33     import dango.system.exception : DangoConfigException;
34     import dango.system.properties;
35     import dango.system.plugin;
36 }
37 
38 
39 /**
40  * Интерфес задачи
41  */
42 interface Job
43 {
44     /**
45      * Запуск задачи
46      */
47     void execute() shared;
48 }
49 
50 
51 /**
52  * Фабрика задачи планировщика 
53  */
54 alias JobFactory = ComponentFactory!(Job, DependencyContainer, UniConf);
55 
56 /**
57  * Контекст регистрации задач планировщика
58  */
59 alias SchedulerContext = PluginContext!(SchedulerPlugin);
60 
61 
62 /**
63  * Плагин управления задач планировщика
64  */
65 class SchedulerPlugin : DaemonPlugin
66 {
67     private @safe
68     {
69         @Inject
70         Application _application;
71 
72         SchedulerFactory[string] _schedulerFactorys;
73         SysSchedulerFactory[] _sysSchedulerFactorys;
74         JobScheduler[] _schedulers;
75     }
76 
77     /**
78      * Свойство возвращает наименование плагина
79      */
80     string name() pure @safe nothrow
81     {
82         return "Scheduler";
83     }
84 
85     /**
86      * Свойство возвращает версию приложения
87      */
88     SemVer release() pure @safe nothrow
89     {
90         return SemVer(0, 0, 1);
91     }
92 
93     /**
94      * Запуск процесса
95      */
96     int startDaemon()
97     {
98         auto config = _application.getConfig;
99         auto container = _application.getContainer;
100 
101         foreach (sysFactory; _sysSchedulerFactorys)
102             _schedulers ~= sysFactory(container);
103 
104         auto schedulerConf = config.opt!UniConf("scheduler");
105         if (schedulerConf.empty)
106         {
107             logWarn("Not found scheduler configuration");
108             return 0;
109         }
110 
111         auto jobsConfig = schedulerConf.front.opt!UniConf("job");
112         if (jobsConfig.empty)
113         {
114             logWarn("Not found jobs configuration");
115             return 0;
116         }
117 
118         foreach (UniConf jobConf; jobsConfig.front.toSequence.filter!(
119                     c => c.getOrElse!bool("enabled", false)))
120         {
121             string jobName = getNameOrEnforce(jobConf, "Not defined job name");
122             if (auto factory = jobName.toUpper in _schedulerFactorys)
123                 _schedulers ~= (*factory)(container, jobConf);
124             else
125                 throw new DangoConfigException("Job '" ~ jobName ~ "' not register");
126         }
127 
128         logInfo("Start scheduler");
129 
130         foreach (JobScheduler scheduler; _schedulers)
131         {
132             logInfo("  Start job [%s]", scheduler.name);
133             scheduler.start();
134         }
135 
136         return 0;
137     }
138 
139     /**
140      * Остановка процесса
141      *
142      * Params:
143      * exitStatus = Код завершения приложения
144      */
145     int stopDaemon(int exitStatus)
146     {
147         logInfo("Stop scheduler");
148         foreach (JobScheduler scheduler; _schedulers)
149         {
150             scheduler.stop();
151             logInfo("  Stop job [%s]", scheduler.name);
152         }
153         return 0;
154     }
155 
156     /**
157      * Регистрация задачи
158      */
159     void registerJob(J : Job)(string name) @safe
160     {
161         alias JF = ComponentFactoryCtor!(Job, J, UniConf);
162         registerJob!(JF)(name);
163     }
164 
165     /**
166      * Регистрация задачи с использованием фабрики
167      */
168     void registerJob(JF : JobFactory)(string name) @safe
169     {
170         auto factory = new WrapDependencyFactory!(JF)();
171         registerJob!JF(name, factory);
172     }
173 
174     /**
175      * Регистрация задачи с использованием существующей фабрики
176      */
177     void registerJob(JF : JobFactory)(string name, JobFactory factory) @safe
178     {
179         string uName = name.toUpper;
180         JobScheduler creator(DependencyContainer container, UniConf config) @safe
181         {
182             auto expStr = config.getOrEnforce!string("cron",
183                 fmt!"Not defined cron expression in task '%s'"(name));
184 
185             CronExpr cronExp = () @trusted {
186                 try
187                     return CronExpr(expStr);
188                 catch (CronException e)
189                     throw new DangoConfigException(
190                         fmt!"Incorrect cron expression in task '%s'"(name));
191             }();
192 
193             Job job = factory.createComponent(container, config);
194             return new TimerJobScheduler(name, job, cronExp);
195         }
196         _schedulerFactorys[uName] = &creator;
197     }
198 
199     /**
200      * Регистрация системной задачи
201      */
202     void registerSystemJob(J : Job)(string cronStr) @safe
203     {
204         alias JF = ComponentFactoryCtor!(Job, J, UniConf);
205         registerSystemJob!(JF)(cronStr);
206     }
207 
208     /**
209      * Регистрация системной задачи с использованием фабрики
210      */
211     void registerSystemJob(JF : JobFactory)(string cronStr) @safe
212     {
213         auto factory = new WrapDependencyFactory!(JF)();
214         registerSystemJob!JF(cronStr, factory);
215     }
216 
217     /**
218      * Регистрация системной задачи с использованием существующей фабрики
219      */
220     void registerSystemJob(JF : ComponentFactory!(C, A), C, A...)(string cronStr, JobFactory factory) @safe
221     {
222         static if (hasMember!(JF, "ConcreteType"))
223             string title = typeid(JF.ConcreteType).toString;
224         else static if (hasMember!(JF, "ComponentType"))
225             string title = typeid(JF.ComponentType).toString;
226         else
227             string title = typeid(JF).toString;
228 
229         JobScheduler creator(DependencyContainer container) @safe
230         {
231             UniConf config = UniConf([
232                     "__name": UniConf(title),
233                     "enabled": UniConf(true),
234                     "cron": UniConf(cronStr)
235             ]);
236 
237             CronExpr cronExp = () @trusted {
238                 try
239                     return CronExpr(cronStr);
240                 catch (CronException e)
241                     throw new DangoConfigException(
242                         fmt!"Incorrect cron expression in task '%s'"(title));
243             }();
244 
245             Job job = factory.createComponent(container, config);
246             return new TimerJobScheduler(title, job, cronExp);
247         }
248         _sysSchedulerFactorys ~= &creator;
249     }
250 }
251 
252 
253 /**
254   * Задача по выводу информации в консоль о GC
255   */
256 class GarbageCollectorStatJob : Job
257 {
258     /**
259      * Запуск задачи
260      */
261     void execute() shared
262     {
263         import core.memory : GC;
264         auto profileStats = GC.profileStats;
265         auto stats = GC.stats;
266         logInfo("== GC stats ==");
267         logInfo("  Use memory in heap: %s", humanizeSize(stats.usedSize));
268         logInfo("  Free memory in heap: %s", humanizeSize(stats.freeSize));
269         auto total = stats.freeSize + stats.usedSize;
270         logInfo("  Total memory in heap: %s", humanizeSize(total));
271         logInfo("  GC runs: %s", profileStats.numCollections);
272     }
273 
274 
275 private:
276 
277 
278     string humanizeSize(size_t size) shared
279     {
280         enum SUFFIXES = ["", "K", "M", "G"];
281         double dsize = size;
282         ubyte suffix = 0;
283         while (dsize > 1024 && suffix < 4)
284         {
285             dsize /= 1024;
286             suffix++;
287         }
288         return fmt!"%.02f%s"(dsize, SUFFIXES[suffix]);
289     }
290 }
291 
292 
293 /**
294   * Задача по запуску минимизации используемой GC памяти
295   */
296 class GarbageCollectorMinimizeJob : Job
297 {
298     /**
299      * Запуск задачи
300      */
301     void execute() shared
302     {
303         import core.memory : GC;
304         GC.collect();
305         GC.minimize();
306     }
307 }
308 
309 
310 private:
311 
312 
313 /**
314  * Интерфейс планировщика задач
315  */
316 interface JobScheduler
317 {
318     /**
319      * Возвращает наименование задачи
320      */
321     string name() @safe nothrow;
322 
323     /**
324      * Запуск задачи
325      */
326     void start();
327 
328     /**
329      * Остановка задачи
330      */
331     void stop();
332 }
333 
334 
335 alias SchedulerFactory = JobScheduler delegate(DependencyContainer container, UniConf config) @safe;
336 alias SysSchedulerFactory = JobScheduler delegate(DependencyContainer container) @safe;
337 
338 
339 /**
340  * Реализация планировщика задач на основе таймера vibed
341  */
342 class TimerJobScheduler : Thread, JobScheduler
343 {
344     private
345     {
346         CronExpr _expression;
347         bool _isRunning;
348         string _name;
349         Mutex _mutex;
350         Condition _condition;
351         shared(Job) _job;
352     }
353 
354     /**
355      * Main constructor
356      */
357     this(string name, Job job, CronExpr cronExp) @trusted
358     {
359         super(&worker);
360         this._expression = cronExp;
361         this._name = name;
362         this._job = cast(shared)job;
363         this._mutex = new Mutex();
364         this._condition = new Condition(this._mutex);
365     }
366 
367     /**
368      * Возвращает наименование задачи
369      */
370     string name() @safe nothrow
371     {
372         return _name;
373     }
374 
375     /**
376      * Запуск задачи
377      */
378     override void start() nothrow
379     {
380         _isRunning = true;
381 
382         Timer timer;
383         void execute()
384         {
385             synchronized(_mutex)
386                 _condition.notify();
387             timer.rearm(getNextDuration(), true);
388         }
389         timer = setTimer(getNextDuration(), &execute, true);
390 
391         super.start();
392     }
393 
394     /**
395      * Остановка задачи
396      */
397     void stop()
398     {
399         _isRunning = false;
400         synchronized(_mutex)
401             _condition.notify();
402     }
403 
404 
405 private:
406 
407 
408     void worker()
409     {
410         while (_isRunning)
411         {
412             synchronized(_mutex)
413                 _condition.wait();
414             if (!_isRunning) 
415                 return;
416             logDebug("Starting job '%s'", _name);
417             auto interval = getNextDuration();
418             try
419                 _job.execute();
420             catch (Exception e)
421                 logException(e, fmt!"Job error '%s'"(_name));
422             logDebug("Done '%s' next run in '%s'", _name, interval);
423         }
424     }
425 
426     /**
427      * Вычисление интервала
428      */
429     Duration getNextDuration() nothrow
430     {
431         auto now = cast(DateTime)Clock.currTime();
432         return assumeWontThrow(_expression.getNext(now)) - now;
433     }
434 }
435