diff --git a/dbms/programs/benchmark/Benchmark.cpp b/dbms/programs/benchmark/Benchmark.cpp index 85206f1330a..019080e2391 100644 --- a/dbms/programs/benchmark/Benchmark.cpp +++ b/dbms/programs/benchmark/Benchmark.cpp @@ -54,10 +54,10 @@ public: const String & host_, UInt16 port_, bool secure_, const String & default_database_, const String & user_, const String & password_, const String & stage, bool randomize_, size_t max_iterations_, double max_time_, - const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_) + const String & json_path_, const Settings & settings_) : concurrency(concurrency_), delay(delay_), queue(concurrency), - connections(concurrency, host_, port_, default_database_, user_, password_, timeouts, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable), + connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable), randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_), json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency) { @@ -240,7 +240,8 @@ private: std::uniform_int_distribution distribution(0, queries.size() - 1); for (size_t i = 0; i < concurrency; ++i) - pool.schedule(std::bind(&Benchmark::thread, this, connections.get())); + pool.schedule(std::bind(&Benchmark::thread, this, + connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings)))); InterruptListener interrupt_listener; info_per_interval.watch.restart(); @@ -310,7 +311,9 @@ private: void execute(ConnectionPool::Entry & connection, Query & query) { Stopwatch watch; - RemoteBlockInputStream stream(*connection, query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage); + RemoteBlockInputStream stream( + *connection, + query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage); Progress progress; stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); @@ -485,7 +488,6 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv) options["iterations"].as(), options["timelimit"].as(), options["json"].as(), - ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings), settings); return benchmark.run(); } diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index 2c421cc20b3..69cdf5b1355 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -204,7 +204,6 @@ private: ConnectionParameters connection_parameters; - void initialize(Poco::Util::Application & self) { Poco::Util::Application::initialize(self); @@ -337,7 +336,7 @@ private: DateLUT::instance(); if (!context.getSettingsRef().use_client_time_zone) { - const auto & time_zone = connection->getServerTimezone(); + const auto & time_zone = connection->getServerTimezone(connection_parameters.timeouts); if (!time_zone.empty()) { try @@ -521,7 +520,6 @@ private: connection_parameters.default_database, connection_parameters.user, connection_parameters.password, - connection_parameters.timeouts, "client", connection_parameters.compression, connection_parameters.security); @@ -537,11 +535,14 @@ private: connection->setThrottler(throttler); } - connection->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision); + connection->getServerVersion(connection_parameters.timeouts, + server_name, server_version_major, server_version_minor, server_version_patch, server_revision); server_version = toString(server_version_major) + "." + toString(server_version_minor) + "." + toString(server_version_patch); - if (server_display_name = connection->getServerDisplayName(); server_display_name.length() == 0) + if ( + server_display_name = connection->getServerDisplayName(connection_parameters.timeouts); + server_display_name.length() == 0) { server_display_name = config().getString("host", "localhost"); } @@ -752,7 +753,7 @@ private: } if (!test_hint.checkActual(actual_server_error, actual_client_error, got_exception, last_exception)) - connection->forceConnected(); + connection->forceConnected(connection_parameters.timeouts); if (got_exception && !ignore_error) { @@ -828,7 +829,7 @@ private: if (with_output && with_output->settings_ast) apply_query_settings(*with_output->settings_ast); - connection->forceConnected(); + connection->forceConnected(connection_parameters.timeouts); /// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately. if (insert && !insert->select) @@ -899,7 +900,7 @@ private: /// Process the query that doesn't require transferring data blocks to the server. void processOrdinaryQuery() { - connection->sendQuery(query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true); + connection->sendQuery(connection_parameters.timeouts, query, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true); sendExternalTables(); receiveResult(); } @@ -917,7 +918,7 @@ private: if (!parsed_insert_query.data && (is_interactive || (stdin_is_not_tty && std_in.eof()))) throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); - connection->sendQuery(query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true); + connection->sendQuery(connection_parameters.timeouts, query_without_data, query_id, QueryProcessingStage::Complete, &context.getSettingsRef(), nullptr, true); sendExternalTables(); /// Receive description of table structure. @@ -1064,7 +1065,7 @@ private: bool cancelled = false; // TODO: get the poll_interval from commandline. - const auto receive_timeout = connection->getTimeouts().receive_timeout; + const auto receive_timeout = connection_parameters.timeouts.receive_timeout; constexpr size_t default_poll_interval = 1000000; /// in microseconds constexpr size_t min_poll_interval = 5000; /// in microseconds const size_t poll_interval diff --git a/dbms/programs/client/Suggest.h b/dbms/programs/client/Suggest.h index 6120f875d57..975d0611253 100644 --- a/dbms/programs/client/Suggest.h +++ b/dbms/programs/client/Suggest.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB @@ -42,7 +43,7 @@ private: "KILL", "QUERY", "SYNC", "ASYNC", "TEST", "BETWEEN", "TRUNCATE" }; - /// Words are fetched asynchonously. + /// Words are fetched asynchronously. std::thread loading_thread; std::atomic ready{false}; @@ -71,7 +72,7 @@ private: return word; } - void loadImpl(Connection & connection, size_t suggestion_limit) + void loadImpl(Connection & connection, const ConnectionTimeouts & timeouts, size_t suggestion_limit) { std::stringstream query; query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM (" @@ -104,12 +105,12 @@ private: query << ") WHERE notEmpty(res)"; - fetch(connection, query.str()); + fetch(connection, timeouts, query.str()); } - void fetch(Connection & connection, const std::string & query) + void fetch(Connection & connection, const ConnectionTimeouts & timeouts, const std::string & query) { - connection.sendQuery(query); + connection.sendQuery(timeouts, query); while (true) { @@ -175,12 +176,11 @@ public: connection_parameters.default_database, connection_parameters.user, connection_parameters.password, - connection_parameters.timeouts, "client", connection_parameters.compression, connection_parameters.security); - loadImpl(connection, suggestion_limit); + loadImpl(connection, connection_parameters.timeouts, suggestion_limit); } catch (...) { diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp index 9cd3629192f..0b78e4a54cf 100644 --- a/dbms/programs/copier/ClusterCopier.cpp +++ b/dbms/programs/copier/ClusterCopier.cpp @@ -54,6 +54,7 @@ #include #include #include +#include #include #include #include @@ -798,13 +799,13 @@ public: } - void discoverShardPartitions(const TaskShardPtr & task_shard) + void discoverShardPartitions(const ConnectionTimeouts & timeouts, const TaskShardPtr & task_shard) { TaskTable & task_table = task_shard->task_table; LOG_INFO(log, "Discover partitions of shard " << task_shard->getDescription()); - auto get_partitions = [&] () { return getShardPartitions(*task_shard); }; + auto get_partitions = [&] () { return getShardPartitions(timeouts, *task_shard); }; auto existing_partitions_names = retry(get_partitions, 60); Strings filtered_partitions_names; Strings missing_partitions; @@ -880,14 +881,14 @@ public: } /// Compute set of partitions, assume set of partitions aren't changed during the processing - void discoverTablePartitions(TaskTable & task_table, UInt64 num_threads = 0) + void discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads = 0) { /// Fetch partitions list from a shard { ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores()); for (const TaskShardPtr & task_shard : task_table.all_shards) - thread_pool.schedule([this, task_shard]() { discoverShardPartitions(task_shard); }); + thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); }); LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs"); thread_pool.wait(); @@ -955,7 +956,7 @@ public: task_descprtion_current_version = version_to_update; } - void process() + void process(const ConnectionTimeouts & timeouts) { for (TaskTable & task_table : task_cluster->table_tasks) { @@ -969,7 +970,7 @@ public: if (!task_table.has_enabled_partitions) { /// If there are no specified enabled_partitions, we must discover them manually - discoverTablePartitions(task_table); + discoverTablePartitions(timeouts, task_table); /// After partitions of each shard are initialized, initialize cluster partitions for (const TaskShardPtr & task_shard : task_table.all_shards) @@ -1009,7 +1010,7 @@ public: bool table_is_done = false; for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries) { - if (tryProcessTable(task_table)) + if (tryProcessTable(timeouts, task_table)) { table_is_done = true; break; @@ -1053,8 +1054,10 @@ protected: return getWorkersPath() + "/" + host_id; } - zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper, - const String & description, bool unprioritized) + zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed( + const zkutil::ZooKeeperPtr & zookeeper, + const String & description, + bool unprioritized) { std::chrono::milliseconds current_sleep_time = default_sleep_time; static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec @@ -1329,7 +1332,7 @@ protected: static constexpr UInt64 max_table_tries = 1000; static constexpr UInt64 max_shard_partition_tries = 600; - bool tryProcessTable(TaskTable & task_table) + bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table) { /// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint bool previous_shard_is_instantly_finished = false; @@ -1360,7 +1363,7 @@ protected: /// If not, did we check existence of that partition previously? if (shard->checked_partitions.count(partition_name) == 0) { - auto check_shard_has_partition = [&] () { return checkShardHasPartition(*shard, partition_name); }; + auto check_shard_has_partition = [&] () { return checkShardHasPartition(timeouts, *shard, partition_name); }; bool has_partition = retry(check_shard_has_partition); shard->checked_partitions.emplace(partition_name); @@ -1397,7 +1400,7 @@ protected: bool was_error = false; for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num) { - task_status = tryProcessPartitionTask(partition, is_unprioritized_task); + task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task); /// Exit if success if (task_status == PartitionTaskStatus::Finished) @@ -1483,13 +1486,13 @@ protected: Error, }; - PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition, bool is_unprioritized_task) + PartitionTaskStatus tryProcessPartitionTask(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task) { PartitionTaskStatus res; try { - res = processPartitionTaskImpl(task_partition, is_unprioritized_task); + res = processPartitionTaskImpl(timeouts, task_partition, is_unprioritized_task); } catch (...) { @@ -1510,7 +1513,7 @@ protected: return res; } - PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition, bool is_unprioritized_task) + PartitionTaskStatus processPartitionTaskImpl(const ConnectionTimeouts & timeouts, ShardPartition & task_partition, bool is_unprioritized_task) { TaskShard & task_shard = task_partition.task_shard; TaskTable & task_table = task_shard.task_table; @@ -1611,7 +1614,7 @@ protected: zookeeper->createAncestors(current_task_status_path); /// We need to update table definitions for each partition, it could be changed after ALTER - createShardInternalTables(task_shard); + createShardInternalTables(timeouts, task_shard); /// Check that destination partition is empty if we are first worker /// NOTE: this check is incorrect if pull and push tables have different partition key! @@ -1828,23 +1831,25 @@ protected: return typeid_cast(*block.safeGetByPosition(0).column).getDataAt(0).toString(); } - ASTPtr getCreateTableForPullShard(TaskShard & task_shard) + ASTPtr getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard) { /// Fetch and parse (possibly) new definition - auto connection_entry = task_shard.info.pool->get(&task_cluster->settings_pull); - String create_query_pull_str = getRemoteCreateTable(task_shard.task_table.table_pull, *connection_entry, - &task_cluster->settings_pull); + auto connection_entry = task_shard.info.pool->get(timeouts, &task_cluster->settings_pull); + String create_query_pull_str = getRemoteCreateTable( + task_shard.task_table.table_pull, + *connection_entry, + &task_cluster->settings_pull); ParserCreateQuery parser_create_query; return parseQuery(parser_create_query, create_query_pull_str, 0); } - void createShardInternalTables(TaskShard & task_shard, bool create_split = true) + void createShardInternalTables(const ConnectionTimeouts & timeouts, TaskShard & task_shard, bool create_split = true) { TaskTable & task_table = task_shard.task_table; /// We need to update table definitions for each part, it could be changed after ALTER - task_shard.current_pull_table_create_query = getCreateTableForPullShard(task_shard); + task_shard.current_pull_table_create_query = getCreateTableForPullShard(timeouts, task_shard); /// Create local Distributed tables: /// a table fetching data from current shard and a table inserting data to the whole destination cluster @@ -1872,9 +1877,9 @@ protected: } - std::set getShardPartitions(TaskShard & task_shard) + std::set getShardPartitions(const ConnectionTimeouts & timeouts, TaskShard & task_shard) { - createShardInternalTables(task_shard, false); + createShardInternalTables(timeouts, task_shard, false); TaskTable & task_table = task_shard.task_table; @@ -1914,9 +1919,9 @@ protected: return res; } - bool checkShardHasPartition(TaskShard & task_shard, const String & partition_quoted_name) + bool checkShardHasPartition(const ConnectionTimeouts & timeouts, TaskShard & task_shard, const String & partition_quoted_name) { - createShardInternalTables(task_shard, false); + createShardInternalTables(timeouts, task_shard, false); TaskTable & task_table = task_shard.task_table; @@ -1998,7 +2003,8 @@ protected: Settings current_settings = settings ? *settings : task_cluster->settings_common; current_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1; - auto connections = shard.pool->getMany(¤t_settings, pool_mode); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings).getSaturated(current_settings.max_execution_time); + auto connections = shard.pool->getMany(timeouts, ¤t_settings, pool_mode); for (auto & connection : connections) { @@ -2187,7 +2193,7 @@ void ClusterCopierApp::mainImpl() copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false)); copier->init(); - copier->process(); + copier->process(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(context->getSettingsRef())); } diff --git a/dbms/programs/performance-test/PerformanceTest.cpp b/dbms/programs/performance-test/PerformanceTest.cpp index 56ac11284e1..c2d8d4f252c 100644 --- a/dbms/programs/performance-test/PerformanceTest.cpp +++ b/dbms/programs/performance-test/PerformanceTest.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,7 @@ namespace void waitQuery(Connection & connection) { bool finished = false; + while (true) { if (!connection.poll(1000000)) @@ -50,12 +52,14 @@ namespace fs = boost::filesystem; PerformanceTest::PerformanceTest( const XMLConfigurationPtr & config_, Connection & connection_, + const ConnectionTimeouts & timeouts_, InterruptListener & interrupt_listener_, const PerformanceTestInfo & test_info_, Context & context_, const std::vector & queries_to_run_) : config(config_) , connection(connection_) + , timeouts(timeouts_) , interrupt_listener(interrupt_listener_) , test_info(test_info_) , context(context_) @@ -108,7 +112,7 @@ bool PerformanceTest::checkPreconditions() const size_t exist = 0; - connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); + connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); while (true) { @@ -188,7 +192,7 @@ void PerformanceTest::prepare() const for (const auto & query : test_info.create_and_fill_queries) { LOG_INFO(log, "Executing create or fill query \"" << query << '\"'); - connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); + connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); waitQuery(connection); LOG_INFO(log, "Query finished"); } @@ -200,7 +204,7 @@ void PerformanceTest::finish() const for (const auto & query : test_info.drop_queries) { LOG_INFO(log, "Executing drop query \"" << query << '\"'); - connection.sendQuery(query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); + connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &test_info.settings, nullptr, false); waitQuery(connection); LOG_INFO(log, "Query finished"); } diff --git a/dbms/programs/performance-test/PerformanceTest.h b/dbms/programs/performance-test/PerformanceTest.h index 6368c1f1040..961a348d099 100644 --- a/dbms/programs/performance-test/PerformanceTest.h +++ b/dbms/programs/performance-test/PerformanceTest.h @@ -5,6 +5,7 @@ #include #include +#include #include "PerformanceTestInfo.h" namespace DB @@ -20,6 +21,7 @@ public: PerformanceTest( const XMLConfigurationPtr & config_, Connection & connection_, + const ConnectionTimeouts & timeouts_, InterruptListener & interrupt_listener_, const PerformanceTestInfo & test_info_, Context & context_, @@ -45,6 +47,7 @@ private: private: XMLConfigurationPtr config; Connection & connection; + const ConnectionTimeouts & timeouts; InterruptListener & interrupt_listener; PerformanceTestInfo test_info; diff --git a/dbms/programs/performance-test/PerformanceTestSuite.cpp b/dbms/programs/performance-test/PerformanceTestSuite.cpp index ef0ee715c49..cfa7d202d1d 100644 --- a/dbms/programs/performance-test/PerformanceTestSuite.cpp +++ b/dbms/programs/performance-test/PerformanceTestSuite.cpp @@ -72,10 +72,11 @@ public: Strings && tests_names_regexp_, Strings && skip_names_regexp_, const std::unordered_map> query_indexes_, - const ConnectionTimeouts & timeouts) + const ConnectionTimeouts & timeouts_) : connection(host_, port_, default_database_, user_, - password_, timeouts, "performance-test", Protocol::Compression::Enable, + password_, "performance-test", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable) + , timeouts(timeouts_) , tests_tags(std::move(tests_tags_)) , tests_names(std::move(tests_names_)) , tests_names_regexp(std::move(tests_names_regexp_)) @@ -100,7 +101,7 @@ public: UInt64 version_minor; UInt64 version_patch; UInt64 version_revision; - connection.getServerVersion(name, version_major, version_minor, version_patch, version_revision); + connection.getServerVersion(timeouts, name, version_major, version_minor, version_patch, version_revision); std::stringstream ss; ss << version_major << "." << version_minor << "." << version_patch; @@ -115,6 +116,7 @@ public: private: Connection connection; + const ConnectionTimeouts & timeouts; const Strings & tests_tags; const Strings & tests_names; @@ -195,7 +197,7 @@ private: { PerformanceTestInfo info(test_config, profiles_file, global_context.getSettingsRef()); LOG_INFO(log, "Config for test '" << info.test_name << "' parsed"); - PerformanceTest current(test_config, connection, interrupt_listener, info, global_context, query_indexes[info.path]); + PerformanceTest current(test_config, connection, timeouts, interrupt_listener, info, global_context, query_indexes[info.path]); if (current.checkPreconditions()) { diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index a76803baef2..9651ef54e1b 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -48,7 +48,7 @@ namespace ErrorCodes } -void Connection::connect() +void Connection::connect(const ConnectionTimeouts & timeouts) { try { @@ -230,10 +230,15 @@ UInt16 Connection::getPort() const return port; } -void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision) +void Connection::getServerVersion(const ConnectionTimeouts & timeouts, + String & name, + UInt64 & version_major, + UInt64 & version_minor, + UInt64 & version_patch, + UInt64 & revision) { if (!connected) - connect(); + connect(timeouts); name = server_name; version_major = server_version_major; @@ -242,40 +247,40 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64 revision = server_revision; } -UInt64 Connection::getServerRevision() +UInt64 Connection::getServerRevision(const ConnectionTimeouts & timeouts) { if (!connected) - connect(); + connect(timeouts); return server_revision; } -const String & Connection::getServerTimezone() +const String & Connection::getServerTimezone(const ConnectionTimeouts & timeouts) { if (!connected) - connect(); + connect(timeouts); return server_timezone; } -const String & Connection::getServerDisplayName() +const String & Connection::getServerDisplayName(const ConnectionTimeouts & timeouts) { if (!connected) - connect(); + connect(timeouts); return server_display_name; } -void Connection::forceConnected() +void Connection::forceConnected(const ConnectionTimeouts & timeouts) { if (!connected) { - connect(); + connect(timeouts); } else if (!ping()) { LOG_TRACE(log_wrapper.get(), "Connection was closed, will reconnect."); - connect(); + connect(timeouts); } } @@ -318,10 +323,11 @@ bool Connection::ping() return true; } -TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & request) +TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & timeouts, + const TablesStatusRequest & request) { if (!connected) - connect(); + connect(timeouts); TimeoutSetter timeout_setter(*socket, sync_request_timeout, true); @@ -344,6 +350,7 @@ TablesStatusResponse Connection::getTablesStatus(const TablesStatusRequest & req void Connection::sendQuery( + const ConnectionTimeouts & timeouts, const String & query, const String & query_id_, UInt64 stage, @@ -352,7 +359,9 @@ void Connection::sendQuery( bool with_pending_data) { if (!connected) - connect(); + connect(timeouts); + + TimeoutSetter timeout_setter(*socket, timeouts.send_timeout, timeouts.receive_timeout, true); if (settings) { diff --git a/dbms/src/Client/Connection.h b/dbms/src/Client/Connection.h index 4139bcacae2..2338e4c8965 100644 --- a/dbms/src/Client/Connection.h +++ b/dbms/src/Client/Connection.h @@ -57,7 +57,6 @@ public: Connection(const String & host_, UInt16 port_, const String & default_database_, const String & user_, const String & password_, - const ConnectionTimeouts & timeouts_, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Secure secure_ = Protocol::Secure::Disable, @@ -68,7 +67,6 @@ public: client_name(client_name_), compression(compression_), secure(secure_), - timeouts(timeouts_), sync_request_timeout(sync_request_timeout_), log_wrapper(*this) { @@ -106,11 +104,16 @@ public: /// Change default database. Changes will take effect on next reconnect. void setDefaultDatabase(const String & database); - void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision); - UInt64 getServerRevision(); + void getServerVersion(const ConnectionTimeouts & timeouts, + String & name, + UInt64 & version_major, + UInt64 & version_minor, + UInt64 & version_patch, + UInt64 & revision); + UInt64 getServerRevision(const ConnectionTimeouts & timeouts); - const String & getServerTimezone(); - const String & getServerDisplayName(); + const String & getServerTimezone(const ConnectionTimeouts & timeouts); + const String & getServerDisplayName(const ConnectionTimeouts & timeouts); /// For log and exception messages. const String & getDescription() const; @@ -118,14 +121,9 @@ public: UInt16 getPort() const; const String & getDefaultDatabase() const; - /// For proper polling. - inline const auto & getTimeouts() const - { - return timeouts; - } - /// If last flag is true, you need to call sendExternalTablesData after. void sendQuery( + const ConnectionTimeouts & timeouts, const String & query, const String & query_id_ = "", UInt64 stage = QueryProcessingStage::Complete, @@ -156,9 +154,10 @@ public: Packet receivePacket(); /// If not connected yet, or if connection is broken - then connect. If cannot connect - throw an exception. - void forceConnected(); + void forceConnected(const ConnectionTimeouts & timeouts); - TablesStatusResponse getTablesStatus(const TablesStatusRequest & request); + TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts, + const TablesStatusRequest & request); /** Disconnect. * This may be used, if connection is left in unsynchronised state @@ -216,7 +215,6 @@ private: */ ThrottlerPtr throttler; - ConnectionTimeouts timeouts; Poco::Timespan sync_request_timeout; /// From where to read query execution result. @@ -252,7 +250,7 @@ private: LoggerWrapper log_wrapper; - void connect(); + void connect(const ConnectionTimeouts & timeouts); void sendHello(); void receiveHello(); bool ping(); diff --git a/dbms/src/Client/ConnectionPool.h b/dbms/src/Client/ConnectionPool.h index 83d527a9e28..d18be29b2b3 100644 --- a/dbms/src/Client/ConnectionPool.h +++ b/dbms/src/Client/ConnectionPool.h @@ -30,7 +30,9 @@ public: /// Selects the connection to work. /// If force_connected is false, the client must manually ensure that returned connection is good. - virtual Entry get(const Settings * settings = nullptr, bool force_connected = true) = 0; + virtual Entry get(const ConnectionTimeouts & timeouts, + const Settings * settings = nullptr, + bool force_connected = true) = 0; }; using ConnectionPoolPtr = std::shared_ptr; @@ -50,7 +52,6 @@ public: const String & default_database_, const String & user_, const String & password_, - const ConnectionTimeouts & timeouts, const String & client_name_ = "client", Protocol::Compression compression_ = Protocol::Compression::Enable, Protocol::Secure secure_ = Protocol::Secure::Disable) @@ -63,12 +64,13 @@ public: password(password_), client_name(client_name_), compression(compression_), - secure{secure_}, - timeouts(timeouts) + secure{secure_} { } - Entry get(const Settings * settings = nullptr, bool force_connected = true) override + Entry get(const ConnectionTimeouts & timeouts, + const Settings * settings = nullptr, + bool force_connected = true) override { Entry entry; if (settings) @@ -77,7 +79,7 @@ public: entry = Base::get(-1); if (force_connected) - entry->forceConnected(); + entry->forceConnected(timeouts); return entry; } @@ -93,7 +95,7 @@ protected: { return std::make_shared( host, port, - default_database, user, password, timeouts, + default_database, user, password, client_name, compression, secure); } @@ -108,7 +110,6 @@ private: Protocol::Compression compression; /// Whether to compress data when interacting with the server. Protocol::Secure secure; /// Whether to encrypt data when interacting with the server. - ConnectionTimeouts timeouts; }; } diff --git a/dbms/src/Client/ConnectionPoolWithFailover.cpp b/dbms/src/Client/ConnectionPoolWithFailover.cpp index 9c12ed31560..f746d3f074e 100644 --- a/dbms/src/Client/ConnectionPoolWithFailover.cpp +++ b/dbms/src/Client/ConnectionPoolWithFailover.cpp @@ -8,6 +8,8 @@ #include #include +#include + namespace ProfileEvents { @@ -29,9 +31,8 @@ namespace ErrorCodes ConnectionPoolWithFailover::ConnectionPoolWithFailover( ConnectionPoolPtrs nested_pools_, LoadBalancing load_balancing, - size_t max_tries_, time_t decrease_error_period_) - : Base(std::move(nested_pools_), max_tries_, decrease_error_period_, &Logger::get("ConnectionPoolWithFailover")) + : Base(std::move(nested_pools_), decrease_error_period_, &Logger::get("ConnectionPoolWithFailover")) , default_load_balancing(load_balancing) { const std::string & local_hostname = getFQDNOrHostName(); @@ -44,11 +45,13 @@ ConnectionPoolWithFailover::ConnectionPoolWithFailover( } } -IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings, bool /*force_connected*/) +IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts, + const Settings * settings, + bool /*force_connected*/) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) { - return tryGetEntry(pool, fail_message, settings); + return tryGetEntry(pool, timeouts, fail_message, settings); }; GetPriorityFunc get_priority; @@ -70,11 +73,13 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const Settings * settings return Base::get(try_get_entry, get_priority); } -std::vector ConnectionPoolWithFailover::getMany(const Settings * settings, PoolMode pool_mode) +std::vector ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts, + const Settings * settings, + PoolMode pool_mode) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) { - return tryGetEntry(pool, fail_message, settings); + return tryGetEntry(pool, timeouts, fail_message, settings); }; std::vector results = getManyImpl(settings, pool_mode, try_get_entry); @@ -86,22 +91,27 @@ std::vector ConnectionPoolWithFailover::getMany(const Se return entries; } -std::vector ConnectionPoolWithFailover::getManyForTableFunction(const Settings * settings, PoolMode pool_mode) +std::vector ConnectionPoolWithFailover::getManyForTableFunction( + const ConnectionTimeouts & timeouts, + const Settings * settings, + PoolMode pool_mode) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) { - return tryGetEntry(pool, fail_message, settings); + return tryGetEntry(pool, timeouts, fail_message, settings); }; return getManyImpl(settings, pool_mode, try_get_entry); } std::vector ConnectionPoolWithFailover::getManyChecked( - const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check) + const ConnectionTimeouts & timeouts, + const Settings * settings, PoolMode pool_mode, + const QualifiedTableName & table_to_check) { TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message) { - return tryGetEntry(pool, fail_message, settings, &table_to_check); + return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check); }; return getManyImpl(settings, pool_mode, try_get_entry); @@ -113,6 +123,9 @@ std::vector ConnectionPoolWithFailover::g const TryGetEntryFunc & try_get_entry) { size_t min_entries = (settings && settings->skip_unavailable_shards) ? 0 : 1; + size_t max_tries = (settings ? + size_t{settings->connections_with_failover_max_tries} : + size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES}); size_t max_entries; if (pool_mode == PoolMode::GET_ALL) { @@ -144,12 +157,13 @@ std::vector ConnectionPoolWithFailover::g bool fallback_to_stale_replicas = settings ? bool(settings->fallback_to_stale_replicas_for_distributed_queries) : true; - return Base::getMany(min_entries, max_entries, try_get_entry, get_priority, fallback_to_stale_replicas); + return Base::getMany(min_entries, max_entries, max_tries, try_get_entry, get_priority, fallback_to_stale_replicas); } ConnectionPoolWithFailover::TryResult ConnectionPoolWithFailover::tryGetEntry( IConnectionPool & pool, + const ConnectionTimeouts & timeouts, std::string & fail_message, const Settings * settings, const QualifiedTableName * table_to_check) @@ -157,15 +171,15 @@ ConnectionPoolWithFailover::tryGetEntry( TryResult result; try { - result.entry = pool.get(settings, /* force_connected = */ false); + result.entry = pool.get(timeouts, settings, /* force_connected = */ false); UInt64 server_revision = 0; if (table_to_check) - server_revision = result.entry->getServerRevision(); + server_revision = result.entry->getServerRevision(timeouts); if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS) { - result.entry->forceConnected(); + result.entry->forceConnected(timeouts); result.is_usable = true; result.is_up_to_date = true; return result; @@ -176,7 +190,7 @@ ConnectionPoolWithFailover::tryGetEntry( TablesStatusRequest status_request; status_request.tables.emplace(*table_to_check); - TablesStatusResponse status_response = result.entry->getTablesStatus(status_request); + TablesStatusResponse status_response = result.entry->getTablesStatus(timeouts, status_request); auto table_status_it = status_response.table_states_by_id.find(*table_to_check); if (table_status_it == status_response.table_states_by_id.end()) { diff --git a/dbms/src/Client/ConnectionPoolWithFailover.h b/dbms/src/Client/ConnectionPoolWithFailover.h index 62ca75859ba..968d751fb92 100644 --- a/dbms/src/Client/ConnectionPoolWithFailover.h +++ b/dbms/src/Client/ConnectionPoolWithFailover.h @@ -34,21 +34,24 @@ public: ConnectionPoolWithFailover( ConnectionPoolPtrs nested_pools_, LoadBalancing load_balancing, - size_t max_tries_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD); using Entry = IConnectionPool::Entry; /** Allocates connection to work. */ - Entry get(const Settings * settings = nullptr, bool force_connected = true) override; /// From IConnectionPool + Entry get(const ConnectionTimeouts & timeouts, + const Settings * settings = nullptr, + bool force_connected = true) override; /// From IConnectionPool /** Allocates up to the specified number of connections to work. * Connections provide access to different replicas of one shard. */ - std::vector getMany(const Settings * settings, PoolMode pool_mode); + std::vector getMany(const ConnectionTimeouts & timeouts, + const Settings * settings, PoolMode pool_mode); /// The same as getMany(), but return std::vector. - std::vector getManyForTableFunction(const Settings * settings, PoolMode pool_mode); + std::vector getManyForTableFunction(const ConnectionTimeouts & timeouts, + const Settings * settings, PoolMode pool_mode); using Base = PoolWithFailoverBase; using TryResult = Base::TryResult; @@ -56,7 +59,10 @@ public: /// The same as getMany(), but check that replication delay for table_to_check is acceptable. /// Delay threshold is taken from settings. std::vector getManyChecked( - const Settings * settings, PoolMode pool_mode, const QualifiedTableName & table_to_check); + const ConnectionTimeouts & timeouts, + const Settings * settings, + PoolMode pool_mode, + const QualifiedTableName & table_to_check); private: /// Get the values of relevant settings and call Base::getMany() @@ -70,6 +76,7 @@ private: /// for this table is not too large. TryResult tryGetEntry( IConnectionPool & pool, + const ConnectionTimeouts & timeouts, std::string & fail_message, const Settings * settings, const QualifiedTableName * table_to_check = nullptr); diff --git a/dbms/src/Client/MultiplexedConnections.cpp b/dbms/src/Client/MultiplexedConnections.cpp index 4c6e8fa72df..5c05ee9c5f5 100644 --- a/dbms/src/Client/MultiplexedConnections.cpp +++ b/dbms/src/Client/MultiplexedConnections.cpp @@ -1,4 +1,5 @@ #include +#include namespace DB { @@ -73,6 +74,7 @@ void MultiplexedConnections::sendExternalTablesData(std::vectorgetServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) + if (replica.connection->getServerRevision(timeouts) < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) { /// Disable two-level aggregation due to version incompatibility. modified_settings.group_by_two_level_threshold = 0; @@ -107,13 +109,15 @@ void MultiplexedConnections::sendQuery( for (size_t i = 0; i < num_replicas; ++i) { modified_settings.parallel_replica_offset = i; - replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data); + replica_states[i].connection->sendQuery(timeouts, query, query_id, + stage, &modified_settings, client_info, with_pending_data); } } else { /// Use single replica. - replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data); + replica_states[0].connection->sendQuery(timeouts, query, query_id, stage, + &modified_settings, client_info, with_pending_data); } sent_query = true; diff --git a/dbms/src/Client/MultiplexedConnections.h b/dbms/src/Client/MultiplexedConnections.h index 074a8c8d981..b8567dcd979 100644 --- a/dbms/src/Client/MultiplexedConnections.h +++ b/dbms/src/Client/MultiplexedConnections.h @@ -1,9 +1,10 @@ #pragma once +#include #include #include #include -#include +#include namespace DB { @@ -31,6 +32,7 @@ public: /// Send request to replicas. void sendQuery( + const ConnectionTimeouts & timeouts, const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete, diff --git a/dbms/src/Common/PoolWithFailoverBase.h b/dbms/src/Common/PoolWithFailoverBase.h index eb0fd051f9c..917b9328249 100644 --- a/dbms/src/Common/PoolWithFailoverBase.h +++ b/dbms/src/Common/PoolWithFailoverBase.h @@ -55,11 +55,9 @@ public: PoolWithFailoverBase( NestedPools nested_pools_, - size_t max_tries_, time_t decrease_error_period_, Logger * log_) : nested_pools(std::move(nested_pools_)) - , max_tries(max_tries_) , decrease_error_period(decrease_error_period_) , shared_pool_states(nested_pools.size()) , log(log_) @@ -108,7 +106,7 @@ public: /// The method will throw if it is unable to get min_entries alive connections or /// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas. std::vector getMany( - size_t min_entries, size_t max_entries, + size_t min_entries, size_t max_entries, size_t max_tries, const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority = GetPriorityFunc(), bool fallback_to_stale_replicas = true); @@ -125,8 +123,6 @@ protected: NestedPools nested_pools; - const size_t max_tries; - const time_t decrease_error_period; std::mutex pool_states_mutex; @@ -141,7 +137,7 @@ template typename TNestedPool::Entry PoolWithFailoverBase::get(const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority) { - std::vector results = getMany(1, 1, try_get_entry, get_priority); + std::vector results = getMany(1, 1, 1, try_get_entry, get_priority); if (results.empty() || results[0].entry.isNull()) throw DB::Exception( "PoolWithFailoverBase::getMany() returned less than min_entries entries.", @@ -152,7 +148,7 @@ PoolWithFailoverBase::get(const TryGetEntryFunc & try_get_entry, co template std::vector::TryResult> PoolWithFailoverBase::getMany( - size_t min_entries, size_t max_entries, + size_t min_entries, size_t max_entries, size_t max_tries, const TryGetEntryFunc & try_get_entry, const GetPriorityFunc & get_priority, bool fallback_to_stale_replicas) @@ -192,7 +188,7 @@ PoolWithFailoverBase::getMany( size_t up_to_date_count = 0; size_t failed_pools_count = 0; - /// At exit update shared error counts with error counts occured during this call. + /// At exit update shared error counts with error counts occurred during this call. SCOPE_EXIT( { std::lock_guard lock(pool_states_mutex); diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.cpp b/dbms/src/DataStreams/RemoteBlockInputStream.cpp index e8a29880aad..740e60ffb09 100644 --- a/dbms/src/DataStreams/RemoteBlockInputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockInputStream.cpp @@ -7,6 +7,8 @@ #include #include +#include + namespace DB { @@ -61,17 +63,17 @@ RemoteBlockInputStream::RemoteBlockInputStream( create_multiplexed_connections = [this, pool, throttler]() { const Settings & current_settings = context.getSettingsRef(); - + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); std::vector connections; if (main_table) { - auto try_results = pool->getManyChecked(¤t_settings, pool_mode, *main_table); + auto try_results = pool->getManyChecked(timeouts, ¤t_settings, pool_mode, *main_table); connections.reserve(try_results.size()); for (auto & try_result : try_results) connections.emplace_back(std::move(try_result.entry)); } else - connections = pool->getMany(¤t_settings, pool_mode); + connections = pool->getMany(timeouts, ¤t_settings, pool_mode); return std::make_unique( std::move(connections), current_settings, throttler); @@ -283,12 +285,14 @@ void RemoteBlockInputStream::sendQuery() { multiplexed_connections = create_multiplexed_connections(); - if (context.getSettingsRef().skip_unavailable_shards && 0 == multiplexed_connections->size()) + const auto& settings = context.getSettingsRef(); + if (settings.skip_unavailable_shards && 0 == multiplexed_connections->size()) return; established = true; - multiplexed_connections->sendQuery(query, "", stage, &context.getClientInfo(), true); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); + multiplexed_connections->sendQuery(timeouts, query, "", stage, &context.getClientInfo(), true); established = false; sent_query = true; diff --git a/dbms/src/DataStreams/RemoteBlockInputStream.h b/dbms/src/DataStreams/RemoteBlockInputStream.h index 7f8626398dc..3cef2099030 100644 --- a/dbms/src/DataStreams/RemoteBlockInputStream.h +++ b/dbms/src/DataStreams/RemoteBlockInputStream.h @@ -96,6 +96,7 @@ private: const String query; Context context; + /// Temporary tables needed to be sent to remote servers Tables external_tables; QueryProcessingStage::Enum stage; @@ -118,7 +119,7 @@ private: */ std::atomic finished { false }; - /** Cancel query request was sent to all replicas beacuse data is not needed anymore + /** Cancel query request was sent to all replicas because data is not needed anymore * This behaviour may occur when: * - data size is already satisfactory (when using LIMIT, for example) * - an exception was thrown from client side diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp index ff5fc75f1c4..2f93d88ac7c 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.cpp +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB @@ -18,13 +19,16 @@ namespace ErrorCodes } -RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_) +RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_, + const ConnectionTimeouts & timeouts, + const String & query_, + const Settings * settings_) : connection(connection_), query(query_), settings(settings_) { /** Send query and receive "header", that describe table structure. * Header is needed to know, what structure is required for blocks to be passed to 'write' method. */ - connection.sendQuery(query, "", QueryProcessingStage::Complete, settings, nullptr); + connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, settings, nullptr); while (true) { diff --git a/dbms/src/DataStreams/RemoteBlockOutputStream.h b/dbms/src/DataStreams/RemoteBlockOutputStream.h index 41740c39837..8887277c657 100644 --- a/dbms/src/DataStreams/RemoteBlockOutputStream.h +++ b/dbms/src/DataStreams/RemoteBlockOutputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -18,7 +19,10 @@ struct Settings; class RemoteBlockOutputStream : public IBlockOutputStream { public: - RemoteBlockOutputStream(Connection & connection_, const String & query_, const Settings * settings_ = nullptr); + RemoteBlockOutputStream(Connection & connection_, + const ConnectionTimeouts & timeouts, + const String & query_, + const Settings * settings_ = nullptr); Block getHeader() const override { return header; } diff --git a/dbms/src/DataStreams/UnionBlockInputStream.h b/dbms/src/DataStreams/UnionBlockInputStream.h index 4eea1817cfb..c4e84e85845 100644 --- a/dbms/src/DataStreams/UnionBlockInputStream.h +++ b/dbms/src/DataStreams/UnionBlockInputStream.h @@ -43,10 +43,13 @@ private: public: using ExceptionCallback = std::function; - UnionBlockInputStream(BlockInputStreams inputs, BlockInputStreamPtr additional_input_at_end, size_t max_threads, - ExceptionCallback exception_callback_ = ExceptionCallback()) : - output_queue(std::min(inputs.size(), max_threads)), - handler(*this), + UnionBlockInputStream( + BlockInputStreams inputs, + BlockInputStreamPtr additional_input_at_end, + size_t max_threads, + ExceptionCallback exception_callback_ = ExceptionCallback() + ) : + output_queue(std::min(inputs.size(), max_threads)), handler(*this), processor(inputs, additional_input_at_end, max_threads, handler), exception_callback(exception_callback_) { diff --git a/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp b/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp index cd609bfc70a..3703d11d832 100644 --- a/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp +++ b/dbms/src/Dictionaries/ClickHouseDictionarySource.cpp @@ -30,10 +30,8 @@ static ConnectionPoolWithFailoverPtr createPool( bool secure, const std::string & db, const std::string & user, - const std::string & password, - const Context & context) + const std::string & password) { - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(context.getSettingsRef()); ConnectionPoolPtrs pools; pools.emplace_back(std::make_shared( MAX_CONNECTIONS, @@ -42,7 +40,6 @@ static ConnectionPoolWithFailoverPtr createPool( db, user, password, - timeouts, "ClickHouseDictionarySource", Protocol::Compression::Enable, secure ? Protocol::Secure::Enable : Protocol::Secure::Disable)); @@ -72,7 +69,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource( , sample_block{sample_block} , context(context_) , is_local{isLocalAddress({host, port}, context.getTCPPort())} - , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)} + , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)} , load_all_query{query_builder.composeLoadAllQuery()} { /// We should set user info even for the case when the dictionary is loaded in-process (without TCP communication). @@ -98,7 +95,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar , sample_block{other.sample_block} , context(other.context) , is_local{other.is_local} - , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password, context)} + , pool{is_local ? nullptr : createPool(host, port, secure, db, user, password)} , load_all_query{other.load_all_query} { } @@ -179,6 +176,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con { if (is_local) return executeQuery(query, context, true).in; + return std::make_shared(pool, query, sample_block, context); } diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index e347265d774..d550151a645 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -250,11 +250,10 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting settings.distributed_connections_pool_size, address.host_name, address.port, address.default_database, address.user, address.password, - ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings).getSaturated(settings.max_execution_time), "server", address.compression, address.secure); info.pool = std::make_shared( - ConnectionPoolPtrs{pool}, settings.load_balancing, settings.connections_with_failover_max_tries); + ConnectionPoolPtrs{pool}, settings.load_balancing); info.per_replica_pools = {std::move(pool)}; if (weight) @@ -322,7 +321,6 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting settings.distributed_connections_pool_size, replica.host_name, replica.port, replica.default_database, replica.user, replica.password, - ConnectionTimeouts::getTCPTimeoutsWithFailover(settings).getSaturated(settings.max_execution_time), "server", replica.compression, replica.secure); all_replicas_pools.emplace_back(replica_pool); @@ -331,7 +329,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config, const Setting } ConnectionPoolWithFailoverPtr shard_pool = std::make_shared( - all_replicas_pools, settings.load_balancing, settings.connections_with_failover_max_tries); + all_replicas_pools, settings.load_balancing); if (weight) slot_to_shard.insert(std::end(slot_to_shard), weight, shards_info.size()); @@ -375,7 +373,6 @@ Cluster::Cluster(const Settings & settings, const std::vector( - all_replicas, settings.load_balancing, settings.connections_with_failover_max_tries); + all_replicas, settings.load_balancing); slot_to_shard.insert(std::end(slot_to_shard), default_weight, shards_info.size()); shards_info.push_back({{}, current_shard_num, default_weight, std::move(shard_local_addresses), std::move(shard_pool), diff --git a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp index 0bf593a6078..ce16a431b37 100644 --- a/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp +++ b/dbms/src/Interpreters/ClusterProxy/SelectStreamFactory.cpp @@ -184,13 +184,17 @@ void SelectStreamFactory::createForShard( local_delay]() -> BlockInputStreamPtr { + auto current_settings = context.getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover( + current_settings).getSaturated( + current_settings.max_execution_time); std::vector try_results; try { if (table_func_ptr) - try_results = pool->getManyForTableFunction(&context.getSettingsRef(), PoolMode::GET_MANY); + try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY); else - try_results = pool->getManyChecked(&context.getSettingsRef(), PoolMode::GET_MANY, main_table); + try_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table); } catch (const Exception & ex) { diff --git a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp index 2500b519b23..35eca10bbfa 100644 --- a/dbms/src/Storages/Distributed/DirectoryMonitor.cpp +++ b/dbms/src/Storages/Distributed/DirectoryMonitor.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -142,8 +143,7 @@ void StorageDistributedDirectoryMonitor::run() ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::string & name, const StorageDistributed & storage) { - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef()); - const auto pool_factory = [&storage, &timeouts] (const Cluster::Address & address) -> ConnectionPoolPtr + const auto pool_factory = [&storage] (const Cluster::Address & address) -> ConnectionPoolPtr { const auto & cluster = storage.getCluster(); const auto & shards_info = cluster->getShardsInfo(); @@ -164,7 +164,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri } return std::make_shared( - 1, address.host_name, address.port, address.default_database, address.user, address.password, timeouts, + 1, address.host_name, address.port, address.default_database, address.user, address.password, storage.getName() + '_' + address.user, Protocol::Compression::Enable, address.secure); }; @@ -212,7 +212,8 @@ bool StorageDistributedDirectoryMonitor::findFiles() void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path) { LOG_TRACE(log, "Started processing `" << file_path << '`'); - auto connection = pool->get(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context.getSettingsRef()); + auto connection = pool->get(timeouts); try { @@ -224,7 +225,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa std::string insert_query; readQueryAndSettings(in, insert_settings, insert_query); - RemoteBlockOutputStream remote{*connection, insert_query, &insert_settings}; + RemoteBlockOutputStream remote{*connection, timeouts, insert_query, &insert_settings}; remote.writePrefix(); remote.writePrepared(in); @@ -334,8 +335,8 @@ struct StorageDistributedDirectoryMonitor::Batch WriteBufferFromFile out{parent.current_batch_file_path}; writeText(out); } - - auto connection = parent.pool->get(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(parent.storage.global_context.getSettingsRef()); + auto connection = parent.pool->get(timeouts); bool batch_broken = false; try @@ -361,7 +362,7 @@ struct StorageDistributedDirectoryMonitor::Batch if (first) { first = false; - remote = std::make_unique(*connection, insert_query, &insert_settings); + remote = std::make_unique(*connection, timeouts, insert_query, &insert_settings); remote->writePrefix(); } diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 836ba20a644..da374b1b65d 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -242,6 +242,8 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp { if (!job.stream) { + const Settings & settings = context.getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); if (shard_info.hasInternalReplication()) { /// Skip replica_index in case of internal replication @@ -249,7 +251,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR); /// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here - auto connections = shard_info.pool->getMany(&context.getSettingsRef(), PoolMode::GET_ONE); + auto connections = shard_info.pool->getMany(timeouts, &settings, PoolMode::GET_ONE); if (connections.empty() || connections.front().isNull()) throw Exception("Expected exactly one connection for shard " + toString(job.shard_index), ErrorCodes::LOGICAL_ERROR); @@ -263,7 +265,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp if (!connection_pool) throw Exception("Connection pool for replica " + replica.readableString() + " does not exist", ErrorCodes::LOGICAL_ERROR); - job.connection_entry = connection_pool->get(&context.getSettingsRef()); + job.connection_entry = connection_pool->get(timeouts, &settings); if (job.connection_entry.isNull()) throw Exception("Got empty connection for replica" + replica.readableString(), ErrorCodes::LOGICAL_ERROR); } @@ -271,7 +273,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp if (throttler) job.connection_entry->setThrottler(throttler); - job.stream = std::make_shared(*job.connection_entry, query_string, &context.getSettingsRef()); + job.stream = std::make_shared(*job.connection_entry, timeouts, query_string, &settings); job.stream->writePrefix(); } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 3edf94870f4..f2ba47ba4ce 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3986,8 +3986,6 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query else throw Exception("Can't proxy this query. Unsupported query type", ErrorCodes::NOT_IMPLEMENTED); - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithoutFailover(global_context.getSettingsRef()); - const auto & query_settings = query_context.getSettingsRef(); const auto & query_client_info = query_context.getClientInfo(); String user = query_client_info.current_user; @@ -4003,7 +4001,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query leader_address.host, leader_address.queries_port, leader_address.database, - user, password, timeouts, "Follower replica"); + user, password, "Follower replica"); std::stringstream new_query_ss; formatAST(*new_query, new_query_ss, false, true); diff --git a/dbms/tests/integration/test_distributed_respect_user_timeouts/__init__.py b/dbms/tests/integration/test_distributed_respect_user_timeouts/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/remote_servers.xml b/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/remote_servers.xml new file mode 100644 index 00000000000..ebce4697529 --- /dev/null +++ b/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/remote_servers.xml @@ -0,0 +1,18 @@ + + + + + + node1 + 9000 + + + + + node2 + 9000 + + + + + diff --git a/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/set_distributed_defaults.xml b/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/set_distributed_defaults.xml new file mode 100644 index 00000000000..194eb1ebb87 --- /dev/null +++ b/dbms/tests/integration/test_distributed_respect_user_timeouts/configs/set_distributed_defaults.xml @@ -0,0 +1,35 @@ + + + + 3 + 1000 + 1 + + + 5 + 3000 + 1 + + + + + + + + ::/0 + + default + default + + + + + ::/0 + + delays + default + + + + + diff --git a/dbms/tests/integration/test_distributed_respect_user_timeouts/test.py b/dbms/tests/integration/test_distributed_respect_user_timeouts/test.py new file mode 100644 index 00000000000..bbab53edeba --- /dev/null +++ b/dbms/tests/integration/test_distributed_respect_user_timeouts/test.py @@ -0,0 +1,141 @@ +import itertools +import timeit + +import pytest + +from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager +from helpers.test_tools import TSV + + +cluster = ClickHouseCluster(__file__) + +NODES = {'node' + str(i): cluster.add_instance( + 'node' + str(i), + main_configs=['configs/remote_servers.xml'], + user_configs=['configs/set_distributed_defaults.xml'], +) for i in (1, 2)} + +CREATE_TABLES_SQL = ''' +CREATE DATABASE test; + +CREATE TABLE base_table( + node String +) +ENGINE = MergeTree +PARTITION BY node +ORDER BY node; + +CREATE TABLE distributed_table +ENGINE = Distributed(test_cluster, default, base_table) AS base_table; +''' + +INSERT_SQL_TEMPLATE = "INSERT INTO base_table VALUES ('{node_id}')" + +SELECTS_SQL = { + 'distributed': 'SELECT node FROM distributed_table ORDER BY node', + 'remote': ("SELECT node FROM remote('node1,node2', default.base_table) " + "ORDER BY node"), +} + +EXCEPTION_NETWORK = 'e.displayText() = DB::NetException: ' +EXCEPTION_TIMEOUT = 'Timeout exceeded while reading from socket (' +EXCEPTION_CONNECT = 'Timeout: connect timed out: ' + +TIMEOUT_MEASUREMENT_EPS = 0.01 + +EXPECTED_BEHAVIOR = { + 'default': { + 'times': 3, + 'timeout': 1, + }, + 'ready_to_wait': { + 'times': 5, + 'timeout': 3, + }, +} + + +def _check_exception(exception, expected_tries=3): + lines = exception.split('\n') + + assert len(lines) > 4, "Unexpected exception (expected: timeout info)" + + assert lines[0].startswith('Received exception from server (version') + + assert lines[1].startswith('Code: 279') + assert lines[1].endswith('All connection tries failed. Log: ') + + assert lines[2] == '', "Unexpected exception text (expected: empty line)" + + for i, line in enumerate(lines[3:3 + expected_tries]): + expected_lines = ( + 'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_TIMEOUT, + 'Code: 209, ' + EXCEPTION_NETWORK + EXCEPTION_CONNECT, + ) + + assert any(line.startswith(expected) for expected in expected_lines), \ + 'Unexpected exception at one of the connection attempts' + + assert lines[3 + expected_tries] == '', 'Wrong number of connect attempts' + + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + for node_id, node in NODES.items(): + node.query(CREATE_TABLES_SQL) + node.query(INSERT_SQL_TEMPLATE.format(node_id=node_id)) + + yield cluster + + finally: + cluster.shutdown() + + +def _check_timeout_and_exception(node, user, query_base): + repeats = EXPECTED_BEHAVIOR[user]['times'] + expected_timeout = EXPECTED_BEHAVIOR[user]['timeout'] * repeats + + start = timeit.default_timer() + exception = node.query_and_get_error(SELECTS_SQL[query_base], user=user) + + # And it should timeout no faster than: + measured_timeout = timeit.default_timer() - start + + assert measured_timeout >= expected_timeout - TIMEOUT_MEASUREMENT_EPS + + # And exception should reflect connection attempts: + _check_exception(exception, repeats) + + +@pytest.mark.parametrize( + ('first_user', 'node_name', 'query_base'), + tuple(itertools.product(EXPECTED_BEHAVIOR, NODES, SELECTS_SQL)), +) +def test_reconnect(started_cluster, node_name, first_user, query_base): + node = NODES[node_name] + + # Everything is up, select should work: + assert TSV(node.query(SELECTS_SQL[query_base], + user=first_user)) == TSV('node1\nnode2') + + with PartitionManager() as pm: + # Break the connection. + pm.partition_instances(*NODES.values()) + + # Now it shouldn't: + _check_timeout_and_exception(node, first_user, query_base) + + # Other user should have different timeout and exception + _check_timeout_and_exception( + node, + 'default' if first_user != 'default' else 'ready_to_wait', + query_base, + ) + + # select should work again: + assert TSV(node.query(SELECTS_SQL[query_base], + user=first_user)) == TSV('node1\nnode2')