From 77589b0c6e73027311e3177e64a6a670e4ea8791 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Thu, 22 May 2014 14:37:17 +0400 Subject: [PATCH] Merge --- .../DB/Storages/MergeTree/MergeTreeData.h | 3 +- .../Storages/StorageReplicatedMergeTree.cpp | 94 +++++++++++++++---- 2 files changed, 77 insertions(+), 20 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h index 1096abc3a80..d47d8bcb54a 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeData.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeData.h @@ -427,7 +427,8 @@ public: /// Кладет в DataPart данные из имени кусочка. void parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches = nullptr); - std::string getTableName() const { throw Exception("Logical error: calling method getTableName of not a table.", ErrorCodes::LOGICAL_ERROR); } + std::string getTableName() const { throw Exception("Logical error: calling method getTableName of not a table.", + ErrorCodes::LOGICAL_ERROR); } const NamesAndTypesList & getColumnsList() const { return *columns; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 5c7ef76100f..f19cb113dbb 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -254,11 +254,15 @@ bool StorageReplicatedMergeTree::isTableEmpty() void StorageReplicatedMergeTree::checkParts() { Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts"); + + /// Куски в ZK. NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end()); MergeTreeData::DataParts parts = data.getAllDataParts(); + /// Локальные куски, которых нет в ZK. MergeTreeData::DataParts unexpected_parts; + for (const auto & part : parts) { if (expected_parts.count(part->name)) @@ -271,31 +275,61 @@ void StorageReplicatedMergeTree::checkParts() } } - /// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим. + /// Какие локальные куски добавить в ZK. MergeTreeData::DataPartsVector parts_to_add; + + /// Какие куски нужно забрать с других реплик. + Strings parts_to_fetch; + for (const String & missing_name : expected_parts) { + /// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим. auto containing = data.getContainingPart(missing_name); - if (!containing) - throw Exception("Not found " + toString(expected_parts.size()) - + " parts (including " + missing_name + ") in table " + getTableName(), - ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART); - LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists"); - if (unexpected_parts.count(containing)) + if (containing) { - parts_to_add.push_back(containing); - unexpected_parts.erase(containing); + LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists"); + if (unexpected_parts.count(containing)) + { + parts_to_add.push_back(containing); + unexpected_parts.erase(containing); + } + } + else + { + parts_to_fetch.push_back(missing_name); } } - if (parts_to_add.size() > 2 || - unexpected_parts.size() > 2 || - expected_parts.size() > 20) + for (const String & name : parts_to_fetch) + expected_parts.erase(name); + + bool skip_sanity_check = false; + if (zookeeper->exists(replica_path + "/flags/force_restore_data")) { - throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper." - " There are " + toString(unexpected_parts.size()) + " unexpected parts, " - + toString(parts_to_add.size()) + " unexpectedly merged parts, " - + toString(expected_parts.size()) + " unexpectedly obsolete parts", + skip_sanity_check = true; + zookeeper->remove(replica_path + "/flags/force_restore_data"); + } + + String sanity_report = + "There are " + toString(unexpected_parts.size()) + " unexpected parts, " + + toString(parts_to_add.size()) + " unexpectedly merged parts, " + + toString(expected_parts.size()) + " unexpectedly obsolete parts, " + + toString(parts_to_fetch.size()) + " missing parts"; + bool insane = + parts_to_add.size() > 2 || + unexpected_parts.size() > 2 || + expected_parts.size() > 20 || + parts_to_fetch.size() > 2; + + if (skip_sanity_check) + { + LOG_WARNING(log, "Skipping the limits on severity of changes to data parts (flag " + << replica_path << "/flags/force_restore_data). " << sanity_report); + } + else if (insane) + { + throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. " + + sanity_report, ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS); } @@ -303,6 +337,7 @@ void StorageReplicatedMergeTree::checkParts() for (MergeTreeData::DataPartPtr part : parts_to_add) { LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name); + zkutil::Ops ops; checkPartAndAddToZooKeeper(part, ops); zookeeper->multi(ops); @@ -311,16 +346,37 @@ void StorageReplicatedMergeTree::checkParts() /// Удалим из ZK информацию о кусках, покрытых только что добавленными. for (const String & name : expected_parts) { + LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name); + zkutil::Ops ops; ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1)); ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1)); zookeeper->multi(ops); } + /// Добавим в очередь задание забрать недостающие куски с других реплик и уберем из ZK информацию, что они у нас есть. + for (const String & name : parts_to_fetch) + { + LOG_ERROR(log, "Removing missing part from ZooKeeper and queueing a fetch: " << name); + + LogEntry log_entry; + log_entry.type = LogEntry::GET_PART; + log_entry.source_replica = replica_name; + log_entry.new_part_name = name; + + /// Полагаемся, что это происходит до загрузки очереди (loadQueue). + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1)); + ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1)); + ops.push_back(new zkutil::Op::Create( + replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential)); + zookeeper->multi(ops); + } + /// Удалим лишние локальные куски. for (MergeTreeData::DataPartPtr part : unexpected_parts) { - LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name); + LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name); data.renameAndDetachPart(part, "ignored_"); } } @@ -588,7 +644,7 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name) - LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug."); + LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist."); bool do_fetch = false; @@ -863,7 +919,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread() continue; if (part->name != name) { - LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in" << part->name); + LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in " << part->name); continue; } if (part->size * data.index_granularity > 25 * 1024 * 1024)