diff --git a/src/Core/Defines.h b/src/Core/Defines.h index ce5c9098399..3620c1dfcfe 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -63,7 +63,9 @@ /// Minimum revision with exactly the same set of aggregation methods and rules to select them. /// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules /// (keys will be placed in different buckets and result will not be fully aggregated). -#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54431 +#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54456 +#define DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 21 +#define DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 4 #define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410 #define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405 diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 71570778645..716c66b8b71 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -83,6 +83,14 @@ public: UInt64 client_version_patch = 0; unsigned client_tcp_protocol_version = 0; + /// In case of distributed query, client info for query is actually a client info of client. + /// In order to get a version of server-initiator, use connection_ values. + /// Also for tcp only. + UInt64 connection_client_version_major = 0; + UInt64 connection_client_version_minor = 0; + UInt64 connection_client_version_patch = 0; + unsigned connection_tcp_protocol_version = 0; + /// For http HTTPMethod http_method = HTTPMethod::UNKNOWN; String http_user_agent; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 563e9ba8997..9ef35a774bf 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -75,6 +75,7 @@ #include #include +#include #include #include #include @@ -2609,6 +2610,19 @@ void InterpreterSelectQuery::initSettings() auto & query = getSelectQuery(); if (query.settings()) InterpreterSetQuery(query.settings(), context).executeForCurrentContext(); + + auto & client_info = context->getClientInfo(); + auto min_major = DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD; + auto min_minor = DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD; + + if (client_info.query_kind == ClientInfo::QueryKind::SECONDARY_QUERY && + std::forward_as_tuple(client_info.connection_client_version_major, client_info.connection_client_version_minor) < std::forward_as_tuple(min_major, min_minor)) + { + /// Disable two-level aggregation due to version incompatibility. + context->setSetting("group_by_two_level_threshold", Field(0)); + context->setSetting("group_by_two_level_threshold_bytes", Field(0)); + + } } } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 5415bf96443..a139ab5c100 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -944,6 +944,11 @@ void TCPHandler::receiveHello() client_info.client_version_patch = client_version_patch; client_info.client_tcp_protocol_version = client_tcp_protocol_version; + client_info.connection_client_version_major = client_version_major; + client_info.connection_client_version_minor = client_version_minor; + client_info.connection_client_version_patch = client_version_patch; + client_info.connection_tcp_protocol_version = client_tcp_protocol_version; + is_interserver_mode = (user == USER_INTERSERVER_MARKER); if (is_interserver_mode) { diff --git a/tests/integration/test_backward_compatibility/test_aggregate_fixed_key.py b/tests/integration/test_backward_compatibility/test_aggregate_fixed_key.py new file mode 100644 index 00000000000..8819be527fd --- /dev/null +++ b/tests/integration/test_backward_compatibility/test_aggregate_fixed_key.py @@ -0,0 +1,61 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.3', with_installed_binary=True) +node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server') +node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server') + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_two_level_merge(start_cluster): + for node in start_cluster.instances.values(): + node.query( + """ + CREATE TABLE IF NOT EXISTS test_two_level_merge(date Date, zone UInt32, number UInt32) + ENGINE = MergeTree() PARTITION BY toUInt64(number / 1000) ORDER BY tuple(); + + INSERT INTO + test_two_level_merge + SELECT + toDate('2021-09-28') - number / 1000, + 249081628, + number + FROM + numbers(15000); + """ + ) + + # covers only the keys64 method + for node in start_cluster.instances.values(): + print(node.query( + """ + SELECT + throwIf(uniqExact(date) != count(), 'group by is borked') + FROM ( + SELECT + date + FROM + remote('node{1,2}', default.test_two_level_merge) + WHERE + date BETWEEN toDate('2021-09-20') AND toDate('2021-09-28') + AND zone = 249081628 + GROUP by date, zone + ) + SETTINGS + group_by_two_level_threshold = 1, + group_by_two_level_threshold_bytes = 1, + max_threads = 2, + prefer_localhost_replica = 0 + """ + ))