Backport #65777 to 24.6: Make allow_experimental_analyzer be controlled by the initiator for distributed queries

This commit is contained in:
robot-clickhouse 2024-07-19 14:10:11 +00:00
parent 994edbd6e0
commit aa365d7219
11 changed files with 169 additions and 38 deletions

View File

@ -195,6 +195,12 @@ void HedgedConnections::sendQuery(
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;
}
/// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting.
/// Make the analyzer being set, so it will be effectively applied on the remote server.
/// In other words, the initiator always controls whether the analyzer enabled or not for
/// all servers involved in the distributed query processing.
modified_settings.set("allow_experimental_analyzer", static_cast<bool>(modified_settings.allow_experimental_analyzer));
replica.connection->sendQuery(timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
replica.packet_receiver->setTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);

View File

@ -150,6 +150,12 @@ void MultiplexedConnections::sendQuery(
}
}
/// FIXME: Remove once we will make `allow_experimental_analyzer` obsolete setting.
/// Make the analyzer being set, so it will be effectively applied on the remote server.
/// In other words, the initiator always controls whether the analyzer enabled or not for
/// all servers involved in the distributed query processing.
modified_settings.set("allow_experimental_analyzer", static_cast<bool>(modified_settings.allow_experimental_analyzer));
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas == 0;
size_t num_replicas = replica_states.size();

View File

@ -398,7 +398,7 @@ class IColumn;
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \
M(Bool, opentelemetry_trace_processors, false, "Collect OpenTelemetry spans for processors.", 0) \
M(Bool, prefer_column_name_to_alias, false, "Prefer using column names instead of aliases if possible.", 0) \
M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", 0) \
M(Bool, allow_experimental_analyzer, true, "Allow experimental analyzer.", IMPORTANT) \
M(Bool, analyzer_compatibility_join_using_top_level_identifier, false, "Force to resolve identifier in JOIN USING from projection (for example, in `SELECT a + 1 AS b FROM t1 JOIN t2 USING (b)` join will be performed by `t1.a + 1 = t2.b`, rather then `t1.b = t2.b`).", 0) \
M(Bool, prefer_global_in_and_join, false, "If enabled, all IN/JOIN operators will be rewritten as GLOBAL IN/JOIN. It's useful when the to-be-joined tables are only available on the initiator and we need to always scatter their data on-the-fly during distributed processing with the GLOBAL keyword. It's also useful to reduce the need to access the external sources joining external tables.", 0) \
M(Bool, enable_vertical_final, true, "If enable, remove duplicated rows during FINAL by marking rows as deleted and filtering them later instead of merging rows", 0) \

View File

@ -1870,7 +1870,7 @@ void TCPHandler::receiveQuery()
#endif
}
query_context = session->makeQueryContext(std::move(client_info));
query_context = session->makeQueryContext(client_info);
/// Sets the default database if it wasn't set earlier for the session context.
if (is_interserver_mode && !default_database.empty())
@ -1885,6 +1885,16 @@ void TCPHandler::receiveQuery()
///
/// Settings
///
/// FIXME: Remove when allow_experimental_analyzer will become obsolete.
/// Analyzer became Beta in 24.3 and started to be enabled by default.
/// We have to disable it for ourselves to make sure we don't have different settings on
/// different servers.
if (query_kind == ClientInfo::QueryKind::SECONDARY_QUERY
&& client_info.getVersionNumber() < VersionNumber(23, 3, 0)
&& !passed_settings.allow_experimental_analyzer.changed)
passed_settings.set("allow_experimental_analyzer", false);
auto settings_changes = passed_settings.changes();
query_kind = query_context->getClientInfo().query_kind;
if (query_kind == ClientInfo::QueryKind::INITIAL_QUERY)

View File

@ -434,7 +434,14 @@ class ClickhouseIntegrationTestsRunner:
"Getting all tests to the file %s with cmd: \n%s", out_file_full, cmd
)
with open(out_file_full, "wb") as ofd:
subprocess.check_call(cmd, shell=True, stdout=ofd, stderr=ofd)
try:
subprocess.check_call(cmd, shell=True, stdout=ofd, stderr=ofd)
except subprocess.CalledProcessError as ex:
print("ERROR: Setting test plan failed. Output:")
with open(out_file_full, "r", encoding="utf-8") as file:
for line in file:
print(" " + line, end="")
raise ex
all_tests = set()
with open(out_file_full, "r", encoding="utf-8") as all_tests_fd:

View File

@ -73,7 +73,7 @@ CLICKHOUSE_ERROR_LOG_FILE = "/var/log/clickhouse-server/clickhouse-server.err.lo
# Minimum version we use in integration tests to check compatibility with old releases
# Keep in mind that we only support upgrading between releases that are at most 1 year different.
# This means that this minimum need to be, at least, 1 year older than the current release
CLICKHOUSE_CI_MIN_TESTED_VERSION = "22.8"
CLICKHOUSE_CI_MIN_TESTED_VERSION = "23.3"
# to create docker-compose env file

View File

@ -0,0 +1,17 @@
<clickhouse>
<remote_servers>
<test_cluster_mixed>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>current</host>
<port>9000</port>
</replica>
<replica>
<host>backward</host>
<port>9000</port>
</replica>
</shard>
</test_cluster_mixed>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,100 @@
import uuid
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
CLICKHOUSE_MAX_VERSION_WITH_ANALYZER_DISABLED_BY_DEFAULT = "24.2"
cluster = ClickHouseCluster(__file__)
# Here analyzer is enabled by default
current = cluster.add_instance(
"current",
main_configs=["configs/remote_servers.xml"],
)
# Here analyzer is disabled by default
backward = cluster.add_instance(
"backward",
use_old_analyzer=True,
main_configs=["configs/remote_servers.xml"],
image="clickhouse/clickhouse-server",
tag=CLICKHOUSE_MAX_VERSION_WITH_ANALYZER_DISABLED_BY_DEFAULT,
with_installed_binary=True,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_two_new_versions(start_cluster):
# Two new versions (both know about the analyzer)
# One have it enabled by default, another one - disabled.
current.query("SYSTEM FLUSH LOGS")
backward.query("SYSTEM FLUSH LOGS")
query_id = str(uuid.uuid4())
current.query(
"SELECT * FROM clusterAllReplicas('test_cluster_mixed', system.tables);",
query_id=query_id,
)
current.query("SYSTEM FLUSH LOGS")
backward.query("SYSTEM FLUSH LOGS")
assert (
current.query(
"""
SELECT hostname() AS h, getSetting('allow_experimental_analyzer')
FROM clusterAllReplicas('test_cluster_mixed', system.one)
ORDER BY h;"""
)
== TSV([["backward", "true"], ["current", "true"]])
)
# Should be enabled everywhere
analyzer_enabled = current.query(
f"""
SELECT
DISTINCT Settings['allow_experimental_analyzer']
FROM clusterAllReplicas('test_cluster_mixed', system.query_log)
WHERE initial_query_id = '{query_id}';"""
)
assert TSV(analyzer_enabled) == TSV("1")
query_id = str(uuid.uuid4())
backward.query(
"SELECT * FROM clusterAllReplicas('test_cluster_mixed', system.tables)",
query_id=query_id,
)
current.query("SYSTEM FLUSH LOGS")
backward.query("SYSTEM FLUSH LOGS")
assert (
backward.query(
"""
SELECT hostname() AS h, getSetting('allow_experimental_analyzer')
FROM clusterAllReplicas('test_cluster_mixed', system.one)
ORDER BY h;"""
)
== TSV([["backward", "false"], ["current", "false"]])
)
# Should be disabled everywhere
analyzer_enabled = backward.query(
f"""
SELECT
DISTINCT Settings['allow_experimental_analyzer']
FROM clusterAllReplicas('test_cluster_mixed', system.query_log)
WHERE initial_query_id = '{query_id}';"""
)
assert TSV(analyzer_enabled) == TSV("0")

View File

@ -130,10 +130,13 @@ def test_string_functions(start_cluster):
functions = map(lambda x: x.strip(), functions)
excludes = [
# The argument of this function is not a seed, but an arbitrary expression needed for bypassing common subexpression elimination.
"rand",
"rand64",
"randConstant",
"randCanonical",
"generateUUIDv4",
"generateULID",
# Syntax error otherwise
"position",
"substring",
@ -153,6 +156,18 @@ def test_string_functions(start_cluster):
"tryBase64Decode",
# Removed in 23.9
"meiliMatch",
# These functions require more than one argument.
"parseDateTimeInJodaSyntaxOrZero",
"parseDateTimeInJodaSyntaxOrNull",
"parseDateTimeOrNull",
"parseDateTimeOrZero",
"parseDateTime",
# The argument is effectively a disk name (and we don't have one with name foo)
"filesystemUnreserved",
"filesystemCapacity",
"filesystemAvailable",
# Exclude it for now. Looks like the result depends on the build type.
"farmHash64",
]
functions = filter(lambda x: x not in excludes, functions)
@ -205,6 +220,9 @@ def test_string_functions(start_cluster):
# Function X takes exactly one parameter:
# The function 'X' can only be used as a window function
"BAD_ARGUMENTS",
# String foo is obviously not a valid IP address.
"CANNOT_PARSE_IPV4",
"CANNOT_PARSE_IPV6",
]
if any(map(lambda x: x in error_message, allowed_errors)):
logging.info("Skipping %s", function)

View File

@ -7,7 +7,7 @@ import uuid
import time
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, CLICKHOUSE_CI_MIN_TESTED_VERSION
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
@ -27,9 +27,6 @@ def make_instance(name, *args, **kwargs):
)
# DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2 added in 23.3, ensure that CLICKHOUSE_CI_MIN_TESTED_VERSION fits
assert CLICKHOUSE_CI_MIN_TESTED_VERSION < "23.3"
# _n1/_n2 contains cluster with different <secret> -- should fail
# only n1 contains new_user
n1 = make_instance(
@ -38,14 +35,6 @@ n1 = make_instance(
user_configs=["configs/users.d/new_user.xml"],
)
n2 = make_instance("n2", main_configs=["configs/remote_servers_n2.xml"])
backward = make_instance(
"backward",
main_configs=["configs/remote_servers_backward.xml"],
image="clickhouse/clickhouse-server",
# version without DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
tag=CLICKHOUSE_CI_MIN_TESTED_VERSION,
with_installed_binary=True,
)
users = pytest.mark.parametrize(
"user,password",
@ -427,28 +416,6 @@ def test_per_user_protocol_settings_secure_cluster(user, password):
)
@users
def test_user_secure_cluster_with_backward(user, password):
id_ = "with-backward-query-dist_secure-" + user
n1.query(
f"SELECT *, '{id_}' FROM dist_secure_backward", user=user, password=password
)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
@users
def test_user_secure_cluster_from_backward(user, password):
id_ = "from-backward-query-dist_secure-" + user
backward.query(f"SELECT *, '{id_}' FROM dist_secure", user=user, password=password)
assert get_query_user_info(n1, id_) == [user, user]
assert get_query_user_info(backward, id_) == [user, user]
assert n1.contains_in_log(
"Using deprecated interserver protocol because the client is too old. Consider upgrading all nodes in cluster."
)
def test_secure_cluster_distributed_over_distributed_different_users():
# This works because we will have initial_user='default'
n1.query(