diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 3a44359b537..918a4cda714 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -72,6 +72,7 @@ namespace ErrorCodes extern const int BAD_TTL_FILE; extern const int NOT_IMPLEMENTED; extern const int NO_SUCH_COLUMN_IN_TABLE; + extern const int FILE_DOESNT_EXIST; } @@ -749,8 +750,16 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks /// Probably there is something wrong with files of this part. /// So it can be helpful to add to the error message some information about those files. String files_in_part; + for (auto it = getDataPartStorage().iterate(); it->isValid(); it->next()) - files_in_part += fmt::format("{}{} ({} bytes)", (files_in_part.empty() ? "" : ", "), it->name(), getDataPartStorage().getFileSize(it->name())); + { + std::string file_info; + if (!getDataPartStorage().isDirectory(it->name())) + file_info = fmt::format(" ({} bytes)", getDataPartStorage().getFileSize(it->name())); + + files_in_part += fmt::format("{}{}{}", (files_in_part.empty() ? "" : ", "), it->name(), file_info); + + } if (!files_in_part.empty()) e->addMessage("Part contains files: {}", files_in_part); if (isEmpty()) @@ -2141,7 +2150,27 @@ void IMergeTreeDataPart::checkConsistencyBase() const } } - checksums.checkSizes(getDataPartStorage()); + const auto & data_part_storage = getDataPartStorage(); + for (const auto & [filename, checksum] : checksums.files) + { + try + { + checksum.checkSize(data_part_storage, filename); + } + catch (const Exception & ex) + { + /// For projection parts check will mark them broken in loadProjections + if (!parent_part && filename.ends_with(".proj")) + { + std::string projection_name = fs::path(filename).stem(); + LOG_INFO(storage.log, "Projection {} doesn't exist on start for part {}, marking it as broken", projection_name, name); + if (hasProjection(projection_name)) + markProjectionPartAsBroken(projection_name, ex.message(), ex.code()); + } + else + throw; + } + } } else { diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp index b327480fa92..3ef36ce364c 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.cpp @@ -100,12 +100,6 @@ void MergeTreeDataPartChecksums::checkEqual(const MergeTreeDataPartChecksums & r } } -void MergeTreeDataPartChecksums::checkSizes(const IDataPartStorage & storage) const -{ - for (const auto & [name, checksum] : files) - checksum.checkSize(storage, name); -} - UInt64 MergeTreeDataPartChecksums::getTotalSizeOnDisk() const { UInt64 res = 0; diff --git a/src/Storages/MergeTree/MergeTreeDataPartChecksum.h b/src/Storages/MergeTree/MergeTreeDataPartChecksum.h index 05178dc3a60..dc52f1ada2b 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartChecksum.h +++ b/src/Storages/MergeTree/MergeTreeDataPartChecksum.h @@ -65,9 +65,6 @@ struct MergeTreeDataPartChecksums static bool isBadChecksumsErrorCode(int code); - /// Checks that the directory contains all the needed files of the correct size. Does not check the checksum. - void checkSizes(const IDataPartStorage & storage) const; - /// Returns false if the checksum is too old. bool read(ReadBuffer & in); /// Assume that header with version (the first line) is read diff --git a/tests/integration/test_broken_projections/test.py b/tests/integration/test_broken_projections/test.py index 162c0dbaa2f..578ff42369c 100644 --- a/tests/integration/test_broken_projections/test.py +++ b/tests/integration/test_broken_projections/test.py @@ -4,6 +4,7 @@ import logging import string import random from helpers.cluster import ClickHouseCluster +from multiprocessing.dummy import Pool cluster = ClickHouseCluster(__file__) @@ -18,6 +19,12 @@ def cluster(): stay_alive=True, with_zookeeper=True, ) + cluster.add_instance( + "node_restart", + main_configs=["config.d/dont_start_broken.xml"], + stay_alive=True, + with_zookeeper=True, + ) logging.info("Starting cluster...") cluster.start() @@ -632,6 +639,49 @@ def test_broken_on_start(cluster): check(node, table_name, 0) +def test_disappeared_projection_on_start(cluster): + node = cluster.instances["node_restart"] + + table_name = "test_disapperead_projection" + create_table(node, table_name, 1) + + node.query(f"SYSTEM STOP MERGES {table_name}") + + insert(node, table_name, 0, 5) + insert(node, table_name, 5, 5) + insert(node, table_name, 10, 5) + insert(node, table_name, 15, 5) + + assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts( + node, table_name + ) + + def drop_projection(): + node.query( + f"ALTER TABLE {table_name} DROP PROJECTION proj2", + settings={"mutations_sync": "0"}, + ) + + p = Pool(2) + p.apply_async(drop_projection) + + for i in range(30): + create_query = node.query(f"SHOW CREATE TABLE {table_name}") + if "proj2" not in create_query: + break + time.sleep(0.5) + + assert "proj2" not in create_query + + # Remove 'proj2' for part all_2_2_0 + break_projection(node, table_name, "proj2", "all_2_2_0", "part") + + node.restart_clickhouse() + + # proj2 is not broken, it doesn't exist, but ok + check(node, table_name, 0, expect_broken_part="proj2", do_check_command=0) + + def test_mutation_with_broken_projection(cluster): node = cluster.instances["node"]