1 /**
2  * Модуль планировщика задач
3  *
4  * Copyright: (c) 2015-2020, Milofon Project.
5  * License: Subject to the terms of the BSD 3-Clause License, as written in the included LICENSE.md file.
6  * Author: <m.galanin@milofon.pro> Maksim Galanin
7  * Date: 2020-04-21
8  */
9 
10 module dango.system.scheduler;
11 
12 public
13 {
14     import uniconf.core : UniConf;
15 }
16 
17 private
18 {
19     import core.time : Duration;
20     import core.thread : Thread;
21     import core.sync.condition : Condition;
22     import core.sync.mutex : Mutex;
23 
24     import std.algorithm.iteration : filter;
25     import std.datetime : DateTime, Clock;
26     import std.exception : assumeWontThrow;
27     import std.format : fmt = format;
28     import std.traits : hasMember;
29     import std.uni : toUpper;
30 
31     import vibe.core.core : Timer, setTimer;
32     import vibe.core.log : logException;
33     import cronexp : CronExpr, CronException;
34     import dango.inject;
35 
36     import dango.system.logging : logWarn, logInfo, logDebug;
37     import dango.system.application : Application;
38     import dango.system.exception : DangoConfigException;
39     import dango.system.properties;
40     import dango.system.plugin;
41 }
42 
43 
44 /**
45  * Интерфес задачи
46  */
47 interface Job
48 {
49     /**
50      * Запуск задачи
51      */
52     void execute() shared;
53 }
54 
55 
56 /**
57  * Фабрика задачи планировщика 
58  */
59 alias JobFactory = ComponentFactory!(Job, DependencyContainer, UniConf);
60 
61 /**
62  * Контекст регистрации задач планировщика
63  */
64 alias SchedulerContext = PluginContext!(SchedulerPlugin);
65 
66 
67 /**
68  * Плагин управления задач планировщика
69  */
70 class SchedulerPlugin : DaemonPlugin
71 {
72     private @safe
73     {
74         DependencyContainer _container;
75         UniConf _config;
76 
77         SchedulerFactory[string] _schedulerFactorys;
78         SysSchedulerFactory[] _sysSchedulerFactorys;
79         JobScheduler[] _schedulers;
80     }
81 
82     /**
83      * Main constructor
84      */
85     @Inject
86     this(Application application)
87     {
88         this._container = application.getContainer();
89         this._config = application.getConfig();
90     }
91 
92     /**
93      * Свойство возвращает наименование плагина
94      */
95     string name() pure @safe nothrow
96     {
97         return "Scheduler";
98     }
99 
100     /**
101      * Свойство возвращает версию приложения
102      */
103     SemVer release() pure @safe nothrow
104     {
105         return SemVer(0, 0, 1);
106     }
107 
108     /**
109      * Запуск процесса
110      */
111     int startDaemon()
112     {
113         foreach (sysFactory; _sysSchedulerFactorys)
114             _schedulers ~= sysFactory(_container);
115 
116         auto schedulerConf = _config.opt!UniConf("scheduler");
117         if (!schedulerConf.isNull)
118         {
119             auto jobsConfig = schedulerConf.get.opt!UniConf("job");
120             if (!jobsConfig.isNull)
121             {
122                 foreach (UniConf jobConf; jobsConfig.get.toSequence.filter!(
123                             c => c.getOrElse!bool("enabled", false)))
124                 {
125                     string jobName = getNameOrEnforce(jobConf, "Not defined job name");
126                     if (auto factory = jobName.toUpper in _schedulerFactorys)
127                         _schedulers ~= (*factory)(_container, jobConf);
128                     else
129                         throw new DangoConfigException("Job '" ~ jobName ~ "' not register");
130                 }
131             }
132             else
133                 logWarn("Not found jobs configuration");
134         }
135         else
136             logWarn("Not found scheduler configuration");
137 
138         logInfo("Start scheduler");
139 
140         foreach (JobScheduler scheduler; _schedulers)
141         {
142             logInfo("  Start job [%s]", scheduler.name);
143             scheduler.start();
144         }
145 
146         return 0;
147     }
148 
149     /**
150      * Остановка процесса
151      *
152      * Params:
153      * exitStatus = Код завершения приложения
154      */
155     int stopDaemon(int exitStatus)
156     {
157         logInfo("Stop scheduler");
158         foreach (JobScheduler scheduler; _schedulers)
159         {
160             scheduler.stop();
161             logInfo("  Stop job [%s]", scheduler.name);
162         }
163         return 0;
164     }
165 
166     /**
167      * Регистрация задачи
168      */
169     void registerJob(J : Job)(string name) @safe
170     {
171         alias JF = ComponentFactoryCtor!(Job, J, UniConf);
172         registerJob!(JF)(name);
173     }
174 
175     /**
176      * Регистрация задачи с использованием фабрики
177      */
178     void registerJob(JF : JobFactory)(string name) @safe
179     {
180         auto factory = new WrapDependencyFactory!(JF)();
181         registerJob!JF(name, factory);
182     }
183 
184     /**
185      * Регистрация задачи с использованием существующей фабрики
186      */
187     void registerJob(JF : JobFactory)(string name, JobFactory factory) @safe
188     {
189         string uName = name.toUpper;
190         JobScheduler creator(DependencyContainer container, UniConf config) @safe
191         {
192             auto expStr = config.getOrEnforce!string("cron",
193                 fmt!"Not defined cron expression in task '%s'"(name));
194 
195             CronExpr cronExp = () @trusted {
196                 try
197                     return CronExpr(expStr);
198                 catch (CronException e)
199                     throw new DangoConfigException(
200                         fmt!"Incorrect cron expression in task '%s'"(name));
201             }();
202 
203             Job job = factory.createComponent(container, config);
204             return new TimerJobScheduler(name, job, cronExp);
205         }
206         _schedulerFactorys[uName] = &creator;
207     }
208 
209     /**
210      * Регистрация системной задачи
211      */
212     void registerSystemJob(J : Job)(string cronStr) @safe
213     {
214         alias JF = ComponentFactoryCtor!(Job, J, UniConf);
215         registerSystemJob!(JF)(cronStr);
216     }
217 
218     /**
219      * Регистрация системной задачи с использованием фабрики
220      */
221     void registerSystemJob(JF : JobFactory)(string cronStr) @safe
222     {
223         auto factory = new WrapDependencyFactory!(JF)();
224         registerSystemJob!JF(cronStr, factory);
225     }
226 
227     /**
228      * Регистрация системной задачи с использованием существующей фабрики
229      */
230     void registerSystemJob(JF : ComponentFactory!(C, A), C, A...)(string cronStr, JobFactory factory) @safe
231     {
232         static if (hasMember!(JF, "ConcreteType"))
233             string title = typeid(JF.ConcreteType).toString;
234         else static if (hasMember!(JF, "ComponentType"))
235             string title = typeid(JF.ComponentType).toString;
236         else
237             string title = typeid(JF).toString;
238 
239         JobScheduler creator(DependencyContainer container) @safe
240         {
241             UniConf config = UniConf([
242                     "__name": UniConf(title),
243                     "enabled": UniConf(true),
244                     "cron": UniConf(cronStr)
245             ]);
246 
247             CronExpr cronExp = () @trusted {
248                 try
249                     return CronExpr(cronStr);
250                 catch (CronException e)
251                     throw new DangoConfigException(
252                         fmt!"Incorrect cron expression in task '%s'"(title));
253             }();
254 
255             Job job = factory.createComponent(container, config);
256             return new TimerJobScheduler(title, job, cronExp);
257         }
258         _sysSchedulerFactorys ~= &creator;
259     }
260 }
261 
262 
263 /**
264   * Задача по выводу информации в консоль о GC
265   */
266 class GarbageCollectorStatJob : Job
267 {
268     /**
269      * Запуск задачи
270      */
271     void execute() shared
272     {
273         import core.memory : GC;
274         auto profileStats = GC.profileStats;
275         auto stats = GC.stats;
276         logInfo("== GC stats ==");
277         logInfo("  Use memory in heap: %s", humanizeSize(stats.usedSize));
278         logInfo("  Free memory in heap: %s", humanizeSize(stats.freeSize));
279         auto total = stats.freeSize + stats.usedSize;
280         logInfo("  Total memory in heap: %s", humanizeSize(total));
281         logInfo("  GC runs: %s", profileStats.numCollections);
282     }
283 
284 
285 private:
286 
287 
288     string humanizeSize(size_t size) shared
289     {
290         enum SUFFIXES = ["", "K", "M", "G"];
291         double dsize = size;
292         ubyte suffix = 0;
293         while (dsize > 1024 && suffix < 4)
294         {
295             dsize /= 1024;
296             suffix++;
297         }
298         return fmt!"%.02f%s"(dsize, SUFFIXES[suffix]);
299     }
300 }
301 
302 
303 /**
304   * Задача по запуску минимизации используемой GC памяти
305   */
306 class GarbageCollectorMinimizeJob : Job
307 {
308     /**
309      * Запуск задачи
310      */
311     void execute() shared
312     {
313         import core.memory : GC;
314         GC.collect();
315         GC.minimize();
316     }
317 }
318 
319 
320 private:
321 
322 
323 /**
324  * Интерфейс планировщика задач
325  */
326 interface JobScheduler
327 {
328     /**
329      * Возвращает наименование задачи
330      */
331     string name() @safe nothrow;
332 
333     /**
334      * Запуск задачи
335      */
336     void start();
337 
338     /**
339      * Остановка задачи
340      */
341     void stop();
342 }
343 
344 
345 alias SchedulerFactory = JobScheduler delegate(DependencyContainer container, UniConf config) @safe;
346 alias SysSchedulerFactory = JobScheduler delegate(DependencyContainer container) @safe;
347 
348 
349 /**
350  * Реализация планировщика задач на основе таймера vibed
351  */
352 class TimerJobScheduler : Thread, JobScheduler
353 {
354     private
355     {
356         CronExpr _expression;
357         bool _isRunning;
358         string _name;
359         Mutex _mutex;
360         Condition _condition;
361         shared(Job) _job;
362     }
363 
364     /**
365      * Main constructor
366      */
367     this(string name, Job job, CronExpr cronExp) @trusted
368     {
369         super(&worker);
370         this._expression = cronExp;
371         this._name = name;
372         this._job = cast(shared)job;
373         this._mutex = new Mutex();
374         this._condition = new Condition(this._mutex);
375     }
376 
377     /**
378      * Возвращает наименование задачи
379      */
380     string name() @safe nothrow
381     {
382         return _name;
383     }
384 
385     /**
386      * Запуск задачи
387      */
388     override void start() nothrow
389     {
390         _isRunning = true;
391 
392         Timer timer;
393         void execute()
394         {
395             synchronized(_mutex)
396                 _condition.notify();
397             timer.rearm(getNextDuration(), true);
398         }
399         timer = setTimer(getNextDuration(), &execute, true);
400 
401         super.start();
402     }
403 
404     /**
405      * Остановка задачи
406      */
407     void stop()
408     {
409         _isRunning = false;
410         synchronized(_mutex)
411             _condition.notify();
412     }
413 
414 
415 private:
416 
417 
418     void worker()
419     {
420         while (_isRunning)
421         {
422             synchronized(_mutex)
423                 _condition.wait();
424             if (!_isRunning) 
425                 return;
426             logDebug("Starting job '%s'", _name);
427             auto interval = getNextDuration();
428             try
429                 _job.execute();
430             catch (Exception e)
431                 logException(e, fmt!"Job error '%s'"(_name));
432             logDebug("Done '%s' next run in '%s'", _name, interval);
433         }
434     }
435 
436     /**
437      * Вычисление интервала
438      */
439     Duration getNextDuration() nothrow
440     {
441         auto now = cast(DateTime)Clock.currTime();
442         return assumeWontThrow(_expression.getNext(now)).get - now;
443     }
444 }
445