1 /** 2 * Модуль транспортного уровня на основе ZeroMQ 3 * 4 * Copyright: (c) 2015-2017, Milofon Project. 5 * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file. 6 * Author: <m.galanin@milofon.org> Maksim Galanin 7 * Date: 2018-01-24 8 */ 9 10 module dango.service.transport.zeromq; 11 12 private 13 { 14 import core.thread; 15 16 import std.datetime : Clock; 17 import std..string : toStringz, fromStringz; 18 19 import vibe.core.log; 20 import vibe.core.core : yield; 21 22 import deimos.zmq.zmq; 23 import zmqd; 24 25 import dango.service.exception; 26 import dango.system.properties : getOrEnforce; 27 28 import dango.service.transport.core; 29 } 30 31 32 alias Handler = ubyte[] delegate(ubyte[]); 33 34 35 struct ZeroMQTransportSettings 36 { 37 string uri; 38 bool useBroker; 39 } 40 41 42 43 class ZeroMQServerTransport : ServerTransport 44 { 45 private 46 { 47 ZeroMQTransportSettings _settings; 48 ZeroMQWorker _worker; 49 } 50 51 52 void listen(RpcServerProtocol protocol, Properties config) 53 { 54 _settings.uri = config.getOrEnforce!string("bind", 55 "ZeroMQ transport is not defined bind"); 56 _settings.useBroker = config.getOrElse!bool("broker", false); 57 58 const ver = zmqVersion(); 59 logInfo("Version ZeroMQ: %s.%s.%s", ver.major, ver.minor, ver.patch); 60 61 62 ubyte[] handler(ubyte[] data) 63 { 64 return protocol.handle(data); 65 } 66 67 _worker = new ZeroMQWorker(_settings, &handler); 68 _worker.start(); 69 70 logInfo("Transport ZeroMQ Start"); 71 } 72 73 74 void shutdown() 75 { 76 _worker.stop(); 77 logInfo("Transport ZeroMQ Stop"); 78 } 79 } 80 81 82 83 private final class ZeroMQWorker : Thread 84 { 85 private 86 { 87 ZeroMQTransportSettings _settings; 88 bool _running; 89 Handler _handler; 90 } 91 92 93 this(ZeroMQTransportSettings settings, Handler handler) 94 { 95 _settings = settings; 96 _handler = handler; 97 super(&run); 98 } 99 100 101 void run() 102 { 103 _running = true; 104 ubyte[] buffer; 105 106 auto worker = Socket(SocketType.rep); 107 108 if (_settings.useBroker) 109 { 110 worker.connect(_settings.uri); 111 logInfo("Connect to broker %s", _settings.uri); 112 } 113 else 114 { 115 worker.bind(_settings.uri); 116 logInfo("Listening for requests on %s", _settings.uri); 117 } 118 119 120 PollItem[] items = [ 121 PollItem(worker, PollFlags.pollIn), 122 ]; 123 124 auto payload = Frame(); 125 126 while (_running) 127 { 128 poll(items, 100.msecs); 129 if (items[0].returnedEvents & PollFlags.pollIn) 130 { 131 worker.receive(payload); 132 133 buffer.length = 0; 134 buffer ~= payload.data; 135 136 while (payload.more) 137 { 138 worker.receive(payload); 139 buffer ~= payload.data; 140 } 141 ubyte[] resData = _handler(buffer); 142 worker.send(resData); 143 } 144 } 145 } 146 147 148 void stop() 149 { 150 _running = false; 151 } 152 } 153 154 155 156 class ZeroMQClientConnection : ClientConnection 157 { 158 private 159 { 160 Socket _socket; 161 PollItem[] _items; 162 string _uri; 163 ubyte[] _buffer; 164 bool _connected; 165 Duration _timeout; 166 } 167 168 169 this(string uri, uint timeout) 170 { 171 _timeout = timeout.msecs; 172 _uri = uri; 173 } 174 175 176 bool connected() @property 177 { 178 return _socket.initialized && _connected; 179 } 180 181 182 void connect() 183 { 184 _socket = Socket(SocketType.req); 185 _items = [PollItem(_socket, PollFlags.pollIn)]; 186 _socket.connect(_uri); 187 _connected = true; 188 } 189 190 191 void disconnect() 192 { 193 _socket.linger = Duration.zero; 194 _socket.close(); 195 _connected = false; 196 } 197 198 199 ubyte[] request(ubyte[] bytes) 200 { 201 _socket.send(bytes); 202 203 auto payload = Frame(); 204 auto start = Clock.currTime; 205 auto current = start; 206 207 while ((current - start) < _timeout) 208 { 209 poll(_items, 100.msecs); 210 if (_items[0].returnedEvents & PollFlags.pollIn) 211 { 212 _socket.receive(payload); 213 214 _buffer.length = 0; 215 _buffer ~= payload.data; 216 217 while (payload.more) 218 { 219 _socket.receive(payload); 220 _buffer ~= payload.data; 221 } 222 return _buffer.dup; 223 } 224 current = Clock.currTime; 225 yield(); 226 } 227 228 disconnect(); 229 throw new TransportException("Request timeout error"); 230 } 231 } 232 233 234 235 class ZeroMQClientConnectionPool : WaitClientConnectionPool!ZeroMQClientConnection 236 { 237 private 238 { 239 string _uri; 240 uint _timeout; 241 } 242 243 244 this(string uri, uint timeout, uint size) 245 { 246 _uri = uri; 247 _timeout = timeout; 248 super(size); 249 } 250 251 252 ZeroMQClientConnection createNewConnection() 253 { 254 return new ZeroMQClientConnection(_uri, _timeout); 255 } 256 } 257 258 259 260 class ZeroMQClientTransport : ClientTransport 261 { 262 private 263 { 264 ZeroMQClientConnectionPool _pool; 265 } 266 267 268 this() {} 269 270 271 this(string uri, uint timeout, uint size) 272 { 273 _pool = new ZeroMQClientConnectionPool(uri, timeout, size); 274 } 275 276 277 void initialize(Properties config) 278 { 279 string uri = config.getOrEnforce!string("uri", 280 "Not defined uri for client transport"); 281 long timeout = config.getOrElse!long("timeout", 500); 282 long poolSize = config.getOrElse!long("poolSize", 10); 283 _pool = new ZeroMQClientConnectionPool(uri, cast(uint)timeout, cast(uint)poolSize); 284 } 285 286 287 ubyte[] request(ubyte[] bytes) 288 { 289 auto conn = _pool.getConnection(); 290 scope(exit) _pool.freeConnection(conn); 291 return conn.request(bytes); 292 } 293 }