Support REPLACE DICTIONARY, CREATE OR REPLACE DICTIONARY queries

This commit is contained in:
Maksim Kita 2021-06-18 01:29:41 +03:00
parent 6662e7ab0f
commit 008adabec2
7 changed files with 99 additions and 13 deletions

View File

@ -1101,6 +1101,7 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
[[maybe_unused]] bool done = doCreateTable(create, properties); [[maybe_unused]] bool done = doCreateTable(create, properties);
assert(done); assert(done);
ast_drop->table = create.table; ast_drop->table = create.table;
ast_drop->is_dictionary = create.is_dictionary;
ast_drop->database = create.database; ast_drop->database = create.database;
ast_drop->kind = ASTDropQuery::Drop; ast_drop->kind = ASTDropQuery::Drop;
created = true; created = true;
@ -1113,14 +1114,18 @@ BlockIO InterpreterCreateQuery::doCreateOrReplaceTable(ASTCreateQuery & create,
ASTRenameQuery::Table{create.database, create.table}, ASTRenameQuery::Table{create.database, create.table},
ASTRenameQuery::Table{create.database, table_to_replace_name} ASTRenameQuery::Table{create.database, table_to_replace_name}
}; };
ast_rename->elements.push_back(std::move(elem)); ast_rename->elements.push_back(std::move(elem));
ast_rename->exchange = true; ast_rename->exchange = true;
ast_rename->dictionary = create.is_dictionary;
InterpreterRenameQuery(ast_rename, getContext()).execute(); InterpreterRenameQuery(ast_rename, getContext()).execute();
replaced = true; replaced = true;
InterpreterDropQuery(ast_drop, getContext()).execute(); InterpreterDropQuery(ast_drop, getContext()).execute();
create.table = table_to_replace_name; create.table = table_to_replace_name;
return fillTableIfNeeded(create); return fillTableIfNeeded(create);
} }
catch (...) catch (...)

View File

@ -305,8 +305,16 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
} }
else else
{ {
String action = "CREATE";
if (attach)
action = "ATTACH";
else if (replace_table && create_or_replace)
action = "CREATE OR REPLACE";
else if (replace_table)
action = "REPLACE";
/// Always DICTIONARY /// Always DICTIONARY
settings.ostr << (settings.hilite ? hilite_keyword : "") << (attach ? "ATTACH " : "CREATE ") << "DICTIONARY " settings.ostr << (settings.hilite ? hilite_keyword : "") << action << "DICTIONARY "
<< (if_not_exists ? "IF NOT EXISTS " : "") << (settings.hilite ? hilite_none : "") << (if_not_exists ? "IF NOT EXISTS " : "") << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table); << (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
if (uuid != UUIDHelpers::Nil) if (uuid != UUIDHelpers::Nil)

View File

@ -971,6 +971,8 @@ bool ParserCreateDictionaryQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, E
{ {
ParserKeyword s_create("CREATE"); ParserKeyword s_create("CREATE");
ParserKeyword s_attach("ATTACH"); ParserKeyword s_attach("ATTACH");
ParserKeyword s_replace("REPLACE");
ParserKeyword s_or_replace("OR REPLACE");
ParserKeyword s_dictionary("DICTIONARY"); ParserKeyword s_dictionary("DICTIONARY");
ParserKeyword s_if_not_exists("IF NOT EXISTS"); ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON"); ParserKeyword s_on("ON");
@ -982,6 +984,8 @@ bool ParserCreateDictionaryQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, E
ParserDictionary dictionary_p; ParserDictionary dictionary_p;
bool if_not_exists = false; bool if_not_exists = false;
bool replace = false;
bool or_replace = false;
ASTPtr name; ASTPtr name;
ASTPtr attributes; ASTPtr attributes;
@ -989,13 +993,21 @@ bool ParserCreateDictionaryQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, E
String cluster_str; String cluster_str;
bool attach = false; bool attach = false;
if (!s_create.ignore(pos, expected))
if (s_create.ignore(pos, expected))
{ {
if (s_attach.ignore(pos, expected)) if (s_or_replace.ignore(pos, expected))
attach = true; {
else replace = true;
return false; or_replace = true;
}
} }
else if (s_attach.ignore(pos, expected))
attach = true;
else if (s_replace.ignore(pos, expected))
replace = true;
else
return false;
if (!s_dictionary.ignore(pos, expected)) if (!s_dictionary.ignore(pos, expected))
return false; return false;
@ -1031,6 +1043,8 @@ bool ParserCreateDictionaryQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, E
node = query; node = query;
query->is_dictionary = true; query->is_dictionary = true;
query->attach = attach; query->attach = attach;
query->create_or_replace = or_replace;
query->replace_table = replace;
auto dict_id = name->as<ASTTableIdentifier>()->getTableId(); auto dict_id = name->as<ASTTableIdentifier>()->getTableId();
query->database = dict_id.database_name; query->database = dict_id.database_name;

View File

@ -167,7 +167,7 @@ Pipe StorageDictionary::read(
const size_t max_block_size, const size_t max_block_size,
const unsigned /*threads*/) const unsigned /*threads*/)
{ {
auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, local_context); auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(getStorageID().getInternalDictionaryName(), local_context);
auto stream = dictionary->getBlockInputStream(column_names, max_block_size); auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
/// TODO: update dictionary interface for processors. /// TODO: update dictionary interface for processors.
return Pipe(std::make_shared<SourceFromInputStream>(stream)); return Pipe(std::make_shared<SourceFromInputStream>(stream));
@ -215,23 +215,30 @@ LoadablesConfigurationPtr StorageDictionary::getConfiguration() const
void StorageDictionary::renameInMemory(const StorageID & new_table_id) void StorageDictionary::renameInMemory(const StorageID & new_table_id)
{ {
auto previous_table_id = getStorageID();
auto previous_dictionary_name = getStorageID().getInternalDictionaryName();
auto new_dictionary_name = new_table_id.getInternalDictionaryName();
IStorage::renameInMemory(new_table_id);
dictionary_name = new_dictionary_name;
if (configuration) if (configuration)
{ {
configuration->setString("dictionary.database", new_table_id.database_name); configuration->setString("dictionary.database", new_table_id.database_name);
configuration->setString("dictionary.name", new_table_id.table_name); configuration->setString("dictionary.name", new_table_id.table_name);
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader(); const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
external_dictionaries_loader.reloadConfig(getStorageID().getInternalDictionaryName()); external_dictionaries_loader.reloadConfig(previous_dictionary_name);
auto result = external_dictionaries_loader.getLoadResult(new_dictionary_name);
auto result = external_dictionaries_loader.getLoadResult(getStorageID().getInternalDictionaryName());
if (!result.object) if (!result.object)
return; return;
const auto dictionary = std::static_pointer_cast<const IDictionary>(result.object); const auto dictionary = std::static_pointer_cast<const IDictionary>(result.object);
dictionary->updateDictionaryName(new_table_id); dictionary->updateDictionaryName(new_table_id);
} }
IStorage::renameInMemory(new_table_id);
} }
void registerStorageDictionary(StorageFactory & factory) void registerStorageDictionary(StorageFactory & factory)

View File

@ -45,7 +45,7 @@ public:
Poco::Timestamp getUpdateTime() const; Poco::Timestamp getUpdateTime() const;
LoadablesConfigurationPtr getConfiguration() const; LoadablesConfigurationPtr getConfiguration() const;
const String & getDictionaryName() const { return dictionary_name; } String getDictionaryName() const { return dictionary_name; }
/// Specifies where the table is located relative to the dictionary. /// Specifies where the table is located relative to the dictionary.
enum class Location enum class Location
@ -66,7 +66,7 @@ public:
}; };
private: private:
const String dictionary_name; String dictionary_name;
const Location location; const Location location;
mutable std::mutex dictionary_config_mutex; mutable std::mutex dictionary_config_mutex;

View File

@ -0,0 +1,2 @@
0 Value0
0 Value1

View File

@ -0,0 +1,50 @@
DROP DATABASE IF EXISTS 01913_db;
CREATE DATABASE 01913_db ENGINE=Atomic;
DROP TABLE IF EXISTS 01913_db.test_source_table_1;
CREATE TABLE 01913_db.test_source_table_1
(
id UInt64,
value String
) ENGINE=TinyLog;
INSERT INTO 01913_db.test_source_table_1 VALUES (0, 'Value0');
DROP DICTIONARY IF EXISTS 01913_db.test_dictionary;
CREATE DICTIONARY 01913_db.test_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
LAYOUT(DIRECT())
SOURCE(CLICKHOUSE(DB '01913_db' TABLE 'test_source_table_1'));
SELECT * FROM 01913_db.test_dictionary;
DROP TABLE IF EXISTS 01913_db.test_source_table_2;
CREATE TABLE 01913_db.test_source_table_2
(
id UInt64,
value_1 String
) ENGINE=TinyLog;
INSERT INTO 01913_db.test_source_table_2 VALUES (0, 'Value1');
REPLACE DICTIONARY 01913_db.test_dictionary
(
id UInt64,
value_1 String
)
PRIMARY KEY id
LAYOUT(HASHED())
SOURCE(CLICKHOUSE(DB '01913_db' TABLE 'test_source_table_2'))
LIFETIME(0);
SELECT * FROM 01913_db.test_dictionary;
DROP DICTIONARY 01913_db.test_dictionary;
DROP TABLE 01913_db.test_source_table_1;
DROP TABLE 01913_db.test_source_table_2;
DROP DATABASE 01913_db;