mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #23436 from kitaisreal/storage-dictionary-updated
Refactored StorageDictionary
This commit is contained in:
commit
b544037f03
@ -1,5 +1,5 @@
|
||||
#include <Common/StatusInfo.h>
|
||||
#include <Interpreters/ExternalLoader.h>
|
||||
#include <Common/ExternalLoaderStatus.h>
|
||||
|
||||
/// Available status. Add something here as you wish.
|
||||
#define APPLY_FOR_STATUS(M) \
|
||||
|
@ -91,7 +91,7 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
|
||||
not_in_use = cleanupDetachedTables();
|
||||
auto table_id = table->getStorageID();
|
||||
assertDetachedTableNotInUse(table_id.uuid);
|
||||
DatabaseWithDictionaries::attachTableUnlocked(name, table, lock);
|
||||
DatabaseOrdinary::attachTableUnlocked(name, table, lock);
|
||||
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
|
||||
}
|
||||
|
||||
@ -99,7 +99,7 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
|
||||
{
|
||||
DetachedTables not_in_use;
|
||||
std::unique_lock lock(mutex);
|
||||
auto table = DatabaseWithDictionaries::detachTableUnlocked(name, lock);
|
||||
auto table = DatabaseOrdinary::detachTableUnlocked(name, lock);
|
||||
table_name_to_path.erase(name);
|
||||
detached_tables.emplace(table->getStorageID().uuid, table);
|
||||
not_in_use = cleanupDetachedTables();
|
||||
@ -133,9 +133,10 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
|
||||
/// TODO better detection and recovery
|
||||
|
||||
Poco::File(table_metadata_path).renameTo(table_metadata_path_drop); /// Mark table as dropped
|
||||
DatabaseWithDictionaries::detachTableUnlocked(table_name, lock); /// Should never throw
|
||||
DatabaseOrdinary::detachTableUnlocked(table_name, lock); /// Should never throw
|
||||
table_name_to_path.erase(table_name);
|
||||
}
|
||||
|
||||
if (table->storesDataOnDisk())
|
||||
tryRemoveSymlink(table_name);
|
||||
|
||||
@ -156,8 +157,6 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
|
||||
return;
|
||||
}
|
||||
|
||||
if (exchange && dictionary)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot exchange dictionaries");
|
||||
if (exchange && !supportsRenameat2())
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported");
|
||||
|
||||
@ -174,7 +173,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
|
||||
/// Path can be not set for DDL dictionaries, but it does not matter for StorageDictionary.
|
||||
if (it != db.table_name_to_path.end())
|
||||
table_data_path_saved = it->second;
|
||||
assert(!table_data_path_saved.empty() || db.dictionaries.find(table_name_) != db.dictionaries.end());
|
||||
assert(!table_data_path_saved.empty());
|
||||
db.tables.erase(table_name_);
|
||||
db.table_name_to_path.erase(table_name_);
|
||||
if (has_symlink)
|
||||
@ -222,21 +221,21 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
|
||||
db_lock = std::unique_lock{mutex};
|
||||
}
|
||||
|
||||
bool is_dictionary = dictionaries.find(table_name) != dictionaries.end();
|
||||
if (exchange && other_db.dictionaries.find(to_table_name) != other_db.dictionaries.end())
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot exchange dictionaries");
|
||||
|
||||
if (dictionary != is_dictionary)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Use RENAME DICTIONARY for dictionaries and RENAME TABLE for tables.");
|
||||
|
||||
if (is_dictionary && !inside_database)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot move dictionary to other database");
|
||||
|
||||
if (!exchange)
|
||||
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name, inside_database ? db_lock : other_db_lock);
|
||||
|
||||
StoragePtr table = getTableUnlocked(table_name, db_lock);
|
||||
|
||||
if (table->isDictionary() && !dictionary)
|
||||
{
|
||||
if (exchange)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Use EXCHANGE DICTIONARIES for dictionaries and EXCHANGE TABLES for tables.");
|
||||
else
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Use RENAME DICTIONARY for dictionaries and RENAME TABLE for tables.");
|
||||
}
|
||||
|
||||
table->checkTableCanBeRenamed();
|
||||
assert_can_move_mat_view(table);
|
||||
StoragePtr other_table;
|
||||
@ -281,12 +280,6 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
|
||||
attach(other_db, to_table_name, table_data_path, table);
|
||||
if (exchange)
|
||||
attach(*this, table_name, other_table_data_path, other_table);
|
||||
|
||||
if (is_dictionary)
|
||||
{
|
||||
auto new_table_id = StorageID(other_db.database_name, to_table_name, old_table_id.uuid);
|
||||
renameDictionaryInMemoryUnlocked(old_table_id, new_table_id);
|
||||
}
|
||||
}
|
||||
|
||||
void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
|
||||
@ -528,14 +521,6 @@ void DatabaseAtomic::renameDatabase(const String & new_name)
|
||||
table.second->renameInMemory(table_id);
|
||||
}
|
||||
|
||||
for (auto & dict : dictionaries)
|
||||
{
|
||||
auto old_name = StorageID(dict.second.create_query);
|
||||
auto name = old_name;
|
||||
name.database_name = database_name;
|
||||
renameDictionaryInMemoryUnlocked(old_name, name);
|
||||
}
|
||||
|
||||
path_to_metadata_symlink = getContext()->getPath() + "metadata/" + new_name_escaped;
|
||||
old_path_to_table_symlinks = path_to_table_symlinks;
|
||||
path_to_table_symlinks = getContext()->getPath() + "data/" + new_name_escaped + "/";
|
||||
@ -545,32 +530,6 @@ void DatabaseAtomic::renameDatabase(const String & new_name)
|
||||
tryCreateMetadataSymlink();
|
||||
}
|
||||
|
||||
void DatabaseAtomic::renameDictionaryInMemoryUnlocked(const StorageID & old_name, const StorageID & new_name)
|
||||
{
|
||||
auto it = dictionaries.find(old_name.table_name);
|
||||
assert(it != dictionaries.end());
|
||||
assert(it->second.config->getString("dictionary.uuid") == toString(old_name.uuid));
|
||||
assert(old_name.uuid == new_name.uuid);
|
||||
it->second.config->setString("dictionary.database", new_name.database_name);
|
||||
it->second.config->setString("dictionary.name", new_name.table_name);
|
||||
auto & create = it->second.create_query->as<ASTCreateQuery &>();
|
||||
create.database = new_name.database_name;
|
||||
create.table = new_name.table_name;
|
||||
assert(create.uuid == new_name.uuid);
|
||||
|
||||
if (old_name.table_name != new_name.table_name)
|
||||
{
|
||||
auto attach_info = std::move(it->second);
|
||||
dictionaries.erase(it);
|
||||
dictionaries.emplace(new_name.table_name, std::move(attach_info));
|
||||
}
|
||||
|
||||
auto result = external_loader.getLoadResult(toString(old_name.uuid));
|
||||
if (!result.object)
|
||||
return;
|
||||
const auto & dict = dynamic_cast<const IDictionary &>(*result.object);
|
||||
dict.updateDictionaryName(new_name);
|
||||
}
|
||||
void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid)
|
||||
{
|
||||
/// Table is in use while its shared_ptr counter is greater than 1.
|
||||
|
@ -72,8 +72,6 @@ protected:
|
||||
|
||||
void tryCreateMetadataSymlink();
|
||||
|
||||
void renameDictionaryInMemoryUnlocked(const StorageID & old_name, const StorageID & new_name);
|
||||
|
||||
//TODO store path in DatabaseWithOwnTables::tables
|
||||
using NameToPathMap = std::unordered_map<String, String>;
|
||||
NameToPathMap table_name_to_path;
|
||||
|
@ -21,18 +21,20 @@ namespace ErrorCodes
|
||||
|
||||
namespace
|
||||
{
|
||||
StoragePtr createStorageDictionary(const String & database_name, const ExternalLoader::LoadResult & load_result)
|
||||
StoragePtr createStorageDictionary(const String & database_name, const ExternalLoader::LoadResult & load_result, ContextPtr context)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!load_result.config)
|
||||
return nullptr;
|
||||
|
||||
DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
|
||||
return StorageDictionary::create(
|
||||
StorageID(database_name, load_result.name),
|
||||
load_result.name,
|
||||
dictionary_structure,
|
||||
StorageDictionary::Location::DictionaryDatabase);
|
||||
StorageDictionary::Location::DictionaryDatabase,
|
||||
context);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
@ -57,7 +59,7 @@ Tables DatabaseDictionary::listTables(const FilterByNameFunction & filter_by_nam
|
||||
String db_name = getDatabaseName();
|
||||
for (auto & load_result : load_results)
|
||||
{
|
||||
auto storage = createStorageDictionary(db_name, load_result);
|
||||
auto storage = createStorageDictionary(db_name, load_result, getContext());
|
||||
if (storage)
|
||||
tables.emplace(storage->getStorageID().table_name, storage);
|
||||
}
|
||||
@ -72,7 +74,7 @@ bool DatabaseDictionary::isTableExist(const String & table_name, ContextPtr) con
|
||||
StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, ContextPtr) const
|
||||
{
|
||||
auto load_result = getContext()->getExternalDictionariesLoader().getLoadResult(table_name);
|
||||
return createStorageDictionary(getDatabaseName(), load_result);
|
||||
return createStorageDictionary(getDatabaseName(), load_result, getContext());
|
||||
}
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name)
|
||||
|
@ -37,7 +37,6 @@ namespace ErrorCodes
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int SYNTAX_ERROR;
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||
}
|
||||
|
||||
@ -63,14 +62,21 @@ std::pair<String, StoragePtr> createTableFromAST(
|
||||
storage->renameInMemory(ast_create_query);
|
||||
return {ast_create_query.table, storage};
|
||||
}
|
||||
/// We do not directly use `InterpreterCreateQuery::execute`, because
|
||||
/// - the database has not been loaded yet;
|
||||
/// - the code is simpler, since the query is already brought to a suitable form.
|
||||
if (!ast_create_query.columns_list || !ast_create_query.columns_list->columns)
|
||||
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
||||
|
||||
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
|
||||
ConstraintsDescription constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
|
||||
ColumnsDescription columns;
|
||||
ConstraintsDescription constraints;
|
||||
|
||||
if (!ast_create_query.is_dictionary)
|
||||
{
|
||||
/// We do not directly use `InterpreterCreateQuery::execute`, because
|
||||
/// - the database has not been loaded yet;
|
||||
/// - the code is simpler, since the query is already brought to a suitable form.
|
||||
if (!ast_create_query.columns_list || !ast_create_query.columns_list->columns)
|
||||
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
|
||||
|
||||
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
|
||||
constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
|
||||
}
|
||||
|
||||
return
|
||||
{
|
||||
@ -220,10 +226,6 @@ void DatabaseOnDisk::createTable(
|
||||
/// A race condition would be possible if a table with the same name is simultaneously created using CREATE and using ATTACH.
|
||||
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
|
||||
|
||||
if (isDictionaryExist(table_name))
|
||||
throw Exception(
|
||||
ErrorCodes::DICTIONARY_ALREADY_EXISTS, "Dictionary {}.{} already exists", backQuote(getDatabaseName()), backQuote(table_name));
|
||||
|
||||
if (isTableExist(table_name, getContext()))
|
||||
throw Exception(
|
||||
ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists", backQuote(getDatabaseName()), backQuote(table_name));
|
||||
|
@ -43,13 +43,15 @@ namespace
|
||||
const String & metadata_path,
|
||||
bool has_force_restore_data_flag)
|
||||
{
|
||||
assert(!query.is_dictionary);
|
||||
try
|
||||
{
|
||||
String table_name;
|
||||
StoragePtr table;
|
||||
std::tie(table_name, table)
|
||||
= createTableFromAST(query, database_name, database.getTableDataPath(query), context, has_force_restore_data_flag);
|
||||
auto [table_name, table] = createTableFromAST(
|
||||
query,
|
||||
database_name,
|
||||
database.getTableDataPath(query),
|
||||
context,
|
||||
has_force_restore_data_flag);
|
||||
|
||||
database.attachTable(table_name, table, database.getTableDataPath(query));
|
||||
}
|
||||
catch (Exception & e)
|
||||
@ -61,28 +63,6 @@ namespace
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void tryAttachDictionary(const ASTPtr & query, DatabaseOrdinary & database, const String & metadata_path, ContextPtr context)
|
||||
{
|
||||
auto & create_query = query->as<ASTCreateQuery &>();
|
||||
assert(create_query.is_dictionary);
|
||||
try
|
||||
{
|
||||
Poco::File meta_file(metadata_path);
|
||||
auto config = getDictionaryConfigurationFromAST(create_query, context, database.getDatabaseName());
|
||||
time_t modification_time = meta_file.getLastModified().epochTime();
|
||||
database.attachDictionary(create_query.table, DictionaryAttachInfo{query, config, modification_time});
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage(
|
||||
"Cannot attach dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(create_query.table)
|
||||
+ " from metadata file " + metadata_path + " from query " + serializeAST(*query));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
|
||||
{
|
||||
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
|
||||
@ -101,7 +81,7 @@ DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata
|
||||
|
||||
DatabaseOrdinary::DatabaseOrdinary(
|
||||
const String & name_, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr context_)
|
||||
: DatabaseWithDictionaries(name_, metadata_path_, data_path_, logger, context_)
|
||||
: DatabaseOnDisk(name_, metadata_path_, data_path_, logger, context_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -117,7 +97,7 @@ void DatabaseOrdinary::loadStoredObjects(ContextPtr local_context, bool has_forc
|
||||
|
||||
size_t total_dictionaries = 0;
|
||||
|
||||
auto process_metadata = [context_weak = ContextWeakPtr(local_context), &file_names, &total_dictionaries, &file_names_mutex, this](
|
||||
auto process_metadata = [&file_names, &total_dictionaries, &file_names_mutex, this](
|
||||
const String & file_name)
|
||||
{
|
||||
fs::path path(getMetadataPath());
|
||||
@ -164,7 +144,6 @@ void DatabaseOrdinary::loadStoredObjects(ContextPtr local_context, bool has_forc
|
||||
|
||||
AtomicStopwatch watch;
|
||||
std::atomic<size_t> tables_processed{0};
|
||||
std::atomic<size_t> dictionaries_processed{0};
|
||||
|
||||
ThreadPool pool;
|
||||
|
||||
@ -176,23 +155,12 @@ void DatabaseOrdinary::loadStoredObjects(ContextPtr local_context, bool has_forc
|
||||
/// loading of its config only, it doesn't involve loading the dictionary itself.
|
||||
|
||||
/// Attach dictionaries.
|
||||
for (const auto & [name, query] : file_names)
|
||||
{
|
||||
auto create_query = query->as<const ASTCreateQuery &>();
|
||||
if (create_query.is_dictionary)
|
||||
{
|
||||
tryAttachDictionary(query, *this, getMetadataPath() + name, local_context);
|
||||
|
||||
/// Messages, so that it's not boring to wait for the server to load for a long time.
|
||||
logAboutProgress(log, ++dictionaries_processed, total_dictionaries, watch);
|
||||
}
|
||||
}
|
||||
|
||||
/// Attach tables.
|
||||
for (const auto & name_with_query : file_names)
|
||||
{
|
||||
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
|
||||
if (!create_query.is_dictionary)
|
||||
|
||||
if (create_query.is_dictionary)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&]()
|
||||
{
|
||||
tryAttachTable(
|
||||
@ -206,6 +174,32 @@ void DatabaseOrdinary::loadStoredObjects(ContextPtr local_context, bool has_forc
|
||||
/// Messages, so that it's not boring to wait for the server to load for a long time.
|
||||
logAboutProgress(log, ++tables_processed, total_tables, watch);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
|
||||
/// Attach tables.
|
||||
for (const auto & name_with_query : file_names)
|
||||
{
|
||||
const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
|
||||
|
||||
if (!create_query.is_dictionary)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&]()
|
||||
{
|
||||
tryAttachTable(
|
||||
local_context,
|
||||
create_query,
|
||||
*this,
|
||||
database_name,
|
||||
getMetadataPath() + name_with_query.first,
|
||||
has_force_restore_data_flag);
|
||||
|
||||
/// Messages, so that it's not boring to wait for the server to load for a long time.
|
||||
logAboutProgress(log, ++tables_processed, total_tables, watch);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Databases/DatabaseWithDictionaries.h>
|
||||
#include <Databases/DatabaseOnDisk.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
|
||||
|
||||
@ -11,7 +11,7 @@ namespace DB
|
||||
* It stores tables list in filesystem using list of .sql files,
|
||||
* that contain declaration of table represented by SQL ATTACH TABLE query.
|
||||
*/
|
||||
class DatabaseOrdinary : public DatabaseWithDictionaries
|
||||
class DatabaseOrdinary : public DatabaseOnDisk
|
||||
{
|
||||
public:
|
||||
DatabaseOrdinary(const String & name_, const String & metadata_path_, ContextPtr context);
|
||||
|
@ -511,9 +511,10 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
executeQuery(query, query_context, true);
|
||||
}
|
||||
|
||||
size_t dropped_dicts = 0;
|
||||
size_t moved_tables = 0;
|
||||
std::vector<UUID> dropped_tables;
|
||||
size_t dropped_dictionaries = 0;
|
||||
|
||||
for (const auto & table_name : tables_to_detach)
|
||||
{
|
||||
DDLGuardPtr table_guard = DatabaseCatalog::instance().getDDLGuard(db_name, table_name);
|
||||
@ -521,17 +522,13 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed, will retry");
|
||||
|
||||
auto table = tryGetTable(table_name, getContext());
|
||||
if (isDictionaryExist(table_name))
|
||||
{
|
||||
/// We can safely drop any dictionaries because they do not store data
|
||||
LOG_DEBUG(log, "Will DROP DICTIONARY {}", backQuoteIfNeed(table_name));
|
||||
DatabaseAtomic::removeDictionary(getContext(), table_name);
|
||||
++dropped_dicts;
|
||||
}
|
||||
else if (!table->storesDataOnDisk())
|
||||
|
||||
if (!table->storesDataOnDisk())
|
||||
{
|
||||
LOG_DEBUG(log, "Will DROP TABLE {}, because it does not store data on disk and can be safely dropped", backQuoteIfNeed(table_name));
|
||||
dropped_tables.push_back(tryGetTableUUID(table_name));
|
||||
dropped_dictionaries += table->isDictionary();
|
||||
|
||||
table->shutdown();
|
||||
DatabaseAtomic::dropTable(getContext(), table_name, true);
|
||||
}
|
||||
@ -550,7 +547,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
|
||||
|
||||
if (!tables_to_detach.empty())
|
||||
LOG_WARNING(log, "Cleaned {} outdated objects: dropped {} dictionaries and {} tables, moved {} tables",
|
||||
tables_to_detach.size(), dropped_dicts, dropped_tables.size(), moved_tables);
|
||||
tables_to_detach.size(), dropped_dictionaries, dropped_tables.size() - dropped_dictionaries, moved_tables);
|
||||
|
||||
/// Now database is cleared from outdated tables, let's rename ReplicatedMergeTree tables to actual names
|
||||
for (const auto & old_to_new : replicated_tables_to_rename)
|
||||
@ -766,33 +763,6 @@ void DatabaseReplicated::commitAlterTable(const StorageID & table_id,
|
||||
DatabaseAtomic::commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, query_context);
|
||||
}
|
||||
|
||||
void DatabaseReplicated::createDictionary(ContextPtr local_context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query)
|
||||
{
|
||||
auto txn = local_context->getZooKeeperMetadataTransaction();
|
||||
assert(!ddl_worker->isCurrentlyActive() || txn);
|
||||
if (txn && txn->isInitialQuery())
|
||||
{
|
||||
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name);
|
||||
String statement = getObjectDefinitionFromCreateQuery(query->clone());
|
||||
txn->addOp(zkutil::makeCreateRequest(metadata_zk_path, statement, zkutil::CreateMode::Persistent));
|
||||
}
|
||||
DatabaseAtomic::createDictionary(local_context, dictionary_name, query);
|
||||
}
|
||||
|
||||
void DatabaseReplicated::removeDictionary(ContextPtr local_context, const String & dictionary_name)
|
||||
{
|
||||
auto txn = local_context->getZooKeeperMetadataTransaction();
|
||||
assert(!ddl_worker->isCurrentlyActive() || txn);
|
||||
if (txn && txn->isInitialQuery())
|
||||
{
|
||||
String metadata_zk_path = zookeeper_path + "/metadata/" + escapeForFileName(dictionary_name);
|
||||
txn->addOp(zkutil::makeRemoveRequest(metadata_zk_path, -1));
|
||||
}
|
||||
DatabaseAtomic::removeDictionary(local_context, dictionary_name);
|
||||
}
|
||||
|
||||
void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name)
|
||||
{
|
||||
auto txn = local_context->getZooKeeperMetadataTransaction();
|
||||
|
@ -40,10 +40,6 @@ public:
|
||||
void commitAlterTable(const StorageID & table_id,
|
||||
const String & table_metadata_tmp_path, const String & table_metadata_path,
|
||||
const String & statement, ContextPtr query_context) override;
|
||||
void createDictionary(ContextPtr context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query) override;
|
||||
void removeDictionary(ContextPtr context, const String & dictionary_name) override;
|
||||
void detachTablePermanently(ContextPtr context, const String & table_name) override;
|
||||
void removeDetachedPermanentlyFlag(ContextPtr context, const String & table_name, const String & table_metadata_path, bool attach) const override;
|
||||
|
||||
|
@ -1,381 +0,0 @@
|
||||
#include <Databases/DatabaseWithDictionaries.h>
|
||||
#include <Common/StatusInfo.h>
|
||||
#include <Common/ExternalLoaderStatus.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Interpreters/ExternalLoaderTempConfigRepository.h>
|
||||
#include <Interpreters/ExternalLoaderDatabaseConfigRepository.h>
|
||||
#include <Interpreters/DDLTask.h>
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Poco/File.h>
|
||||
#include <boost/smart_ptr/make_shared_object.hpp>
|
||||
|
||||
|
||||
namespace CurrentStatusInfo
|
||||
{
|
||||
extern const Status DictionaryStatus;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int UNKNOWN_DICTIONARY;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
}
|
||||
|
||||
|
||||
void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const DictionaryAttachInfo & attach_info)
|
||||
{
|
||||
auto dict_id = StorageID(attach_info.create_query);
|
||||
String internal_name = dict_id.getInternalDictionaryName();
|
||||
assert(attach_info.create_query->as<const ASTCreateQuery &>().table == dictionary_name);
|
||||
assert(!dict_id.database_name.empty());
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
auto [it, inserted] = dictionaries.emplace(dictionary_name, attach_info);
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::DICTIONARY_ALREADY_EXISTS,
|
||||
"Dictionary {} already exists.", dict_id.getNameForLogs());
|
||||
|
||||
/// Attach the dictionary as table too.
|
||||
try
|
||||
{
|
||||
/// TODO Make StorageDictionary an owner of IDictionary objects.
|
||||
/// All DDL operations with dictionaries will work with StorageDictionary table,
|
||||
/// and StorageDictionary will be responsible for loading of DDL dictionaries.
|
||||
/// ExternalLoaderDatabaseConfigRepository and other hacks related to ExternalLoader
|
||||
/// will not be longer required.
|
||||
attachTableUnlocked(
|
||||
dictionary_name,
|
||||
StorageDictionary::create(
|
||||
dict_id,
|
||||
internal_name,
|
||||
ExternalDictionariesLoader::getDictionaryStructure(*attach_info.config),
|
||||
StorageDictionary::Location::SameDatabaseAndNameAsDictionary),
|
||||
lock);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
dictionaries.erase(it);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
CurrentStatusInfo::set(CurrentStatusInfo::DictionaryStatus, internal_name, static_cast<Int8>(ExternalLoaderStatus::NOT_LOADED));
|
||||
|
||||
/// We want ExternalLoader::reloadConfig() to find out that the dictionary's config
|
||||
/// has been added and in case `dictionaries_lazy_load == false` to load the dictionary.
|
||||
reloadDictionaryConfig(internal_name);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name)
|
||||
{
|
||||
DictionaryAttachInfo attach_info;
|
||||
detachDictionaryImpl(dictionary_name, attach_info);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::detachDictionaryImpl(const String & dictionary_name, DictionaryAttachInfo & attach_info)
|
||||
{
|
||||
auto dict_id = StorageID::createEmpty();
|
||||
String internal_name;
|
||||
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it == dictionaries.end())
|
||||
throw Exception(ErrorCodes::UNKNOWN_DICTIONARY,
|
||||
"Dictionary {}.{} doesn't exist.", database_name, dictionary_name);
|
||||
dict_id = StorageID(it->second.create_query);
|
||||
internal_name = dict_id.getInternalDictionaryName();
|
||||
assert(dict_id.table_name == dictionary_name);
|
||||
assert(!dict_id.database_name.empty());
|
||||
|
||||
attach_info = std::move(it->second);
|
||||
dictionaries.erase(it);
|
||||
|
||||
/// Detach the dictionary as table too.
|
||||
try
|
||||
{
|
||||
if (!dict_id.hasUUID())
|
||||
detachTableUnlocked(dictionary_name, lock);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
dictionaries.emplace(dictionary_name, std::move(attach_info));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, internal_name);
|
||||
|
||||
/// We want ExternalLoader::reloadConfig() to find out that the dictionary's config
|
||||
/// has been removed and to unload the dictionary.
|
||||
reloadDictionaryConfig(internal_name);
|
||||
|
||||
if (dict_id.hasUUID())
|
||||
detachTable(dictionary_name);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::createDictionary(ContextPtr local_context, const String & dictionary_name, const ASTPtr & query)
|
||||
{
|
||||
const auto & settings = local_context->getSettingsRef();
|
||||
|
||||
/** The code is based on the assumption that all threads share the same order of operations:
|
||||
* - create the .sql.tmp file;
|
||||
* - add the dictionary to ExternalDictionariesLoader;
|
||||
* - load the dictionary in case dictionaries_lazy_load == false;
|
||||
* - attach the dictionary;
|
||||
* - rename .sql.tmp to .sql.
|
||||
*/
|
||||
|
||||
auto dict_id = StorageID(query);
|
||||
assert(query->as<const ASTCreateQuery &>().table == dictionary_name);
|
||||
assert(!dict_id.database_name.empty());
|
||||
|
||||
/// A race condition would be possible if a dictionary with the same name is simultaneously created using CREATE and using ATTACH.
|
||||
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
|
||||
if (isDictionaryExist(dictionary_name))
|
||||
throw Exception(ErrorCodes::DICTIONARY_ALREADY_EXISTS, "Dictionary {} already exists.", dict_id.getFullTableName());
|
||||
|
||||
/// A dictionary with the same full name could be defined in *.xml config files.
|
||||
if (external_loader.getCurrentStatus(dict_id.getFullNameNotQuoted()) != ExternalLoader::Status::NOT_EXIST)
|
||||
throw Exception(ErrorCodes::DICTIONARY_ALREADY_EXISTS,
|
||||
"Dictionary {} already exists.", dict_id.getFullNameNotQuoted());
|
||||
|
||||
if (isTableExist(dictionary_name, getContext()))
|
||||
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {} already exists.", dict_id.getFullTableName());
|
||||
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp";
|
||||
String statement = getObjectDefinitionFromCreateQuery(query);
|
||||
|
||||
{
|
||||
/// Exclusive flags guarantees, that table is not created right now in another thread. Otherwise, exception will be thrown.
|
||||
WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||
writeString(statement, out);
|
||||
out.next();
|
||||
if (settings.fsync_metadata)
|
||||
out.sync();
|
||||
out.close();
|
||||
}
|
||||
|
||||
bool succeeded = false;
|
||||
bool uuid_locked = false;
|
||||
SCOPE_EXIT({
|
||||
if (!succeeded)
|
||||
{
|
||||
if (uuid_locked)
|
||||
DatabaseCatalog::instance().removeUUIDMappingFinally(dict_id.uuid);
|
||||
Poco::File(dictionary_metadata_tmp_path).remove();
|
||||
}
|
||||
});
|
||||
|
||||
if (dict_id.uuid != UUIDHelpers::Nil)
|
||||
{
|
||||
DatabaseCatalog::instance().addUUIDMapping(dict_id.uuid);
|
||||
uuid_locked = true;
|
||||
}
|
||||
|
||||
/// Add a temporary repository containing the dictionary.
|
||||
/// We need this temp repository to try loading the dictionary before actually attaching it to the database.
|
||||
auto temp_repository = external_loader.addConfigRepository(std::make_unique<ExternalLoaderTempConfigRepository>(
|
||||
getDatabaseName(), dictionary_metadata_tmp_path, getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>(), local_context)));
|
||||
|
||||
bool lazy_load = local_context->getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
if (!lazy_load)
|
||||
{
|
||||
/// load() is called here to force loading the dictionary, wait until the loading is finished,
|
||||
/// and throw an exception if the loading is failed.
|
||||
external_loader.load(dict_id.getInternalDictionaryName());
|
||||
}
|
||||
|
||||
auto config = getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>(), local_context);
|
||||
attachDictionary(dictionary_name, DictionaryAttachInfo{query, config, time(nullptr)});
|
||||
SCOPE_EXIT({
|
||||
if (!succeeded)
|
||||
detachDictionary(dictionary_name);
|
||||
});
|
||||
|
||||
auto txn = local_context->getZooKeeperMetadataTransaction();
|
||||
if (txn && !local_context->isInternalSubquery())
|
||||
txn->commit(); /// Commit point (a sort of) for Replicated database
|
||||
|
||||
/// If it was ATTACH query and file with dictionary metadata already exist
|
||||
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
|
||||
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);
|
||||
|
||||
/// ExternalDictionariesLoader doesn't know we renamed the metadata path.
|
||||
/// That's why we have to call ExternalLoader::reloadConfig() here.
|
||||
reloadDictionaryConfig(dict_id.getInternalDictionaryName());
|
||||
|
||||
/// Everything's ok.
|
||||
succeeded = true;
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::removeDictionary(ContextPtr local_context, const String & dictionary_name)
|
||||
{
|
||||
DictionaryAttachInfo attach_info;
|
||||
detachDictionaryImpl(dictionary_name, attach_info);
|
||||
|
||||
try
|
||||
{
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
|
||||
auto txn = local_context->getZooKeeperMetadataTransaction();
|
||||
if (txn && !local_context->isInternalSubquery())
|
||||
txn->commit(); /// Commit point (a sort of) for Replicated database
|
||||
|
||||
Poco::File(dictionary_metadata_path).remove();
|
||||
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus,
|
||||
StorageID(attach_info.create_query).getInternalDictionaryName());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// If remove was not possible for some reason
|
||||
attachDictionary(dictionary_name, attach_info);
|
||||
throw;
|
||||
}
|
||||
|
||||
UUID dict_uuid = attach_info.create_query->as<ASTCreateQuery>()->uuid;
|
||||
if (dict_uuid != UUIDHelpers::Nil)
|
||||
DatabaseCatalog::instance().removeUUIDMappingFinally(dict_uuid);
|
||||
}
|
||||
|
||||
DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
DictionariesWithID filtered_dictionaries;
|
||||
for (const auto & dictionary : dictionaries)
|
||||
{
|
||||
if (filter_by_dictionary_name && !filter_by_dictionary_name(dictionary.first))
|
||||
continue;
|
||||
filtered_dictionaries.emplace_back();
|
||||
filtered_dictionaries.back().first = dictionary.first;
|
||||
filtered_dictionaries.back().second = dictionary.second.create_query->as<const ASTCreateQuery &>().uuid;
|
||||
}
|
||||
return std::make_unique<DatabaseDictionariesSnapshotIterator>(std::move(filtered_dictionaries), database_name);
|
||||
}
|
||||
|
||||
bool DatabaseWithDictionaries::isDictionaryExist(const String & dictionary_name) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return dictionaries.find(dictionary_name) != dictionaries.end();
|
||||
}
|
||||
|
||||
ASTPtr DatabaseWithDictionaries::getCreateDictionaryQueryImpl(
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error) const
|
||||
{
|
||||
{
|
||||
/// Try to get create query ifg for an attached dictionary.
|
||||
std::lock_guard lock{mutex};
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it != dictionaries.end())
|
||||
{
|
||||
ASTPtr ast = it->second.create_query->clone();
|
||||
auto & create_query = ast->as<ASTCreateQuery &>();
|
||||
create_query.attach = false;
|
||||
create_query.database = database_name;
|
||||
return ast;
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to get create query for non-attached dictionary.
|
||||
ASTPtr ast;
|
||||
try
|
||||
{
|
||||
auto dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
ast = getCreateQueryFromMetadata(dictionary_metadata_path, throw_on_error);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (throw_on_error && (e.code() != ErrorCodes::FILE_DOESNT_EXIST))
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ast)
|
||||
{
|
||||
const auto * create_query = ast->as<const ASTCreateQuery>();
|
||||
if (create_query && create_query->is_dictionary)
|
||||
return ast;
|
||||
}
|
||||
if (throw_on_error)
|
||||
throw Exception{"Dictionary " + backQuote(dictionary_name) + " doesn't exist",
|
||||
ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY};
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> DatabaseWithDictionaries::getDictionaryConfiguration(const String & dictionary_name) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it != dictionaries.end())
|
||||
return it->second.config;
|
||||
throw Exception("Dictionary " + backQuote(dictionary_name) + " doesn't exist", ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
}
|
||||
|
||||
time_t DatabaseWithDictionaries::getObjectMetadataModificationTime(const String & object_name) const
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = dictionaries.find(object_name);
|
||||
if (it != dictionaries.end())
|
||||
return it->second.modification_time;
|
||||
}
|
||||
return DatabaseOnDisk::getObjectMetadataModificationTime(object_name);
|
||||
}
|
||||
|
||||
|
||||
bool DatabaseWithDictionaries::empty() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return tables.empty() && dictionaries.empty();
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::reloadDictionaryConfig(const String & full_name)
|
||||
{
|
||||
/// Ensure that this database is attached to ExternalLoader as a config repository.
|
||||
if (!database_as_config_repo_for_external_loader.load())
|
||||
{
|
||||
auto repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(*this, getContext());
|
||||
auto remove_repository_callback = external_loader.addConfigRepository(std::move(repository));
|
||||
database_as_config_repo_for_external_loader = boost::make_shared<ext::scope_guard>(std::move(remove_repository_callback));
|
||||
}
|
||||
|
||||
external_loader.reloadConfig(getDatabaseName(), full_name);
|
||||
}
|
||||
|
||||
|
||||
void DatabaseWithDictionaries::shutdown()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
dictionaries.clear();
|
||||
}
|
||||
|
||||
/// Invoke removing the database from ExternalLoader.
|
||||
database_as_config_repo_for_external_loader = nullptr;
|
||||
|
||||
DatabaseOnDisk::shutdown();
|
||||
}
|
||||
|
||||
|
||||
DatabaseWithDictionaries::DatabaseWithDictionaries(
|
||||
const String & name, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr context_)
|
||||
: DatabaseOnDisk(name, metadata_path_, data_path_, logger, context_)
|
||||
, external_loader(context_->getExternalDictionariesLoader())
|
||||
{
|
||||
}
|
||||
|
||||
DatabaseWithDictionaries::~DatabaseWithDictionaries() = default;
|
||||
|
||||
}
|
@ -1,55 +0,0 @@
|
||||
#pragma once
|
||||
#include <Databases/DatabaseOnDisk.h>
|
||||
#include <boost/smart_ptr/atomic_shared_ptr.hpp>
|
||||
#include <ext/scope_guard.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
class ExternalDictionariesLoader;
|
||||
|
||||
|
||||
class DatabaseWithDictionaries : public DatabaseOnDisk
|
||||
{
|
||||
public:
|
||||
void attachDictionary(const String & dictionary_name, const DictionaryAttachInfo & attach_info) override;
|
||||
|
||||
void detachDictionary(const String & dictionary_name) override;
|
||||
|
||||
void createDictionary(ContextPtr context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query) override;
|
||||
|
||||
void removeDictionary(ContextPtr context, const String & dictionary_name) override;
|
||||
|
||||
bool isDictionaryExist(const String & dictionary_name) const override;
|
||||
|
||||
DatabaseDictionariesIteratorPtr getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) override;
|
||||
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & /*name*/) const override;
|
||||
|
||||
time_t getObjectMetadataModificationTime(const String & object_name) const override;
|
||||
|
||||
bool empty() const override;
|
||||
|
||||
void shutdown() override;
|
||||
|
||||
~DatabaseWithDictionaries() override;
|
||||
|
||||
protected:
|
||||
DatabaseWithDictionaries(const String & name, const String & metadata_path_, const String & data_path_, const String & logger, ContextPtr context);
|
||||
|
||||
ASTPtr getCreateDictionaryQueryImpl(const String & dictionary_name, bool throw_on_error) const override;
|
||||
|
||||
std::unordered_map<String, DictionaryAttachInfo> dictionaries;
|
||||
const ExternalDictionariesLoader & external_loader;
|
||||
|
||||
private:
|
||||
void detachDictionaryImpl(const String & dictionary_name, DictionaryAttachInfo & attach_info);
|
||||
void reloadDictionaryConfig(const String & full_name);
|
||||
|
||||
boost::atomic_shared_ptr<ext::scope_guard> database_as_config_repo_for_external_loader;
|
||||
};
|
||||
|
||||
}
|
@ -29,7 +29,6 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
|
||||
}
|
||||
|
||||
class IDatabaseTablesIterator
|
||||
@ -95,38 +94,7 @@ public:
|
||||
const StoragePtr & table() const override { return it->second; }
|
||||
};
|
||||
|
||||
/// Copies list of dictionaries and iterates through such snapshot.
|
||||
class DatabaseDictionariesSnapshotIterator
|
||||
{
|
||||
private:
|
||||
DictionariesWithID dictionaries;
|
||||
DictionariesWithID::iterator it;
|
||||
String database_name;
|
||||
|
||||
public:
|
||||
DatabaseDictionariesSnapshotIterator() = default;
|
||||
DatabaseDictionariesSnapshotIterator(DictionariesWithID & dictionaries_, const String & database_name_)
|
||||
: dictionaries(dictionaries_), it(dictionaries.begin()), database_name(database_name_)
|
||||
{
|
||||
}
|
||||
DatabaseDictionariesSnapshotIterator(DictionariesWithID && dictionaries_, const String & database_name_)
|
||||
: dictionaries(dictionaries_), it(dictionaries.begin()), database_name(database_name_)
|
||||
{
|
||||
}
|
||||
|
||||
void next() { ++it; }
|
||||
|
||||
bool isValid() const { return !dictionaries.empty() && it != dictionaries.end(); }
|
||||
|
||||
const String & name() const { return it->first; }
|
||||
|
||||
const UUID & uuid() const { return it->second; }
|
||||
|
||||
const String & databaseName() const { assert(!database_name.empty()); return database_name; }
|
||||
};
|
||||
|
||||
using DatabaseTablesIteratorPtr = std::unique_ptr<IDatabaseTablesIterator>;
|
||||
using DatabaseDictionariesIteratorPtr = std::unique_ptr<DatabaseDictionariesSnapshotIterator>;
|
||||
|
||||
|
||||
/** Database engine.
|
||||
@ -158,12 +126,6 @@ public:
|
||||
/// Check the existence of the table.
|
||||
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;
|
||||
|
||||
/// Check the existence of the dictionary
|
||||
virtual bool isDictionaryExist(const String & /*name*/) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Get the table for work. Return nullptr if there is no table.
|
||||
virtual StoragePtr tryGetTable(const String & name, ContextPtr context) const = 0;
|
||||
|
||||
@ -175,12 +137,6 @@ public:
|
||||
/// It is possible to have "hidden" tables that are not visible when passing through, but are visible if you get them by name using the functions above.
|
||||
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}) = 0;
|
||||
|
||||
/// Get an iterator to pass through all the dictionaries.
|
||||
virtual DatabaseDictionariesIteratorPtr getDictionariesIterator([[maybe_unused]] const FilterByNameFunction & filter_by_dictionary_name = {})
|
||||
{
|
||||
return std::make_unique<DatabaseDictionariesSnapshotIterator>();
|
||||
}
|
||||
|
||||
/// Is the database empty.
|
||||
virtual bool empty() const = 0;
|
||||
|
||||
@ -194,15 +150,6 @@ public:
|
||||
throw Exception("There is no CREATE TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Add the dictionary to the database. Record its presence in the metadata.
|
||||
virtual void createDictionary(
|
||||
ContextPtr /*context*/,
|
||||
const String & /*dictionary_name*/,
|
||||
const ASTPtr & /*query*/)
|
||||
{
|
||||
throw Exception("There is no CREATE DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Delete the table from the database, drop table and delete the metadata.
|
||||
virtual void dropTable(
|
||||
ContextPtr /*context*/,
|
||||
@ -212,14 +159,6 @@ public:
|
||||
throw Exception("There is no DROP TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Delete the dictionary from the database. Delete the metadata.
|
||||
virtual void removeDictionary(
|
||||
ContextPtr /*context*/,
|
||||
const String & /*dictionary_name*/)
|
||||
{
|
||||
throw Exception("There is no DROP DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Add a table to the database, but do not add it to the metadata. The database may not support this method.
|
||||
///
|
||||
/// Note: ATTACH TABLE statement actually uses createTable method.
|
||||
@ -228,25 +167,12 @@ public:
|
||||
throw Exception("There is no ATTACH TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Add dictionary to the database, but do not add it to the metadata. The database may not support this method.
|
||||
/// If dictionaries_lazy_load is false it also starts loading the dictionary asynchronously.
|
||||
virtual void attachDictionary(const String & /* dictionary_name */, const DictionaryAttachInfo & /* attach_info */)
|
||||
{
|
||||
throw Exception("There is no ATTACH DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Forget about the table without deleting it, and return it. The database may not support this method.
|
||||
virtual StoragePtr detachTable(const String & /*name*/)
|
||||
{
|
||||
throw Exception("There is no DETACH TABLE query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Forget about the dictionary without deleting it. The database may not support this method.
|
||||
virtual void detachDictionary(const String & /*name*/)
|
||||
{
|
||||
throw Exception("There is no DETACH DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Forget about the table without deleting it's data, but rename metadata file to prevent reloading it
|
||||
/// with next restart. The database may not support this method.
|
||||
virtual void detachTablePermanently(ContextPtr /*context*/, const String & /*name*/)
|
||||
@ -295,22 +221,6 @@ public:
|
||||
return getCreateTableQueryImpl(name, context, true);
|
||||
}
|
||||
|
||||
/// Get the CREATE DICTIONARY query for the dictionary. Returns nullptr if dictionary doesn't exists.
|
||||
ASTPtr tryGetCreateDictionaryQuery(const String & name) const noexcept
|
||||
{
|
||||
return getCreateDictionaryQueryImpl(name, false);
|
||||
}
|
||||
|
||||
ASTPtr getCreateDictionaryQuery(const String & name) const
|
||||
{
|
||||
return getCreateDictionaryQueryImpl(name, true);
|
||||
}
|
||||
|
||||
virtual Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & /*name*/) const
|
||||
{
|
||||
throw Exception(getEngineName() + ": getDictionaryConfiguration() is not supported", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Get the CREATE DATABASE query for current database.
|
||||
virtual ASTPtr getCreateDatabaseQuery() const = 0;
|
||||
|
||||
@ -364,13 +274,6 @@ protected:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
virtual ASTPtr getCreateDictionaryQueryImpl(const String & /*name*/, bool throw_on_error) const
|
||||
{
|
||||
if (throw_on_error)
|
||||
throw Exception("There is no SHOW CREATE DICTIONARY query for Database" + getEngineName(), ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
mutable std::mutex mutex;
|
||||
String database_name;
|
||||
};
|
||||
|
@ -19,7 +19,6 @@ SRCS(
|
||||
DatabaseReplicated.cpp
|
||||
DatabaseReplicatedSettings.cpp
|
||||
DatabaseReplicatedWorker.cpp
|
||||
DatabaseWithDictionaries.cpp
|
||||
DatabasesCommon.cpp
|
||||
MySQL/ConnectionMySQLSettings.cpp
|
||||
MySQL/DatabaseConnectionMySQL.cpp
|
||||
|
@ -630,8 +630,10 @@ std::unique_lock<std::shared_mutex> DatabaseCatalog::getExclusiveDDLGuardForData
|
||||
|
||||
bool DatabaseCatalog::isDictionaryExist(const StorageID & table_id) const
|
||||
{
|
||||
auto db = tryGetDatabase(table_id.getDatabaseName());
|
||||
return db && db->isDictionaryExist(table_id.getTableName());
|
||||
auto storage = tryGetTable(table_id, getContext());
|
||||
bool storage_is_dictionary = storage && storage->isDictionary();
|
||||
|
||||
return storage_is_dictionary;
|
||||
}
|
||||
|
||||
StoragePtr DatabaseCatalog::getTable(const StorageID & table_id, ContextPtr local_context) const
|
||||
|
@ -755,11 +755,11 @@ static ExpressionActionsPtr createJoinedBlockActions(ContextPtr context, const T
|
||||
|
||||
static bool allowDictJoin(StoragePtr joined_storage, ContextPtr context, String & dict_name, String & key_name)
|
||||
{
|
||||
const auto * dict = dynamic_cast<const StorageDictionary *>(joined_storage.get());
|
||||
if (!dict)
|
||||
if (!joined_storage->isDictionary())
|
||||
return false;
|
||||
|
||||
dict_name = dict->dictionaryName();
|
||||
StorageDictionary & storage_dictionary = static_cast<StorageDictionary &>(*joined_storage);
|
||||
dict_name = storage_dictionary.getDictionaryName();
|
||||
auto dictionary = context->getExternalDictionariesLoader().getDictionary(dict_name, context);
|
||||
if (!dictionary)
|
||||
return false;
|
||||
|
@ -32,7 +32,6 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(ContextPtr global_context
|
||||
enablePeriodicUpdates(true);
|
||||
}
|
||||
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
|
||||
const std::string & name, const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & key_in_config, const std::string & repository_name) const
|
||||
|
@ -1,89 +0,0 @@
|
||||
#include <Interpreters/ExternalLoaderDatabaseConfigRepository.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_DICTIONARY;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
String trimDatabaseName(const std::string & loadable_definition_name, const String & database_name,
|
||||
const IDatabase & database, ContextPtr global_context)
|
||||
{
|
||||
bool is_atomic_database = database.getUUID() != UUIDHelpers::Nil;
|
||||
if (is_atomic_database)
|
||||
{
|
||||
/// We do not know actual database and dictionary names here
|
||||
auto dict_id = StorageID::createEmpty();
|
||||
dict_id.uuid = parseFromString<UUID>(loadable_definition_name);
|
||||
assert(dict_id.uuid != UUIDHelpers::Nil);
|
||||
/// Get associated StorageDictionary by UUID
|
||||
auto table = DatabaseCatalog::instance().getTable(dict_id, global_context);
|
||||
auto dict_id_with_names = table->getStorageID();
|
||||
return dict_id_with_names.table_name;
|
||||
}
|
||||
|
||||
if (!startsWith(loadable_definition_name, database_name))
|
||||
throw Exception(ErrorCodes::UNKNOWN_DICTIONARY,
|
||||
"Loadable '{}' is not from database '{}'", loadable_definition_name, database_name);
|
||||
/// dbname.loadable_name
|
||||
///--> remove <---
|
||||
return loadable_definition_name.substr(database_name.length() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, ContextPtr context_)
|
||||
: WithContext(context_->getGlobalContext())
|
||||
, database_name(database_.getDatabaseName())
|
||||
, database(database_)
|
||||
{
|
||||
}
|
||||
|
||||
LoadablesConfigurationPtr ExternalLoaderDatabaseConfigRepository::load(const std::string & loadable_definition_name)
|
||||
{
|
||||
auto dict_name = trimDatabaseName(loadable_definition_name, database_name, database, getContext());
|
||||
return database.getDictionaryConfiguration(dict_name);
|
||||
}
|
||||
|
||||
bool ExternalLoaderDatabaseConfigRepository::exists(const std::string & loadable_definition_name)
|
||||
{
|
||||
auto dict_name = trimDatabaseName(loadable_definition_name, database_name, database, getContext());
|
||||
return database.isDictionaryExist(dict_name);
|
||||
}
|
||||
|
||||
Poco::Timestamp ExternalLoaderDatabaseConfigRepository::getUpdateTime(const std::string & loadable_definition_name)
|
||||
{
|
||||
auto dict_name = trimDatabaseName(loadable_definition_name, database_name, database, getContext());
|
||||
return database.getObjectMetadataModificationTime(dict_name);
|
||||
}
|
||||
|
||||
std::set<std::string> ExternalLoaderDatabaseConfigRepository::getAllLoadablesDefinitionNames()
|
||||
{
|
||||
std::set<std::string> result;
|
||||
auto itr = database.getDictionariesIterator();
|
||||
bool is_atomic_database = database.getUUID() != UUIDHelpers::Nil;
|
||||
while (itr && itr->isValid())
|
||||
{
|
||||
if (is_atomic_database)
|
||||
{
|
||||
assert(itr->uuid() != UUIDHelpers::Nil);
|
||||
result.insert(toString(itr->uuid()));
|
||||
}
|
||||
else
|
||||
result.insert(database_name + "." + itr->name());
|
||||
itr->next();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
@ -14,7 +14,7 @@ class ExternalLoaderDatabaseConfigRepository : public IExternalLoaderConfigRepos
|
||||
public:
|
||||
ExternalLoaderDatabaseConfigRepository(IDatabase & database_, ContextPtr global_context_);
|
||||
|
||||
const std::string & getName() const override { return database_name; }
|
||||
std::string getName() const override { return database_name; }
|
||||
|
||||
std::set<std::string> getAllLoadablesDefinitionNames() override;
|
||||
|
||||
|
@ -0,0 +1,39 @@
|
||||
#include "ExternalLoaderDictionaryStorageConfigRepository.h"
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ExternalLoaderDictionaryStorageConfigRepository::ExternalLoaderDictionaryStorageConfigRepository(const StorageDictionary & dictionary_storage_)
|
||||
: dictionary_storage(dictionary_storage_)
|
||||
{
|
||||
}
|
||||
|
||||
std::string ExternalLoaderDictionaryStorageConfigRepository::getName() const
|
||||
{
|
||||
return dictionary_storage.getStorageID().getInternalDictionaryName();
|
||||
}
|
||||
|
||||
std::set<std::string> ExternalLoaderDictionaryStorageConfigRepository::getAllLoadablesDefinitionNames()
|
||||
{
|
||||
return { getName() };
|
||||
}
|
||||
|
||||
bool ExternalLoaderDictionaryStorageConfigRepository::exists(const std::string & loadable_definition_name)
|
||||
{
|
||||
return getName() == loadable_definition_name;
|
||||
}
|
||||
|
||||
Poco::Timestamp ExternalLoaderDictionaryStorageConfigRepository::getUpdateTime(const std::string &)
|
||||
{
|
||||
return dictionary_storage.getUpdateTime();
|
||||
}
|
||||
|
||||
LoadablesConfigurationPtr ExternalLoaderDictionaryStorageConfigRepository::load(const std::string &)
|
||||
{
|
||||
return dictionary_storage.getConfiguration();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/IExternalLoaderConfigRepository.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class StorageDictionary;
|
||||
|
||||
class ExternalLoaderDictionaryStorageConfigRepository : public IExternalLoaderConfigRepository
|
||||
{
|
||||
public:
|
||||
explicit ExternalLoaderDictionaryStorageConfigRepository(const StorageDictionary & dictionary_storage_);
|
||||
|
||||
std::string getName() const override;
|
||||
|
||||
std::set<std::string> getAllLoadablesDefinitionNames() override;
|
||||
|
||||
bool exists(const std::string & loadable_definition_name) override;
|
||||
|
||||
Poco::Timestamp getUpdateTime(const std::string & loadable_definition_name) override;
|
||||
|
||||
LoadablesConfigurationPtr load(const std::string & loadable_definition_name) override;
|
||||
|
||||
private:
|
||||
const StorageDictionary & dictionary_storage;
|
||||
};
|
||||
|
||||
}
|
@ -13,7 +13,7 @@ class ExternalLoaderTempConfigRepository : public IExternalLoaderConfigRepositor
|
||||
public:
|
||||
ExternalLoaderTempConfigRepository(const String & repository_name_, const String & path_, const LoadablesConfigurationPtr & config_);
|
||||
|
||||
const String & getName() const override { return name; }
|
||||
String getName() const override { return name; }
|
||||
bool isTemporary() const override { return true; }
|
||||
|
||||
std::set<String> getAllLoadablesDefinitionNames() override;
|
||||
|
@ -15,7 +15,7 @@ class ExternalLoaderXMLConfigRepository : public IExternalLoaderConfigRepository
|
||||
public:
|
||||
ExternalLoaderXMLConfigRepository(const Poco::Util::AbstractConfiguration & main_config_, const std::string & config_key_);
|
||||
|
||||
const String & getName() const override { return name; }
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
/// Return set of .xml files from path in main_config (config_key)
|
||||
std::set<std::string> getAllLoadablesDefinitionNames() override;
|
||||
|
@ -23,7 +23,7 @@ class IExternalLoaderConfigRepository
|
||||
{
|
||||
public:
|
||||
/// Returns the name of the repository.
|
||||
virtual const std::string & getName() const = 0;
|
||||
virtual std::string getName() const = 0;
|
||||
|
||||
/// Whether this repository is temporary:
|
||||
/// it's created and destroyed while executing the same query.
|
||||
|
@ -71,6 +71,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||
extern const int INCORRECT_QUERY;
|
||||
extern const int UNKNOWN_DATABASE_ENGINE;
|
||||
@ -79,7 +80,6 @@ namespace ErrorCodes
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int BAD_DATABASE_FOR_TEMPORARY_TABLE;
|
||||
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
extern const int ILLEGAL_SYNTAX_FOR_DATA_TYPE;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int LOGICAL_ERROR;
|
||||
@ -551,6 +551,10 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
|
||||
properties.columns = table_function->getActualTableStructure(getContext());
|
||||
assert(!properties.columns.empty());
|
||||
}
|
||||
else if (create.is_dictionary)
|
||||
{
|
||||
return {};
|
||||
}
|
||||
else
|
||||
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
@ -837,11 +841,20 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
|
||||
|
||||
// Table SQL definition is available even if the table is detached (even permanently)
|
||||
auto query = database->getCreateTableQuery(create.table, getContext());
|
||||
create = query->as<ASTCreateQuery &>(); // Copy the saved create query, but use ATTACH instead of CREATE
|
||||
if (create.is_dictionary)
|
||||
auto create_query = query->as<ASTCreateQuery &>();
|
||||
|
||||
if (!create.is_dictionary && create_query.is_dictionary)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Cannot ATTACH TABLE {}.{}, it is a Dictionary",
|
||||
backQuoteIfNeed(database_name), backQuoteIfNeed(create.table));
|
||||
"Cannot ATTACH TABLE {}.{}, it is a Dictionary",
|
||||
backQuoteIfNeed(database_name), backQuoteIfNeed(create.table));
|
||||
|
||||
if (create.is_dictionary && !create_query.is_dictionary)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Cannot ATTACH DICTIONARY {}.{}, it is a Table",
|
||||
backQuoteIfNeed(database_name), backQuoteIfNeed(create.table));
|
||||
|
||||
create = create_query; // Copy the saved create query, but use ATTACH instead of CREATE
|
||||
|
||||
create.attach = true;
|
||||
create.attach_short_syntax = true;
|
||||
create.if_not_exists = if_not_exists;
|
||||
@ -947,6 +960,9 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
database = DatabaseCatalog::instance().getDatabase(create.database);
|
||||
assertOrSetUUID(create, database);
|
||||
|
||||
String storage_name = create.is_dictionary ? "Dictionary" : "Table";
|
||||
auto storage_already_exists_error_code = create.is_dictionary ? ErrorCodes::DICTIONARY_ALREADY_EXISTS : ErrorCodes::TABLE_ALREADY_EXISTS;
|
||||
|
||||
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
|
||||
if (database->isTableExist(create.table, getContext()))
|
||||
{
|
||||
@ -966,12 +982,13 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
|
||||
interpreter.execute();
|
||||
}
|
||||
else
|
||||
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists.", backQuoteIfNeed(create.database), backQuoteIfNeed(create.table));
|
||||
throw Exception(storage_already_exists_error_code,
|
||||
"{} {}.{} already exists.", storage_name, backQuoteIfNeed(create.database), backQuoteIfNeed(create.table));
|
||||
}
|
||||
|
||||
data_path = database->getTableDataPath(create);
|
||||
if (!create.attach && !data_path.empty() && fs::exists(fs::path{getContext()->getPath()} / data_path))
|
||||
throw Exception(ErrorCodes::TABLE_ALREADY_EXISTS, "Directory for table data {} already exists", String(data_path));
|
||||
throw Exception(storage_already_exists_error_code, "Directory for {} data {} already exists", Poco::toLower(storage_name), String(data_path));
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1123,56 +1140,6 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
|
||||
return {};
|
||||
}
|
||||
|
||||
BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
|
||||
{
|
||||
String dictionary_name = create.table;
|
||||
|
||||
create.database = getContext()->resolveDatabase(create.database);
|
||||
const String & database_name = create.database;
|
||||
|
||||
auto guard = DatabaseCatalog::instance().getDDLGuard(database_name, dictionary_name);
|
||||
DatabasePtr database = DatabaseCatalog::instance().getDatabase(database_name);
|
||||
|
||||
if (typeid_cast<DatabaseReplicated *>(database.get())
|
||||
&& getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
|
||||
{
|
||||
if (!create.attach)
|
||||
assertOrSetUUID(create, database);
|
||||
guard->releaseTableLock();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
}
|
||||
|
||||
if (database->isDictionaryExist(dictionary_name))
|
||||
{
|
||||
/// TODO Check structure of dictionary
|
||||
if (create.if_not_exists)
|
||||
return {};
|
||||
else
|
||||
throw Exception(
|
||||
"Dictionary " + database_name + "." + dictionary_name + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
if (create.attach)
|
||||
{
|
||||
auto query = DatabaseCatalog::instance().getDatabase(database_name)->getCreateDictionaryQuery(dictionary_name);
|
||||
create = query->as<ASTCreateQuery &>();
|
||||
create.attach = true;
|
||||
}
|
||||
|
||||
assertOrSetUUID(create, database);
|
||||
|
||||
if (create.attach)
|
||||
{
|
||||
auto config = getDictionaryConfigurationFromAST(create, getContext());
|
||||
auto modification_time = database->getObjectMetadataModificationTime(dictionary_name);
|
||||
database->attachDictionary(dictionary_name, DictionaryAttachInfo{query_ptr, config, modification_time});
|
||||
}
|
||||
else
|
||||
database->createDictionary(getContext(), dictionary_name, query_ptr);
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
void InterpreterCreateQuery::prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr local_context, const String & cluster_name)
|
||||
{
|
||||
if (create.attach)
|
||||
@ -1237,10 +1204,8 @@ BlockIO InterpreterCreateQuery::execute()
|
||||
/// CREATE|ATTACH DATABASE
|
||||
if (!create.database.empty() && create.table.empty())
|
||||
return createDatabase(create);
|
||||
else if (!create.is_dictionary)
|
||||
return createTable(create);
|
||||
else
|
||||
return createDictionary(create);
|
||||
return createTable(create);
|
||||
}
|
||||
|
||||
|
||||
|
@ -70,7 +70,6 @@ private:
|
||||
|
||||
BlockIO createDatabase(ASTCreateQuery & create);
|
||||
BlockIO createTable(ASTCreateQuery & create);
|
||||
BlockIO createDictionary(ASTCreateQuery & create);
|
||||
|
||||
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
|
||||
TableProperties setProperties(ASTCreateQuery & create) const;
|
||||
|
@ -31,7 +31,6 @@ namespace ErrorCodes
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int SYNTAX_ERROR;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int UNKNOWN_DICTIONARY;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int INCORRECT_QUERY;
|
||||
}
|
||||
@ -58,14 +57,7 @@ BlockIO InterpreterDropQuery::execute()
|
||||
drop.no_delay = true;
|
||||
|
||||
if (!drop.table.empty())
|
||||
{
|
||||
if (!drop.is_dictionary)
|
||||
return executeToTable(drop);
|
||||
else if (drop.permanently && drop.kind == ASTDropQuery::Kind::Detach)
|
||||
throw Exception("DETACH PERMANENTLY is not implemented for dictionaries", ErrorCodes::NOT_IMPLEMENTED);
|
||||
else
|
||||
return executeToDictionary(drop.database, drop.table, drop.kind, drop.if_exists, drop.temporary, drop.no_ddl_lock);
|
||||
}
|
||||
return executeToTable(drop);
|
||||
else if (!drop.database.empty())
|
||||
return executeToDatabase(drop);
|
||||
else
|
||||
@ -122,8 +114,17 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
|
||||
if (database && table)
|
||||
{
|
||||
if (query.as<ASTDropQuery &>().is_view && !table->isView())
|
||||
throw Exception("Table " + table_id.getNameForLogs() + " is not a View", ErrorCodes::LOGICAL_ERROR);
|
||||
auto & ast_drop_query = query.as<ASTDropQuery &>();
|
||||
|
||||
if (ast_drop_query.is_view && !table->isView())
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Table {} is not a View",
|
||||
table_id.getNameForLogs());
|
||||
|
||||
if (ast_drop_query.is_dictionary && !table->isDictionary())
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY,
|
||||
"Table {} is not a Dictionary",
|
||||
table_id.getNameForLogs());
|
||||
|
||||
/// Now get UUID, so we can wait for table data to be finally dropped
|
||||
table_id.uuid = database->tryGetTableUUID(table_id.table_name);
|
||||
@ -133,14 +134,24 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
|
||||
getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
!is_drop_or_detach_database;
|
||||
|
||||
AccessFlags drop_storage;
|
||||
|
||||
if (table->isView())
|
||||
drop_storage = AccessType::DROP_VIEW;
|
||||
else if (table->isDictionary())
|
||||
drop_storage = AccessType::DROP_DICTIONARY;
|
||||
else
|
||||
drop_storage = AccessType::DROP_TABLE;
|
||||
|
||||
if (is_replicated_ddl_query)
|
||||
{
|
||||
if (query.kind == ASTDropQuery::Kind::Detach)
|
||||
getContext()->checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
|
||||
getContext()->checkAccess(drop_storage, table_id);
|
||||
else if (query.kind == ASTDropQuery::Kind::Truncate)
|
||||
getContext()->checkAccess(AccessType::TRUNCATE, table_id);
|
||||
else if (query.kind == ASTDropQuery::Kind::Drop)
|
||||
getContext()->checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
|
||||
getContext()->checkAccess(drop_storage, table_id);
|
||||
|
||||
ddl_guard->releaseTableLock();
|
||||
table.reset();
|
||||
@ -149,8 +160,17 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
|
||||
if (query.kind == ASTDropQuery::Kind::Detach)
|
||||
{
|
||||
getContext()->checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
|
||||
table->checkTableCanBeDetached();
|
||||
getContext()->checkAccess(drop_storage, table_id);
|
||||
|
||||
if (table->isDictionary())
|
||||
{
|
||||
/// If DROP DICTIONARY query is not used, check if Dictionary can be dropped with DROP TABLE query
|
||||
if (!query.is_dictionary)
|
||||
table->checkTableCanBeDetached();
|
||||
}
|
||||
else
|
||||
table->checkTableCanBeDetached();
|
||||
|
||||
table->shutdown();
|
||||
TableExclusiveLockHolder table_lock;
|
||||
|
||||
@ -170,7 +190,11 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
}
|
||||
else if (query.kind == ASTDropQuery::Kind::Truncate)
|
||||
{
|
||||
if (table->isDictionary())
|
||||
throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
getContext()->checkAccess(AccessType::TRUNCATE, table_id);
|
||||
|
||||
table->checkTableCanBeDropped();
|
||||
|
||||
auto table_lock = table->lockExclusively(getContext()->getCurrentQueryId(), getContext()->getSettingsRef().lock_acquire_timeout);
|
||||
@ -180,8 +204,16 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
}
|
||||
else if (query.kind == ASTDropQuery::Kind::Drop)
|
||||
{
|
||||
getContext()->checkAccess(table->isView() ? AccessType::DROP_VIEW : AccessType::DROP_TABLE, table_id);
|
||||
table->checkTableCanBeDropped();
|
||||
getContext()->checkAccess(drop_storage, table_id);
|
||||
|
||||
if (table->isDictionary())
|
||||
{
|
||||
/// If DROP DICTIONARY query is not used, check if Dictionary can be dropped with DROP TABLE query
|
||||
if (!query.is_dictionary)
|
||||
table->checkTableCanBeDropped();
|
||||
}
|
||||
else
|
||||
table->checkTableCanBeDropped();
|
||||
|
||||
table->shutdown();
|
||||
|
||||
@ -199,67 +231,6 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ASTDropQuery & query, DatabaseP
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
BlockIO InterpreterDropQuery::executeToDictionary(
|
||||
const String & database_name_,
|
||||
const String & dictionary_name,
|
||||
ASTDropQuery::Kind kind,
|
||||
bool if_exists,
|
||||
bool is_temporary,
|
||||
bool no_ddl_lock)
|
||||
{
|
||||
if (is_temporary)
|
||||
throw Exception("Temporary dictionaries are not possible.", ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
String database_name = getContext()->resolveDatabase(database_name_);
|
||||
|
||||
auto ddl_guard = (!no_ddl_lock ? DatabaseCatalog::instance().getDDLGuard(database_name, dictionary_name) : nullptr);
|
||||
query_ptr->as<ASTDropQuery>()->database = database_name;
|
||||
DatabasePtr database = tryGetDatabase(database_name, if_exists);
|
||||
|
||||
bool is_drop_or_detach_database = query_ptr->as<ASTDropQuery>()->table.empty();
|
||||
bool is_replicated_ddl_query = typeid_cast<DatabaseReplicated *>(database.get()) &&
|
||||
getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY &&
|
||||
!is_drop_or_detach_database;
|
||||
if (is_replicated_ddl_query)
|
||||
{
|
||||
if (kind == ASTDropQuery::Kind::Detach)
|
||||
throw Exception(ErrorCodes::INCORRECT_QUERY, "DETACH DICTIONARY is not allowed for Replicated databases.");
|
||||
|
||||
getContext()->checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
|
||||
|
||||
ddl_guard->releaseTableLock();
|
||||
return typeid_cast<DatabaseReplicated *>(database.get())->tryEnqueueReplicatedDDL(query_ptr, getContext());
|
||||
}
|
||||
|
||||
if (!database || !database->isDictionaryExist(dictionary_name))
|
||||
{
|
||||
if (!if_exists)
|
||||
throw Exception(
|
||||
"Dictionary " + backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dictionary_name) + " doesn't exist.",
|
||||
ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
else
|
||||
return {};
|
||||
}
|
||||
|
||||
if (kind == ASTDropQuery::Kind::Detach)
|
||||
{
|
||||
/// Drop dictionary from memory, don't touch data and metadata
|
||||
getContext()->checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
|
||||
database->detachDictionary(dictionary_name);
|
||||
}
|
||||
else if (kind == ASTDropQuery::Kind::Truncate)
|
||||
{
|
||||
throw Exception("Cannot TRUNCATE dictionary", ErrorCodes::SYNTAX_ERROR);
|
||||
}
|
||||
else if (kind == ASTDropQuery::Kind::Drop)
|
||||
{
|
||||
getContext()->checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
|
||||
database->removeDictionary(getContext(), dictionary_name);
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name, ASTDropQuery::Kind kind)
|
||||
{
|
||||
if (kind == ASTDropQuery::Kind::Detach)
|
||||
@ -351,15 +322,6 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
|
||||
if (database->shouldBeEmptyOnDetach())
|
||||
{
|
||||
/// DETACH or DROP all tables and dictionaries inside database.
|
||||
/// First we should DETACH or DROP dictionaries because StorageDictionary
|
||||
/// must be detached only by detaching corresponding dictionary.
|
||||
for (auto iterator = database->getDictionariesIterator(); iterator->isValid(); iterator->next())
|
||||
{
|
||||
String current_dictionary = iterator->name();
|
||||
executeToDictionary(database_name, current_dictionary, query.kind, false, false, false);
|
||||
}
|
||||
|
||||
ASTDropQuery query_for_table;
|
||||
query_for_table.kind = query.kind;
|
||||
query_for_table.if_exists = true;
|
||||
@ -371,6 +333,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
DatabasePtr db;
|
||||
UUID table_to_wait = UUIDHelpers::Nil;
|
||||
query_for_table.table = iterator->name();
|
||||
query_for_table.is_dictionary = iterator->table()->isDictionary();
|
||||
executeToTableImpl(query_for_table, db, table_to_wait);
|
||||
uuids_to_wait.push_back(table_to_wait);
|
||||
}
|
||||
|
@ -45,17 +45,33 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
|
||||
ASTPtr create_query;
|
||||
ASTQueryWithTableAndOutput * show_query;
|
||||
if ((show_query = query_ptr->as<ASTShowCreateTableQuery>()) ||
|
||||
(show_query = query_ptr->as<ASTShowCreateViewQuery>()))
|
||||
(show_query = query_ptr->as<ASTShowCreateViewQuery>()) ||
|
||||
(show_query = query_ptr->as<ASTShowCreateDictionaryQuery>()))
|
||||
{
|
||||
auto resolve_table_type = show_query->temporary ? Context::ResolveExternal : Context::ResolveOrdinary;
|
||||
auto table_id = getContext()->resolveStorageID(*show_query, resolve_table_type);
|
||||
getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
|
||||
bool is_dictionary = static_cast<bool>(query_ptr->as<ASTShowCreateDictionaryQuery>());
|
||||
|
||||
if (is_dictionary)
|
||||
getContext()->checkAccess(AccessType::SHOW_DICTIONARIES, table_id);
|
||||
else
|
||||
getContext()->checkAccess(AccessType::SHOW_COLUMNS, table_id);
|
||||
|
||||
create_query = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getCreateTableQuery(table_id.table_name, getContext());
|
||||
|
||||
auto & ast_create_query = create_query->as<ASTCreateQuery &>();
|
||||
if (query_ptr->as<ASTShowCreateViewQuery>())
|
||||
{
|
||||
auto & ast_create_query = create_query->as<ASTCreateQuery &>();
|
||||
if (!ast_create_query.isView())
|
||||
throw Exception(backQuote(ast_create_query.database) + "." + backQuote(ast_create_query.table) + " is not a VIEW", ErrorCodes::BAD_ARGUMENTS);
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}.{} is not a VIEW",
|
||||
backQuote(ast_create_query.database), backQuote(ast_create_query.table));
|
||||
}
|
||||
else if (is_dictionary)
|
||||
{
|
||||
if (!ast_create_query.is_dictionary)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}.{} is not a DICTIONARY",
|
||||
backQuote(ast_create_query.database), backQuote(ast_create_query.table));
|
||||
}
|
||||
}
|
||||
else if ((show_query = query_ptr->as<ASTShowCreateDatabaseQuery>()))
|
||||
@ -66,14 +82,6 @@ BlockInputStreamPtr InterpreterShowCreateQuery::executeImpl()
|
||||
getContext()->checkAccess(AccessType::SHOW_DATABASES, show_query->database);
|
||||
create_query = DatabaseCatalog::instance().getDatabase(show_query->database)->getCreateDatabaseQuery();
|
||||
}
|
||||
else if ((show_query = query_ptr->as<ASTShowCreateDictionaryQuery>()))
|
||||
{
|
||||
if (show_query->temporary)
|
||||
throw Exception("Temporary dictionaries are not possible.", ErrorCodes::SYNTAX_ERROR);
|
||||
show_query->database = getContext()->resolveDatabase(show_query->database);
|
||||
getContext()->checkAccess(AccessType::SHOW_DICTIONARIES, show_query->database, show_query->table);
|
||||
create_query = DatabaseCatalog::instance().getDatabase(show_query->database)->getCreateDictionaryQuery(show_query->table);
|
||||
}
|
||||
|
||||
if (!create_query)
|
||||
throw Exception("Unable to show the create query of " + show_query->table + ". Maybe it was created by the system.", ErrorCodes::THERE_IS_NO_QUERY);
|
||||
|
@ -54,7 +54,7 @@ SRCS(
|
||||
ExpressionAnalyzer.cpp
|
||||
ExternalDictionariesLoader.cpp
|
||||
ExternalLoader.cpp
|
||||
ExternalLoaderDatabaseConfigRepository.cpp
|
||||
ExternalLoaderDictionaryStorageConfigRepository.cpp
|
||||
ExternalLoaderTempConfigRepository.cpp
|
||||
ExternalLoaderXMLConfigRepository.cpp
|
||||
ExternalModelsLoader.cpp
|
||||
|
@ -42,6 +42,7 @@ bool ParserRenameQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
ParserKeyword s_rename_table("RENAME TABLE");
|
||||
ParserKeyword s_exchange_tables("EXCHANGE TABLES");
|
||||
ParserKeyword s_rename_dictionary("RENAME DICTIONARY");
|
||||
ParserKeyword s_exchange_dictionaries("EXCHANGE DICTIONARIES");
|
||||
ParserKeyword s_rename_database("RENAME DATABASE");
|
||||
ParserKeyword s_to("TO");
|
||||
ParserKeyword s_and("AND");
|
||||
@ -56,6 +57,11 @@ bool ParserRenameQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|
||||
exchange = true;
|
||||
else if (s_rename_dictionary.ignore(pos, expected))
|
||||
dictionary = true;
|
||||
else if (s_exchange_dictionaries.ignore(pos, expected))
|
||||
{
|
||||
exchange = true;
|
||||
dictionary = true;
|
||||
}
|
||||
else if (s_rename_database.ignore(pos, expected))
|
||||
{
|
||||
ASTPtr from_db;
|
||||
|
@ -108,6 +108,9 @@ public:
|
||||
/// Returns true if the storage is a view of a table or another view.
|
||||
virtual bool isView() const { return false; }
|
||||
|
||||
/// Returns true if the storage is dictionary
|
||||
virtual bool isDictionary() const { return false; }
|
||||
|
||||
/// Returns true if the storage supports queries with the SAMPLE section.
|
||||
virtual bool supportsSampling() const { return getInMemoryMetadataPtr()->hasSamplingKey(); }
|
||||
|
||||
|
@ -5,11 +5,13 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Interpreters/ExternalLoaderDictionaryStorageConfigRepository.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -19,6 +21,7 @@ namespace ErrorCodes
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
extern const int CANNOT_DETACH_DICTIONARY_AS_TABLE;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -93,8 +96,10 @@ StorageDictionary::StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const String & dictionary_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
Location location_)
|
||||
Location location_,
|
||||
ContextPtr context_)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, dictionary_name(dictionary_name_)
|
||||
, location(location_)
|
||||
{
|
||||
@ -105,18 +110,52 @@ StorageDictionary::StorageDictionary(
|
||||
|
||||
|
||||
StorageDictionary::StorageDictionary(
|
||||
const StorageID & table_id_, const String & dictionary_name_, const DictionaryStructure & dictionary_structure_, Location location_)
|
||||
: StorageDictionary(table_id_, dictionary_name_, ColumnsDescription{getNamesAndTypes(dictionary_structure_)}, location_)
|
||||
const StorageID & table_id_,
|
||||
const String & dictionary_name_,
|
||||
const DictionaryStructure & dictionary_structure_,
|
||||
Location location_,
|
||||
ContextPtr context_)
|
||||
: StorageDictionary(
|
||||
table_id_,
|
||||
dictionary_name_,
|
||||
ColumnsDescription{getNamesAndTypes(dictionary_structure_)},
|
||||
location_,
|
||||
context_)
|
||||
{
|
||||
}
|
||||
|
||||
StorageDictionary::StorageDictionary(
|
||||
const StorageID & table_id,
|
||||
LoadablesConfigurationPtr dictionary_configuration,
|
||||
ContextPtr context_)
|
||||
: StorageDictionary(
|
||||
table_id,
|
||||
table_id.getFullNameNotQuoted(),
|
||||
context_->getExternalDictionariesLoader().getDictionaryStructure(*dictionary_configuration),
|
||||
Location::SameDatabaseAndNameAsDictionary,
|
||||
context_)
|
||||
{
|
||||
configuration = dictionary_configuration;
|
||||
|
||||
auto repository = std::make_unique<ExternalLoaderDictionaryStorageConfigRepository>(*this);
|
||||
remove_repository_callback = context_->getExternalDictionariesLoader().addConfigRepository(std::move(repository));
|
||||
}
|
||||
|
||||
StorageDictionary::~StorageDictionary()
|
||||
{
|
||||
removeDictionaryConfigurationFromRepository();
|
||||
}
|
||||
|
||||
void StorageDictionary::checkTableCanBeDropped() const
|
||||
{
|
||||
if (location == Location::SameDatabaseAndNameAsDictionary)
|
||||
throw Exception("Cannot drop/detach dictionary " + backQuote(dictionary_name) + " as table, use DROP DICTIONARY or DETACH DICTIONARY query instead", ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE);
|
||||
throw Exception(ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE,
|
||||
"Cannot drop/detach dictionary {} as table, use DROP DICTIONARY or DETACH DICTIONARY query instead",
|
||||
dictionary_name);
|
||||
if (location == Location::DictionaryDatabase)
|
||||
throw Exception("Cannot drop/detach table " + getStorageID().getFullTableName() + " from a database with DICTIONARY engine", ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE);
|
||||
throw Exception(ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE,
|
||||
"Cannot drop/detach table from a database with DICTIONARY engine, use DROP DICTIONARY or DETACH DICTIONARY query instead",
|
||||
dictionary_name);
|
||||
}
|
||||
|
||||
void StorageDictionary::checkTableCanBeDetached() const
|
||||
@ -128,37 +167,130 @@ Pipe StorageDictionary::read(
|
||||
const Names & column_names,
|
||||
const StorageMetadataPtr & /*metadata_snapshot*/,
|
||||
SelectQueryInfo & /*query_info*/,
|
||||
ContextPtr context,
|
||||
ContextPtr local_context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*threads*/)
|
||||
{
|
||||
auto dictionary = context->getExternalDictionariesLoader().getDictionary(dictionary_name, context);
|
||||
auto dictionary = getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, local_context);
|
||||
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
|
||||
/// TODO: update dictionary interface for processors.
|
||||
return Pipe(std::make_shared<SourceFromInputStream>(stream));
|
||||
}
|
||||
|
||||
void StorageDictionary::shutdown()
|
||||
{
|
||||
removeDictionaryConfigurationFromRepository();
|
||||
}
|
||||
|
||||
void StorageDictionary::startup()
|
||||
{
|
||||
auto global_context = getContext();
|
||||
|
||||
bool lazy_load = global_context->getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
if (!lazy_load)
|
||||
{
|
||||
auto & external_dictionaries_loader = global_context->getExternalDictionariesLoader();
|
||||
|
||||
/// reloadConfig() is called here to force loading the dictionary.
|
||||
external_dictionaries_loader.reloadConfig(getStorageID().getInternalDictionaryName());
|
||||
}
|
||||
}
|
||||
|
||||
void StorageDictionary::removeDictionaryConfigurationFromRepository()
|
||||
{
|
||||
if (remove_repository_callback_executed)
|
||||
return;
|
||||
|
||||
remove_repository_callback_executed = true;
|
||||
remove_repository_callback.reset();
|
||||
}
|
||||
|
||||
Poco::Timestamp StorageDictionary::getUpdateTime() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(dictionary_config_mutex);
|
||||
return update_time;
|
||||
}
|
||||
|
||||
LoadablesConfigurationPtr StorageDictionary::getConfiguration() const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(dictionary_config_mutex);
|
||||
return configuration;
|
||||
}
|
||||
|
||||
void StorageDictionary::renameInMemory(const StorageID & new_table_id)
|
||||
{
|
||||
if (configuration)
|
||||
{
|
||||
configuration->setString("dictionary.database", new_table_id.database_name);
|
||||
configuration->setString("dictionary.name", new_table_id.table_name);
|
||||
|
||||
auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
|
||||
external_dictionaries_loader.reloadConfig(getStorageID().getInternalDictionaryName());
|
||||
|
||||
auto result = external_dictionaries_loader.getLoadResult(getStorageID().getInternalDictionaryName());
|
||||
if (!result.object)
|
||||
return;
|
||||
|
||||
const auto dictionary = std::static_pointer_cast<const IDictionary>(result.object);
|
||||
dictionary->updateDictionaryName(new_table_id);
|
||||
}
|
||||
|
||||
IStorage::renameInMemory(new_table_id);
|
||||
}
|
||||
|
||||
void registerStorageDictionary(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("Dictionary", [](const StorageFactory::Arguments & args)
|
||||
{
|
||||
if (args.engine_args.size() != 1)
|
||||
throw Exception("Storage Dictionary requires single parameter: name of dictionary",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
auto query = args.query;
|
||||
|
||||
args.engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[0], args.getLocalContext());
|
||||
String dictionary_name = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
auto local_context = args.getLocalContext();
|
||||
|
||||
if (!args.attach)
|
||||
if (query.is_dictionary)
|
||||
{
|
||||
const auto & dictionary = args.getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, args.getContext());
|
||||
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
|
||||
checkNamesAndTypesCompatibleWithDictionary(dictionary_name, args.columns, dictionary_structure);
|
||||
}
|
||||
auto dictionary_id = args.table_id;
|
||||
auto & external_dictionaries_loader = local_context->getExternalDictionariesLoader();
|
||||
|
||||
return StorageDictionary::create(args.table_id, dictionary_name, args.columns, StorageDictionary::Location::Custom);
|
||||
/// A dictionary with the same full name could be defined in *.xml config files.
|
||||
if (external_dictionaries_loader.getCurrentStatus(dictionary_id.getFullNameNotQuoted()) != ExternalLoader::Status::NOT_EXIST)
|
||||
throw Exception(ErrorCodes::DICTIONARY_ALREADY_EXISTS,
|
||||
"Dictionary {} already exists.", dictionary_id.getFullNameNotQuoted());
|
||||
|
||||
/// Create dictionary storage that owns underlying dictionary
|
||||
auto abstract_dictionary_configuration = getDictionaryConfigurationFromAST(args.query, local_context, dictionary_id.database_name);
|
||||
auto result_storage = StorageDictionary::create(dictionary_id, abstract_dictionary_configuration, local_context);
|
||||
|
||||
bool lazy_load = local_context->getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
if (!args.attach && !lazy_load)
|
||||
{
|
||||
/// load() is called here to force loading the dictionary, wait until the loading is finished,
|
||||
/// and throw an exception if the loading is failed.
|
||||
external_dictionaries_loader.load(dictionary_id.getInternalDictionaryName());
|
||||
}
|
||||
|
||||
return result_storage;
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Create dictionary storage that is view of underlying dictionary
|
||||
|
||||
if (args.engine_args.size() != 1)
|
||||
throw Exception("Storage Dictionary requires single parameter: name of dictionary",
|
||||
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
|
||||
|
||||
args.engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[0], local_context);
|
||||
String dictionary_name = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
if (!args.attach)
|
||||
{
|
||||
const auto & dictionary = args.getContext()->getExternalDictionariesLoader().getDictionary(dictionary_name, args.getContext());
|
||||
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
|
||||
checkNamesAndTypesCompatibleWithDictionary(dictionary_name, args.columns, dictionary_structure);
|
||||
}
|
||||
|
||||
return StorageDictionary::create(args.table_id, dictionary_name, args.columns, StorageDictionary::Location::Custom, local_context);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1,21 +1,26 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <atomic>
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Interpreters/IExternalLoaderConfigRepository.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct DictionaryStructure;
|
||||
class TableFunctionDictionary;
|
||||
|
||||
class StorageDictionary final : public ext::shared_ptr_helper<StorageDictionary>, public IStorage
|
||||
class StorageDictionary final : public ext::shared_ptr_helper<StorageDictionary>, public IStorage, public WithContext
|
||||
{
|
||||
friend struct ext::shared_ptr_helper<StorageDictionary>;
|
||||
friend class TableFunctionDictionary;
|
||||
public:
|
||||
std::string getName() const override { return "Dictionary"; }
|
||||
|
||||
~StorageDictionary() override;
|
||||
|
||||
void checkTableCanBeDropped() const override;
|
||||
void checkTableCanBeDetached() const override;
|
||||
|
||||
@ -31,7 +36,16 @@ public:
|
||||
static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);
|
||||
static String generateNamesAndTypesDescription(const NamesAndTypesList & list);
|
||||
|
||||
const String & dictionaryName() const { return dictionary_name; }
|
||||
bool isDictionary() const override { return true; }
|
||||
void shutdown() override;
|
||||
void startup() override;
|
||||
|
||||
void renameInMemory(const StorageID & new_table_id) override;
|
||||
|
||||
Poco::Timestamp getUpdateTime() const;
|
||||
LoadablesConfigurationPtr getConfiguration() const;
|
||||
|
||||
const String & getDictionaryName() const { return dictionary_name; }
|
||||
|
||||
/// Specifies where the table is located relative to the dictionary.
|
||||
enum class Location
|
||||
@ -55,18 +69,33 @@ private:
|
||||
const String dictionary_name;
|
||||
const Location location;
|
||||
|
||||
protected:
|
||||
mutable std::mutex dictionary_config_mutex;
|
||||
Poco::Timestamp update_time;
|
||||
LoadablesConfigurationPtr configuration;
|
||||
|
||||
std::atomic<bool> remove_repository_callback_executed = false;
|
||||
ext::scope_guard remove_repository_callback;
|
||||
|
||||
void removeDictionaryConfigurationFromRepository();
|
||||
|
||||
StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const String & dictionary_name_,
|
||||
const ColumnsDescription & columns_,
|
||||
Location location_);
|
||||
Location location_,
|
||||
ContextPtr context_);
|
||||
|
||||
StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const String & dictionary_name_,
|
||||
const DictionaryStructure & dictionary_structure,
|
||||
Location location_);
|
||||
Location location_,
|
||||
ContextPtr context_);
|
||||
|
||||
StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
LoadablesConfigurationPtr dictionary_configuration_,
|
||||
ContextPtr context_);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -79,12 +79,18 @@ StoragePtr StorageFactory::get(
|
||||
}
|
||||
else if (query.is_live_view)
|
||||
{
|
||||
|
||||
if (query.storage)
|
||||
throw Exception("Specifying ENGINE is not allowed for a LiveView", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
name = "LiveView";
|
||||
}
|
||||
else if (query.is_dictionary)
|
||||
{
|
||||
if (query.storage)
|
||||
throw Exception("Specifying ENGINE is not allowed for a Dictionary", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
name = "Dictionary";
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Check for some special types, that are not allowed to be stored in tables. Example: NULL data type.
|
||||
@ -199,7 +205,7 @@ StoragePtr StorageFactory::get(
|
||||
assert(arguments.getContext() == arguments.getContext()->getGlobalContext());
|
||||
|
||||
auto res = storages.at(name).creator_fn(arguments);
|
||||
if (!empty_engine_args.empty())
|
||||
if (!empty_engine_args.empty()) //-V547
|
||||
{
|
||||
/// Storage creator modified empty arguments list, so we should modify the query
|
||||
assert(storage_def && storage_def->engine && !storage_def->engine->arguments);
|
||||
|
@ -60,9 +60,13 @@ NamesAndTypesList StorageSystemDictionaries::getVirtuals() const
|
||||
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & /*query_info*/) const
|
||||
{
|
||||
const auto access = context->getAccess();
|
||||
const bool check_access_for_dictionaries = !access->isGranted(AccessType::SHOW_DICTIONARIES);
|
||||
const bool check_access_for_dictionaries = access->isGranted(AccessType::SHOW_DICTIONARIES);
|
||||
|
||||
const auto & external_dictionaries = context->getExternalDictionariesLoader();
|
||||
|
||||
if (!check_access_for_dictionaries)
|
||||
return;
|
||||
|
||||
for (const auto & load_result : external_dictionaries.getLoadResults())
|
||||
{
|
||||
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionary>(load_result.object);
|
||||
@ -77,8 +81,7 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, ContextPt
|
||||
dict_id.table_name = load_result.name;
|
||||
|
||||
String db_or_tag = dict_id.database_name.empty() ? IDictionary::NO_DATABASE_TAG : dict_id.database_name;
|
||||
if (check_access_for_dictionaries
|
||||
&& !access->isGranted(AccessType::SHOW_DICTIONARIES, db_or_tag, dict_id.table_name))
|
||||
if (!access->isGranted(AccessType::SHOW_DICTIONARIES, db_or_tag, dict_id.table_name))
|
||||
continue;
|
||||
|
||||
size_t i = 0;
|
||||
|
@ -52,7 +52,15 @@ StoragePtr TableFunctionDictionary::executeImpl(
|
||||
{
|
||||
StorageID dict_id(getDatabaseName(), table_name);
|
||||
auto dictionary_table_structure = getActualTableStructure(context);
|
||||
return StorageDictionary::create(dict_id, dictionary_name, std::move(dictionary_table_structure), StorageDictionary::Location::Custom);
|
||||
|
||||
auto result = StorageDictionary::create(
|
||||
dict_id,
|
||||
dictionary_name,
|
||||
std::move(dictionary_table_structure),
|
||||
StorageDictionary::Location::Custom,
|
||||
context);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void registerTableFunctionDictionary(TableFunctionFactory & factory)
|
||||
|
@ -288,6 +288,23 @@ def test_clickhouse_remote(started_cluster):
|
||||
time.sleep(0.5)
|
||||
|
||||
node3.query("detach dictionary if exists test.clickhouse_remote")
|
||||
|
||||
with pytest.raises(QueryRuntimeException):
|
||||
node3.query("""
|
||||
CREATE DICTIONARY test.clickhouse_remote(
|
||||
id UInt64,
|
||||
SomeValue1 UInt8,
|
||||
SomeValue2 String
|
||||
)
|
||||
PRIMARY KEY id
|
||||
LAYOUT(FLAT())
|
||||
SOURCE(CLICKHOUSE(HOST 'node4' PORT 9000 USER 'default' PASSWORD 'default' TABLE 'xml_dictionary_table' DB 'test'))
|
||||
LIFETIME(MIN 1 MAX 10)
|
||||
""")
|
||||
|
||||
node3.query("attach dictionary test.clickhouse_remote")
|
||||
node3.query("drop dictionary test.clickhouse_remote")
|
||||
|
||||
node3.query("""
|
||||
CREATE DICTIONARY test.clickhouse_remote(
|
||||
id UInt64,
|
||||
|
@ -12,7 +12,10 @@ db_01018 dict1
|
||||
==DROP DICTIONARY
|
||||
0
|
||||
=DICTIONARY in Memory DB
|
||||
0
|
||||
CREATE DICTIONARY memory_db.dict2\n(\n `key_column` UInt64 DEFAULT 0 INJECTIVE,\n `second_column` UInt8 DEFAULT 1 EXPRESSION rand() % 222,\n `third_column` String DEFAULT \'qqq\'\n)\nPRIMARY KEY key_column\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT tcpPort() USER \'default\' TABLE \'table_for_dict\' PASSWORD \'\' DB \'database_for_dict_01018\'))\nLIFETIME(MIN 1 MAX 10)\nLAYOUT(FLAT())
|
||||
dict2
|
||||
1
|
||||
memory_db dict2
|
||||
=DICTIONARY in Lazy DB
|
||||
=DROP DATABASE WITH DICTIONARY
|
||||
dict4
|
||||
|
@ -89,9 +89,9 @@ CREATE DICTIONARY memory_db.dict2
|
||||
PRIMARY KEY key_column
|
||||
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict_01018'))
|
||||
LIFETIME(MIN 1 MAX 10)
|
||||
LAYOUT(FLAT()); -- {serverError 48}
|
||||
LAYOUT(FLAT());
|
||||
|
||||
SHOW CREATE DICTIONARY memory_db.dict2; -- {serverError 487}
|
||||
SHOW CREATE DICTIONARY memory_db.dict2;
|
||||
|
||||
SHOW DICTIONARIES FROM memory_db LIKE 'dict2';
|
||||
|
||||
@ -114,7 +114,7 @@ CREATE DICTIONARY lazy_db.dict3
|
||||
PRIMARY KEY key_column, second_column
|
||||
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict_01018'))
|
||||
LIFETIME(MIN 1 MAX 10)
|
||||
LAYOUT(COMPLEX_KEY_HASHED()); -- {serverError 48}
|
||||
LAYOUT(COMPLEX_KEY_HASHED()); --{serverError 1}
|
||||
|
||||
DROP DATABASE IF EXISTS lazy_db;
|
||||
|
||||
|
@ -18,7 +18,7 @@ SHOW CREATE DICTIONARY test_01190.dict;
|
||||
CREATE TABLE log ENGINE = Log AS SELECT 'test' AS s;
|
||||
SHOW CREATE log;
|
||||
DETACH TABLE log;
|
||||
ATTACH DICTIONARY log; -- { serverError 487 }
|
||||
ATTACH DICTIONARY log; -- { serverError 80 }
|
||||
ATTACH TABLE log (s String) ENGINE = Log();
|
||||
SHOW CREATE log;
|
||||
SELECT * FROM log;
|
||||
|
@ -13,11 +13,15 @@ INSERT INTO test_01191._ VALUES (42, 'test');
|
||||
SELECT name, status FROM system.dictionaries WHERE database='test_01191';
|
||||
SELECT name, engine FROM system.tables WHERE database='test_01191' ORDER BY name;
|
||||
|
||||
RENAME DICTIONARY test_01191.table TO test_01191.table1; -- {serverError 80}
|
||||
EXCHANGE TABLES test_01191.table AND test_01191.dict; -- {serverError 48}
|
||||
RENAME DICTIONARY test_01191.table TO test_01191.table1; -- {serverError 60}
|
||||
EXCHANGE TABLES test_01191.table AND test_01191.dict; -- {serverError 60}
|
||||
EXCHANGE TABLES test_01191.dict AND test_01191.table; -- {serverError 80}
|
||||
RENAME TABLE test_01191.dict TO test_01191.dict1; -- {serverError 80}
|
||||
RENAME DICTIONARY test_01191.dict TO default.dict1; -- {serverError 48}
|
||||
|
||||
CREATE DATABASE dummy_db ENGINE=Atomic;
|
||||
RENAME DICTIONARY test_01191.dict TO dummy_db.dict1;
|
||||
RENAME DICTIONARY dummy_db.dict1 TO test_01191.dict;
|
||||
DROP DATABASE dummy_db;
|
||||
|
||||
RENAME DICTIONARY test_01191.dict TO test_01191.dict1;
|
||||
|
||||
|
@ -0,0 +1,3 @@
|
||||
1 First
|
||||
2 Second
|
||||
3 Third
|
@ -0,0 +1,30 @@
|
||||
DROP DATABASE IF EXISTS 01837_db;
|
||||
CREATE DATABASE 01837_db ENGINE = Memory;
|
||||
|
||||
DROP TABLE IF EXISTS 01837_db.simple_key_dictionary_source;
|
||||
CREATE TABLE 01837_db.simple_key_dictionary_source
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
) ENGINE = TinyLog;
|
||||
|
||||
INSERT INTO 01837_db.simple_key_dictionary_source VALUES (1, 'First');
|
||||
INSERT INTO 01837_db.simple_key_dictionary_source VALUES (2, 'Second');
|
||||
INSERT INTO 01837_db.simple_key_dictionary_source VALUES (3, 'Third');
|
||||
|
||||
DROP DICTIONARY IF EXISTS 01837_db.simple_key_direct_dictionary;
|
||||
CREATE DICTIONARY 01837_db.simple_key_direct_dictionary
|
||||
(
|
||||
id UInt64,
|
||||
value String
|
||||
)
|
||||
PRIMARY KEY id
|
||||
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB '01837_db' TABLE 'simple_key_dictionary_source'))
|
||||
LAYOUT(DIRECT());
|
||||
|
||||
SELECT * FROM 01837_db.simple_key_direct_dictionary;
|
||||
|
||||
DROP DICTIONARY 01837_db.simple_key_direct_dictionary;
|
||||
DROP TABLE 01837_db.simple_key_dictionary_source;
|
||||
|
||||
DROP DATABASE 01837_db;
|
Loading…
Reference in New Issue
Block a user