Fixed incompatibility when versions prior to 18.12.17 are used on remote servers and newer is used on initiating server, and GROUP BY both fixed and non-fixed keys, and when two-level group by method is activated [#CLICKHOUSE-4035]

This commit is contained in:
Alexey Milovidov 2018-10-13 02:43:25 +03:00
parent 98859f232b
commit 72e2996e3e

View File

@ -87,37 +87,36 @@ void MultiplexedConnections::sendQuery(
if (sent_query) if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR); throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (replica_states.size() > 1) Settings modified_settings = settings;
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
for (size_t i = 0; i < replica_states.size(); ++i) for (auto & replica : replica_states)
{ {
Connection * connection = replica_states[i].connection; if (!replica.connection)
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR); throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
if (connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD) if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{ {
/// Disable two-level aggregation due to version incompatibility. /// Disable two-level aggregation due to version incompatibility.
query_settings.group_by_two_level_threshold = 0; modified_settings.group_by_two_level_threshold = 0;
query_settings.group_by_two_level_threshold_bytes = 0; modified_settings.group_by_two_level_threshold_bytes = 0;
}
/// NOTE: It's ok that we disable it for all following connections (even if it wasn't needed).
} }
query_settings.parallel_replica_offset = i; size_t num_replicas = replica_states.size();
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data); if (num_replicas > 1)
{
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
} }
} }
else else
{ {
Connection * connection = replica_states[0].connection; /// Use single replica.
if (connection == nullptr) replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
} }
sent_query = true; sent_query = true;