Merge pull request #8288 from PerformanceVision/reload_dictionary

Implement ON CLUSTER syntax when we want to reload a dictionary
This commit is contained in:
alexey-milovidov 2019-12-26 22:05:41 +03:00 committed by GitHub
commit 6b997a0c61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 26 deletions

View File

@ -13,6 +13,7 @@
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/PartLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/TraceLog.h>
@ -101,14 +102,14 @@ void startStopAction(Context & context, ASTSystemQuery & query, StorageActionBlo
auto manager = context.getActionLocksManager();
manager->cleanExpired();
if (!query.target_table.empty())
if (!query.table.empty())
{
String database = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
String database = !query.database.empty() ? query.database : context.getCurrentDatabase();
if (start)
manager->remove(database, query.target_table, action_type);
manager->remove(database, query.table, action_type);
else
manager->add(database, query.target_table, action_type);
manager->add(database, query.table, action_type);
}
else
{
@ -131,6 +132,9 @@ BlockIO InterpreterSystemQuery::execute()
{
auto & query = query_ptr->as<ASTSystemQuery &>();
if (!query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context, {query.database});
using Type = ASTSystemQuery::Type;
/// Use global context with fresh system profile settings
@ -138,11 +142,11 @@ BlockIO InterpreterSystemQuery::execute()
system_context.setSetting("profile", context.getSystemProfileName());
/// Make canonical query for simpler processing
if (!query.target_table.empty() && query.target_database.empty())
query.target_database = context.getCurrentDatabase();
if (!query.table.empty() && query.database.empty())
query.database = context.getCurrentDatabase();
if (!query.target_dictionary.empty() && !query.target_database.empty())
query.target_dictionary = query.target_database + "." + query.target_dictionary;
if (!query.target_dictionary.empty() && !query.database.empty())
query.target_dictionary = query.database + "." + query.target_dictionary;
switch (query.type)
{
@ -237,8 +241,8 @@ BlockIO InterpreterSystemQuery::execute()
restartReplicas(system_context);
break;
case Type::RESTART_REPLICA:
if (!tryRestartReplica(query.target_database, query.target_table, system_context))
throw Exception("There is no " + query.target_database + "." + query.target_table + " replicated table",
if (!tryRestartReplica(query.database, query.table, system_context))
throw Exception("There is no " + query.database + "." + query.table + " replicated table",
ErrorCodes::BAD_ARGUMENTS);
break;
case Type::FLUSH_LOGS:
@ -339,8 +343,8 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
{
String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
const String & table_name = query.target_table;
String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
const String & table_name = query.table;
StoragePtr table = context.getTable(database_name, table_name);
@ -362,8 +366,8 @@ void InterpreterSystemQuery::syncReplica(ASTSystemQuery & query)
void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
{
String database_name = !query.target_database.empty() ? query.target_database : context.getCurrentDatabase();
String & table_name = query.target_table;
String database_name = !query.database.empty() ? query.database : context.getCurrentDatabase();
String & table_name = query.table;
if (auto storage_distributed = dynamic_cast<StorageDistributed *>(context.getTable(database_name, table_name).get()))
storage_distributed->flushClusterNodesAllData();

View File

@ -96,27 +96,30 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
auto print_database_table = [&]
{
settings.ostr << " ";
if (!target_database.empty())
if (!database.empty())
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(target_database)
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database)
<< (settings.hilite ? hilite_none : "") << ".";
}
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(target_table)
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(table)
<< (settings.hilite ? hilite_none : "");
};
auto print_database_dictionary = [&]
{
settings.ostr << " ";
if (!target_database.empty())
if (!database.empty())
{
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(target_database)
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(database)
<< (settings.hilite ? hilite_none : "") << ".";
}
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(target_dictionary)
<< (settings.hilite ? hilite_none : "");
};
if (!cluster.empty())
formatOnCluster(settings);
if ( type == Type::STOP_MERGES
|| type == Type::START_MERGES
|| type == Type::STOP_TTL_MERGES
@ -132,7 +135,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
|| type == Type::STOP_DISTRIBUTED_SENDS
|| type == Type::START_DISTRIBUTED_SENDS)
{
if (!target_table.empty())
if (!table.empty())
print_database_table();
}
else if (type == Type::RESTART_REPLICA || type == Type::SYNC_REPLICA || type == Type::FLUSH_DISTRIBUTED)

View File

@ -1,13 +1,14 @@
#pragma once
#include "config_core.h"
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/IAST.h>
namespace DB
{
class ASTSystemQuery : public IAST
class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
{
public:
@ -55,13 +56,18 @@ public:
Type type = Type::UNKNOWN;
String target_dictionary;
String target_database;
String target_table;
String database;
String table;
String getID(char) const override { return "SYSTEM query"; }
ASTPtr clone() const override { return std::make_shared<ASTSystemQuery>(*this); }
ASTPtr getRewrittenASTWithoutOnCluster(const std::string & new_database) const override
{
return removeOnCluster<ASTSystemQuery>(clone(), new_database);
}
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;

View File

@ -43,10 +43,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
{
case Type::RELOAD_DICTIONARY:
{
String cluster_str;
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
res->cluster = cluster_str;
ASTPtr ast;
if (ParserStringLiteral{}.parse(pos, ast, expected))
res->target_dictionary = ast->as<ASTLiteral &>().value.safeGet<String>();
else if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_dictionary))
else if (!parseDatabaseAndTableName(pos, expected, res->database, res->target_dictionary))
return false;
break;
}
@ -54,7 +61,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::RESTART_REPLICA:
case Type::SYNC_REPLICA:
case Type::FLUSH_DISTRIBUTED:
if (!parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table))
if (!parseDatabaseAndTableName(pos, expected, res->database, res->table))
return false;
break;
@ -72,7 +79,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
case Type::START_REPLICATION_QUEUES:
case Type::STOP_DISTRIBUTED_SENDS:
case Type::START_DISTRIBUTED_SENDS:
parseDatabaseAndTableName(pos, expected, res->target_database, res->target_table);
parseDatabaseAndTableName(pos, expected, res->database, res->table);
break;
default:

View File

@ -1,3 +1,4 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
@ -55,6 +56,16 @@ def test_dictionary_ddl_on_cluster(started_cluster):
assert node.query("SELECT count() from sometbl") == "1\n"
assert node.query("SELECT dictGetString('default.somedict', 'value', toUInt64({}))".format(num)) == node.name + '\n'
for num, node in enumerate([ch1, ch2, ch3, ch4]):
node.query("ALTER TABLE sometbl UPDATE value = 'new_key' WHERE 1")
ch1.query("SYSTEM RELOAD DICTIONARY ON CLUSTER 'cluster' `default.somedict`")
time.sleep(2) # SYSTEM RELOAD DICTIONARY is an asynchronous query
for num, node in enumerate([ch1, ch2, ch3, ch4]):
assert node.query("SELECT dictGetString('default.somedict', 'value', toUInt64({}))".format(num)) == 'new_key' + '\n'
ch1.query("DROP DICTIONARY default.somedict ON CLUSTER 'cluster'")
for node in [ch1, ch2, ch3, ch4]: