mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-09-20 08:40:50 +00:00)

Merge pull request #5534 from yandex/debug_dns_cache

Fix bug in pooled sessions and host ip change

Commit a0e7d183f7
@ -158,6 +158,26 @@ void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeout

     auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
     auto session = pool_ptr->second->get(retry_timeout);

+    /// We store exception messages in session data.
+    /// Poco HTTPSession also stores exception, but it can be removed at any time.
+    const auto & sessionData = session->sessionData();
+    if (!sessionData.empty())
+    {
+        auto msg = Poco::AnyCast<std::string>(sessionData);
+        if (!msg.empty())
+        {
+            LOG_TRACE((&Logger::get("HTTPCommon")), "Failed communicating with " << host << " with error '" << msg << "' will try to reconnect session");
+            /// Host can change IP
+            const auto ip = DNSResolver::instance().resolveHost(host).toString();
+            if (ip != session->getHost())
+            {
+                session->reset();
+                session->setHost(ip);
+                session->attachSessionData({});
+            }
+        }
+    }
+
     setTimeouts(*session, timeouts);

     return session;
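In prose: when a session is checked out of the pool and carries a recorded failure, the host is re-resolved, and if its IP has changed the session is reset and rebound to the new address. A minimal Python sketch of the same idea follows; PooledSession, its fields, and checkout() are illustrative stand-ins, not ClickHouse or Poco API:

    import socket

    class PooledSession(object):
        """Toy stand-in for a pooled HTTP session (illustration only)."""
        def __init__(self, host):
            self.host = host
            self.peer_ip = socket.gethostbyname(host)  # IP the cached connection points at
            self.last_error = None                     # counterpart of Poco session data in the patch

        def reset(self):
            # A real client would close the stale TCP connection here.
            pass

    def checkout(session):
        # Mirror of the patched logic: re-resolve only after a recorded failure.
        if session.last_error:
            ip = socket.gethostbyname(session.host)
            if ip != session.peer_ip:
                session.reset()
                session.peer_ip = ip
                session.last_error = None
        return session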
@ -6,6 +6,7 @@

 #include <IO/HTTPCommon.h>
 #include <IO/ReadBuffer.h>
 #include <IO/ReadBufferFromIStream.h>
+#include <Poco/Any.h>
 #include <Poco/Net/HTTPBasicCredentials.h>
 #include <Poco/Net/HTTPClientSession.h>
 #include <Poco/Net/HTTPRequest.h>

@ -69,14 +70,24 @@ namespace detail

     LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());

-    auto & stream_out = session->sendRequest(request);
+    try
+    {
+        auto & stream_out = session->sendRequest(request);

-    if (out_stream_callback)
-        out_stream_callback(stream_out);
+        if (out_stream_callback)
+            out_stream_callback(stream_out);

-    istr = receiveResponse(*session, request, response);
+        istr = receiveResponse(*session, request, response);

-    impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+        impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+    }
+    catch (const Poco::Exception & e)
+    {
+        /// We use session data storage as storage for exception text.
+        /// Depending on it, we can decide whether to reconnect the session or re-resolve the session host.
+        session->attachSessionData(e.message());
+        throw;
+    }
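This is the other half of the mechanism: any Poco exception raised while sending the request or receiving the response is stamped onto the session before being rethrown, so the pool checkout above can see why the cached connection failed. In the same illustrative Python terms as the previous sketch (perform_request and last_error are assumed names, not the real API):

    def perform_request(session, send_fn):
        # Counterpart of the try/catch added above: on failure, record the
        # reason on the session object itself, then let the error propagate.
        try:
            return send_fn(session)
        except Exception as e:
            session.last_error = str(e)  # inspected at the next pool checkout
            raise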
@ -102,6 +102,7 @@ class ClickHouseCluster:

         self.with_odbc_drivers = False
         self.with_hdfs = False
         self.with_mongo = False
+        self.with_net_trics = False

         self.docker_client = None
         self.is_up = False
@ -136,12 +137,19 @@ class ClickHouseCluster:

             env_variables=env_variables, image=image, stay_alive=stay_alive, ipv4_address=ipv4_address, ipv6_address=ipv6_address)

         self.instances[name] = instance
+        if ipv4_address is not None or ipv6_address is not None:
+            self.with_net_trics = True
+            self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])

         self.base_cmd.extend(['--file', instance.docker_compose_path])

+        cmds = []
         if with_zookeeper and not self.with_zookeeper:
             self.with_zookeeper = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
             self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                        self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
+            cmds.append(self.base_zookeeper_cmd)

         if with_mysql and not self.with_mysql:
             self.with_mysql = True
@ -149,11 +157,14 @@ class ClickHouseCluster:

             self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+            cmds.append(self.base_mysql_cmd)

         if with_postgres and not self.with_postgres:
             self.with_postgres = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
             self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+            cmds.append(self.base_postgres_cmd)

         if with_odbc_drivers and not self.with_odbc_drivers:
             self.with_odbc_drivers = True
@ -162,29 +173,39 @@ class ClickHouseCluster:

                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                 self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                        self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+                cmds.append(self.base_mysql_cmd)

             if not self.with_postgres:
                 self.with_postgres = True
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
                 self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                           self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+                cmds.append(self.base_postgres_cmd)

         if with_kafka and not self.with_kafka:
             self.with_kafka = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
             self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]
+            cmds.append(self.base_kafka_cmd)

         if with_hdfs and not self.with_hdfs:
             self.with_hdfs = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')])
             self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
+            cmds.append(self.base_hdfs_cmd)

         if with_mongo and not self.with_mongo:
             self.with_mongo = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
             self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
+            cmds.append(self.base_mongo_cmd)

+        if self.with_net_trics:
+            for cmd in cmds:
+                cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])

         return instance
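The effect of the bookkeeping above: as soon as one instance is given a static address, docker_compose_net.yml is appended to the base command and to each service-specific command collected in cmds. A hypothetical call illustrating this, runnable only inside the integration-test environment (the node name and address are for illustration):

    from helpers.cluster import ClickHouseCluster

    cluster = ClickHouseCluster(__file__)
    # Requesting a fixed address flips with_net_trics, so the shared network
    # file is added to the compose invocations built by add_instance().
    node = cluster.add_instance('node', ipv4_address='10.5.0.10')
    assert cluster.with_net_trics
    assert any('docker_compose_net.yml' in arg for arg in cluster.base_cmd)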
@ -193,6 +214,32 @@ class ClickHouseCluster:

         # According to how docker-compose names containers.
         return self.project_name + '_' + instance_name + '_1'

+    def _replace(self, path, what, to):
+        with open(path, 'r') as p:
+            data = p.read()
+        data = data.replace(what, to)
+        with open(path, 'w') as p:
+            p.write(data)
+
+    def restart_instance_with_ip_change(self, node, new_ip):
+        if '::' in new_ip:
+            if node.ipv6_address is None:
+                raise Exception("You should specify ipv6_address in add_node method")
+            self._replace(node.docker_compose_path, node.ipv6_address, new_ip)
+            node.ipv6_address = new_ip
+        else:
+            if node.ipv4_address is None:
+                raise Exception("You should specify ipv4_address in add_node method")
+            self._replace(node.docker_compose_path, node.ipv4_address, new_ip)
+            node.ipv4_address = new_ip
+        subprocess.check_call(self.base_cmd + ["stop", node.name])
+        subprocess.check_call(self.base_cmd + ["rm", "--force", "--stop", node.name])
+        subprocess.check_call(self.base_cmd + ["up", "--force-recreate", "--no-deps", "-d", node.name])
+        node.ip_address = self.get_instance_ip(node.name)
+        node.client = Client(node.ip_address, command=self.client_bin_path)
+        start_deadline = time.time() + 20.0  # seconds
+        node.wait_for_start(start_deadline)
+        return node

     def get_instance_ip(self, instance_name):
         docker_id = self.get_instance_docker_id(instance_name)
@ -397,20 +444,9 @@ services:

             {app_net}
             {ipv4_address}
             {ipv6_address}

-networks:
-    app_net:
-        driver: bridge
-        enable_ipv6: true
-        ipam:
-            driver: default
-            config:
-            - subnet: 10.5.0.0/12
-              gateway: 10.5.1.1
-            - subnet: 2001:3984:3989::/64
-              gateway: 2001:3984:3989::1
'''


class ClickHouseInstance:

    def __init__(
@ -527,7 +563,7 @@ class ClickHouseInstance:

     def stop(self):
-        self.get_docker_handle().stop(self.default_timeout)
+        self.get_docker_handle().stop()

     def start(self):
@ -708,7 +744,7 @@ class ClickHouseInstance:

             app_net = ""
         else:
             networks = "networks:"
-            app_net = "app_net:"
+            app_net = "default:"
         if self.ipv4_address is not None:
             ipv4_address = "ipv4_address: " + self.ipv4_address
         if self.ipv6_address is not None:
dbms/tests/integration/helpers/docker_compose_net.yml (new file, 11 lines)

@ -0,0 +1,11 @@
version: '2.2'
networks:
    default:
        driver: bridge
        enable_ipv6: true
        ipam:
            config:
            - subnet: 10.5.0.0/12
              gateway: 10.5.1.1
            - subnet: 2001:3984:3989::/64
              gateway: 2001:3984:3989::1
@ -0,0 +1,3 @@
<yandex>
    <listen_host>::</listen_host>
</yandex>
@ -0,0 +1,14 @@
<yandex>
    <remote_servers>
        <test_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <default_database>shard_0</default_database>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</yandex>
dbms/tests/integration/test_host_ip_change/test.py (new file, 66 lines)

@ -0,0 +1,66 @@
import time
import pytest

import subprocess
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry


cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111')
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112')


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()

        for node in [node1, node2]:
            node.query(
                '''
                CREATE DATABASE IF NOT EXISTS test;
                CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32)
                ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', '{}')
                ORDER BY id PARTITION BY toYYYYMM(date);
                '''.format(node.name)
            )

        yield cluster

    except Exception as ex:
        print ex

    finally:
        cluster.shutdown()


def test_merge_doesnt_work_without_zookeeper(start_cluster):
    # First we check that normal replication works
    node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
    assert node1.query("SELECT count(*) from test_table") == "3\n"
    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")

    # Change the source node's IP
    cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777")

    # Put some data on source node1
    node1.query("INSERT INTO test_table VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
    # Check that the data landed on node1
    assert node1.query("SELECT count(*) from test_table") == "6\n"

    # Because of the DNS cache, destination node2 cannot download the data from node1
    with pytest.raises(Exception):
        assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")

    # Drop the DNS cache
    node2.query("SYSTEM DROP DNS CACHE")
    # Now the data is downloaded
    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")

    # Just to be sure, check one more time
    node1.query("INSERT INTO test_table VALUES ('2018-10-01', 8)")
    assert node1.query("SELECT count(*) from test_table") == "7\n"
    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "7")
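The test leans on assert_eq_with_retry to absorb replication lag. Its internals are not part of this diff; a rough sketch of the assumed behavior (names and defaults are illustrative, not the real helpers.test_tools signature):

    import time

    def assert_eq_with_retry(instance, query, expected, retry_count=20, sleep_time=0.5):
        # Poll the query until it returns the expected value or retries run out.
        for _ in range(retry_count):
            if instance.query(query).strip() == expected.strip():
                return
            time.sleep(sleep_time)
        raise AssertionError("'{}' did not return '{}' in time".format(query, expected))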