mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
Add allow_distributed_ddl setting. [#CLICKHOUSE-3611]
Clearer exception message. Fixed Int64 settings parsing.
This commit is contained in:
parent
f89d9dbfb9
commit
9c889af882
@ -368,6 +368,7 @@ namespace ErrorCodes
|
||||
extern const int INSERT_WAS_DEDUPLICATED = 389;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390;
|
||||
extern const int EXTERNAL_LIBRARY_ERROR = 391;
|
||||
extern const int QUERY_IS_PROHIBITED = 392;
|
||||
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
|
@ -54,6 +54,7 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_TYPE_OF_QUERY;
|
||||
extern const int UNFINISHED;
|
||||
extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK;
|
||||
extern const int QUERY_IS_PROHIBITED;
|
||||
}
|
||||
|
||||
|
||||
@ -980,21 +981,32 @@ public:
|
||||
if (is_cancelled)
|
||||
return res;
|
||||
|
||||
auto elapsed_seconds = watch.elapsedSeconds();
|
||||
if (timeout_seconds >= 0 && elapsed_seconds > timeout_seconds)
|
||||
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
|
||||
{
|
||||
throw Exception("Watching task " + node_path + " is executing too long (" + toString(std::round(elapsed_seconds)) + " sec.)",
|
||||
ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
|
||||
size_t num_active_hosts = current_active_hosts.size();
|
||||
|
||||
std::stringstream msg;
|
||||
msg << "Watching task " << node_path << " is executing longer than distributed_ddl_task_timeout"
|
||||
<< " (=" << timeout_seconds << ") seconds."
|
||||
<< " There are " << num_unfinished_hosts << " unfinished hosts"
|
||||
<< " (" << num_active_hosts << " of them are currently active)"
|
||||
<< ", they are going to execute the query in background";
|
||||
|
||||
throw Exception(msg.str(), ErrorCodes::TIMEOUT_EXCEEDED);
|
||||
}
|
||||
|
||||
if (num_hosts_finished != 0 || try_number != 0)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(static_cast<size_t>(20), try_number + 1)));
|
||||
{
|
||||
auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast<size_t>(1000), 50 * (try_number + 1)));
|
||||
std::this_thread::sleep_for(current_sleep_for);
|
||||
}
|
||||
|
||||
/// TODO: add shared lock
|
||||
if (!zookeeper->exists(node_path))
|
||||
{
|
||||
throw Exception("Cannot provide query execution status. The query's node " + node_path
|
||||
+ " had been deleted by the cleaner since it was finished (or its lifetime is expired)",
|
||||
+ " has been deleted by the cleaner since it was finished (or its lifetime is expired)",
|
||||
ErrorCodes::UNFINISHED);
|
||||
}
|
||||
|
||||
@ -1003,7 +1015,7 @@ public:
|
||||
if (new_hosts.empty())
|
||||
continue;
|
||||
|
||||
Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
|
||||
current_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active");
|
||||
|
||||
MutableColumns columns = sample.cloneEmptyColumns();
|
||||
for (const String & host_id : new_hosts)
|
||||
@ -1019,12 +1031,14 @@ public:
|
||||
UInt16 port;
|
||||
Cluster::Address::fromString(host_id, host, port);
|
||||
|
||||
++num_hosts_finished;
|
||||
|
||||
columns[0]->insert(host);
|
||||
columns[1]->insert(static_cast<UInt64>(port));
|
||||
columns[2]->insert(static_cast<Int64>(status.code));
|
||||
columns[3]->insert(status.message);
|
||||
columns[4]->insert(static_cast<UInt64>(waiting_hosts.size() - (++num_hosts_finished)));
|
||||
columns[5]->insert(static_cast<UInt64>(cur_active_hosts.size()));
|
||||
columns[4]->insert(static_cast<UInt64>(waiting_hosts.size() - num_hosts_finished));
|
||||
columns[5]->insert(static_cast<UInt64>(current_active_hosts.size()));
|
||||
}
|
||||
res = sample.cloneWithColumns(std::move(columns));
|
||||
}
|
||||
@ -1086,6 +1100,7 @@ private:
|
||||
NameSet waiting_hosts; /// hosts from task host list
|
||||
NameSet finished_hosts; /// finished hosts from host list
|
||||
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
|
||||
Strings current_active_hosts; /// Hosts that were in active state at the last check
|
||||
size_t num_hosts_finished = 0;
|
||||
|
||||
Int64 timeout_seconds = 120;
|
||||
@ -1104,6 +1119,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
|
||||
throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
if (!context.getSettingsRef().allow_distributed_ddl)
|
||||
throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
|
||||
|
||||
if (auto query_alter = dynamic_cast<const ASTAlterQuery *>(query_ptr.get()))
|
||||
{
|
||||
for (const auto & param : query_alter->parameters)
|
||||
|
@ -170,7 +170,7 @@ struct Settings
|
||||
\
|
||||
M(SettingBool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \
|
||||
M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \
|
||||
M(SettingInt64, distributed_ddl_task_timeout, 120, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \
|
||||
M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \
|
||||
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \
|
||||
M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \
|
||||
M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \
|
||||
@ -180,7 +180,8 @@ struct Settings
|
||||
M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \
|
||||
M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \
|
||||
\
|
||||
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.")
|
||||
M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \
|
||||
M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.")
|
||||
|
||||
|
||||
/// Possible limits for query execution.
|
||||
|
@ -5,6 +5,7 @@
|
||||
#include <Poco/Timespan.h>
|
||||
|
||||
#include <Common/getNumberOfPhysicalCPUCores.h>
|
||||
#include <Common/FieldVisitors.h>
|
||||
|
||||
#include <IO/CompressedStream.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
@ -57,7 +58,7 @@ struct SettingInt
|
||||
|
||||
void set(const Field & x)
|
||||
{
|
||||
set(safeGet<IntType>(x));
|
||||
set(applyVisitor(FieldVisitorConvertToNumber<IntType>(), x));
|
||||
}
|
||||
|
||||
void set(const String & x)
|
||||
|
Loading…
Reference in New Issue
Block a user