mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Add majority_insert_quorum setting
majority_insert_quorum is defined as (number_of_replicas/2)+1. Insert will be successful only if majority of quorum have applied it. If insert_quorum and majority_insert_quorum both are specified, max of both will be used.
This commit is contained in:
parent
0fcee94835
commit
ade4337978
@ -1196,6 +1196,16 @@ See also:
|
||||
- [insert_quorum_parallel](#settings-insert_quorum_parallel)
|
||||
- [select_sequential_consistency](#settings-select_sequential_consistency)
|
||||
|
||||
## majority_insert_quorum {#settings-majority_insert_quorum}
|
||||
|
||||
Use majority number as quorum number. Majority is defined as (number_of_replicas/2) + 1. If insert_quorum and majority_insert_quorum both are specified, max(insert_quorum , majority number) will be used as quorum.
|
||||
|
||||
Default value: false
|
||||
|
||||
See also:
|
||||
|
||||
- [insert_quorum](#settings-insert_quorum)
|
||||
|
||||
## insert_quorum_timeout {#settings-insert_quorum_timeout}
|
||||
|
||||
Write to a quorum timeout in milliseconds. If the timeout has passed and no write has taken place yet, ClickHouse will generate an exception and the client must repeat the query to write the same block to the same or any other replica.
|
||||
|
@ -213,7 +213,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
\
|
||||
M(Bool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be performed", 0) \
|
||||
\
|
||||
M(UInt64Auto, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
|
||||
M(UInt64Auto, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled, 'auto' - use majority", 0) \
|
||||
M(Milliseconds, insert_quorum_timeout, 600000, "If the quorum of replicas did not meet in specified time (in milliseconds), exception will be thrown and insertion is aborted.", 0) \
|
||||
M(Bool, insert_quorum_parallel, true, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
|
||||
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
|
||||
|
@ -52,6 +52,7 @@ ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
|
||||
size_t max_parts_per_block_,
|
||||
bool quorum_parallel_,
|
||||
bool deduplicate_,
|
||||
bool majority_quorum_,
|
||||
ContextPtr context_,
|
||||
bool is_attach_)
|
||||
: SinkToStorage(metadata_snapshot_->getSampleBlock())
|
||||
@ -63,6 +64,7 @@ ReplicatedMergeTreeSink::ReplicatedMergeTreeSink(
|
||||
, is_attach(is_attach_)
|
||||
, quorum_parallel(quorum_parallel_)
|
||||
, deduplicate(deduplicate_)
|
||||
, majority_quorum(majority_quorum_)
|
||||
, log(&Poco::Logger::get(storage.getLogName() + " (Replicated OutputStream)"))
|
||||
, context(context_)
|
||||
, storage_snapshot(storage.getStorageSnapshot(metadata_snapshot, context))
|
||||
@ -86,7 +88,7 @@ static void assertSessionIsNotExpired(zkutil::ZooKeeperPtr & zookeeper)
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper)
|
||||
void ReplicatedMergeTreeSink::setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperPtr & zookeeper)
|
||||
{
|
||||
quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
|
||||
|
||||
@ -105,6 +107,9 @@ void ReplicatedMergeTreeSink::checkQuorumPrecondition(zkutil::ZooKeeperPtr & zoo
|
||||
if (status.get().error == Coordination::Error::ZOK)
|
||||
++active_replicas;
|
||||
|
||||
if (majority_quorum)
|
||||
quorum = std::max(quorum, replicas.size() / 2 + 1);
|
||||
|
||||
if (active_replicas < quorum)
|
||||
throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}).",
|
||||
active_replicas, quorum);
|
||||
@ -148,8 +153,8 @@ void ReplicatedMergeTreeSink::consume(Chunk chunk)
|
||||
* And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node).
|
||||
* TODO Too complex logic, you can do better.
|
||||
*/
|
||||
if (quorum)
|
||||
checkQuorumPrecondition(zookeeper);
|
||||
if (quorumEnabled())
|
||||
setMajorityQuorumAndCheckQuorum(zookeeper);
|
||||
|
||||
storage.writer.deduceTypesOfObjectColumns(storage_snapshot, block);
|
||||
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context);
|
||||
@ -281,8 +286,8 @@ void ReplicatedMergeTreeSink::writeExistingPart(MergeTreeData::MutableDataPartPt
|
||||
auto zookeeper = storage.getZooKeeper();
|
||||
assertSessionIsNotExpired(zookeeper);
|
||||
|
||||
if (quorum)
|
||||
checkQuorumPrecondition(zookeeper);
|
||||
if (quorumEnabled())
|
||||
setMajorityQuorumAndCheckQuorum(zookeeper);
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
@ -384,7 +389,7 @@ void ReplicatedMergeTreeSink::commitPart(
|
||||
* but for it the quorum has not yet been reached.
|
||||
* You can not do the next quorum record at this time.)
|
||||
*/
|
||||
if (quorum)
|
||||
if (quorumEnabled())
|
||||
{
|
||||
ReplicatedMergeTreeQuorumEntry quorum_entry;
|
||||
quorum_entry.part_name = part->name;
|
||||
@ -436,7 +441,7 @@ void ReplicatedMergeTreeSink::commitPart(
|
||||
{
|
||||
part->is_duplicate = true;
|
||||
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
|
||||
if (quorum)
|
||||
if (quorumEnabled())
|
||||
{
|
||||
LOG_INFO(log, "Block with ID {} already exists locally as part {}; ignoring it, but checking quorum.", block_id, existing_part_name);
|
||||
|
||||
@ -593,7 +598,7 @@ void ReplicatedMergeTreeSink::commitPart(
|
||||
break;
|
||||
}
|
||||
|
||||
if (quorum)
|
||||
if (quorumEnabled())
|
||||
{
|
||||
if (is_already_existing_part)
|
||||
{
|
||||
|
@ -32,6 +32,7 @@ public:
|
||||
size_t max_parts_per_block_,
|
||||
bool quorum_parallel_,
|
||||
bool deduplicate_,
|
||||
bool majority_quorum_,
|
||||
ContextPtr context_,
|
||||
// special flag to determine the ALTER TABLE ATTACH PART without the query context,
|
||||
// needed to set the special LogEntryType::ATTACH_PART
|
||||
@ -68,7 +69,8 @@ private:
|
||||
};
|
||||
|
||||
QuorumInfo quorum_info;
|
||||
void checkQuorumPrecondition(zkutil::ZooKeeperPtr & zookeeper);
|
||||
/// set quorum if majority_quorum is true and checks active replicas
|
||||
void setMajorityQuorumAndCheckQuorum(zkutil::ZooKeeperPtr & zookeeper);
|
||||
|
||||
/// Rename temporary part and commit to ZooKeeper.
|
||||
void commitPart(
|
||||
@ -93,6 +95,7 @@ private:
|
||||
bool quorum_parallel = false;
|
||||
const bool deduplicate = true;
|
||||
bool last_block_is_duplicate = false;
|
||||
bool majority_quorum = false;
|
||||
|
||||
using Logger = Poco::Logger;
|
||||
Poco::Logger * log;
|
||||
@ -107,6 +110,10 @@ private:
|
||||
std::unique_ptr<DelayedChunk> delayed_chunk;
|
||||
|
||||
void finishDelayedChunk(zkutil::ZooKeeperPtr & zookeeper);
|
||||
bool quorumEnabled() const
|
||||
{
|
||||
return majority_quorum || quorum != 0;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -4437,6 +4437,7 @@ SinkToStoragePtr StorageReplicatedMergeTree::write(const ASTPtr & /*query*/, con
|
||||
query_settings.max_partitions_per_insert_block,
|
||||
query_settings.insert_quorum_parallel,
|
||||
deduplicate,
|
||||
query_settings.majority_insert_quorum,
|
||||
local_context);
|
||||
}
|
||||
|
||||
@ -5125,7 +5126,7 @@ PartitionCommandsResultInfo StorageReplicatedMergeTree::attachPartition(
|
||||
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, query_context, renamed_parts);
|
||||
|
||||
/// TODO Allow to use quorum here.
|
||||
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, query_context,
|
||||
ReplicatedMergeTreeSink output(*this, metadata_snapshot, 0, 0, 0, false, false, false, query_context,
|
||||
/*is_attach*/true);
|
||||
|
||||
for (size_t i = 0; i < loaded_parts.size(); ++i)
|
||||
@ -8394,7 +8395,7 @@ void StorageReplicatedMergeTree::restoreDataFromBackup(RestorerFromBackup & rest
|
||||
void StorageReplicatedMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
|
||||
{
|
||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*this, metadata_snapshot, 0, 0, 0, false, false, getContext(), /*is_attach*/true);
|
||||
auto sink = std::make_shared<ReplicatedMergeTreeSink>(*this, metadata_snapshot, 0, 0, 0, false, false, false, getContext(), /*is_attach*/true);
|
||||
for (auto part : parts)
|
||||
sink->writeExistingPart(part);
|
||||
}
|
||||
|
@ -0,0 +1,39 @@
|
||||
1
|
||||
2
|
||||
3
|
||||
1
|
||||
2
|
||||
3
|
||||
majority_insert_quorum 1 1 Choose number_of_replicas/2 + 1 as insert_quorum. Note if insert_quorum and majority_insert_quorum both are specified max(insert_quorum, number_of_replicas/2+1) will be used \N \N 0 Bool
|
||||
majority_insert_quorum 0 1 Choose number_of_replicas/2 + 1 as insert_quorum. Note if insert_quorum and majority_insert_quorum both are specified max(insert_quorum, number_of_replicas/2+1) will be used \N \N 0 Bool
|
||||
majority_insert_quorum 1 1 Choose number_of_replicas/2 + 1 as insert_quorum. Note if insert_quorum and majority_insert_quorum both are specified max(insert_quorum, number_of_replicas/2+1) will be used \N \N 0 Bool
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
2
|
||||
3
|
||||
1
|
||||
2
|
||||
3
|
||||
1
|
||||
2
|
||||
3
|
||||
11
|
||||
11
|
||||
11
|
||||
11
|
||||
11
|
||||
11
|
||||
11
|
||||
12
|
||||
13
|
||||
11
|
||||
12
|
||||
13
|
||||
11
|
||||
12
|
||||
13
|
@ -0,0 +1,122 @@
|
||||
-- Tags: long, zookeeper
|
||||
|
||||
SET send_logs_level = 'fatal';
|
||||
SET insert_quorum_parallel = false;
|
||||
SET select_sequential_consistency=1;
|
||||
|
||||
DROP TABLE IF EXISTS quorum1;
|
||||
DROP TABLE IF EXISTS quorum2;
|
||||
DROP TABLE IF EXISTS quorum3;
|
||||
|
||||
CREATE TABLE quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum', '1') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum', '2') ORDER BY x PARTITION BY y;
|
||||
|
||||
-- majority_insert_quorum = n/2 + 1 , so insert will be written to both replica
|
||||
SET majority_insert_quorum = true;
|
||||
|
||||
INSERT INTO quorum1 VALUES (1, '2018-11-15');
|
||||
INSERT INTO quorum1 VALUES (2, '2018-11-15');
|
||||
INSERT INTO quorum1 VALUES (3, '2018-12-16');
|
||||
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
|
||||
DROP TABLE quorum1;
|
||||
DROP TABLE quorum2;
|
||||
|
||||
-- check majority_insert_quorum valid values
|
||||
SET majority_insert_quorum = TrUe;
|
||||
select * from system.settings where name like 'majority_insert_quorum';
|
||||
SET majority_insert_quorum = FalSE;
|
||||
select * from system.settings where name like 'majority_insert_quorum';
|
||||
-- this is also allowed
|
||||
SET majority_insert_quorum = 10;
|
||||
select * from system.settings where name like 'majority_insert_quorum';
|
||||
|
||||
-- Create 3 replicas and stop sync 2 replicas
|
||||
CREATE TABLE quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum1', '1') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum1', '2') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum3(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum1', '3') ORDER BY x PARTITION BY y;
|
||||
|
||||
SET majority_insert_quorum = true;
|
||||
|
||||
-- Insert should be successful
|
||||
-- stop replica 3
|
||||
SYSTEM STOP FETCHES quorum3;
|
||||
INSERT INTO quorum1 VALUES (1, '2018-11-15');
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x; -- {serverError 289}
|
||||
|
||||
-- Sync replica 3
|
||||
SYSTEM START FETCHES quorum3;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
-- Stop 2 replicas , so insert wont be successful
|
||||
SYSTEM STOP FETCHES quorum2;
|
||||
SYSTEM STOP FETCHES quorum3;
|
||||
SET insert_quorum_timeout = 5000;
|
||||
INSERT INTO quorum1 VALUES (2, '2018-11-15'); -- { serverError 319 }
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
-- Sync replica 2 and 3
|
||||
SYSTEM START FETCHES quorum2;
|
||||
SYSTEM SYNC REPLICA quorum2;
|
||||
SYSTEM START FETCHES quorum3;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
|
||||
INSERT INTO quorum1 VALUES (3, '2018-11-15');
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SYSTEM SYNC REPLICA quorum2;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
DROP TABLE quorum1;
|
||||
DROP TABLE quorum2;
|
||||
DROP TABLE quorum3;
|
||||
|
||||
-- both insert_quorum and majority_insert_quorum are on, in that case max of both will be used as insert quorum
|
||||
-- insert_quorum < n/2 +1
|
||||
SET majority_insert_quorum = true;
|
||||
set insert_quorum = 1;
|
||||
CREATE TABLE quorum1(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum2', '1') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum2', '2') ORDER BY x PARTITION BY y;
|
||||
CREATE TABLE quorum3(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02377/quorum2', '3') ORDER BY x PARTITION BY y;
|
||||
|
||||
-- Insert should be successful
|
||||
-- stop replica 3
|
||||
SYSTEM STOP FETCHES quorum3;
|
||||
INSERT INTO quorum1 VALUES (11, '2018-11-15');
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x; -- {serverError 289}
|
||||
|
||||
-- Sync replica 3
|
||||
SYSTEM START FETCHES quorum3;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
-- insert_quorum > n/2 +1
|
||||
set insert_quorum = 3;
|
||||
SET majority_insert_quorum = false;
|
||||
SYSTEM STOP FETCHES quorum3;
|
||||
INSERT INTO quorum1 VALUES (12, '2018-11-15'); -- { serverError 319 }
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
-- Sync replica 3
|
||||
SYSTEM START FETCHES quorum3;
|
||||
SYSTEM SYNC REPLICA quorum3;
|
||||
INSERT INTO quorum1 VALUES (13, '2018-11-15');
|
||||
SELECT x FROM quorum1 ORDER BY x;
|
||||
SELECT x FROM quorum2 ORDER BY x;
|
||||
SELECT x FROM quorum3 ORDER BY x;
|
||||
|
||||
DROP TABLE quorum1;
|
||||
DROP TABLE quorum2;
|
||||
DROP TABLE quorum3;
|
Loading…
Reference in New Issue
Block a user