From 9f4377b771cc2f205f06027593481a5a2dab66ec Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 4 May 2018 23:17:10 +0300 Subject: [PATCH] load active abandonable locks during pullLogsToQueue (TODO: load quorum parts) --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 137 +++++++++++++++++- .../MergeTree/ReplicatedMergeTreeQueue.h | 6 + .../Storages/StorageReplicatedMergeTree.cpp | 18 --- 3 files changed, 140 insertions(+), 21 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 32b0d8cc10b..3894f7e3ef6 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -94,7 +94,7 @@ void ReplicatedMergeTreeQueue::initialize( void ReplicatedMergeTreeQueue::insertUnlocked(LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard &) { - virtual_parts.add(entry->new_part_name); + next_virtual_parts.add(entry->new_part_name); /// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted if (entry->type != LogEntry::DROP_RANGE) @@ -122,6 +122,7 @@ void ReplicatedMergeTreeQueue::insert(zkutil::ZooKeeperPtr zookeeper, LogEntryPt { std::lock_guard lock(mutex); insertUnlocked(entry, min_unprocessed_insert_time_changed, lock); + /// TODO: do something with next_virtual_parts (insertUnlocked() now adds the new part there instead of to virtual_parts) } updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); @@ -259,6 +260,59 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri } +std::unordered_map> ReplicatedMergeTreeQueue::loadCurrentInserts(zkutil::ZooKeeperPtr & zookeeper) const +{ + std::unordered_map> result; + + std::unordered_set abandonable_lock_holders; + for (const String & entry : zookeeper->getChildren(zookeeper_path + "/temp")) + { + if (startsWith(entry, "abandonable_lock-")) + 
abandonable_lock_holders.insert(zookeeper_path + "/temp/" + entry); + } + + if (abandonable_lock_holders.empty()) + return result; + + Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers"); + std::vector> lock_futures; + for (const String & partition : partitions) + lock_futures.push_back(zookeeper->asyncGetChildren(zookeeper_path + "/block_numbers/" + partition)); + + struct BlockInfo + { + String partition; + Int64 number; + String zk_path; + std::future contents_future; + }; + + std::vector block_infos; + for (size_t i = 0; i < partitions.size(); ++i) + { + Strings partition_block_numbers = lock_futures[i].get().names; + for (const String & entry : partition_block_numbers) + { + /// TODO: cache block numbers that are already known to be abandoned, + /// so that they do not have to be checked again on the next iteration. + Int64 block_number = parse(entry.substr(strlen("block-"))); + String zk_path = zookeeper_path + "/block_numbers/" + partitions[i] + "/" + entry; + block_infos.push_back( + BlockInfo{partitions[i], block_number, zk_path, zookeeper->asyncTryGet(zk_path)}); + } + } + + for (BlockInfo & block : block_infos) + { + zkutil::GetResponse resp = block.contents_future.get(); + if (!resp.error && abandonable_lock_holders.count(resp.data)) + result[block.partition].insert(block.number); + } + + return result; +} + + bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, zkutil::EventPtr next_update_event) { std::lock_guard lock(pull_logs_to_queue_mutex); @@ -395,6 +449,37 @@ bool ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, z } } + auto new_current_inserts = loadCurrentInserts(zookeeper); + + Strings new_log_entries = zookeeper->getChildren(zookeeper_path + "/log"); + if (!log_entries.empty()) + { + new_log_entries.erase( + std::remove_if(new_log_entries.begin(), new_log_entries.end(), + [&](const String & entry) { return entry <= log_entries.back(); }), + new_log_entries.end()); + } + + std::vector> 
new_log_entry_futures; + for (const String & entry : new_log_entries) + new_log_entry_futures.push_back(zookeeper->asyncTryGet(zookeeper_path + "/log/" + entry)); + + std::vector new_virtual_parts; + for (auto & future : new_log_entry_futures) + { + zkutil::GetResponse res = future.get(); + new_virtual_parts.emplace_back(LogEntry::parse(res.data, res.stat)->new_part_name); + } + + { + std::lock_guard lock(mutex); + + virtual_parts = next_virtual_parts; + current_inserts = new_current_inserts; + for (const String & new_part : new_virtual_parts) + next_virtual_parts.add(new_part); + } + return !log_entries.empty(); } @@ -884,6 +969,17 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const MergeTreeDataPart & left, con if (left.name == right.name) return false; + if (left.info.partition_id != right.info.partition_id) + { + /// If we get here, it is most likely a bug in the merge selector, + /// but we can still return a sensible result: such parts cannot be merged. + if (out_reason) + *out_reason = "Parts " + left.name + " and " + right.name + " belong to different partitions"; + return false; + } + + std::lock_guard lock(mutex); + auto set_reason = [&out_reason] (const String & part_name) { if (out_reason) @@ -891,14 +987,49 @@ bool ReplicatedMergeTreeQueue::canMergeParts(const MergeTreeDataPart & left, con return false; }; - std::lock_guard lock(mutex); - if (virtual_parts.getContainingPart(left.info) != left.info) return set_reason(left.name); if (virtual_parts.getContainingPart(right.info) != right.info) return set_reason(right.name); + Int64 left_max_block = left.info.max_block; + Int64 right_min_block = right.info.min_block; + + if (left_max_block > right_min_block) + std::swap(left_max_block, right_min_block); + + if (left_max_block + 1 < right_min_block) + { + auto current_inserts_in_partition = current_inserts.find(left.info.partition_id); + + if (current_inserts_in_partition != current_inserts.end()) + { + const std::set & ephemeral_block_numbers = 
current_inserts_in_partition->second; + + auto left_eph_it = ephemeral_block_numbers.upper_bound(left_max_block); + if (left_eph_it != ephemeral_block_numbers.end() && *left_eph_it < right_min_block) + { + if (out_reason) + *out_reason = "Block number " + toString(*left_eph_it) + " is still being inserted between parts " + + left.name + " and " + right.name; + + return false; + } + } + + MergeTreePartInfo gap_part_info( + left.info.partition_id, left_max_block + 1, right_min_block - 1, 999999); + + Strings covered = next_virtual_parts.getPartsCoveredBy(gap_part_info); + if (!covered.empty()) + { + if (out_reason) + *out_reason = "There are " + toString(covered.size()) + " parts that are still not ready between " + left.name + " and " + right.name; + return false; + } + } + Int64 left_mutation = getCurrentMutationVersion(left.info, lock); Int64 right_mutation = getCurrentMutationVersion(right.info, lock); if (left_mutation != right_mutation) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 70365282482..f72a9a1f9b8 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -79,6 +79,8 @@ private: * This set is protected by its mutex. */ ActiveDataPartSet virtual_parts; + std::unordered_map> current_inserts; + ActiveDataPartSet next_virtual_parts; std::list mutations; std::unordered_map> mutations_by_partition; @@ -128,6 +130,9 @@ private: /// Returns list of currently executing entries blocking execution of specified CLEAR_COLUMN command Queue getConflictsForClearColumnCommand(const LogEntry & entry, String * out_conflicts_description, std::lock_guard &) const; + /// Returns the map: partition ID -> block numbers of inserts that are currently committing, i.e. block nodes under /block_numbers/PARTITION whose data is the path of an existing /temp/abandonable_lock-* node (paths relative to zookeeper_path). + std::unordered_map> loadCurrentInserts(zkutil::ZooKeeperPtr & zookeeper) const; + /// Marks the element of the queue as running. 
class CurrentlyExecuting { @@ -151,6 +156,7 @@ public: ReplicatedMergeTreeQueue(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) , virtual_parts(format_version) + , next_virtual_parts(format_version) { } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 08b0c45d5dd..a2b04d17651 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1879,24 +1879,6 @@ namespace } } - /// You can merge the parts, if all the numbers between them are abandoned - do not correspond to any blocks. - /// TODO: don't forbid merging across mutations. - const String & partition_id = left->info.partition_id; - for (Int64 number = left->info.max_block + 1; number <= right->info.min_block - 1; ++number) - { - String path1 = zookeeper_path + "/block_numbers/" + partition_id + "/block-" + padIndex(number); - String path2 = zookeeper_path + "/nonincrement_block_numbers/" + partition_id + "/block-" + padIndex(number); - - if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED && - AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED) - { - if (out_reason) - *out_reason = "Block " + toString(number) + " in gap between merging parts " + left->name + " and " - + right->name + " is not abandoned"; - return false; - } - } - return true; }