From d943e8dc63f7d25e13d41d1a2ac86e2279bca58c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 8 May 2018 17:13:43 +0300 Subject: [PATCH] wip --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 11 +++++++---- dbms/src/Storages/StorageReplicatedMergeTree.cpp | 16 +++++++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 3954da00d56..ab3b9983931 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -862,14 +862,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const MergeTreePartInfo & part_info, std::lock_guard &) const { + if (part_info.version) + return part_info.version; + auto in_partition = mutations_by_partition.find(part_info.partition_id); if (in_partition == mutations_by_partition.end()) - return -1; + return 0; - Int64 data_version = part_info.version ? part_info.version : part_info.min_block; - auto it = in_partition->second.upper_bound(data_version); + auto it = in_partition->second.upper_bound(part_info.min_block); if (it == in_partition->second.begin()) - return -1; /// 0 can be a valid mutation block number. + return 0; + --it; return it->first; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index a2b04d17651..e33dcb68d51 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1988,10 +1988,11 @@ void StorageReplicatedMergeTree::mergeSelectingThread() /// Will be updated below. std::chrono::steady_clock::time_point now; + String reason; auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *) { - return queue.canMergeParts(*left, *right) - && cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); + cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right); + return queue.canMergeParts(left, right, &reason); }; while (is_leader) @@ -2028,7 +2029,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread() now = std::chrono::steady_clock::now(); - MergeTreeDataMerger::FuturePart future_merged_part; if (max_future_part_size > 0) { MergeTreeDataMerger::FuturePart future_merged_part; @@ -2039,6 +2039,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread() } else { + if (!reason.empty()) + LOG_TRACE(log, "Couldn't select merge because: " << reason); + /// Choose a part to mutate. /// TODO finish early if there are no mutations. @@ -2435,6 +2438,9 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name) bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum) { + // if (std::hash()(part_name) % 4 == 0) + // throw Exception("OLOLO"); + if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting})) { LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); @@ -2703,8 +2709,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason) { - return queue.canMergeParts(*left, *right, out_reason) - && canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason); + return queue.canMergeParts(left, right, out_reason); + // && canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason); }; ReplicatedMergeTreeLogEntryData merge_entry;