From 75900f494f088208632892a55c3a5449b7eea919 Mon Sep 17 00:00:00 2001 From: VadimPE Date: Thu, 9 Aug 2018 18:06:39 +0300 Subject: [PATCH] CLICKHOUSE-3847 add "is_lost". load() in Queue can check duplicate records --- .../ReplicatedMergeTreeCleanupThread.cpp | 43 ++++++++++++++++++- .../ReplicatedMergeTreeCleanupThread.h | 5 +++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 19 ++++++-- .../MergeTree/ReplicatedMergeTreeQueue.h | 6 +-- .../ReplicatedMergeTreeRestartingThread.cpp | 6 ++- .../Storages/StorageReplicatedMergeTree.cpp | 9 ++-- 6 files changed, 75 insertions(+), 13 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 0e22add1e06..880d58b054e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -93,6 +93,9 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() : 0]; String min_pointer_inactive_replica_str; + std::unordered_map log_pointers_lost_replicas; + std::unordered_map log_pointers_version; + for (const String & replica : replicas) { zkutil::Stat log_pointer_stat; @@ -110,6 +113,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() min_pointer_active_replica = std::min(min_pointer_active_replica, log_pointer); else { + log_pointers_lost_replicas[replica] = log_pointer_str; + log_pointers_version[replica] = log_pointer_stat.version; if (log_pointer_str >= min_saved_record_log_str) { @@ -132,11 +137,21 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() /// We will not touch records that are no less than `min_pointer_active_replica`. entries.erase(std::lower_bound(entries.begin(), entries.end(), min_pointer_replica_str), entries.end()); - /// We must check if we are only active_node - if (entries.empty()) return; + /// We must mark lost replicas. + try + { + markLostReplicas(log_pointers_lost_replicas, log_pointers_version, entries[0], zookeeper); + } + catch (const zkutil::KeeperException & e) + { + if (e.code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) + throw e; + else return; + } + zkutil::Requests ops; for (size_t i = 0; i < entries.size(); ++i) { @@ -155,6 +170,30 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() } +void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map & log_pointers_lost_replicas, + const std::unordered_map & log_pointers_version, + const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper) +{ + std::vector futures; + + for (auto pair : log_pointers_lost_replicas) + { + String replica = pair.first; + if (pair.second <= remove_border) + { + zkutil::Requests ops; + /// If log pointer changes version we can not mark replicas, so we check it. + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer", log_pointers_version.at(replica))); + ops.emplace_back(zkutil::makeCreateRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "", zkutil::CreateMode::Persistent)); + futures.push_back(zookeeper->asyncMulti(ops)); + } + } + + for (auto & future : futures) + future.get(); +} + + struct ReplicatedMergeTreeCleanupThread::NodeWithStat { String node; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 7d45a158c4c..1baf50cd6ec 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -39,6 +39,11 @@ private: /// Remove old records from ZooKeeper. void clearOldLogs(); + /// Mark lost replicas. + void markLostReplicas(const std::unordered_map & log_pointers_lost_replicas, + const std::unordered_map & log_pointers_version, + const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper); + /// Remove old block hashes from ZooKeeper. This is done by the leader replica. void clearOldBlocks(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 91663dd7457..f8d804f90f7 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -47,13 +47,25 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) std::optional min_unprocessed_insert_time_changed; { - std::lock_guard lock(state_mutex); + std::lock_guard pull_logs_lock(pull_logs_to_queue_mutex); String log_pointer_str = zookeeper->get(replica_path + "/log_pointer"); log_pointer = log_pointer_str.empty() ? 0 : parse(log_pointer_str); Strings children = zookeeper->getChildren(queue_path); - LOG_DEBUG(log, "Having " << children.size() << " queue entries to load."); + + std::unordered_set already_loaded_paths; + auto to_remove_it = std::remove_if( + children.begin(), children.end(), [&](const String & path) + { + return already_loaded_paths.count(path); + }); + + LOG_DEBUG(log, + "Having " << (to_remove_it - children.begin()) << " queue entries to load, " + << (children.end() - to_remove_it) << " entries already loaded."); + children.erase(to_remove_it, children.end()); + std::sort(children.begin(), children.end()); zkutil::AsyncResponses futures; @@ -68,6 +80,8 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) LogEntryPtr entry = LogEntry::parse(res.data, res.stat); entry->znode_name = future.first; + std::lock_guard lock(state_mutex); + insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); updated = true; @@ -93,7 +107,6 @@ void ReplicatedMergeTreeQueue::initialize( log = &Logger::get(logger_name); addVirtualParts(parts); - load(zookeeper); } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 2e642ad148c..6ffa0709404 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -147,9 +147,6 @@ private: /// Put a set of (already existing) parts in virtual_parts. void addVirtualParts(const MergeTreeData::DataParts & parts); - /// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). - bool load(zkutil::ZooKeeperPtr zookeeper); - void insertUnlocked( const LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard & state_lock); @@ -233,6 +230,9 @@ public: */ bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name); + /// Load (initialize) a queue from ZooKeeper (/replicas/me/queue/). + bool load(zkutil::ZooKeeperPtr zookeeper); + bool removeFromVirtualParts(const MergeTreePartInfo & part_info); /** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 5328e68d40f..a1bc0892ff4 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -200,11 +200,15 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.cloneReplicaIfNeeded(); + const auto & zookeeper = storage.getZooKeeper(); + + storage.queue.load(zookeeper); + /// pullLogsToQueue() after we mark replica 'is_active' and clone(); /// because cleanup_thread don't del our log_pointer. storage.queue.pullLogsToQueue(storage.getZooKeeper()); storage.last_queue_update_finish_time.store(time(nullptr)); - + storage.cloneReplicaIfNeeded(); updateQuorumIfWeHavePart(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 96913af19be..ea74189329f 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1977,11 +1977,10 @@ bool StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku Strings entries = zookeeper->getChildren(zookeeper_path + "/log"); - if (!entries.empty()) + if (entries.empty()) return false; std::sort(entries.begin(), entries.end()); - if ("log-" + padIndex(parse(raw_log_pointer)) < entries[0]) return false; @@ -2053,12 +2052,14 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded() { String source_replica_path = zookeeper_path + "/replicas/" + replica_name; String source_log_pointer_raw = zookeeper->get(source_replica_path + "/log_pointer"); - if ((source_replica_path != replica_path) && (!source_log_pointer_raw.empty()) && ("log-" + padIndex(parse(source_log_pointer_raw)) >= entries[0])) + if ((source_replica_path != replica_path) && (!zookeeper->exists(source_replica_path + "/is_lost"))) source_replica = replica_name; } } - } while (cloneReplica(source_replica, zookeeper)); + } while (!cloneReplica(source_replica, zookeeper)); + + queueUpdatingTask(); }