Update versions on merge and mutation

This commit is contained in:
Alexey Milovidov 2020-06-12 23:22:55 +03:00
parent 902774c89c
commit 52ac009754
2 changed files with 12 additions and 5 deletions

View File

@ -82,7 +82,6 @@ public:
const AllowedMergingPredicate & can_merge, const AllowedMergingPredicate & can_merge,
String * out_disable_reason = nullptr); String * out_disable_reason = nullptr);
/** Select all the parts in the specified partition for merge, if possible. /** Select all the parts in the specified partition for merge, if possible.
* final - choose to merge even a single part - that is, allow to merge one part "with itself". * final - choose to merge even a single part - that is, allow to merge one part "with itself".
*/ */

View File

@ -2569,8 +2569,8 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
zookeeper_path + "/log/log-", entry.toString(), zookeeper_path + "/log/log-", entry.toString(),
zkutil::CreateMode::PersistentSequential)); zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest( ops.emplace_back(zkutil::makeSetRequest(
zookeeper_path + "/log", log_version)); zookeeper_path + "/log", "", log_version)); /// Check and update version.
Coordination::Error code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
@ -2578,9 +2578,12 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
{ {
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created; String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created log entry {} for merge {}", path_created, merged_name);
} }
else if (code == Coordination::Error::ZBADVERSION) else if (code == Coordination::Error::ZBADVERSION)
{ {
LOG_TRACE(log, "Log entry is not created for merge {} because log was updated", merged_name);
return CreateMergeEntryResult::LogUpdated; return CreateMergeEntryResult::LogUpdated;
} }
else else
@ -2633,15 +2636,20 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
zookeeper_path + "/log/log-", entry.toString(), zookeeper_path + "/log/log-", entry.toString(),
zkutil::CreateMode::PersistentSequential)); zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest( ops.emplace_back(zkutil::makeSetRequest(
zookeeper_path + "/log", log_version)); zookeeper_path + "/log", "", log_version)); /// Check and update version.
Coordination::Error code = zookeeper->tryMulti(ops, responses); Coordination::Error code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZBADVERSION) if (code == Coordination::Error::ZBADVERSION)
{
LOG_TRACE(log, "Log entry is not created for mutation {} because log was updated", new_part_name);
return CreateMergeEntryResult::LogUpdated; return CreateMergeEntryResult::LogUpdated;
}
zkutil::KeeperMultiException::check(code, ops, responses); zkutil::KeeperMultiException::check(code, ops, responses);
LOG_TRACE(log, "Created log entry for mutation {}", new_part_name);
return CreateMergeEntryResult::Ok; return CreateMergeEntryResult::Ok;
} }