Fix bug with pooled HTTP sessions and host IP change

commit 07ee60b546
parent aa39e5a745
Author: alesapin
Date: 2019-06-05 12:23:41 +03:00

5 changed files with 78 additions and 19 deletions

HTTPCommon.cpp

@@ -158,6 +158,26 @@ void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
         auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
         auto session = pool_ptr->second->get(retry_timeout);
 
+        /// We store exception messages in session data.
+        /// Poco HTTPSession also stores the exception, but it can be removed at any time.
+        const auto & sessionData = session->sessionData();
+        if (!sessionData.empty())
+        {
+            auto msg = Poco::AnyCast<std::string>(sessionData);
+            if (!msg.empty())
+            {
+                LOG_TRACE((&Logger::get("HTTPCommon")), "Failed communicating with " << host << " with error '" << msg << "' will try to reconnect session");
+                /// Host can change IP
+                const auto ip = DNSResolver::instance().resolveHost(host).toString();
+                if (ip != session->getHost())
+                {
+                    session->reset();
+                    session->setHost(ip);
+                    session->attachSessionData({});
+                }
+            }
+        }
+
         setTimeouts(*session, timeouts);
 
         return session;
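
The hunk above is the consumer side of the fix: when a session checked out of the pool carries an error message from a previous request, the pool re-resolves the host and rebinds the session only if the IP actually changed. A minimal Python sketch of the same idea (class and function names here are illustrative, not ClickHouse or Poco API):

import socket

class PooledSession(object):
    """Toy stand-in for a pooled HTTP session: it remembers which IP it
    is bound to and the message of the last error seen on it."""
    def __init__(self, host):
        self.host = host                      # logical host name
        self.ip = socket.gethostbyname(host)  # address the session is bound to
        self.last_error = ''                  # analogue of Poco session data

def checkout(pool, host):
    """Return a pooled session, re-resolving the host first if the last
    request on it failed (the peer may have changed its IP)."""
    session = pool[host]
    if session.last_error:
        ip = socket.gethostbyname(host)
        if ip != session.ip:
            session.ip = ip          # counterpart of reset() + setHost(ip)
            session.last_error = ''  # counterpart of attachSessionData({})
    return session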

ReadWriteBufferFromHTTP.h

@@ -6,6 +6,7 @@
 #include <IO/HTTPCommon.h>
 #include <IO/ReadBuffer.h>
 #include <IO/ReadBufferFromIStream.h>
+#include <Poco/Any.h>
 #include <Poco/Net/HTTPBasicCredentials.h>
 #include <Poco/Net/HTTPClientSession.h>
 #include <Poco/Net/HTTPRequest.h>
@@ -69,14 +70,24 @@ namespace detail
         LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
 
-        auto & stream_out = session->sendRequest(request);
-
-        if (out_stream_callback)
-            out_stream_callback(stream_out);
-
-        istr = receiveResponse(*session, request, response);
-
-        impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+        try
+        {
+            auto & stream_out = session->sendRequest(request);
+
+            if (out_stream_callback)
+                out_stream_callback(stream_out);
+
+            istr = receiveResponse(*session, request, response);
+
+            impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+        }
+        catch (const Poco::Exception & e)
+        {
+            /// We use the session data storage to keep the exception text.
+            /// Based on it, the session pool can decide whether to reconnect or re-resolve the session host.
+            session->attachSessionData(e.message());
+            throw;
+        }
     }
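
This is the producer side of the same mechanism: any Poco exception raised while talking to the server is recorded on the session before being rethrown, so the pool logic in HTTPCommon.cpp can see it on the next checkout. The same pattern in the Python sketch from above (send_request and do_io are illustrative names):

def send_request(session, do_io):
    """Run one request on a pooled session; on failure, record the error
    text on the session before rethrowing, mirroring attachSessionData()."""
    try:
        do_io(session)
    except (IOError, OSError) as e:
        session.last_error = str(e)  # visible to checkout() on the next reuse
        raise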

helpers/cluster.py

@@ -102,7 +102,7 @@ class ClickHouseCluster:
         self.with_odbc_drivers = False
         self.with_hdfs = False
         self.with_mongo = False
-        self.with_net = False
+        self.with_net_trics = False
 
         self.docker_client = None
         self.is_up = False
@@ -138,17 +138,18 @@ class ClickHouseCluster:
         self.instances[name] = instance
         if ipv4_address is not None or ipv6_address is not None:
-            self.with_net = True
+            self.with_net_trics = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])
 
         self.base_cmd.extend(['--file', instance.docker_compose_path])
 
+        cmds = []
         if with_zookeeper and not self.with_zookeeper:
             self.with_zookeeper = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
-            if self.with_net:
-                self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
-                                           self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml'), '--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')]
+            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
+            cmds.append(self.base_zookeeper_cmd)
 
         if with_mysql and not self.with_mysql:
             self.with_mysql = True
@@ -156,11 +157,14 @@ class ClickHouseCluster:
             self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+            cmds.append(self.base_mysql_cmd)
 
         if with_postgres and not self.with_postgres:
             self.with_postgres = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
             self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+            cmds.append(self.base_postgres_cmd)
 
         if with_odbc_drivers and not self.with_odbc_drivers:
             self.with_odbc_drivers = True
@@ -169,29 +173,39 @@ class ClickHouseCluster:
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                 self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                        self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+                cmds.append(self.base_mysql_cmd)
             if not self.with_postgres:
                 self.with_postgres = True
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
                 self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                           self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+                cmds.append(self.base_postgres_cmd)
 
         if with_kafka and not self.with_kafka:
             self.with_kafka = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
             self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]
+            cmds.append(self.base_kafka_cmd)
 
         if with_hdfs and not self.with_hdfs:
             self.with_hdfs = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')])
             self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
+            cmds.append(self.base_hdfs_cmd)
 
         if with_mongo and not self.with_mongo:
             self.with_mongo = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
             self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
+            cmds.append(self.base_mongo_cmd)
 
+        if self.with_net_trics:
+            for cmd in cmds:
+                cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])
+
         return instance
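
The net effect of the cluster.py changes: every per-service docker-compose command is collected in cmds, and as soon as any instance asks for a static address (with_net_trics), the shared network overlay file is appended to each of them. A sketch of what one assembled command looks like after that loop (project name and directory are illustrative, not from the diff):

import os.path as p

HELPERS_DIR = 'helpers'  # assumed; the diff joins the compose files the same way

base_zookeeper_cmd = [
    'docker-compose',
    '--project-directory', '/path/to/tests',  # illustrative
    '--project-name', 'myproject',            # illustrative
    '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml'),
]
cmds = [base_zookeeper_cmd]

for cmd in cmds:  # what the with_net_trics branch does for every service
    cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])

print(' '.join(base_zookeeper_cmd))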

helpers/docker_compose_net.yml (new file)

@@ -0,0 +1,11 @@
+version: '2.2'
+networks:
+  default:
+    driver: bridge
+    enable_ipv6: true
+    ipam:
+      config:
+      - subnet: 10.5.0.0/12
+        gateway: 10.5.1.1
+      - subnet: 2001:3984:3989::/64
+        gateway: 2001:3984:3989::1
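
This overlay redefines the default compose network as a dual-stack bridge whose IPAM config pins both subnets, which is what allows a test to assign a fixed address to an instance. A sketch of how a test uses it (the add_instance keywords match this diff; the address itself is illustrative):

from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
# A static address must fall inside a subnet declared above
# (10.5.0.0/12 for IPv4, 2001:3984:3989::/64 for IPv6).
node = cluster.add_instance('node',
                            with_zookeeper=True,
                            ipv6_address='2001:3984:3989::1:2222')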

test.py (integration test)

@@ -7,13 +7,9 @@ from helpers.test_tools import assert_eq_with_retry
 cluster = ClickHouseCluster(__file__)
-#node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True, ipv4_address='10.5.1.233')
-#node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True, ipv4_address='10.5.1.122')
 node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111')
 node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112')
-#node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
-#node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
 
 @pytest.fixture(scope="module")
@@ -22,7 +18,6 @@ def start_cluster():
         cluster.start()
 
         for node in [node1, node2]:
-            print "Creating tables", node.name
             node.query(
             '''
                 CREATE DATABASE IF NOT EXISTS test;
@@ -42,22 +37,30 @@ def start_cluster():
         pass
 
 
 # Test that replication keeps working after the source node changes its IP address
 def test_merge_doesnt_work_without_zookeeper(start_cluster):
     # First we check that normal replication works
     node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
     assert node1.query("SELECT count(*) from test_table") == "3\n"
     assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")
 
+    print "IP before:", cluster.get_instance_ip("node1")
+
+    # We change the IP of the source node
+    cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777")
+
+    print "IP after:", cluster.get_instance_ip("node1")
+
+    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")
+
     # Put some data on the source node1
     node1.query("INSERT INTO test_table VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
     # Check that the data is placed on node1
     assert node1.query("SELECT count(*) from test_table") == "6\n"
 
     # Because of the stale DNS cache the destination node2 cannot download data from node1
     with pytest.raises(Exception):
         assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
 
     # Drop the DNS cache
     node2.query("SYSTEM DROP DNS CACHE")
     # Now the data is downloaded
     assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
 
     # Just to be sure, check one more time
     node1.query("INSERT INTO test_table VALUES ('2018-10-01', 8)")
     assert node1.query("SELECT count(*) from test_table") == "7\n"
     assert_eq_with_retry(node2, "SELECT count(*) from test_table", "7")
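
The helper restart_instance_with_ip_change used above is not part of this diff. Purely as a sketch of what such a helper has to do (every name below is hypothetical except the attribute names, which mirror the add_instance arguments):

def restart_instance_with_ip_change(cluster, instance, new_ip):
    # Pick the address family from the literal's syntax.
    if ':' in new_ip:
        instance.ipv6_address = new_ip
    else:
        instance.ipv4_address = new_ip
    # Recreating the container is what makes docker-compose hand out the
    # new static address from the docker_compose_net.yml subnets.
    cluster.recreate_instance(instance)  # hypothetical recreate step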