Merge pull request #21535 from ClickHouse/distributed_ddl_improvements

Distributed DDL improvements
This commit is contained in:
tavplubix 2021-03-29 22:40:11 +03:00 committed by GitHub
commit 3c0f5a57ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 518 additions and 300 deletions

View File

@ -216,7 +216,7 @@ class IColumn;
\
M(Bool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.", 0) \
M(UInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite.", 0) \
M(Int64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. If a ddl request has not been performed on all hosts, a response will contain a timeout error and a request will be executed in an async mode. Negative value means infinite. Zero means async mode.", 0) \
M(Milliseconds, stream_flush_interval_ms, 7500, "Timeout for flushing data from streaming storages.", 0) \
M(Milliseconds, stream_poll_timeout_ms, 500, "Timeout for polling data from/to streaming storages.", 0) \
\
@ -438,7 +438,9 @@ class IColumn;
M(Bool, engine_file_truncate_on_insert, false, "Enables or disables truncate before insert in file engine tables", 0) \
M(Bool, allow_experimental_database_replicated, false, "Allow to create databases with Replicated engine", 0) \
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(Bool, database_replicated_ddl_output, true, "Return table with query execution status as a result of DDL query", 0) \
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
@ -450,6 +452,7 @@ class IColumn;
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -102,4 +102,10 @@ IMPLEMENT_SETTING_ENUM(UnionMode, ErrorCodes::UNKNOWN_UNION,
{"ALL", UnionMode::ALL},
{"DISTINCT", UnionMode::DISTINCT}})
IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{{"none", DistributedDDLOutputMode::NONE},
{"throw", DistributedDDLOutputMode::THROW},
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})
}

View File

@ -138,4 +138,15 @@ enum class UnionMode
DECLARE_SETTING_ENUM(UnionMode)
enum class DistributedDDLOutputMode
{
NONE,
THROW,
NULL_STATUS_ON_TIMEOUT,
NEVER_THROW,
};
DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
}

View File

@ -231,8 +231,8 @@ void DatabaseOnDisk::createTable(
if (create.attach_short_syntax)
{
/// Metadata already exists, table was detached
removeDetachedPermanentlyFlag(context, table_name, table_metadata_path, true);
attachTable(table_name, table, getTableDataPath(create));
removeDetachedPermanentlyFlag(table_name, table_metadata_path);
return;
}
@ -270,12 +270,12 @@ void DatabaseOnDisk::createTable(
commitCreateTable(create, table, table_metadata_tmp_path, table_metadata_path, context);
removeDetachedPermanentlyFlag(table_name, table_metadata_path);
removeDetachedPermanentlyFlag(context, table_name, table_metadata_path, false);
}
/// If the table was detached permanently we will have a flag file with
/// .sql.detached extension, is not needed anymore since we attached the table back
void DatabaseOnDisk::removeDetachedPermanentlyFlag(const String & table_name, const String & table_metadata_path) const
void DatabaseOnDisk::removeDetachedPermanentlyFlag(const Context &, const String & table_name, const String & table_metadata_path, bool) const
{
try
{

View File

@ -94,11 +94,10 @@ protected:
virtual void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, const Context & query_context);
virtual void removeDetachedPermanentlyFlag(const Context & context, const String & table_name, const String & table_metadata_path, bool attach) const;
const String metadata_path;
const String data_path;
private:
void removeDetachedPermanentlyFlag(const String & table_name, const String & table_metadata_path) const;
};
}

View File

@ -18,6 +18,7 @@
#include <Interpreters/Cluster.h>
#include <common/getFQDNOrHostName.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
@ -105,7 +106,22 @@ std::pair<String, String> DatabaseReplicated::parseFullReplicaName(const String
ClusterPtr DatabaseReplicated::getCluster() const
{
/// TODO Maintain up-to-date Cluster and allow to use it in Distributed tables
std::lock_guard lock{mutex};
if (cluster)
return cluster;
cluster = getClusterImpl();
return cluster;
}
void DatabaseReplicated::setCluster(ClusterPtr && new_cluster)
{
std::lock_guard lock{mutex};
cluster = std::move(new_cluster);
}
ClusterPtr DatabaseReplicated::getClusterImpl() const
{
Strings hosts;
Strings host_ids;
@ -120,7 +136,7 @@ ClusterPtr DatabaseReplicated::getCluster() const
hosts = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
if (hosts.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No hosts found");
Int32 cver = stat.cversion;
Int32 cversion = stat.cversion;
std::sort(hosts.begin(), hosts.end());
std::vector<zkutil::ZooKeeper::FutureGet> futures;
@ -139,7 +155,9 @@ ClusterPtr DatabaseReplicated::getCluster() const
}
zookeeper->get(zookeeper_path + "/replicas", &stat);
if (success && cver == stat.version)
if (cversion != stat.cversion)
success = false;
if (success)
break;
}
if (!success)
@ -157,22 +175,23 @@ ClusterPtr DatabaseReplicated::getCluster() const
if (id == DROPPED_MARK)
continue;
auto [shard, replica] = parseFullReplicaName(hosts[i]);
auto pos = id.find(':');
String host = id.substr(0, pos);
auto pos = id.rfind(':');
String host_port = id.substr(0, pos);
if (shard != current_shard)
{
current_shard = shard;
if (!shards.back().empty())
shards.emplace_back();
}
shards.back().emplace_back(unescapeForFileName(host));
shards.back().emplace_back(unescapeForFileName(host_port));
}
/// TODO make it configurable
String username = "default";
String password;
String username = db_settings.cluster_username;
String password = db_settings.cluster_password;
UInt16 default_port = global_context.getTCPPort();
bool secure = db_settings.cluster_secure_connection;
return std::make_shared<Cluster>(global_context.getSettingsRef(), shards, username, password, global_context.getTCPPort(), false);
return std::make_shared<Cluster>(global_context.getSettingsRef(), shards, username, password, default_port, false, secure);
}
void DatabaseReplicated::tryConnectToZooKeeperAndInitDatabase(bool force_attach)
@ -253,11 +272,8 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
__builtin_unreachable();
}
void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper)
{
/// Write host name to replica_path, it will protect from multiple replicas with the same name
auto host_id = getHostID(global_context, db_uuid);
/// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
DDLLogEntry entry{};
@ -266,11 +282,20 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
String query_path = query_path_prefix + counter_path.substr(counter_prefix.size());
ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
}
void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
{
/// Write host name to replica_path, it will protect from multiple replicas with the same name
auto host_id = getHostID(global_context, db_uuid);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
}
@ -294,7 +319,11 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const
/// Replicas will set correct name of current database in query context (database name can be different on replicas)
if (auto * ddl_query = query->as<ASTQueryWithTableAndOutput>())
{
if (ddl_query->database != getDatabaseName())
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed");
ddl_query->database.clear();
}
if (const auto * query_alter = query->as<ASTAlterQuery>())
{
@ -305,23 +334,26 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const
}
}
if (auto * query_drop = query->as<ASTDropQuery>())
{
if (query_drop->kind == ASTDropQuery::Kind::Detach && query_context.getSettingsRef().database_replicated_always_detach_permanently)
query_drop->permanently = true;
if (query_drop->kind == ASTDropQuery::Kind::Detach && !query_drop->permanently)
throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH TABLE is not allowed for Replicated databases. "
"Use DETACH TABLE PERMANENTLY or SYSTEM RESTART REPLICA or set "
"database_replicated_always_detach_permanently to 1");
}
LOG_DEBUG(log, "Proposing query: {}", queryToString(query));
/// TODO maybe write current settings to log entry?
DDLLogEntry entry;
entry.query = queryToString(query);
entry.initiator = ddl_worker->getCommonHostID();
entry.setSettingsIfRequired(query_context);
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
BlockIO io;
if (query_context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, query_context, hosts_to_wait);
if (query_context.getSettingsRef().database_replicated_ddl_output)
io.in = std::move(stream);
return io;
return getDistributedDDLStatus(node_path, entry, query_context, hosts_to_wait);
}
static UUID getTableUUIDIfReplicated(const String & metadata, const Context & context)
@ -557,12 +589,14 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
auto ast = parseQuery(parser, query, description, 0, global_context.getSettingsRef().max_parser_depth);
auto & create = ast->as<ASTCreateQuery &>();
if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || ! create.database.empty())
if (create.uuid == UUIDHelpers::Nil || create.table != TABLE_WITH_UUID_NAME_PLACEHOLDER || !create.database.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected query from {}: {}", node_name, query);
bool is_materialized_view_with_inner_table = create.is_materialized_view && create.to_table_id.empty();
create.database = getDatabaseName();
create.table = unescapeForFileName(node_name);
create.attach = false;
create.attach = is_materialized_view_with_inner_table;
return ast;
}
@ -570,8 +604,13 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
void DatabaseReplicated::drop(const Context & context_)
{
auto current_zookeeper = getZooKeeper();
current_zookeeper->set(replica_path, DROPPED_MARK);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
DatabaseAtomic::drop(context_);
current_zookeeper->tryRemoveRecursive(replica_path);
/// TODO it may leave garbage in ZooKeeper if the last node lost connection here
if (current_zookeeper->tryRemove(zookeeper_path + "/replicas") == Coordination::Error::ZOK)
@ -598,7 +637,7 @@ void DatabaseReplicated::shutdown()
void DatabaseReplicated::dropTable(const Context & context, const String & table_name, bool no_delay)
{
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
assert(!ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
if (txn && txn->isInitialQuery())
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
@ -702,12 +741,28 @@ void DatabaseReplicated::detachTablePermanently(const Context & context, const S
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->isInitialQuery())
{
/// We have to remove metadata from zookeeper, because we do not distinguish permanently detached tables
/// from attached tables when recovering replica.
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
}
DatabaseAtomic::detachTablePermanently(context, table_name);
}
void DatabaseReplicated::removeDetachedPermanentlyFlag(const Context & context, const String & table_name, const String & table_metadata_path, bool attach) const
{
auto txn = context.getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->isInitialQuery() && attach)
{
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(table_name);
String statement = readMetadataFile(table_name);
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
}
DatabaseAtomic::removeDetachedPermanentlyFlag(context, table_name, table_metadata_path, attach);
}
String DatabaseReplicated::readMetadataFile(const String & table_name) const
{
String statement;

View File

@ -45,6 +45,7 @@ public:
const ASTPtr & query) override;
void removeDictionary(const Context & context, const String & dictionary_name) override;
void detachTablePermanently(const Context & context, const String & table_name) override;
void removeDetachedPermanentlyFlag(const Context & context, const String & table_name, const String & table_metadata_path, bool attach) const override;
/// Try to execute DLL query on current host as initial query. If query is succeed,
/// then it will be executed on all replicas.
@ -76,6 +77,11 @@ private:
ASTPtr parseQueryFromMetadataInZooKeeper(const String & node_name, const String & query);
String readMetadataFile(const String & table_name) const;
ClusterPtr getClusterImpl() const;
void setCluster(ClusterPtr && new_cluster);
void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper);
String zookeeper_path;
String shard_name;
String replica_name;
@ -86,6 +92,8 @@ private:
std::atomic_bool is_readonly = true;
std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker;
mutable ClusterPtr cluster;
};
}

View File

@ -11,6 +11,9 @@ class ASTStorage;
M(Float, max_broken_tables_ratio, 0.5, "Do not recover replica automatically if the ratio of staled tables to all tables is greater", 0) \
M(UInt64, max_replication_lag_to_enqueue, 10, "Replica will throw exception on attempt to execute query if its replication lag greater", 0) \
M(UInt64, wait_entry_commited_timeout_sec, 3600, "Replicas will try to cancel query if timeout exceed, but initiator host has not executed it yet", 0) \
M(String, cluster_username, "default", "Username to use when connecting to hosts of cluster", 0) \
M(String, cluster_password, "", "Password to use when connecting to hosts of cluster", 0) \
M(Bool, cluster_secure_connection, false, "Enable TLS when connecting to hosts of cluster", 0) \
DECLARE_SETTINGS_TRAITS(DatabaseReplicatedSettingsTraits, LIST_OF_DATABASE_REPLICATED_SETTINGS)

View File

@ -237,6 +237,8 @@ DDLTaskPtr DatabaseReplicatedDDLWorker::initAndCheckTask(const String & entry_na
if (task->entry.query.empty())
{
/// Some replica is added or removed, let's update cached cluster
database->setCluster(database->getClusterImpl());
out_reason = fmt::format("Entry {} is a dummy task", entry_name);
return {};
}

View File

@ -116,7 +116,9 @@ Cluster::Address::Address(
const String & password_,
UInt16 clickhouse_port,
bool secure_,
Int64 priority_)
Int64 priority_,
UInt32 shard_index_,
UInt32 replica_index_)
: user(user_)
, password(password_)
{
@ -126,6 +128,8 @@ Cluster::Address::Address(
secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
priority = priority_;
is_local = isLocal(clickhouse_port);
shard_index = shard_index_;
replica_index = replica_index_;
}
@ -491,7 +495,7 @@ Cluster::Cluster(const Settings & settings, const std::vector<std::vector<String
{
Addresses current;
for (const auto & replica : shard)
current.emplace_back(replica, username, password, clickhouse_port, secure, priority);
current.emplace_back(replica, username, password, clickhouse_port, secure, priority, current_shard_num, current.size() + 1);
addresses_with_failover.emplace_back(current);

View File

@ -110,7 +110,9 @@ public:
const String & password_,
UInt16 clickhouse_port,
bool secure_ = false,
Int64 priority_ = 1);
Int64 priority_ = 1,
UInt32 shard_index_ = 0,
UInt32 replica_index_ = 0);
/// Returns 'escaped_host_name:port'
String toString() const;

View File

@ -54,6 +54,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/DDLTask.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
@ -1800,11 +1801,14 @@ std::optional<UInt16> Context::getTCPPortSecure() const
std::shared_ptr<Cluster> Context::getCluster(const std::string & cluster_name) const
{
auto res = getClusters().getCluster(cluster_name);
if (res)
return res;
if (!res)
throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
res = tryGetReplicatedDatabaseCluster(cluster_name);
if (res)
return res;
return res;
throw Exception("Requested cluster '" + cluster_name + "' not found", ErrorCodes::BAD_GET);
}

View File

@ -10,6 +10,7 @@
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Databases/DatabaseReplicated.h>
@ -43,20 +44,47 @@ bool HostID::isLocalAddress(UInt16 clickhouse_port) const
}
}
void DDLLogEntry::assertVersion() const
{
constexpr UInt64 max_version = 2;
if (version == 0 || max_version < version)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}."
"Maximum supported version is {}", version, max_version);
}
void DDLLogEntry::setSettingsIfRequired(const Context & context)
{
version = context.getSettingsRef().distributed_ddl_entry_format_version;
if (version == 2)
settings.emplace(context.getSettingsRef().changes());
}
String DDLLogEntry::toString() const
{
WriteBufferFromOwnString wb;
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
auto version = CURRENT_VERSION;
wb << "version: " << version << "\n";
wb << "query: " << escape << query << "\n";
wb << "hosts: " << host_id_strings << "\n";
bool write_hosts = version == 1 || !hosts.empty();
if (write_hosts)
{
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
wb << "hosts: " << host_id_strings << "\n";
}
wb << "initiator: " << initiator << "\n";
bool write_settings = 1 <= version && settings && !settings->empty();
if (write_settings)
{
ASTSetQuery ast;
ast.is_standalone = false;
ast.changes = *settings;
wb << "settings: " << serializeAST(ast) << "\n";
}
return wb.str();
}
@ -64,25 +92,46 @@ void DDLLogEntry::parse(const String & data)
{
ReadBufferFromString rb(data);
int version;
rb >> "version: " >> version >> "\n";
if (version != CURRENT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
assertVersion();
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
rb >> "hosts: " >> host_id_strings >> "\n";
if (version == 1)
{
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
}
else if (version == 2)
{
if (!rb.eof() && *rb.position() == 'h')
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof() && *rb.position() == 'i')
rb >> "initiator: " >> initiator >> "\n";
if (!rb.eof() && *rb.position() == 's')
{
String settings_str;
rb >> "settings: " >> settings_str >> "\n";
ParserSetQuery parser{true};
constexpr UInt64 max_size = 4096;
constexpr UInt64 max_depth = 16;
ASTPtr settings_ast = parseQuery(parser, settings_str, max_size, max_depth);
settings.emplace(std::move(settings_ast->as<ASTSetQuery>()->changes));
}
}
assertEOF(rb);
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
if (!host_id_strings.empty())
{
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
}
@ -102,6 +151,8 @@ std::unique_ptr<Context> DDLTaskBase::makeQueryContext(Context & from_context, c
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (entry.settings)
query_context->applySettingsChanges(*entry.settings);
return query_context;
}
@ -345,4 +396,11 @@ void ZooKeeperMetadataTransaction::commit()
state = COMMITTED;
}
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name)
{
if (const auto * replicated_db = dynamic_cast<const DatabaseReplicated *>(DatabaseCatalog::instance().tryGetDatabase(cluster_name).get()))
return replicated_db->getCluster();
return {};
}
}

View File

@ -18,6 +18,7 @@ namespace DB
class ASTQueryWithOnCluster;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
using ClusterPtr = std::shared_ptr<Cluster>;
class DatabaseReplicated;
class ZooKeeperMetadataTransaction;
@ -56,15 +57,16 @@ struct HostID
struct DDLLogEntry
{
UInt64 version = 1;
String query;
std::vector<HostID> hosts;
String initiator; // optional
std::optional<SettingsChanges> settings;
static constexpr int CURRENT_VERSION = 1;
void setSettingsIfRequired(const Context & context);
String toString() const;
void parse(const String & data);
void assertVersion() const;
};
struct DDLTaskBase
@ -192,4 +194,6 @@ public:
~ZooKeeperMetadataTransaction() { assert(isExecuted() || std::uncaught_exceptions()); }
};
ClusterPtr tryGetReplicatedDatabaseCluster(const String & cluster_name);
}

View File

@ -713,6 +713,19 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
}
}
static void generateUUIDForTable(ASTCreateQuery & create)
{
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
/// If destination table (to_table_id) is not specified for materialized view,
/// then MV will create inner table. We should generate UUID of inner table here,
/// so it will be the same on all hosts if query in ON CLUSTER or database engine is Replicated.
bool need_uuid_for_inner_table = create.is_materialized_view && !create.to_table_id;
if (need_uuid_for_inner_table && create.to_inner_uuid == UUIDHelpers::Nil)
create.to_inner_uuid = UUIDHelpers::generateV4();
}
void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const DatabasePtr & database) const
{
const auto * kind = create.is_dictionary ? "Dictionary" : "Table";
@ -744,18 +757,19 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
kind_upper, create.table);
}
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
generateUUIDForTable(create);
}
else
{
bool is_on_cluster = context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster)
bool has_uuid = create.uuid != UUIDHelpers::Nil || create.to_inner_uuid != UUIDHelpers::Nil;
if (has_uuid && !is_on_cluster)
throw Exception(ErrorCodes::INCORRECT_QUERY,
"{} UUID specified, but engine of database {} is not Atomic", kind, create.database);
/// Ignore UUID if it's ON CLUSTER query
create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
}
if (create.replace_table)
@ -804,6 +818,17 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (create.attach && !create.storage && !create.columns_list)
{
auto database = DatabaseCatalog::instance().getDatabase(database_name);
if (database->getEngineName() == "Replicated")
{
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, create.table);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{
create.database = database_name;
guard->releaseTableLock();
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, context);
}
}
bool if_not_exists = create.if_not_exists;
// Table SQL definition is available even if the table is detached (even permanently)
@ -877,7 +902,6 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (need_add_to_database && database->getEngineName() == "Replicated")
{
auto guard = DatabaseCatalog::instance().getDDLGuard(create.database, create.table);
database = DatabaseCatalog::instance().getDatabase(create.database);
if (typeid_cast<DatabaseReplicated *>(database.get()) && context.getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
{
assertOrSetUUID(create, database);
@ -1136,8 +1160,7 @@ void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, cons
/// For CREATE query generate UUID on initiator, so it will be the same on all hosts.
/// It will be ignored if database does not support UUIDs.
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
generateUUIDForTable(create);
/// For cross-replication cluster we cannot use UUID in replica path.
String cluster_name_expanded = context.getMacros()->expand(cluster_name);

View File

@ -133,10 +133,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ASTDropQuery & query, Dat
!is_drop_or_detach_database;
if (is_replicated_ddl_query)
{
if (query.kind == ASTDropQuery::Kind::Detach && !query.permanently)
throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH TABLE is not allowed for Replicated databases. "
"Use DETACH TABLE PERMANENTLY or SYSTEM RESTART REPLICA");
if (query.kind == ASTDropQuery::Kind::Detach)
context.checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
else if (query.kind == ASTDropQuery::Kind::Truncate)

View File

@ -82,6 +82,7 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
{
auto & create = create_query->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
}
WriteBufferFromOwnString buf;

View File

@ -13,6 +13,9 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -160,18 +163,32 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
entry.hosts = std::move(hosts);
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getCommonHostID();
entry.setSettingsIfRequired(context);
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context);
}
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, const Context & context, const std::optional<Strings> & hosts_to_wait)
{
BlockIO io;
if (context.getSettingsRef().distributed_ddl_task_timeout == 0)
return io;
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context);
io.in = std::move(stream);
auto stream = std::make_shared<DDLQueryStatusInputStream>(node_path, entry, context, hosts_to_wait);
if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
{
/// Wait for query to finish, but ignore output
NullBlockOutputStream output{Block{}};
copyData(*stream, output);
}
else
{
io.in = std::move(stream);
}
return io;
}
DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path, const DDLLogEntry & entry, const Context & context_,
const std::optional<Strings> & hosts_to_wait)
: node_path(zk_node_path)
@ -179,19 +196,36 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path
, watch(CLOCK_MONOTONIC_COARSE)
, log(&Poco::Logger::get("DDLQueryStatusInputStream"))
{
if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::THROW ||
context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NONE)
throw_on_timeout = true;
else if (context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT ||
context.getSettingsRef().distributed_ddl_output_mode == DistributedDDLOutputMode::NEVER_THROW)
throw_on_timeout = false;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown output mode");
auto maybe_make_nullable = [&](const DataTypePtr & type) -> DataTypePtr
{
if (throw_on_timeout)
return type;
return std::make_shared<DataTypeNullable>(type);
};
sample = Block{
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{std::make_shared<DataTypeInt64>(), "status"},
{std::make_shared<DataTypeString>(), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
{std::make_shared<DataTypeString>(), "host"},
{std::make_shared<DataTypeUInt16>(), "port"},
{maybe_make_nullable(std::make_shared<DataTypeInt64>()), "status"},
{maybe_make_nullable(std::make_shared<DataTypeString>()), "error"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_remaining"},
{std::make_shared<DataTypeUInt64>(), "num_hosts_active"},
};
if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
by_hostname = false;
sample.erase("port");
}
else
{
@ -204,12 +238,29 @@ DDLQueryStatusInputStream::DDLQueryStatusInputStream(const String & zk_node_path
timeout_seconds = context.getSettingsRef().distributed_ddl_task_timeout;
}
std::pair<String, UInt16> DDLQueryStatusInputStream::parseHostAndPort(const String & host_id) const
{
String host = host_id;
UInt16 port = 0;
if (by_hostname)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
return {host, port};
}
Block DDLQueryStatusInputStream::readImpl()
{
Block res;
if (num_hosts_finished >= waiting_hosts.size())
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
/// Seems like num_hosts_finished cannot be strictly greater than waiting_hosts.size()
assert(num_hosts_finished <= waiting_hosts.size());
if (all_hosts_finished || timeout_exceeded)
{
if (first_exception)
bool throw_if_error_on_host = context.getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
if (first_exception && throw_if_error_on_host)
throw Exception(*first_exception);
return res;
@ -222,7 +273,8 @@ Block DDLQueryStatusInputStream::readImpl()
{
if (isCancelled())
{
if (first_exception)
bool throw_if_error_on_host = context.getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW;
if (first_exception && throw_if_error_on_host)
throw Exception(*first_exception);
return res;
@ -233,11 +285,36 @@ Block DDLQueryStatusInputStream::readImpl()
size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();
constexpr const char * msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), "
"they are going to execute the query in background";
if (throw_on_timeout)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, msg_format,
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), they are going to execute the query in background",
node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
timeout_exceeded = true;
LOG_INFO(log, msg_format, node_path, timeout_seconds, num_unfinished_hosts, num_active_hosts);
NameSet unfinished_hosts = waiting_hosts;
for (const auto & host_id : finished_hosts)
unfinished_hosts.erase(host_id);
/// Query is not finished on the rest hosts, so fill the corresponding rows with NULLs.
MutableColumns columns = sample.cloneEmptyColumns();
for (const String & host_id : unfinished_hosts)
{
auto [host, port] = parseHostAndPort(host_id);
size_t num = 0;
columns[num++]->insert(host);
if (by_hostname)
columns[num++]->insert(port);
columns[num++]->insert(Field{});
columns[num++]->insert(Field{});
columns[num++]->insert(num_unfinished_hosts);
columns[num++]->insert(num_active_hosts);
}
res = sample.cloneWithColumns(std::move(columns));
return res;
}
if (num_hosts_finished != 0 || try_number != 0)
@ -269,26 +346,21 @@ Block DDLQueryStatusInputStream::readImpl()
status.tryDeserializeText(status_data);
}
String host = host_id;
UInt16 port = 0;
if (by_hostname)
{
auto host_and_port = Cluster::Address::fromString(host_id);
host = host_and_port.first;
port = host_and_port.second;
}
auto [host, port] = parseHostAndPort(host_id);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>(status.code, "There was an error on [{}:{}]: {}", host, port, status.message);
++num_hosts_finished;
columns[0]->insert(host);
columns[1]->insert(port);
columns[2]->insert(status.code);
columns[3]->insert(status.message);
columns[4]->insert(waiting_hosts.size() - num_hosts_finished);
columns[5]->insert(current_active_hosts.size());
size_t num = 0;
columns[num++]->insert(host);
if (by_hostname)
columns[num++]->insert(port);
columns[num++]->insert(status.code);
columns[num++]->insert(status.message);
columns[num++]->insert(waiting_hosts.size() - num_hosts_finished);
columns[num++]->insert(current_active_hosts.size());
}
res = sample.cloneWithColumns(std::move(columns));
}

View File

@ -24,6 +24,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & conte
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const AccessRightsElements & query_requires_access);
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, AccessRightsElements && query_requires_access);
BlockIO getDistributedDDLStatus(const String & node_path, const DDLLogEntry & entry, const Context & context, const std::optional<Strings> & hosts_to_wait = {});
class DDLQueryStatusInputStream final : public IBlockInputStream
{
@ -44,6 +45,8 @@ private:
Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
std::pair<String, UInt16> parseHostAndPort(const String & host_id) const;
String node_path;
const Context & context;
Stopwatch watch;
@ -62,6 +65,8 @@ private:
Int64 timeout_seconds = 120;
bool by_hostname = true;
bool throw_on_timeout = true;
bool timeout_exceeded = false;
};
}

View File

@ -297,12 +297,20 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (to_table_id)
{
assert(is_materialized_view && to_inner_uuid == UUIDHelpers::Nil);
settings.ostr
<< (settings.hilite ? hilite_keyword : "") << " TO " << (settings.hilite ? hilite_none : "")
<< (!to_table_id.database_name.empty() ? backQuoteIfNeed(to_table_id.database_name) + "." : "")
<< backQuoteIfNeed(to_table_id.table_name);
}
if (to_inner_uuid != UUIDHelpers::Nil)
{
assert(is_materialized_view && !to_table_id);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO INNER UUID " << (settings.hilite ? hilite_none : "")
<< quoteString(toString(to_inner_uuid));
}
if (!as_table.empty())
{
settings.ostr

View File

@ -66,6 +66,7 @@ public:
ASTExpressionList * tables = nullptr;
StorageID to_table_id = StorageID::createEmpty(); /// For CREATE MATERIALIZED VIEW mv TO table.
UUID to_inner_uuid = UUIDHelpers::Nil; /// For materialized view with inner table
ASTStorage * storage = nullptr;
String as_database;
String as_table;

View File

@ -780,6 +780,7 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
ASTPtr table;
ASTPtr to_table;
ASTPtr to_inner_uuid;
ASTPtr columns_list;
ASTPtr storage;
ASTPtr as_database;
@ -830,9 +831,16 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
return false;
}
// TO [db.]table
if (ParserKeyword{"TO"}.ignore(pos, expected))
if (ParserKeyword{"TO INNER UUID"}.ignore(pos, expected))
{
ParserLiteral literal_p;
if (!literal_p.parse(pos, to_inner_uuid, expected))
return false;
}
else if (ParserKeyword{"TO"}.ignore(pos, expected))
{
// TO [db.]table
if (!table_name_p.parse(pos, to_table, expected))
return false;
}
@ -883,6 +891,8 @@ bool ParserCreateViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expec
if (to_table)
query->to_table_id = getTableIdentifier(to_table);
if (to_inner_uuid)
query->to_inner_uuid = parseFromString<UUID>(to_inner_uuid->as<ASTLiteral>()->value.get<String>());
query->set(query->columns_list, columns_list);
query->set(query->storage, storage);

View File

@ -85,7 +85,7 @@ StorageMaterializedView::StorageMaterializedView(
else if (attach_)
{
/// If there is an ATTACH request, then the internal table must already be created.
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()));
target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);
}
else
{
@ -94,6 +94,7 @@ StorageMaterializedView::StorageMaterializedView(
auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = getStorageID().database_name;
manual_create_query->table = generateInnerTableName(getStorageID());
manual_create_query->uuid = query.to_inner_uuid;
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());

View File

@ -359,6 +359,7 @@ protected:
{
auto & create = ast->as<ASTCreateQuery &>();
create.uuid = UUIDHelpers::Nil;
create.to_inner_uuid = UUIDHelpers::Nil;
}
if (columns_mask[src_index++])

View File

@ -17,4 +17,4 @@
</shard>
</test_cluster_two_shards_different_databases>
</remote_servers>
</yandex>
</yandex>

View File

@ -2,9 +2,11 @@
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<database_replicated_ddl_output>0</database_replicated_ddl_output>
<distributed_ddl_output_mode>none</distributed_ddl_output_mode>
<database_replicated_initial_query_timeout_sec>30</database_replicated_initial_query_timeout_sec>
<distributed_ddl_task_timeout>30</distributed_ddl_task_timeout>
<database_replicated_always_detach_permanently>1</database_replicated_always_detach_permanently>
<distributed_ddl_entry_format_version>2</distributed_ddl_entry_format_version>
</default>
</profiles>
</yandex>

View File

@ -1,34 +1,3 @@
<yandex>
<database_atomic_delay_before_drop_table_sec>10</database_atomic_delay_before_drop_table_sec>
<remote_servers>
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>main_node</host>
<port>9000</port>
</replica>
<replica>
<host>dummy_node</host>
<port>9000</port>
</replica>
<replica>
<host>competing_node</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>snapshotting_node</host>
<port>9000</port>
</replica>
<replica>
<host>snapshot_recovering_node</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</yandex>

View File

@ -2,6 +2,7 @@
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
<allow_experimental_alter_materialized_view_structure>1</allow_experimental_alter_materialized_view_structure>
</default>
</profiles>
<users>

View File

@ -99,16 +99,20 @@ def test_alters_from_different_replicas(started_cluster):
"(CounterID UInt32, StartDate Date, UserID UInt32, VisitID UInt32, NestedColumn Nested(A UInt8, S String), ToDrop UInt32) "
"ENGINE = MergeTree(StartDate, intHash32(UserID), (CounterID, StartDate, intHash32(UserID), VisitID), 8192);")
main_node.query("CREATE TABLE testdb.dist AS testdb.concurrent_test ENGINE = Distributed(cluster, testdb, concurrent_test, CounterID)")
main_node.query("CREATE TABLE testdb.dist AS testdb.concurrent_test ENGINE = Distributed(testdb, testdb, concurrent_test, CounterID)")
dummy_node.stop_clickhouse(kill=True)
settings = {"distributed_ddl_task_timeout": 10}
settings = {"distributed_ddl_task_timeout": 5}
assert "There are 1 unfinished hosts (0 of them are currently active)" in \
competing_node.query_and_get_error("ALTER TABLE testdb.concurrent_test ADD COLUMN Added0 UInt32;", settings=settings)
settings = {"distributed_ddl_task_timeout": 5, "distributed_ddl_output_mode": "null_status_on_timeout"}
assert "shard1|replica2\t\\N\t\\N" in \
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;", settings=settings)
settings = {"distributed_ddl_task_timeout": 5, "distributed_ddl_output_mode": "never_throw"}
assert "shard1|replica2\t\\N\t\\N" in \
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;", settings=settings)
dummy_node.start_clickhouse()
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added2 UInt32;")
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN Added1 UInt32 AFTER Added0;")
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1 Nested(A UInt32, B UInt64) AFTER Added2;")
competing_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested1.C Array(String) AFTER AddedNested1.B;")
main_node.query("ALTER TABLE testdb.concurrent_test ADD COLUMN AddedNested2 Nested(A UInt32, B UInt64) AFTER AddedNested1;")
@ -198,8 +202,14 @@ def test_recover_staled_replica(started_cluster):
dummy_node.query("CREATE TABLE recover.rmt2 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
main_node.query("CREATE TABLE recover.rmt3 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
dummy_node.query("CREATE TABLE recover.rmt5 (n int) ENGINE=ReplicatedMergeTree order by n", settings=settings)
main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
main_node.query("CREATE MATERIALIZED VIEW recover.mv1 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt1", settings=settings)
dummy_node.query("CREATE MATERIALIZED VIEW recover.mv2 (n int) ENGINE=ReplicatedMergeTree order by n AS SELECT n FROM recover.rmt2", settings=settings)
main_node.query("CREATE DICTIONARY recover.d1 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
dummy_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt2' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT())")
for table in ['t1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt3', 'rmt5']:
main_node.query("INSERT INTO recover.{} VALUES (42)".format(table))
@ -217,35 +227,44 @@ def test_recover_staled_replica(started_cluster):
main_node.query("RENAME TABLE recover.rmt3 TO recover.rmt4", settings=settings)
main_node.query("DROP TABLE recover.rmt5", settings=settings)
main_node.query("DROP DICTIONARY recover.d2", settings=settings)
main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings)
main_node.query("CREATE DICTIONARY recover.d2 (n int DEFAULT 0, m int DEFAULT 1) PRIMARY KEY n "
"SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'rmt1' PASSWORD '' DB 'recover')) "
"LIFETIME(MIN 1 MAX 10) LAYOUT(FLAT());", settings=settings)
inner_table = ".inner_id." + dummy_node.query("SELECT uuid FROM system.tables WHERE database='recover' AND name='mv1'").strip()
main_node.query("ALTER TABLE recover.`{}` MODIFY COLUMN n int DEFAULT 42".format(inner_table), settings=settings)
main_node.query("ALTER TABLE recover.mv1 MODIFY QUERY SELECT m FROM recover.rmt1".format(inner_table), settings=settings)
main_node.query("RENAME TABLE recover.mv2 TO recover.mv3".format(inner_table), settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
main_node.query("DROP TABLE recover.tmp", settings=settings)
main_node.query("CREATE TABLE recover.tmp AS recover.m1", settings=settings)
assert main_node.query("SELECT name FROM system.tables WHERE database='recover' ORDER BY name") == "d1\nd2\nm1\nmt1\nmt2\nrmt1\nrmt2\nrmt4\nt2\ntmp\n"
query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' ORDER BY name"
assert main_node.query("SELECT name FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' ORDER BY name") == \
"d1\nd2\nm1\nmt1\nmt2\nmv1\nmv3\nrmt1\nrmt2\nrmt4\nt2\ntmp\n"
query = "SELECT name, uuid, create_table_query FROM system.tables WHERE database='recover' AND name NOT LIKE '.inner_id.%' " \
"ORDER BY name SETTINGS show_table_uuid_in_table_create_query_if_not_nil=1"
expected = main_node.query(query)
assert_eq_with_retry(dummy_node, query, expected)
assert main_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n"
assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover' AND name LIKE '.inner_id.%'") == "2\n"
for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2']:
for table in ['m1', 't2', 'mt1', 'mt2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mv1', 'mv3']:
assert main_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n"
for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2']:
for table in ['t2', 'rmt1', 'rmt2', 'rmt4', 'd1', 'd2', 'mt2', 'mv1', 'mv3']:
assert dummy_node.query("SELECT (*,).1 FROM recover.{}".format(table)) == "42\n"
for table in ['m1', 'mt1']:
assert dummy_node.query("SELECT count() FROM recover.{}".format(table)) == "0\n"
assert dummy_node.query("SELECT count() FROM system.tables WHERE database='recover_broken_tables'") == "2\n"
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_26_%'").strip()
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'mt1_29_%'").strip()
assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n"
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_26_%'").strip()
table = dummy_node.query("SHOW TABLES FROM recover_broken_tables LIKE 'rmt5_29_%'").strip()
assert dummy_node.query("SELECT (*,).1 FROM recover_broken_tables.{}".format(table)) == "42\n"
expected = "Cleaned 4 outdated objects: dropped 1 dictionaries and 1 tables, moved 2 tables"
expected = "Cleaned 6 outdated objects: dropped 1 dictionaries and 3 tables, moved 2 tables"
assert_logs_contain(dummy_node, expected)
dummy_node.query("DROP TABLE recover.tmp")

View File

@ -0,0 +1,25 @@
none
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.throw already exists
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=8) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background.
throw
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.throw already exists. 0 0
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.throw already exists
localhost 9000 0 1 0
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=8) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background.
null_status_on_timeout
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.null_status already exists. 0 0
Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57, e.displayText() = Error: Table default.null_status already exists
localhost 9000 0 1 0
localhost 1 \N \N 1 0
never_throw
localhost 9000 0 0 0
localhost 9000 57 Code: 57, e.displayText() = Error: Table default.never_throw already exists. 0 0
localhost 9000 0 1 0
localhost 1 \N \N 1 0

View File

@ -0,0 +1,39 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists throw;"
$CLICKHOUSE_CLIENT -q "drop table if exists null_status;"
$CLICKHOUSE_CLIENT -q "drop table if exists never_throw;"
CLICKHOUSE_CLIENT_OPT=$(echo ${CLICKHOUSE_CLIENT_OPT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=fatal/g')
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=none"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
# Ok
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
# Table exists
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
# Timeout
$CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=throw"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
$CLIENT -q "drop table throw on cluster test_unavailable_shard;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/Watching task .* is executing longer/Watching task <task> is executing longer/" | sed "s/background. /background./"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=null_status_on_timeout"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table null_status on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| grep -Fv "@ 0x" | sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//" | sed "s/exists.. /exists/"
$CLIENT -q "drop table null_status on cluster test_unavailable_shard;"
CLIENT="$CLICKHOUSE_CLIENT_BINARY $CLICKHOUSE_CLIENT_OPT --distributed_ddl_task_timeout=8 --distributed_ddl_output_mode=never_throw"
$CLIENT -q "select value from system.settings where name='distributed_ddl_output_mode';"
$CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;"
$CLIENT -q "create table never_throw on cluster test_shard_localhost (n int) engine=Memory;" 2>&1| sed "s/DB::Exception/Error/g" | sed "s/ (version.*)//"
$CLIENT -q "drop table never_throw on cluster test_unavailable_shard;"

View File

@ -554,9 +554,9 @@ SELECT count() FROM test;
DROP TABLE IF EXISTS test_r1;
DROP TABLE IF EXISTS test_r2;
CREATE TABLE test_r1 AS test ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r1') ORDER BY "\\" SETTINGS min_bytes_for_wide_part = '100G';
CREATE TABLE test_r1 AS test ENGINE = ReplicatedMergeTree('/clickhouse/test_01666', 'r1') ORDER BY "\\" SETTINGS min_bytes_for_wide_part = '100G';
INSERT INTO test_r1 SELECT * FROM test;
CREATE TABLE test_r2 AS test ENGINE = ReplicatedMergeTree('/clickhouse/test', 'r2') ORDER BY "\\" SETTINGS min_bytes_for_wide_part = '100G';
CREATE TABLE test_r2 AS test ENGINE = ReplicatedMergeTree('/clickhouse/test_01666', 'r2') ORDER BY "\\" SETTINGS min_bytes_for_wide_part = '100G';
SYSTEM SYNC REPLICA test_r2;

View File

@ -45,6 +45,7 @@
"capnproto",
"query_profiler",
"memory_profiler",
"01175_distributed_ddl_output_mode_long", /// issue 21600
"01103_check_cpu_instructions_at_startup",
"01086_odbc_roundtrip", /// can't pass because odbc libraries are not instrumented
"00877_memory_limit_for_new_delete", /// memory limits don't work correctly under msan because it replaces malloc/free
@ -107,166 +108,50 @@
"00738_lock_for_inner_table"
],
"database-replicated": [
/// Tests with DETACH TABLE (it's not allowed)
/// and tests with SET (session and query settings are not supported)
"memory_tracking",
"memory_usage",
"live_view",
"01761_alter_decimal_zookeeper",
"01560_optimize_on_insert_zookeeper",
"01720_type_map_and_casts",
"01413_alter_update_supertype",
"01149_zookeeper_mutation_stuck_after_replace_partition",
"00836_indices_alter_replicated_zookeeper",
"00652_mutations_alter_update",
"01715_tuple_insert_null_as_default",
"00825_protobuf_format_map",
"00152_insert_different_granularity",
"01715_background_checker_blather_zookeeper",
"01714_alter_drop_version",
"01114_materialize_clear_index_compact_parts",
"00814_replicated_minimalistic_part_header_zookeeper",
"01188_attach_table_from_pat",
"01415_sticking_mutations",
"01130_in_memory_parts",
"01110_dictionary_layout_without_arguments",
"01018_ddl_dictionaries_create",
"01018_ddl_dictionaries_select",
"01414_freeze_does_not_prevent_alters",
"01018_ddl_dictionaries_bad_queries",
"01686_rocksdb",
"01550_mutation_subquery",
"01070_mutations_with_dependencies",
"01070_materialize_ttl",
"01055_compact_parts",
"01017_mutations_with_nondeterministic_functions_zookeeper",
"00926_adaptive_index_granularity_pk",
"00910_zookeeper_test_alter_compression_codecs",
"00908_bloom_filter_index",
"00616_final_single_part",
"00446_clear_column_in_partition_zookeeper",
"01533_multiple_nested",
"01213_alter_rename_column_zookeeper",
"01575_disable_detach_table_of_dictionary",
"01457_create_as_table_function_structure",
"01415_inconsistent_merge_tree_settings",
"01413_allow_non_metadata_alters",
"01378_alter_rename_with_ttl_zookeeper",
"01349_mutation_datetime_key",
"01325_freeze_mutation_stuck",
"01272_suspicious_codecs",
"01181_db_atomic_drop_on_cluster",
"00957_delta_diff_bug",
"00910_zookeeper_custom_compression_codecs_replicated",
"00899_long_attach_memory_limit",
"00804_test_custom_compression_codes_log_storages",
"00804_test_alter_compression_codecs",
"00804_test_delta_codec_no_type_alter",
"00804_test_custom_compression_codecs",
"00753_alter_attach",
"00715_fetch_merged_or_mutated_part_zookeeper",
"00688_low_cardinality_serialization",
"01575_disable_detach_table_of_dictionary",
"00738_lock_for_inner_table",
"01666_blns",
"01652_ignore_and_low_cardinality",
"01651_map_functions",
"01650_fetch_patition_with_macro_in_zk_path",
"01648_mutations_and_escaping",
"01640_marks_corruption_regression",
"01622_byte_size",
"01611_string_to_low_cardinality_key_alter",
"01602_show_create_view",
"01600_log_queries_with_extensive_info",
"01560_ttl_remove_empty_parts",
"01554_bloom_filter_index_big_integer_uuid",
"01550_type_map_formats_input",
"01550_type_map_formats",
"01550_create_map_type",
"01532_primary_key_without_order_by_zookeeper",
"01511_alter_version_versioned_collapsing_merge_tree_zookeeper",
"01509_parallel_quorum_insert_no_replicas",
"01504_compression_multiple_streams",
"01494_storage_join_persistency",
"01493_storage_set_persistency",
"01493_alter_remove_properties_zookeeper",
"01475_read_subcolumns_storages",
"01475_read_subcolumns",
"01451_replicated_detach_drop_part",
"01451_detach_drop_part",
"01440_big_int_exotic_casts",
"01430_modify_sample_by_zookeeper",
"01417_freeze_partition_verbose_zookeeper",
"01417_freeze_partition_verbose",
"01396_inactive_replica_cleanup_nodes_zookeeper",
"01375_compact_parts_codecs",
"01357_version_collapsing_attach_detach_zookeeper",
"01355_alter_column_with_order",
"01291_geo_types",
"01270_optimize_skip_unused_shards_low_cardinality",
"01182_materialized_view_different_structure",
"01150_ddl_guard_rwr",
"01148_zookeeper_path_macros_unfolding",
"01135_default_and_alter_zookeeper",
"01130_in_memory_parts_partitons",
"01127_month_partitioning_consistency_select",
"01114_database_atomic",
"01083_expressions_in_engine_arguments",
"01073_attach_if_not_exists",
"01072_optimize_skip_unused_shards_const_expr_eval",
"01071_prohibition_secondary_index_with_old_format_merge_tree",
"01062_alter_on_mutataion_zookeeper",
"01060_shutdown_table_after_detach",
"01056_create_table_as",
"01035_avg",
"01021_only_tuple_columns",
"01019_alter_materialized_view_query",
"01019_alter_materialized_view_consistent",
"01019_alter_materialized_view_atomic",
"01015_attach_part",
"00989_parallel_parts_loading",
"01175_distributed_ddl_output_mode",
"01415_sticking_mutations",
"00980_zookeeper_merge_tree_alter_settings",
"00980_merge_alter_settings",
"01148_zookeeper_path_macros_unfolding",
"01294_system_distributed_on_cluster",
"01269_create_with_null",
/// grep -c
"01018_ddl_dictionaries_bad_queries",
"00908_bloom_filter_index",
/// Unsupported type of ALTER query
"01650_fetch_patition_with_macro_in_zk_path",
"01451_detach_drop_part",
"01451_replicated_detach_drop_part",
"01417_freeze_partition_verbose",
"01417_freeze_partition_verbose_zookeeper",
"01130_in_memory_parts_partitons",
"01060_shutdown_table_after_detach",
"01021_only_tuple_columns",
"01015_attach_part",
"00955_test_final_mark",
"00933_reserved_word",
"00926_zookeeper_adaptive_index_granularity_replicated_merge_tree",
"00926_adaptive_index_granularity_replacing_merge_tree",
"00926_adaptive_index_granularity_merge_tree",
"00925_zookeeper_empty_replicated_merge_tree_optimize_final",
"00800_low_cardinality_distinct_numeric",
"00754_alter_modify_order_by_replicated_zookeeper",
"00751_low_cardinality_nullable_group_by",
"00751_default_databasename_for_view",
"00719_parallel_ddl_table",
"00718_low_cardinaliry_alter",
"00717_low_cardinaliry_distributed_group_by",
"00688_low_cardinality_syntax",
"00688_low_cardinality_nullable_cast",
"00688_low_cardinality_in",
"00652_replicated_mutations_zookeeper",
"00634_rename_view",
"00753_alter_attach",
"00626_replace_partition_from_table_zookeeper",
"00626_replace_partition_from_table",
"00625_arrays_in_nested",
"00152_insert_different_granularity",
/// Old syntax is not allowed
"01062_alter_on_mutataion_zookeeper",
"00925_zookeeper_empty_replicated_merge_tree_optimize_final",
"00754_alter_modify_order_by_replicated_zookeeper",
"00652_replicated_mutations_zookeeper",
"00623_replicated_truncate_table_zookeeper",
"00619_union_highlite",
"00599_create_view_with_subquery",
"00571_non_exist_database_when_create_materializ_view",
"00553_buff_exists_materlized_column",
"00516_deduplication_after_drop_partition_zookeeper",
"00508_materialized_view_to",
"00446_clear_column_in_partition_concurrent_zookeeper",
"00423_storage_log_single_thread",
"00311_array_primary_key",
"00236_replicated_drop_on_non_leader_zookeeper",
"00226_zookeeper_deduplication_and_unexpected_parts",
"00215_primary_key_order_zookeeper",
"00180_attach_materialized_view",
"00121_drop_column_zookeeper",
"00116_storage_set",
"00083_create_merge_tree_zookeeper",
"00062_replicated_merge_tree_alter_zookeeper",
"01720_constraints_complex_types",
"01747_alter_partition_key_enum_zookeeper"
/// Does not support renaming of multiple tables in single query
"00634_rename_view"
],
"polymorphic-parts": [
"01508_partition_pruning_long", /// bug, shoud be fixed
@ -752,6 +637,7 @@
"01601_detach_permanently",
"01602_show_create_view",
"01603_rename_overwrite_bug",
"01666_blns",
"01646_system_restart_replicas_smoke", // system restart replicas is a global query
"01656_test_query_log_factories_info",
"01669_columns_declaration_serde",