From 219e3ca3c637fc3e2eaf7e0a9eba1b2d976118a4 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 25 Jul 2014 20:09:58 +0400 Subject: [PATCH] Merge --- .../ReplicatedMergeTreeBlockOutputStream.h | 1 + .../Storages/StorageReplicatedMergeTree.cpp | 27 +++++++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h index cc3d2f975f9..4c3b1044e80 100644 --- a/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h +++ b/dbms/include/DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h @@ -129,6 +129,7 @@ public: e.code == ZCONNECTIONLOSS) { transaction.commit(); + storage.enqueuePartForCheck(part->name); } throw; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index f29d09a9309..d0e5657c526 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -649,14 +649,12 @@ void StorageReplicatedMergeTree::clearOldBlocks() ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1)); ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1)); - if (ops.size() > 400) + if (ops.size() > 400 || i + 1 == timed_blocks.size()) { zookeeper->multi(ops); ops.clear(); } } - if (!ops.empty()) - zookeeper->multi(ops); LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); } @@ -1384,11 +1382,7 @@ void StorageReplicatedMergeTree::partCheckThread() * Для кусков, полученных в результате слияния такая проверка была бы некорректной, * потому что слитого куска может еще ни у кого не быть. */ - if (part_info.left != part_info.right) - { - LOG_WARNING(log, "Not checking if part " << part_name << " is lost because it is a result of a merge."); - } - else + if (part_info.left == part_info.right) { LOG_WARNING(log, "Checking if anyone has part covering " << part_name << "."); @@ -1412,13 +1406,7 @@ void StorageReplicatedMergeTree::partCheckThread() if (!found) { - /** Такая ситуация возможна при нормальной работе, без потери данных, например, так: - * ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK, - * получил operation timeout, удалил локальный кусок и бросил исключение, - * а на самом деле, кусок добавился в ZK. - */ - LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. " - << "There might or might not be a data loss."); + LOG_ERROR(log, "No replica has part covering " << part_name); ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed); /** Если ни у кого нет такого куска, удалим его из нашей очереди. @@ -1429,6 +1417,8 @@ void StorageReplicatedMergeTree::partCheckThread() * TODO: можно это исправить, сделав две директории block_numbers: для автоинкрементных и ручных нод. */ + bool was_in_queue = false; + { Poco::ScopedLock lock(queue_mutex); @@ -1442,6 +1432,7 @@ void StorageReplicatedMergeTree::partCheckThread() { zookeeper->remove(replica_path + "/queue/" + it->znode_name); queue.erase(it++); + was_in_queue = true; } else { @@ -1449,6 +1440,12 @@ void StorageReplicatedMergeTree::partCheckThread() } } } + + if (was_in_queue) + /** Такая ситуация возможна, если на всех репликах, где был кусок, он испортился. + * Например, у реплики, которая только что его записала, отключили питание, и данные не записались из кеша на диск. + */ + LOG_ERROR(log, "Part " << part_name << " is lost forever. Say goodbye to a piece of data!"); } } }