support query and session settings for distributed DDL

This commit is contained in:
Alexander Tokmakov 2021-03-09 01:57:53 +03:00
parent 5070b5b85b
commit 163e27b299
6 changed files with 75 additions and 20 deletions

View File

@ -436,6 +436,7 @@ class IColumn;
M(UInt64, database_replicated_initial_query_timeout_sec, 300, "How long initial DDL query should wait for Replicated database to precess previous DDL queue entries", 0) \
M(Bool, database_replicated_always_detach_permanently, false, "Execute DETACH TABLE as DETACH TABLE PERMANENTLY if database engine is Replicated", 0) \
M(DistributedDDLOutputMode, distributed_ddl_output_mode, DistributedDDLOutputMode::THROW, "Format of distributed DDL query result", 0) \
M(UInt64, distributed_ddl_entry_format_version, 1, "Version of DDL entry to write into ZooKeeper", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -322,10 +322,10 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, const
LOG_DEBUG(log, "Proposing query: {}", queryToString(query));
/// TODO maybe write current settings to log entry?
DDLLogEntry entry;
entry.query = queryToString(query);
entry.initiator = ddl_worker->getCommonHostID();
entry.setSettingsIfRequired(query_context);
String node_path = ddl_worker->tryEnqueueAndExecuteEntry(entry, query_context);
Strings hosts_to_wait = getZooKeeper()->getChildren(zookeeper_path + "/replicas");

View File

@ -10,6 +10,7 @@
#include <Parsers/ParserQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Databases/DatabaseReplicated.h>
@ -43,20 +44,47 @@ bool HostID::isLocalAddress(UInt16 clickhouse_port) const
}
}
void DDLLogEntry::assertVersion() const
{
constexpr UInt64 MAX_VERSION = 2;
if (version == 0 || MAX_VERSION < version)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}."
"Maximum supported version is {}", version, MAX_VERSION);
}
void DDLLogEntry::setSettingsIfRequired(const Context & context)
{
version = context.getSettingsRef().distributed_ddl_entry_format_version;
if (version == 2)
settings.emplace(context.getSettingsRef().changes());
}
String DDLLogEntry::toString() const
{
WriteBufferFromOwnString wb;
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
auto version = CURRENT_VERSION;
wb << "version: " << version << "\n";
wb << "query: " << escape << query << "\n";
wb << "hosts: " << host_id_strings << "\n";
bool write_hosts = version == 1 || !hosts.empty();
if (write_hosts)
{
Strings host_id_strings(hosts.size());
std::transform(hosts.begin(), hosts.end(), host_id_strings.begin(), HostID::applyToString);
wb << "hosts: " << host_id_strings << "\n";
}
wb << "initiator: " << initiator << "\n";
bool write_settings = 1 <= version && settings && !settings->empty();
if (write_settings)
{
ASTSetQuery ast;
ast.is_standalone = false;
ast.changes = *settings;
wb << "settings: " << serializeAST(ast) << "\n";
}
return wb.str();
}
@ -64,25 +92,46 @@ void DDLLogEntry::parse(const String & data)
{
ReadBufferFromString rb(data);
int version;
rb >> "version: " >> version >> "\n";
if (version != CURRENT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unknown DDLLogEntry format version: {}", version);
assertVersion();
Strings host_id_strings;
rb >> "query: " >> escape >> query >> "\n";
rb >> "hosts: " >> host_id_strings >> "\n";
if (version == 1)
{
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
if (!rb.eof())
rb >> "initiator: " >> initiator >> "\n";
else
initiator.clear();
}
else if (version == 2)
{
if (!rb.eof() && *rb.position() == 'h')
rb >> "hosts: " >> host_id_strings >> "\n";
if (!rb.eof() && *rb.position() == 'i')
rb >> "initiator: " >> initiator >> "\n";
if (!rb.eof() && *rb.position() == 's')
{
String settings_str;
rb >> "settings: " >> settings_str >> "\n";
ParserSetQuery parser{true};
constexpr UInt64 max_size = 4096;
constexpr UInt64 max_depth = 16;
ASTPtr settings_ast = parseQuery(parser, settings_str, max_size, max_depth);
settings.emplace(std::move(settings_ast->as<ASTSetQuery>()->changes));
}
}
assertEOF(rb);
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
if (!host_id_strings.empty())
{
hosts.resize(host_id_strings.size());
std::transform(host_id_strings.begin(), host_id_strings.end(), hosts.begin(), HostID::fromString);
}
}
@ -102,6 +151,8 @@ std::unique_ptr<Context> DDLTaskBase::makeQueryContext(Context & from_context, c
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
if (entry.settings)
query_context->applySettingsChanges(*entry.settings);
return query_context;
}

View File

@ -56,15 +56,16 @@ struct HostID
struct DDLLogEntry
{
UInt64 version = 1;
String query;
std::vector<HostID> hosts;
String initiator; // optional
std::optional<SettingsChanges> settings;
static constexpr int CURRENT_VERSION = 1;
void setSettingsIfRequired(const Context & context);
String toString() const;
void parse(const String & data);
void assertVersion() const;
};
struct DDLTaskBase

View File

@ -166,6 +166,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
entry.hosts = std::move(hosts);
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getCommonHostID();
entry.setSettingsIfRequired(context);
String node_path = ddl_worker.enqueueQuery(entry);
return getDistributedDDLStatus(node_path, entry, context);

View File

@ -6,6 +6,7 @@
<database_replicated_initial_query_timeout_sec>30</database_replicated_initial_query_timeout_sec>
<distributed_ddl_task_timeout>30</distributed_ddl_task_timeout>
<database_replicated_always_detach_permanently>1</database_replicated_always_detach_permanently>
<distributed_ddl_entry_format_version>2</distributed_ddl_entry_format_version>
</default>
</profiles>
</yandex>