1 /**
2  * Модуль транспортного уровня на основе ZeroMQ
3  *
4  * Copyright: (c) 2015-2017, Milofon Project.
5  * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file.
6  * Author: <m.galanin@milofon.org> Maksim Galanin
7  * Date: 2018-01-24
8  */
9 
10 module dango.service.transport.zeromq;
11 
private
{
    import core.atomic : atomicLoad, atomicStore;
    import core.thread;
    import core.time : Duration, MonoTime, msecs;

    import std.datetime : Clock;
    import std.string : toStringz, fromStringz;

    import vibe.core.log;
    import vibe.core.core : yield;

    import deimos.zmq.zmq;
    import zmqd;

    import dango.service.exception;
    import dango.system.properties : getOrEnforce;

    import dango.service.transport.core;
}
30 
31 
/**
 * Callback used by the ZeroMQ worker to process one request:
 * it receives the raw request bytes and returns the raw bytes
 * that are sent back as the reply.
 */
alias Handler = ubyte[] delegate(ubyte[]);
33 
34 
/**
 * Settings of the ZeroMQ server transport.
 */
struct ZeroMQTransportSettings
{
    /// Endpoint the worker socket binds to, or connects to when `useBroker` is set
    string uri;
    /// When true the worker connects to `uri` (a broker) instead of binding to it
    bool useBroker;
}
40 
41 
42 
/**
 * Server transport that serves RPC requests over ZeroMQ.
 *
 * The socket is owned by a background `ZeroMQWorker` thread which is
 * created in `listen` and asked to stop in `shutdown`.
 */
class ZeroMQServerTransport : ServerTransport
{
    private
    {
        ZeroMQTransportSettings _settings;
        ZeroMQWorker _worker;
    }


    /**
     * Reads the transport settings from the configuration and starts
     * the worker thread.
     *
     * Params:
     *   protocol = protocol whose `handle` method processes every request
     *   config   = transport configuration; `bind` is mandatory,
     *              `broker` is optional and defaults to `false`
     */
    void listen(RpcServerProtocol protocol, Properties config)
    {
        _settings.uri = config.getOrEnforce!string("bind",
                "ZeroMQ transport is not defined bind");
        _settings.useBroker = config.getOrElse!bool("broker", false);

        const ver = zmqVersion();
        logInfo("Version ZeroMQ: %s.%s.%s", ver.major, ver.minor, ver.patch);

        // Adapter that forwards raw request bytes to the protocol
        ubyte[] handler(ubyte[] data)
        {
            return protocol.handle(data);
        }

        _worker = new ZeroMQWorker(_settings, &handler);
        _worker.start();

        logInfo("Transport ZeroMQ Start");
    }


    /**
     * Asks the worker thread to stop.
     *
     * Safe to call when `listen` was never called and safe to call
     * repeatedly: without an active worker this is a no-op.
     */
    void shutdown()
    {
        // No worker means listen() was not called (or shutdown() already
        // ran); dereferencing the null reference would crash the process.
        if (_worker is null)
            return;

        _worker.stop();
        _worker = null;
        logInfo("Transport ZeroMQ Stop");
    }
}
80 
81 
82 
/**
 * Background thread that owns a ZeroMQ REP socket, receives requests,
 * passes them to the handler and sends the handler result as the reply.
 */
private final class ZeroMQWorker : Thread
{
    private
    {
        ZeroMQTransportSettings _settings;
        // Written by stop() from another thread and read by the worker loop,
        // hence shared and accessed only through atomic operations.
        // It starts as `true` so that a stop() issued before the thread body
        // begins executing is not overwritten and lost.
        shared bool _running = true;
        Handler _handler;
    }


    /**
     * Params:
     *   settings = endpoint and connection mode of the worker socket
     *   handler  = callback that turns request bytes into reply bytes
     */
    this(ZeroMQTransportSettings settings, Handler handler)
    {
        _settings = settings;
        _handler = handler;
        super(&run);
    }


    /**
     * Thread entry point: runs the request loop and logs any error that
     * terminates it, because nothing joins this thread and the failure
     * would otherwise stay invisible. The error is rethrown unchanged.
     */
    void run()
    {
        try
        {
            serve();
        }
        catch (Exception e)
        {
            logError("ZeroMQ worker stopped by an error: %s", e.msg);
            throw e;
        }
    }


    /**
     * Requests termination of the request loop. The loop notices the
     * request after its current poll slice (at most 100 ms) or after the
     * request being processed is finished.
     */
    void stop()
    {
        atomicStore(_running, false);
    }


    /// Opens the socket and serves requests until stop() is called
    private void serve()
    {
        ubyte[] buffer;

        auto worker = Socket(SocketType.rep);

        if (_settings.useBroker)
        {
            worker.connect(_settings.uri);
            logInfo("Connect to broker %s", _settings.uri);
        }
        else
        {
            worker.bind(_settings.uri);
            logInfo("Listening for requests on %s", _settings.uri);
        }


        PollItem[] items = [
            PollItem(worker, PollFlags.pollIn),
        ];

        auto payload = Frame();

        while (atomicLoad(_running))
        {
            // Poll with a short timeout so the stop flag is re-checked regularly
            poll(items, 100.msecs);
            if (items[0].returnedEvents & PollFlags.pollIn)
            {
                worker.receive(payload);

                buffer.length = 0;
                buffer ~= payload.data;

                // Concatenate all remaining frames of a multipart message
                while (payload.more)
                {
                    worker.receive(payload);
                    buffer ~= payload.data;
                }
                ubyte[] resData = _handler(buffer);
                worker.send(resData);
            }
        }
    }
}
153 
154 
155 
/**
 * Single client connection built on a ZeroMQ REQ socket.
 */
class ZeroMQClientConnection : ClientConnection
{
    private
    {
        // Upper bound of one poll call inside request()
        enum pollStep = 100.msecs;

        Socket _socket;
        PollItem[] _items;
        string _uri;
        ubyte[] _buffer;
        bool _connected;
        Duration _timeout;
    }


    /**
     * Params:
     *   uri     = endpoint the socket connects to
     *   timeout = time to wait for a reply, in milliseconds
     */
    this(string uri, uint timeout)
    {
        _timeout = timeout.msecs;
        _uri = uri;
    }


    /// True when the socket is initialized and connect() has succeeded
    bool connected() @property
    {
        return _socket.initialized && _connected;
    }


    /// Creates a fresh REQ socket and connects it to the configured endpoint
    void connect()
    {
        _socket = Socket(SocketType.req);
        _items = [PollItem(_socket, PollFlags.pollIn)];
        _socket.connect(_uri);
        _connected = true;
    }


    /**
     * Closes the socket without lingering on unsent messages.
     * Does nothing besides clearing the connected flag when the socket
     * was never created.
     */
    void disconnect()
    {
        _connected = false;

        // Setting options on or closing a socket that was never created
        // is an error, so there is nothing more to do in that case.
        if (!_socket.initialized)
            return;

        _socket.linger = Duration.zero;
        _socket.close();
    }


    /**
     * Sends a request and waits for the reply.
     *
     * Params:
     *   bytes = raw request data
     *
     * Returns: a copy of the reply; all frames of a multipart reply
     *          are concatenated
     *
     * Throws: TransportException when no reply arrives within the
     *         timeout; the connection is closed before throwing
     */
    ubyte[] request(ubyte[] bytes)
    {
        _socket.send(bytes);

        auto payload = Frame();

        // The monotonic clock keeps the timeout correct even if the
        // system wall clock is adjusted while waiting.
        const deadline = MonoTime.currTime + _timeout;

        for (auto now = MonoTime.currTime; now < deadline; now = MonoTime.currTime)
        {
            // Never wait past the deadline, so timeouts shorter than
            // one poll step are honoured too.
            const remaining = deadline - now;
            poll(_items, remaining < pollStep ? remaining : pollStep);

            if (_items[0].returnedEvents & PollFlags.pollIn)
            {
                _socket.receive(payload);

                _buffer.length = 0;
                _buffer ~= payload.data;

                while (payload.more)
                {
                    _socket.receive(payload);
                    _buffer ~= payload.data;
                }
                return _buffer.dup;
            }
            yield();
        }

        disconnect();
        throw new TransportException("Request timeout error");
    }
}
232 
233 
234 
/**
 * Pool of ZeroMQ client connections that all target the same endpoint
 * and share the same reply timeout.
 */
class ZeroMQClientConnectionPool : WaitClientConnectionPool!ZeroMQClientConnection
{
    private
    {
        string _uri;
        uint _timeout;
    }


    /**
     * Params:
     *   uri     = endpoint passed to every created connection
     *   timeout = reply timeout passed to every created connection
     *   size    = pool size forwarded to the base pool
     */
    this(string uri, uint timeout, uint size)
    {
        this._uri = uri;
        this._timeout = timeout;
        super(size);
    }


    /// Builds one more connection configured with the pool's endpoint and timeout
    ZeroMQClientConnection createNewConnection()
    {
        auto connection = new ZeroMQClientConnection(_uri, _timeout);
        return connection;
    }
}
257 
258 
259 
/**
 * Client transport that sends requests through a pool of ZeroMQ
 * connections.
 */
class ZeroMQClientTransport : ClientTransport
{
    private
    {
        ZeroMQClientConnectionPool _pool;
    }


    /// Creates an unconfigured transport; initialize() must be called before use
    this() {}


    /**
     * Params:
     *   uri     = endpoint of the server
     *   timeout = reply timeout passed to the connection pool
     *   size    = size of the connection pool
     */
    this(string uri, uint timeout, uint size)
    {
        _pool = new ZeroMQClientConnectionPool(uri, timeout, size);
    }


    /**
     * Configures the transport from properties.
     *
     * Params:
     *   config = `uri` is mandatory; `timeout` defaults to 500 and
     *            `poolSize` defaults to 10
     *
     * Throws: TransportException when `timeout` or `poolSize` does not
     *         fit into an unsigned 32-bit value
     */
    void initialize(Properties config)
    {
        string uri = config.getOrEnforce!string("uri",
                "Not defined uri for client transport");
        long timeout = config.getOrElse!long("timeout", 500);
        long poolSize = config.getOrElse!long("poolSize", 10);
        _pool = new ZeroMQClientConnectionPool(uri,
                toUint(timeout, "timeout"), toUint(poolSize, "poolSize"));
    }


    /**
     * Sends a request over a pooled connection and returns the reply.
     * The connection is handed back to the pool even when the request fails.
     *
     * Throws: TransportException when the transport was not initialized
     */
    ubyte[] request(ubyte[] bytes)
    {
        // A default-constructed transport has no pool until initialize() runs
        if (_pool is null)
            throw new TransportException("ZeroMQ client transport is not initialized");

        auto conn = _pool.getConnection();
        scope(exit) _pool.freeConnection(conn);
        return conn.request(bytes);
    }


    /// Converts a configuration value to uint, rejecting values a plain cast would wrap
    private static uint toUint(long value, string name)
    {
        if (value < 0 || value > uint.max)
            throw new TransportException(
                    "ZeroMQ client transport: '" ~ name ~ "' is out of range");
        return cast(uint)value;
    }
}