Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2017-04-06 22:51:53 +03:00
commit 928b8d7653
11 changed files with 82 additions and 188 deletions

View File

@ -1,112 +0,0 @@
#include <Poco/Version.h>
#include <Poco/URI.h>
#include <Poco/Net/DNS.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromHTTP.h>
#include <Common/SimpleCache.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
}
static Poco::Net::IPAddress resolveHostImpl(const String & host)
{
return Poco::Net::DNS::resolveOne(host);
}
static Poco::Net::IPAddress resolveHost(const String & host)
{
static SimpleCache<decltype(resolveHostImpl), &resolveHostImpl> cache;
return cache(host);
}
ReadBufferFromHTTP::ReadBufferFromHTTP(
const String & host_,
int port_,
const String & path_,
const Params & params,
const String & method_,
size_t buffer_size_,
const Poco::Timespan & connection_timeout,
const Poco::Timespan & send_timeout,
const Poco::Timespan & receive_timeout)
: ReadBuffer(nullptr, 0), host(host_), port(port_), path(path_), method(method_)
{
if (method.empty())
method = Poco::Net::HTTPRequest::HTTP_POST;
if (path.empty())
path = "/";
std::stringstream path_params;
path_params << path;
bool first = true;
for (const auto & it : params)
{
path_params << (first ? "?" : "&");
first = false;
String encoded_key;
String encoded_value;
Poco::URI::encode(it.first, "=&#", encoded_key);
Poco::URI::encode(it.second, "&#", encoded_value);
path_params << encoded_key << "=" << encoded_value;
}
std::stringstream uri;
uri << "http://" << host << ":" << port << path_params.str();
session.setHost(resolveHost(host).toString()); /// Cache DNS forever (until server restart)
session.setPort(port);
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session.setTimeout(connection_timeout, send_timeout, receive_timeout);
#else
session.setTimeout(connection_timeout);
#endif
Poco::Net::HTTPRequest request(method, path_params.str());
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadBufferFromHTTP")), "Sending request to " << uri.str());
session.sendRequest(request);
istr = &session.receiveResponse(response);
Poco::Net::HTTPResponse::HTTPStatus status = response.getStatus();
if (status != Poco::Net::HTTPResponse::HTTP_OK)
{
std::stringstream error_message;
error_message << "Received error from remote server " << uri.str() << ". HTTP status code: "
<< status << " " << response.getReason() << ", body: " << istr->rdbuf();
throw Exception(error_message.str(), status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
bool ReadBufferFromHTTP::nextImpl()
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
}

View File

@ -1,50 +0,0 @@
#pragma once
#include <memory>
#include <Poco/Net/HTTPClientSession.h>
#include <IO/ReadBuffer.h>
#include <Core/Types.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
namespace DB
{
const int HTTP_TOO_MANY_REQUESTS = 429;
/** Perform HTTP-request and provide response to read.
*/
class ReadBufferFromHTTP : public ReadBuffer
{
private:
String host;
int port;
String path;
String method;
Poco::Net::HTTPClientSession session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
public:
using Params = std::vector<std::pair<String, String>>;
ReadBufferFromHTTP(
const String & host_,
int port_,
const String & path_,
const Params & params,
const String & method_ = "",
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const Poco::Timespan & connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0),
const Poco::Timespan & send_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0),
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_TIMEOUT, 0));
bool nextImpl() override;
};
}

View File

@ -17,7 +17,6 @@
namespace DB
{
// copypaste from ReadBufferFromHTTP.cpp
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;

View File

@ -3,13 +3,16 @@
#include <functional>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/URI.h>
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadBuffer.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
namespace DB
{
const int HTTP_TOO_MANY_REQUESTS = 429;
struct HTTPTimeouts
{
Poco::Timespan connection_timeout = Poco::Timespan(DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, 0);

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include "ReadHelpers.h"
#define DEFAULT_REMOTE_READ_BUFFER_CONNECTION_TIMEOUT 1
@ -15,7 +15,7 @@ namespace DB
class RemoteReadBuffer : public ReadBuffer
{
private:
std::unique_ptr<ReadBufferFromHTTP> impl;
std::unique_ptr<ReadWriteBufferFromHTTP> impl;
public:
RemoteReadBuffer(
@ -29,12 +29,17 @@ public:
const Poco::Timespan & receive_timeout = Poco::Timespan(DEFAULT_REMOTE_READ_BUFFER_RECEIVE_TIMEOUT, 0))
: ReadBuffer(nullptr, 0)
{
ReadBufferFromHTTP::Params params = {
Poco::URI uri;
uri.setScheme("http");
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
std::make_pair("action", "read"),
std::make_pair("path", path),
std::make_pair("compress", (compress ? "true" : "false"))};
std::make_pair("compress", (compress ? "true" : "false"))});
impl = std::make_unique<ReadBufferFromHTTP>(host, port, "", params, "", buffer_size, connection_timeout, send_timeout, receive_timeout);
impl = std::make_unique<ReadWriteBufferFromHTTP>(uri, std::string(), ReadWriteBufferFromHTTP::OutStreamCallback(), buffer_size, HTTPTimeouts{connection_timeout, send_timeout, receive_timeout});
}
bool nextImpl() override
@ -53,11 +58,16 @@ public:
const std::string & path,
size_t timeout = 0)
{
ReadBufferFromHTTP::Params params = {
Poco::URI uri;
uri.setScheme("http");
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
std::make_pair("action", "list"),
std::make_pair("path", path)};
std::make_pair("path", path)});
ReadBufferFromHTTP in(host, port, "", params, "", timeout);
ReadWriteBufferFromHTTP in(uri, {}, {}, {}, HTTPTimeouts{timeout});
std::vector<std::string> files;
while (!in.eof())

View File

@ -73,3 +73,6 @@ endif ()
add_executable (zlib_buffers zlib_buffers.cpp ${SRCS})
target_link_libraries (zlib_buffers dbms)
add_executable (remote_read_write_buffer remote_read_write_buffer.cpp ${SRCS})
target_link_libraries (remote_read_write_buffer dbms)

View File

@ -0,0 +1,23 @@
//#include <string>
#include <iostream>
#include <IO/RemoteReadBuffer.h>
#include <IO/RemoteWriteBuffer.h>
// Now just compile test
int main(int argc, char ** argv)
{
try
{
DB::RemoteReadBuffer({}, {}, {});
DB::RemoteWriteBuffer({}, {}, {});
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -2,7 +2,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <ext/scope_guard.hpp>
#include <Poco/Net/HTTPServerResponse.h>
@ -194,15 +194,20 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPartImpl(
const String & shard_no,
bool to_detached)
{
ReadBufferFromHTTP::Params params =
Poco::URI uri;
uri.setScheme("http");
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"shard", shard_no},
{"compress", "false"}
};
}
);
ReadBufferFromHTTP in(host, port, "", params);
ReadWriteBufferFromHTTP in(uri);
String full_part_name = String(to_detached ? "detached/" : "") + "tmp_" + part_name;
String part_path = data.getFullPath() + full_part_name + "/";

View File

@ -1,7 +1,7 @@
#include <Storages/MergeTree/RemoteDiskSpaceMonitor.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <Interpreters/Context.h>
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -48,13 +48,18 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
size_t Client::getFreeSpace(const InterserverIOEndpointLocation & location) const
{
ReadBufferFromHTTP::Params params =
Poco::URI uri;
uri.setScheme("http");
uri.setHost(location.host);
uri.setPort(location.port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(location.name) },
{"compress", "false"}
};
}
);
ReadBufferFromHTTP in{location.host, location.port, "", params};
ReadWriteBufferFromHTTP in{uri};
size_t free_disk_space;
readBinary(free_disk_space, in);

View File

@ -1,7 +1,7 @@
#include <Storages/MergeTree/RemotePartChecker.h>
#include <Storages/MergeTree/ReshardingWorker.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
@ -65,15 +65,19 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
Status Client::check(const std::string & part_name, const std::string & hash,
const InterserverIOEndpointLocation & to_location)
{
ReadBufferFromHTTP::Params params =
Poco::URI uri;
uri.setScheme("http");
uri.setHost(to_location.host);
uri.setPort(to_location.port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(to_location.name) },
{"compress", "false"},
{"part", part_name},
{"hash", hash}
};
});
ReadBufferFromHTTP in{to_location.host, to_location.port, "", params};
ReadWriteBufferFromHTTP in{uri};
UInt8 val;
readBinary(val, in);

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/RemoteQueryExecutor.h>
#include <Interpreters/executeQuery.h>
#include <IO/ReadBufferFromHTTP.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -60,14 +60,18 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
bool Client::executeQuery(const InterserverIOEndpointLocation & location, const std::string & query)
{
ReadBufferFromHTTP::Params params =
Poco::URI uri;
uri.setScheme("http");
uri.setHost(location.host);
uri.setPort(location.port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(location.name)},
{"compress", "false"},
{"query", query}
};
});
ReadBufferFromHTTP in{location.host, location.port, "", params};
ReadWriteBufferFromHTTP in{uri};
bool flag;
readBinary(flag, in);