#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 namespace DB { /** Perform HTTP POST request and provide response to read. */ namespace detail { template class ReadWriteBufferFromHTTPBase : public ReadBuffer { protected: Poco::URI uri; std::string method; SessionPtr session; std::istream * istr; /// owned by session std::unique_ptr impl; public: using OutStreamCallback = std::function; explicit ReadWriteBufferFromHTTPBase(SessionPtr session_, Poco::URI uri, const std::string & method = {}, OutStreamCallback out_stream_callback = {}, const Poco::Net::HTTPBasicCredentials & credentials = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) : ReadBuffer(nullptr, 0) , uri {uri} , method {!method.empty() ? method : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} , session {session_} { // With empty path poco will send "POST HTTP/1.1" its bug. if (uri.getPath().empty()) uri.setPath("/"); Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); request.setHost(uri.getHost()); // use original, not resolved host name in header if (out_stream_callback) request.setChunkedTransferEncoding(true); if (!credentials.getUsername().empty()) credentials.authenticate(request); Poco::Net::HTTPResponse response; LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString()); auto & stream_out = session->sendRequest(request); if (out_stream_callback) out_stream_callback(stream_out); istr = receiveResponse(*session, request, response); impl = std::make_unique(*istr, buffer_size_); } bool nextImpl() override { if (!impl->next()) return false; internal_buffer = impl->buffer(); working_buffer = internal_buffer; return true; } }; } class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase { using Parent = detail::ReadWriteBufferFromHTTPBase; public: explicit ReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_ = {}, OutStreamCallback out_stream_callback = {}, const ConnectionTimeouts & timeouts = {}, const Poco::Net::HTTPBasicCredentials & credentials = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) : Parent(makeHTTPSession(uri_, timeouts), uri_, method_, out_stream_callback, credentials, buffer_size_) { } }; class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase { using Parent = detail::ReadWriteBufferFromHTTPBase; public: explicit PooledReadWriteBufferFromHTTP(Poco::URI uri_, const std::string & method_ = {}, OutStreamCallback out_stream_callback = {}, const ConnectionTimeouts & timeouts = {}, const Poco::Net::HTTPBasicCredentials & credentials = {}, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT) : Parent(makePooledHTTPSession(uri_, timeouts, max_connections_per_endpoint), uri_, method_, out_stream_callback, credentials, buffer_size_) { } }; }