Add http connection pool between replicas

This commit is contained in:
alesapin 2018-11-16 16:15:17 +03:00
parent be9ac87fcc
commit d4ce674b93
13 changed files with 404 additions and 118 deletions

View File

@ -170,6 +170,7 @@
M(OSWriteBytes, "Number of bytes written to disks or block devices. Doesn't include bytes that are in page cache dirty pages. May not include data that was written by OS asynchronously.") \
M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \
M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \
M(CreatedHTTPConnections, "Total amount of created HTTP connections (closed or opened).") \
namespace ProfileEvents
{

View File

@ -62,6 +62,7 @@
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
#define DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT 10
// more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html

View File

@ -1,9 +1,9 @@
#include <IO/HTTPCommon.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/config.h>
#include <Poco/Version.h>
#if USE_POCO_NETSSL
#include <Poco/Net/AcceptCertificateHandler.h>
#include <Poco/Net/Context.h>
@ -13,14 +13,22 @@
#include <Poco/Net/RejectCertificateHandler.h>
#include <Poco/Net/SSLManager.h>
#endif
#include <tuple>
#include <unordered_map>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <sstream>
namespace ProfileEvents
{
extern const Event CreatedHTTPConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
@ -29,6 +37,105 @@ namespace ErrorCodes
}
namespace
{
/// Applies the connection, send and receive timeouts from `timeouts` to `session`.
/// When the three-argument setTimeout overload is not available (neither the ClickHouse
/// Poco patch nor Poco >= 2.0), a single timeout equal to the largest of the three is used.
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
#if !(POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000)
    const auto longest_timeout = std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout});
    session.setTimeout(longest_timeout);
#else
    session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#endif
}
/// Creates a new client session for the given endpoint.
///
/// host  - host name; it is resolved through DNSResolver and the resolved address is set on the session.
/// port  - port set on the session.
/// https - when true an HTTPS session is created; throws FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME
///         if the binary was built without USE_POCO_NETSSL.
///
/// Every successfully constructed session object increments the CreatedHTTPConnections profile event.
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https)
{
    HTTPSessionPtr result;

    if (!https)
    {
        result = std::make_unique<Poco::Net::HTTPClientSession>();
    }
    else
    {
#if USE_POCO_NETSSL
        result = std::make_unique<Poco::Net::HTTPSClientSession>();
#else
        throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
    }

    ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);

    const auto resolved_address = DNSResolver::instance().resolveHost(host).toString();
    result->setHost(resolved_address);
    result->setPort(port);

    return result;
}
/// Pool of HTTP(S) client sessions that all target one endpoint, described by host, port
/// and the https flag. New sessions are produced by makeHTTPSessionImpl.
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
    /// Endpoint description; none of these fields is modified after construction,
    /// so all three are const.
    const std::string host;
    const UInt16 port;
    const bool https;

    using Base = PoolBase<Poco::Net::HTTPClientSession>;

    /// Overrides the base-class allocation function: builds a fresh session for this endpoint.
    ObjectPtr allocObject() override
    {
        return makeHTTPSessionImpl(host, port, https);
    }

public:
    /// max_pool_size_ is forwarded to the PoolBase constructor together with the "HttpSessionsPool" logger.
    SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
        : Base(max_pool_size_, &Poco::Logger::get("HttpSessionsPool")), host(host_), port(port_), https(https_)
    {
    }
};
/// Singleton registry of per-endpoint session pools.
/// An endpoint is identified by the tuple (host, port, is-https) taken from the request URI.
class HTTPSessionPool : public ext::singleton<HTTPSessionPool>
{
private:
    using Key = std::tuple<std::string, UInt16, bool>;
    using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
    using Entry = SingleEndpointHTTPSessionPool::Entry;

    friend class ext::singleton<HTTPSessionPool>;

    /// Hashes all three components of the endpoint key with SipHash.
    struct Hasher
    {
        size_t operator()(const Key & k) const
        {
            SipHash s;
            s.update(std::get<0>(k));
            s.update(std::get<1>(k));
            s.update(std::get<2>(k));
            return s.get64();
        }
    };

    /// Protects endpoints_pool (lookup and insertion) only.
    std::mutex mutex;
    std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;

protected:
    HTTPSessionPool() = default;

public:
    /// Returns a pooled session for the endpoint of `uri` with `timeouts` applied.
    ///
    /// The pool for an endpoint is created on first use; `max_connections_per_endpoint`
    /// is only consulted at that moment and is ignored for an endpoint that already has a pool.
    Entry getSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t max_connections_per_endpoint)
    {
        const std::string & host = uri.getHost();
        UInt16 port = uri.getPort();
        bool https = (uri.getScheme() == "https");

        PoolPtr endpoint_pool;
        {
            /// Hold the registry lock just long enough to find or create the endpoint's pool.
            /// Keeping it while acquiring a session would make a caller that waits on one
            /// endpoint stall getSession() calls for every other endpoint as well.
            std::lock_guard<std::mutex> lock(mutex);

            auto key = std::make_tuple(host, port, https);
            auto it = endpoints_pool.find(key);
            if (it == endpoints_pool.end())
                it = endpoints_pool.emplace(
                    key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint)).first;

            endpoint_pool = it->second;
        }

        /// NOTE(review): -1 is passed as the timeout; presumably "wait without limit", and
        /// PoolBase::get is assumed to do its own synchronization - confirm against PoolBase.
        auto session = endpoint_pool->get(-1);
        setTimeouts(*session, timeouts);
        return session;
    }
};
}
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
{
if (!response.getKeepAlive())
@ -39,33 +146,24 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}
std::unique_ptr<Poco::Net::HTTPClientSession> makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
{
bool is_ssl = static_cast<bool>(uri.getScheme() == "https");
std::unique_ptr<Poco::Net::HTTPClientSession> session;
if (is_ssl)
#if USE_POCO_NETSSL
session = std::make_unique<Poco::Net::HTTPSClientSession>();
#else
throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
else
session = std::make_unique<Poco::Net::HTTPClientSession>();
session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString());
session->setPort(uri.getPort());
#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000
session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session->setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout}));
#endif
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = (uri.getScheme() == "https");
auto session = makeHTTPSessionImpl(host, port, https);
setTimeouts(*session, timeouts);
return session;
}
/// Obtains a session for `uri` from the HTTPSessionPool singleton.
/// `timeouts` and `per_endpoint_pool_size` are forwarded unchanged to HTTPSessionPool::getSession.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
{
    auto & session_pool = HTTPSessionPool::instance();
    return session_pool.getSession(uri, timeouts, per_endpoint_pool_size);
}
std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response)
{

View File

@ -1,43 +1,60 @@
#pragma once
#include <mutex>
#include <memory>
#include <iostream>
#include <memory>
#include <mutex>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <Common/PoolBase.h>
#include <IO/ConnectionTimeouts.h>
namespace Poco
{
namespace Net
{
class HTTPServerResponse;
}
namespace Net
{
class HTTPServerResponse;
}
}
namespace DB
{
constexpr int HTTP_TOO_MANY_REQUESTS = 429;
const int HTTP_TOO_MANY_REQUESTS = 429;
/// Pool of HTTP client sessions bound to a single endpoint, described by host, port and the https flag.
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
/// Endpoint description; all three fields are immutable after construction.
const std::string host;
const UInt16 port;
const bool https;
using Base = PoolBase<Poco::Net::HTTPClientSession>;
/// Overrides the PoolBase allocation function; only declared here, defined out of line.
ObjectPtr allocObject() override;
public:
/// NOTE(review): max_pool_size_ is presumably the session limit handed to PoolBase - confirm in the definition.
SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_);
};
using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry;
using HTTPSessionPtr = std::unique_ptr<Poco::Net::HTTPClientSession>;
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);
/// Create session object to perform requests and set required parameters.
std::unique_ptr<Poco::Net::HTTPClientSession> makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
/// Same as makeHTTPSession, but takes the session from a connection pool.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size);
/** Used to receive response (response headers and possibly body)
* after sending data (request headers and possibly body).
* Throws exception in case of non HTTP_OK (200) response code.
* Returned istream lives in 'session' object.
*/
std::istream * receiveResponse(Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);
std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response);
}

View File

@ -1,65 +1 @@
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Common/config.h>
#include <Core/Types.h>
#include <IO/ReadBufferFromIStream.h>
#include <Common/DNSResolver.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Version.h>
#include <common/logger_useful.h>
#include <IO/HTTPCommon.h>
namespace DB
{
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(Poco::URI uri,
const std::string & method_,
OutStreamCallback out_stream_callback,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_)
: ReadBuffer(nullptr, 0),
uri{uri},
method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET},
session{makeHTTPSession(uri, timeouts)}
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri.getPath().empty())
uri.setPath("/");
Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
request.setHost(uri.getHost()); // use original, not resolved host name in header
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
auto & stream_out = session->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
istr = receiveResponse(*session, request, response);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
}
bool ReadWriteBufferFromHTTP::nextImpl()
{
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
return true;
}
}

View File

@ -1,42 +1,132 @@
#pragma once
#include <functional>
#include <Core/Types.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/URI.h>
#include <IO/ReadBuffer.h>
#include <IO/ConnectionTimeouts.h>
#include <Poco/Version.h>
#include <Common/DNSResolver.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
namespace DB
{
/** Perform HTTP POST request and provide response to read.
*/
class ReadWriteBufferFromHTTP : public ReadBuffer
{
private:
Poco::URI uri;
std::string method;
std::unique_ptr<Poco::Net::HTTPClientSession> session;
std::istream * istr; /// owned by session
std::unique_ptr<ReadBuffer> impl;
namespace detail
{
/// ReadBuffer that sends a single HTTP request from its constructor and then exposes
/// the response body for reading.
///
/// SessionPtr is a pointer-like handle to a Poco::Net::HTTPClientSession; it is only
/// used through `->` and `*`.
template <typename SessionPtr>
class ReadWriteBufferFromHTTPBase : public ReadBuffer
{
protected:
    Poco::URI uri;
    std::string method;
    SessionPtr session;
    std::istream * istr; /// owned by session
    std::unique_ptr<ReadBuffer> impl;

public:
    using OutStreamCallback = std::function<void(std::ostream &)>;

    /// session_            - session used to send the request; ownership is moved into the buffer.
    /// uri                 - target of the request; the original host name goes into the Host header.
    /// method              - HTTP method; when empty, POST is used if out_stream_callback is set, GET otherwise.
    /// out_stream_callback - optional writer of the request body; enables chunked transfer encoding.
    /// credentials         - applied to the request only when the user name is non-empty.
    /// buffer_size_        - size of the buffer used to read the response stream.
    explicit ReadWriteBufferFromHTTPBase(SessionPtr session_,
        Poco::URI uri,
        const std::string & method = {},
        OutStreamCallback out_stream_callback = {},
        const Poco::Net::HTTPBasicCredentials & credentials = {},
        size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
        : ReadBuffer(nullptr, 0)
        , uri {uri}
        , method {!method.empty() ? method : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
        , session {std::move(session_)}
    {
        /// The parameters `uri` and `method` shadow the members of the same name inside this body.

        /// An empty path would make Poco send a request line without a path
        /// (e.g. "POST HTTP/1.1"), so fall back to "/". Only this local copy is
        /// normalized; the `uri` member keeps the path exactly as it was passed in.
        if (uri.getPath().empty())
            uri.setPath("/");

        /// Use the member, not the parameter: the parameter may be empty, while the member
        /// already holds the effective method (explicit value or the GET/POST default).
        Poco::Net::HTTPRequest request(this->method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
        request.setHost(uri.getHost()); // use original, not resolved host name in header

        if (out_stream_callback)
            request.setChunkedTransferEncoding(true);

        if (!credentials.getUsername().empty())
            credentials.authenticate(request);

        Poco::Net::HTTPResponse response;

        LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());

        auto & stream_out = session->sendRequest(request);

        if (out_stream_callback)
            out_stream_callback(stream_out);

        istr = receiveResponse(*session, request, response);

        impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
    }

    /// Pulls the next chunk from the underlying stream buffer and exposes it as this
    /// buffer's working area. Returns false when the underlying buffer has no more data.
    bool nextImpl() override
    {
        if (!impl->next())
            return false;
        internal_buffer = impl->buffer();
        working_buffer = internal_buffer;
        return true;
    }
};
}
class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<HTTPSessionPtr>;
public:
using OutStreamCallback = std::function<void(std::ostream &)>;
explicit ReadWriteBufferFromHTTP(
Poco::URI uri,
explicit ReadWriteBufferFromHTTP(Poco::URI uri,
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE)
: Parent(makeHTTPSession(uri, timeouts), uri, method, out_stream_callback, credentials, buffer_size_)
{
}
};
class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>
{
using Parent = detail::ReadWriteBufferFromHTTPBase<PooledHTTPSessionPtr>;
bool nextImpl() override;
public:
explicit PooledReadWriteBufferFromHTTP(Poco::URI uri,
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t max_connections_per_endpoint = DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT)
: Parent(makePooledHTTPSession(uri, timeouts, max_connections_per_endpoint),
uri,
method,
out_stream_callback,
credentials,
buffer_size_)
{
}
};
}

View File

@ -1,6 +1,5 @@
#include <IO/WriteBufferFromHTTP.h>
#include <IO/HTTPCommon.h>
#include <common/logger_useful.h>

View File

@ -3,6 +3,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/HTTPCommon.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
@ -17,7 +18,7 @@ namespace DB
class WriteBufferFromHTTP : public WriteBufferFromOStream
{
private:
std::unique_ptr<Poco::Net::HTTPClientSession> session;
HTTPSessionPtr session;
Poco::Net::HTTPRequest request;
Poco::Net::HTTPResponse response;

View File

@ -185,7 +185,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
creds.setPassword(password);
}
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds};
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
};
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

View File

@ -91,6 +91,10 @@ struct MergeTreeSettings
/** Limit parallel fetches */ \
M(SettingUInt64, replicated_max_parallel_fetches, 0) \
M(SettingUInt64, replicated_max_parallel_fetches_for_table, 0) \
\
/** Limit parallel fetches from endpoint (actually pool size) */ \
M(SettingUInt64, replicated_max_parallel_fetches_for_host, DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT) \
\
/** Limit parallel sends */ \
M(SettingUInt64, replicated_max_parallel_sends, 0) \
M(SettingUInt64, replicated_max_parallel_sends_for_table, 0) \

View File

@ -0,0 +1,40 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node4</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node5</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,91 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.test_tools import assert_eq_with_retry
def _fill_nodes(nodes, shard, connections_count):
for node in nodes:
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}')
PARTITION BY date
ORDER BY id
SETTINGS
replicated_max_parallel_fetches_for_host={connections},
index_granularity=8192;
'''.format(shard=shard, replica=node.name, connections=connections_count))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_small_cluster():
try:
cluster.start()
_fill_nodes([node1, node2], 1, 1)
yield cluster
finally:
cluster.shutdown()
def test_single_endpoint_connections_count(start_small_cluster):
def task(count):
print("Inserting ten times from {}".format(count))
for i in xrange(count, count + 10):
node1.query("insert into test_table values ('2017-06-16', {}, 0)".format(i))
p = Pool(10)
p.map(task, xrange(0, 100, 10))
assert_eq_with_retry(node1, "select count() from test_table", "100")
assert_eq_with_retry(node2, "select count() from test_table", "100")
assert node2.query("SELECT value FROM system.events where event='CreatedHTTPConnections'") == '1\n'
node3 = cluster.add_instance('node3', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
node5 = cluster.add_instance('node5', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def start_big_cluster():
try:
cluster.start()
_fill_nodes([node3, node4, node5], 2, 2)
yield cluster
finally:
cluster.shutdown()
def test_multiple_endpoint_connections_count(start_big_cluster):
def task(count):
print("Inserting ten times from {}".format(count))
if (count / 10) % 2 == 1:
node = node3
else:
node = node4
for i in xrange(count, count + 10):
node.query("insert into test_table values ('2017-06-16', {}, 0)".format(i))
p = Pool(10)
p.map(task, xrange(0, 100, 10))
assert_eq_with_retry(node3, "select count() from test_table", "100")
assert_eq_with_retry(node4, "select count() from test_table", "100")
assert_eq_with_retry(node5, "select count() from test_table", "100")
# two per each host
assert node5.query("SELECT value FROM system.events where event='CreatedHTTPConnections'") == '4\n'