1 /** 2 * Модуль транспортного уровня на основе HTTP 3 * 4 * Copyright: (c) 2015-2017, Milofon Project. 5 * License: Subject to the terms of the BSD license, as written in the included LICENSE.txt file. 6 * Author: <m.galanin@milofon.org> Maksim Galanin 7 * Date: 2018-01-24 8 */ 9 10 module dango.service.transport.http; 11 12 private 13 { 14 import std.exception : enforce; 15 import std.format : fmt = format; 16 17 import vibe.stream.operations : readAll; 18 import vibe.inet.url : URL; 19 import vibe.http.router; 20 import vibe.http.client; 21 import vibe.core.log; 22 23 import dango.system.properties : getOrEnforce; 24 import dango.controller.core : createOptionCORSHandler, handleCors; 25 import dango.controller.http : loadServiceSettings; 26 27 import dango.service.transport.core; 28 } 29 30 31 class HTTPServerTransport : ServerTransport 32 { 33 private 34 { 35 HTTPListener _listener; 36 } 37 38 39 void listen(RpcServerProtocol protocol, Properties config) 40 { 41 auto router = new URLRouter(); 42 auto httpSettings = loadServiceSettings(config); 43 string entrypoint = config.getOrElse!string("entrypoint", "/"); 44 45 void handler(HTTPServerRequest req, HTTPServerResponse res) 46 { 47 handleCors(req, res); 48 ubyte[] data = protocol.handle(req.bodyReader.readAll()); 49 res.writeBody(data); 50 } 51 52 void handleError(HTTPServerRequest req, HTTPServerResponse res, HTTPServerErrorInfo err) 53 { 54 handleCors(req, res); 55 if (err.debugMessage.length) 56 res.writeBody("%s - %s\n\n%s\n\nInternal error information:\n%s" 57 .fmt(err.code, httpStatusText(err.code), err.message, err.debugMessage)); 58 else 59 res.writeBody("%s - %s\n\n%s" 60 .fmt(err.code, httpStatusText(err.code), err.message)); 61 } 62 63 httpSettings.errorPageHandler = &handleError; 64 65 router.post(entrypoint, &handler); 66 router.match(HTTPMethod.OPTIONS, entrypoint, createOptionCORSHandler()); 67 68 _listener = listenHTTP(httpSettings, router); 69 } 70 71 72 void shutdown() 73 { 74 _listener.stopListening(); 75 logInfo("Transport HTTP Stop"); 76 } 77 } 78 79 80 81 class HTTPClientConnection : ClientConnection 82 { 83 private 84 { 85 URL _entrypoint; 86 HTTPClientSettings _settings; 87 } 88 89 90 this(URL entrypoint, HTTPClientSettings settings) 91 { 92 _entrypoint = entrypoint; 93 _settings = settings; 94 } 95 96 97 bool connected() @property 98 { 99 return true; 100 } 101 102 103 void connect() {} 104 105 106 void disconnect() {} 107 108 109 ubyte[] request(ubyte[] bytes) 110 { 111 import requests : postContent; 112 return postContent(_entrypoint.toString, bytes, "application/binary").data; 113 // auto res = requestHTTP(_entrypoint, (scope HTTPClientRequest req) { 114 // req.method = HTTPMethod.POST; 115 // req.writeBody(bytes); 116 // }, _settings); 117 // return res.bodyReader.readAll(); 118 } 119 } 120 121 122 123 class HTTPClientConnectionPool : AsyncClientConnectionPool!HTTPClientConnection 124 { 125 private 126 { 127 URL _entrypoint; 128 HTTPClientSettings _settings; 129 } 130 131 132 this(URL entrypoint, HTTPClientSettings settings, uint size) 133 { 134 _entrypoint = entrypoint; 135 _settings = settings; 136 super(size); 137 } 138 139 140 HTTPClientConnection createNewConnection() 141 { 142 return new HTTPClientConnection(_entrypoint, _settings); 143 } 144 } 145 146 147 148 class HTTPClientTransport : ClientTransport 149 { 150 private 151 { 152 HTTPClientConnectionPool _pool; 153 } 154 155 156 this() {} 157 158 159 this(URL entrypoint, HTTPClientSettings settings) 160 { 161 _pool = new HTTPClientConnectionPool(entrypoint, settings, 10); 162 } 163 164 165 void initialize(Properties config) 166 { 167 auto settings = new HTTPClientSettings(); 168 string entrypoint = config.getOrEnforce!string("entrypoint", 169 "Not defined entrypoint for client transport"); 170 _pool = new HTTPClientConnectionPool(URL(entrypoint), settings, 10); 171 } 172 173 174 ubyte[] request(ubyte[] bytes) 175 { 176 auto conn = _pool.getConnection(); 177 scope(exit) _pool.freeConnection(conn); 178 return conn.request(bytes); 179 } 180 }