Fix DDL worker

alesapin 2020-08-07 12:18:34 +03:00
parent 511b097881
commit 77b8c9e332
5 changed files with 130 additions and 29 deletions

@@ -775,44 +775,41 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
     String shard_node_name = get_shard_name(task.cluster->getShardsAddresses().at(task.host_shard_num));
     String shard_path = node_path + "/shards/" + shard_node_name;
     String is_executed_path = shard_path + "/executed";
+    String tries_to_execute_path = shard_path + "/tries_to_execute";
     zookeeper->createAncestors(shard_path + "/");
-    auto is_already_executed = [&]() -> bool
-    {
-        String executed_by;
-        if (zookeeper->tryGet(is_executed_path, executed_by))
-        {
-            LOG_DEBUG(log, "Task {} has already been executed by leader replica ({}) of the same shard.", task.entry_name, executed_by);
-            return true;
-        }
+    /// Node exists, or we will create or we will get an exception
+    zookeeper->tryCreate(tries_to_execute_path, "0", zkutil::CreateMode::Persistent);

-        return false;
-    };
+    static constexpr int MAX_TRIES_TO_EXECUTE = 3;
+    String executed_by;
+    zkutil::EventPtr event = std::make_shared<Poco::Event>();
+    if (zookeeper->exists(is_executed_path, nullptr, event))
+    {
+        LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
+        return true;
+    }
     pcg64 rng(randomSeed());
     auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
-    static const size_t max_tries = 20;
-    bool executed_by_leader = false;
-    for (size_t num_tries = 0; num_tries < max_tries; ++num_tries)
-    {
-        if (is_already_executed())
-        {
-            executed_by_leader = true;
-            break;
-        }
+    bool executed_by_leader = false;
+    while (true)
+    {
         StorageReplicatedMergeTree::Status status;
         replicated_storage->getStatus(status);
-        /// Leader replica take lock
+        /// Any replica which is leader tries to take lock
         if (status.is_leader && lock->tryLock())
         {
-            if (is_already_executed())
-            {
-                executed_by_leader = true;
+            /// Doing it exclusively
+            size_t counter = parse<int>(zookeeper->get(tries_to_execute_path));
+            if (counter > MAX_TRIES_TO_EXECUTE)
                 break;
-            }
+            zookeeper->set(tries_to_execute_path, toString(counter + 1));
             /// If the leader will unexpectedly changed this method will return false
             /// and on the next iteration new leader will take lock
@@ -822,20 +819,31 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
                 executed_by_leader = true;
                 break;
             }
             lock->unlock();
         }
         /// Does nothing if wasn't previously locked
         lock->unlock();
-        std::this_thread::sleep_for(std::chrono::milliseconds(std::uniform_int_distribution<int>(0, 1000)(rng)));
+        if (event->tryWait(std::uniform_int_distribution<int>(0, 1000)(rng)))
+        {
+            executed_by_leader = true;
+            break;
+        }
+        else if (parse<int>(zookeeper->get(tries_to_execute_path)) > MAX_TRIES_TO_EXECUTE)
+        {
+            /// Nobody will try to execute query again
+            break;
+        }
     }
     /// Not executed by leader so was not executed at all
     if (!executed_by_leader)
     {
-        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED,
-                                                "Cannot execute replicated DDL query on leader");
+        task.execution_status = ExecutionStatus(ErrorCodes::NOT_IMPLEMENTED, "Cannot execute replicated DDL query");
         return false;
     }
+    LOG_DEBUG(log, "Task {} has already been executed by replica ({}) of the same shard.", task.entry_name, zookeeper->get(is_executed_path));
     return true;
 }
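
For readers unfamiliar with the coordination above, here is a rough sketch of the same scheme in Python using the kazoo ZooKeeper client. It is not part of this commit (ClickHouse implements this in C++ with its own zkutil helpers); `execute_on_leader`, the `is_leader`/`run_query` callables, the `/demo/ddl` paths, and the `127.0.0.1:2181` address are all invented for illustration.

```python
import random
import threading

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError

MAX_TRIES_TO_EXECUTE = 3  # mirrors the constant introduced in DDLWorker


def execute_on_leader(zk, shard_path, host_id, is_leader, run_query):
    """Return True if the query ended up executed on this shard (by us or another leader)."""
    is_executed_path = shard_path + "/executed"
    tries_path = shard_path + "/tries_to_execute"

    zk.ensure_path(shard_path)
    try:
        zk.create(tries_path, b"0")              # like tryCreate(..., "0", Persistent)
    except NodeExistsError:
        pass

    executed = threading.Event()
    # The watch fires once the "executed" node appears, replacing fixed-length sleeps.
    if zk.exists(is_executed_path, watch=lambda event: executed.set()):
        return True                              # some leader already did the work

    lock = zk.Lock(shard_path + "/lock", host_id)
    while True:
        if is_leader() and lock.acquire(blocking=False):
            try:
                counter = int(zk.get(tries_path)[0].decode())
                if counter > MAX_TRIES_TO_EXECUTE:
                    return False                 # too many failed attempts, give up
                zk.set(tries_path, str(counter + 1).encode())
                if run_query():
                    zk.create(is_executed_path, host_id.encode())
                    return True
            finally:
                lock.release()

        # Non-leaders (and a leader whose attempt failed) wait for the watch with a
        # randomized timeout, then re-check whether the retry budget is exhausted.
        if executed.wait(random.uniform(0.0, 1.0)):
            return True
        if int(zk.get(tries_path)[0].decode()) > MAX_TRIES_TO_EXECUTE:
            return False                         # nobody will try to execute the query again


if __name__ == "__main__":
    # Assumes a local ZooKeeper; both callables are stand-ins for the real leader check and DDL.
    zk = KazooClient(hosts="127.0.0.1:2181")
    zk.start()
    ok = execute_on_leader(zk, "/demo/ddl/shard_1", "replica_1",
                           is_leader=lambda: True, run_query=lambda: True)
    print("executed:", ok)
    zk.stop()
```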

@@ -0,0 +1,28 @@
<yandex>
    <remote_servers>
        <test_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <internal_replication>true</internal_replication>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>node4</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</yandex>

@@ -0,0 +1,23 @@
<yandex>
    <profiles>
        <default>
            <mutations_sync>2</mutations_sync>
        </default>
    </profiles>
    <users>
        <default>
            <password></password>
            <networks incl="networks" replace="replace">
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
        </default>
    </users>
    <quotas>
        <default>
        </default>
    </quotas>
</yandex>

@@ -0,0 +1,42 @@
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_config.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_config.xml'], with_zookeeper=True)
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_config.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml'], user_configs=['configs/users_config.xml'], with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def test_long_query(started_cluster):
    # Two replicas per shard: nodes 1-2 form the first shard, nodes 3-4 the second.
    node1.query("CREATE TABLE cluster_table (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/1/cluster_table', '1') ORDER BY tuple()")
    node2.query("CREATE TABLE cluster_table (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/1/cluster_table', '2') ORDER BY tuple()")

    node1.query("INSERT INTO cluster_table SELECT number, toString(number) FROM numbers(20)")
    node2.query("SYSTEM SYNC REPLICA cluster_table")

    node3.query("CREATE TABLE cluster_table (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/2/cluster_table', '1') ORDER BY tuple()")
    node4.query("CREATE TABLE cluster_table (key UInt64, value String) ENGINE = ReplicatedMergeTree('/test/2/cluster_table', '2') ORDER BY tuple()")

    node3.query("INSERT INTO cluster_table SELECT number, toString(number) FROM numbers(20)")
    node4.query("SYSTEM SYNC REPLICA cluster_table")

    # Synchronous mutation that sleeps one second per row, so the distributed DDL
    # task runs far longer than a single retry interval on every shard.
    node1.query("ALTER TABLE cluster_table ON CLUSTER 'test_cluster' UPDATE key = 1 WHERE sleepEachRow(1) == 0", settings={"mutations_sync": "2"})

    assert node1.query("SELECT SUM(key) FROM cluster_table") == "20\n"
    assert node2.query("SELECT SUM(key) FROM cluster_table") == "20\n"
    assert node3.query("SELECT SUM(key) FROM cluster_table") == "20\n"
    assert node4.query("SELECT SUM(key) FROM cluster_table") == "20\n"