#include #include #include #include namespace DB { namespace ErrorCodes { extern const int NOT_FOUND_NODE; } ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_) : storage(storage_), log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")), thread([this] { run(); }) {} void ReplicatedMergeTreeCleanupThread::run() { setThreadName("ReplMTCleanup"); const auto CLEANUP_SLEEP_MS = storage.data.settings.cleanup_delay_period * 1000; while (!storage.shutdown_called) { try { iterate(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } storage.shutdown_event.tryWait(CLEANUP_SLEEP_MS); } LOG_DEBUG(log, "Cleanup thread finished"); } void ReplicatedMergeTreeCleanupThread::iterate() { storage.clearOldPartsAndRemoveFromZK(log); storage.data.clearOldTemporaryDirectories(); if (storage.is_leader_node) { clearOldLogs(); clearOldBlocks(); } } void ReplicatedMergeTreeCleanupThread::clearOldLogs() { auto zookeeper = storage.getZooKeeper(); zkutil::Stat stat; if (!zookeeper->exists(storage.zookeeper_path + "/log", &stat)) throw Exception(storage.zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE); int children_count = stat.numChildren; /// We will wait for 1.1 times more records to accumulate than necessary. if (static_cast(children_count) < storage.data.settings.replicated_logs_to_keep * 1.1) return; Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); UInt64 min_pointer = std::numeric_limits::max(); for (const String & replica : replicas) { String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); if (pointer.empty()) return; min_pointer = std::min(min_pointer, parse(pointer)); } Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log"); std::sort(entries.begin(), entries.end()); /// We will not touch the last `replicated_logs_to_keep` records. entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end()); /// We will not touch records that are no less than `min_pointer`. entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end()); if (entries.empty()) return; zkutil::Ops ops; for (size_t i = 0; i < entries.size(); ++i) { ops.emplace_back(std::make_unique(storage.zookeeper_path + "/log/" + entries[i], -1)); if (ops.size() > 400 || i + 1 == entries.size()) { /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list. ops.emplace_back(std::make_unique(storage.zookeeper_path + "/replicas", stat.version)); zookeeper->multi(ops); ops.clear(); } } LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back()); } void ReplicatedMergeTreeCleanupThread::clearOldBlocks() { auto zookeeper = storage.getZooKeeper(); zkutil::Stat stat; if (!zookeeper->exists(storage.zookeeper_path + "/blocks", &stat)) throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE); LOG_TRACE(log, "Checking " << stat.numChildren << " blocks to clear old ones from ZooKeeper. This might take several minutes."); Strings blocks = zookeeper->getChildren(storage.zookeeper_path + "/blocks"); using TimedBlock = std::pair; using TimedBlocksComparator = std::greater; std::vector timed_blocks; for (const String & block : blocks) { zkutil::Stat stat; zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &stat); timed_blocks.emplace_back(stat.ctime, block); } if (timed_blocks.empty()) return; std::sort(timed_blocks.begin(), timed_blocks.end(), TimedBlocksComparator()); /// Use ZooKeeper's first node (last according to time) timestamp as "current" time. Int64 current_time = timed_blocks.front().first; Int64 time_treshold = std::max(0L, current_time - static_cast(storage.data.settings.replicated_deduplication_window_seconds)); TimedBlock block_treshold(time_treshold, ""); auto first_outdated_block_fixed_treshold = timed_blocks.begin() + storage.data.settings.replicated_deduplication_window; auto first_outdated_block_time_treshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_treshold, TimedBlocksComparator()); auto first_outdated_block = std::min(first_outdated_block_fixed_treshold, first_outdated_block_time_treshold); for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { /// TODO After about half a year, we could replace this to multi op, because there will be no obsolete children nodes. zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + it->second); } LOG_TRACE(log, "Cleared " << timed_blocks.end() - first_outdated_block << " old blocks from ZooKeeper"); } }