Fix backward compatibility after #21196.

This commit is contained in:
Nikolai Kochetov 2021-10-04 17:27:44 +03:00
parent 3e0d6a9709
commit 6169b180cb
5 changed files with 91 additions and 1 deletions

View File

@ -63,7 +63,9 @@
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54431
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54456
#define DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 21
#define DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 4
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405

View File

@ -83,6 +83,14 @@ public:
UInt64 client_version_patch = 0;
unsigned client_tcp_protocol_version = 0;
/// In case of distributed query, client info for query is actually a client info of client.
/// In order to get a version of server-initiator, use connection_ values.
/// Also for tcp only.
UInt64 connection_client_version_major = 0;
UInt64 connection_client_version_minor = 0;
UInt64 connection_client_version_patch = 0;
unsigned connection_tcp_protocol_version = 0;
/// For http
HTTPMethod http_method = HTTPMethod::UNKNOWN;
String http_user_agent;

View File

@ -75,6 +75,7 @@
#include <Functions/IFunction.h>
#include <Core/Field.h>
#include <Core/Defines.h>
#include <common/types.h>
#include <Columns/Collator.h>
#include <Common/FieldVisitorsAccurateComparison.h>
@ -2609,6 +2610,19 @@ void InterpreterSelectQuery::initSettings()
auto & query = getSelectQuery();
if (query.settings())
InterpreterSetQuery(query.settings(), context).executeForCurrentContext();
auto & client_info = context->getClientInfo();
auto min_major = DBMS_MIN_MAJOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;
auto min_minor = DBMS_MIN_MINOR_VERSION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD;
if (client_info.query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
std::forward_as_tuple(client_info.connection_client_version_major, client_info.connection_client_version_minor) < std::forward_as_tuple(min_major, min_minor))
{
/// Disable two-level aggregation due to version incompatibility.
context->setSetting("group_by_two_level_threshold", Field(0));
context->setSetting("group_by_two_level_threshold_bytes", Field(0));
}
}
}

View File

@ -944,6 +944,11 @@ void TCPHandler::receiveHello()
client_info.client_version_patch = client_version_patch;
client_info.client_tcp_protocol_version = client_tcp_protocol_version;
client_info.connection_client_version_major = client_version_major;
client_info.connection_client_version_minor = client_version_minor;
client_info.connection_client_version_patch = client_version_patch;
client_info.connection_tcp_protocol_version = client_tcp_protocol_version;
is_interserver_mode = (user == USER_INTERSERVER_MARKER);
if (is_interserver_mode)
{

View File

@ -0,0 +1,61 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server', tag='21.3', with_installed_binary=True)
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server')
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server')
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_two_level_merge(start_cluster):
for node in start_cluster.instances.values():
node.query(
"""
CREATE TABLE IF NOT EXISTS test_two_level_merge(date Date, zone UInt32, number UInt32)
ENGINE = MergeTree() PARTITION BY toUInt64(number / 1000) ORDER BY tuple();
INSERT INTO
test_two_level_merge
SELECT
toDate('2021-09-28') - number / 1000,
249081628,
number
FROM
numbers(15000);
"""
)
# covers only the keys64 method
for node in start_cluster.instances.values():
print(node.query(
"""
SELECT
throwIf(uniqExact(date) != count(), 'group by is borked')
FROM (
SELECT
date
FROM
remote('node{1,2}', default.test_two_level_merge)
WHERE
date BETWEEN toDate('2021-09-20') AND toDate('2021-09-28')
AND zone = 249081628
GROUP by date, zone
)
SETTINGS
group_by_two_level_threshold = 1,
group_by_two_level_threshold_bytes = 1,
max_threads = 2,
prefer_localhost_replica = 0
"""
))