an option to avoid waiting for inactive Replicated db replicas

This commit is contained in:
Alexander Tokmakov 2023-12-29 15:16:12 +01:00
parent d58b76ce06
commit bfc10bd234
9 changed files with 96 additions and 27 deletions

View File

@ -3847,6 +3847,8 @@ Possible values:
- `none` — Is similar to throw, but distributed DDL query returns no result set. - `none` — Is similar to throw, but distributed DDL query returns no result set.
- `null_status_on_timeout` — Returns `NULL` as execution status in some rows of result set instead of throwing `TIMEOUT_EXCEEDED` if query is not finished on the corresponding hosts. - `null_status_on_timeout` — Returns `NULL` as execution status in some rows of result set instead of throwing `TIMEOUT_EXCEEDED` if query is not finished on the corresponding hosts.
- `never_throw` — Do not throw `TIMEOUT_EXCEEDED` and do not rethrow exceptions if query has failed on some hosts. - `never_throw` — Do not throw `TIMEOUT_EXCEEDED` and do not rethrow exceptions if query has failed on some hosts.
- `null_status_on_timeout_only_active` — similar to `null_status_on_timeout`, but doesn't wait for inactive replicas of the `Replicated` database
- `throw_only_active` — similar to `throw`, but doesn't wait for inactive replicas of the `Replicated` database
Default value: `throw`. Default value: `throw`.

View File

@ -113,6 +113,8 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{{"none", DistributedDDLOutputMode::NONE}, {{"none", DistributedDDLOutputMode::NONE},
{"throw", DistributedDDLOutputMode::THROW}, {"throw", DistributedDDLOutputMode::THROW},
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT}, {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"throw_only_active", DistributedDDLOutputMode::THROW_ONLY_ACTIVE},
{"null_status_on_timeout_only_active", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}}) {"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,

View File

@ -165,6 +165,8 @@ enum class DistributedDDLOutputMode
THROW, THROW,
NULL_STATUS_ON_TIMEOUT, NULL_STATUS_ON_TIMEOUT,
NEVER_THROW, NEVER_THROW,
THROW_ONLY_ACTIVE,
NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE,
}; };
DECLARE_SETTING_ENUM(DistributedDDLOutputMode) DECLARE_SETTING_ENUM(DistributedDDLOutputMode)

View File

@ -200,8 +200,6 @@ public:
Status prepare() override; Status prepare() override;
private: private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait); static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts); Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
@ -228,7 +226,8 @@ private:
NameSet waiting_hosts; /// hosts from task host list NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that were in active state at the last check Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0; size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of execution /// Save the first detected error and throw it at the end of execution
@ -237,7 +236,10 @@ private:
Int64 timeout_seconds = 120; Int64 timeout_seconds = 120;
bool is_replicated_database = false; bool is_replicated_database = false;
bool throw_on_timeout = true; bool throw_on_timeout = true;
bool only_running_hosts = false;
bool timeout_exceeded = false; bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
}; };
@ -316,6 +318,8 @@ DDLQueryStatusSource::DDLQueryStatusSource(
{ {
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end()); waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
is_replicated_database = true; is_replicated_database = true;
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE;
} }
else else
{ {
@ -377,6 +381,38 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
return Chunk(std::move(columns), unfinished_hosts.size()); return Chunk(std::move(columns), unfinished_hosts.size());
} }
static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, Poco::Logger * log)
{
fs::path replicas_path;
if (node_path.ends_with('/'))
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
else
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";
Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(replicas_path / host / "active");
}
NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);
if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}
return offline;
}
Chunk DDLQueryStatusSource::generate() Chunk DDLQueryStatusSource::generate()
{ {
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size(); bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
@ -398,7 +434,7 @@ Chunk DDLQueryStatusSource::generate()
if (isCancelled()) if (isCancelled())
return {}; return {};
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds) if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{ {
timeout_exceeded = true; timeout_exceeded = true;
@ -406,7 +442,7 @@ Chunk DDLQueryStatusSource::generate()
size_t num_active_hosts = current_active_hosts.size(); size_t num_active_hosts = current_active_hosts.size();
constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. " constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), " "There are {} unfinished hosts ({} of them are currently executing the task), "
"they are going to execute the query in background"; "they are going to execute the query in background";
if (throw_on_timeout) if (throw_on_timeout)
{ {
@ -425,10 +461,7 @@ Chunk DDLQueryStatusSource::generate()
return generateChunkWithUnfinishedHosts(); return generateChunkWithUnfinishedHosts();
} }
if (num_hosts_finished != 0 || try_number != 0) sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
{
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
}
bool node_exists = false; bool node_exists = false;
Strings tmp_hosts; Strings tmp_hosts;
@ -440,9 +473,21 @@ Chunk DDLQueryStatusSource::generate()
retries_ctl.retryLoop([&]() retries_ctl.retryLoop([&]()
{ {
auto zookeeper = context->getZooKeeper(); auto zookeeper = context->getZooKeeper();
node_exists = zookeeper->exists(node_path); Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
tmp_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / node_to_wait); auto res = zookeeper->tryGetChildren(paths);
tmp_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active"); for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;
if (only_running_hosts)
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
}); });
} }
@ -460,6 +505,17 @@ Chunk DDLQueryStatusSource::generate()
Strings new_hosts = getNewAndUpdate(tmp_hosts); Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number; ++try_number;
if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}
if (new_hosts.empty()) if (new_hosts.empty())
continue; continue;
@ -470,7 +526,13 @@ Chunk DDLQueryStatusSource::generate()
{ {
ExecutionStatus status(-1, "Cannot obtain error message"); ExecutionStatus status(-1, "Cannot obtain error message");
if (node_to_wait == "finished") /// Replicated database retries in case of error, it should not write error status.
#ifdef ABORT_ON_LOGICAL_ERROR
bool need_check_status = true;
#else
bool need_check_status = !is_replicated_database;
#endif
if (need_check_status)
{ {
String status_data; String status_data;
bool finished_exists = false; bool finished_exists = false;
@ -496,7 +558,6 @@ Chunk DDLQueryStatusSource::generate()
if (status.code != 0 && !first_exception if (status.code != 0 && !first_exception
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW) && context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
{ {
/// Replicated database retries in case of error, it should not write error status.
if (is_replicated_database) if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message); throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
@ -555,15 +616,6 @@ IProcessor::Status DDLQueryStatusSource::prepare()
return ISource::prepare(); return ISource::prepare();
} }
Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
Strings res;
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(code, node_path);
return res;
}
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts) Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{ {
Strings diff; Strings diff;

View File

@ -507,7 +507,7 @@ def test_alters_from_different_replicas(started_cluster):
settings = {"distributed_ddl_task_timeout": 5} settings = {"distributed_ddl_task_timeout": 5}
assert ( assert (
"There are 1 unfinished hosts (0 of them are currently active)" "There are 1 unfinished hosts (0 of them are currently executing the task"
in competing_node.query_and_get_error( in competing_node.query_and_get_error(
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;", "ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
settings=settings, settings=settings,

View File

@ -96,7 +96,7 @@ def test_cluster_groups(started_cluster):
main_node_2.stop_clickhouse() main_node_2.stop_clickhouse()
settings = {"distributed_ddl_task_timeout": 5} settings = {"distributed_ddl_task_timeout": 5}
assert ( assert (
"There are 1 unfinished hosts (0 of them are currently active)" "There are 1 unfinished hosts (0 of them are currently executing the task)"
in main_node_1.query_and_get_error( in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);", "CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings, settings=settings,

View File

@ -3,7 +3,7 @@ Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS) Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS)
(query: create table none on cluster test_shard_localhost (n int) engine=Memory;) (query: create table none on cluster test_shard_localhost (n int) engine=Memory;)
Received exception from server: Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED) Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists none on cluster test_unavailable_shard;) (query: drop table if exists none on cluster test_unavailable_shard;)
throw throw
localhost 9000 0 0 0 localhost 9000 0 0 0
@ -12,7 +12,7 @@ Code: 57. Error: Received from localhost:9000. Error: There was an error on [loc
(query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;) (query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;)
localhost 9000 0 1 0 localhost 9000 0 1 0
Received exception from server: Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED) Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists throw on cluster test_unavailable_shard;) (query: drop table if exists throw on cluster test_unavailable_shard;)
null_status_on_timeout null_status_on_timeout
localhost 9000 0 0 0 localhost 9000 0 0 0

View File

@ -12,10 +12,18 @@ t
2 2
rdb_default 1 1 s1 r1 1 rdb_default 1 1 s1 r1 1
2 2
s1 r1 OK 2 0
s2 r1 QUEUED 2 0
s1 r2 QUEUED 2 0
s1 r1 OK 2 0
s2 r1 QUEUED 2 0
s1 r2 QUEUED 2 0
2 2
rdb_default 1 1 s1 r1 1 rdb_default 1 1 s1 r1 1
rdb_default 1 2 s1 r2 0 rdb_default 1 2 s1 r2 0
2 2
2 2
t t
t2
t3
rdb_default_4 1 1 s1 r1 1 rdb_default_4 1 1 s1 r1 1

View File

@ -32,6 +32,9 @@ $CLICKHOUSE_CLIENT -q "system sync database replica $db"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1" $CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it" $CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=throw_only_active -q "create table $db.t2 (n int) engine=Log"
$CLICKHOUSE_CLIENT --distributed_ddl_output_mode=null_status_on_timeout_only_active -q "create table $db.t3 (n int) engine=Log"
$CLICKHOUSE_CLIENT -q "detach database $db3" $CLICKHOUSE_CLIENT -q "detach database $db3"
$CLICKHOUSE_CLIENT -q "system drop database replica 'r1' from shard 's2' from database $db" $CLICKHOUSE_CLIENT -q "system drop database replica 'r1' from shard 's2' from database $db"
$CLICKHOUSE_CLIENT -q "attach database $db3" 2>/dev/null $CLICKHOUSE_CLIENT -q "attach database $db3" 2>/dev/null