mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Show information about not loaded dictionaries in system.columns, system.tables
and in "SHOW TABLES" executed in any database with the "Dictionary" engine.
This commit is contained in:
parent
a2b77faee3
commit
d62345bbaa
2
contrib/poco
vendored
2
contrib/poco
vendored
@ -1 +1 @@
|
||||
Subproject commit ddca76ba4956cb57150082394536cc43ff28f6fa
|
||||
Subproject commit 7d605a1ae5d878294f91f68feb62ae51e9a04426
|
@ -491,6 +491,7 @@ namespace ErrorCodes
|
||||
extern const int CANNOT_ASSIGN_ALTER = 517;
|
||||
extern const int CANNOT_COMMIT_OFFSET = 518;
|
||||
extern const int NO_REMOTE_SHARD_AVAILABLE = 519;
|
||||
extern const int CANNOT_DETACH_DICTIONARY_AS_TABLE = 520;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#include <Databases/DatabaseDictionary.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
@ -15,6 +16,18 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SYNTAX_ERROR;
|
||||
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
StoragePtr createStorageDictionary(const String & database_name, const ExternalLoader::LoadResult & load_result)
|
||||
{
|
||||
if (!load_result.config)
|
||||
return nullptr;
|
||||
DictionaryStructure dictionary_structure = ExternalDictionariesLoader::getDictionaryStructure(*load_result.config);
|
||||
return StorageDictionary::create(StorageID(database_name, load_result.name), load_result.name, dictionary_structure);
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseDictionary::DatabaseDictionary(const String & name_)
|
||||
@ -26,29 +39,12 @@ DatabaseDictionary::DatabaseDictionary(const String & name_)
|
||||
Tables DatabaseDictionary::listTables(const Context & context, const FilterByNameFunction & filter_by_name)
|
||||
{
|
||||
Tables tables;
|
||||
ExternalLoader::LoadResults load_results;
|
||||
if (filter_by_name)
|
||||
auto load_results = context.getExternalDictionariesLoader().getLoadResults(filter_by_name);
|
||||
for (auto & load_result : load_results)
|
||||
{
|
||||
/// If `filter_by_name` is set, we iterate through all dictionaries with such names. That's why we need to load all of them.
|
||||
load_results = context.getExternalDictionariesLoader().tryLoad<ExternalLoader::LoadResults>(filter_by_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// If `filter_by_name` isn't set, we iterate through only already loaded dictionaries. We don't try to load all dictionaries in this case.
|
||||
load_results = context.getExternalDictionariesLoader().getCurrentLoadResults();
|
||||
}
|
||||
|
||||
for (const auto & load_result: load_results)
|
||||
{
|
||||
/// Load tables only from XML dictionaries, don't touch other
|
||||
if (load_result.object && load_result.repository_name.empty())
|
||||
{
|
||||
auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
|
||||
auto dict_name = dict_ptr->getName();
|
||||
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
|
||||
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
tables[dict_name] = StorageDictionary::create(StorageID(getDatabaseName(), dict_name), ColumnsDescription{columns}, context, true, dict_name);
|
||||
}
|
||||
auto storage = createStorageDictionary(getDatabaseName(), load_result);
|
||||
if (storage)
|
||||
tables.emplace(storage->getStorageID().table_name, storage);
|
||||
}
|
||||
return tables;
|
||||
}
|
||||
@ -64,15 +60,8 @@ StoragePtr DatabaseDictionary::tryGetTable(
|
||||
const Context & context,
|
||||
const String & table_name) const
|
||||
{
|
||||
auto dict_ptr = context.getExternalDictionariesLoader().tryGetDictionary(table_name, true /*load*/);
|
||||
if (dict_ptr)
|
||||
{
|
||||
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
|
||||
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
return StorageDictionary::create(StorageID(getDatabaseName(), table_name), ColumnsDescription{columns}, context, true, table_name);
|
||||
}
|
||||
|
||||
return {};
|
||||
auto load_result = context.getExternalDictionariesLoader().getLoadResult(table_name);
|
||||
return createStorageDictionary(getDatabaseName(), load_result);
|
||||
}
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
|
||||
@ -82,7 +71,7 @@ DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(const Context &
|
||||
|
||||
bool DatabaseDictionary::empty(const Context & context) const
|
||||
{
|
||||
return !context.getExternalDictionariesLoader().hasCurrentlyLoadedObjects();
|
||||
return !context.getExternalDictionariesLoader().hasObjects();
|
||||
}
|
||||
|
||||
ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const Context & context,
|
||||
@ -92,15 +81,17 @@ ASTPtr DatabaseDictionary::getCreateTableQueryImpl(const Context & context,
|
||||
{
|
||||
WriteBufferFromString buffer(query);
|
||||
|
||||
const auto & dictionaries = context.getExternalDictionariesLoader();
|
||||
auto dictionary = throw_on_error ? dictionaries.getDictionary(table_name)
|
||||
: dictionaries.tryGetDictionary(table_name, true /*load*/);
|
||||
if (!dictionary)
|
||||
auto load_result = context.getExternalDictionariesLoader().getLoadResult(table_name);
|
||||
if (!load_result.config)
|
||||
{
|
||||
if (throw_on_error)
|
||||
throw Exception{"Dictionary " + backQuote(table_name) + " doesn't exist", ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY};
|
||||
return {};
|
||||
}
|
||||
|
||||
auto names_and_types = StorageDictionary::getNamesAndTypes(dictionary->getStructure());
|
||||
auto names_and_types = StorageDictionary::getNamesAndTypes(ExternalDictionariesLoader::getDictionaryStructure(*load_result.config));
|
||||
buffer << "CREATE TABLE " << backQuoteIfNeed(database_name) << '.' << backQuoteIfNeed(table_name) << " (";
|
||||
buffer << StorageDictionary::generateNamesAndTypesDescription(names_and_types.begin(), names_and_types.end());
|
||||
buffer << StorageDictionary::generateNamesAndTypesDescription(names_and_types);
|
||||
buffer << ") Engine = Dictionary(" << backQuoteIfNeed(table_name) << ")";
|
||||
}
|
||||
|
||||
|
@ -43,6 +43,8 @@ public:
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
bool shouldBeEmptyOnDetach() const override { return false; }
|
||||
|
||||
void shutdown() override;
|
||||
|
||||
protected:
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
#include <TableFunctions/TableFunctionFactory.h>
|
||||
|
||||
#include <Parsers/queryToString.h>
|
||||
@ -74,18 +75,24 @@ namespace
|
||||
|
||||
|
||||
void tryAttachDictionary(
|
||||
Context & context,
|
||||
const ASTCreateQuery & query,
|
||||
DatabaseOrdinary & database)
|
||||
const ASTPtr & query,
|
||||
DatabaseOrdinary & database,
|
||||
const String & metadata_path)
|
||||
{
|
||||
assert(query.is_dictionary);
|
||||
auto & create_query = query->as<ASTCreateQuery &>();
|
||||
assert(create_query.is_dictionary);
|
||||
try
|
||||
{
|
||||
database.attachDictionary(query.table, context);
|
||||
Poco::File meta_file(metadata_path);
|
||||
auto config = getDictionaryConfigurationFromAST(create_query);
|
||||
time_t modification_time = meta_file.getLastModified().epochTime();
|
||||
database.attachDictionary(create_query.table, DictionaryAttachInfo{query, config, modification_time});
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
e.addMessage("Cannot attach table '" + backQuote(query.table) + "' from query " + serializeAST(query));
|
||||
e.addMessage("Cannot attach dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(create_query.table) +
|
||||
" from metadata file " + metadata_path +
|
||||
" from query " + serializeAST(*query));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
@ -173,12 +180,12 @@ void DatabaseOrdinary::loadStoredObjects(
|
||||
|
||||
/// Attach dictionaries.
|
||||
attachToExternalDictionariesLoader(context);
|
||||
for (const auto & name_with_query : file_names)
|
||||
for (const auto & [name, query] : file_names)
|
||||
{
|
||||
auto create_query = name_with_query.second->as<const ASTCreateQuery &>();
|
||||
auto create_query = query->as<const ASTCreateQuery &>();
|
||||
if (create_query.is_dictionary)
|
||||
{
|
||||
tryAttachDictionary(context, create_query, *this);
|
||||
tryAttachDictionary(query, *this, getMetadataPath() + name);
|
||||
|
||||
/// Messages, so that it's not boring to wait for the server to load for a long time.
|
||||
logAboutProgress(log, ++dictionaries_processed, total_dictionaries, watch);
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <Interpreters/ExternalLoaderTempConfigRepository.h>
|
||||
#include <Interpreters/ExternalLoaderDatabaseConfigRepository.h>
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
@ -24,46 +26,80 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
|
||||
extern const int TABLE_ALREADY_EXISTS;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int UNKNOWN_DICTIONARY;
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
}
|
||||
|
||||
|
||||
void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const Context & context)
|
||||
void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const DictionaryAttachInfo & attach_info)
|
||||
{
|
||||
String full_name = getDatabaseName() + "." + dictionary_name;
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (!dictionaries.emplace(dictionary_name).second)
|
||||
auto [it, inserted] = dictionaries.emplace(dictionary_name, attach_info);
|
||||
if (!inserted)
|
||||
throw Exception("Dictionary " + full_name + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
|
||||
/// Attach the dictionary as table too.
|
||||
try
|
||||
{
|
||||
attachTableUnlocked(
|
||||
dictionary_name,
|
||||
StorageDictionary::create(
|
||||
StorageID(getDatabaseName(), dictionary_name),
|
||||
full_name,
|
||||
ExternalDictionariesLoader::getDictionaryStructure(*attach_info.config)));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
dictionaries.erase(it);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
CurrentStatusInfo::set(CurrentStatusInfo::DictionaryStatus, full_name, static_cast<Int8>(ExternalLoaderStatus::NOT_LOADED));
|
||||
|
||||
/// ExternalLoader::reloadConfig() will find out that the dictionary's config has been added
|
||||
/// and in case `dictionaries_lazy_load == false` it will load the dictionary.
|
||||
const auto & external_loader = context.getExternalDictionariesLoader();
|
||||
external_loader.reloadConfig(getDatabaseName(), full_name);
|
||||
external_loader->reloadConfig(getDatabaseName(), full_name);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name, const Context & context)
|
||||
void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name)
|
||||
{
|
||||
DictionaryAttachInfo attach_info;
|
||||
detachDictionaryImpl(dictionary_name, attach_info);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::detachDictionaryImpl(const String & dictionary_name, DictionaryAttachInfo & attach_info)
|
||||
{
|
||||
String full_name = getDatabaseName() + "." + dictionary_name;
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it == dictionaries.end())
|
||||
throw Exception("Dictionary " + full_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
throw Exception("Dictionary " + full_name + " doesn't exist.", ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
attach_info = std::move(it->second);
|
||||
dictionaries.erase(it);
|
||||
|
||||
/// Detach the dictionary as table too.
|
||||
try
|
||||
{
|
||||
detachTableUnlocked(dictionary_name);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
dictionaries.emplace(dictionary_name, std::move(attach_info));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, full_name);
|
||||
|
||||
/// ExternalLoader::reloadConfig() will find out that the dictionary's config has been removed
|
||||
/// and therefore it will unload the dictionary.
|
||||
const auto & external_loader = context.getExternalDictionariesLoader();
|
||||
external_loader.reloadConfig(getDatabaseName(), full_name);
|
||||
|
||||
external_loader->reloadConfig(getDatabaseName(), full_name);
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::createDictionary(const Context & context, const String & dictionary_name, const ASTPtr & query)
|
||||
@ -85,8 +121,7 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S
|
||||
|
||||
/// A dictionary with the same full name could be defined in *.xml config files.
|
||||
String full_name = getDatabaseName() + "." + dictionary_name;
|
||||
const auto & external_loader = context.getExternalDictionariesLoader();
|
||||
if (external_loader.getCurrentStatus(full_name) != ExternalLoader::Status::NOT_EXIST)
|
||||
if (external_loader->getCurrentStatus(full_name) != ExternalLoader::Status::NOT_EXIST)
|
||||
throw Exception(
|
||||
"Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.",
|
||||
ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
@ -117,23 +152,22 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S
|
||||
|
||||
/// Add a temporary repository containing the dictionary.
|
||||
/// We need this temp repository to try loading the dictionary before actually attaching it to the database.
|
||||
auto temp_repository
|
||||
= const_cast<ExternalDictionariesLoader &>(external_loader) /// the change of ExternalDictionariesLoader is temporary
|
||||
.addConfigRepository(std::make_unique<ExternalLoaderTempConfigRepository>(
|
||||
getDatabaseName(), dictionary_metadata_tmp_path, getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>())));
|
||||
auto temp_repository = external_loader->addConfigRepository(std::make_unique<ExternalLoaderTempConfigRepository>(
|
||||
getDatabaseName(), dictionary_metadata_tmp_path, getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>())));
|
||||
|
||||
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
if (!lazy_load)
|
||||
{
|
||||
/// load() is called here to force loading the dictionary, wait until the loading is finished,
|
||||
/// and throw an exception if the loading is failed.
|
||||
external_loader.load(full_name);
|
||||
external_loader->load(full_name);
|
||||
}
|
||||
|
||||
attachDictionary(dictionary_name, context);
|
||||
auto config = getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>());
|
||||
attachDictionary(dictionary_name, DictionaryAttachInfo{query, config, time(nullptr)});
|
||||
SCOPE_EXIT({
|
||||
if (!succeeded)
|
||||
detachDictionary(dictionary_name, context);
|
||||
detachDictionary(dictionary_name);
|
||||
});
|
||||
|
||||
/// If it was ATTACH query and file with dictionary metadata already exist
|
||||
@ -142,94 +176,31 @@ void DatabaseWithDictionaries::createDictionary(const Context & context, const S
|
||||
|
||||
/// ExternalDictionariesLoader doesn't know we renamed the metadata path.
|
||||
/// So we have to manually call reloadConfig() here.
|
||||
external_loader.reloadConfig(getDatabaseName(), full_name);
|
||||
external_loader->reloadConfig(getDatabaseName(), full_name);
|
||||
|
||||
/// Everything's ok.
|
||||
succeeded = true;
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::removeDictionary(const Context & context, const String & dictionary_name)
|
||||
void DatabaseWithDictionaries::removeDictionary(const Context &, const String & dictionary_name)
|
||||
{
|
||||
detachDictionary(dictionary_name, context);
|
||||
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
DictionaryAttachInfo attach_info;
|
||||
detachDictionaryImpl(dictionary_name, attach_info);
|
||||
|
||||
try
|
||||
{
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
Poco::File(dictionary_metadata_path).remove();
|
||||
CurrentStatusInfo::unset(CurrentStatusInfo::DictionaryStatus, getDatabaseName() + "." + dictionary_name);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// If remove was not possible for some reason
|
||||
attachDictionary(dictionary_name, context);
|
||||
attachDictionary(dictionary_name, attach_info);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
StoragePtr DatabaseWithDictionaries::tryGetTableImpl(const Context & context, const String & table_name, bool load) const
|
||||
{
|
||||
if (auto table_ptr = DatabaseWithOwnTablesBase::tryGetTable(context, table_name))
|
||||
return table_ptr;
|
||||
|
||||
if (isDictionaryExist(context, table_name))
|
||||
/// We don't need lock database here, because database doesn't store dictionary itself
|
||||
/// just metadata
|
||||
return getDictionaryStorage(context, table_name, load);
|
||||
|
||||
return {};
|
||||
}
|
||||
StoragePtr DatabaseWithDictionaries::tryGetTable(const Context & context, const String & table_name) const
|
||||
{
|
||||
return tryGetTableImpl(context, table_name, true /*load*/);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseWithDictionaries::getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
bool has_table = tryGetTableImpl(context, table_name, false /*load*/) != nullptr;
|
||||
auto table_metadata_path = getObjectMetadataPath(table_name);
|
||||
try
|
||||
{
|
||||
ast = getCreateQueryFromMetadata(context, table_metadata_path, throw_on_error);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (!has_table && e.code() == ErrorCodes::FILE_DOESNT_EXIST && throw_on_error)
|
||||
throw Exception{"Table " + backQuote(table_name) + " doesn't exist",
|
||||
ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY};
|
||||
else if (throw_on_error)
|
||||
throw;
|
||||
}
|
||||
return ast;
|
||||
}
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseWithDictionaries::getTablesWithDictionaryTablesIterator(
|
||||
const Context & context, const FilterByNameFunction & filter_by_dictionary_name)
|
||||
{
|
||||
/// NOTE: it's not atomic
|
||||
auto tables_it = getTablesIterator(context, filter_by_dictionary_name);
|
||||
auto dictionaries_it = getDictionariesIterator(context, filter_by_dictionary_name);
|
||||
|
||||
Tables result;
|
||||
while (tables_it && tables_it->isValid())
|
||||
{
|
||||
result.emplace(tables_it->name(), tables_it->table());
|
||||
tables_it->next();
|
||||
}
|
||||
|
||||
while (dictionaries_it && dictionaries_it->isValid())
|
||||
{
|
||||
auto table_name = dictionaries_it->name();
|
||||
auto table_ptr = getDictionaryStorage(context, table_name, false /*load*/);
|
||||
if (table_ptr)
|
||||
result.emplace(table_name, table_ptr);
|
||||
dictionaries_it->next();
|
||||
}
|
||||
|
||||
return std::make_unique<DatabaseTablesSnapshotIterator>(result);
|
||||
}
|
||||
|
||||
DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterator(const Context & /*context*/, const FilterByNameFunction & filter_by_dictionary_name)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
@ -237,9 +208,9 @@ DatabaseDictionariesIteratorPtr DatabaseWithDictionaries::getDictionariesIterato
|
||||
return std::make_unique<DatabaseDictionariesSnapshotIterator>(dictionaries);
|
||||
|
||||
Dictionaries filtered_dictionaries;
|
||||
for (const auto & dictionary_name : dictionaries)
|
||||
for (const auto & dictionary_name : dictionaries | boost::adaptors::map_keys)
|
||||
if (filter_by_dictionary_name(dictionary_name))
|
||||
filtered_dictionaries.emplace(dictionary_name);
|
||||
filtered_dictionaries.emplace_back(dictionary_name);
|
||||
return std::make_unique<DatabaseDictionariesSnapshotIterator>(std::move(filtered_dictionaries));
|
||||
}
|
||||
|
||||
@ -249,44 +220,84 @@ bool DatabaseWithDictionaries::isDictionaryExist(const Context & /*context*/, co
|
||||
return dictionaries.find(dictionary_name) != dictionaries.end();
|
||||
}
|
||||
|
||||
StoragePtr DatabaseWithDictionaries::getDictionaryStorage(const Context & context, const String & table_name, bool load) const
|
||||
{
|
||||
auto dict_name = database_name + "." + table_name;
|
||||
const auto & external_loader = context.getExternalDictionariesLoader();
|
||||
auto dict_ptr = external_loader.tryGetDictionary(dict_name, load);
|
||||
if (dict_ptr)
|
||||
{
|
||||
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
|
||||
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
return StorageDictionary::create(StorageID(database_name, table_name), ColumnsDescription{columns}, context, true, dict_name);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseWithDictionaries::getCreateDictionaryQueryImpl(
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
ast = getCreateQueryFromMetadata(context, dictionary_metadata_path, throw_on_error);
|
||||
if (!ast && throw_on_error)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files.
|
||||
bool has_dictionary = isDictionaryExist(context, dictionary_name);
|
||||
|
||||
auto msg = has_dictionary ? "There is no CREATE DICTIONARY query for table " : "There is no metadata file for dictionary ";
|
||||
|
||||
throw Exception(msg + backQuote(dictionary_name), ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY);
|
||||
/// Try to get create query ifg for an attached dictionary.
|
||||
std::lock_guard lock{mutex};
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it != dictionaries.end())
|
||||
{
|
||||
ASTPtr ast = it->second.create_query->clone();
|
||||
auto & create_query = ast->as<ASTCreateQuery &>();
|
||||
create_query.attach = false;
|
||||
create_query.database = getDatabaseName();
|
||||
return ast;
|
||||
}
|
||||
}
|
||||
|
||||
return ast;
|
||||
/// Try to get create query for non-attached dictionary.
|
||||
ASTPtr ast;
|
||||
try
|
||||
{
|
||||
auto dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
ast = getCreateQueryFromMetadata(context, dictionary_metadata_path, throw_on_error);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (throw_on_error && (e.code() != ErrorCodes::FILE_DOESNT_EXIST))
|
||||
throw;
|
||||
}
|
||||
|
||||
if (ast)
|
||||
{
|
||||
const auto * create_query = ast->as<const ASTCreateQuery>();
|
||||
if (create_query && create_query->is_dictionary)
|
||||
return ast;
|
||||
}
|
||||
if (throw_on_error)
|
||||
throw Exception{"Dictionary " + backQuote(dictionary_name) + " doesn't exist",
|
||||
ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY};
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> DatabaseWithDictionaries::getDictionaryConfiguration(const String & dictionary_name) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = dictionaries.find(dictionary_name);
|
||||
if (it != dictionaries.end())
|
||||
return it->second.config;
|
||||
throw Exception("Dictionary " + backQuote(dictionary_name) + " doesn't exist", ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
}
|
||||
|
||||
time_t DatabaseWithDictionaries::getObjectMetadataModificationTime(const String & object_name) const
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = dictionaries.find(object_name);
|
||||
if (it != dictionaries.end())
|
||||
return it->second.modification_time;
|
||||
}
|
||||
return DatabaseOnDisk::getObjectMetadataModificationTime(object_name);
|
||||
}
|
||||
|
||||
|
||||
bool DatabaseWithDictionaries::empty(const Context &) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return tables.empty() && dictionaries.empty();
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::shutdown()
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
dictionaries.clear();
|
||||
}
|
||||
detachFromExternalDictionariesLoader();
|
||||
DatabaseOnDisk::shutdown();
|
||||
}
|
||||
@ -295,8 +306,9 @@ DatabaseWithDictionaries::~DatabaseWithDictionaries() = default;
|
||||
|
||||
void DatabaseWithDictionaries::attachToExternalDictionariesLoader(Context & context)
|
||||
{
|
||||
database_as_config_repo_for_external_loader = context.getExternalDictionariesLoader().addConfigRepository(
|
||||
std::make_unique<ExternalLoaderDatabaseConfigRepository>(*this, context));
|
||||
external_loader = &context.getExternalDictionariesLoader();
|
||||
database_as_config_repo_for_external_loader
|
||||
= external_loader->addConfigRepository(std::make_unique<ExternalLoaderDatabaseConfigRepository>(*this, context));
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::detachFromExternalDictionariesLoader()
|
||||
|
@ -8,9 +8,9 @@ namespace DB
|
||||
class DatabaseWithDictionaries : public DatabaseOnDisk
|
||||
{
|
||||
public:
|
||||
void attachDictionary(const String & name, const Context & context) override;
|
||||
void attachDictionary(const String & dictionary_name, const DictionaryAttachInfo & attach_info) override;
|
||||
|
||||
void detachDictionary(const String & name, const Context & context) override;
|
||||
void detachDictionary(const String & dictionary_name) override;
|
||||
|
||||
void createDictionary(const Context & context,
|
||||
const String & dictionary_name,
|
||||
@ -18,15 +18,15 @@ public:
|
||||
|
||||
void removeDictionary(const Context & context, const String & dictionary_name) override;
|
||||
|
||||
StoragePtr tryGetTable(const Context & context, const String & table_name) const override;
|
||||
|
||||
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const override;
|
||||
|
||||
DatabaseTablesIteratorPtr getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_dictionary_name) override;
|
||||
bool isDictionaryExist(const Context & context, const String & dictionary_name) const override;
|
||||
|
||||
DatabaseDictionariesIteratorPtr getDictionariesIterator(const Context & context, const FilterByNameFunction & filter_by_dictionary_name) override;
|
||||
|
||||
bool isDictionaryExist(const Context & context, const String & dictionary_name) const override;
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & /*name*/) const override;
|
||||
|
||||
time_t getObjectMetadataModificationTime(const String & object_name) const override;
|
||||
|
||||
bool empty(const Context & context) const override;
|
||||
|
||||
void shutdown() override;
|
||||
|
||||
@ -39,16 +39,17 @@ protected:
|
||||
void attachToExternalDictionariesLoader(Context & context);
|
||||
void detachFromExternalDictionariesLoader();
|
||||
|
||||
StoragePtr getDictionaryStorage(const Context & context, const String & table_name, bool load) const;
|
||||
void detachDictionaryImpl(const String & dictionary_name, DictionaryAttachInfo & attach_info);
|
||||
|
||||
ASTPtr getCreateDictionaryQueryImpl(const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error) const override;
|
||||
|
||||
private:
|
||||
ext::scope_guard database_as_config_repo_for_external_loader;
|
||||
std::unordered_map<String, DictionaryAttachInfo> dictionaries;
|
||||
|
||||
StoragePtr tryGetTableImpl(const Context & context, const String & table_name, bool load) const;
|
||||
private:
|
||||
ExternalDictionariesLoader * external_loader = nullptr;
|
||||
ext::scope_guard database_as_config_repo_for_external_loader;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ bool DatabaseWithOwnTablesBase::isTableExist(
|
||||
const String & table_name) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return tables.find(table_name) != tables.end() || dictionaries.find(table_name) != dictionaries.end();
|
||||
return tables.find(table_name) != tables.end();
|
||||
}
|
||||
|
||||
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
|
||||
@ -58,7 +58,7 @@ DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const Con
|
||||
bool DatabaseWithOwnTablesBase::empty(const Context & /*context*/) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return tables.empty() && dictionaries.empty();
|
||||
return tables.empty();
|
||||
}
|
||||
|
||||
StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
|
||||
@ -125,7 +125,6 @@ void DatabaseWithOwnTablesBase::shutdown()
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
tables.clear();
|
||||
dictionaries.clear();
|
||||
}
|
||||
|
||||
DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
|
||||
|
@ -42,7 +42,6 @@ public:
|
||||
protected:
|
||||
mutable std::mutex mutex;
|
||||
Tables tables;
|
||||
Dictionaries dictionaries;
|
||||
Poco::Logger * log;
|
||||
|
||||
DatabaseWithOwnTablesBase(const String & name_, const String & logger);
|
||||
|
18
src/Databases/DictionaryAttachInfo.h
Normal file
18
src/Databases/DictionaryAttachInfo.h
Normal file
@ -0,0 +1,18 @@
|
||||
#pragma once
|
||||
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Poco/AutoPtr.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct DictionaryAttachInfo
|
||||
{
|
||||
ASTPtr create_query;
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
|
||||
time_t modification_time;
|
||||
};
|
||||
|
||||
}
|
@ -5,8 +5,11 @@
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Dictionaries/IDictionary.h>
|
||||
#include <Databases/DictionaryAttachInfo.h>
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/algorithm/copy.hpp>
|
||||
#include <ctime>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
@ -18,11 +21,10 @@ namespace DB
|
||||
class Context;
|
||||
struct Settings;
|
||||
struct ConstraintsDescription;
|
||||
class ColumnsDescription;
|
||||
struct IndicesDescription;
|
||||
struct TableStructureWriteLockHolder;
|
||||
class ASTCreateQuery;
|
||||
using Dictionaries = std::set<String>;
|
||||
using Dictionaries = std::vector<String>;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
@ -74,9 +76,14 @@ private:
|
||||
public:
|
||||
DatabaseDictionariesSnapshotIterator() = default;
|
||||
DatabaseDictionariesSnapshotIterator(Dictionaries & dictionaries_) : dictionaries(dictionaries_), it(dictionaries.begin()) {}
|
||||
|
||||
DatabaseDictionariesSnapshotIterator(Dictionaries && dictionaries_) : dictionaries(dictionaries_), it(dictionaries.begin()) {}
|
||||
|
||||
DatabaseDictionariesSnapshotIterator(const std::unordered_map<String, DictionaryAttachInfo> & dictionaries_)
|
||||
{
|
||||
boost::range::copy(dictionaries_ | boost::adaptors::map_keys, std::back_inserter(dictionaries));
|
||||
it = dictionaries.begin();
|
||||
}
|
||||
|
||||
void next() { ++it; }
|
||||
|
||||
bool isValid() const { return !dictionaries.empty() && it != dictionaries.end(); }
|
||||
@ -140,12 +147,6 @@ public:
|
||||
return std::make_unique<DatabaseDictionariesSnapshotIterator>();
|
||||
}
|
||||
|
||||
/// Get an iterator to pass through all the tables and dictionary tables.
|
||||
virtual DatabaseTablesIteratorPtr getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_name = {})
|
||||
{
|
||||
return getTablesIterator(context, filter_by_name);
|
||||
}
|
||||
|
||||
/// Is the database empty.
|
||||
virtual bool empty(const Context & context) const = 0;
|
||||
|
||||
@ -192,7 +193,7 @@ public:
|
||||
|
||||
/// Add dictionary to the database, but do not add it to the metadata. The database may not support this method.
|
||||
/// If dictionaries_lazy_load is false it also starts loading the dictionary asynchronously.
|
||||
virtual void attachDictionary(const String & /*name*/, const Context & /*context*/)
|
||||
virtual void attachDictionary(const String & /* dictionary_name */, const DictionaryAttachInfo & /* attach_info */)
|
||||
{
|
||||
throw Exception("There is no ATTACH DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
@ -204,7 +205,7 @@ public:
|
||||
}
|
||||
|
||||
/// Forget about the dictionary without deleting it. The database may not support this method.
|
||||
virtual void detachDictionary(const String & /*name*/, const Context & /*context*/)
|
||||
virtual void detachDictionary(const String & /*name*/)
|
||||
{
|
||||
throw Exception("There is no DETACH DICTIONARY query for Database" + getEngineName(), ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
@ -260,6 +261,11 @@ public:
|
||||
return getCreateDictionaryQueryImpl(context, name, true);
|
||||
}
|
||||
|
||||
virtual Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & /*name*/) const
|
||||
{
|
||||
throw Exception(getEngineName() + ": getDictionaryConfiguration() is not supported", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
/// Get the CREATE DATABASE query for current database.
|
||||
virtual ASTPtr getCreateDatabaseQuery(const Context & /*context*/) const = 0;
|
||||
|
||||
@ -276,6 +282,9 @@ public:
|
||||
/// Returns metadata path of a concrete table if the database supports it, empty string otherwise
|
||||
virtual String getObjectMetadataPath(const String & /*table_name*/) const { return {}; }
|
||||
|
||||
/// All tables and dictionaries should be detached before detaching the database.
|
||||
virtual bool shouldBeEmptyOnDetach() const { return true; }
|
||||
|
||||
/// Ask all tables to complete the background threads they are using and delete all table objects.
|
||||
virtual void shutdown() = 0;
|
||||
|
||||
|
@ -11,5 +11,4 @@ using DictionaryConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfigurati
|
||||
/// This function is necessary because all loadable objects configuration are Poco::AbstractConfiguration
|
||||
/// Can throw exception if query is ill-formed
|
||||
DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuery & query);
|
||||
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Dictionaries/DictionaryFactory.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
# include "config_core.h"
|
||||
@ -33,6 +34,19 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
|
||||
return DictionaryFactory::instance().create(name, config, key_in_config, context, dictionary_from_database);
|
||||
}
|
||||
|
||||
|
||||
DictionaryStructure
|
||||
ExternalDictionariesLoader::getDictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config)
|
||||
{
|
||||
return {config, key_in_config + ".structure"};
|
||||
}
|
||||
|
||||
DictionaryStructure ExternalDictionariesLoader::getDictionaryStructure(const ObjectConfig & config)
|
||||
{
|
||||
return getDictionaryStructure(*config.config, config.key_in_config);
|
||||
}
|
||||
|
||||
|
||||
void ExternalDictionariesLoader::resetAll()
|
||||
{
|
||||
#if USE_MYSQL
|
||||
|
@ -23,14 +23,14 @@ public:
|
||||
return std::static_pointer_cast<const IDictionaryBase>(load(name));
|
||||
}
|
||||
|
||||
DictPtr tryGetDictionary(const std::string & name, bool load) const
|
||||
DictPtr tryGetDictionary(const std::string & name) const
|
||||
{
|
||||
if (load)
|
||||
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
|
||||
else
|
||||
return std::static_pointer_cast<const IDictionaryBase>(getCurrentLoadResult(name).object);
|
||||
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
|
||||
}
|
||||
|
||||
static DictionaryStructure getDictionaryStructure(const Poco::Util::AbstractConfiguration & config, const std::string & key_in_config = "dictionary");
|
||||
static DictionaryStructure getDictionaryStructure(const ObjectConfig & config);
|
||||
|
||||
static void resetAll();
|
||||
|
||||
protected:
|
||||
|
@ -94,15 +94,6 @@ namespace
|
||||
};
|
||||
}
|
||||
|
||||
struct ExternalLoader::ObjectConfig
|
||||
{
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
|
||||
String key_in_config;
|
||||
String repository_name;
|
||||
bool from_temp_repository = false;
|
||||
String path;
|
||||
};
|
||||
|
||||
|
||||
/** Reads configurations from configuration repository and parses it.
|
||||
*/
|
||||
@ -141,7 +132,7 @@ public:
|
||||
settings = settings_;
|
||||
}
|
||||
|
||||
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>;
|
||||
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, std::shared_ptr<const ObjectConfig>>>;
|
||||
|
||||
/// Reads all repositories.
|
||||
ObjectConfigsPtr read()
|
||||
@ -176,8 +167,9 @@ private:
|
||||
struct FileInfo
|
||||
{
|
||||
Poco::Timestamp last_update_time = 0;
|
||||
std::vector<std::pair<String, ObjectConfig>> objects; // Parsed contents of the file.
|
||||
bool in_use = true; // Whether the `FileInfo` should be destroyed because the correspondent file is deleted.
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> file_contents; // Parsed contents of the file.
|
||||
std::unordered_map<String /* object name */, String /* key in file_contents */> objects;
|
||||
};
|
||||
|
||||
struct RepositoryInfo
|
||||
@ -280,14 +272,15 @@ private:
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Loading config file '" << path << "'.");
|
||||
auto file_contents = repository.load(path);
|
||||
file_info.file_contents = repository.load(path);
|
||||
auto & file_contents = *file_info.file_contents;
|
||||
|
||||
/// get all objects' definitions
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
file_contents->keys(keys);
|
||||
file_contents.keys(keys);
|
||||
|
||||
/// for each object defined in repositories
|
||||
std::vector<std::pair<String, ObjectConfig>> object_configs_from_file;
|
||||
std::unordered_map<String, String> objects;
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (!startsWith(key, settings.external_config))
|
||||
@ -297,7 +290,7 @@ private:
|
||||
continue;
|
||||
}
|
||||
|
||||
String object_name = file_contents->getString(key + "." + settings.external_name);
|
||||
String object_name = file_contents.getString(key + "." + settings.external_name);
|
||||
if (object_name.empty())
|
||||
{
|
||||
LOG_WARNING(log, path << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed");
|
||||
@ -306,14 +299,14 @@ private:
|
||||
|
||||
String database;
|
||||
if (!settings.external_database.empty())
|
||||
database = file_contents->getString(key + "." + settings.external_database, "");
|
||||
database = file_contents.getString(key + "." + settings.external_database, "");
|
||||
if (!database.empty())
|
||||
object_name = database + "." + object_name;
|
||||
|
||||
object_configs_from_file.emplace_back(object_name, ObjectConfig{file_contents, key, {}, {}, {}});
|
||||
objects.emplace(object_name, key);
|
||||
}
|
||||
|
||||
file_info.objects = std::move(object_configs_from_file);
|
||||
file_info.objects = std::move(objects);
|
||||
file_info.last_update_time = update_time_from_repository;
|
||||
file_info.in_use = true;
|
||||
return true;
|
||||
@ -333,33 +326,36 @@ private:
|
||||
need_collect_object_configs = false;
|
||||
|
||||
// Generate new result.
|
||||
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();
|
||||
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, std::shared_ptr<const ObjectConfig>>>();
|
||||
|
||||
for (const auto & [repository, repository_info] : repositories)
|
||||
{
|
||||
for (const auto & [path, file_info] : repository_info.files)
|
||||
{
|
||||
for (const auto & [object_name, object_config] : file_info.objects)
|
||||
for (const auto & [object_name, key_in_config] : file_info.objects)
|
||||
{
|
||||
auto already_added_it = new_configs->find(object_name);
|
||||
if (already_added_it == new_configs->end())
|
||||
{
|
||||
auto & new_config = new_configs->emplace(object_name, object_config).first->second;
|
||||
new_config.from_temp_repository = repository->isTemporary();
|
||||
new_config.repository_name = repository->getName();
|
||||
new_config.path = path;
|
||||
auto new_config = std::make_shared<ObjectConfig>();
|
||||
new_config->config = file_info.file_contents;
|
||||
new_config->key_in_config = key_in_config;
|
||||
new_config->repository_name = repository->getName();
|
||||
new_config->from_temp_repository = repository->isTemporary();
|
||||
new_config->path = path;
|
||||
new_configs->emplace(object_name, std::move(new_config));
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & already_added = already_added_it->second;
|
||||
if (!already_added.from_temp_repository && !repository->isTemporary())
|
||||
if (!already_added->from_temp_repository && !repository->isTemporary())
|
||||
{
|
||||
LOG_WARNING(
|
||||
log,
|
||||
type_name << " '" << object_name << "' is found "
|
||||
<< (((path == already_added.path) && (repository->getName() == already_added.repository_name))
|
||||
<< (((path == already_added->path) && (repository->getName() == already_added->repository_name))
|
||||
? ("twice in the same file '" + path + "'")
|
||||
: ("both in file '" + already_added.path + "' and '" + path + "'")));
|
||||
: ("both in file '" + already_added->path + "' and '" + path + "'")));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -440,13 +436,10 @@ public:
|
||||
else
|
||||
{
|
||||
const auto & new_config = new_config_it->second;
|
||||
bool config_is_same = isSameConfiguration(*info.object_config.config, info.object_config.key_in_config, *new_config.config, new_config.key_in_config);
|
||||
info.object_config = new_config;
|
||||
bool config_is_same = isSameConfiguration(*info.config->config, info.config->key_in_config, *new_config->config, new_config->key_in_config);
|
||||
info.config = new_config;
|
||||
if (!config_is_same)
|
||||
{
|
||||
/// Configuration has been changed.
|
||||
info.object_config = new_config;
|
||||
|
||||
if (info.triedToLoad())
|
||||
{
|
||||
/// The object has been tried to load before, so it is currently in use or was in use
|
||||
@ -531,7 +524,7 @@ public:
|
||||
|
||||
/// Returns the load result of the object.
|
||||
template <typename ReturnType>
|
||||
ReturnType getCurrentLoadResult(const String & name) const
|
||||
ReturnType getLoadResult(const String & name) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
const Info * info = getInfo(name);
|
||||
@ -543,13 +536,13 @@ public:
|
||||
/// Returns all the load results as a map.
|
||||
/// The function doesn't load anything, it just returns the current load results as is.
|
||||
template <typename ReturnType>
|
||||
ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const
|
||||
ReturnType getLoadResults(const FilterByNameFunction & filter) const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return collectLoadResults<ReturnType>(filter);
|
||||
}
|
||||
|
||||
size_t getNumberOfCurrentlyLoadedObjects() const
|
||||
size_t getNumberOfLoadedObjects() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
size_t count = 0;
|
||||
@ -562,7 +555,7 @@ public:
|
||||
return count;
|
||||
}
|
||||
|
||||
bool hasCurrentlyLoadedObjects() const
|
||||
bool hasLoadedObjects() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
for (auto & name_info : infos)
|
||||
@ -581,6 +574,12 @@ public:
|
||||
return names;
|
||||
}
|
||||
|
||||
size_t getNumberOfObjects() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return infos.size();
|
||||
}
|
||||
|
||||
/// Tries to load a specified object during the timeout.
|
||||
template <typename ReturnType>
|
||||
ReturnType tryLoad(const String & name, Duration timeout)
|
||||
@ -698,7 +697,7 @@ public:
|
||||
private:
|
||||
struct Info
|
||||
{
|
||||
Info(const String & name_, const ObjectConfig & object_config_) : name(name_), object_config(object_config_) {}
|
||||
Info(const String & name_, const std::shared_ptr<const ObjectConfig> & config_) : name(name_), config(config_) {}
|
||||
|
||||
bool loaded() const { return object != nullptr; }
|
||||
bool failed() const { return !object && exception; }
|
||||
@ -737,8 +736,7 @@ private:
|
||||
result.loading_start_time = loading_start_time;
|
||||
result.last_successful_update_time = last_successful_update_time;
|
||||
result.loading_duration = loadingDuration();
|
||||
result.origin = object_config.path;
|
||||
result.repository_name = object_config.repository_name;
|
||||
result.config = config;
|
||||
return result;
|
||||
}
|
||||
else
|
||||
@ -750,7 +748,7 @@ private:
|
||||
|
||||
String name;
|
||||
LoadablePtr object;
|
||||
ObjectConfig object_config;
|
||||
std::shared_ptr<const ObjectConfig> config;
|
||||
TimePoint loading_start_time;
|
||||
TimePoint loading_end_time;
|
||||
TimePoint last_successful_update_time;
|
||||
@ -784,7 +782,7 @@ private:
|
||||
results.reserve(infos.size());
|
||||
for (const auto & [name, info] : infos)
|
||||
{
|
||||
if (filter(name))
|
||||
if (!filter || filter(name))
|
||||
{
|
||||
auto result = info.template getLoadResult<typename ReturnType::value_type>();
|
||||
if constexpr (std::is_same_v<typename ReturnType::value_type, LoadablePtr>)
|
||||
@ -838,7 +836,7 @@ private:
|
||||
bool all_ready = true;
|
||||
for (auto & [name, info] : infos)
|
||||
{
|
||||
if (!filter(name))
|
||||
if (filter && !filter(name))
|
||||
continue;
|
||||
|
||||
if (info.state_id >= min_id)
|
||||
@ -955,7 +953,7 @@ private:
|
||||
previous_version_as_base_for_loading = nullptr; /// Need complete reloading, cannot use the previous version.
|
||||
|
||||
/// Loading.
|
||||
auto [new_object, new_exception] = loadSingleObject(name, info->object_config, previous_version_as_base_for_loading);
|
||||
auto [new_object, new_exception] = loadSingleObject(name, *info->config, previous_version_as_base_for_loading);
|
||||
if (!new_object && !new_exception)
|
||||
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
@ -1296,9 +1294,9 @@ void ExternalLoader::enablePeriodicUpdates(bool enable_)
|
||||
periodic_updater->enable(enable_);
|
||||
}
|
||||
|
||||
bool ExternalLoader::hasCurrentlyLoadedObjects() const
|
||||
bool ExternalLoader::hasLoadedObjects() const
|
||||
{
|
||||
return loading_dispatcher->hasCurrentlyLoadedObjects();
|
||||
return loading_dispatcher->hasLoadedObjects();
|
||||
}
|
||||
|
||||
ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) const
|
||||
@ -1307,30 +1305,35 @@ ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) con
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename>
|
||||
ReturnType ExternalLoader::getCurrentLoadResult(const String & name) const
|
||||
ReturnType ExternalLoader::getLoadResult(const String & name) const
|
||||
{
|
||||
return loading_dispatcher->getCurrentLoadResult<ReturnType>(name);
|
||||
return loading_dispatcher->getLoadResult<ReturnType>(name);
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename>
|
||||
ReturnType ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter) const
|
||||
ReturnType ExternalLoader::getLoadResults(const FilterByNameFunction & filter) const
|
||||
{
|
||||
return loading_dispatcher->getCurrentLoadResults<ReturnType>(filter);
|
||||
return loading_dispatcher->getLoadResults<ReturnType>(filter);
|
||||
}
|
||||
|
||||
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects() const
|
||||
ExternalLoader::Loadables ExternalLoader::getLoadedObjects() const
|
||||
{
|
||||
return getCurrentLoadResults<Loadables>();
|
||||
return getLoadResults<Loadables>();
|
||||
}
|
||||
|
||||
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const
|
||||
ExternalLoader::Loadables ExternalLoader::getLoadedObjects(const FilterByNameFunction & filter) const
|
||||
{
|
||||
return getCurrentLoadResults<Loadables>(filter);
|
||||
return getLoadResults<Loadables>(filter);
|
||||
}
|
||||
|
||||
size_t ExternalLoader::getNumberOfCurrentlyLoadedObjects() const
|
||||
size_t ExternalLoader::getNumberOfLoadedObjects() const
|
||||
{
|
||||
return loading_dispatcher->getNumberOfCurrentlyLoadedObjects();
|
||||
return loading_dispatcher->getNumberOfLoadedObjects();
|
||||
}
|
||||
|
||||
size_t ExternalLoader::getNumberOfObjects() const
|
||||
{
|
||||
return loading_dispatcher->getNumberOfObjects();
|
||||
}
|
||||
|
||||
template <typename ReturnType, typename>
|
||||
@ -1456,10 +1459,10 @@ ExternalLoader::LoadablePtr ExternalLoader::createObject(
|
||||
return create(name, *config.config, config.key_in_config, config.repository_name);
|
||||
}
|
||||
|
||||
template ExternalLoader::LoadablePtr ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadablePtr>(const String &) const;
|
||||
template ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadResult>(const String &) const;
|
||||
template ExternalLoader::Loadables ExternalLoader::getCurrentLoadResults<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
|
||||
template ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;
|
||||
template ExternalLoader::LoadablePtr ExternalLoader::getLoadResult<ExternalLoader::LoadablePtr>(const String &) const;
|
||||
template ExternalLoader::LoadResult ExternalLoader::getLoadResult<ExternalLoader::LoadResult>(const String &) const;
|
||||
template ExternalLoader::Loadables ExternalLoader::getLoadResults<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
|
||||
template ExternalLoader::LoadResults ExternalLoader::getLoadResults<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;
|
||||
|
||||
template ExternalLoader::LoadablePtr ExternalLoader::tryLoad<ExternalLoader::LoadablePtr>(const String &, Duration) const;
|
||||
template ExternalLoader::LoadResult ExternalLoader::tryLoad<ExternalLoader::LoadResult>(const String &, Duration) const;
|
||||
|
@ -53,17 +53,25 @@ public:
|
||||
using Duration = std::chrono::milliseconds;
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
struct ObjectConfig
|
||||
{
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
|
||||
String key_in_config;
|
||||
String repository_name;
|
||||
bool from_temp_repository = false;
|
||||
String path;
|
||||
};
|
||||
|
||||
struct LoadResult
|
||||
{
|
||||
Status status = Status::NOT_EXIST;
|
||||
String name;
|
||||
LoadablePtr object;
|
||||
String origin;
|
||||
TimePoint loading_start_time;
|
||||
TimePoint last_successful_update_time;
|
||||
Duration loading_duration;
|
||||
std::exception_ptr exception;
|
||||
std::string repository_name;
|
||||
std::shared_ptr<const ObjectConfig> config;
|
||||
};
|
||||
|
||||
using LoadResults = std::vector<LoadResult>;
|
||||
@ -99,26 +107,32 @@ public:
|
||||
/// Returns the result of loading the object.
|
||||
/// The function doesn't load anything, it just returns the current load result as is.
|
||||
template <typename ReturnType = LoadResult, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
|
||||
ReturnType getCurrentLoadResult(const String & name) const;
|
||||
ReturnType getLoadResult(const String & name) const;
|
||||
|
||||
using FilterByNameFunction = std::function<bool(const String &)>;
|
||||
|
||||
/// Returns all the load results as a map.
|
||||
/// The function doesn't load anything, it just returns the current load results as is.
|
||||
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
|
||||
ReturnType getCurrentLoadResults() const { return getCurrentLoadResults<ReturnType>(alwaysTrue); }
|
||||
ReturnType getLoadResults() const { return getLoadResults<ReturnType>(FilterByNameFunction{}); }
|
||||
|
||||
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
|
||||
ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const;
|
||||
ReturnType getLoadResults(const FilterByNameFunction & filter) const;
|
||||
|
||||
/// Returns all loaded objects as a map.
|
||||
/// The function doesn't load anything, it just returns the current load results as is.
|
||||
Loadables getCurrentlyLoadedObjects() const;
|
||||
Loadables getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const;
|
||||
Loadables getLoadedObjects() const;
|
||||
Loadables getLoadedObjects(const FilterByNameFunction & filter) const;
|
||||
|
||||
/// Returns true if any object was loaded.
|
||||
bool hasCurrentlyLoadedObjects() const;
|
||||
size_t getNumberOfCurrentlyLoadedObjects() const;
|
||||
bool hasLoadedObjects() const;
|
||||
size_t getNumberOfLoadedObjects() const;
|
||||
|
||||
/// Returns true if there is no object.
|
||||
bool hasObjects() const { return getNumberOfObjects() == 0; }
|
||||
|
||||
/// Returns number of objects.
|
||||
size_t getNumberOfObjects() const;
|
||||
|
||||
static constexpr Duration NO_WAIT = Duration::zero();
|
||||
static constexpr Duration WAIT = Duration::max();
|
||||
@ -139,7 +153,7 @@ public:
|
||||
/// The function does nothing for already loaded objects, it just returns them.
|
||||
/// The function doesn't throw an exception if it's failed to load something.
|
||||
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
|
||||
ReturnType tryLoadAll(Duration timeout = WAIT) const { return tryLoad<ReturnType>(alwaysTrue, timeout); }
|
||||
ReturnType tryLoadAll(Duration timeout = WAIT) const { return tryLoad<ReturnType>(FilterByNameFunction{}, timeout); }
|
||||
|
||||
/// Loads a specified object.
|
||||
/// The function does nothing if it's already loaded.
|
||||
@ -157,7 +171,7 @@ public:
|
||||
/// The function does nothing for already loaded objects, it just returns them.
|
||||
/// The function throws an exception if it's failed to load something.
|
||||
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
|
||||
ReturnType loadAll() const { return load<ReturnType>(alwaysTrue); }
|
||||
ReturnType loadAll() const { return load<ReturnType>(FilterByNameFunction{}); }
|
||||
|
||||
/// Loads or reloads a specified object.
|
||||
/// The function reloads the object if it's already loaded.
|
||||
@ -174,7 +188,7 @@ public:
|
||||
/// Load or reloads all objects. Not recommended to use.
|
||||
/// The function throws an exception if it's failed to load or reload something.
|
||||
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
|
||||
ReturnType loadOrReloadAll() const { return loadOrReload<ReturnType>(alwaysTrue); }
|
||||
ReturnType loadOrReloadAll() const { return loadOrReload<ReturnType>(FilterByNameFunction{}); }
|
||||
|
||||
/// Reloads objects by filter which were tried to load before (successfully or not).
|
||||
/// The function throws an exception if it's failed to load or reload something.
|
||||
@ -197,10 +211,8 @@ private:
|
||||
void checkLoaded(const LoadResult & result, bool check_no_errors) const;
|
||||
void checkLoaded(const LoadResults & results, bool check_no_errors) const;
|
||||
|
||||
static bool alwaysTrue(const String &) { return true; }
|
||||
Strings getAllTriedToLoadNames() const;
|
||||
|
||||
struct ObjectConfig;
|
||||
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
|
||||
|
||||
class LoadablesConfigReader;
|
||||
|
@ -1,30 +1,30 @@
|
||||
#include <Interpreters/ExternalLoaderDatabaseConfigRepository.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNKNOWN_DICTIONARY;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
String trimDatabaseName(const std::string & loadable_definition_name, const IDatabase & database)
|
||||
{
|
||||
const auto & dbname = database.getDatabaseName();
|
||||
if (!startsWith(loadable_definition_name, dbname))
|
||||
throw Exception(
|
||||
"Loadable '" + loadable_definition_name + "' is not from database '" + database.getDatabaseName(), ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
/// dbname.loadable_name
|
||||
///--> remove <---
|
||||
return loadable_definition_name.substr(dbname.length() + 1);
|
||||
}
|
||||
String trimDatabaseName(const std::string & loadable_definition_name, const IDatabase & database)
|
||||
{
|
||||
const auto & dbname = database.getDatabaseName();
|
||||
if (!startsWith(loadable_definition_name, dbname))
|
||||
throw Exception(
|
||||
"Loadable '" + loadable_definition_name + "' is not from database '" + database.getDatabaseName(), ErrorCodes::UNKNOWN_DICTIONARY);
|
||||
/// dbname.loadable_name
|
||||
///--> remove <---
|
||||
return loadable_definition_name.substr(dbname.length() + 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(IDatabase & database_, const Context & context_)
|
||||
: name(database_.getDatabaseName())
|
||||
, database(database_)
|
||||
@ -34,8 +34,7 @@ ExternalLoaderDatabaseConfigRepository::ExternalLoaderDatabaseConfigRepository(I
|
||||
|
||||
LoadablesConfigurationPtr ExternalLoaderDatabaseConfigRepository::load(const std::string & loadable_definition_name)
|
||||
{
|
||||
String dictname = trimDatabaseName(loadable_definition_name, database);
|
||||
return getDictionaryConfigurationFromAST(database.getCreateDictionaryQuery(context, dictname)->as<const ASTCreateQuery &>());
|
||||
return database.getDictionaryConfiguration(trimDatabaseName(loadable_definition_name, database));
|
||||
}
|
||||
|
||||
bool ExternalLoaderDatabaseConfigRepository::exists(const std::string & loadable_definition_name)
|
||||
|
@ -45,6 +45,8 @@
|
||||
#include <Databases/DatabaseFactory.h>
|
||||
#include <Databases/IDatabase.h>
|
||||
|
||||
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
|
||||
|
||||
#include <Compression/CompressionFactory.h>
|
||||
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
@ -703,7 +705,11 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
|
||||
}
|
||||
|
||||
if (create.attach)
|
||||
database->attachDictionary(dictionary_name, context);
|
||||
{
|
||||
auto config = getDictionaryConfigurationFromAST(create);
|
||||
auto modification_time = database->getObjectMetadataModificationTime(dictionary_name);
|
||||
database->attachDictionary(dictionary_name, DictionaryAttachInfo{query_ptr, config, modification_time});
|
||||
}
|
||||
else
|
||||
database->createDictionary(context, dictionary_name, query_ptr);
|
||||
|
||||
|
@ -188,7 +188,7 @@ BlockIO InterpreterDropQuery::executeToDictionary(
|
||||
{
|
||||
/// Drop dictionary from memory, don't touch data and metadata
|
||||
context.checkAccess(AccessType::DROP_DICTIONARY, database_name, dictionary_name);
|
||||
database->detachDictionary(dictionary_name, context);
|
||||
database->detachDictionary(dictionary_name);
|
||||
}
|
||||
else if (kind == ASTDropQuery::Kind::Truncate)
|
||||
{
|
||||
@ -254,21 +254,26 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS
|
||||
bool drop = kind == ASTDropQuery::Kind::Drop;
|
||||
context.checkAccess(AccessType::DROP_DATABASE, database_name);
|
||||
|
||||
/// DETACH or DROP all tables and dictionaries inside database
|
||||
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
if (database->shouldBeEmptyOnDetach())
|
||||
{
|
||||
String current_table_name = iterator->name();
|
||||
executeToTable(database_name, current_table_name, kind, false, false, false);
|
||||
}
|
||||
/// DETACH or DROP all tables and dictionaries inside database.
|
||||
/// First we should DETACH or DROP dictionaries because StorageDictionary
|
||||
/// must be detached only by detaching corresponding dictionary.
|
||||
for (auto iterator = database->getDictionariesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
String current_dictionary = iterator->name();
|
||||
executeToDictionary(database_name, current_dictionary, kind, false, false, false);
|
||||
}
|
||||
|
||||
for (auto iterator = database->getDictionariesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
String current_dictionary = iterator->name();
|
||||
executeToDictionary(database_name, current_dictionary, kind, false, false, false);
|
||||
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
String current_table_name = iterator->name();
|
||||
executeToTable(database_name, current_table_name, kind, false, false, false);
|
||||
}
|
||||
}
|
||||
|
||||
/// DETACH or DROP database itself
|
||||
DatabaseCatalog::instance().detachDatabase(database_name, drop);
|
||||
DatabaseCatalog::instance().detachDatabase(database_name, drop, database->shouldBeEmptyOnDetach());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,73 +1,48 @@
|
||||
#include <sstream>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeDate.h>
|
||||
#include <Dictionaries/IDictionarySource.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/evaluateConstantExpression.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <common/logger_useful.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Processors/Sources/SourceFromInputStream.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <sstream>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int CANNOT_DETACH_DICTIONARY_AS_TABLE;
|
||||
}
|
||||
|
||||
|
||||
StorageDictionary::StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const Context & context,
|
||||
bool attach,
|
||||
const String & dictionary_name_)
|
||||
: IStorage(table_id_)
|
||||
, dictionary_name(dictionary_name_)
|
||||
, logger(&Poco::Logger::get("StorageDictionary"))
|
||||
namespace
|
||||
{
|
||||
setColumns(columns_);
|
||||
|
||||
if (!attach)
|
||||
void checkNamesAndTypesCompatibleWithDictionary(const String & dictionary_name, const ColumnsDescription & columns, const DictionaryStructure & dictionary_structure)
|
||||
{
|
||||
const auto & dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name);
|
||||
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
|
||||
checkNamesAndTypesCompatibleWithDictionary(dictionary_structure);
|
||||
auto dictionary_names_and_types = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
std::set<NameAndTypePair> names_and_types_set(dictionary_names_and_types.begin(), dictionary_names_and_types.end());
|
||||
|
||||
for (const auto & column : columns.getOrdinary())
|
||||
{
|
||||
if (names_and_types_set.find(column) == names_and_types_set.end())
|
||||
{
|
||||
std::string message = "Not found column ";
|
||||
message += column.name + " " + column.type->getName();
|
||||
message += " in dictionary " + backQuote(dictionary_name) + ". ";
|
||||
message += "There are only columns ";
|
||||
message += StorageDictionary::generateNamesAndTypesDescription(dictionary_names_and_types);
|
||||
throw Exception(message, ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void StorageDictionary::checkTableCanBeDropped() const
|
||||
{
|
||||
throw Exception("Cannot detach dictionary " + backQuoteIfNeed(dictionary_name) + " as table, use DETACH DICTIONARY query.", ErrorCodes::UNKNOWN_TABLE);
|
||||
}
|
||||
|
||||
Pipes StorageDictionary::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*threads*/)
|
||||
{
|
||||
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name);
|
||||
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
|
||||
auto source = std::make_shared<SourceFromInputStream>(stream);
|
||||
/// TODO: update dictionary interface for processors.
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(std::move(source));
|
||||
return pipes;
|
||||
}
|
||||
|
||||
NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure & dictionary_structure)
|
||||
{
|
||||
@ -103,25 +78,55 @@ NamesAndTypesList StorageDictionary::getNamesAndTypes(const DictionaryStructure
|
||||
return dictionary_names_and_types;
|
||||
}
|
||||
|
||||
void StorageDictionary::checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionary_structure) const
|
||||
{
|
||||
auto dictionary_names_and_types = getNamesAndTypes(dictionary_structure);
|
||||
std::set<NameAndTypePair> names_and_types_set(dictionary_names_and_types.begin(), dictionary_names_and_types.end());
|
||||
|
||||
for (const auto & column : getColumns().getOrdinary())
|
||||
String StorageDictionary::generateNamesAndTypesDescription(const NamesAndTypesList & list)
|
||||
{
|
||||
std::stringstream ss;
|
||||
bool first = true;
|
||||
for (const auto & name_and_type : list)
|
||||
{
|
||||
if (names_and_types_set.find(column) == names_and_types_set.end())
|
||||
{
|
||||
std::string message = "Not found column ";
|
||||
message += column.name + " " + column.type->getName();
|
||||
message += " in dictionary " + dictionary_name + ". ";
|
||||
message += "There are only columns ";
|
||||
message += generateNamesAndTypesDescription(dictionary_names_and_types.begin(), dictionary_names_and_types.end());
|
||||
throw Exception(message, ErrorCodes::THERE_IS_NO_COLUMN);
|
||||
}
|
||||
if (!std::exchange(first, false))
|
||||
ss << ", ";
|
||||
ss << name_and_type.name << ' ' << name_and_type.type->getName();
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
||||
StorageDictionary::StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const String & dictionary_name_,
|
||||
const DictionaryStructure & dictionary_structure_)
|
||||
: IStorage(table_id_)
|
||||
, dictionary_name(dictionary_name_)
|
||||
{
|
||||
setColumns(ColumnsDescription{getNamesAndTypes(dictionary_structure_)});
|
||||
}
|
||||
|
||||
|
||||
void StorageDictionary::checkTableCanBeDropped() const
|
||||
{
|
||||
throw Exception("Cannot detach dictionary " + backQuote(dictionary_name) + " as table, use DETACH DICTIONARY query.", ErrorCodes::CANNOT_DETACH_DICTIONARY_AS_TABLE);
|
||||
}
|
||||
|
||||
Pipes StorageDictionary::read(
|
||||
const Names & column_names,
|
||||
const SelectQueryInfo & /*query_info*/,
|
||||
const Context & context,
|
||||
QueryProcessingStage::Enum /*processed_stage*/,
|
||||
const size_t max_block_size,
|
||||
const unsigned /*threads*/)
|
||||
{
|
||||
auto dictionary = context.getExternalDictionariesLoader().getDictionary(dictionary_name);
|
||||
auto stream = dictionary->getBlockInputStream(column_names, max_block_size);
|
||||
auto source = std::make_shared<SourceFromInputStream>(stream);
|
||||
/// TODO: update dictionary interface for processors.
|
||||
Pipes pipes;
|
||||
pipes.emplace_back(std::move(source));
|
||||
return pipes;
|
||||
}
|
||||
|
||||
|
||||
void registerStorageDictionary(StorageFactory & factory)
|
||||
{
|
||||
factory.registerStorage("Dictionary", [](const StorageFactory::Arguments & args)
|
||||
@ -133,8 +138,11 @@ void registerStorageDictionary(StorageFactory & factory)
|
||||
args.engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[0], args.local_context);
|
||||
String dictionary_name = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
|
||||
|
||||
return StorageDictionary::create(
|
||||
args.table_id, args.columns, args.context, args.attach, dictionary_name);
|
||||
const auto & dictionary = args.context.getExternalDictionariesLoader().getDictionary(dictionary_name);
|
||||
const DictionaryStructure & dictionary_structure = dictionary->getStructure();
|
||||
checkNamesAndTypesCompatibleWithDictionary(dictionary_name, args.columns, dictionary_structure);
|
||||
|
||||
return StorageDictionary::create(args.table_id, dictionary_name, dictionary_structure);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1,23 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Core/Defines.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include <ext/shared_ptr_helper.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
|
||||
|
||||
namespace Poco
|
||||
{
|
||||
class Logger;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
struct DictionaryStructure;
|
||||
struct IDictionaryBase;
|
||||
class ExternalDictionaries;
|
||||
|
||||
class StorageDictionary final : public ext::shared_ptr_helper<StorageDictionary>, public IStorage
|
||||
{
|
||||
@ -35,42 +24,16 @@ public:
|
||||
unsigned threads) override;
|
||||
|
||||
static NamesAndTypesList getNamesAndTypes(const DictionaryStructure & dictionary_structure);
|
||||
|
||||
template <typename ForwardIterator>
|
||||
static std::string generateNamesAndTypesDescription(ForwardIterator begin, ForwardIterator end)
|
||||
{
|
||||
std::string description;
|
||||
{
|
||||
WriteBufferFromString buffer(description);
|
||||
bool first = true;
|
||||
for (; begin != end; ++begin)
|
||||
{
|
||||
if (!first)
|
||||
buffer << ", ";
|
||||
first = false;
|
||||
|
||||
buffer << begin->name << ' ' << begin->type->getName();
|
||||
}
|
||||
}
|
||||
|
||||
return description;
|
||||
}
|
||||
static String generateNamesAndTypesDescription(const NamesAndTypesList & list);
|
||||
|
||||
private:
|
||||
using Ptr = MultiVersion<IDictionaryBase>::Version;
|
||||
|
||||
String dictionary_name;
|
||||
Poco::Logger * logger;
|
||||
|
||||
void checkNamesAndTypesCompatibleWithDictionary(const DictionaryStructure & dictionary_structure) const;
|
||||
|
||||
protected:
|
||||
StorageDictionary(
|
||||
const StorageID & table_id_,
|
||||
const ColumnsDescription & columns_,
|
||||
const Context & context,
|
||||
bool attach,
|
||||
const String & dictionary_name_);
|
||||
const String & dictionary_name_,
|
||||
const DictionaryStructure & dictionary_structure);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -303,7 +303,7 @@ Pipes StorageSystemColumns::read(
|
||||
const DatabasePtr database = databases.at(database_name);
|
||||
offsets[i] = i ? offsets[i - 1] : 0;
|
||||
|
||||
for (auto iterator = database->getTablesWithDictionaryTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
const String & table_name = iterator->name();
|
||||
storages.emplace(std::piecewise_construct,
|
||||
|
@ -53,7 +53,7 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
const bool check_access_for_dictionaries = !access->isGranted(AccessType::SHOW_DICTIONARIES);
|
||||
|
||||
const auto & external_dictionaries = context.getExternalDictionariesLoader();
|
||||
for (const auto & load_result : external_dictionaries.getCurrentLoadResults())
|
||||
for (const auto & load_result : external_dictionaries.getLoadResults())
|
||||
{
|
||||
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionaryBase>(load_result.object);
|
||||
|
||||
@ -66,9 +66,10 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
else
|
||||
{
|
||||
short_name = load_result.name;
|
||||
if (!load_result.repository_name.empty() && startsWith(short_name, load_result.repository_name + "."))
|
||||
String repository_name = load_result.config ? load_result.config->repository_name : "";
|
||||
if (!repository_name.empty() && startsWith(short_name, repository_name + "."))
|
||||
{
|
||||
database = load_result.repository_name;
|
||||
database = repository_name;
|
||||
short_name = short_name.substr(database.length() + 1);
|
||||
}
|
||||
}
|
||||
@ -81,7 +82,7 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
|
||||
res_columns[i++]->insert(database);
|
||||
res_columns[i++]->insert(short_name);
|
||||
res_columns[i++]->insert(static_cast<Int8>(load_result.status));
|
||||
res_columns[i++]->insert(load_result.origin);
|
||||
res_columns[i++]->insert(load_result.config ? load_result.config->path : "");
|
||||
|
||||
std::exception_ptr last_exception = load_result.exception;
|
||||
|
||||
|
@ -28,13 +28,13 @@ NamesAndTypesList StorageSystemModels::getNamesAndTypes()
|
||||
void StorageSystemModels::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo &) const
|
||||
{
|
||||
const auto & external_models_loader = context.getExternalModelsLoader();
|
||||
auto load_results = external_models_loader.getCurrentLoadResults();
|
||||
auto load_results = external_models_loader.getLoadResults();
|
||||
|
||||
for (const auto & load_result : load_results)
|
||||
{
|
||||
res_columns[0]->insert(load_result.name);
|
||||
res_columns[1]->insert(static_cast<Int8>(load_result.status));
|
||||
res_columns[2]->insert(load_result.origin);
|
||||
res_columns[2]->insert(load_result.config ? load_result.config->path : "");
|
||||
|
||||
if (load_result.object)
|
||||
{
|
||||
|
@ -226,7 +226,7 @@ protected:
|
||||
const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);
|
||||
|
||||
if (!tables_it || !tables_it->isValid())
|
||||
tables_it = database->getTablesWithDictionaryTablesIterator(context);
|
||||
tables_it = database->getTablesIterator(context);
|
||||
|
||||
const bool need_lock_structure = needLockStructure(database, getPort().getHeader());
|
||||
|
||||
|
@ -9,6 +9,7 @@
|
||||
dict1
|
||||
dict2
|
||||
dict3
|
||||
dict4
|
||||
table_for_dict
|
||||
dict1
|
||||
dict2
|
||||
|
@ -32,7 +32,7 @@ EXISTS TABLE db_01048.t_01048; -- Dictionaries are tables as well. But not all t
|
||||
EXISTS DICTIONARY db_01048.t_01048;
|
||||
|
||||
-- But dictionary-tables cannot be dropped as usual tables.
|
||||
DROP TABLE db_01048.t_01048; -- { serverError 60 }
|
||||
DROP TABLE db_01048.t_01048; -- { serverError 520 }
|
||||
DROP DICTIONARY db_01048.t_01048;
|
||||
EXISTS db_01048.t_01048;
|
||||
EXISTS TABLE db_01048.t_01048;
|
||||
|
@ -16,4 +16,9 @@ CREATE TABLE dict_db_01224_dictionary.`dict_db_01224.dict`
|
||||
`val` UInt64
|
||||
)
|
||||
ENGINE = Dictionary(`dict_db_01224.dict`)
|
||||
LOADED
|
||||
NOT_LOADED
|
||||
Dictionary 1 CREATE DICTIONARY dict_db_01224.dict (`key` UInt64 DEFAULT 0, `val` UInt64 DEFAULT 10) PRIMARY KEY key SOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9000 USER \'default\' TABLE \'dict_data\' PASSWORD \'\' DB \'dict_db_01224\')) LIFETIME(MIN 0 MAX 0) LAYOUT(FLAT())
|
||||
NOT_LOADED
|
||||
key UInt64
|
||||
val UInt64
|
||||
NOT_LOADED
|
||||
|
@ -1,6 +1,7 @@
|
||||
DROP DATABASE IF EXISTS dict_db_01224;
|
||||
DROP DATABASE IF EXISTS dict_db_01224_dictionary;
|
||||
CREATE DATABASE dict_db_01224;
|
||||
CREATE DATABASE dict_db_01224_dictionary Engine=Dictionary;
|
||||
|
||||
CREATE TABLE dict_db_01224.dict_data (key UInt64, val UInt64) Engine=Memory();
|
||||
CREATE DICTIONARY dict_db_01224.dict
|
||||
@ -21,10 +22,15 @@ SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name
|
||||
SHOW CREATE TABLE dict_db_01224.dict FORMAT TSVRaw;
|
||||
SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
|
||||
CREATE DATABASE dict_db_01224_dictionary Engine=Dictionary;
|
||||
SHOW CREATE TABLE dict_db_01224_dictionary.`dict_db_01224.dict` FORMAT TSVRaw;
|
||||
SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
|
||||
SELECT engine, metadata_path LIKE '%/metadata/dict\_db\_01224/dict.sql', create_table_query FROM system.tables WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
|
||||
SELECT name, type FROM system.columns WHERE database = 'dict_db_01224' AND table = 'dict';
|
||||
SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
|
||||
DROP DICTIONARY dict_db_01224.dict;
|
||||
SELECT status FROM system.dictionaries WHERE database = 'dict_db_01224' AND name = 'dict';
|
||||
|
||||
|
@ -13,8 +13,8 @@ LIFETIME(MIN 0 MAX 0)
|
||||
LAYOUT(FLAT());
|
||||
|
||||
SYSTEM RELOAD DICTIONARY dict_db_01225.dict;
|
||||
DROP TABLE dict_db_01225.dict; -- { serverError 60; }
|
||||
-- Regression:
|
||||
-- Code: 1000. DB::Exception: Received from localhost:9000. DB::Exception: File not found: ./metadata/dict_db_01225/dict.sql.
|
||||
|
||||
DROP TABLE dict_db_01225.dict; -- { serverError 520; }
|
||||
DROP DICTIONARY dict_db_01225.dict;
|
||||
|
||||
DROP DATABASE dict_db_01225;
|
||||
|
@ -15,7 +15,7 @@ LIFETIME(MIN 0 MAX 0)
|
||||
LAYOUT(FLAT());
|
||||
|
||||
SHOW CREATE TABLE dict_db_01225_dictionary.`dict_db_01225.dict` FORMAT TSVRaw;
|
||||
SHOW CREATE TABLE dict_db_01225_dictionary.`dict_db_01225.no_such_dict`; -- { serverError 36; }
|
||||
SHOW CREATE TABLE dict_db_01225_dictionary.`dict_db_01225.no_such_dict`; -- { serverError 487; }
|
||||
|
||||
DROP DATABASE dict_db_01225;
|
||||
DROP DATABASE dict_db_01225_dictionary;
|
||||
|
Loading…
Reference in New Issue
Block a user