1 /**
2  * Модуль транспортного уровня на основе HTTP
3  *
4  * Copyright: (c) 2015-2017, Milofon Project.
5  * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file.
6  * Author: <m.galanin@milofon.org> Maksim Galanin
7  * Date: 2018-01-24
8  */
9 
10 module dango.service.transport.http;
11 
12 private
13 {
14     import std.exception : enforce;
15     import std.format : fmt = format;
16 
17     import vibe.stream.operations : readAll;
18     import vibe.inet.url : URL;
19     import vibe.http.router;
20     import vibe.http.client;
21     import vibe.core.log;
22 
23     import dango.system.properties : getOrEnforce;
24     import dango.controller.core : createOptionCORSHandler, handleCors;
25     import dango.controller.http : loadServiceSettings;
26 
27     import dango.service.transport.core;
28 }
29 
30 
31 class HTTPServerTransport : ServerTransport
32 {
33     private
34     {
35         HTTPListener _listener;
36     }
37 
38 
39     void listen(RpcServerProtocol protocol, Properties config)
40     {
41         auto router = new URLRouter();
42         auto httpSettings = loadServiceSettings(config);
43         string entrypoint = config.getOrElse!string("entrypoint", "/");
44 
45         void handler(HTTPServerRequest req, HTTPServerResponse res)
46         {
47             handleCors(req, res);
48             ubyte[] data = protocol.handle(req.bodyReader.readAll());
49             res.writeBody(data);
50         }
51 
52         void handleError(HTTPServerRequest req, HTTPServerResponse res, HTTPServerErrorInfo err)
53         {
54             handleCors(req, res);
55             if (err.debugMessage.length)
56 				res.writeBody("%s - %s\n\n%s\n\nInternal error information:\n%s"
57                         .fmt(err.code, httpStatusText(err.code), err.message, err.debugMessage));
58             else
59                 res.writeBody("%s - %s\n\n%s"
60                         .fmt(err.code, httpStatusText(err.code), err.message));
61         }
62 
63         httpSettings.errorPageHandler = &handleError;
64 
65         router.post(entrypoint, &handler);
66         router.match(HTTPMethod.OPTIONS, entrypoint, createOptionCORSHandler());
67 
68         _listener = listenHTTP(httpSettings, router);
69     }
70 
71 
72     void shutdown()
73     {
74         _listener.stopListening();
75         logInfo("Transport HTTP Stop");
76     }
77 }
78 
79 
80 
81 class HTTPClientConnection : ClientConnection
82 {
83     private
84     {
85         URL _entrypoint;
86         HTTPClientSettings _settings;
87     }
88 
89 
90     this(URL entrypoint, HTTPClientSettings settings)
91     {
92         _entrypoint = entrypoint;
93         _settings = settings;
94     }
95 
96 
97     bool connected() @property
98     {
99         return true;
100     }
101 
102 
103     void connect() {}
104 
105 
106     void disconnect() {}
107 
108 
109     ubyte[] request(ubyte[] bytes)
110     {
111         import requests : postContent;
112         return postContent(_entrypoint.toString, bytes, "application/binary").data;
113         // auto res = requestHTTP(_entrypoint, (scope HTTPClientRequest req) {
114         //         req.method = HTTPMethod.POST;
115         //         req.writeBody(bytes);
116         //     }, _settings);
117         // return res.bodyReader.readAll();
118     }
119 }
120 
121 
122 
123 class HTTPClientConnectionPool : AsyncClientConnectionPool!HTTPClientConnection
124 {
125     private
126     {
127         URL _entrypoint;
128         HTTPClientSettings _settings;
129     }
130 
131 
132     this(URL entrypoint, HTTPClientSettings settings, uint size)
133     {
134         _entrypoint = entrypoint;
135         _settings = settings;
136         super(size);
137     }
138 
139 
140     HTTPClientConnection createNewConnection()
141     {
142         return new HTTPClientConnection(_entrypoint, _settings);
143     }
144 }
145 
146 
147 
148 class HTTPClientTransport : ClientTransport
149 {
150     private
151     {
152         HTTPClientConnectionPool _pool;
153     }
154 
155 
156     this() {}
157 
158 
159     this(URL entrypoint, HTTPClientSettings settings)
160     {
161         _pool = new HTTPClientConnectionPool(entrypoint, settings, 10);
162     }
163 
164 
165     void initialize(Properties config)
166     {
167         auto settings = new HTTPClientSettings();
168         string entrypoint = config.getOrEnforce!string("entrypoint",
169                 "Not defined entrypoint for client transport");
170         _pool = new HTTPClientConnectionPool(URL(entrypoint), settings, 10);
171     }
172 
173 
174     ubyte[] request(ubyte[] bytes)
175     {
176         auto conn = _pool.getConnection();
177         scope(exit) _pool.freeConnection(conn);
178         return conn.request(bytes);
179     }
180 }