From 07ee60b5468155ef4ab2bce5696aa7d805db4281 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Wed, 5 Jun 2019 12:23:41 +0300
Subject: [PATCH] Fix bug with pooled HTTP sessions and host IP change

---
 dbms/src/IO/HTTPCommon.cpp                  | 20 ++++++++++++++++
 dbms/src/IO/ReadWriteBufferFromHTTP.h       | 21 ++++++++++++----
 dbms/tests/integration/helpers/cluster.py   | 24 +++++++++++++++----
 .../helpers/docker_compose_net.yml          | 11 +++++++++
 .../integration/test_host_ip_change/test.py | 21 +++++++++-------
 5 files changed, 78 insertions(+), 19 deletions(-)
 create mode 100644 dbms/tests/integration/helpers/docker_compose_net.yml

diff --git a/dbms/src/IO/HTTPCommon.cpp b/dbms/src/IO/HTTPCommon.cpp
index f2665049b18..ca5b5ab700b 100644
--- a/dbms/src/IO/HTTPCommon.cpp
+++ b/dbms/src/IO/HTTPCommon.cpp
@@ -158,6 +158,26 @@ void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeout
     auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
     auto session = pool_ptr->second->get(retry_timeout);
 
+    /// We store exception messages in session data.
+    /// Poco HTTPSession also stores the exception, but it can be removed at any time.
+    const auto & sessionData = session->sessionData();
+    if (!sessionData.empty())
+    {
+        auto msg = Poco::AnyCast<std::string>(sessionData);
+        if (!msg.empty())
+        {
+            LOG_TRACE((&Logger::get("HTTPCommon")), "Failed communicating with " << host << " with error '" << msg << "', will try to reconnect the session");
+            /// The host can change its IP
+            const auto ip = DNSResolver::instance().resolveHost(host).toString();
+            if (ip != session->getHost())
+            {
+                session->reset();
+                session->setHost(ip);
+                session->attachSessionData({});
+            }
+        }
+    }
+
     setTimeouts(*session, timeouts);
 
     return session;
diff --git a/dbms/src/IO/ReadWriteBufferFromHTTP.h b/dbms/src/IO/ReadWriteBufferFromHTTP.h
index 092d6829107..62f2b0351f6 100644
--- a/dbms/src/IO/ReadWriteBufferFromHTTP.h
+++ b/dbms/src/IO/ReadWriteBufferFromHTTP.h
@@ -6,6 +6,7 @@ #include
 #include
 #include
 #include
+#include <Poco/Any.h>
 #include
 #include
 #include
@@ -69,14 +70,24 @@ namespace detail
 
             LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
 
-            auto & stream_out = session->sendRequest(request);
+            try
+            {
+                auto & stream_out = session->sendRequest(request);
 
-            if (out_stream_callback)
-                out_stream_callback(stream_out);
+                if (out_stream_callback)
+                    out_stream_callback(stream_out);
 
-            istr = receiveResponse(*session, request, response);
+                istr = receiveResponse(*session, request, response);
 
-            impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+                impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
+            }
+            catch (const Poco::Exception & e)
+            {
+                /// We use the session data storage to keep the exception text.
+                /// Based on it we can decide whether to reconnect the session or re-resolve the session host.
+                session->attachSessionData(e.message());
+                throw;
+            }
 
         }
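Taken together, the two C++ hunks above implement one hand-off through Poco's per-session data slot: the request side tags a failing pooled session with the text of the exception that broke it, and the pool side inspects that tag on the next checkout, re-resolving the host and reconnecting when its IP has changed. The sketch below condenses that hand-off into standalone Poco code for illustration only; it is not part of the patch. resolveHostIP is a hypothetical stand-in for ClickHouse's DNSResolver::instance().resolveHost(), and a plain Poco::Net::DNS lookup is assumed in its place.

    #include <string>
    #include <Poco/Any.h>
    #include <Poco/Exception.h>
    #include <Poco/Net/DNS.h>
    #include <Poco/Net/HTTPClientSession.h>
    #include <Poco/Net/HTTPRequest.h>

    /// Hypothetical helper: resolve a host name to an IP string.
    /// The patch uses ClickHouse's caching DNSResolver here instead.
    static std::string resolveHostIP(const std::string & host)
    {
        return Poco::Net::DNS::resolveOne(host).toString();
    }

    /// Producer side: if a request fails, remember why in the session itself,
    /// so the failure survives the session's return to the pool.
    void sendMarkingFailures(Poco::Net::HTTPClientSession & session, Poco::Net::HTTPRequest & request)
    {
        try
        {
            session.sendRequest(request);
        }
        catch (const Poco::Exception & e)
        {
            session.attachSessionData(e.message());
            throw;
        }
    }

    /// Consumer side: before reusing a pooled session for `host`, check whether
    /// it previously failed; if so, re-resolve the host and reconnect when the
    /// IP has changed (e.g. the peer restarted with a new address).
    void repairIfHostMoved(Poco::Net::HTTPClientSession & session, const std::string & host)
    {
        const auto & data = session.sessionData();
        if (data.empty())
            return;

        if (Poco::AnyCast<std::string>(data).empty())
            return;

        const std::string ip = resolveHostIP(host);
        if (ip != session.getHost())
        {
            session.reset();               /// drop the dead connection
            session.setHost(ip);           /// point the session at the new address
            session.attachSessionData({}); /// clear the failure mark
        }
    }

Storing the message rather than a flag is deliberate: an empty string doubles as "no failure", so clearing the mark is just attaching an empty Poco::Any.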
diff --git a/dbms/tests/integration/helpers/cluster.py b/dbms/tests/integration/helpers/cluster.py
index 4cec732860e..157ba616246 100644
--- a/dbms/tests/integration/helpers/cluster.py
+++ b/dbms/tests/integration/helpers/cluster.py
@@ -102,7 +102,7 @@ class ClickHouseCluster:
         self.with_odbc_drivers = False
         self.with_hdfs = False
         self.with_mongo = False
-        self.with_net = False
+        self.with_net_trics = False
 
         self.docker_client = None
         self.is_up = False
@@ -138,17 +138,18 @@ class ClickHouseCluster:
         self.instances[name] = instance
 
         if ipv4_address is not None or ipv6_address is not None:
-            self.with_net = True
+            self.with_net_trics = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])
 
         self.base_cmd.extend(['--file', instance.docker_compose_path])
+        cmds = []
         if with_zookeeper and not self.with_zookeeper:
             self.with_zookeeper = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')])
-            if self.with_net:
-                self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
-                                           self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml'), '--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')]
+            self.base_zookeeper_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_zookeeper.yml')]
+            cmds.append(self.base_zookeeper_cmd)
 
         if with_mysql and not self.with_mysql:
             self.with_mysql = True
@@ -156,11 +157,14 @@ class ClickHouseCluster:
             self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+            cmds.append(self.base_mysql_cmd)
+
         if with_postgres and not self.with_postgres:
             self.with_postgres = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
             self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                       self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+            cmds.append(self.base_postgres_cmd)
 
         if with_odbc_drivers and not self.with_odbc_drivers:
             self.with_odbc_drivers = True
@@ -169,29 +173,39 @@ class ClickHouseCluster:
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')])
                 self.base_mysql_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                        self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mysql.yml')]
+                cmds.append(self.base_mysql_cmd)
+
             if not self.with_postgres:
                 self.with_postgres = True
                 self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')])
                 self.base_postgres_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                           self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_postgres.yml')]
+                cmds.append(self.base_postgres_cmd)
 
         if with_kafka and not self.with_kafka:
             self.with_kafka = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')])
             self.base_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_kafka.yml')]
+            cmds.append(self.base_kafka_cmd)
 
         if with_hdfs and not self.with_hdfs:
             self.with_hdfs = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')])
             self.base_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                   self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_hdfs.yml')]
+            cmds.append(self.base_hdfs_cmd)
 
         if with_mongo and not self.with_mongo:
             self.with_mongo = True
             self.base_cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')])
             self.base_mongo_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
                                    self.project_name, '--file', p.join(HELPERS_DIR, 'docker_compose_mongo.yml')]
+            cmds.append(self.base_mongo_cmd)
+
+        if self.with_net_trics:
+            for cmd in cmds:
+                cmd.extend(['--file', p.join(HELPERS_DIR, 'docker_compose_net.yml')])
 
         return instance
diff --git a/dbms/tests/integration/helpers/docker_compose_net.yml b/dbms/tests/integration/helpers/docker_compose_net.yml
new file mode 100644
index 00000000000..4e8cabe5403
--- /dev/null
+++ b/dbms/tests/integration/helpers/docker_compose_net.yml
@@ -0,0 +1,11 @@
+version: '2.2'
+networks:
+  default:
+    driver: bridge
+    enable_ipv6: true
+    ipam:
+      config:
+        - subnet: 10.5.0.0/12
+          gateway: 10.5.1.1
+        - subnet: 2001:3984:3989::/64
+          gateway: 2001:3984:3989::1
diff --git a/dbms/tests/integration/test_host_ip_change/test.py b/dbms/tests/integration/test_host_ip_change/test.py
index 7e9bcaeb410..19280720488 100644
--- a/dbms/tests/integration/test_host_ip_change/test.py
+++ b/dbms/tests/integration/test_host_ip_change/test.py
@@ -7,13 +7,9 @@ from helpers.test_tools import assert_eq_with_retry
 
 cluster = ClickHouseCluster(__file__)
 
-#node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True, ipv4_address='10.5.1.233')
-#node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True, ipv4_address='10.5.1.122')
 node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1111')
 node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/listen_host.xml'], with_zookeeper=True, ipv6_address='2001:3984:3989::1:1112')
-#node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
-#node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], with_zookeeper=True)
 
 @pytest.fixture(scope="module")
@@ -22,7 +18,6 @@ def start_cluster():
     cluster.start()
 
     for node in [node1, node2]:
-        print "Creating tables", node.name
         node.query(
         '''
             CREATE DATABASE IF NOT EXISTS test;
@@ -42,22 +37,30 @@ def start_cluster():
         pass
 
-# Test that outdated parts are not removed when they cannot be removed from zookeeper
 def test_merge_doesnt_work_without_zookeeper(start_cluster):
+    # First, check that normal replication works
     node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")
     assert node1.query("SELECT count(*) from test_table") == "3\n"
     assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")
 
-    print "IP before:", cluster.get_instance_ip("node1")
+    # Change the source node's IP
     cluster.restart_instance_with_ip_change(node1, "2001:3984:3989::1:7777")
-    print "IP after:", cluster.get_instance_ip("node1")
-    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "3")
 
+    # Put some data on the source node1
     node1.query("INSERT INTO test_table VALUES ('2018-10-01', 5), ('2018-10-02', 6), ('2018-10-03', 7)")
+    # Check that the data landed on node1
     assert node1.query("SELECT count(*) from test_table") == "6\n"
 
+    # Because of the DNS cache, the destination node2 cannot download the new data from node1
     with pytest.raises(Exception):
         assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
 
+    # Drop the DNS cache
     node2.query("SYSTEM DROP DNS CACHE")
+    # Now the data can be downloaded
     assert_eq_with_retry(node2, "SELECT count(*) from test_table", "6")
+
+    # Check one more time, just to be sure
+    node1.query("INSERT INTO test_table VALUES ('2018-10-01', 8)")
+    assert node1.query("SELECT count(*) from test_table") == "7\n"
+    assert_eq_with_retry(node2, "SELECT count(*) from test_table", "7")
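The test's comments describe the failure mode this patch targets: replication fetches on node2 go through a server-side DNS cache, so after node1 comes back with a new IPv6 address, node2 keeps dialing the stale one until SYSTEM DROP DNS CACHE clears it. The following is only a toy model of that behaviour, written to make the test flow concrete; ClickHouse's real resolver (DNSResolver, used in the HTTPCommon.cpp hunk above) is more involved, and plain Poco::Net::DNS is assumed here for the underlying lookup.

    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <Poco/Net/DNS.h>

    /// Toy model of a caching DNS resolver: the first resolution of a host is
    /// cached, so when the peer's IP changes the cache keeps handing out the
    /// stale address until it is dropped.
    class CachingResolver
    {
    public:
        std::string resolve(const std::string & host)
        {
            std::lock_guard<std::mutex> lock(mutex);
            auto it = cache.find(host);
            if (it != cache.end())
                return it->second;  /// possibly stale after the host moved

            std::string ip = Poco::Net::DNS::resolveOne(host).toString();
            cache.emplace(host, ip);
            return ip;
        }

        /// Analogue of what SYSTEM DROP DNS CACHE triggers on the server.
        void dropCache()
        {
            std::lock_guard<std::mutex> lock(mutex);
            cache.clear();
        }

    private:
        std::mutex mutex;
        std::unordered_map<std::string, std::string> cache;
    };

In the test, node2 plays the caller whose first resolve() pinned node1's old address; dropCache() corresponds to SYSTEM DROP DNS CACHE, after which the next resolve() returns the new address and replication catches up.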