This commit is contained in:
Alexey Zatelepin 2018-05-08 17:13:43 +03:00
parent 7c9cd787cd
commit d943e8dc63
2 changed files with 18 additions and 9 deletions

View File

@ -862,14 +862,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
Int64 ReplicatedMergeTreeQueue::getCurrentMutationVersion(const MergeTreePartInfo & part_info, std::lock_guard<std::mutex> &) const
{
if (part_info.version)
return part_info.version;
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
return -1;
return 0;
Int64 data_version = part_info.version ? part_info.version : part_info.min_block;
auto it = in_partition->second.upper_bound(data_version);
auto it = in_partition->second.upper_bound(part_info.min_block);
if (it == in_partition->second.begin())
return -1; /// 0 can be a valid mutation block number.
return 0;
--it;
return it->first;
}

View File

@ -1988,10 +1988,11 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Will be updated below.
std::chrono::steady_clock::time_point now;
String reason;
auto can_merge = [&] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String *)
{
return queue.canMergeParts(*left, *right)
&& cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
return queue.canMergeParts(left, right, &reason);
};
while (is_leader)
@ -2028,7 +2029,6 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
now = std::chrono::steady_clock::now();
MergeTreeDataMerger::FuturePart future_merged_part;
if (max_future_part_size > 0)
{
MergeTreeDataMerger::FuturePart future_merged_part;
@ -2039,6 +2039,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
}
else
{
if (!reason.empty())
LOG_TRACE(log, "Couldn't select merge because: " << reason);
/// Choose a part to mutate.
/// TODO finish early if there are no mutations.
@ -2435,6 +2438,9 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{
// if (std::hash<std::string>()(part_name) % 4 == 0)
// throw Exception("OLOLO");
if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
{
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
@ -2703,8 +2709,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
auto can_merge = [this] (const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right, String * out_reason)
{
return queue.canMergeParts(*left, *right, out_reason)
&& canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason);
return queue.canMergeParts(left, right, out_reason);
// && canMergePartsAccordingToZooKeeperInfo(left, right, getZooKeeper(), zookeeper_path, data, out_reason);
};
ReplicatedMergeTreeLogEntryData merge_entry;