Fix a rare case where a quorum insert does not actually wait for the quorum because the block was deduplicated

This commit is contained in:
alesapin 2021-04-08 13:35:38 +03:00
parent dd47f521ef
commit e17444757b
3 changed files with 79 additions and 53 deletions

View File

@ -341,13 +341,28 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
/// If it exists on our replica, ignore it. /// If it exists on our replica, ignore it.
if (storage.getActiveContainingPart(existing_part_name)) if (storage.getActiveContainingPart(existing_part_name))
{ {
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
part->is_duplicate = true; part->is_duplicate = true;
last_block_is_duplicate = true; last_block_is_duplicate = true;
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks); ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
if (quorum)
{
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name);
std::string quorum_path;
if (quorum_parallel)
quorum_path = storage.zookeeper_path + "/quorum/parallel/" + existing_part_name;
else
quorum_path = storage.zookeeper_path + "/quorum/status";
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value);
}
else
{
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
}
return; return;
} }
LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.", LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
block_id, existing_part_name); block_id, existing_part_name);
@ -486,50 +501,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
storage.updateQuorum(part->name, false); storage.updateQuorum(part->name, false);
} }
/// We are waiting for quorum to be satisfied. waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
LOG_TRACE(log, "Waiting for quorum");
try
{
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_info.status_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part->name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_info.status_path);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|| value != quorum_info.is_active_node_value)
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
}
catch (...)
{
/// We do not know whether or not data has been inserted
/// - whether other replicas have time to download the part and mark the quorum as done.
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
}
LOG_TRACE(log, "Quorum satisfied");
} }
} }
@ -541,4 +513,57 @@ void ReplicatedMergeTreeBlockOutputStream::writePrefix()
} }
/// Blocks until the quorum node at `quorum_path` no longer exists or no longer describes `part_name`,
/// then verifies that this replica's `is_active` node still holds `is_active_node_value`.
/// Any failure on the way (including the wait timing out) is rethrown as UNKNOWN_STATUS_OF_INSERT.
void ReplicatedMergeTreeBlockOutputStream::waitForQuorum(
    zkutil::ZooKeeperPtr & zookeeper,
    const std::string & part_name,
    const std::string & quorum_path,
    const std::string & is_active_node_value)
{
    LOG_TRACE(log, "Waiting for quorum");

    try
    {
        for (;;)
        {
            /// A fresh event for every read; it is passed to `tryGet` together with the request.
            zkutil::EventPtr watch_event = std::make_shared<Poco::Event>();

            /// Use `get` rather than `exists`, so that the `watch` does not leak when the node is already gone.
            std::string node_contents;
            const bool node_present = zookeeper->tryGet(quorum_path, node_contents, nullptr, watch_event);
            if (!node_present)
                break;

            LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);

            /// The node may have vanished and been created again for the next insert;
            /// in that case it names another part and there is nothing left to wait for.
            ReplicatedMergeTreeQuorumEntry current_entry(node_contents);
            if (current_entry.part_name != part_name)
                break;

            const bool node_updated = watch_event->tryWait(quorum_timeout_ms);
            if (!node_updated)
                throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);

            LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_path);
        }

        /// The replica might have stopped being active in the meantime, in which case
        /// the quorum could have been marked as failed and its node deleted.
        String active_marker;
        const bool marker_present = zookeeper->tryGet(storage.replica_path + "/is_active", active_marker, nullptr);
        if (!marker_present || active_marker != is_active_node_value)
            throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
    }
    catch (...)
    {
        /// It is unknown whether the data has been inserted:
        /// other replicas may or may not have downloaded the part and marked the quorum as done.
        throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
    }

    LOG_TRACE(log, "Quorum satisfied");
}
} }

View File

@ -63,6 +63,12 @@ private:
/// Rename temporary part and commit to ZooKeeper. /// Rename temporary part and commit to ZooKeeper.
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id); void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
/// Wait until the quorum tracked at ZooKeeper path `quorum_path` is satisfied for part `part_name`,
/// i.e. until that node disappears or starts to describe a different part.
/// Also checks that this replica is still alive: its `is_active` node must still contain `is_active_node_value`.
/// Throws UNKNOWN_STATUS_OF_INSERT if the wait times out, the replica became inactive, or any other error occurs.
void waitForQuorum(
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
const std::string & quorum_path, const std::string & is_active_node_value);
StorageReplicatedMergeTree & storage; StorageReplicatedMergeTree & storage;
StorageMetadataPtr metadata_snapshot; StorageMetadataPtr metadata_snapshot;
size_t quorum; size_t quorum;

View File

@ -15,7 +15,6 @@ ORDER BY tuple();
SET insert_quorum_parallel=1; SET insert_quorum_parallel=1;
SET insert_quorum_timeout=1;
SET insert_quorum=3; SET insert_quorum=3;
INSERT INTO r1 VALUES(1, '1'); --{serverError 285} INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
@ -26,17 +25,11 @@ INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
SELECT 'insert to two replicas works'; SELECT 'insert to two replicas works';
SET insert_quorum=2, insert_quorum_parallel=1; SET insert_quorum=2, insert_quorum_parallel=1;
-- the first 'good' retry after failure will return:
-- Unknown status, client must retry. Reason: Code: 159, Timeout while waiting for quorum
-- it's related to decreased insert_quorum_timeout=1 at the test beginning
-- it's not critical but not good
INSERT INTO r1 VALUES(1, '1'); --{serverError 319}
INSERT INTO r1 VALUES(1, '1'); INSERT INTO r1 VALUES(1, '1');
SELECT COUNT() FROM r1; SELECT COUNT() FROM r1;
SELECT COUNT() FROM r2; SELECT COUNT() FROM r2;
SET insert_quorum_timeout=600000;
DETACH TABLE r2; DETACH TABLE r2;
INSERT INTO r1 VALUES(2, '2'); --{serverError 285} INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
@ -93,6 +86,8 @@ SELECT * FROM r2 WHERE key=4;
SYSTEM START FETCHES r2; SYSTEM START FETCHES r2;
SET insert_quorum_timeout=6000000;
-- now retry should be successful -- now retry should be successful
INSERT INTO r1 VALUES (4, '4'); INSERT INTO r1 VALUES (4, '4');