From a777045dd3c2986ff4a0258564a468e806ee1abd Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 26 Mar 2024 13:26:42 +0100 Subject: [PATCH 1/5] Fix and more tests for broken projections handling --- src/Interpreters/MutationsInterpreter.cpp | 4 +- src/Interpreters/MutationsInterpreter.h | 2 + src/Storages/MergeTree/IMergeTreeDataPart.cpp | 4 +- .../ReplicatedMergeTreePartCheckThread.cpp | 4 +- src/Storages/MergeTree/checkDataPart.cpp | 23 +-- .../test_broken_projections/test.py | 191 +++++++++++++++++- 6 files changed, 203 insertions(+), 25 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 222447ca650..7d5443b3e65 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -409,12 +409,13 @@ MutationsInterpreter::MutationsInterpreter( , available_columns(std::move(available_columns_)) , settings(std::move(settings_)) , select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits()) + , logger(getLogger("MutationsInterpreter")) { auto new_context = Context::createCopy(context_); if (new_context->getSettingsRef().allow_experimental_analyzer) { new_context->setSetting("allow_experimental_analyzer", false); - LOG_DEBUG(&Poco::Logger::get("MutationsInterpreter"), "Will use old analyzer to prepare mutation"); + LOG_DEBUG(logger, "Will use old analyzer to prepare mutation"); } context = std::move(new_context); @@ -997,6 +998,7 @@ void MutationsInterpreter::prepare(bool dry_run) /// Always rebuild broken projections. 
if (source.hasBrokenProjection(projection.name)) { + LOG_DEBUG(logger, "Will rebuild broken projection {}", projection.name); materialized_projections.insert(projection.name); continue; } diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 4c35ec34b58..2d01c7154c8 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -175,6 +175,8 @@ private: Settings settings; SelectQueryOptions select_limits; + LoggerPtr logger; + /// A sequence of mutation commands is executed as a sequence of stages. Each stage consists of several /// filters, followed by updating values of some columns. Commands can reuse expressions calculated by the /// previous commands in the same stage, but at the end of each stage intermediate columns are thrown away diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index a26e2b725be..570175f6614 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -805,8 +805,8 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch throw; auto message = getCurrentExceptionMessage(true); - LOG_ERROR(&Poco::Logger::get("IMergeTreeDataPart"), - "Cannot load projection {}, will consider it broken. Reason: {}", projection.name, message); + LOG_WARNING(storage.log, "Cannot load projection {}, " + "will consider it broken. 
Reason: {}", projection.name, message); has_broken_projection = true; part->setBrokenReason(message, getCurrentExceptionCode()); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index bc0b4f73a31..181f54688f9 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -386,12 +386,12 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St throw; PreformattedMessage message; - if (is_broken_projection) + if (is_broken_projection && throw_on_broken_projection) { WriteBufferFromOwnString wb; message = PreformattedMessage::create( "Part {} has a broken projections. It will be ignored. Broken projections info: {}", - part_name, getCurrentExceptionMessage(false)); + part_name, getCurrentExceptionMessage(true)); LOG_DEBUG(log, message); result.action = ReplicatedCheckResult::DoNothing; } diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index 0a1057916d6..cabe06b0dde 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -285,11 +285,6 @@ static IMergeTreeDataPart::Checksums checkDataPart( return {}; auto projection_file = name + ".proj"; - if (!throw_on_broken_projection && projection->is_broken) - { - projections_on_disk.erase(projection_file); - checksums_txt.remove(projection_file); - } IMergeTreeDataPart::Checksums projection_checksums; try @@ -306,13 +301,19 @@ static IMergeTreeDataPart::Checksums checkDataPart( if (isRetryableException(std::current_exception())) throw; + is_broken_projection = true; + projections_on_disk.erase(projection_file); + checksums_txt.remove(projection_file); + + const auto exception = getCurrentExceptionMessage(true); + if (!projection->is_broken) { - LOG_TEST(log, "Marking projection {} as broken ({})", name, projection_file); - 
projection->setBrokenReason(getCurrentExceptionMessage(false), getCurrentExceptionCode()); + LOG_WARNING(log, "Marking projection {} as broken ({}). Reason: {}", + name, projection_file, exception); + projection->setBrokenReason(exception, getCurrentExceptionCode()); } - is_broken_projection = true; if (throw_on_broken_projection) { if (!broken_projections_message.empty()) @@ -320,12 +321,10 @@ static IMergeTreeDataPart::Checksums checkDataPart( broken_projections_message += fmt::format( "Part {} has a broken projection {} (error: {})", - data_part->name, name, getCurrentExceptionMessage(false)); - continue; + data_part->name, name, exception); } - projections_on_disk.erase(projection_file); - checksums_txt.remove(projection_file); + continue; } checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum( diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 4a4690a5d0a..b4e8b6e5021 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -148,7 +148,7 @@ def break_part(node, table, part): bash(node, f"rm '{part_path}/columns.txt'") -def get_broken_projections_info(node, table): +def get_broken_projections_info(node, table, active=True): return node.query( f""" SELECT parent_name, name, errors.name FROM @@ -158,6 +158,7 @@ def get_broken_projections_info(node, table): WHERE table='{table}' AND database=currentDatabase() AND is_broken = 1 + AND active = {active} ) AS parts_info INNER JOIN system.errors AS errors ON parts_info.exception_code = errors.code @@ -214,14 +215,21 @@ def random_str(length=6): return "".join(random.SystemRandom().choice(alphabet) for _ in range(length)) -def check(node, table, check_result, expect_broken_part="", expected_error=""): +def check( + node, + table, + check_result, + expect_broken_part="", + expected_error="", + do_check_command=True, +): if expect_broken_part == "proj1": assert 
expected_error in node.query_and_get_error( - f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c" + f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1'" ) else: query_id = node.query( - f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c)" + f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c SETTINGS force_optimize_projection_name = 'proj1')" ).strip() node.query("SYSTEM FLUSH LOGS") res = node.query( @@ -244,11 +252,11 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""): if expect_broken_part == "proj2": assert expected_error in node.query_and_get_error( - f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d" + f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2'" ) else: query_id = node.query( - f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d)" + f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d SETTINGS force_optimize_projection_name = 'proj2')" ).strip() node.query("SYSTEM FLUSH LOGS") res = node.query( @@ -269,7 +277,8 @@ def check(node, table, check_result, expect_broken_part="", expected_error=""): assert False assert "proj2" in res - assert check_result == int(node.query(f"CHECK TABLE {table}")) + if do_check_command: + assert check_result == int(node.query(f"CHECK TABLE {table}")) def test_broken_ignored(cluster): @@ -352,7 +361,14 @@ def test_broken_ignored(cluster): # """) # ) - assert "all_3_3_0" in get_broken_projections_info(node, table_name) + assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_5_1"] == get_parts( + node, table_name + ) + + assert "all_3_3_0" in get_broken_projections_info(node, table_name, active=False) + assert "all_2_2_0" in get_broken_projections_info(node, table_name, active=False) + + # 0 because of all_2_2_0 check(node, table_name, 0) @@ -574,3 +590,162 @@ def test_broken_projections_in_backups_3(cluster): 
assert "all_1_1_0\tproj1\tNO_FILE_IN_DATA_PART" == get_broken_projections_info( node, table_name ) + + +def test_check_part_thread(cluster): + node = cluster.instances["node"] + + table_name = "check_part_thread_test1" + create_table(node, table_name, 1) + + insert(node, table_name, 0, 5) + insert(node, table_name, 5, 5) + insert(node, table_name, 10, 5) + insert(node, table_name, 15, 5) + + assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts( + node, table_name + ) + + # Break data file of projection 'proj2' for part all_2_2_0 + break_projection(node, table_name, "proj2", "all_2_2_0", "data") + + # It will not yet appear in broken projections info. + assert "proj2" not in get_broken_projections_info(node, table_name) + + # Select now fails with error "File doesn't exist" + check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST", do_check_command=False) + + good = False + for _ in range(10): + # We marked projection as broken, checkPartThread must not complain about the part. + good = node.contains_in_log( + f"{table_name} (ReplicatedMergeTreePartCheckThread): Part all_2_2_0 looks good" + ) + if good: + break + time.sleep(1) + + assert good + + +def test_broken_on_start(cluster): + node = cluster.instances["node"] + + table_name = "test1" + create_table(node, table_name, 1) + + insert(node, table_name, 0, 5) + insert(node, table_name, 5, 5) + insert(node, table_name, 10, 5) + insert(node, table_name, 15, 5) + + assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts( + node, table_name + ) + + # Break data file of projection 'proj2' for part all_2_2_0 + break_projection(node, table_name, "proj2", "all_2_2_0", "data") + + # It will not yet appear in broken projections info. + assert "proj2" not in get_broken_projections_info(node, table_name) + + # Select now fails with error "File doesn't exist" + # We will mark projection as broken. 
+ check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST") + + # Projection 'proj2' from part all_2_2_0 will now appear in broken parts info. + assert "all_2_2_0\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info( + node, table_name + ) + + # Second select works, because projection is now marked as broken. + check(node, table_name, 0) + + node.restart_clickhouse() + + # After restart the projection must still appear in broken projections info. + assert "proj2" in get_broken_projections_info(node, table_name) + + # Select works + check(node, table_name, 0) + + +def test_mutation_with_broken_projection(cluster): + node = cluster.instances["node"] + + table_name = "test1" + create_table(node, table_name, 1) + + insert(node, table_name, 0, 5) + insert(node, table_name, 5, 5) + insert(node, table_name, 10, 5) + insert(node, table_name, 15, 5) + + assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts( + node, table_name + ) + + check(node, table_name, 1) + + node.query( + f"ALTER TABLE {table_name} DELETE WHERE c == 11 SETTINGS mutations_sync = 1" + ) + + assert ["all_0_0_0_4", "all_1_1_0_4", "all_2_2_0_4", "all_3_3_0_4"] == get_parts( + node, table_name + ) + + assert "" == get_broken_projections_info(node, table_name) + + check(node, table_name, 1) + + # Break data file of projection 'proj2' for part all_2_2_0_4 + break_projection(node, table_name, "proj2", "all_2_2_0_4", "data") + + # It will not yet appear in broken projections info. + assert "proj2" not in get_broken_projections_info(node, table_name) + + # Select now fails with error "File doesn't exist" + # We will mark projection as broken.
+ check(node, table_name, 0) + + assert "all_2_2_0_4" in get_broken_projections_info(node, table_name) + + node.query( + f"ALTER TABLE {table_name} DELETE WHERE _part == 'all_0_0_0_4' SETTINGS mutations_sync = 1" + ) + + # All parts change because this is how alter delete works, + # but all parts apart from the first have only hardlinks to files in previous part. + assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts( + node, table_name + ) + + # Still broken because it was hardlinked. + assert "all_2_2_0_5" in get_broken_projections_info(node, table_name) + + check(node, table_name, 0) + + node.query( + f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1" + ) + + assert ["all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts( + node, table_name + ) or ["all_0_0_0_6", "all_1_1_0_6", "all_2_2_0_6", "all_3_3_0_6"] == get_parts( + node, table_name + ) + + # Not broken anymore. + assert "" == get_broken_projections_info(node, table_name) + + check(node, table_name, 1) From 2ce0d647f28a9b2950cb8cf9f5d5ed42f4eb6f68 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 26 Mar 2024 21:39:23 +0100 Subject: [PATCH 2/5] Small test fix --- tests/integration/test_broken_projections/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index b4e8b6e5021..5a6232819b6 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -366,7 +366,7 @@ def test_broken_ignored(cluster): ) assert "all_3_3_0" in get_broken_projections_info(node, table_name, active=False) - assert "all_2_2_0" in get_broken_projections_info(node, table_name, active=False) + assert "all_2_2_0" in get_broken_projections_info(node, table_name, active=True) # 0 because of all_2_2_0 check(node, table_name, 0) From 0a5ce69880536c9125f5e990533b886c12a01d3e Mon Sep 17 00:00:00 2001 From:
kssenii Date: Tue, 26 Mar 2024 21:59:11 +0100 Subject: [PATCH 3/5] Review fix --- src/Interpreters/MutationsInterpreter.cpp | 2 +- src/Storages/MergeTree/checkDataPart.cpp | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 7d5443b3e65..35fd549559b 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -409,7 +409,7 @@ MutationsInterpreter::MutationsInterpreter( , available_columns(std::move(available_columns_)) , settings(std::move(settings_)) , select_limits(SelectQueryOptions().analyze(!settings.can_execute).ignoreLimits()) - , logger(getLogger("MutationsInterpreter")) + , logger(getLogger("MutationsInterpreter(" + source.getStorage()->getStorageID().getFullTableName() + ")")) { auto new_context = Context::createCopy(context_); if (new_context->getSettingsRef().allow_experimental_analyzer) diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index cabe06b0dde..d64568e0c3e 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -305,13 +305,13 @@ static IMergeTreeDataPart::Checksums checkDataPart( projections_on_disk.erase(projection_file); checksums_txt.remove(projection_file); - const auto exception = getCurrentExceptionMessage(true); + const auto exception_message = getCurrentExceptionMessage(true); if (!projection->is_broken) { LOG_WARNING(log, "Marking projection {} as broken ({}). 
Reason: {}", - name, projection_file, exception); - projection->setBrokenReason(exception, getCurrentExceptionCode()); + name, projection_file, exception_message); + projection->setBrokenReason(exception_message, getCurrentExceptionCode()); } if (throw_on_broken_projection) @@ -321,7 +321,7 @@ static IMergeTreeDataPart::Checksums checkDataPart( broken_projections_message += fmt::format( "Part {} has a broken projection {} (error: {})", - data_part->name, name, exception); + data_part->name, name, exception_message); } continue; From 6a585600f0d8ce31201fa909d514ac1a913b0325 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 27 Mar 2024 19:21:13 +0100 Subject: [PATCH 4/5] Update test --- tests/integration/test_broken_projections/test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 5a6232819b6..adb7b58f171 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -728,10 +728,13 @@ def test_mutation_with_broken_projection(cluster): # but all parts apart from the first have only hardlinks to files in previous part. assert ["all_0_0_0_5", "all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts( node, table_name - ) + ) or ["all_1_1_0_5", "all_2_2_0_5", "all_3_3_0_5"] == get_parts(node, table_name) # Still broken because it was hardlinked. - assert "all_2_2_0_5" in get_broken_projections_info(node, table_name) + broken = get_broken_projections_info(node, table_name) + assert ( + "all_2_2_0_5" in broken or "" == broken + ) # second could be because of a merge. 
check(node, table_name, 0) From ffd29f8b72f22f6728862128cfdface206905e83 Mon Sep 17 00:00:00 2001 From: Kseniia Sumarokova <54203879+kssenii@users.noreply.github.com> Date: Wed, 27 Mar 2024 22:29:36 +0100 Subject: [PATCH 5/5] Update test.py --- tests/integration/test_broken_projections/test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index adb7b58f171..ccc64de4133 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -736,7 +736,10 @@ def test_mutation_with_broken_projection(cluster): "all_2_2_0_5" in broken or "" == broken ) # second could be because of a merge. - check(node, table_name, 0) + if "" == broken: + check(node, table_name, 1) + else: + check(node, table_name, 0) node.query( f"ALTER TABLE {table_name} DELETE WHERE c == 13 SETTINGS mutations_sync = 1"