mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Merge pull request #52116 from aalexfvk/http_failover
Handle multiple resolved IPs for HTTP/HTTPS session connection
This commit is contained in:
commit
a39ba00ec3
@ -306,7 +306,7 @@ namespace Net
|
||||
DEFAULT_KEEP_ALIVE_TIMEOUT = 8
|
||||
};
|
||||
|
||||
void reconnect();
|
||||
virtual void reconnect();
|
||||
/// Connects the underlying socket to the HTTP server.
|
||||
|
||||
int write(const char * buffer, std::streamsize length);
|
||||
|
@ -60,7 +60,7 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
|
||||
{
|
||||
auto resolved_endpoint = endpoint;
|
||||
resolved_endpoint.setHost(resolved_hosts[i].toString());
|
||||
session = makeHTTPSession(resolved_endpoint, timeouts, false);
|
||||
session = makeHTTPSession(resolved_endpoint, timeouts);
|
||||
|
||||
try
|
||||
{
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Server/HTTP/HTTPServerResponse.h>
|
||||
#include <Poco/Any.h>
|
||||
#include <Common/Concepts.h>
|
||||
#include <Common/DNSResolver.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/MemoryTrackerSwitcher.h>
|
||||
@ -24,9 +25,9 @@
|
||||
|
||||
#include <Poco/Util/Application.h>
|
||||
|
||||
#include <sstream>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -54,6 +55,78 @@ namespace
|
||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||
}
|
||||
|
||||
template <typename Session>
|
||||
requires std::derived_from<Session, Poco::Net::HTTPClientSession>
|
||||
class HTTPSessionAdapter : public Session
|
||||
{
|
||||
static_assert(std::has_virtual_destructor_v<Session>, "The base class must have a virtual destructor");
|
||||
|
||||
public:
|
||||
HTTPSessionAdapter(const std::string & host, UInt16 port) : Session(host, port), log{&Poco::Logger::get("HTTPSessionAdapter")} { }
|
||||
~HTTPSessionAdapter() override = default;
|
||||
|
||||
protected:
|
||||
void reconnect() override
|
||||
{
|
||||
// First of all will try to establish connection with last used addr.
|
||||
if (!Session::getResolvedHost().empty())
|
||||
{
|
||||
try
|
||||
{
|
||||
Session::reconnect();
|
||||
return;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
Session::close();
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Last ip ({}) is unreachable for {}:{}. Will try another resolved address.",
|
||||
Session::getResolvedHost(),
|
||||
Session::getHost(),
|
||||
Session::getPort());
|
||||
}
|
||||
}
|
||||
|
||||
const auto endpoinds = DNSResolver::instance().resolveHostAll(Session::getHost());
|
||||
|
||||
for (auto it = endpoinds.begin();;)
|
||||
{
|
||||
try
|
||||
{
|
||||
Session::setResolvedHost(it->toString());
|
||||
Session::reconnect();
|
||||
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Created HTTP(S) session with {}:{} ({}:{})",
|
||||
Session::getHost(),
|
||||
Session::getPort(),
|
||||
it->toString(),
|
||||
Session::getPort());
|
||||
|
||||
break;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
Session::close();
|
||||
if (++it == endpoinds.end())
|
||||
{
|
||||
Session::setResolvedHost("");
|
||||
throw;
|
||||
}
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Failed to create connection with {}:{}, Will try another resolved address. {}",
|
||||
Session::getResolvedHost(),
|
||||
Session::getPort(),
|
||||
getCurrentExceptionMessage(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
bool isHTTPS(const Poco::URI & uri)
|
||||
{
|
||||
if (uri.getScheme() == "https")
|
||||
@ -64,28 +137,21 @@ namespace
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_URI_SCHEME, "Unsupported scheme in URI '{}'", uri.toString());
|
||||
}
|
||||
|
||||
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host = true)
|
||||
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
|
||||
{
|
||||
HTTPSessionPtr session;
|
||||
|
||||
if (https)
|
||||
{
|
||||
#if USE_SSL
|
||||
/// Cannot resolve host in advance, otherwise SNI won't work in Poco.
|
||||
/// For more information about SNI, see the https://en.wikipedia.org/wiki/Server_Name_Indication
|
||||
auto https_session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
|
||||
if (resolve_host)
|
||||
https_session->setResolvedHost(DNSResolver::instance().resolveHost(host).toString());
|
||||
|
||||
session = std::move(https_session);
|
||||
session = std::make_shared<HTTPSessionAdapter<Poco::Net::HTTPSClientSession>>(host, port);
|
||||
#else
|
||||
throw Exception(ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME, "ClickHouse was built without HTTPS support");
|
||||
#endif
|
||||
}
|
||||
else
|
||||
{
|
||||
String resolved_host = resolve_host ? DNSResolver::instance().resolveHost(host).toString() : host;
|
||||
session = std::make_shared<Poco::Net::HTTPClientSession>(resolved_host, port);
|
||||
session = std::make_shared<HTTPSessionAdapter<Poco::Net::HTTPClientSession>>(host, port);
|
||||
}
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);
|
||||
@ -104,7 +170,6 @@ namespace
|
||||
const String proxy_host;
|
||||
const UInt16 proxy_port;
|
||||
const bool proxy_https;
|
||||
const bool resolve_host;
|
||||
|
||||
using Base = PoolBase<Poco::Net::HTTPClientSession>;
|
||||
|
||||
@ -113,7 +178,7 @@ namespace
|
||||
/// Pool is global, we shouldn't attribute this memory to query/user.
|
||||
MemoryTrackerSwitcher switcher{&total_memory_tracker};
|
||||
|
||||
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
|
||||
auto session = makeHTTPSessionImpl(host, port, https, true);
|
||||
if (!proxy_host.empty())
|
||||
{
|
||||
const String proxy_scheme = proxy_https ? "https" : "http";
|
||||
@ -137,7 +202,6 @@ namespace
|
||||
UInt16 proxy_port_,
|
||||
bool proxy_https_,
|
||||
size_t max_pool_size_,
|
||||
bool resolve_host_,
|
||||
bool wait_on_pool_size_limit)
|
||||
: Base(
|
||||
static_cast<unsigned>(max_pool_size_),
|
||||
@ -149,7 +213,6 @@ namespace
|
||||
, proxy_host(proxy_host_)
|
||||
, proxy_port(proxy_port_)
|
||||
, proxy_https(proxy_https_)
|
||||
, resolve_host(resolve_host_)
|
||||
{
|
||||
}
|
||||
};
|
||||
@ -197,24 +260,6 @@ namespace
|
||||
std::mutex mutex;
|
||||
std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;
|
||||
|
||||
void updateHostIfIpChanged(Entry & session, const String & new_ip)
|
||||
{
|
||||
const auto old_ip = session->getResolvedHost().empty() ? session->getHost() : session->getResolvedHost();
|
||||
|
||||
if (new_ip != old_ip)
|
||||
{
|
||||
session->reset();
|
||||
if (session->getResolvedHost().empty())
|
||||
{
|
||||
session->setHost(new_ip);
|
||||
}
|
||||
else
|
||||
{
|
||||
session->setResolvedHost(new_ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
HTTPSessionPool() = default;
|
||||
|
||||
@ -230,7 +275,6 @@ namespace
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t max_connections_per_endpoint,
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
@ -261,7 +305,6 @@ namespace
|
||||
proxy_port,
|
||||
proxy_https,
|
||||
max_connections_per_endpoint,
|
||||
resolve_host,
|
||||
wait_on_pool_size_limit));
|
||||
|
||||
/// Some routines held session objects until the end of its lifetime. Also this routines may create another sessions in this time frame.
|
||||
@ -273,17 +316,6 @@ namespace
|
||||
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
|
||||
auto session = pool_ptr->second->get(retry_timeout);
|
||||
|
||||
const auto & session_data = session->sessionData();
|
||||
if (session_data.empty() || !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
|
||||
{
|
||||
session->reset();
|
||||
|
||||
if (resolve_host)
|
||||
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
|
||||
}
|
||||
|
||||
session->attachSessionData({});
|
||||
|
||||
setTimeouts(*session, timeouts);
|
||||
|
||||
return session;
|
||||
@ -301,13 +333,13 @@ void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_
|
||||
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
|
||||
}
|
||||
|
||||
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host)
|
||||
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
const std::string & host = uri.getHost();
|
||||
UInt16 port = uri.getPort();
|
||||
bool https = isHTTPS(uri);
|
||||
|
||||
auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host);
|
||||
auto session = makeHTTPSessionImpl(host, port, https, false);
|
||||
setTimeouts(*session, timeouts);
|
||||
return session;
|
||||
}
|
||||
@ -317,10 +349,9 @@ PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
|
||||
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, wait_on_pool_size_limit);
|
||||
}
|
||||
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
@ -328,10 +359,9 @@ PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host,
|
||||
bool wait_on_pool_size_limit)
|
||||
{
|
||||
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
|
||||
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, wait_on_pool_size_limit);
|
||||
}
|
||||
|
||||
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }
|
||||
|
@ -70,14 +70,13 @@ void markSessionForReuse(PooledHTTPSessionPtr session);
|
||||
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout);
|
||||
|
||||
/// Create session object to perform requests and set required parameters.
|
||||
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
|
||||
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
|
||||
|
||||
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host = true,
|
||||
bool wait_on_pool_size_limit = true);
|
||||
|
||||
PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
@ -85,7 +84,6 @@ PooledHTTPSessionPtr makePooledHTTPSession(
|
||||
const Poco::URI & proxy_uri,
|
||||
const ConnectionTimeouts & timeouts,
|
||||
size_t per_endpoint_pool_size,
|
||||
bool resolve_host = true,
|
||||
bool wait_on_pool_size_limit = true);
|
||||
|
||||
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
|
||||
|
@ -336,9 +336,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
/// This can lead to request signature difference on S3 side.
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
|
||||
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
session = makeHTTPSession(target_uri, timeouts);
|
||||
bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https";
|
||||
|
||||
session->setProxy(
|
||||
@ -352,9 +352,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
|
||||
{
|
||||
if constexpr (pooled)
|
||||
session = makePooledHTTPSession(
|
||||
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
|
||||
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
|
||||
else
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
session = makeHTTPSession(target_uri, timeouts);
|
||||
}
|
||||
|
||||
/// In case of error this address will be written to logs
|
||||
|
@ -3548,6 +3548,24 @@ class ClickHouseInstance:
|
||||
|
||||
return error
|
||||
|
||||
def append_hosts(self, name, ip):
|
||||
self.exec_in_container(
|
||||
(["bash", "-c", "echo '{}' {} >> /etc/hosts".format(ip, name)]),
|
||||
privileged=True,
|
||||
user="root",
|
||||
)
|
||||
|
||||
def set_hosts(self, hosts):
|
||||
entries = ["127.0.0.1 localhost", "::1 localhost"]
|
||||
for host in hosts:
|
||||
entries.append(f"{host[0]} {host[1]}")
|
||||
|
||||
self.exec_in_container(
|
||||
["bash", "-c", 'echo -e "{}" > /etc/hosts'.format("\\n".join(entries))],
|
||||
privileged=True,
|
||||
user="root",
|
||||
)
|
||||
|
||||
# Connects to the instance via HTTP interface, sends a query and returns both the answer and the error message
|
||||
# as a tuple (output, error).
|
||||
def http_query_and_get_answer_with_error(
|
||||
|
@ -68,5 +68,9 @@
|
||||
"test_server_reload/test.py::test_remove_postgresql_port",
|
||||
"test_server_reload/test.py::test_remove_tcp_port",
|
||||
|
||||
"test_keeper_map/test.py::test_keeper_map_without_zk"
|
||||
"test_keeper_map/test.py::test_keeper_map_without_zk",
|
||||
|
||||
"test_http_failover/test.py::test_url_destination_host_with_multiple_addrs",
|
||||
"test_http_failover/test.py::test_url_invalid_hostname",
|
||||
"test_http_failover/test.py::test_url_ip_change"
|
||||
]
|
||||
|
@ -55,6 +55,13 @@ def cluster_without_dns_cache_update():
|
||||
# node1 is a source, node2 downloads data
|
||||
# node2 has long dns_cache_update_period, so dns cache update wouldn't work
|
||||
def test_ip_change_drop_dns_cache(cluster_without_dns_cache_update):
|
||||
# In this case we should manually set up the static DNS entries on the source host
|
||||
# to exclude resplving addresses automatically added by docker.
|
||||
# We use ipv6 for hosts, but resolved DNS entries may contain an unexpected ipv4 address.
|
||||
node2.set_hosts([("2001:3984:3989::1:1111", "node1")])
|
||||
# drop DNS cache
|
||||
node2.query("SYSTEM DROP DNS CACHE")
|
||||
|
||||
# First we check, that normal replication works
|
||||
node1.query(
|
||||
"INSERT INTO test_table_drop VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)"
|
||||
@ -64,6 +71,7 @@ def test_ip_change_drop_dns_cache(cluster_without_dns_cache_update):
|
||||
|
||||
# We change source node ip
|
||||
cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777")
|
||||
node2.set_hosts([("2001:3984:3989::1:7777", "node1")])
|
||||
|
||||
# Put some data to source node1
|
||||
node1.query(
|
||||
@ -163,17 +171,8 @@ def test_ip_change_update_dns_cache(cluster_with_dns_cache_update):
|
||||
assert_eq_with_retry(node4, "SELECT count(*) from test_table_update", "7")
|
||||
|
||||
|
||||
def set_hosts(node, hosts):
|
||||
new_content = "\\n".join(["127.0.0.1 localhost", "::1 localhost"] + hosts)
|
||||
node.exec_in_container(
|
||||
["bash", "-c", 'echo -e "{}" > /etc/hosts'.format(new_content)],
|
||||
privileged=True,
|
||||
user="root",
|
||||
)
|
||||
|
||||
|
||||
def test_dns_cache_update(cluster_with_dns_cache_update):
|
||||
set_hosts(node4, ["127.255.255.255 lost_host"])
|
||||
node4.set_hosts([("127.255.255.255", "lost_host")])
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node4.query("SELECT * FROM remote('lost_host', 'system', 'one')")
|
||||
@ -184,7 +183,7 @@ def test_dns_cache_update(cluster_with_dns_cache_update):
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node4.query("SELECT * FROM distributed_lost_host")
|
||||
|
||||
set_hosts(node4, ["127.0.0.1 lost_host"])
|
||||
node4.set_hosts([("127.0.0.1", "lost_host")])
|
||||
|
||||
# Wait a bit until dns cache will be updated
|
||||
assert_eq_with_retry(
|
||||
@ -239,11 +238,10 @@ def test_user_access_ip_change(cluster_with_dns_cache_update, node):
|
||||
== "0\n"
|
||||
)
|
||||
|
||||
set_hosts(
|
||||
node,
|
||||
node.set_hosts(
|
||||
[
|
||||
"127.255.255.255 node3",
|
||||
"2001:3984:3989::1:88{}4 unknown_host".format(node_num),
|
||||
("127.255.255.255", "node3"),
|
||||
(f"2001:3984:3989::1:88{node_num}4", "unknown_host"),
|
||||
],
|
||||
)
|
||||
|
||||
@ -260,7 +258,7 @@ def test_user_access_ip_change(cluster_with_dns_cache_update, node):
|
||||
node4.query("SELECT * FROM remote('{}', 'system', 'one')".format(node_name))
|
||||
# now wrong addresses are cached
|
||||
|
||||
set_hosts(node, [])
|
||||
node.set_hosts([])
|
||||
retry_count = 60
|
||||
if node_name == "node5":
|
||||
# client is not allowed to connect, so execute it directly in container to send query from localhost
|
||||
@ -298,7 +296,7 @@ def test_host_is_drop_from_cache_after_consecutive_failures(
|
||||
# Note that the list of hosts in variable since lost_host will be there too (and it's dropped and added back)
|
||||
# dns_update_short -> dns_max_consecutive_failures set to 6
|
||||
assert node4.wait_for_log_line(
|
||||
"Cannot resolve host \\(InvalidHostThatDoesNotExist\\), error 0: Host not found."
|
||||
"Code: 198. DB::Exception: Not found address of host: InvalidHostThatDoesNotExist."
|
||||
)
|
||||
assert node4.wait_for_log_line(
|
||||
"Cached hosts not found:.*InvalidHostThatDoesNotExist**",
|
||||
|
0
tests/integration/test_http_failover/__init__.py
Normal file
0
tests/integration/test_http_failover/__init__.py
Normal file
1
tests/integration/test_http_failover/configs/listen.xml
Normal file
1
tests/integration/test_http_failover/configs/listen.xml
Normal file
@ -0,0 +1 @@
|
||||
<clickhouse><listen_host>::</listen_host></clickhouse>
|
113
tests/integration/test_http_failover/test.py
Normal file
113
tests/integration/test_http_failover/test.py
Normal file
@ -0,0 +1,113 @@
|
||||
import pytest
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.test_tools import exec_query_with_retry
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
|
||||
ACCESSIBLE_IPV4 = "10.5.172.10"
|
||||
OTHER_ACCESSIBLE_IPV4 = "10.5.172.20"
|
||||
NOT_ACCESSIBLE_IPV4 = "10.5.172.11"
|
||||
|
||||
ACCESSIBLE_IPV6 = "2001:3984:3989::1:1000"
|
||||
NOT_ACCESSIBLE_IPV6 = "2001:3984:3989::1:1001"
|
||||
|
||||
DST_NODE_IPV4 = ACCESSIBLE_IPV4
|
||||
DST_NODE_IPV6 = ACCESSIBLE_IPV6
|
||||
SRC_NODE_IPV6 = "2001:3984:3989::1:2000"
|
||||
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
# Destination node
|
||||
dst_node = cluster.add_instance(
|
||||
"dst_node",
|
||||
with_zookeeper=True,
|
||||
ipv4_address=DST_NODE_IPV4,
|
||||
ipv6_address=DST_NODE_IPV6,
|
||||
main_configs=["configs/listen.xml"],
|
||||
)
|
||||
# Source node
|
||||
src_node = cluster.add_instance(
|
||||
"src_node",
|
||||
with_zookeeper=True,
|
||||
ipv6_address=SRC_NODE_IPV6,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dst_node_addrs(started_cluster, request):
|
||||
src_node.set_hosts([(ip, "dst_node") for ip in request.param])
|
||||
src_node.query("SYSTEM DROP DNS CACHE")
|
||||
|
||||
yield
|
||||
|
||||
# Clear static DNS entries
|
||||
src_node.set_hosts([])
|
||||
src_node.query("SYSTEM DROP DNS CACHE")
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"dst_node_addrs, expectation",
|
||||
[
|
||||
((ACCESSIBLE_IPV4, ACCESSIBLE_IPV6), does_not_raise()),
|
||||
((NOT_ACCESSIBLE_IPV4, ACCESSIBLE_IPV6), does_not_raise()),
|
||||
((ACCESSIBLE_IPV4, NOT_ACCESSIBLE_IPV6), does_not_raise()),
|
||||
(
|
||||
(NOT_ACCESSIBLE_IPV4, NOT_ACCESSIBLE_IPV6),
|
||||
pytest.raises(QueryRuntimeException),
|
||||
),
|
||||
],
|
||||
indirect=["dst_node_addrs"],
|
||||
)
|
||||
def test_url_destination_host_with_multiple_addrs(dst_node_addrs, expectation):
|
||||
with expectation:
|
||||
result = src_node.query(
|
||||
"SELECT * FROM url('http://dst_node:8123/?query=SELECT+42', TSV, 'column1 UInt32')"
|
||||
)
|
||||
assert result == "42\n"
|
||||
|
||||
|
||||
def test_url_invalid_hostname(started_cluster):
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
src_node.query(
|
||||
"SELECT count(*) FROM url('http://notvalidhost:8123/?query=SELECT+1', TSV, 'column1 UInt32');"
|
||||
)
|
||||
|
||||
|
||||
def test_url_ip_change(started_cluster):
|
||||
assert (
|
||||
src_node.query(
|
||||
"SELECT * FROM url('http://dst_node:8123/?query=SELECT+42', TSV, 'column1 UInt32')"
|
||||
)
|
||||
== "42\n"
|
||||
)
|
||||
|
||||
started_cluster.restart_instance_with_ip_change(dst_node, OTHER_ACCESSIBLE_IPV4)
|
||||
|
||||
# Ensure that only new IPV4 address is accessible
|
||||
src_node.set_hosts(
|
||||
[(OTHER_ACCESSIBLE_IPV4, "dst_node"), (NOT_ACCESSIBLE_IPV6, "dst_node")]
|
||||
)
|
||||
src_node.query("SYSTEM DROP DNS CACHE")
|
||||
|
||||
assert (
|
||||
src_node.query(
|
||||
"SELECT * FROM url('http://dst_node:8123/?query=SELECT+42', TSV, 'column1 UInt32')"
|
||||
)
|
||||
== "42\n"
|
||||
)
|
Loading…
Reference in New Issue
Block a user