From af094cbc7619a1b5779ee44218a9ad843dbe916b Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 21 Mar 2014 21:23:09 +0400 Subject: [PATCH] dbms: Extracted most of RemoteReadBuffer into ReadBufferFromHTTP. [#METR-10202] --- dbms/include/DB/IO/ReadBufferFromHTTP.h | 86 +++++++++++++++++++ dbms/include/DB/IO/RemoteReadBuffer.h | 107 +++++------------------- 2 files changed, 109 insertions(+), 84 deletions(-) create mode 100644 dbms/include/DB/IO/ReadBufferFromHTTP.h diff --git a/dbms/include/DB/IO/ReadBufferFromHTTP.h b/dbms/include/DB/IO/ReadBufferFromHTTP.h new file mode 100644 index 00000000000..d94f8d53bbc --- /dev/null +++ b/dbms/include/DB/IO/ReadBufferFromHTTP.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include + +#include + +#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 + + +namespace DB +{ + +/** Делает указанный HTTP-запрос и отдает ответ. + */ +class ReadBufferFromHTTP : public ReadBuffer +{ +private: + std::string host; + int port; + std::string params; + + Poco::Net::HTTPClientSession session; + std::istream * istr; /// этим владеет session + Poco::SharedPtr impl; + +public: + ReadBufferFromHTTP( + const std::string & host_, + int port_, + const std::string & params_, + size_t timeout_ = 0, + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) + : ReadBuffer(NULL, 0), host(host_), port(port_), params(params_) + { + std::string encoded_path; + Poco::URI::encode(path, "&#", encoded_path); + + std::stringstream uri; + uri << "http://" << host << ":" << port << "/?" << params; + + session.setHost(host); + session.setPort(port); + + /// устанавливаем таймаут + session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0)); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str()); + Poco::Net::HTTPResponse response; + + LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.str()); + + session.sendRequest(request); + istr = &session.receiveResponse(response); + + Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus(); + + if (status != Poco::Net::HTTPResponse::HTTP_OK) + { + std::stringstream error_message; + error_message << "Received error from remote server " << uri.str() << ". HTTP status code: " + << status << ", body: " << istr->rdbuf(); + + throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); + } + + impl = new ReadBufferFromIStream(*istr, buffer_size_); + } + + bool nextImpl() + { + if (!impl->next()) + return false; + internal_buffer = impl->buffer(); + working_buffer = internal_buffer; + return true; + } +}; + +} diff --git a/dbms/include/DB/IO/RemoteReadBuffer.h b/dbms/include/DB/IO/RemoteReadBuffer.h index 5f35211f118..0366da65a18 100644 --- a/dbms/include/DB/IO/RemoteReadBuffer.h +++ b/dbms/include/DB/IO/RemoteReadBuffer.h @@ -1,78 +1,35 @@ #pragma once -#include -#include -#include -#include -#include - -#include -#include - -#include - -#define DEFAULT_REMOTE_READ_BUFFER_TIMEOUT 1800 +#include +#include "ReadHelpers.h" namespace DB { -/** Позволяет читать файл с удалённого сервера. +/** Позволяет читать файл с удалённого сервера через riod. */ class RemoteReadBuffer : public ReadBuffer { private: - std::string host; - int port; - std::string path; - bool compress; - - Poco::Net::HTTPClientSession session; - std::istream * istr; /// этим владеет session - Poco::SharedPtr impl; + Poco::SharedPtr impl; public: RemoteReadBuffer( - const std::string & host_, - int port_, - const std::string & path_, - bool compress_ = true, - size_t timeout_ = 0, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) - : ReadBuffer(NULL, 0), host(host_), port(port_), path(path_), compress(compress_) + const std::string & host, + int port, + const std::string & path, + bool compress = true, + size_t timeout = 0, + size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE) { std::string encoded_path; Poco::URI::encode(path, "&#", encoded_path); - std::stringstream uri; - uri << "http://" << host << ":" << port << "/?action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false"); + std::stringstream params; + params << "action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false"); - session.setHost(host); - session.setPort(port); - - /// устанавливаем таймаут - session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0)); - - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str()); - Poco::Net::HTTPResponse response; - - LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str()); - - session.sendRequest(request); - istr = &session.receiveResponse(response); - - Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus(); - - if (status != Poco::Net::HTTPResponse::HTTP_OK) - { - std::stringstream error_message; - error_message << "Received error from remote server " << uri.str() << ". HTTP status code: " - << status << ", body: " << istr->rdbuf(); - - throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); - } - - impl = new ReadBufferFromIStream(*istr, buffer_size_); + impl = new ReadBufferFromHTTP(host, port, params.str, timeout, buffer_size); } bool nextImpl() @@ -89,42 +46,24 @@ public: const std::string & host, int port, const std::string & path, - size_t timeout_ = 0) + size_t timeout = 0) { std::string encoded_path; Poco::URI::encode(path, "&#", encoded_path); - std::stringstream uri; - uri << "http://" << host << ":" << port << "/?action=list&path=" << encoded_path; + std::stringstream params; + params << "action=list&path=" << encoded_path; - Poco::Net::HTTPClientSession session; - session.setHost(host); - session.setPort(port); - session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0)); - - Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str()); - Poco::Net::HTTPResponse response; - - LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str()); - - session.sendRequest(request); - std::istream * istr = &session.receiveResponse(response); - - Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus(); - - if (status != Poco::Net::HTTPResponse::HTTP_OK) - { - std::stringstream error_message; - error_message << "Received error from remote server " << uri.str() << ". HTTP status code: " - << status << ", body: " << istr->rdbuf(); - - throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER); - } + ReadBufferFromHTTP in(host, port, params.str(), timeout); std::vector files; - std::string s; - while (getline(*istr, s, '\n') && !s.empty()) + while (!in.eof()) + { + std::string s; + readString(s, in); + skipWhitespaceIfAny(in); files.push_back(s); + } return files; }