mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 16:42:05 +00:00
Merge pull request #18215 from filimonov/quorum-dedup-issue
Fix issue with quorum retries behaviour
This commit is contained in:
commit
4b0f973f93
@ -341,13 +341,28 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
/// If it exists on our replica, ignore it.
|
||||
if (storage.getActiveContainingPart(existing_part_name))
|
||||
{
|
||||
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
|
||||
part->is_duplicate = true;
|
||||
last_block_is_duplicate = true;
|
||||
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
||||
if (quorum)
|
||||
{
|
||||
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name);
|
||||
|
||||
std::string quorum_path;
|
||||
if (quorum_parallel)
|
||||
quorum_path = storage.zookeeper_path + "/quorum/parallel/" + existing_part_name;
|
||||
else
|
||||
quorum_path = storage.zookeeper_path + "/quorum/status";
|
||||
|
||||
waitForQuorum(zookeeper, existing_part_name, quorum_path, quorum_info.is_active_node_value);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it.", block_id, existing_part_name);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_INFO(log, "Block with ID {} already exists on other replicas as part {}; will write it locally with that name.",
|
||||
block_id, existing_part_name);
|
||||
|
||||
@ -486,50 +501,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
storage.updateQuorum(part->name, false);
|
||||
}
|
||||
|
||||
/// We are waiting for quorum to be satisfied.
|
||||
LOG_TRACE(log, "Waiting for quorum");
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
||||
|
||||
std::string value;
|
||||
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
|
||||
if (!zookeeper->tryGet(quorum_info.status_path, value, nullptr, event))
|
||||
break;
|
||||
|
||||
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_info.status_path);
|
||||
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
|
||||
|
||||
/// If the node has time to disappear, and then appear again for the next insert.
|
||||
if (quorum_entry.part_name != part->name)
|
||||
break;
|
||||
|
||||
if (!event->tryWait(quorum_timeout_ms))
|
||||
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
||||
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_info.status_path);
|
||||
}
|
||||
|
||||
/// And what if it is possible that the current replica at this time has ceased to be active
|
||||
/// and the quorum is marked as failed and deleted?
|
||||
String value;
|
||||
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|
||||
|| value != quorum_info.is_active_node_value)
|
||||
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// We do not know whether or not data has been inserted
|
||||
/// - whether other replicas have time to download the part and mark the quorum as done.
|
||||
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
|
||||
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Quorum satisfied");
|
||||
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
|
||||
}
|
||||
}
|
||||
|
||||
@ -541,4 +513,57 @@ void ReplicatedMergeTreeBlockOutputStream::writePrefix()
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedMergeTreeBlockOutputStream::waitForQuorum(
|
||||
zkutil::ZooKeeperPtr & zookeeper,
|
||||
const std::string & part_name,
|
||||
const std::string & quorum_path,
|
||||
const std::string & is_active_node_value) const
|
||||
{
|
||||
/// We are waiting for quorum to be satisfied.
|
||||
LOG_TRACE(log, "Waiting for quorum");
|
||||
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
zkutil::EventPtr event = std::make_shared<Poco::Event>();
|
||||
|
||||
std::string value;
|
||||
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
|
||||
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
|
||||
break;
|
||||
|
||||
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
|
||||
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
|
||||
|
||||
/// If the node has time to disappear, and then appear again for the next insert.
|
||||
if (quorum_entry.part_name != part_name)
|
||||
break;
|
||||
|
||||
if (!event->tryWait(quorum_timeout_ms))
|
||||
throw Exception("Timeout while waiting for quorum", ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
|
||||
LOG_TRACE(log, "Quorum {} updated, will check quorum node still exists", quorum_path);
|
||||
}
|
||||
|
||||
/// And what if it is possible that the current replica at this time has ceased to be active
|
||||
/// and the quorum is marked as failed and deleted?
|
||||
String value;
|
||||
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, nullptr)
|
||||
|| value != is_active_node_value)
|
||||
throw Exception("Replica become inactive while waiting for quorum", ErrorCodes::NO_ACTIVE_REPLICAS);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// We do not know whether or not data has been inserted
|
||||
/// - whether other replicas have time to download the part and mark the quorum as done.
|
||||
throw Exception("Unknown status, client must retry. Reason: " + getCurrentExceptionMessage(false),
|
||||
ErrorCodes::UNKNOWN_STATUS_OF_INSERT);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Quorum satisfied");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -63,6 +63,12 @@ private:
|
||||
/// Rename temporary part and commit to ZooKeeper.
|
||||
void commitPart(zkutil::ZooKeeperPtr & zookeeper, MergeTreeData::MutableDataPartPtr & part, const String & block_id);
|
||||
|
||||
/// Wait for quorum to be satisfied on path (quorum_path) form part (part_name)
|
||||
/// Also checks that replica still alive.
|
||||
void waitForQuorum(
|
||||
zkutil::ZooKeeperPtr & zookeeper, const std::string & part_name,
|
||||
const std::string & quorum_path, const std::string & is_active_node_value) const;
|
||||
|
||||
StorageReplicatedMergeTree & storage;
|
||||
StorageMetadataPtr metadata_snapshot;
|
||||
size_t quorum;
|
||||
|
@ -1,16 +1,16 @@
|
||||
DROP TABLE IF EXISTS r1;
|
||||
DROP TABLE IF EXISTS r2;
|
||||
DROP TABLE IF EXISTS r1 SYNC;
|
||||
DROP TABLE IF EXISTS r2 SYNC;
|
||||
|
||||
CREATE TABLE r1 (
|
||||
key UInt64, value String
|
||||
)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '1')
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_parallel_quorum_insert_no_replicas', '1')
|
||||
ORDER BY tuple();
|
||||
|
||||
CREATE TABLE r2 (
|
||||
key UInt64, value String
|
||||
)
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_no_repliacs', '2')
|
||||
ENGINE = ReplicatedMergeTree('/clickhouse/01509_parallel_quorum_insert_no_replicas', '2')
|
||||
ORDER BY tuple();
|
||||
|
||||
SET insert_quorum_parallel=1;
|
||||
@ -18,8 +18,13 @@ SET insert_quorum_parallel=1;
|
||||
SET insert_quorum=3;
|
||||
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
|
||||
|
||||
-- retry should still fail despite the insert_deduplicate enabled
|
||||
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
|
||||
INSERT INTO r1 VALUES(1, '1'); --{serverError 285}
|
||||
|
||||
SELECT 'insert to two replicas works';
|
||||
SET insert_quorum=2, insert_quorum_parallel=1;
|
||||
|
||||
INSERT INTO r1 VALUES(1, '1');
|
||||
|
||||
SELECT COUNT() FROM r1;
|
||||
@ -29,12 +34,18 @@ DETACH TABLE r2;
|
||||
|
||||
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
|
||||
|
||||
-- retry should fail despite the insert_deduplicate enabled
|
||||
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
|
||||
INSERT INTO r1 VALUES(2, '2'); --{serverError 285}
|
||||
|
||||
SET insert_quorum=1, insert_quorum_parallel=1;
|
||||
SELECT 'insert to single replica works';
|
||||
INSERT INTO r1 VALUES(2, '2');
|
||||
|
||||
ATTACH TABLE r2;
|
||||
|
||||
INSERT INTO r2 VALUES(2, '2');
|
||||
|
||||
SYSTEM SYNC REPLICA r2;
|
||||
|
||||
SET insert_quorum=2, insert_quorum_parallel=1;
|
||||
@ -47,6 +58,17 @@ SELECT COUNT() FROM r2;
|
||||
SELECT 'deduplication works';
|
||||
INSERT INTO r2 VALUES(3, '3');
|
||||
|
||||
-- still works if we relax quorum
|
||||
SET insert_quorum=1, insert_quorum_parallel=1;
|
||||
INSERT INTO r2 VALUES(3, '3');
|
||||
INSERT INTO r1 VALUES(3, '3');
|
||||
-- will start failing if we increase quorum
|
||||
SET insert_quorum=3, insert_quorum_parallel=1;
|
||||
INSERT INTO r1 VALUES(3, '3'); --{serverError 285}
|
||||
-- work back ok when quorum=2
|
||||
SET insert_quorum=2, insert_quorum_parallel=1;
|
||||
INSERT INTO r2 VALUES(3, '3');
|
||||
|
||||
SELECT COUNT() FROM r1;
|
||||
SELECT COUNT() FROM r2;
|
||||
|
||||
@ -56,8 +78,18 @@ SET insert_quorum_timeout=0;
|
||||
|
||||
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
|
||||
|
||||
-- retry should fail despite the insert_deduplicate enabled
|
||||
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
|
||||
INSERT INTO r1 VALUES (4, '4'); -- { serverError 319 }
|
||||
SELECT * FROM r2 WHERE key=4;
|
||||
|
||||
SYSTEM START FETCHES r2;
|
||||
|
||||
SET insert_quorum_timeout=6000000;
|
||||
|
||||
-- now retry should be successful
|
||||
INSERT INTO r1 VALUES (4, '4');
|
||||
|
||||
SYSTEM SYNC REPLICA r2;
|
||||
|
||||
SELECT 'insert happened';
|
||||
|
@ -677,6 +677,7 @@
|
||||
"01760_system_dictionaries",
|
||||
"01761_alter_decimal_zookeeper",
|
||||
"01360_materialized_view_with_join_on_query_log", // creates and drops MVs on query_log, which may interrupt flushes.
|
||||
"01509_parallel_quorum_insert_no_replicas", // It's ok to execute in parallel with oter tests but not several instances of the same test.
|
||||
"attach",
|
||||
"ddl_dictionaries",
|
||||
"dictionary",
|
||||
|
Loading…
Reference in New Issue
Block a user