mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Merge pull request #58350 from ClickHouse/rdb_dont_wait_inactive
An option to avoid waiting for inactive Replicated database replicas
This commit is contained in:
commit
972756486a
@ -3847,6 +3847,8 @@ Possible values:
|
||||
- `none` — Is similar to throw, but distributed DDL query returns no result set.
|
||||
- `null_status_on_timeout` — Returns `NULL` as execution status in some rows of result set instead of throwing `TIMEOUT_EXCEEDED` if query is not finished on the corresponding hosts.
|
||||
- `never_throw` — Do not throw `TIMEOUT_EXCEEDED` and do not rethrow exceptions if query has failed on some hosts.
|
||||
- `null_status_on_timeout_only_active` — similar to `null_status_on_timeout`, but doesn't wait for inactive replicas of the `Replicated` database
|
||||
- `throw_only_active` — similar to `throw`, but doesn't wait for inactive replicas of the `Replicated` database
|
||||
|
||||
Default value: `throw`.
|
||||
|
||||
|
@ -115,6 +115,8 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
|
||||
{{"none", DistributedDDLOutputMode::NONE},
|
||||
{"throw", DistributedDDLOutputMode::THROW},
|
||||
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
|
||||
{"throw_only_active", DistributedDDLOutputMode::THROW_ONLY_ACTIVE},
|
||||
{"null_status_on_timeout_only_active", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE},
|
||||
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
|
||||
|
||||
IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
|
||||
|
@ -173,6 +173,8 @@ enum class DistributedDDLOutputMode
|
||||
THROW,
|
||||
NULL_STATUS_ON_TIMEOUT,
|
||||
NEVER_THROW,
|
||||
THROW_ONLY_ACTIVE,
|
||||
NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE,
|
||||
};
|
||||
|
||||
DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
|
||||
|
@ -200,8 +200,6 @@ public:
|
||||
Status prepare() override;
|
||||
|
||||
private:
|
||||
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);
|
||||
|
||||
static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);
|
||||
|
||||
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
|
||||
@ -228,7 +226,8 @@ private:
|
||||
NameSet waiting_hosts; /// hosts from task host list
|
||||
NameSet finished_hosts; /// finished hosts from host list
|
||||
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
|
||||
Strings current_active_hosts; /// Hosts that were in active state at the last check
|
||||
Strings current_active_hosts; /// Hosts that are currently executing the task
|
||||
NameSet offline_hosts; /// Hosts that are not currently running
|
||||
size_t num_hosts_finished = 0;
|
||||
|
||||
/// Save the first detected error and throw it at the end of execution
|
||||
@ -237,7 +236,10 @@ private:
|
||||
Int64 timeout_seconds = 120;
|
||||
bool is_replicated_database = false;
|
||||
bool throw_on_timeout = true;
|
||||
bool only_running_hosts = false;
|
||||
|
||||
bool timeout_exceeded = false;
|
||||
bool stop_waiting_offline_hosts = false;
|
||||
};
|
||||
|
||||
|
||||
@ -310,12 +312,15 @@ DDLQueryStatusSource::DDLQueryStatusSource(
|
||||
, log(&Poco::Logger::get("DDLQueryStatusSource"))
|
||||
{
|
||||
auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
|
||||
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
|
||||
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE
|
||||
|| output_mode == DistributedDDLOutputMode::NONE;
|
||||
|
||||
if (hosts_to_wait)
|
||||
{
|
||||
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
|
||||
is_replicated_database = true;
|
||||
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
|
||||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -377,6 +382,38 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
|
||||
return Chunk(std::move(columns), unfinished_hosts.size());
|
||||
}
|
||||
|
||||
static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, Poco::Logger * log)
|
||||
{
|
||||
fs::path replicas_path;
|
||||
if (node_path.ends_with('/'))
|
||||
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
|
||||
else
|
||||
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";
|
||||
|
||||
Strings paths;
|
||||
Strings hosts_array;
|
||||
for (const auto & host : hosts_to_wait)
|
||||
{
|
||||
hosts_array.push_back(host);
|
||||
paths.push_back(replicas_path / host / "active");
|
||||
}
|
||||
|
||||
NameSet offline;
|
||||
auto res = zookeeper->tryGet(paths);
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
if (res[i].error == Coordination::Error::ZNONODE)
|
||||
offline.insert(hosts_array[i]);
|
||||
|
||||
if (offline.size() == hosts_to_wait.size())
|
||||
{
|
||||
/// Avoid reporting that all hosts are offline
|
||||
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
|
||||
return {};
|
||||
}
|
||||
|
||||
return offline;
|
||||
}
|
||||
|
||||
Chunk DDLQueryStatusSource::generate()
|
||||
{
|
||||
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
|
||||
@ -398,7 +435,7 @@ Chunk DDLQueryStatusSource::generate()
|
||||
if (isCancelled())
|
||||
return {};
|
||||
|
||||
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
|
||||
if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
|
||||
{
|
||||
timeout_exceeded = true;
|
||||
|
||||
@ -406,7 +443,7 @@ Chunk DDLQueryStatusSource::generate()
|
||||
size_t num_active_hosts = current_active_hosts.size();
|
||||
|
||||
constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
|
||||
"There are {} unfinished hosts ({} of them are currently active), "
|
||||
"There are {} unfinished hosts ({} of them are currently executing the task), "
|
||||
"they are going to execute the query in background";
|
||||
if (throw_on_timeout)
|
||||
{
|
||||
@ -425,10 +462,7 @@ Chunk DDLQueryStatusSource::generate()
|
||||
return generateChunkWithUnfinishedHosts();
|
||||
}
|
||||
|
||||
if (num_hosts_finished != 0 || try_number != 0)
|
||||
{
|
||||
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
|
||||
}
|
||||
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));
|
||||
|
||||
bool node_exists = false;
|
||||
Strings tmp_hosts;
|
||||
@ -440,9 +474,21 @@ Chunk DDLQueryStatusSource::generate()
|
||||
retries_ctl.retryLoop([&]()
|
||||
{
|
||||
auto zookeeper = context->getZooKeeper();
|
||||
node_exists = zookeeper->exists(node_path);
|
||||
tmp_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / node_to_wait);
|
||||
tmp_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active");
|
||||
Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
|
||||
auto res = zookeeper->tryGetChildren(paths);
|
||||
for (size_t i = 0; i < res.size(); ++i)
|
||||
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
|
||||
throw Coordination::Exception::fromPath(res[i].error, paths[i]);
|
||||
|
||||
if (res[0].error == Coordination::Error::ZNONODE)
|
||||
node_exists = zookeeper->exists(node_path);
|
||||
else
|
||||
node_exists = true;
|
||||
tmp_hosts = res[0].names;
|
||||
tmp_active_hosts = res[1].names;
|
||||
|
||||
if (only_running_hosts)
|
||||
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
|
||||
});
|
||||
}
|
||||
|
||||
@ -460,6 +506,17 @@ Chunk DDLQueryStatusSource::generate()
|
||||
|
||||
Strings new_hosts = getNewAndUpdate(tmp_hosts);
|
||||
++try_number;
|
||||
|
||||
if (only_running_hosts)
|
||||
{
|
||||
size_t num_finished_or_offline = 0;
|
||||
for (const auto & host : waiting_hosts)
|
||||
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);
|
||||
|
||||
if (num_finished_or_offline == waiting_hosts.size())
|
||||
stop_waiting_offline_hosts = true;
|
||||
}
|
||||
|
||||
if (new_hosts.empty())
|
||||
continue;
|
||||
|
||||
@ -470,7 +527,13 @@ Chunk DDLQueryStatusSource::generate()
|
||||
{
|
||||
ExecutionStatus status(-1, "Cannot obtain error message");
|
||||
|
||||
if (node_to_wait == "finished")
|
||||
/// Replicated database retries in case of error, it should not write error status.
|
||||
#ifdef ABORT_ON_LOGICAL_ERROR
|
||||
bool need_check_status = true;
|
||||
#else
|
||||
bool need_check_status = !is_replicated_database;
|
||||
#endif
|
||||
if (need_check_status)
|
||||
{
|
||||
String status_data;
|
||||
bool finished_exists = false;
|
||||
@ -496,7 +559,6 @@ Chunk DDLQueryStatusSource::generate()
|
||||
if (status.code != 0 && !first_exception
|
||||
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
|
||||
{
|
||||
/// Replicated database retries in case of error, it should not write error status.
|
||||
if (is_replicated_database)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);
|
||||
|
||||
@ -555,15 +617,6 @@ IProcessor::Status DDLQueryStatusSource::prepare()
|
||||
return ISource::prepare();
|
||||
}
|
||||
|
||||
Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
|
||||
{
|
||||
Strings res;
|
||||
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
|
||||
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
|
||||
throw Coordination::Exception::fromPath(code, node_path);
|
||||
return res;
|
||||
}
|
||||
|
||||
Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
|
||||
{
|
||||
Strings diff;
|
||||
|
@ -507,7 +507,7 @@ def test_alters_from_different_replicas(started_cluster):
|
||||
|
||||
settings = {"distributed_ddl_task_timeout": 5}
|
||||
assert (
|
||||
"There are 1 unfinished hosts (0 of them are currently active)"
|
||||
"There are 1 unfinished hosts (0 of them are currently executing the task"
|
||||
in competing_node.query_and_get_error(
|
||||
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
|
||||
settings=settings,
|
||||
|
@ -96,7 +96,7 @@ def test_cluster_groups(started_cluster):
|
||||
main_node_2.stop_clickhouse()
|
||||
settings = {"distributed_ddl_task_timeout": 5}
|
||||
assert (
|
||||
"There are 1 unfinished hosts (0 of them are currently active)"
|
||||
"There are 1 unfinished hosts (0 of them are currently executing the task)"
|
||||
in main_node_1.query_and_get_error(
|
||||
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
|
||||
settings=settings,
|
||||
|
@ -3,7 +3,7 @@ Received exception from server:
|
||||
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS)
|
||||
(query: create table none on cluster test_shard_localhost (n int) engine=Memory;)
|
||||
Received exception from server:
|
||||
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
|
||||
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
|
||||
(query: drop table if exists none on cluster test_unavailable_shard;)
|
||||
throw
|
||||
localhost 9000 0 0 0
|
||||
@ -12,7 +12,7 @@ Code: 57. Error: Received from localhost:9000. Error: There was an error on [loc
|
||||
(query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;)
|
||||
localhost 9000 0 1 0
|
||||
Received exception from server:
|
||||
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
|
||||
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
|
||||
(query: drop table if exists throw on cluster test_unavailable_shard;)
|
||||
null_status_on_timeout
|
||||
localhost 9000 0 0 0
|
||||
|
@ -33,7 +33,7 @@ function run_until_out_contains()
|
||||
done
|
||||
}
|
||||
|
||||
RAND_COMMENT="01175_DDL_$RANDOM"
|
||||
RAND_COMMENT="01175_DDL_$CLICKHOUSE_DATABASE"
|
||||
LOG_COMMENT="${CLICKHOUSE_LOG_COMMENT}_$RAND_COMMENT"
|
||||
|
||||
CLICKHOUSE_CLIENT_WITH_SETTINGS=${CLICKHOUSE_CLIENT/--log_comment ${CLICKHOUSE_LOG_COMMENT}/--log_comment ${LOG_COMMENT}}
|
||||
|
@ -13,9 +13,15 @@ t
|
||||
rdb_default 1 1 s1 r1 1
|
||||
2
|
||||
2
|
||||
s1 r1 OK 2 0
|
||||
s1 r2 QUEUED 2 0
|
||||
s2 r1 QUEUED 2 0
|
||||
2
|
||||
rdb_default 1 1 s1 r1 1
|
||||
rdb_default 1 2 s1 r2 0
|
||||
2
|
||||
2
|
||||
t
|
||||
t2
|
||||
t3
|
||||
rdb_default_4 1 1 s1 r1 1
|
||||
|
@ -32,6 +32,10 @@ $CLICKHOUSE_CLIENT -q "system sync database replica $db"
|
||||
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1"
|
||||
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"
|
||||
|
||||
# Also check that it doesn't exceed distributed_ddl_task_timeout waiting for inactive replicas
|
||||
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=throw_only_active -q "create table $db.t2 (n int) engine=Log" 2>&1| grep -Fac "TIMEOUT_EXCEEDED"
|
||||
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=null_status_on_timeout_only_active -q "create table $db.t3 (n int) engine=Log" | sort
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "detach database $db3"
|
||||
$CLICKHOUSE_CLIENT -q "system drop database replica 'r1' from shard 's2' from database $db"
|
||||
$CLICKHOUSE_CLIENT -q "attach database $db3" 2>/dev/null
|
||||
|
Loading…
Reference in New Issue
Block a user