From 44617166d40af1d94d8a076ff4100a14e2bfbbe6 Mon Sep 17 00:00:00 2001
From: robot-clickhouse
Date: Wed, 25 Sep 2024 16:08:45 +0000
Subject: [PATCH] Backport #69769 to 24.8: S3Queue: support having deprecated
 settings to not fail server startup

---
 .../ObjectStorageQueueSettings.cpp            | 16 +++++-
 .../integration/test_storage_s3_queue/test.py | 55 +++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp
index 67743db6197..de73a5edac0 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp
@@ -21,16 +21,28 @@ void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
     {
         try
         {
+            std::vector<std::string> ignore_settings;
+            auto settings_changes = storage_def.settings->changes;
+
             /// We support settings starting with s3_ for compatibility.
-            for (auto & change : storage_def.settings->changes)
+            for (auto & change : settings_changes)
             {
                 if (change.name.starts_with("s3queue_"))
                     change.name = change.name.substr(std::strlen("s3queue_"));
+
                 if (change.name == "enable_logging_to_s3queue_log")
                     change.name = "enable_logging_to_queue_log";
+
+                if (change.name == "current_shard_num")
+                    ignore_settings.push_back(change.name);
+                if (change.name == "total_shards_num")
+                    ignore_settings.push_back(change.name);
             }
 
-            applyChanges(storage_def.settings->changes);
+            for (const auto & setting : ignore_settings)
+                settings_changes.removeSetting(setting);
+
+            applyChanges(settings_changes);
         }
         catch (Exception & e)
         {
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 8f197e09e61..8b959daba1c 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -124,6 +124,18 @@ def started_cluster():
             ],
             stay_alive=True,
         )
+        cluster.add_instance(
+            "instance_24.5",
+            with_zookeeper=True,
+            image="clickhouse/clickhouse-server",
+            tag="24.5",
+            stay_alive=True,
+            user_configs=[
+                "configs/users.xml",
+            ],
+            with_installed_binary=True,
+            use_old_analyzer=True,
+        )
 
         logging.info("Starting cluster...")
         cluster.start()
@@ -1797,3 +1809,46 @@ def test_commit_on_limit(started_cluster):
     for value in expected_failed:
         assert value not in processed
         assert value in failed
+
+
+def test_upgrade_2(started_cluster):
+    node = started_cluster.instances["instance_24.5"]
+
+    table_name = f"test_upgrade_2_{uuid4().hex[:8]}"
+    dst_table_name = f"{table_name}_dst"
+    # A unique path is necessary for repeatable tests
+    keeper_path = f"/clickhouse/test_{table_name}_{generate_random_string()}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+            "s3queue_current_shard_num": 0,
+            "s3queue_processing_threads_num": 2,
+        },
+    )
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 10
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()
+
+    node.restart_with_latest_version()
+    assert table_name in node.query("SHOW TABLES")
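
Note on the normalization logic in the C++ hunk (illustrative, not part of the
patch): loadFromQuery() now strips the legacy "s3queue_" prefix, renames
"enable_logging_to_s3queue_log", and collects the two deprecated sharding
settings so they are removed before applyChanges() runs, instead of letting an
unknown setting fail server startup. A minimal Python sketch of the same
rules, with hypothetical names:

    # Deprecated settings are matched after prefix stripping, mirroring the
    # order of checks inside the loop in the patch above.
    DEPRECATED_SETTINGS = {"current_shard_num", "total_shards_num"}

    def normalize_settings(changes: dict) -> dict:
        normalized = {}
        for name, value in changes.items():
            if name.startswith("s3queue_"):
                name = name[len("s3queue_"):]         # legacy prefix, kept for compatibility
            if name == "enable_logging_to_s3queue_log":
                name = "enable_logging_to_queue_log"  # renamed setting
            if name in DEPRECATED_SETTINGS:
                continue                              # ignored instead of raising
            normalized[name] = value
        return normalized

    # The deprecated shard setting disappears; the other setting is kept.
    assert normalize_settings(
        {"s3queue_current_shard_num": 0, "s3queue_processing_threads_num": 2}
    ) == {"processing_threads_num": 2}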
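
The test_upgrade_2 test automates the scenario below; a manual sketch for
reference only, where the endpoint, bucket, and table name are placeholders
and the DDL follows the documented S3Queue syntax: create the table on a
pre-24.8 server so the deprecated setting is persisted in the table metadata,
then upgrade the binary and confirm the table still attaches rather than
failing startup with an unknown-setting error.

    import subprocess

    DDL = """
    CREATE TABLE default.upgrade_check (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = S3Queue('http://localhost:9000/bucket/data/*', 'CSV')
    SETTINGS mode = 'ordered',
             keeper_path = '/clickhouse/upgrade_check',
             s3queue_current_shard_num = 0
    """

    # Create the table on the old server (e.g. 24.5), persisting the
    # deprecated setting in the table's metadata.
    subprocess.run(["clickhouse-client", "--query", DDL], check=True)

    # ... upgrade the clickhouse-server binary and restart the server ...

    # After the upgrade the table must still be listed; with the patch the
    # deprecated setting is dropped during loading instead of raising.
    subprocess.run(["clickhouse-client", "--query", "SHOW TABLES"], check=True)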