From d4ce674b93a320a2767ee27cbbee0e23e1d6c4fa Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 16 Nov 2018 16:15:17 +0300 Subject: [PATCH] Add http connection pool between replicas --- dbms/src/Common/ProfileEvents.cpp | 1 + dbms/src/Core/Defines.h | 1 + dbms/src/IO/HTTPCommon.cpp | 144 +++++++++++++++--- dbms/src/IO/HTTPCommon.h | 39 +++-- dbms/src/IO/ReadWriteBufferFromHTTP.cpp | 64 -------- dbms/src/IO/ReadWriteBufferFromHTTP.h | 124 ++++++++++++--- dbms/src/IO/WriteBufferFromHTTP.cpp | 1 - dbms/src/IO/WriteBufferFromHTTP.h | 3 +- .../Storages/MergeTree/DataPartsExchange.cpp | 10 +- .../Storages/MergeTree/MergeTreeSettings.h | 4 + .../__init__.py | 0 .../configs/remote_servers.xml | 40 +++++ .../test.py | 91 +++++++++++ 13 files changed, 404 insertions(+), 118 deletions(-) create mode 100644 dbms/tests/integration/test_max_http_connections_for_replication/__init__.py create mode 100644 dbms/tests/integration/test_max_http_connections_for_replication/configs/remote_servers.xml create mode 100644 dbms/tests/integration/test_max_http_connections_for_replication/test.py diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index de50c625f15..7059e02d76c 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -170,6 +170,7 @@ M(OSWriteBytes, "Number of bytes written to disks or block devices. Doesn't include bytes that are in page cache dirty pages. May not include data that was written by OS asynchronously.") \ M(OSReadChars, "Number of bytes read from filesystem, including page cache.") \ M(OSWriteChars, "Number of bytes written to filesystem, including page cache.") \ + M(CreatedHTTPConnections, "Total amount of created HTTP connections (closed or opened).") \ namespace ProfileEvents { diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index cf7a0b621e0..deef10323fe 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -62,6 +62,7 @@ #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 +#define DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT 10 // more aliases: https://mailman.videolan.org/pipermail/x264-devel/2014-May/010660.html diff --git a/dbms/src/IO/HTTPCommon.cpp b/dbms/src/IO/HTTPCommon.cpp index b789e63609d..1bb7783b56f 100644 --- a/dbms/src/IO/HTTPCommon.cpp +++ b/dbms/src/IO/HTTPCommon.cpp @@ -1,9 +1,9 @@ #include +#include #include #include #include -#include #if USE_POCO_NETSSL #include #include @@ -13,14 +13,22 @@ #include #include #endif +#include +#include #include #include +#include +#include +#include #include +namespace ProfileEvents +{ + extern const Event CreatedHTTPConnections; +} namespace DB { - namespace ErrorCodes { extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER; @@ -29,6 +37,105 @@ namespace ErrorCodes } +namespace +{ + void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts) + { +#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 + session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout); +#else + session.setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout})); +#endif + } + + HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https) + { + HTTPSessionPtr session; + + if (https) +#if USE_POCO_NETSSL + session = std::make_unique(); +#else + throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME); +#endif + else + session = std::make_unique(); + + ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections); + + session->setHost(DNSResolver::instance().resolveHost(host).toString()); + session->setPort(port); + + return session; + } + + class SingleEndpointHTTPSessionPool : public PoolBase + { + private: + const std::string host; + const UInt16 port; + bool https; + using Base = PoolBase; + + ObjectPtr allocObject() override + { + return makeHTTPSessionImpl(host, port, https); + } + + public: + SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_) + : Base(max_pool_size_, &Poco::Logger::get("HttpSessionsPool")), host(host_), port(port_), https(https_) + { + } + }; + + class HTTPSessionPool : public ext::singleton + { + private: + using Key = std::tuple; + using PoolPtr = std::shared_ptr; + using Entry = SingleEndpointHTTPSessionPool::Entry; + + friend class ext::singleton; + + struct Hasher + { + size_t operator()(const Key & k) const + { + SipHash s; + s.update(std::get<0>(k)); + s.update(std::get<1>(k)); + s.update(std::get<2>(k)); + return s.get64(); + } + }; + + std::mutex mutex; + std::unordered_map endpoints_pool; + + protected: + HTTPSessionPool() = default; + + public: + Entry getSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t max_connections_per_endpoint) + { + std::unique_lock lock(mutex); + const std::string & host = uri.getHost(); + UInt16 port = uri.getPort(); + bool https = (uri.getScheme() == "https"); + auto key = std::make_tuple(host, port, https); + auto pool_ptr = endpoints_pool.find(key); + if (pool_ptr == endpoints_pool.end()) + std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace( + key, std::make_shared(host, port, https, max_connections_per_endpoint)); + + auto session = pool_ptr->second->get(-1); + setTimeouts(*session, timeouts); + return session; + } + }; +} + void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout) { if (!response.getKeepAlive()) @@ -39,33 +146,24 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds())); } -std::unique_ptr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts) +HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts) { - bool is_ssl = static_cast(uri.getScheme() == "https"); - std::unique_ptr session; - - if (is_ssl) -#if USE_POCO_NETSSL - session = std::make_unique(); -#else - throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME); -#endif - else - session = std::make_unique(); - - session->setHost(DNSResolver::instance().resolveHost(uri.getHost()).toString()); - session->setPort(uri.getPort()); - -#if POCO_CLICKHOUSE_PATCH || POCO_VERSION >= 0x02000000 - session->setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout); -#else - session->setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout})); -#endif + const std::string & host = uri.getHost(); + UInt16 port = uri.getPort(); + bool https = (uri.getScheme() == "https"); + auto session = makeHTTPSessionImpl(host, port, https); + setTimeouts(*session, timeouts); return session; } +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size) +{ + return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size); +} + + std::istream * receiveResponse( Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response) { diff --git a/dbms/src/IO/HTTPCommon.h b/dbms/src/IO/HTTPCommon.h index 011f718e205..a607e5f512c 100644 --- a/dbms/src/IO/HTTPCommon.h +++ b/dbms/src/IO/HTTPCommon.h @@ -1,43 +1,60 @@ #pragma once -#include -#include #include +#include +#include #include #include #include #include +#include #include namespace Poco { - namespace Net - { - class HTTPServerResponse; - } +namespace Net +{ + class HTTPServerResponse; +} } namespace DB { +constexpr int HTTP_TOO_MANY_REQUESTS = 429; -const int HTTP_TOO_MANY_REQUESTS = 429; +class SingleEndpointHTTPSessionPool : public PoolBase +{ +private: + const std::string host; + const UInt16 port; + const bool https; + using Base = PoolBase; + + ObjectPtr allocObject() override; + +public: + SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_); +}; +using PooledHTTPSessionPtr = SingleEndpointHTTPSessionPool::Entry; +using HTTPSessionPtr = std::unique_ptr; void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout); - /// Create session object to perform requests and set required parameters. -std::unique_ptr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts); +HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts); +/// As previous method creates session, but tooks it from pool +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size); /** Used to receive response (response headers and possibly body) * after sending data (request headers and possibly body). * Throws exception in case of non HTTP_OK (200) response code. * Returned istream lives in 'session' object. */ -std::istream * receiveResponse(Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response); - +std::istream * receiveResponse( + Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response); } diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp index 467f040ef38..4d046bfe2c6 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.cpp +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.cpp @@ -1,65 +1 @@ #include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace DB -{ - - -ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(Poco::URI uri, - const std::string & method_, - OutStreamCallback out_stream_callback, - const ConnectionTimeouts & timeouts, - const Poco::Net::HTTPBasicCredentials & credentials, - size_t buffer_size_) - : ReadBuffer(nullptr, 0), - uri{uri}, - method{!method_.empty() ? method_ : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}, - session{makeHTTPSession(uri, timeouts)} -{ - // With empty path poco will send "POST HTTP/1.1" its bug. - if (uri.getPath().empty()) - uri.setPath("/"); - - Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); - request.setHost(uri.getHost()); // use original, not resolved host name in header - - if (out_stream_callback) - request.setChunkedTransferEncoding(true); - - if (!credentials.getUsername().empty()) - credentials.authenticate(request); - - Poco::Net::HTTPResponse response; - - LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString()); - - auto & stream_out = session->sendRequest(request); - - if (out_stream_callback) - out_stream_callback(stream_out); - - istr = receiveResponse(*session, request, response); - - impl = std::make_unique(*istr, buffer_size_); -} - - -bool ReadWriteBufferFromHTTP::nextImpl() -{ - if (!impl->next()) - return false; - internal_buffer = impl->buffer(); - working_buffer = internal_buffer; - return true; -} - -} diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h index 363c9becddc..648053ee83b 100644 --- a/dbms/src/IO/ReadWriteBufferFromHTTP.h +++ b/dbms/src/IO/ReadWriteBufferFromHTTP.h @@ -1,42 +1,132 @@ #pragma once #include +#include +#include +#include +#include +#include #include #include +#include +#include #include -#include -#include +#include +#include +#include +#include + #define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 1800 #define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1 namespace DB { - /** Perform HTTP POST request and provide response to read. */ -class ReadWriteBufferFromHTTP : public ReadBuffer -{ -private: - Poco::URI uri; - std::string method; - std::unique_ptr session; - std::istream * istr; /// owned by session - std::unique_ptr impl; +namespace detail +{ + template + class ReadWriteBufferFromHTTPBase : public ReadBuffer + { + protected: + Poco::URI uri; + std::string method; + + SessionPtr session; + std::istream * istr; /// owned by session + std::unique_ptr impl; + + public: + using OutStreamCallback = std::function; + + explicit ReadWriteBufferFromHTTPBase(SessionPtr session_, + Poco::URI uri, + const std::string & method = {}, + OutStreamCallback out_stream_callback = {}, + const Poco::Net::HTTPBasicCredentials & credentials = {}, + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) + : ReadBuffer(nullptr, 0) + , uri {uri} + , method {!method.empty() ? method : out_stream_callback ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} + , session {std::move(session_)} + { + // With empty path poco will send "POST HTTP/1.1" its bug. + if (uri.getPath().empty()) + uri.setPath("/"); + + Poco::Net::HTTPRequest request(method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1); + request.setHost(uri.getHost()); // use original, not resolved host name in header + + if (out_stream_callback) + request.setChunkedTransferEncoding(true); + + if (!credentials.getUsername().empty()) + credentials.authenticate(request); + + Poco::Net::HTTPResponse response; + + LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString()); + + auto & stream_out = session->sendRequest(request); + + if (out_stream_callback) + out_stream_callback(stream_out); + + istr = receiveResponse(*session, request, response); + + impl = std::make_unique(*istr, buffer_size_); + } + + + bool nextImpl() override + { + if (!impl->next()) + return false; + internal_buffer = impl->buffer(); + working_buffer = internal_buffer; + return true; + } + }; +} + +class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase +{ + using Parent = detail::ReadWriteBufferFromHTTPBase; public: - using OutStreamCallback = std::function; - - explicit ReadWriteBufferFromHTTP( - Poco::URI uri, + explicit ReadWriteBufferFromHTTP(Poco::URI uri, const std::string & method = {}, OutStreamCallback out_stream_callback = {}, const ConnectionTimeouts & timeouts = {}, const Poco::Net::HTTPBasicCredentials & credentials = {}, - size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE); + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE) + : Parent(makeHTTPSession(uri, timeouts), uri, method, out_stream_callback, credentials, buffer_size_) + { + } +}; +class PooledReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase +{ + using Parent = detail::ReadWriteBufferFromHTTPBase; - bool nextImpl() override; +public: + explicit PooledReadWriteBufferFromHTTP(Poco::URI uri, + const std::string & method = {}, + OutStreamCallback out_stream_callback = {}, + const ConnectionTimeouts & timeouts = {}, + const Poco::Net::HTTPBasicCredentials & credentials = {}, + size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, + size_t max_connections_per_endpoint = DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT) + : Parent(makePooledHTTPSession(uri, timeouts, max_connections_per_endpoint), + uri, + method, + out_stream_callback, + credentials, + buffer_size_) + { + } }; + } diff --git a/dbms/src/IO/WriteBufferFromHTTP.cpp b/dbms/src/IO/WriteBufferFromHTTP.cpp index 51472de85e0..c74c74a0bd0 100644 --- a/dbms/src/IO/WriteBufferFromHTTP.cpp +++ b/dbms/src/IO/WriteBufferFromHTTP.cpp @@ -1,6 +1,5 @@ #include -#include #include diff --git a/dbms/src/IO/WriteBufferFromHTTP.h b/dbms/src/IO/WriteBufferFromHTTP.h index 12eed48021a..c68b8f88d3e 100644 --- a/dbms/src/IO/WriteBufferFromHTTP.h +++ b/dbms/src/IO/WriteBufferFromHTTP.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -17,7 +18,7 @@ namespace DB class WriteBufferFromHTTP : public WriteBufferFromOStream { private: - std::unique_ptr session; + HTTPSessionPtr session; Poco::Net::HTTPRequest request; Poco::Net::HTTPResponse response; diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index f54e74fa5fa..00e005a3907 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -185,7 +185,15 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( creds.setPassword(password); } - ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds}; + PooledReadWriteBufferFromHTTP in{ + uri, + Poco::Net::HTTPRequest::HTTP_POST, + {}, + timeouts, + creds, + DBMS_DEFAULT_BUFFER_SIZE, + data.settings.replicated_max_parallel_fetches_for_host + }; static const String TMP_PREFIX = "tmp_fetch_"; String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index f7fa9bf6703..619f41edd20 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -91,6 +91,10 @@ struct MergeTreeSettings /** Limit parallel fetches */ \ M(SettingUInt64, replicated_max_parallel_fetches, 0) \ M(SettingUInt64, replicated_max_parallel_fetches_for_table, 0) \ + \ + /** Limit parallel fetches from endpoint (actually pool size) */ \ + M(SettingUInt64, replicated_max_parallel_fetches_for_host, DEFAULT_HTTP_AMOUNT_CONNECTIONS_PER_ENDPOINT) \ + \ /** Limit parallel sends */ \ M(SettingUInt64, replicated_max_parallel_sends, 0) \ M(SettingUInt64, replicated_max_parallel_sends_for_table, 0) \ diff --git a/dbms/tests/integration/test_max_http_connections_for_replication/__init__.py b/dbms/tests/integration/test_max_http_connections_for_replication/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_max_http_connections_for_replication/configs/remote_servers.xml b/dbms/tests/integration/test_max_http_connections_for_replication/configs/remote_servers.xml new file mode 100644 index 00000000000..e44d4eef3ca --- /dev/null +++ b/dbms/tests/integration/test_max_http_connections_for_replication/configs/remote_servers.xml @@ -0,0 +1,40 @@ + + + + + true + + test + node1 + 9000 + + + test + node2 + 9000 + + + + + + true + + test + node3 + 9000 + + + test + node4 + 9000 + + + test + node5 + 9000 + + + + + + diff --git a/dbms/tests/integration/test_max_http_connections_for_replication/test.py b/dbms/tests/integration/test_max_http_connections_for_replication/test.py new file mode 100644 index 00000000000..b90650337d4 --- /dev/null +++ b/dbms/tests/integration/test_max_http_connections_for_replication/test.py @@ -0,0 +1,91 @@ +import time +import pytest + +from helpers.cluster import ClickHouseCluster +from multiprocessing.dummy import Pool + +from helpers.test_tools import assert_eq_with_retry + +def _fill_nodes(nodes, shard, connections_count): + for node in nodes: + node.query( + ''' + CREATE DATABASE test; + + CREATE TABLE test_table(date Date, id UInt32, dummy UInt32) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}') + PARTITION BY date + ORDER BY id + SETTINGS + replicated_max_parallel_fetches_for_host={connections}, + index_granularity=8192; + '''.format(shard=shard, replica=node.name, connections=connections_count)) + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True) +node2 = cluster.add_instance('node2', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True) + +@pytest.fixture(scope="module") +def start_small_cluster(): + try: + cluster.start() + + _fill_nodes([node1, node2], 1, 1) + + yield cluster + + finally: + cluster.shutdown() + +def test_single_endpoint_connections_count(start_small_cluster): + + def task(count): + print("Inserting ten times from {}".format(count)) + for i in xrange(count, count + 10): + node1.query("insert into test_table values ('2017-06-16', {}, 0)".format(i)) + + p = Pool(10) + p.map(task, xrange(0, 100, 10)) + + assert_eq_with_retry(node1, "select count() from test_table", "100") + assert_eq_with_retry(node2, "select count() from test_table", "100") + + assert node2.query("SELECT value FROM system.events where event='CreatedHTTPConnections'") == '1\n' + +node3 = cluster.add_instance('node3', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True) +node4 = cluster.add_instance('node4', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True) +node5 = cluster.add_instance('node5', config_dir="configs", main_configs=['configs/remote_servers.xml'], with_zookeeper=True) + +@pytest.fixture(scope="module") +def start_big_cluster(): + try: + cluster.start() + + _fill_nodes([node3, node4, node5], 2, 2) + + yield cluster + + finally: + cluster.shutdown() + +def test_multiple_endpoint_connections_count(start_big_cluster): + + def task(count): + print("Inserting ten times from {}".format(count)) + if (count / 10) % 2 == 1: + node = node3 + else: + node = node4 + + for i in xrange(count, count + 10): + node.query("insert into test_table values ('2017-06-16', {}, 0)".format(i)) + + p = Pool(10) + p.map(task, xrange(0, 100, 10)) + + assert_eq_with_retry(node3, "select count() from test_table", "100") + assert_eq_with_retry(node4, "select count() from test_table", "100") + assert_eq_with_retry(node5, "select count() from test_table", "100") + + # two per each host + assert node5.query("SELECT value FROM system.events where event='CreatedHTTPConnections'") == '4\n'