1 /** 2 * Основной модуль транспортного уровня 3 * 4 * Copyright: (c) 2015-2017, Milofon Project. 5 * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file. 6 * Author: <m.galanin@milofon.org> Maksim Galanin 7 * Date: 2018-01-24 8 */ 9 10 module dango.service.transport.core; 11 12 public 13 { 14 import std.container : SList; 15 16 import proped : Properties; 17 18 import vibe.core.core : Mutex, yield; 19 import vibe.core.sync; 20 21 import dango.service.protocol: RpcServerProtocol; 22 } 23 24 25 /** 26 * Интерфейс серверного транспортного уровня 27 */ 28 interface ServerTransport 29 { 30 /** 31 * Запуск транспортного уровня 32 * Params: 33 * config = Конфигурация транспорта 34 */ 35 void listen(RpcServerProtocol protocol, Properties config); 36 37 /** 38 * Завершение работы 39 */ 40 void shutdown(); 41 } 42 43 44 /** 45 * Интерфейс клиентского транспортного уровня 46 */ 47 interface ClientTransport 48 { 49 /** 50 * Инициализация транспорта клиента 51 */ 52 void initialize(Properties config); 53 54 55 /** 56 * Выполнение запроса 57 * Params: 58 * bytes = Входящие данные 59 * Return: Данные ответа 60 */ 61 ubyte[] request(ubyte[] bytes); 62 } 63 64 65 /** 66 * Интерфейс подключения с возможностью хранения в пуле 67 */ 68 interface ClientConnection 69 { 70 /** 71 * Проверка на активность подключения 72 */ 73 bool connected() @property; 74 75 /** 76 * Установка подключения 77 */ 78 void connect(); 79 80 /** 81 * Разрыв соединения 82 */ 83 void disconnect(); 84 } 85 86 87 /** 88 * Интерфейс пула подключений 89 */ 90 interface ClientConnectionPool(C) 91 { 92 /** 93 * Получить подключение из пула 94 */ 95 C getConnection() @safe; 96 97 /** 98 * Вернуть подключение в пул 99 * Params: 100 * 101 * conn = Освобождаемое подключение 102 */ 103 void freeConnection(C conn) @safe; 104 105 /** 106 * Создание нового подключения 107 */ 108 C createNewConnection(); 109 } 110 111 112 /** 113 * Класс пула с возможностью работы с конкурентной многозадачностью 114 */ 115 abstract class AsyncClientConnectionPool(C) : ClientConnectionPool!C 116 { 117 private 118 { 119 SList!C _pool; 120 Mutex _mutex; 121 uint _size; 122 } 123 124 125 this(uint size) 126 { 127 _mutex = new Mutex(); 128 _size = size; 129 initializePool(); 130 } 131 132 133 C getConnection() @safe 134 { 135 _mutex.lock(); 136 while (_pool.empty) 137 yield(); 138 139 auto conn = () @trusted { 140 auto conn = _pool.front(); 141 if (!conn.connected) 142 conn.connect(); 143 return conn; 144 } (); 145 146 _pool.removeFront(); 147 _mutex.unlock(); 148 return conn; 149 } 150 151 152 void freeConnection(C conn) @safe 153 { 154 _mutex.lock(); 155 _pool.insertFront(conn); 156 _mutex.unlock(); 157 } 158 159 160 private void initializePool() 161 { 162 _mutex.lock(); 163 foreach (i; 0.._size) 164 _pool.insertFront(createNewConnection()); 165 _mutex.unlock(); 166 } 167 } 168 169 170 /** 171 * Класс пула с возможностью работы с многозадачностью на основе потоков 172 */ 173 abstract class WaitClientConnectionPool(C) : ClientConnectionPool!C 174 { 175 private 176 { 177 SList!C _pool; 178 Mutex _mutex; 179 TaskCondition _condition; 180 uint _size; 181 } 182 183 184 this(uint size) 185 { 186 _mutex = new Mutex(); 187 _condition = new TaskCondition(_mutex); 188 _size = size; 189 initializePool(); 190 } 191 192 193 C getConnection() @safe 194 { 195 _mutex.lock(); 196 while (_pool.empty) 197 _condition.wait(); 198 199 auto conn = () @trusted { 200 auto conn = _pool.front(); 201 if (!conn.connected) 202 conn.connect(); 203 return conn; 204 } (); 205 206 _pool.removeFront(); 207 _mutex.unlock(); 208 return conn; 209 } 210 211 212 void freeConnection(C conn) @safe 213 { 214 _mutex.lock(); 215 _pool.insertFront(conn); 216 _condition.notify(); 217 _mutex.unlock(); 218 } 219 220 221 private void initializePool() 222 { 223 _mutex.lock(); 224 foreach (i; 0.._size) 225 _pool.insertFront(createNewConnection()); 226 _mutex.unlock(); 227 } 228 }