#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if !defined(ARCADIA_BUILD) # include #endif #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 namespace DB { /** Perform HTTP POST request and provide response to read. */ namespace ErrorCodes { extern const int TOO_MANY_REDIRECTS; } template class UpdatableSessionBase { protected: SessionPtr session; UInt64 redirects { 0 }; Poco::URI initial_uri; const ConnectionTimeouts & timeouts; SettingUInt64 max_redirects; public: virtual void buildNewSession(const Poco::URI & uri) = 0; explicit UpdatableSessionBase(const Poco::URI uri, const ConnectionTimeouts & timeouts_, SettingUInt64 max_redirects_) : initial_uri { uri } , timeouts { timeouts_ } , max_redirects { max_redirects_ } { } SessionPtr getSession() { return session; } void updateSession(const Poco::URI & uri) { ++redirects; if (redirects <= max_redirects) { buildNewSession(uri); } else { std::stringstream error_message; error_message << "Too many redirects while trying to access " << initial_uri.toString(); throw Exception(error_message.str(), ErrorCodes::TOO_MANY_REDIRECTS); } } virtual ~UpdatableSessionBase() { } }; namespace detail { template class ReadWriteBufferFromHTTPBase : public ReadBuffer { public: using HTTPHeaderEntry = std::tuple; using HTTPHeaderEntries = std::vector; protected: Poco::URI uri; std::string method; UpdatableSessionPtr session; std::istream * istr; /// owned by session std::unique_ptr impl; std::function out_stream_callback; const Poco::Net::HTTPBasicCredentials & credentials; std::vector cookies; HTTPHeaderEntries http_header_entries; RemoteHostFilter remote_host_filter; std::istream * call(const Poco::URI uri_, Poco::Net::HTTPResponse & response) { // With empty path poco will send "POST HTTP/1.1" its bug. if (uri.getPath().empty()) uri.setPath("/"); Poco::Net::HTTPRequest request(method, uri_.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); request.setHost(uri_.getHost()); // use original, not resolved host name in header if (out_stream_callback) request.setChunkedTransferEncoding(true); for (auto & http_header_entry: http_header_entries) { request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry)); } if (!credentials.getUsername().empty()) credentials.authenticate(request); LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri.toString()); auto sess = session->getSession(); try { auto & stream_out = sess->sendRequest(request); if (out_stream_callback) out_stream_callback(stream_out); istr = receiveResponse(*sess, request, response, true); response.getCookies(cookies); return istr; } catch (const Poco::Exception & e) { /// We use session data storage as storage for exception text /// Depend on it we can deduce to reconnect session or reresolve session host sess->attachSessionData(e.message()); throw; } } public: using OutStreamCallback = std::function; explicit ReadWriteBufferFromHTTPBase( UpdatableSessionPtr session_, Poco::URI uri_, const std::string & method_ = {}, OutStreamCallback out_stream_callback_ = {}, const Poco::Net::HTTPBasicCredentials & credentials_ = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, HTTPHeaderEntries http_header_entries_ = {}, const RemoteHostFilter & remote_host_filter_ = {}) : ReadBuffer(nullptr, 0) , uri {uri_} , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} , session {session_} , out_stream_callback {out_stream_callback_} , credentials {credentials_} , http_header_entries {http_header_entries_} , remote_host_filter {remote_host_filter_} { Poco::Net::HTTPResponse response; istr = call(uri, response); while (isRedirect(response.getStatus())) { Poco::URI uri_redirect(response.get("Location")); remote_host_filter.checkURL(uri_redirect); session->updateSession(uri_redirect); istr = call(uri_redirect,response); } try { impl = std::make_unique(*istr, buffer_size_); } catch (const Poco::Exception & e) { /// We use session data storage as storage for exception text /// Depend on it we can deduce to reconnect session or reresolve session host auto sess = session->getSession(); sess->attachSessionData(e.message()); throw; } } bool nextImpl() override { if (!impl->next()) return false; internal_buffer = impl->buffer(); working_buffer = internal_buffer; return true; } std::string getResponseCookie(const std::string & name, const std::string & def) const { for (const auto & cookie : cookies) if (cookie.getName() == name) return cookie.getValue(); return def; } }; } class UpdatableSession : public UpdatableSessionBase { using Parent = UpdatableSessionBase; public: explicit UpdatableSession(const Poco::URI uri, const ConnectionTimeouts & timeouts_, const SettingUInt64 max_redirects_) : Parent(uri, timeouts_, max_redirects_) { session = makeHTTPSession(initial_uri, timeouts); } void buildNewSession(const Poco::URI & uri) override { session = makeHTTPSession(uri, timeouts); } }; class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase> { using Parent = detail::ReadWriteBufferFromHTTPBase>; public: explicit ReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_, OutStreamCallback out_stream_callback_, const ConnectionTimeouts & timeouts, const SettingUInt64 max_redirects = 0, const Poco::Net::HTTPBasicCredentials & credentials_ = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, const HTTPHeaderEntries & http_header_entries_ = {}, const RemoteHostFilter & remote_host_filter_ = {}) : Parent(std::make_shared(uri_, timeouts, max_redirects), uri_, method_, out_stream_callback_, credentials_, buffer_size_, http_header_entries_, remote_host_filter_) { } }; class UpdatablePooledSession : public UpdatableSessionBase { using Parent = UpdatableSessionBase; private: size_t per_endpoint_pool_size; public: explicit UpdatablePooledSession(const Poco::URI uri, const ConnectionTimeouts & timeouts_, const SettingUInt64 max_redirects_, size_t per_endpoint_pool_size_) : Parent(uri, timeouts_, max_redirects_) , per_endpoint_pool_size { per_endpoint_pool_size_ } { session = makePooledHTTPSession(initial_uri, timeouts, per_endpoint_pool_size); } void buildNewSession(const Poco::URI & uri) override { session = makePooledHTTPSession(uri, timeouts, per_endpoint_pool_size); } }; class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase> { using Parent = detail::ReadWriteBufferFromHTTPBase>; public: explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_ = {}, OutStreamCallback out_stream_callback_ = {}, const ConnectionTimeouts & timeouts_ = {}, const Poco::Net::HTTPBasicCredentials & credentials_ = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, const SettingUInt64 max_redirects = 0, size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) : Parent(std::make_shared(uri_, timeouts_, max_redirects, max_connections_per_endpoint), uri_, method_, out_stream_callback_, credentials_, buffer_size_) { } }; }