diff --git a/dbms/src/Storages/MergeTree/MergeTreePartInfo.h b/dbms/src/Storages/MergeTree/MergeTreePartInfo.h index 4bc660c84f1..a5be3010f95 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartInfo.h +++ b/dbms/src/Storages/MergeTree/MergeTreePartInfo.h @@ -40,6 +40,10 @@ struct MergeTreePartInfo String getPartName() const; String getPartNameV0(DayNum_t left_date, DayNum_t right_date) const; + UInt64 getBlocksCount() const + { + return static_cast(max_block - min_block + 1); + } static MergeTreePartInfo fromPartName(const String & part_name, MergeTreeDataFormatVersion format_version); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 14cc102f609..42023e82e4e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -741,6 +741,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) /// Which local parts to added into ZK. MergeTreeData::DataPartsVector parts_to_add; + UInt64 parts_to_add_rows = 0; /// Which parts should be taken from other replicas. Strings parts_to_fetch; @@ -756,6 +757,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) { parts_to_add.push_back(containing); unexpected_parts.erase(containing); + parts_to_add_rows += containing->rows_count; } } else @@ -768,21 +770,53 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) for (const String & name : parts_to_fetch) expected_parts.erase(name); + /** To check the adequacy, for the parts that are in the FS, but not in ZK, we will only consider not the most recent parts. * Because unexpected new parts usually arise only because they did not have time to enroll in ZK with a rough restart of the server. * It also occurs from deduplicated parts that did not have time to retire. */ size_t unexpected_parts_nonnew = 0; + UInt64 unexpected_parts_nonnew_rows = 0; + UInt64 unexpected_parts_rows = 0; for (const auto & part : unexpected_parts) + { if (part->info.level > 0) + { ++unexpected_parts_nonnew; + unexpected_parts_nonnew_rows += part->rows_count; + } - String sanity_report = "There are " - + toString(unexpected_parts.size()) + " unexpected parts (" - + toString(unexpected_parts_nonnew) + " of them is not just-written), " - + toString(parts_to_add.size()) + " unexpectedly merged parts, " - + toString(expected_parts.size()) + " missing obsolete parts, " - + toString(parts_to_fetch.size()) + " missing parts"; + unexpected_parts_rows += part->rows_count; + } + + + /// Additional helpful statistics + auto get_blocks_count_in_data_part = [&] (const String & part_name) -> UInt64 + { + MergeTreePartInfo part_info; + if (MergeTreePartInfo::tryParsePartName(part_name, &part_info, data.format_version)) + return part_info.getBlocksCount(); + + LOG_ERROR(log, "Unexpected part name: " << part_name); + return 0; + }; + + UInt64 parts_to_fetch_blocks = 0; + for (const String & name : parts_to_fetch) + parts_to_fetch_blocks += get_blocks_count_in_data_part(name); + + UInt64 expected_parts_blocks = 0; + for (const String & name : expected_parts) + expected_parts_blocks += get_blocks_count_in_data_part(name); + + + std::stringstream sanity_report; + sanity_report << "There are " + << unexpected_parts.size() << " unexpected parts with " << unexpected_parts_rows << " rows (" + << unexpected_parts_nonnew << " of them is not just-written with " << unexpected_parts_rows << " rows), " + << parts_to_add.size() << " unexpectedly merged parts with " << parts_to_add_rows << " rows, " + << expected_parts.size() << " missing obsolete parts (with " << expected_parts_blocks << " blocks), " + << parts_to_fetch.size() << " missing parts (with " << parts_to_fetch_blocks << " blocks)."; /** We can automatically synchronize data, * if the ratio of the total number of errors to the total number of parts (minimum - on the local filesystem or in ZK) @@ -793,17 +827,28 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks) * In this case, the protection mechanism does not allow the server to start. */ - size_t min_parts_local_or_expected = std::min(expected_parts_vec.size(), parts.size()); - size_t total_difference = parts_to_add.size() + unexpected_parts_nonnew + parts_to_fetch.size(); + UInt64 total_rows_on_filesystem = 0; + for (const auto & part : parts) + total_rows_on_filesystem += part->rows_count; - bool insane = total_difference > min_parts_local_or_expected * data.settings.replicated_max_ratio_of_wrong_parts; + UInt64 total_suspicious_rows = parts_to_add_rows + unexpected_parts_rows; + UInt64 total_suspicious_rows_no_new = parts_to_add_rows + unexpected_parts_nonnew_rows; + + bool insane = total_suspicious_rows > total_rows_on_filesystem * data.settings.replicated_max_ratio_of_wrong_parts; if (insane && !skip_sanity_checks) - throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. " - + sanity_report, ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); + { + std::stringstream why; + why << "The local set of parts of table " << database_name << "." << table_name << " doesn't look like the set of parts " + << "in ZooKeeper: " + << formatReadableQuantity(total_suspicious_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem) + << " total rows in filesystem are suspicious."; - if (total_difference > 0) - LOG_WARNING(log, sanity_report); + throw Exception(why.str() + " " + sanity_report.str(), ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); + } + + if (total_suspicious_rows_no_new > 0) + LOG_WARNING(log, sanity_report.str()); /// Add information to the ZK about the parts that cover the missing parts. for (const MergeTreeData::DataPartPtr & part : parts_to_add) diff --git a/dbms/tests/integration/test_random_inserts/test.sh b/dbms/tests/integration/test_random_inserts/test.sh index 76f1b05e61e..9022f307d56 100755 --- a/dbms/tests/integration/test_random_inserts/test.sh +++ b/dbms/tests/integration/test_random_inserts/test.sh @@ -2,9 +2,9 @@ #set -e [[ -n "$1" ]] && host="$1" || host="localhost" -[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 10 )) -[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 10 )) -[[ -n "$4" ]] && iters_per_timestamp="$4" || iters_per_timestamp=1 +[[ -n "$2" ]] && min_timestamp="$2" || min_timestamp=$(( $(date +%s) - 60 )) +[[ -n "$3" ]] && max_timestamp="$3" || max_timestamp=$(( $(date +%s) + 60 )) +[[ -n "$4" ]] && iters_per_timestamp="$4" || iters_per_timestamp=5 timestamps=`seq $min_timestamp $max_timestamp`