Merge pull request #4691 from yandex/improvements_in_replication_without_zookeeper

Add ability to start replicated table without metadata in zookeeper
alexey-milovidov 2019-03-15 04:55:07 +03:00 committed by GitHub
commit 94d2c35f1d
5 changed files with 89 additions and 61 deletions


@@ -277,6 +277,13 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
return;
}
if (attach && !current_zookeeper->exists(zookeeper_path + "/metadata"))
{
LOG_WARNING(log, "No metadata in ZooKeeper: table will be in readonly mode.");
is_readonly = true;
return;
}
if (!attach)
{
if (!data.getDataParts().empty())
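
The hunk above adds the new behavior: on ATTACH, if the table's metadata node is missing in ZooKeeper, the replica no longer refuses to start and instead comes up in readonly mode. A minimal sketch of the same existence check, written with the kazoo client that the integration-test harness already uses (the ZooKeeper address and the table path are assumptions for illustration, not taken from the PR):

# Sketch only: reproduce the constructor's new check with kazoo.
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")                 # assumed ZooKeeper address
zk.start()

zookeeper_path = "/clickhouse/tables/test/replicated"    # hypothetical table path
if zk.exists(zookeeper_path + "/metadata") is None:
    # The situation the server now tolerates on ATTACH: metadata is gone,
    # so the table is attached in readonly mode instead of throwing.
    print("No metadata in ZooKeeper: table will be in readonly mode")

zk.stop()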
@@ -540,45 +547,25 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// There are no PreCommitted parts at startup.
auto parts = data.getDataParts({MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
/// Local parts that are not in ZK.
/** Local parts that are not in ZK.
* In very rare cases they may cover missing parts,
* and someone may think that pushing them to ZooKeeper is a good idea.
* But we actually cannot determine precisely that ALL missing parts are covered by such an unexpected part,
* so the missing parts will be downloaded instead.
*/
MergeTreeData::DataParts unexpected_parts;
/// Collect unexpected parts
for (const auto & part : parts)
{
if (expected_parts.count(part->name))
expected_parts.erase(part->name);
else
unexpected_parts.insert(part);
}
/// Which local parts to add to ZK.
MergeTreeData::DataPartsVector parts_to_add;
UInt64 parts_to_add_rows = 0;
if (!expected_parts.count(part->name))
unexpected_parts.insert(part); /// these parts will be moved to detached with the ignored_ prefix
/// Which parts should be taken from other replicas.
Strings parts_to_fetch;
for (const String & missing_name : expected_parts)
{
/// If some part is missing locally but there is a part covering it, we can replace it in ZK with the covering one.
auto containing = data.getActiveContainingPart(missing_name);
if (containing)
{
LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
if (unexpected_parts.count(containing))
{
parts_to_add.push_back(containing);
unexpected_parts.erase(containing);
parts_to_add_rows += containing->rows_count;
}
}
else
if (!data.getActiveContainingPart(missing_name))
parts_to_fetch.push_back(missing_name);
}
for (const String & name : parts_to_fetch)
expected_parts.erase(name);
/** For the sanity check, among the parts that are on the FS but not in ZK, we consider only the parts that are not the most recent.
* Unexpected new parts usually appear only because they did not have time to be registered in ZK before an abrupt restart of the server.
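
With this hunk, checkParts no longer tries to register unexpected local parts in ZooKeeper. Parts found on disk but unknown to ZK are kept for moving to detached with the ignored_ prefix, and parts listed in ZK that have no active local part covering them are queued for fetching from other replicas. A rough Python restatement of that classification (the names are illustrative, this is not the server's code):

# Sketch only: how the simplified checkParts splits parts into two groups.
def classify_parts(local_part_names, expected_parts, get_active_containing_part):
    # Local parts not registered in ZooKeeper: these end up in detached/ with the ignored_ prefix.
    unexpected = set(name for name in local_part_names if name not in expected_parts)
    # Parts registered in ZooKeeper with no active local part covering them: fetch from other replicas.
    parts_to_fetch = [name for name in expected_parts if get_active_containing_part(name) is None]
    return unexpected, parts_to_fetch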
@@ -613,16 +600,10 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const String & name : parts_to_fetch)
parts_to_fetch_blocks += get_blocks_count_in_data_part(name);
UInt64 expected_parts_blocks = 0;
for (const String & name : expected_parts)
expected_parts_blocks += get_blocks_count_in_data_part(name);
std::stringstream sanity_report;
sanity_report << "There are "
<< unexpected_parts.size() << " unexpected parts with " << unexpected_parts_rows << " rows ("
<< unexpected_parts_nonnew << " of them is not just-written with " << unexpected_parts_rows << " rows), "
<< parts_to_add.size() << " unexpectedly merged parts with " << parts_to_add_rows << " rows, "
<< expected_parts.size() << " missing obsolete parts (with " << expected_parts_blocks << " blocks), "
<< parts_to_fetch.size() << " missing parts (with " << parts_to_fetch_blocks << " blocks).";
/** We can automatically synchronize data,
@@ -638,45 +619,23 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
for (const auto & part : parts)
total_rows_on_filesystem += part->rows_count;
UInt64 total_suspicious_rows = parts_to_add_rows + unexpected_parts_rows;
UInt64 total_suspicious_rows_no_new = parts_to_add_rows + unexpected_parts_nonnew_rows;
bool insane = total_suspicious_rows > total_rows_on_filesystem * data.settings.replicated_max_ratio_of_wrong_parts;
bool insane = unexpected_parts_rows > total_rows_on_filesystem * data.settings.replicated_max_ratio_of_wrong_parts;
if (insane && !skip_sanity_checks)
{
std::stringstream why;
why << "The local set of parts of table " << database_name << "." << table_name << " doesn't look like the set of parts "
<< "in ZooKeeper: "
<< formatReadableQuantity(total_suspicious_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem)
<< formatReadableQuantity(unexpected_parts_rows) << " rows of " << formatReadableQuantity(total_rows_on_filesystem)
<< " total rows in filesystem are suspicious.";
throw Exception(why.str() + " " + sanity_report.str(), ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
}
if (total_suspicious_rows_no_new > 0)
if (unexpected_parts_nonnew_rows > 0)
LOG_WARNING(log, sanity_report.str());
/// Add information to the ZK about the parts that cover the missing parts.
for (const MergeTreeData::DataPartPtr & part : parts_to_add)
{
LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
Coordination::Requests ops;
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops);
zookeeper->multi(ops);
}
/// Remove from ZK information about the parts covered by the newly added ones.
{
for (const String & name : expected_parts)
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
removePartsFromZooKeeper(zookeeper, Strings(expected_parts.begin(), expected_parts.end()));
}
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
std::vector<std::future<Coordination::ExistsResponse>> exists_futures;
exists_futures.reserve(parts_to_fetch.size());
for (const String & part_name : parts_to_fetch)
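
The sanity threshold that decides whether the replica may repair itself automatically is now computed from unexpected rows alone, instead of the old total_suspicious_rows that also counted parts_to_add. A worked example of the check (the 0.5 ratio and the row counts are illustrative assumptions, not values from the PR):

# Sketch only: the insanity check on startup.
replicated_max_ratio_of_wrong_parts = 0.5     # assumed setting value
total_rows_on_filesystem = 1000000            # rows in all local parts
unexpected_parts_rows = 600000                # rows in parts that ZooKeeper does not know about

insane = unexpected_parts_rows > total_rows_on_filesystem * replicated_max_ratio_of_wrong_parts
print(insane)  # True -> startup throws TOO_MANY_UNEXPECTED_DATA_PARTS unless sanity checks are skipped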


@@ -482,6 +482,13 @@ class ClickHouseInstance:
    def get_query_request(self, *args, **kwargs):
        return self.client.get_query_request(*args, **kwargs)

    def restart_clickhouse(self, stop_start_wait_sec=5):
        if not self.stay_alive:
            raise Exception("ClickHouse can be restarted only for an instance started with stay_alive=True")
        self.exec_in_container(["bash", "-c", "pkill clickhouse"], user='root')
        time.sleep(stop_start_wait_sec)
        self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user='root')

    def exec_in_container(self, cmd, detach=False, **kwargs):
        container = self.get_docker_handle()
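
The new restart_clickhouse helper only works for instances created with stay_alive=True (otherwise it raises immediately). A hypothetical test module using it, under the usual integration-test layout; this is a sketch, not code from the PR:

# Sketch only: restart_clickhouse requires stay_alive=True at add_instance time.
import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node_restart', with_zookeeper=True, stay_alive=True)  # hypothetical instance

@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()

def test_survives_restart(start_cluster):
    node.query("SELECT 1")
    node.restart_clickhouse(stop_start_wait_sec=10)  # pkill, wait, start the server again as a daemon
    node.query("SELECT 1")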


@@ -0,0 +1,14 @@
<yandex>
    <remote_servers>
        <test_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <default_database>shard_0</default_database>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</yandex>
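
This remote_servers file declares a one-shard, one-replica cluster named test_cluster with internal_replication enabled. The test below only needs node1 and ZooKeeper, but the same cluster definition could also be consumed from SQL, for example through a Distributed table created inside that test (hypothetical extra lines, not part of the PR):

# Hypothetical usage of test_cluster from the test module below.
node1.query("CREATE TABLE test_table_dist AS test_table "
            "ENGINE = Distributed(test_cluster, default, test_table)")
node1.query("SELECT count() FROM test_table_dist")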


@@ -0,0 +1,48 @@
import time
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_zookeeper=True, stay_alive=True)


@pytest.fixture(scope="module")
def start_cluster():
    try:
        cluster.start()

        node1.query(
        '''
            CREATE DATABASE test;

            CREATE TABLE test_table(date Date, id UInt32)
            ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/replicated', 'node1') ORDER BY id PARTITION BY toYYYYMM(date);
        ''')

        yield cluster
    except Exception as ex:
        print(ex)
    finally:
        cluster.shutdown()


def drop_zk(zk):
    zk.delete(path="/clickhouse", recursive=True)


def test_startup_without_zookeeper(start_cluster):
    node1.query("INSERT INTO test_table VALUES ('2018-10-01', 1), ('2018-10-02', 2), ('2018-10-03', 3)")

    assert node1.query("SELECT COUNT(*) from test_table") == "3\n"
    assert node1.query("SELECT is_readonly from system.replicas where table='test_table'") == "0\n"

    cluster.run_kazoo_commands_with_retries(drop_zk)

    time.sleep(5)

    assert node1.query("SELECT COUNT(*) from test_table") == "3\n"
    assert node1.query("SELECT is_readonly from system.replicas where table='test_table'") == "1\n"

    node1.restart_clickhouse()
    time.sleep(5)

    assert node1.query("SELECT COUNT(*) from test_table") == "3\n"
    assert node1.query("SELECT is_readonly from system.replicas where table='test_table'") == "1\n"