mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
Merge pull request #9652 from ClickHouse/fix_compatibility_for_mutations
Fix compatibility in replication
This commit is contained in:
commit
930e6f24b1
@ -70,8 +70,9 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
|
||||
<< source_parts.at(0) << "\n"
|
||||
<< "to\n"
|
||||
<< new_part_name;
|
||||
out << "\nalter_version\n";
|
||||
out << alter_version;
|
||||
|
||||
if (isAlterMutation())
|
||||
out << "\nalter_version\n" << alter_version;
|
||||
break;
|
||||
|
||||
case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global
|
||||
@ -127,6 +128,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
|
||||
|
||||
in >> type_str >> "\n";
|
||||
|
||||
bool trailing_newline_found = false;
|
||||
if (type_str == "get")
|
||||
{
|
||||
type = GET_PART;
|
||||
@ -177,7 +179,13 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
|
||||
>> "to\n"
|
||||
>> new_part_name;
|
||||
source_parts.push_back(source_part);
|
||||
in >> "\nalter_version\n" >> alter_version;
|
||||
|
||||
in >> "\n";
|
||||
|
||||
if (in.eof())
|
||||
trailing_newline_found = true;
|
||||
else if (checkString("alter_version\n", in))
|
||||
in >> alter_version;
|
||||
}
|
||||
else if (type_str == "alter")
|
||||
{
|
||||
@ -198,7 +206,9 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
|
||||
in.readStrict(&metadata_str[0], metadata_size);
|
||||
}
|
||||
|
||||
in >> "\n";
|
||||
if (!trailing_newline_found)
|
||||
in >> "\n";
|
||||
|
||||
if (checkString("part_type: ", in))
|
||||
{
|
||||
String part_type_str;
|
||||
|
@ -2,6 +2,7 @@
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -52,7 +53,8 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
|
||||
|
||||
in >> "commands: ";
|
||||
commands.readText(in);
|
||||
in >> "\nalter version: " >> alter_version;
|
||||
if (checkString("\nalter version: ", in))
|
||||
in >> alter_version;
|
||||
}
|
||||
|
||||
String ReplicatedMergeTreeMutationEntry::toString() const
|
||||
|
@ -0,0 +1,74 @@
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)
|
||||
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)
|
||||
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:20.1.6.3', with_installed_binary=True, stay_alive=True)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_mutate_and_upgrade(start_cluster):
|
||||
for node in [node1, node2]:
|
||||
node.query("CREATE TABLE mt (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t', '{}') ORDER BY tuple()".format(node.name))
|
||||
|
||||
node1.query("INSERT INTO mt VALUES ('2020-02-13', 1), ('2020-02-13', 2);")
|
||||
|
||||
node1.query("ALTER TABLE mt DELETE WHERE id = 2", settings={"mutations_sync": "2"})
|
||||
node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
||||
|
||||
node1.restart_with_latest_version()
|
||||
node2.restart_with_latest_version()
|
||||
|
||||
node2.query("INSERT INTO mt VALUES ('2020-02-13', 3);")
|
||||
|
||||
node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
||||
|
||||
assert node1.query("SELECT COUNT() FROM mt") == "2\n"
|
||||
assert node2.query("SELECT COUNT() FROM mt") == "2\n"
|
||||
|
||||
node1.query("INSERT INTO mt VALUES ('2020-02-13', 4);")
|
||||
|
||||
node2.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
||||
|
||||
assert node1.query("SELECT COUNT() FROM mt") == "3\n"
|
||||
assert node2.query("SELECT COUNT() FROM mt") == "3\n"
|
||||
|
||||
node2.query("ALTER TABLE mt DELETE WHERE id = 3", settings={"mutations_sync": "2"})
|
||||
|
||||
node1.query("SYSTEM SYNC REPLICA mt", timeout=5)
|
||||
|
||||
assert node1.query("SELECT COUNT() FROM mt") == "2\n"
|
||||
assert node2.query("SELECT COUNT() FROM mt") == "2\n"
|
||||
|
||||
node1.query("ALTER TABLE mt MODIFY COLUMN id String DEFAULT '0'", settings={"replication_alter_partitions_sync": "2"})
|
||||
|
||||
node2.query("OPTIMIZE TABLE mt FINAL")
|
||||
|
||||
assert node1.query("SELECT id FROM mt") == "1\n4\n"
|
||||
assert node2.query("SELECT id FROM mt") == "1\n4\n"
|
||||
|
||||
|
||||
def test_upgrade_while_mutation(start_cluster):
|
||||
node3.query("CREATE TABLE mt1 (EventDate Date, id UInt64) ENGINE ReplicatedMergeTree('/clickhouse/tables/t1', 'node3') ORDER BY tuple()")
|
||||
|
||||
node3.query("INSERT INTO mt1 select '2020-02-13', number from numbers(100000)")
|
||||
|
||||
node3.query("SYSTEM STOP MERGES")
|
||||
node3.query("ALTER TABLE mt1 DELETE WHERE id % 2 == 0")
|
||||
|
||||
node3.restart_with_latest_version()
|
||||
|
||||
assert_eq_with_retry(node3, "SELECT COUNT() from mt1", "50000\n")
|
Loading…
Reference in New Issue
Block a user