From 72e2996e3ee6d5d18da1970794bd19b6ac18ac7a Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 13 Oct 2018 02:43:25 +0300 Subject: [PATCH] Fixed incompatibility when versions prior to 18.12.17 are used on remote servers and newer is used on initiating server, and GROUP BY both fixed and non-fixed keys, and when two-level group by method is activated [#CLICKHOUSE-4035] --- dbms/src/Client/MultiplexedConnections.cpp | 45 +++++++++++----------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/dbms/src/Client/MultiplexedConnections.cpp b/dbms/src/Client/MultiplexedConnections.cpp index feade358b41..5228f9b85b3 100644 --- a/dbms/src/Client/MultiplexedConnections.cpp +++ b/dbms/src/Client/MultiplexedConnections.cpp @@ -87,37 +87,36 @@ void MultiplexedConnections::sendQuery( if (sent_query) throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); - if (replica_states.size() > 1) + Settings modified_settings = settings; + + for (auto & replica : replica_states) { - Settings query_settings = settings; - query_settings.parallel_replicas_count = replica_states.size(); + if (!replica.connection) + throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); - for (size_t i = 0; i < replica_states.size(); ++i) + if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) { - Connection * connection = replica_states[i].connection; - if (connection == nullptr) - throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); + /// Disable two-level aggregation due to version incompatibility. + modified_settings.group_by_two_level_threshold = 0; + modified_settings.group_by_two_level_threshold_bytes = 0; + } + } - if (connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) - { - /// Disable two-level aggregation due to version incompatibility. - query_settings.group_by_two_level_threshold = 0; - query_settings.group_by_two_level_threshold_bytes = 0; - - /// NOTE: It's ok that we disable it for all following connections (even if it wasn't needed). - } - - query_settings.parallel_replica_offset = i; - connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data); + size_t num_replicas = replica_states.size(); + if (num_replicas > 1) + { + /// Use multiple replicas for parallel query processing. + modified_settings.parallel_replicas_count = num_replicas; + for (size_t i = 0; i < num_replicas; ++i) + { + modified_settings.parallel_replica_offset = i; + replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data); } } else { - Connection * connection = replica_states[0].connection; - if (connection == nullptr) - throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); - - connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data); + /// Use single replica. + replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data); } sent_query = true;