configurable output mode for distributed DDL

This commit is contained in:
Alexander Tokmakov 2021-03-08 23:35:09 +03:00
parent 2484781070
commit 2022b90919
10 changed files with 190 additions and 38 deletions

View File

@ -434,7 +434,7 @@ class IColumn;
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(Bool, database_replicated_ddl_output, true, "Return table with query execution status as a result of DDL query", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
@ -446,6 +446,7 @@ class IColumn;
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -102,4 +102,10 @@ IMPLEMENT_SETTING_ENUM(UnionMode, ErrorCodes::UNKNOWN_UNION,
{"ALL", UnionMode::ALL},
{"DISTINCT", UnionMode::DISTINCT}})
IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{{"none", DistributedDDLOutputMode::NONE},
{"throw", DistributedDDLOutputMode::THROW},
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
}

View File

@ -138,4 +138,15 @@ enum class UnionMode
DECLARE_SETTING_ENUM(UnionMode)
enum class DistributedDDLOutputMode
{
NONE,
THROW,
NULL_STATUS_ON_TIMEOUT,
NEVER_THROW,
};
DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
}

View File

@ -313,15 +313,8 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const
entry.initiator = ddl_worker->getCommonHostID();
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
BlockIO io;
if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, query_context, hosts_to_wait);
if (query_context.getSettingsRef().database_replicated_ddl_output)
io.in = std::move(stream);
return io;
return getDistributedDDLStatus(node_path, entry, query_context, hosts_to_wait);
}
static UUID getTableUUIDIfReplicated(const String & metadata, const Context & context)

View File

@ -13,6 +13,9 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -165,16 +168,29 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
entry.initiator = ddl_worker.getCommonHostID();
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context);
}
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, const Context & context, const std::optional<Strings> & hosts_to_wait)
{
BlockIO io;
if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
io.in = std::move(stream);
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context, hosts_to_wait);
if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
{
/// Wait for query to finish, but ignore output
NullBlockOutputStream output{Block{}};
copyData(*stream, output);
}
else
{
io.in = std::move(stream);
}
return io;
}
DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_,
const std::optional<Strings> & hosts_to_wait)
: node_path(zk_node_path)
@ -182,13 +198,29 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path
, watch(CLOCK_MONOTONIC_COARSE)
, log(&Poco::Logger::get("DDLQueryStatusInputStream"))
{
if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::THROW ||
context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
throw_on_timeout = true;
else if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT ||
context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NEVER_THROW)
throw_on_timeout = false;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown output mode");
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (throw_on_timeout)
return type;
return std::make_shared<DataTypeNullable>(type);
};
sample = Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{std::make_shared<DataTypeInt64>(), "status"},
{std::make_shared<DataTypeString>(), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
if (hosts_to_wait)
@ -205,14 +237,33 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path
addTotalRowsApprox(waiting_hosts.size());
timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
/// There is not sense to check query status with zero timeout.
assert(timeout_seconds >= 0);
}
std::pair<String, UInt16> DDLQueryStatusInputStream::parseHostAndPort(const String & host_id) const
{
String host = host_id;
UInt16 port = 0;
if (by_hostname)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
return {host, port};
}
Block DDLQueryStatusInputStream::readImpl()
{
Block res;
if (num_hosts_finished >= waiting_hosts.size())
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
{
if (first_exception)
bool throw_if_error_on_host = context.getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
if (first_exception && throw_if_error_on_host)
throw Exception(*first_exception);
return res;
@ -225,22 +276,46 @@ Block DDLQueryStatusInputStream::readImpl()
{
if (isCancelled())
{
if (first_exception)
bool throw_if_error_on_host = context.getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
if (first_exception && throw_if_error_on_host)
throw Exception(*first_exception);
return res;
}
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
if (watch.elapsedSeconds() > timeout_seconds)
{
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr const char * msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), "
"they are going to execute the query in background";
if (throw_on_timeout)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, msg_format,
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
timeout_exceeded = true;
LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = sample.cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
auto [host, port] = parseHostAndPort(host_id);
columns[0]->insert(host);
columns[1]->insert(port);
columns[2]->insert(Field{});
columns[3]->insert(Field{});
columns[4]->insert(num_unfinished_hosts);
columns[5]->insert(num_active_hosts);
}
res = sample.cloneWithColumns(std::move(columns));
return res;
}
if (num_hosts_finished != 0 || try_number != 0)
@ -272,14 +347,7 @@ Block DDLQueryStatusInputStream::readImpl()
status.tryDeserializeText(status_data);
}
String host = host_id;
UInt16 port = 0;
if (by_hostname)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
auto [host, port] = parseHostAndPort(host_id);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);

View File

@ -24,6 +24,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & conte
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access, bool query_requires_grant_option = false);
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, AccessRightsElements && query_requires_access, bool query_requires_grant_option = false);
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, const Context & context, const std::optional<Strings> & hosts_to_wait = {});
class DDLQueryStatusInputStream final : public IBlockInputStream
{
@ -44,6 +45,8 @@ private:
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
String node_path;
const Context & context;
Stopwatch watch;
@ -62,6 +65,8 @@ private:
Int64 timeout_seconds = 120;
bool by_hostname = true;
bool throw_on_timeout = true;
bool timeout_exceeded = false;
};
}

View File

@ -2,7 +2,7 @@
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<database_replicated_ddl_output>0</database_replicated_ddl_output>
<distributed_ddl_output_mode>none</distributed_ddl_output_mode>
<database_replicated_initial_query_timeout_sec>30</database_replicated_initial_query_timeout_sec>
<distributed_ddl_task_timeout>30</distributed_ddl_task_timeout>
</default>

View File

@ -103,12 +103,16 @@ def test_alters_from_different_replicas(started_cluster):
dummy_node.stop_clickhouse(kill=True)
settings = {"distributed_ddl_task_timeout": 10}
settings = {"distributed_ddl_task_timeout": 5}
assert "There are 1 unfinished hosts (0 of them are currently active)" in \
competing_node.query_and_get_error("ALTER TABLE testdb.concurrent_test ADD COLUMN Added0 UInt32;", settings=settings)
settings = {"distributed_ddl_task_timeout": 5, "distributed_ddl_output_mode": "null_status_on_timeout"}
assert "shard1|replica2\t0\t\\N\t\\N" in \
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;", settings=settings)
settings = {"distributed_ddl_task_timeout": 5, "distributed_ddl_output_mode": "never_throw"}
assert "shard1|replica2\t0\t\\N\t\\N" in \
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;", settings=settings)
dummy_node.start_clickhouse()
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;")
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;")
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;")
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;")
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;")

View File

@ -0,0 +1,25 @@
none
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.throw already exists
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=5) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background.
throw
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.throw already exists. 0 0
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.throw already exists
localhost 9000 0 1 0
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=5) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background.
null_status_on_timeout
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.null_status already exists. 0 0
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.null_status already exists
localhost 9000 0 1 0
localhost 1 \N \N 1 0
never_throw
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.never_throw already exists. 0 0
localhost 9000 0 1 0
localhost 1 \N \N 1 0

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists throw;"
$CLICKHOUSE_CLIENT -q "drop table if exists null_status;"
$CLICKHOUSE_CLIENT -q "drop table if exists never_throw;"
CLICKHOUSE_CLIENT_OPT=$(echo ${CLICKHOUSE_CLIENT_OPT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g')
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=none"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
# Ok
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
# Table exists
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
# Timeout
$CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=throw"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
$CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=null_status_on_timeout"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
$CLIENT -q "drop table null_status on cluster test_unavailable_shard;"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=5 --distributed_ddl_output_mode=never_throw"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT -q "drop table never_throw on cluster test_unavailable_shard;"