From aa365d72197a34c7430f0f54c0cf2a68e1c3eee4 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Fri, 19 Jul 2024 14:10:11 +0000 Subject: [PATCH] Backport #65777 to 24.6: Make `allow_experimental_analyzer` be controlled by the initiator for distributed queries --- src/Client/HedgedConnections.cpp | 6 ++ src/Client/MultiplexedConnections.cpp | 6 ++ src/Core/Settings.h | 2 +- src/Server/TCPHandler.cpp | 12 ++- tests/ci/integration_tests_runner.py | 9 +- tests/integration/helpers/cluster.py | 2 +- .../test_analyzer_compatibility/__init__.py | 0 .../configs/remote_servers.xml | 17 +++ .../test_analyzer_compatibility/test.py | 100 ++++++++++++++++++ .../test_functions.py | 18 ++++ .../test.py | 35 +----- 11 files changed, 169 insertions(+), 38 deletions(-) create mode 100644 tests/integration/test_analyzer_compatibility/__init__.py create mode 100644 tests/integration/test_analyzer_compatibility/configs/remote_servers.xml create mode 100644 tests/integration/test_analyzer_compatibility/test.py diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index 8c993f906e0..51cbe6f3d6f 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -195,6 +195,12 @@ void HedgedConnections::sendQuery( modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset; } + /// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting. + /// Make the analyzer being set, so it will be effectively applied on the remote server. + /// In other words, the initiator always controls whether the analyzer enabled or not for + /// all servers involved in the distributed query processing. + modified_settings.set("allow_experimental_analyzer", static_cast(modified_settings.allow_experimental_analyzer)); + replica.connection->sendQuery(timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {}); replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout); replica.packet_receiver->setTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout); diff --git a/src/Client/MultiplexedConnections.cpp b/src/Client/MultiplexedConnections.cpp index 5d0fc8fd39e..99bdd706d8b 100644 --- a/src/Client/MultiplexedConnections.cpp +++ b/src/Client/MultiplexedConnections.cpp @@ -150,6 +150,12 @@ void MultiplexedConnections::sendQuery( } } + /// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting. + /// Make the analyzer being set, so it will be effectively applied on the remote server. + /// In other words, the initiator always controls whether the analyzer enabled or not for + /// all servers involved in the distributed query processing. + modified_settings.set("allow_experimental_analyzer", static_cast(modified_settings.allow_experimental_analyzer)); + const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0; size_t num_replicas = replica_states.size(); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index a3dfb3d3b7f..f157342d7d7 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -398,7 +398,7 @@ class IColumn; M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \ M(Bool, opentelemetry_trace_processors, false, "Collect OpenTelemetry spans for processors.", 0) \ M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \ - M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", 0) \ + M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", IMPORTANT) \ M(Bool, analyzer_compatibility_join_using_top_level_identifier, false, "Force to resolve identifier in JOIN USING from projection (for example, in `SELECT a + 1 AS b FROM t1 JOIN t2 USING (b)` join will be performed by `t1.a + 1 = t2.b`, rather then `t1.b = t2.b`).", 0) \ M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \ M(Bool, enable_vertical_final, true, "If enable, remove duplicated rows during FINAL by marking rows as deleted and filtering them later instead of merging rows", 0) \ diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 14a2bceebf1..70e41ae669a 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1870,7 +1870,7 @@ void TCPHandler::receiveQuery() #endif } - query_context = session->makeQueryContext(std::move(client_info)); + query_context = session->makeQueryContext(client_info); /// Sets the default database if it wasn't set earlier for the session context. if (is_interserver_mode && !default_database.empty()) @@ -1885,6 +1885,16 @@ void TCPHandler::receiveQuery() /// /// Settings /// + + /// FIXME: Remove when allow_experimental_analyzer will become obsolete. + /// Analyzer became Beta in 24.3 and started to be enabled by default. + /// We have to disable it for ourselves to make sure we don't have different settings on + /// different servers. + if (query_kind == ClientInfo::QueryKind::SECONDARY_QUERY + && client_info.getVersionNumber() < VersionNumber(23, 3, 0) + && !passed_settings.allow_experimental_analyzer.changed) + passed_settings.set("allow_experimental_analyzer", false); + auto settings_changes = passed_settings.changes(); query_kind = query_context->getClientInfo().query_kind; if (query_kind == ClientInfo::QueryKind::INITIAL_QUERY) diff --git a/tests/ci/integration_tests_runner.py b/tests/ci/integration_tests_runner.py index 87f721cfde7..21f16d995a4 100755 --- a/tests/ci/integration_tests_runner.py +++ b/tests/ci/integration_tests_runner.py @@ -434,7 +434,14 @@ class ClickhouseIntegrationTestsRunner: "Getting all tests to the file %s with cmd: \n%s", out_file_full, cmd ) with open(out_file_full, "wb") as ofd: - subprocess.check_call(cmd, shell=True, stdout=ofd, stderr=ofd) + try: + subprocess.check_call(cmd, shell=True, stdout=ofd, stderr=ofd) + except subprocess.CalledProcessError as ex: + print("ERROR: Setting test plan failed. Output:") + with open(out_file_full, "r", encoding="utf-8") as file: + for line in file: + print(" " + line, end="") + raise ex all_tests = set() with open(out_file_full, "r", encoding="utf-8") as all_tests_fd: diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index 41c162217d2..766fafcbc4e 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -73,7 +73,7 @@ CLICKHOUSE_ERROR_LOG_FILE = "/var/log/clickhouse-server/clickhouse-server.err.lo # Minimum version we use in integration tests to check compatibility with old releases # Keep in mind that we only support upgrading between releases that are at most 1 year different. # This means that this minimum need to be, at least, 1 year older than the current release -CLICKHOUSE_CI_MIN_TESTED_VERSION = "22.8" +CLICKHOUSE_CI_MIN_TESTED_VERSION = "23.3" # to create docker-compose env file diff --git a/tests/integration/test_analyzer_compatibility/__init__.py b/tests/integration/test_analyzer_compatibility/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_analyzer_compatibility/configs/remote_servers.xml b/tests/integration/test_analyzer_compatibility/configs/remote_servers.xml new file mode 100644 index 00000000000..0a50dab7fd3 --- /dev/null +++ b/tests/integration/test_analyzer_compatibility/configs/remote_servers.xml @@ -0,0 +1,17 @@ + + + + + true + + current + 9000 + + + backward + 9000 + + + + + diff --git a/tests/integration/test_analyzer_compatibility/test.py b/tests/integration/test_analyzer_compatibility/test.py new file mode 100644 index 00000000000..d4ded420c61 --- /dev/null +++ b/tests/integration/test_analyzer_compatibility/test.py @@ -0,0 +1,100 @@ +import uuid + +import pytest +from helpers.cluster import ClickHouseCluster +from helpers.test_tools import TSV + +CLICKHOUSE_MAX_VERSION_WITH_ANALYZER_DISABLED_BY_DEFAULT = "24.2" + +cluster = ClickHouseCluster(__file__) +# Here analyzer is enabled by default +current = cluster.add_instance( + "current", + main_configs=["configs/remote_servers.xml"], +) +# Here analyzer is disabled by default +backward = cluster.add_instance( + "backward", + use_old_analyzer=True, + main_configs=["configs/remote_servers.xml"], + image="clickhouse/clickhouse-server", + tag=CLICKHOUSE_MAX_VERSION_WITH_ANALYZER_DISABLED_BY_DEFAULT, + with_installed_binary=True, +) + + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def test_two_new_versions(start_cluster): + # Two new versions (both know about the analyzer) + # One have it enabled by default, another one - disabled. + + current.query("SYSTEM FLUSH LOGS") + backward.query("SYSTEM FLUSH LOGS") + + query_id = str(uuid.uuid4()) + current.query( + "SELECT * FROM clusterAllReplicas('test_cluster_mixed', system.tables);", + query_id=query_id, + ) + + current.query("SYSTEM FLUSH LOGS") + backward.query("SYSTEM FLUSH LOGS") + + assert ( + current.query( + """ +SELECT hostname() AS h, getSetting('allow_experimental_analyzer') +FROM clusterAllReplicas('test_cluster_mixed', system.one) +ORDER BY h;""" + ) + == TSV([["backward", "true"], ["current", "true"]]) + ) + + # Should be enabled everywhere + analyzer_enabled = current.query( + f""" +SELECT +DISTINCT Settings['allow_experimental_analyzer'] +FROM clusterAllReplicas('test_cluster_mixed', system.query_log) +WHERE initial_query_id = '{query_id}';""" + ) + + assert TSV(analyzer_enabled) == TSV("1") + + query_id = str(uuid.uuid4()) + backward.query( + "SELECT * FROM clusterAllReplicas('test_cluster_mixed', system.tables)", + query_id=query_id, + ) + + current.query("SYSTEM FLUSH LOGS") + backward.query("SYSTEM FLUSH LOGS") + + assert ( + backward.query( + """ +SELECT hostname() AS h, getSetting('allow_experimental_analyzer') +FROM clusterAllReplicas('test_cluster_mixed', system.one) +ORDER BY h;""" + ) + == TSV([["backward", "false"], ["current", "false"]]) + ) + + # Should be disabled everywhere + analyzer_enabled = backward.query( + f""" +SELECT +DISTINCT Settings['allow_experimental_analyzer'] +FROM clusterAllReplicas('test_cluster_mixed', system.query_log) +WHERE initial_query_id = '{query_id}';""" + ) + + assert TSV(analyzer_enabled) == TSV("0") diff --git a/tests/integration/test_backward_compatibility/test_functions.py b/tests/integration/test_backward_compatibility/test_functions.py index 758dda655da..fc03a77030e 100644 --- a/tests/integration/test_backward_compatibility/test_functions.py +++ b/tests/integration/test_backward_compatibility/test_functions.py @@ -130,10 +130,13 @@ def test_string_functions(start_cluster): functions = map(lambda x: x.strip(), functions) excludes = [ + # The argument of this function is not a seed, but an arbitrary expression needed for bypassing common subexpression elimination. "rand", "rand64", "randConstant", + "randCanonical", "generateUUIDv4", + "generateULID", # Syntax error otherwise "position", "substring", @@ -153,6 +156,18 @@ def test_string_functions(start_cluster): "tryBase64Decode", # Removed in 23.9 "meiliMatch", + # These functions require more than one argument. + "parseDateTimeInJodaSyntaxOrZero", + "parseDateTimeInJodaSyntaxOrNull", + "parseDateTimeOrNull", + "parseDateTimeOrZero", + "parseDateTime", + # The argument is effectively a disk name (and we don't have one with name foo) + "filesystemUnreserved", + "filesystemCapacity", + "filesystemAvailable", + # Exclude it for now. Looks like the result depends on the build type. + "farmHash64", ] functions = filter(lambda x: x not in excludes, functions) @@ -205,6 +220,9 @@ def test_string_functions(start_cluster): # Function X takes exactly one parameter: # The function 'X' can only be used as a window function "BAD_ARGUMENTS", + # String foo is obviously not a valid IP address. + "CANNOT_PARSE_IPV4", + "CANNOT_PARSE_IPV6", ] if any(map(lambda x: x in error_message, allowed_errors)): logging.info("Skipping %s", function) diff --git a/tests/integration/test_distributed_inter_server_secret/test.py b/tests/integration/test_distributed_inter_server_secret/test.py index 50d7be4d11e..7ecb2cda257 100644 --- a/tests/integration/test_distributed_inter_server_secret/test.py +++ b/tests/integration/test_distributed_inter_server_secret/test.py @@ -7,7 +7,7 @@ import uuid import time from helpers.client import QueryRuntimeException -from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION +from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__) @@ -27,9 +27,6 @@ def make_instance(name, *args, **kwargs): ) -# DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 added in 23.3, ensure that CLICKHOUSE_CI_MIN_TESTED_VERSION fits -assert CLICKHOUSE_CI_MIN_TESTED_VERSION < "23.3" - # _n1/_n2 contains cluster with different -- should fail # only n1 contains new_user n1 = make_instance( @@ -38,14 +35,6 @@ n1 = make_instance( user_configs=["configs/users.d/new_user.xml"], ) n2 = make_instance("n2", main_configs=["configs/remote_servers_n2.xml"]) -backward = make_instance( - "backward", - main_configs=["configs/remote_servers_backward.xml"], - image="clickhouse/clickhouse-server", - # version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 - tag=CLICKHOUSE_CI_MIN_TESTED_VERSION, - with_installed_binary=True, -) users = pytest.mark.parametrize( "user,password", @@ -427,28 +416,6 @@ def test_per_user_protocol_settings_secure_cluster(user, password): ) -@users -def test_user_secure_cluster_with_backward(user, password): - id_ = "with-backward-query-dist_secure-" + user - n1.query( - f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password - ) - assert get_query_user_info(n1, id_) == [user, user] - assert get_query_user_info(backward, id_) == [user, user] - - -@users -def test_user_secure_cluster_from_backward(user, password): - id_ = "from-backward-query-dist_secure-" + user - backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password) - assert get_query_user_info(n1, id_) == [user, user] - assert get_query_user_info(backward, id_) == [user, user] - - assert n1.contains_in_log( - "Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster." - ) - - def test_secure_cluster_distributed_over_distributed_different_users(): # This works because we will have initial_user='default' n1.query(