From 163e27b299a1bc4a22bccbef80b08d01be30b897 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 9 Mar 2021 01:57:53 +0300 Subject: [PATCH] support query and session settings for distributed DDL --- src/Core/Settings.h | 1 + src/Databases/DatabaseReplicated.cpp | 2 +- src/Interpreters/DDLTask.cpp | 83 +++++++++++++++---- src/Interpreters/DDLTask.h | 7 +- src/Interpreters/executeDDLQueryOnCluster.cpp | 1 + tests/config/users.d/database_replicated.xml | 1 + 6 files changed, 75 insertions(+), 20 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 29ba289dc47..32f1b985ac3 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -436,6 +436,7 @@ class IColumn; M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \ M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \ M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \ + M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \ \ /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \ \ diff --git a/src/Databases/DatabaseReplicated.cpp b/src/Databases/DatabaseReplicated.cpp index 76f2c798237..612b7fb1948 100644 --- a/src/Databases/DatabaseReplicated.cpp +++ b/src/Databases/DatabaseReplicated.cpp @@ -322,10 +322,10 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const LOG_DEBUG(log, "Proposing query: {}", queryToString(query)); - /// TODO maybe write current settings to log entry? DDLLogEntry entry; entry.query = queryToString(query); entry.initiator = ddl_worker->getCommonHostID(); + entry.setSettingsIfRequired(query_context); String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context); Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); diff --git a/src/Interpreters/DDLTask.cpp b/src/Interpreters/DDLTask.cpp index 0c4c8a1bc34..5d3f538f420 100644 --- a/src/Interpreters/DDLTask.cpp +++ b/src/Interpreters/DDLTask.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -43,20 +44,47 @@ bool HostID::isLocalAddress(UInt16 clickhouse_port) const } } +void DDLLogEntry::assertVersion() const +{ + constexpr UInt64 MAX_VERSION = 2; + if (version == 0 || MAX_VERSION < version) + throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}." + "Maximum supported version is {}", version, MAX_VERSION); +} + +void DDLLogEntry::setSettingsIfRequired(const Context & context) +{ + version = context.getSettingsRef().distributed_ddl_entry_format_version; + if (version == 2) + settings.emplace(context.getSettingsRef().changes()); +} String DDLLogEntry::toString() const { WriteBufferFromOwnString wb; - Strings host_id_strings(hosts.size()); - std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString); - - auto version = CURRENT_VERSION; wb << "version: " << version << "\n"; wb << "query: " << escape << query << "\n"; - wb << "hosts: " << host_id_strings << "\n"; + + bool write_hosts = version == 1 || !hosts.empty(); + if (write_hosts) + { + Strings host_id_strings(hosts.size()); + std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString); + wb << "hosts: " << host_id_strings << "\n"; + } + wb << "initiator: " << initiator << "\n"; + bool write_settings = 1 <= version && settings && !settings->empty(); + if (write_settings) + { + ASTSetQuery ast; + ast.is_standalone = false; + ast.changes = *settings; + wb << "settings: " << serializeAST(ast) << "\n"; + } + return wb.str(); } @@ -64,25 +92,46 @@ void DDLLogEntry::parse(const String & data) { ReadBufferFromString rb(data); - int version; rb >> "version: " >> version >> "\n"; - - if (version != CURRENT_VERSION) - throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version); + assertVersion(); Strings host_id_strings; rb >> "query: " >> escape >> query >> "\n"; - rb >> "hosts: " >> host_id_strings >> "\n"; + if (version == 1) + { + rb >> "hosts: " >> host_id_strings >> "\n"; - if (!rb.eof()) - rb >> "initiator: " >> initiator >> "\n"; - else - initiator.clear(); + if (!rb.eof()) + rb >> "initiator: " >> initiator >> "\n"; + else + initiator.clear(); + } + else if (version == 2) + { + + if (!rb.eof() && *rb.position() == 'h') + rb >> "hosts: " >> host_id_strings >> "\n"; + if (!rb.eof() && *rb.position() == 'i') + rb >> "initiator: " >> initiator >> "\n"; + if (!rb.eof() && *rb.position() == 's') + { + String settings_str; + rb >> "settings: " >> settings_str >> "\n"; + ParserSetQuery parser{true}; + constexpr UInt64 max_size = 4096; + constexpr UInt64 max_depth = 16; + ASTPtr settings_ast = parseQuery(parser, settings_str, max_size, max_depth); + settings.emplace(std::move(settings_ast->as()->changes)); + } + } assertEOF(rb); - hosts.resize(host_id_strings.size()); - std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString); + if (!host_id_strings.empty()) + { + hosts.resize(host_id_strings.size()); + std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString); + } } @@ -102,6 +151,8 @@ std::unique_ptr DDLTaskBase::makeQueryContext(Context & from_context, c query_context->makeQueryContext(); query_context->setCurrentQueryId(""); // generate random query_id query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY; + if (entry.settings) + query_context->applySettingsChanges(*entry.settings); return query_context; } diff --git a/src/Interpreters/DDLTask.h b/src/Interpreters/DDLTask.h index 45702599fcf..777ab4ecb18 100644 --- a/src/Interpreters/DDLTask.h +++ b/src/Interpreters/DDLTask.h @@ -56,15 +56,16 @@ struct HostID struct DDLLogEntry { + UInt64 version = 1; String query; std::vector hosts; String initiator; // optional + std::optional settings; - static constexpr int CURRENT_VERSION = 1; - + void setSettingsIfRequired(const Context & context); String toString() const; - void parse(const String & data); + void assertVersion() const; }; struct DDLTaskBase diff --git a/src/Interpreters/executeDDLQueryOnCluster.cpp b/src/Interpreters/executeDDLQueryOnCluster.cpp index 2bd69c683f1..56eab86de3b 100644 --- a/src/Interpreters/executeDDLQueryOnCluster.cpp +++ b/src/Interpreters/executeDDLQueryOnCluster.cpp @@ -166,6 +166,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont entry.hosts = std::move(hosts); entry.query = queryToString(query_ptr); entry.initiator = ddl_worker.getCommonHostID(); + entry.setSettingsIfRequired(context); String node_path = ddl_worker.enqueueQuery(entry); return getDistributedDDLStatus(node_path, entry, context); diff --git a/tests/config/users.d/database_replicated.xml b/tests/config/users.d/database_replicated.xml index f21e3952d99..903b8a64e22 100644 --- a/tests/config/users.d/database_replicated.xml +++ b/tests/config/users.d/database_replicated.xml @@ -6,6 +6,7 @@ 30 30 1 + 2