dbms: Extracted most of RemoteReadBuffer into ReadBufferFromHTTP. [#METR-10202]

This commit is contained in:
Michael Kolupaev 2014-03-21 21:23:09 +04:00
parent 750cb44f94
commit af094cbc76
2 changed files with 109 additions and 84 deletions

View File

@ -0,0 +1,86 @@
#pragma once
#include <Poco/URI.h>
#include <Poco/SharedPtr.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPClientSession.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <Yandex/logger_useful.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
namespace DB
{
/** Делает указанный HTTP-запрос и отдает ответ.
*/
class ReadBufferFromHTTP : public ReadBuffer
{
private:
std::string host;
int port;
std::string params;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// этим владеет session
Poco::SharedPtr<ReadBufferFromIStream> impl;
public:
ReadBufferFromHTTP(
const std::string & host_,
int port_,
const std::string & params_,
size_t timeout_ = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), host(host_), port(port_), params(params_)
{
std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path);
std::stringstream uri;
uri << "http://" << host << ":" << port << "/?" << params;
session.setHost(host);
session.setPort(port);
/// устанавливаем таймаут
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = new ReadBufferFromIStream(*istr, buffer_size_);
}
bool nextImpl()
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
};
}

View File

@ -1,78 +1,35 @@
#pragma once #pragma once
#include <Poco/URI.h> #include <DB/IO/ReadBufferFromHTTP.h>
#include <Poco/SharedPtr.h> #include "ReadHelpers.h"
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPClientSession.h>
#include <DB/IO/ReadBuffer.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <Yandex/logger_useful.h>
#define DEFAULT_REMOTE_READ_BUFFER_TIMEOUT 1800
namespace DB namespace DB
{ {
/** Позволяет читать файл с удалённого сервера. /** Позволяет читать файл с удалённого сервера через riod.
*/ */
class RemoteReadBuffer : public ReadBuffer class RemoteReadBuffer : public ReadBuffer
{ {
private: private:
std::string host; Poco::SharedPtr<ReadBufferFromHTTP> impl;
int port;
std::string path;
bool compress;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// этим владеет session
Poco::SharedPtr<ReadBufferFromIStream> impl;
public: public:
RemoteReadBuffer( RemoteReadBuffer(
const std::string & host_, const std::string & host,
int port_, int port,
const std::string & path_, const std::string & path,
bool compress_ = true, bool compress = true,
size_t timeout_ = 0, size_t timeout = 0,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) size_t buffer_size = DBMS_DEFAULT_BUFFER_SIZE)
: ReadBuffer(NULL, 0), host(host_), port(port_), path(path_), compress(compress_)
{ {
std::string encoded_path; std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path); Poco::URI::encode(path, "&#", encoded_path);
std::stringstream uri; std::stringstream params;
uri << "http://" << host << ":" << port << "/?action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false"); params << "action=read&path=" << encoded_path << "&compress=" << (compress ? "true" : "false");
session.setHost(host); impl = new ReadBufferFromHTTP(host, port, params.str, timeout, buffer_size);
session.setPort(port);
/// устанавливаем таймаут
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = new ReadBufferFromIStream(*istr, buffer_size_);
} }
bool nextImpl() bool nextImpl()
@ -89,42 +46,24 @@ public:
const std::string & host, const std::string & host,
int port, int port,
const std::string & path, const std::string & path,
size_t timeout_ = 0) size_t timeout = 0)
{ {
std::string encoded_path; std::string encoded_path;
Poco::URI::encode(path, "&#", encoded_path); Poco::URI::encode(path, "&#", encoded_path);
std::stringstream uri; std::stringstream params;
uri << "http://" << host << ":" << port << "/?action=list&path=" << encoded_path; params << "action=list&path=" << encoded_path;
Poco::Net::HTTPClientSession session; ReadBufferFromHTTP in(host, port, params.str(), timeout);
session.setHost(host);
session.setPort(port);
session.setTimeout(Poco::Timespan(timeout_ ? timeout_ : DEFAULT_REMOTE_READ_BUFFER_TIMEOUT, 0));
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_POST, uri.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("RemoteReadBuffer")), "Sending request to " << uri.str());
session.sendRequest(request);
std::istream * istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
std::vector<std::string> files; std::vector<std::string> files;
while (!in.eof())
{
std::string s; std::string s;
while (getline(*istr, s, '\n') && !s.empty()) readString(s, in);
skipWhitespaceIfAny(in);
files.push_back(s); files.push_back(s);
}
return files; return files;
} }