Merge pull request #9463 from ClickHouse/fix_mutations_with_insert_quorum

Trying to fix mutations with last quorum parts
This commit is contained in:
alesapin 2020-03-03 11:11:21 +03:00 committed by GitHub
commit 4620954d24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 44 additions and 25 deletions

View File

@ -1485,7 +1485,6 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
/// Load current quorum status.
auto quorum_last_part_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/last_part");
auto quorum_status_future = zookeeper->asyncTryGet(queue.zookeeper_path + "/quorum/status");
/// Load current inserts
@ -1539,19 +1538,6 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
queue_.pullLogsToQueue(zookeeper);
Coordination::GetResponse quorum_last_part_response = quorum_last_part_future.get();
if (!quorum_last_part_response.error)
{
ReplicatedMergeTreeQuorumAddedParts parts_with_quorum(queue.format_version);
if (!quorum_last_part_response.data.empty())
{
parts_with_quorum.fromString(quorum_last_part_response.data);
last_quorum_parts.clear();
for (const auto & added_part : parts_with_quorum.added_parts)
last_quorum_parts.emplace(added_part.second);
}
}
Coordination::GetResponse quorum_status_response = quorum_status_future.get();
if (!quorum_status_response.error)
{
@ -1609,13 +1595,6 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
for (const MergeTreeData::DataPartPtr & part : {left, right})
{
if (last_quorum_parts.find(part->name) != last_quorum_parts.end())
{
if (out_reason)
*out_reason = "Part " + part->name + " is the most recent part with a satisfied quorum";
return false;
}
if (part->name == inprogress_quorum_part)
{
if (out_reason)
@ -1717,8 +1696,9 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
/// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
/// version X for this part.
if (last_quorum_parts.find(part->name) != last_quorum_parts.end()
|| part->name == inprogress_quorum_part)
/// We cannot mutate part if it's beeing inserted with quorum and it's not
/// already reached.
if (part->name == inprogress_quorum_part)
return {};
std::lock_guard lock(queue.state_mutex);

View File

@ -405,7 +405,6 @@ private:
std::unordered_map<String, std::set<Int64>> committing_blocks;
/// Quorum state taken at some later time than prev_virtual_parts.
std::set<std::string> last_quorum_parts;
String inprogress_quorum_part;
};

View File

@ -23,7 +23,7 @@ struct ReplicatedMergeTreeQuorumAddedParts
MergeTreeDataFormatVersion format_version;
ReplicatedMergeTreeQuorumAddedParts(const MergeTreeDataFormatVersion format_version_)
: format_version(format_version_)
: format_version(format_version_)
{}
/// Write new parts in buffer with added parts.

View File

@ -2668,6 +2668,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
parts_with_quorum.fromString(old_added_parts);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
/// We store one last part which reached quorum for each partition.
parts_with_quorum.added_parts[part_info.partition_id] = part_name;
String new_added_parts = parts_with_quorum.toString();

View File

@ -17,7 +17,18 @@ SELECT x FROM quorum2 ORDER BY x;
OPTIMIZE TABLE quorum1 PARTITION '2018-11-15' FINAL;
-- everything works fine after merge
SELECT x FROM quorum1 ORDER BY x;
SELECT x FROM quorum2 ORDER BY x;
SELECT count(*) FROM system.parts WHERE active AND database = currentDatabase() AND table='quorum1';
INSERT INTO quorum1 VALUES (3, '2018-11-15');
INSERT INTO quorum1 VALUES (4, '2018-11-15');
-- and after we add new parts
SELECT sum(x) FROM quorum1;
SELECT sum(x) FROM quorum2;
DROP TABLE IF EXISTS quorum1;
DROP TABLE IF EXISTS quorum2;

View File

@ -0,0 +1,19 @@
DROP TABLE IF EXISTS mutations_and_quorum1;
DROP TABLE IF EXISTS mutations_and_quorum2;
CREATE TABLE mutations_and_quorum1 (`server_date` Date, `something` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/default/1/mutations_and_quorum', '1') PARTITION BY toYYYYMM(server_date) ORDER BY (server_date, something);
CREATE TABLE mutations_and_quorum2 (`server_date` Date, `something` String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/default/1/mutations_and_quorum', '2') PARTITION BY toYYYYMM(server_date) ORDER BY (server_date, something);
SET insert_quorum=2;
INSERT INTO mutations_and_quorum1 VALUES ('2019-01-01', 'test1'), ('2019-02-01', 'test2'), ('2019-03-01', 'test3'), ('2019-04-01', 'test4'), ('2019-05-01', 'test1'), ('2019-06-01', 'test2'), ('2019-07-01', 'test3'), ('2019-08-01', 'test4'), ('2019-09-01', 'test1'), ('2019-10-01', 'test2'), ('2019-11-01', 'test3'), ('2019-12-01', 'test4');
ALTER TABLE mutations_and_quorum1 DELETE WHERE something = 'test1' SETTINGS mutations_sync=2;
SELECT COUNT() FROM mutations_and_quorum1;
SELECT COUNT() FROM mutations_and_quorum2;
SELECT COUNT() FROM system.mutations WHERE table like 'mutations_and_quorum%' and is_done = 0;
DROP TABLE IF EXISTS mutations_and_quorum1;
DROP TABLE IF EXISTS mutations_and_quorum2;