1 /**
2  * Основной модуль транспортного уровня
3  *
4  * Copyright: (c) 2015-2017, Milofon Project.
5  * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file.
6  * Author: <m.galanin@milofon.org> Maksim Galanin
7  * Date: 2018-01-24
8  */
9 
10 module dango.service.transport.core;
11 
12 public
13 {
14     import std.container : SList;
15 
16     import proped : Properties;
17 
18     import vibe.core.core : Mutex, yield;
19     import vibe.core.sync;
20 
21     import dango.service.protocol: RpcServerProtocol;
22 }
23 
24 
25 /**
26  * Интерфейс серверного транспортного уровня
27  */
28 interface ServerTransport
29 {
30     /**
31      * Запуск транспортного уровня
32      * Params:
33      * config = Конфигурация транспорта
34      */
35     void listen(RpcServerProtocol protocol, Properties config);
36 
37     /**
38      * Завершение работы
39      */
40     void shutdown();
41 }
42 
43 
44 /**
45  * Интерфейс клиентского транспортного уровня
46  */
47 interface ClientTransport
48 {
49     /**
50      * Инициализация транспорта клиента
51      */
52     void initialize(Properties config);
53 
54 
55     /**
56      * Выполнение запроса
57      * Params:
58      * bytes = Входящие данные
59      * Return: Данные ответа
60      */
61     ubyte[] request(ubyte[] bytes);
62 }
63 
64 
65 /**
66   * Интерфейс подключения с возможностью хранения в пуле
67   */
68 interface ClientConnection
69 {
70     /**
71      * Проверка на активность подключения
72      */
73     bool connected() @property;
74 
75     /**
76      * Установка подключения
77      */
78     void connect();
79 
80     /**
81      * Разрыв соединения
82      */
83     void disconnect();
84 }
85 
86 
87 /**
88  * Интерфейс пула подключений
89  */
90 interface ClientConnectionPool(C)
91 {
92     /**
93      * Получить подключение из пула
94      */
95     C getConnection() @safe;
96 
97     /**
98      * Вернуть подключение в пул
99      * Params:
100      *
101      * conn = Освобождаемое подключение
102      */
103     void freeConnection(C conn) @safe;
104 
105     /**
106      * Создание нового подключения
107      */
108     C createNewConnection();
109 }
110 
111 
112 /**
113  * Класс пула с возможностью работы с конкурентной многозадачностью
114  */
115 abstract class AsyncClientConnectionPool(C) : ClientConnectionPool!C
116 {
117     private
118     {
119         SList!C _pool;
120         Mutex _mutex;
121         uint _size;
122     }
123 
124 
125     this(uint size)
126     {
127         _mutex = new Mutex();
128         _size = size;
129         initializePool();
130     }
131 
132 
133     C getConnection() @safe
134     {
135         _mutex.lock();
136         while (_pool.empty)
137             yield();
138 
139         auto conn = () @trusted {
140             auto conn = _pool.front();
141             if (!conn.connected)
142                 conn.connect();
143             return conn;
144         } ();
145 
146         _pool.removeFront();
147         _mutex.unlock();
148         return conn;
149     }
150 
151 
152     void freeConnection(C conn) @safe
153     {
154         _mutex.lock();
155         _pool.insertFront(conn);
156         _mutex.unlock();
157     }
158 
159 
160     private void initializePool()
161     {
162         _mutex.lock();
163         foreach (i; 0.._size)
164             _pool.insertFront(createNewConnection());
165         _mutex.unlock();
166     }
167 }
168 
169 
170 /**
171  * Класс пула с возможностью работы с многозадачностью на основе потоков
172  */
173 abstract class WaitClientConnectionPool(C) : ClientConnectionPool!C
174 {
175     private
176     {
177         SList!C _pool;
178         Mutex _mutex;
179         TaskCondition _condition;
180         uint _size;
181     }
182 
183 
184     this(uint size)
185     {
186         _mutex = new Mutex();
187         _condition = new TaskCondition(_mutex);
188         _size = size;
189         initializePool();
190     }
191 
192 
193     C getConnection() @safe
194     {
195         _mutex.lock();
196         while (_pool.empty)
197             _condition.wait();
198 
199         auto conn = () @trusted {
200             auto conn = _pool.front();
201             if (!conn.connected)
202                 conn.connect();
203             return conn;
204         } ();
205 
206         _pool.removeFront();
207         _mutex.unlock();
208         return conn;
209     }
210 
211 
212     void freeConnection(C conn) @safe
213     {
214         _mutex.lock();
215         _pool.insertFront(conn);
216         _condition.notify();
217         _mutex.unlock();
218     }
219 
220 
221     private void initializePool()
222     {
223         _mutex.lock();
224         foreach (i; 0.._size)
225             _pool.insertFront(createNewConnection());
226         _mutex.unlock();
227     }
228 }