Merge pull request #8055 from vitlibar/more-accurate-using-load-reload

Refactoring of using ExternalLoader in dictionary DDL.
This commit is contained in:
alexey-milovidov 2019-12-16 02:43:02 +03:00 committed by GitHub
commit 3fa3166092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 484 additions and 332 deletions

View File

@ -474,7 +474,8 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_PRIVILEGES = 497; extern const int NOT_ENOUGH_PRIVILEGES = 497;
extern const int LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED = 498; extern const int LIMIT_BY_WITH_TIES_IS_NOT_SUPPORTED = 498;
extern const int S3_ERROR = 499; extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DATABASE = 500; extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int KEEPER_EXCEPTION = 999; extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;

View File

@ -102,13 +102,12 @@ void DatabaseDictionary::removeDictionary(
} }
void DatabaseDictionary::attachDictionary( void DatabaseDictionary::attachDictionary(
const String & /*dictionary_name*/, const Context & /*context*/, bool /*reload*/) const String & /*dictionary_name*/, const Context & /*context*/)
{ {
throw Exception("Dictionary engine doesn't support dictionaries.", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("Dictionary engine doesn't support dictionaries.", ErrorCodes::UNSUPPORTED_METHOD);
} }
void DatabaseDictionary::detachDictionary( void DatabaseDictionary::detachDictionary(const String & /*dictionary_name*/, const Context & /*context*/)
const String & /*dictionary_name*/, const Context & /*context*/, bool /*reload*/)
{ {
throw Exception("Dictionary engine doesn't support dictionaries.", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("Dictionary engine doesn't support dictionaries.", ErrorCodes::UNSUPPORTED_METHOD);
} }

View File

@ -89,9 +89,9 @@ public:
ASTPtr tryGetCreateDictionaryQuery(const Context & context, const String & table_name) const override; ASTPtr tryGetCreateDictionaryQuery(const Context & context, const String & table_name) const override;
void attachDictionary(const String & dictionary_name, const Context & context, bool reload) override; void attachDictionary(const String & dictionary_name, const Context & context) override;
void detachDictionary(const String & dictionary_name, const Context & context, bool reload) override; void detachDictionary(const String & dictionary_name, const Context & context) override;
void shutdown() override; void shutdown() override;

View File

@ -123,13 +123,12 @@ DatabaseDictionariesIteratorPtr DatabaseLazy::getDictionariesIterator(
void DatabaseLazy::attachDictionary( void DatabaseLazy::attachDictionary(
const String & /*dictionary_name*/, const String & /*dictionary_name*/,
const Context & /*context*/, const Context & /*context*/)
bool /*load*/)
{ {
throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD);
} }
void DatabaseLazy::detachDictionary(const String & /*dictionary_name*/, const Context & /*context*/, bool /*reload*/) void DatabaseLazy::detachDictionary(const String & /*dictionary_name*/, const Context & /*context*/)
{ {
throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD);
} }

View File

@ -111,9 +111,9 @@ public:
StoragePtr detachTable(const String & table_name) override; StoragePtr detachTable(const String & table_name) override;
void attachDictionary(const String & dictionary_name, const Context & context, bool reload) override; void attachDictionary(const String & dictionary_name, const Context & context) override;
void detachDictionary(const String & dictionary_name, const Context & context, bool reload) override; void detachDictionary(const String & dictionary_name, const Context & context) override;
void shutdown() override; void shutdown() override;

View File

@ -35,7 +35,7 @@ void DatabaseMemory::createTable(
} }
void DatabaseMemory::attachDictionary(const String & /*name*/, const Context & /*context*/, bool /*reload*/) void DatabaseMemory::attachDictionary(const String & /*name*/, const Context & /*context*/)
{ {
throw Exception("There is no ATTACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("There is no ATTACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
} }
@ -57,7 +57,7 @@ void DatabaseMemory::removeTable(
} }
void DatabaseMemory::detachDictionary(const String & /*name*/, const Context & /*context*/, bool /*reload*/) void DatabaseMemory::detachDictionary(const String & /*name*/, const Context & /*context*/)
{ {
throw Exception("There is no DETACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD); throw Exception("There is no DETACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
} }

View File

@ -40,8 +40,7 @@ public:
void attachDictionary( void attachDictionary(
const String & name, const String & name,
const Context & context, const Context & context) override;
bool reload) override;
void removeTable( void removeTable(
const Context & context, const Context & context,
@ -53,8 +52,7 @@ public:
void detachDictionary( void detachDictionary(
const String & name, const String & name,
const Context & context, const Context & context) override;
bool reload) override;
time_t getObjectMetadataModificationTime(const Context & context, const String & table_name) override; time_t getObjectMetadataModificationTime(const Context & context, const String & table_name) override;

View File

@ -62,6 +62,11 @@ public:
void drop() override; void drop() override;
void detachDictionary(const String &, const Context &) override
{
throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
}
String getMetadataPath() const override; String getMetadataPath() const override;
void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override; void createTable(const Context &, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
@ -74,7 +79,7 @@ public:
void attachTable(const String & table_name, const StoragePtr & storage) override; void attachTable(const String & table_name, const StoragePtr & storage) override;
void detachDictionary(const String &, const Context &, bool) override void detachDictionary(const String &, const Context &) override
{ {
throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("MySQL database engine does not support detach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
} }
@ -84,7 +89,12 @@ public:
throw Exception("MySQL database engine does not support remove dictionary.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("MySQL database engine does not support remove dictionary.", ErrorCodes::NOT_IMPLEMENTED);
} }
void attachDictionary(const String &, const Context &, bool) override void attachTable(const String &, const StoragePtr &) override
{
throw Exception("MySQL database engine does not support attach table.", ErrorCodes::NOT_IMPLEMENTED);
}
void attachDictionary(const String &, const Context &) override
{ {
throw Exception("MySQL database engine does not support attach dictionary.", ErrorCodes::NOT_IMPLEMENTED); throw Exception("MySQL database engine does not support attach dictionary.", ErrorCodes::NOT_IMPLEMENTED);
} }

View File

@ -7,6 +7,7 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h> #include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExternalDictionariesLoader.h> #include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ExternalLoaderPresetConfigRepository.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h> #include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h> #include <Parsers/ParserCreateQuery.h>
@ -18,6 +19,7 @@
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <Poco/DirectoryIterator.h> #include <Poco/DirectoryIterator.h>
@ -267,9 +269,11 @@ void DatabaseOnDisk::createDictionary(
{ {
const auto & settings = context.getSettingsRef(); const auto & settings = context.getSettingsRef();
/** The code is based on the assumption that all threads share the same order of operations /** The code is based on the assumption that all threads share the same order of operations:
* - creating the .sql.tmp file; * - create the .sql.tmp file;
* - adding a dictionary to `dictionaries`; * - add the dictionary to ExternalDictionariesLoader;
* - load the dictionary in case dictionaries_lazy_load == false;
* - attach the dictionary;
* - rename .sql.tmp to .sql. * - rename .sql.tmp to .sql.
*/ */
@ -278,17 +282,20 @@ void DatabaseOnDisk::createDictionary(
if (database.isDictionaryExist(context, dictionary_name)) if (database.isDictionaryExist(context, dictionary_name))
throw Exception("Dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS); throw Exception("Dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
/// A dictionary with the same full name could be defined in *.xml config files.
String full_name = database.getDatabaseName() + "." + dictionary_name;
auto & external_loader = const_cast<ExternalDictionariesLoader &>(context.getExternalDictionariesLoader());
if (external_loader.getCurrentStatus(full_name) != ExternalLoader::Status::NOT_EXIST)
throw Exception("Dictionary " + backQuote(full_name) + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
if (database.isTableExist(context, dictionary_name)) if (database.isTableExist(context, dictionary_name))
throw Exception("Table " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Table " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name); String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name);
String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp"; String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp";
String statement; String statement = getObjectDefinitionFromCreateQuery(query);
{ {
statement = getObjectDefinitionFromCreateQuery(query);
/// Exclusive flags guarantees, that table is not created right now in another thread. Otherwise, exception will be thrown. /// Exclusive flags guarantees, that table is not created right now in another thread. Otherwise, exception will be thrown.
WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL); WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out); writeString(statement, out);
@ -298,27 +305,48 @@ void DatabaseOnDisk::createDictionary(
out.close(); out.close();
} }
try bool succeeded = false;
{ SCOPE_EXIT({
/// Do not load it now because we want more strict loading if (!succeeded)
database.attachDictionary(dictionary_name, context, false); Poco::File(dictionary_metadata_tmp_path).remove();
/// Load dictionary });
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
String dict_name = database.getDatabaseName() + "." + dictionary_name;
context.getExternalDictionariesLoader().addDictionaryWithConfig(
dict_name, database.getDatabaseName(), query->as<const ASTCreateQuery &>(), !lazy_load);
/// If it was ATTACH query and file with dictionary metadata already exist /// Add a temporary repository containing the dictionary.
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one. /// We need this temp repository to try loading the dictionary before actually attaching it to the database.
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path); static std::atomic<size_t> counter = 0;
String temp_repository_name = String(IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX) + " creating " + full_name + " "
+ std::to_string(++counter);
external_loader.addConfigRepository(
temp_repository_name,
std::make_unique<ExternalLoaderPresetConfigRepository>(
std::vector{std::pair{dictionary_metadata_tmp_path,
getDictionaryConfigurationFromAST(query->as<const ASTCreateQuery &>(), database.getDatabaseName())}}));
SCOPE_EXIT({ external_loader.removeConfigRepository(temp_repository_name); });
} bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
catch (...) if (!lazy_load)
{ {
database.detachDictionary(dictionary_name, context); /// loadStrict() is called here to force loading the dictionary, wait until the loading is finished,
Poco::File(dictionary_metadata_tmp_path).remove(); /// and throw an exception if the loading is failed.
throw; external_loader.loadStrict(full_name);
} }
database.attachDictionary(dictionary_name, context);
SCOPE_EXIT({
if (!succeeded)
database.detachDictionary(dictionary_name, context);
});
/// If it was ATTACH query and file with dictionary metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);
/// ExternalDictionariesLoader doesn't know we renamed the metadata path.
/// So we have to manually call reloadConfig() here.
external_loader.reloadConfig(database.getDatabaseName(), full_name);
/// Everything's ok.
succeeded = true;
} }
@ -362,16 +390,18 @@ void DatabaseOnDisk::removeDictionary(
database.detachDictionary(dictionary_name, context); database.detachDictionary(dictionary_name, context);
String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name); String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name);
if (Poco::File(dictionary_metadata_path).exists())
try
{ {
Poco::File(dictionary_metadata_path).remove(); try
} {
catch (...) Poco::File(dictionary_metadata_path).remove();
{ }
/// If remove was not possible for some reason catch (...)
database.attachDictionary(dictionary_name, context); {
throw; /// If remove was not possible for some reason
database.attachDictionary(dictionary_name, context);
throw;
}
} }
} }

View File

@ -52,52 +52,68 @@ static constexpr size_t PRINT_MESSAGE_EACH_N_OBJECTS = 256;
static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5; static constexpr size_t PRINT_MESSAGE_EACH_N_SECONDS = 5;
static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768; static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
namespace namespace
{ {
void tryAttachTable(
Context & context,
void loadObject( const ASTCreateQuery & query,
Context & context, DatabaseOrdinary & database,
const ASTCreateQuery & query, const String database_data_path,
DatabaseOrdinary & database, const String & database_name,
const String database_data_path, bool has_force_restore_data_flag)
const String & database_name,
bool has_force_restore_data_flag)
try
{
if (query.is_dictionary)
{ {
String dictionary_name = query.table; assert(!query.is_dictionary);
database.attachDictionary(dictionary_name, context, false); try
{
String table_name;
StoragePtr table;
std::tie(table_name, table)
= createTableFromAST(query, database_name, database_data_path, context, has_force_restore_data_flag);
database.attachTable(table_name, table);
}
catch (const Exception & e)
{
throw Exception(
"Cannot attach table '" + query.table + "' from query " + serializeAST(query)
+ ". Error: " + DB::getCurrentExceptionMessage(true),
e,
DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
}
} }
else
void tryAttachDictionary(
Context & context,
const ASTCreateQuery & query,
DatabaseOrdinary & database)
{ {
String table_name; assert(query.is_dictionary);
StoragePtr table; try
std::tie(table_name, table) {
= createTableFromAST(query, database_name, database_data_path, context, has_force_restore_data_flag); database.attachDictionary(query.table, context);
database.attachTable(table_name, table); }
catch (const Exception & e)
{
throw Exception(
"Cannot create dictionary '" + query.table + "' from query " + serializeAST(query)
+ ". Error: " + DB::getCurrentExceptionMessage(true),
e,
DB::ErrorCodes::CANNOT_CREATE_DICTIONARY_FROM_METADATA);
}
} }
}
catch (const Exception & e)
{
throw Exception(
"Cannot create object '" + query.table + "' from query " + serializeAST(query) + ". Error: " + DB::getCurrentExceptionMessage(true),
e, DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
}
void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch) void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, AtomicStopwatch & watch)
{
if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{ {
LOG_INFO(log, std::fixed << std::setprecision(2) << processed * 100.0 / total << "%"); if (processed % PRINT_MESSAGE_EACH_N_OBJECTS == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
watch.restart(); {
LOG_INFO(log, std::fixed << std::setprecision(2) << processed * 100.0 / total << "%");
watch.restart();
}
} }
} }
}
DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context) DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context)
: DatabaseWithOwnTablesBase(std::move(name_)) : DatabaseWithOwnTablesBase(std::move(name_))
@ -151,22 +167,20 @@ void DatabaseOrdinary::loadStoredObjects(
std::atomic<size_t> tables_processed{0}; std::atomic<size_t> tables_processed{0};
std::atomic<size_t> dictionaries_processed{0}; std::atomic<size_t> dictionaries_processed{0};
auto loadOneObject = [&](const ASTCreateQuery & query)
{
loadObject(context, query, *this, getDataPath(), getDatabaseName(), has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
if (query.is_dictionary)
logAboutProgress(log, ++dictionaries_processed, total_dictionaries, watch);
else
logAboutProgress(log, ++tables_processed, total_tables, watch);
};
ThreadPool pool(SettingMaxThreads().getAutoValue()); ThreadPool pool(SettingMaxThreads().getAutoValue());
/// Attach tables.
for (const auto & name_with_query : file_names) for (const auto & name_with_query : file_names)
{ {
pool.scheduleOrThrowOnError([&]() { loadOneObject(name_with_query.second->as<const ASTCreateQuery &>()); }); const auto & create_query = name_with_query.second->as<const ASTCreateQuery &>();
if (!create_query.is_dictionary)
pool.scheduleOrThrowOnError([&]()
{
tryAttachTable(context, create_query, *this, getDataPath(), getDatabaseName(), has_force_restore_data_flag);
/// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++tables_processed, total_tables, watch);
});
} }
pool.wait(); pool.wait();
@ -178,16 +192,19 @@ void DatabaseOrdinary::loadStoredObjects(
auto dictionaries_repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(shared_from_this(), context); auto dictionaries_repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(shared_from_this(), context);
auto & external_loader = context.getExternalDictionariesLoader(); auto & external_loader = context.getExternalDictionariesLoader();
external_loader.addConfigRepository(getDatabaseName(), std::move(dictionaries_repository)); external_loader.addConfigRepository(getDatabaseName(), std::move(dictionaries_repository));
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
auto filter = [this](const std::string & dictionary_name) -> bool /// Attach dictionaries.
for (const auto & name_with_query : file_names)
{ {
if (!startsWith(dictionary_name, name + "." /* db name */)) auto create_query = name_with_query.second->as<const ASTCreateQuery &>();
return false; if (create_query.is_dictionary)
LOG_INFO(log, "Loading dictionary " << backQuote(dictionary_name) << ", for database " << backQuote(name)); {
return true; tryAttachDictionary(context, create_query, *this);
};
external_loader.reload(filter, !lazy_load); /// Messages, so that it's not boring to wait for the server to load for a long time.
logAboutProgress(log, ++dictionaries_processed, total_dictionaries, watch);
}
}
} }

View File

@ -161,19 +161,21 @@ StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
return res; return res;
} }
void DatabaseWithOwnTablesBase::detachDictionary(const String & dictionary_name, const Context & context, bool reload) void DatabaseWithOwnTablesBase::detachDictionary(const String & dictionary_name, const Context & context)
{ {
String full_name = getDatabaseName() + "." + dictionary_name;
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = dictionaries.find(dictionary_name); auto it = dictionaries.find(dictionary_name);
if (it == dictionaries.end()) if (it == dictionaries.end())
throw Exception("Dictionary " + name + "." + dictionary_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE); throw Exception("Dictionary " + full_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
dictionaries.erase(it); dictionaries.erase(it);
} }
if (reload) /// ExternalLoader::reloadConfig() will find out that the dictionary's config has been removed
context.getExternalDictionariesLoader().reload(getDatabaseName() + "." + dictionary_name); /// and therefore it will unload the dictionary.
const auto & external_loader = context.getExternalDictionariesLoader();
external_loader.reloadConfig(getDatabaseName(), full_name);
} }
void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table) void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const StoragePtr & table)
@ -184,22 +186,19 @@ void DatabaseWithOwnTablesBase::attachTable(const String & table_name, const Sto
} }
void DatabaseWithOwnTablesBase::attachDictionary(const String & dictionary_name, const Context & context, bool load) void DatabaseWithOwnTablesBase::attachDictionary(const String & dictionary_name, const Context & context)
{ {
const auto & external_loader = context.getExternalDictionariesLoader();
String full_name = getDatabaseName() + "." + dictionary_name; String full_name = getDatabaseName() + "." + dictionary_name;
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto status = external_loader.getCurrentStatus(full_name); if (!dictionaries.emplace(dictionary_name).second)
if (status != ExternalLoader::Status::NOT_EXIST || !dictionaries.emplace(dictionary_name).second) throw Exception("Dictionary " + full_name + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
throw Exception(
"Dictionary " + full_name + " already exists.",
ErrorCodes::DICTIONARY_ALREADY_EXISTS);
} }
if (load) /// ExternalLoader::reloadConfig() will find out that the dictionary's config has been added
external_loader.reload(full_name, true); /// and in case `dictionaries_lazy_load == false` it will load the dictionary.
const auto & external_loader = context.getExternalDictionariesLoader();
external_loader.reloadConfig(getDatabaseName(), full_name);
} }
void DatabaseWithOwnTablesBase::shutdown() void DatabaseWithOwnTablesBase::shutdown()

View File

@ -33,11 +33,11 @@ public:
void attachTable(const String & table_name, const StoragePtr & table) override; void attachTable(const String & table_name, const StoragePtr & table) override;
void attachDictionary(const String & name, const Context & context, bool reload) override; void attachDictionary(const String & name, const Context & context) override;
StoragePtr detachTable(const String & table_name) override; StoragePtr detachTable(const String & table_name) override;
void detachDictionary(const String & name, const Context & context, bool reload) override; void detachDictionary(const String & name, const Context & context) override;
DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name = {}) override; DatabaseTablesIteratorPtr getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name = {}) override;

View File

@ -165,14 +165,14 @@ public:
virtual void attachTable(const String & name, const StoragePtr & table) = 0; virtual void attachTable(const String & name, const StoragePtr & table) = 0;
/// Add dictionary to the database, but do not add it to the metadata. The database may not support this method. /// Add dictionary to the database, but do not add it to the metadata. The database may not support this method.
/// load is false when we starting up and lazy_load is true, so we don't want to load dictionaries synchronously. /// If dictionaries_lazy_load is false it also starts loading the dictionary asynchronously.
virtual void attachDictionary(const String & name, const Context & context, bool reload = true) = 0; virtual void attachDictionary(const String & name, const Context & context) = 0;
/// Forget about the table without deleting it, and return it. The database may not support this method. /// Forget about the table without deleting it, and return it. The database may not support this method.
virtual StoragePtr detachTable(const String & name) = 0; virtual StoragePtr detachTable(const String & name) = 0;
/// Forget about the dictionary without deleting it, and return it. The database may not support this method. /// Forget about the dictionary without deleting it. The database may not support this method.
virtual void detachDictionary(const String & name, const Context & context, bool reload = true) = 0; virtual void detachDictionary(const String & name, const Context & context) = 0;
/// Rename the table and possibly move the table to another database. /// Rename the table and possibly move the table to another database.
virtual void renameTable( virtual void renameTable(

View File

@ -414,7 +414,7 @@ void checkPrimaryKey(const std::unordered_set<std::string> & all_attrs, const Na
} }
DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuery & query) DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuery & query, const String & database_name)
{ {
checkAST(query); checkAST(query);
@ -427,7 +427,8 @@ DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuer
AutoPtr<Poco::XML::Element> name_element(xml_document->createElement("name")); AutoPtr<Poco::XML::Element> name_element(xml_document->createElement("name"));
current_dictionary->appendChild(name_element); current_dictionary->appendChild(name_element);
AutoPtr<Text> name(xml_document->createTextNode(query.database + "." + query.table)); String full_name = (!database_name.empty() ? database_name : query.database) + "." + query.table;
AutoPtr<Text> name(xml_document->createTextNode(full_name));
name_element->appendChild(name); name_element->appendChild(name);
AutoPtr<Element> structure_element(xml_document->createElement("structure")); AutoPtr<Element> structure_element(xml_document->createElement("structure"));

View File

@ -10,6 +10,6 @@ using DictionaryConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfigurati
/// Convert dictionary AST to Poco::AbstractConfiguration /// Convert dictionary AST to Poco::AbstractConfiguration
/// This function is necessary because all loadable objects configuration are Poco::AbstractConfiguration /// This function is necessary because all loadable objects configuration are Poco::AbstractConfiguration
/// Can throw exception if query is ill-formed /// Can throw exception if query is ill-formed
DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuery & query); DictionaryConfigurationPtr getDictionaryConfigurationFromAST(const ASTCreateQuery & query, const String & database_name = {});
} }

View File

@ -1,7 +1,5 @@
#include <Interpreters/ExternalDictionariesLoader.h> #include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/Context.h>
#include <Dictionaries/DictionaryFactory.h> #include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
namespace DB namespace DB
{ {
@ -28,15 +26,4 @@ void ExternalDictionariesLoader::addConfigRepository(
ExternalLoader::addConfigRepository(repository_name, std::move(config_repository), {"dictionary", "name"}); ExternalLoader::addConfigRepository(repository_name, std::move(config_repository), {"dictionary", "name"});
} }
void ExternalDictionariesLoader::addDictionaryWithConfig(
const String & dictionary_name, const String & repo_name, const ASTCreateQuery & query, bool load_never_loading) const
{
ExternalLoader::addObjectAndLoad(
dictionary_name, /// names are equal
dictionary_name,
repo_name,
getDictionaryConfigurationFromAST(query),
"dictionary", load_never_loading);
}
} }

View File

@ -1,17 +1,14 @@
#pragma once #pragma once
#include <Dictionaries/IDictionary.h> #include <Dictionaries/IDictionary.h>
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <Interpreters/ExternalLoader.h> #include <Interpreters/ExternalLoader.h>
#include <common/logger_useful.h>
#include <Parsers/ASTCreateQuery.h>
#include <memory> #include <memory>
namespace DB namespace DB
{ {
class Context; class Context;
class IExternalLoaderConfigRepository;
/// Manages user-defined dictionaries. /// Manages user-defined dictionaries.
class ExternalDictionariesLoader : public ExternalLoader class ExternalDictionariesLoader : public ExternalLoader
@ -36,14 +33,6 @@ public:
const std::string & repository_name, const std::string & repository_name,
std::unique_ptr<IExternalLoaderConfigRepository> config_repository); std::unique_ptr<IExternalLoaderConfigRepository> config_repository);
/// Starts reloading of a specified object.
void addDictionaryWithConfig(
const String & dictionary_name,
const String & repo_name,
const ASTCreateQuery & query,
bool load_never_loading = false) const;
protected: protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config, LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config) const override; const std::string & key_in_config) const override;
@ -52,7 +41,6 @@ protected:
friend class DatabaseDictionary; friend class DatabaseDictionary;
private: private:
Context & context; Context & context;
}; };

View File

@ -10,6 +10,8 @@
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <ext/chrono_io.h> #include <ext/chrono_io.h>
#include <ext/scope_guard.h> #include <ext/scope_guard.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
namespace DB namespace DB
@ -39,10 +41,10 @@ struct LoadingGuardForAsyncLoad
struct ExternalLoader::ObjectConfig struct ExternalLoader::ObjectConfig
{ {
String config_path;
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config; Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
String key_in_config; String key_in_config;
String repository_name; String repository_name;
String path;
}; };
@ -57,226 +59,258 @@ public:
} }
~LoadablesConfigReader() = default; ~LoadablesConfigReader() = default;
void addConfigRepository( using RepositoryPtr = std::unique_ptr<IExternalLoaderConfigRepository>;
const String & name,
std::unique_ptr<IExternalLoaderConfigRepository> repository, void addConfigRepository(const String & repository_name, RepositoryPtr repository, const ExternalLoaderConfigSettings & settings)
const ExternalLoaderConfigSettings & settings)
{ {
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
repositories.emplace(name, std::make_pair(std::move(repository), settings)); RepositoryInfo repository_info{std::move(repository), settings, {}};
repositories.emplace(repository_name, std::move(repository_info));
need_collect_object_configs = true;
} }
void removeConfigRepository(const String & name) RepositoryPtr removeConfigRepository(const String & repository_name)
{ {
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
repositories.erase(name); auto it = repositories.find(repository_name);
if (it == repositories.end())
return nullptr;
auto repository = std::move(it->second.repository);
repositories.erase(it);
need_collect_object_configs = true;
return repository;
} }
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>; using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>;
/// Reads all repositories.
/// Reads configurations.
ObjectConfigsPtr read() ObjectConfigsPtr read()
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
// Check last modification times of files and read those files which are new or changed. readRepositories();
if (!readLoadablesInfos()) collectObjectConfigs();
return configs; // Nothing changed, so we can return the previous result. return object_configs;
return collectConfigs();
} }
ObjectConfig updateLoadableInfo( /// Reads only a specified repository.
const String & external_name, /// This functions checks only a specified repository but returns configs from all repositories.
const String & object_name, ObjectConfigsPtr read(const String & repository_name)
const String & repo_name,
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
readRepositories(repository_name);
collectObjectConfigs();
return object_configs;
}
auto it = loadables_infos.find(object_name); /// Reads only a specified path from a specified repository.
if (it == loadables_infos.end()) /// This functions checks only a specified repository but returns configs from all repositories.
{ ObjectConfigsPtr read(const String & repository_name, const String & path)
LoadablesInfos loadable_info; {
loadables_infos[object_name] = loadable_info; std::lock_guard lock(mutex);
} readRepositories(repository_name, path);
auto & loadable_info = loadables_infos[object_name]; collectObjectConfigs();
ObjectConfig object_config{object_name, config, key, repo_name}; return object_configs;
bool found = false;
for (auto iter = loadable_info.configs.begin(); iter != loadable_info.configs.end(); ++iter)
{
if (iter->first == external_name)
{
iter->second = object_config;
found = true;
break;
}
}
if (!found)
loadable_info.configs.emplace_back(external_name, object_config);
loadable_info.last_update_time = Poco::Timestamp{}; /// now
loadable_info.in_use = true;
return object_config;
} }
private: private:
struct LoadablesInfos struct FileInfo
{ {
Poco::Timestamp last_update_time = 0; Poco::Timestamp last_update_time = 0;
std::vector<std::pair<String, ObjectConfig>> configs; // Parsed loadable's contents. std::vector<std::pair<String, ObjectConfig>> objects; // Parsed contents of the file.
bool in_use = true; // Whether the `LoadablesInfos` should be destroyed because the correspondent loadable is deleted. bool in_use = true; // Whether the `FileInfo` should be destroyed because the correspondent file is deleted.
}; };
/// Collect current configurations struct RepositoryInfo
ObjectConfigsPtr collectConfigs()
{ {
// Generate new result. RepositoryPtr repository;
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>(); ExternalLoaderConfigSettings settings;
for (const auto & [path, loadable_info] : loadables_infos) std::unordered_map<String /* path */, FileInfo> files;
};
/// Reads the repositories.
/// Checks last modification times of files and read those files which are new or changed.
void readRepositories(const std::optional<String> & only_repository_name = {}, const std::optional<String> & only_path = {})
{
Strings repository_names;
if (only_repository_name)
{ {
for (const auto & [name, config] : loadable_info.configs) if (repositories.count(*only_repository_name))
repository_names.push_back(*only_repository_name);
}
else
boost::copy(repositories | boost::adaptors::map_keys, std::back_inserter(repository_names));
for (const auto & repository_name : repository_names)
{
auto & repository_info = repositories[repository_name];
for (auto & file_info : repository_info.files | boost::adaptors::map_values)
file_info.in_use = false;
Strings existing_paths;
if (only_path)
{ {
auto already_added_it = new_configs->find(name); if (repository_info.repository->exists(*only_path))
if (already_added_it != new_configs->end()) existing_paths.push_back(*only_path);
{
const auto & already_added = already_added_it->second;
LOG_WARNING(log, path << ": " << type_name << " '" << name << "' is found "
<< ((path == already_added.config_path)
? ("twice in the same file")
: ("both in file '" + already_added.config_path + "' and '" + path + "'")));
continue;
}
new_configs->emplace(name, config);
} }
} else
boost::copy(repository_info.repository->getAllLoadablesDefinitionNames(), std::back_inserter(existing_paths));
configs = new_configs; for (const auto & path : existing_paths)
return configs;
}
/// Read files and store them to the map ` loadables_infos`.
bool readLoadablesInfos()
{
bool changed = false;
for (auto & name_and_loadable_info : loadables_infos)
{
LoadablesInfos & loadable_info = name_and_loadable_info.second;
loadable_info.in_use = false;
}
for (const auto & [repo_name, repo_with_settings] : repositories)
{
const auto names = repo_with_settings.first->getAllLoadablesDefinitionNames();
for (const auto & loadable_name : names)
{ {
auto it = loadables_infos.find(loadable_name); auto it = repository_info.files.find(path);
if (it != loadables_infos.end()) if (it != repository_info.files.end())
{ {
LoadablesInfos & loadable_info = it->second; FileInfo & file_info = it->second;
if (readLoadablesInfo(repo_name, *repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info)) if (readFileInfo(file_info, *repository_info.repository, path, repository_info.settings))
changed = true; need_collect_object_configs = true;
} }
else else
{ {
LoadablesInfos loadable_info; FileInfo file_info;
if (readLoadablesInfo(repo_name, *repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info)) if (readFileInfo(file_info, *repository_info.repository, path, repository_info.settings))
{ {
loadables_infos.emplace(loadable_name, std::move(loadable_info)); repository_info.files.emplace(path, std::move(file_info));
changed = true; need_collect_object_configs = true;
} }
} }
} }
}
std::vector<String> deleted_names; Strings deleted_paths;
for (auto & [path, loadable_info] : loadables_infos) for (auto & [path, file_info] : repository_info.files)
if (!loadable_info.in_use) {
deleted_names.emplace_back(path); if (file_info.in_use)
if (!deleted_names.empty()) continue;
{
for (const String & deleted_name : deleted_names) if (only_path && (*only_path != path))
loadables_infos.erase(deleted_name); continue;
changed = true;
deleted_paths.emplace_back(path);
}
if (!deleted_paths.empty())
{
for (const String & deleted_path : deleted_paths)
repository_info.files.erase(deleted_path);
need_collect_object_configs = true;
}
} }
return changed;
} }
bool readLoadablesInfo( /// Reads a file, returns true if the file is new or changed.
const String & repo_name, bool readFileInfo(
FileInfo & file_info,
IExternalLoaderConfigRepository & repository, IExternalLoaderConfigRepository & repository,
const String & object_name, const String & path,
const ExternalLoaderConfigSettings & settings, const ExternalLoaderConfigSettings & settings) const
LoadablesInfos & loadable_info) const
{ {
try try
{ {
if (object_name.empty() || !repository.exists(object_name)) if (path.empty() || !repository.exists(path))
{ {
LOG_WARNING(log, "Config file '" + object_name + "' does not exist"); LOG_WARNING(log, "Config file '" + path + "' does not exist");
return false; return false;
} }
auto update_time_from_repository = repository.getUpdateTime(object_name); auto update_time_from_repository = repository.getUpdateTime(path);
/// Actually it can't be less, but for sure we check less or equal /// Actually it can't be less, but for sure we check less or equal
if (update_time_from_repository <= loadable_info.last_update_time) if (update_time_from_repository <= file_info.last_update_time)
{ {
loadable_info.in_use = true; file_info.in_use = true;
return false; return false;
} }
auto file_contents = repository.load(object_name); auto file_contents = repository.load(path);
/// get all objects' definitions /// get all objects' definitions
Poco::Util::AbstractConfiguration::Keys keys; Poco::Util::AbstractConfiguration::Keys keys;
file_contents->keys(keys); file_contents->keys(keys);
/// for each object defined in repositories /// for each object defined in repositories
std::vector<std::pair<String, ObjectConfig>> configs_from_file; std::vector<std::pair<String, ObjectConfig>> object_configs_from_file;
for (const auto & key : keys) for (const auto & key : keys)
{ {
if (!startsWith(key, settings.external_config)) if (!startsWith(key, settings.external_config))
{ {
if (!startsWith(key, "comment") && !startsWith(key, "include_from")) if (!startsWith(key, "comment") && !startsWith(key, "include_from"))
LOG_WARNING(log, object_name << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'"); LOG_WARNING(log, path << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'");
continue; continue;
} }
String external_name = file_contents->getString(key + "." + settings.external_name); String object_name = file_contents->getString(key + "." + settings.external_name);
if (external_name.empty()) if (object_name.empty())
{ {
LOG_WARNING(log, object_name << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed"); LOG_WARNING(log, path << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed");
continue; continue;
} }
configs_from_file.emplace_back(external_name, ObjectConfig{object_name, file_contents, key, repo_name}); object_configs_from_file.emplace_back(object_name, ObjectConfig{file_contents, key, {}, {}});
} }
loadable_info.configs = std::move(configs_from_file); file_info.objects = std::move(object_configs_from_file);
loadable_info.last_update_time = update_time_from_repository; file_info.last_update_time = update_time_from_repository;
loadable_info.in_use = true; file_info.in_use = true;
return true; return true;
} }
catch (...) catch (...)
{ {
tryLogCurrentException(log, "Failed to load config for dictionary '" + object_name + "'"); tryLogCurrentException(log, "Failed to load config file '" + path + "'");
return false; return false;
} }
} }
/// Builds a map of current configurations of objects.
void collectObjectConfigs()
{
if (!need_collect_object_configs)
return;
need_collect_object_configs = false;
// Generate new result.
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();
for (const auto & [repository_name, repository_info] : repositories)
{
for (const auto & [path, file_info] : repository_info.files)
{
for (const auto & [object_name, object_config] : file_info.objects)
{
auto already_added_it = new_configs->find(object_name);
if (already_added_it == new_configs->end())
{
auto & new_config = new_configs->emplace(object_name, object_config).first->second;
new_config.repository_name = repository_name;
new_config.path = path;
}
else
{
const auto & already_added = already_added_it->second;
if (!startsWith(repository_name, IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX) &&
!startsWith(already_added.repository_name, IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX))
{
LOG_WARNING(
log,
type_name << " '" << object_name << "' is found "
<< (((path == already_added.path) && repository_name == already_added.repository_name)
? ("twice in the same file '" + path + "'")
: ("both in file '" + already_added.path + "' and '" + path + "'")));
}
}
}
}
}
object_configs = new_configs;
}
const String type_name; const String type_name;
Logger * log; Logger * log;
std::mutex mutex; std::mutex mutex;
using RepositoryPtr = std::unique_ptr<IExternalLoaderConfigRepository>; std::unordered_map<String, RepositoryInfo> repositories;
using RepositoryWithSettings = std::pair<RepositoryPtr, ExternalLoaderConfigSettings>; ObjectConfigsPtr object_configs;
std::unordered_map<String, RepositoryWithSettings> repositories; bool need_collect_object_configs = false;
ObjectConfigsPtr configs;
std::unordered_map<String /* config path */, LoadablesInfos> loadables_infos;
}; };
@ -338,10 +372,11 @@ public:
else else
{ {
const auto & new_config = new_config_it->second; const auto & new_config = new_config_it->second;
if (!isSameConfiguration(*info.object_config.config, info.object_config.key_in_config, *new_config.config, new_config.key_in_config)) bool config_is_same = isSameConfiguration(*info.object_config.config, info.object_config.key_in_config, *new_config.config, new_config.key_in_config);
info.object_config = new_config;
if (!config_is_same)
{ {
/// Configuration has been changed. /// Configuration has been changed.
info.object_config = new_config;
info.config_changed = true; info.config_changed = true;
if (info.wasLoading()) if (info.wasLoading())
@ -376,12 +411,6 @@ public:
event.notify_all(); event.notify_all();
} }
void setSingleObjectConfigurationWithoutLoading(const String & external_name, const ObjectConfig & config)
{
std::lock_guard lock{mutex};
infos.emplace(external_name, Info{config});
}
/// Sets whether all the objects from the configuration should be always loaded (even if they aren't used). /// Sets whether all the objects from the configuration should be always loaded (even if they aren't used).
void enableAlwaysLoadEverything(bool enable) void enableAlwaysLoadEverything(bool enable)
{ {
@ -662,7 +691,7 @@ private:
result.exception = exception; result.exception = exception;
result.loading_start_time = loading_start_time; result.loading_start_time = loading_start_time;
result.loading_duration = loadingDuration(); result.loading_duration = loadingDuration();
result.origin = object_config.config_path; result.origin = object_config.path;
result.repository_name = object_config.repository_name; result.repository_name = object_config.repository_name;
return result; return result;
} }
@ -1091,12 +1120,14 @@ void ExternalLoader::addConfigRepository(
const ExternalLoaderConfigSettings & config_settings) const ExternalLoaderConfigSettings & config_settings)
{ {
config_files_reader->addConfigRepository(repository_name, std::move(config_repository), config_settings); config_files_reader->addConfigRepository(repository_name, std::move(config_repository), config_settings);
loading_dispatcher->setConfiguration(config_files_reader->read()); reloadConfig(repository_name);
} }
void ExternalLoader::removeConfigRepository(const std::string & repository_name) std::unique_ptr<IExternalLoaderConfigRepository> ExternalLoader::removeConfigRepository(const std::string & repository_name)
{ {
config_files_reader->removeConfigRepository(repository_name); auto repository = config_files_reader->removeConfigRepository(repository_name);
reloadConfig(repository_name);
return repository;
} }
void ExternalLoader::enableAlwaysLoadEverything(bool enable) void ExternalLoader::enableAlwaysLoadEverything(bool enable)
@ -1189,40 +1220,36 @@ void ExternalLoader::load(Loadables & loaded_objects, Duration timeout) const
void ExternalLoader::reload(const String & name, bool load_never_loading) const void ExternalLoader::reload(const String & name, bool load_never_loading) const
{ {
auto configs = config_files_reader->read(); reloadConfig();
loading_dispatcher->setConfiguration(configs);
loading_dispatcher->reload(name, load_never_loading); loading_dispatcher->reload(name, load_never_loading);
} }
void ExternalLoader::reload(bool load_never_loading) const void ExternalLoader::reload(bool load_never_loading) const
{ {
loading_dispatcher->setConfiguration(config_files_reader->read()); reloadConfig();
loading_dispatcher->reload(load_never_loading); loading_dispatcher->reload(load_never_loading);
} }
void ExternalLoader::reload(const FilterByNameFunction & filter_by_name, bool load_never_loading) const void ExternalLoader::reload(const FilterByNameFunction & filter_by_name, bool load_never_loading) const
{ {
loading_dispatcher->setConfiguration(config_files_reader->read()); reloadConfig();
loading_dispatcher->reload(filter_by_name, load_never_loading); loading_dispatcher->reload(filter_by_name, load_never_loading);
} }
void ExternalLoader::addObjectAndLoad( void ExternalLoader::reloadConfig() const
const String & name,
const String & external_name,
const String & repo_name,
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key,
bool load_never_loading) const
{ {
auto object_config = config_files_reader->updateLoadableInfo(external_name, name, repo_name, config, key); loading_dispatcher->setConfiguration(config_files_reader->read());
loading_dispatcher->setSingleObjectConfigurationWithoutLoading(external_name, object_config);
LoadablePtr loaded_object;
if (load_never_loading)
loading_dispatcher->loadStrict(name, loaded_object);
else
loading_dispatcher->load(name, loaded_object, Duration::zero());
} }
void ExternalLoader::reloadConfig(const String & repository_name) const
{
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name));
}
void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const
{
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path));
}
ExternalLoader::LoadablePtr ExternalLoader::createObject( ExternalLoader::LoadablePtr ExternalLoader::createObject(
const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const

View File

@ -87,7 +87,7 @@ public:
const ExternalLoaderConfigSettings & config_settings); const ExternalLoaderConfigSettings & config_settings);
/// Removes a repository which were used to read configurations. /// Removes a repository which were used to read configurations.
void removeConfigRepository(const std::string & repository_name); std::unique_ptr<IExternalLoaderConfigRepository> removeConfigRepository(const std::string & repository_name);
/// Sets whether all the objects from the configuration should be always loaded (even those which are never used). /// Sets whether all the objects from the configuration should be always loaded (even those which are never used).
void enableAlwaysLoadEverything(bool enable); void enableAlwaysLoadEverything(bool enable);
@ -128,15 +128,18 @@ public:
/// Tries to finish loading of a specified object during the timeout. /// Tries to finish loading of a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object. /// Returns nullptr if the loading is unsuccessful or if there is no such object.
void load(const String & name, LoadablePtr & loaded_object, Duration timeout = NO_TIMEOUT) const; void load(const String & name, LoadablePtr & loaded_object, Duration timeout = NO_TIMEOUT) const;
void load(const String & name) const { LoadablePtr object; load(name, object, Duration::zero()); }
LoadablePtr loadAndGet(const String & name, Duration timeout = NO_TIMEOUT) const { LoadablePtr object; load(name, object, timeout); return object; } LoadablePtr loadAndGet(const String & name, Duration timeout = NO_TIMEOUT) const { LoadablePtr object; load(name, object, timeout); return object; }
LoadablePtr tryGetLoadable(const String & name) const { return loadAndGet(name); } LoadablePtr tryGetLoadable(const String & name) const { return loadAndGet(name); }
/// Tries to finish loading of a specified object during the timeout. /// Tries to finish loading of a specified object during the timeout.
/// Throws an exception if the loading is unsuccessful or if there is no such object. /// Throws an exception if the loading is unsuccessful or if there is no such object.
void loadStrict(const String & name, LoadablePtr & loaded_object) const; void loadStrict(const String & name, LoadablePtr & loaded_object) const;
void loadStrict(const String & name) const { LoadablePtr object; loadStrict(name, object); }
LoadablePtr getLoadable(const String & name) const { LoadablePtr object; loadStrict(name, object); return object; } LoadablePtr getLoadable(const String & name) const { LoadablePtr object; loadStrict(name, object); return object; }
/// Tries to finish loading of the objects for which the specified function returns true. /// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name) const { Loadables objects; load(filter_by_name, objects, Duration::zero()); }
void load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) const; void load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) const;
void load(const FilterByNameFunction & filter_by_name, LoadResults & load_results, Duration timeout = NO_TIMEOUT) const; void load(const FilterByNameFunction & filter_by_name, LoadResults & load_results, Duration timeout = NO_TIMEOUT) const;
Loadables loadAndGet(const FilterByNameFunction & filter_by_name, Duration timeout = NO_TIMEOUT) const { Loadables loaded_objects; load(filter_by_name, loaded_objects, timeout); return loaded_objects; } Loadables loadAndGet(const FilterByNameFunction & filter_by_name, Duration timeout = NO_TIMEOUT) const { Loadables loaded_objects; load(filter_by_name, loaded_objects, timeout); return loaded_objects; }
@ -160,18 +163,18 @@ public:
/// The function can either skip them (false) or load for the first time (true). /// The function can either skip them (false) or load for the first time (true).
void reload(const FilterByNameFunction & filter_by_name, bool load_never_loading = false) const; void reload(const FilterByNameFunction & filter_by_name, bool load_never_loading = false) const;
/// Reloads all config repositories.
void reloadConfig() const;
/// Reloads only a specified config repository.
void reloadConfig(const String & repository_name) const;
/// Reload only a specified path in a specified config repository.
void reloadConfig(const String & repository_name, const String & path) const;
protected: protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0; virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0;
/// Reload object with already parsed configuration
void addObjectAndLoad(
const String & name, /// name of dictionary
const String & external_name, /// name of source (example xml-file, may contain more than dictionary)
const String & repo_name, /// name of repository (database name, or all xml files)
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key_in_config, /// key where we can start search of loadables (<dictionary>, <model>, etc)
bool load_never_loading = false) const;
private: private:
struct ObjectConfig; struct ObjectConfig;

View File

@ -0,0 +1,49 @@
#include <Interpreters/ExternalLoaderPresetConfigRepository.h>
#include <Common/Exception.h>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
ExternalLoaderPresetConfigRepository::ExternalLoaderPresetConfigRepository(const std::vector<std::pair<String, LoadablesConfigurationPtr>> & preset_)
{
boost::range::copy(preset_, std::inserter(preset, preset.end()));
}
ExternalLoaderPresetConfigRepository::~ExternalLoaderPresetConfigRepository() = default;
std::set<String> ExternalLoaderPresetConfigRepository::getAllLoadablesDefinitionNames() const
{
std::set<String> paths;
boost::range::copy(preset | boost::adaptors::map_keys, std::inserter(paths, paths.end()));
return paths;
}
bool ExternalLoaderPresetConfigRepository::exists(const String& path) const
{
return preset.count(path);
}
Poco::Timestamp ExternalLoaderPresetConfigRepository::getUpdateTime(const String & path)
{
if (!exists(path))
throw Exception("Loadable " + path + " not found", ErrorCodes::BAD_ARGUMENTS);
return creation_time;
}
/// May contain definition about several entities (several dictionaries in one .xml file)
LoadablesConfigurationPtr ExternalLoaderPresetConfigRepository::load(const String & path) const
{
auto it = preset.find(path);
if (it == preset.end())
throw Exception("Loadable " + path + " not found", ErrorCodes::BAD_ARGUMENTS);
return it->second;
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Core/Types.h>
#include <unordered_map>
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <Poco/Timestamp.h>
namespace DB
{
/// A config repository filled with preset loadables used by ExternalLoader.
class ExternalLoaderPresetConfigRepository : public IExternalLoaderConfigRepository
{
public:
ExternalLoaderPresetConfigRepository(const std::vector<std::pair<String, LoadablesConfigurationPtr>> & preset_);
~ExternalLoaderPresetConfigRepository() override;
std::set<String> getAllLoadablesDefinitionNames() const override;
bool exists(const String & path) const override;
Poco::Timestamp getUpdateTime(const String & path) override;
LoadablesConfigurationPtr load(const String & path) const override;
private:
std::unordered_map<String, LoadablesConfigurationPtr> preset;
Poco::Timestamp creation_time;
};
}

View File

@ -0,0 +1,7 @@
#include <Interpreters/IExternalLoaderConfigRepository.h>
namespace DB
{
const char * IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX = "\xFF internal repo ";
}

View File

@ -36,6 +36,8 @@ public:
virtual LoadablesConfigurationPtr load(const std::string & loadable_definition_name) const = 0; virtual LoadablesConfigurationPtr load(const std::string & loadable_definition_name) const = 0;
virtual ~IExternalLoaderConfigRepository() = default; virtual ~IExternalLoaderConfigRepository() = default;
static const char * INTERNAL_REPOSITORY_NAME_PREFIX;
}; };
using ExternalLoaderConfigRepositoryPtr = std::unique_ptr<IExternalLoaderConfigRepository>; using ExternalLoaderConfigRepositoryPtr = std::unique_ptr<IExternalLoaderConfigRepository>;

View File

@ -50,14 +50,21 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
const auto & external_dictionaries = context.getExternalDictionariesLoader(); const auto & external_dictionaries = context.getExternalDictionariesLoader();
for (const auto & [dict_name, load_result] : external_dictionaries.getCurrentLoadResults()) for (const auto & [dict_name, load_result] : external_dictionaries.getCurrentLoadResults())
{ {
if (startsWith(load_result.repository_name, IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX))
continue;
size_t i = 0; size_t i = 0;
String database;
String short_name = dict_name;
res_columns[i++]->insert(load_result.repository_name); if (!load_result.repository_name.empty() && startsWith(dict_name, load_result.repository_name + "."))
if (!load_result.repository_name.empty()) {
res_columns[i++]->insert(dict_name.substr(load_result.repository_name.length() + 1)); database = load_result.repository_name;
else short_name = dict_name.substr(load_result.repository_name.length() + 1);
res_columns[i++]->insert(dict_name); }
res_columns[i++]->insert(database);
res_columns[i++]->insert(short_name);
res_columns[i++]->insert(static_cast<Int8>(load_result.status)); res_columns[i++]->insert(static_cast<Int8>(load_result.status));
res_columns[i++]->insert(load_result.origin); res_columns[i++]->insert(load_result.origin);