Change the condition for updateQuorum: specialize it and reduce the number of calls to ZooKeeper

This commit is contained in:
Alexandra Latysheva 2020-10-07 11:28:48 +00:00
parent 9077c92862
commit a43cac2c1a
4 changed files with 25 additions and 30 deletions

View File

@ -77,7 +77,6 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
{
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
std::future<Coordination::GetResponse> quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path);
std::future<Coordination::GetResponse> is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active");
std::future<Coordination::GetResponse> host_future = zookeeper->asyncTryGet(storage.replica_path + "/host");
@ -99,9 +98,9 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe
* If the quorum is reached, then the node is deleted.
*/
auto quorum_status = quorum_status_future.get();
if (quorum_status.error != Coordination::Error::ZNONODE && !quorum_parallel)
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data,
String quorum_status;
if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status,
ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE);
/// Both checks are implicitly made also later (otherwise there would be a race condition).
@ -470,7 +469,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
if (is_already_existing_part)
{
/// We get duplicate part without fetch
storage.updateQuorum(part->name);
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(storage.zookeeper_path + "/quorum/status"))
storage.updateQuorum(part->name, false);
else if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name))
storage.updateQuorum(part->name, true);
}
/// We are waiting for quorum to be satisfied.

View File

@ -232,7 +232,7 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
&& zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name))
{
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name);
storage.updateQuorum(quorum_entry.part_name);
storage.updateQuorum(quorum_entry.part_name, false);
}
}
@ -249,7 +249,7 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart()
&& zookeeper->exists(storage.replica_path + "/parts/" + partition))
{
LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", partition);
storage.updateQuorum(partition);
storage.updateQuorum(partition, true);
}
}
}

View File

@ -3038,35 +3038,22 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(
/** If a quorum is tracked for a part, update information about it in ZK.
*/
void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_parallel)
{
auto zookeeper = getZooKeeper();
/// Information on which replicas a part has been added, if the quorum has not yet been reached.
const String quorum_unparallel_status_path = zookeeper_path + "/quorum/status";
const String quorum_status_path = is_parallel ? zookeeper_path + "/quorum/parallel/" + part_name : zookeeper_path + "/quorum/status";
/// The name of the previous part for which the quorum was reached.
const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";
const String quorum_parallel_status_path = zookeeper_path + "/quorum/parallel/" + part_name;
String value;
Coordination::Stat stat;
/// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed.
while (true)
while (zookeeper->tryGet(quorum_status_path, value, &stat))
{
bool quorum_parallel(false);
ReplicatedMergeTreeQuorumEntry quorum_entry;
if (zookeeper->tryGet(quorum_unparallel_status_path, value, &stat))
{}
else if (zookeeper->tryGet(quorum_parallel_status_path, value, &stat))
quorum_parallel = true;
else
break;
const String quorum_status_path = quorum_parallel ? quorum_parallel_status_path : quorum_unparallel_status_path;
quorum_entry.fromString(value);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
if (quorum_entry.part_name != part_name)
{
/// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started.
@ -3082,7 +3069,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
Coordination::Requests ops;
Coordination::Responses responses;
if (!quorum_parallel)
if (!is_parallel)
{
Coordination::Stat added_parts_stat;
String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat);
@ -3332,18 +3319,23 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
*/
if (quorum)
updateQuorum(part_name);
{
/// Check if this quorum insert is parallel or not
if (zookeeper->exists(zookeeper_path + "/quorum/status"))
updateQuorum(part_name, false);
else if (zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_name))
updateQuorum(part_name, true);
}
/// alexelex: I'm not sure in which order this and merge_selecting_task->schedule() should be
/// merged parts that are still inserted with quorum. if it only contains one block, it hasn't been merged before
if (part_info.max_block != part_info.min_block)
if (part_info.level != 0 || part_info.mutation != 0)
{
Strings quorum_parts = zookeeper->getChildren(zookeeper_path + "/quorum/parallel");
for (const String & quorum_part : quorum_parts)
{
auto quorum_part_info = MergeTreePartInfo::fromPartName(quorum_part, format_version);
if (part_info.contains(quorum_part_info))
updateQuorum(quorum_part);
updateQuorum(quorum_part, true);
}
}

View File

@ -492,7 +492,7 @@ private:
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name);
void updateQuorum(const String & part_name, bool is_parallel);
/// Deletes info from quorum/last_part node for particular partition_id.
void cleanLastPartNode(const String & partition_id);