diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index b29cfef869b..473ea3fefb8 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -246,9 +246,10 @@ private: /// Поток, выбирающий куски для слияния. std::thread merge_selecting_thread; + /// Поток, удаляющий информацию о старых блоках из ZooKeeper. + std::thread clear_old_blocks_thread; - /// Когда последний раз выбрасывали старые данные из ZooKeeper. - time_t clear_old_blocks_time = 0; + /// Когда последний раз выбрасывали старые логи из ZooKeeper. time_t clear_old_logs_time = 0; Logger * log; @@ -347,6 +348,10 @@ private: */ void mergeSelectingThread(); + /** В бесконечном цикле вызывает clearOldBlocks. + */ + void clearOldBlocksThread(); + /// Вызывается во время выбора кусков для слияния. bool canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 9a95a8626d7..ffac61cb3a3 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -369,10 +369,15 @@ void StorageReplicatedMergeTree::clearOldBlocks() if (!zookeeper.exists(zookeeper_path + "/blocks", &stat)) throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); + int children_count = stat.getnumChildren(); + /// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно. 
- if (static_cast<double>(stat.getnumChildren()) < data.settings.replicated_deduplication_window * 1.1) + if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1) return; + LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window + << " old blocks from ZooKeeper"); + Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks"); std::vector<std::pair<Int64, String> > timed_blocks; @@ -384,10 +389,14 @@ void StorageReplicatedMergeTree::clearOldBlocks() timed_blocks.push_back(std::make_pair(stat.getczxid(), block)); } - std::sort(timed_blocks.begin(), timed_blocks.end()); + std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>()); for (size_t i = data.settings.replicated_deduplication_window; i < timed_blocks.size(); ++i) { - zookeeper.remove(zookeeper_path + "/blocks/" + timed_blocks[i].second); + zkutil::Ops ops; + ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1)); + ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1)); + ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1)); + zookeeper.multi(ops); } LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper"); @@ -882,21 +891,38 @@ void StorageReplicatedMergeTree::mergeSelectingThread() tryLogCurrentException(__PRETTY_FUNCTION__); } - if (shutdown_called) + if (shutdown_called || !is_leader_node) break; - /// Каждую минуту выбрасываем старые блоки. - if (time(0) - clear_old_blocks_time > 60) - { - clear_old_blocks_time = time(0); - clearOldBlocks(); - } - if (!success) std::this_thread::sleep_for(MERGE_SELECTING_SLEEP); } } +void StorageReplicatedMergeTree::clearOldBlocksThread() +{ + while (!shutdown_called && is_leader_node) + { + try + { + clearOldBlocks(); + } + catch (...) 
+ { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + /// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду. + /// TODO: Лучше во всех подобных местах использовать condition variable. + for (size_t i = 0; i < 60; ++i) + { + if (shutdown_called || !is_leader_node) + break; + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } +} + bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) { if (currently_merging.count(left->name) || currently_merging.count(right->name)) @@ -925,6 +951,7 @@ void StorageReplicatedMergeTree::becomeLeader() LOG_INFO(log, "Became leader"); is_leader_node = true; merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this); + clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this); } String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active) @@ -992,7 +1019,10 @@ void StorageReplicatedMergeTree::shutdown() LOG_TRACE(log, "Waiting for threads to finish"); if (is_leader_node) + { merge_selecting_thread.join(); + clear_old_blocks_thread.join(); + } queue_updating_thread.join(); for (auto & thread : queue_threads) thread.join();