Half working code

This commit is contained in:
alesapin 2019-10-15 21:04:17 +03:00
parent 3e068b81de
commit e690a3ca32
14 changed files with 109 additions and 40 deletions

View File

@ -6,6 +6,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
@ -308,6 +309,10 @@ void DatabaseOnDisk::createDictionary(
Poco::File(dictionary_metadata_tmp_path).remove();
throw;
}
const auto & config = context.getConfigRef();
context.getExternalDictionariesLoader().reload(
database.getDatabaseName() + "." + dictionary_name, config.getBool("dictionaries_lazy_load", true));
}

View File

@ -11,6 +11,8 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExternalLoaderDatabaseConfigRepository.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserDictionary.h>
@ -160,9 +162,6 @@ void DatabaseOrdinary::loadStoredObjects(
});
if (file_names.empty())
return;
size_t total_tables = file_names.size() - total_dictionaries;
LOG_INFO(log, "Total " << total_tables << " tables and " << total_dictionaries << " dictionaries.");
@ -191,6 +190,7 @@ void DatabaseOrdinary::loadStoredObjects(
/// After all tables was basically initialized, startup them.
startupTables(pool);
loadDictionaries(context);
}
@ -217,6 +217,14 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
thread_pool.wait();
}
void DatabaseOrdinary::loadDictionaries(Context & context)
{
LOG_INFO(log, "Loading dictionaries.");
auto dictionaries_repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(shared_from_this(), context);
context.getExternalDictionariesLoader().addConfigRepository(getDatabaseName(), std::move(dictionaries_repository), {});
}
void DatabaseOrdinary::createTable(
const Context & context,

View File

@ -92,6 +92,7 @@ private:
Poco::Logger * log;
void startupTables(ThreadPool & thread_pool);
void loadDictionaries(Context & context);
};
}

View File

@ -359,6 +359,7 @@ void buildSourceConfiguration(AutoPtr<Document> doc, AutoPtr<Element> root, cons
void checkAST(const ASTCreateQuery & query)
{
std::cerr << queryToString(query) << std::endl;
if (!query.is_dictionary || query.dictionary == nullptr)
throw Exception("Cannot convert dictionary to configuration from non-dictionary AST.", ErrorCodes::INCORRECT_DICTIONARY_DEFINITION);

View File

@ -1035,8 +1035,6 @@ void Context::addDatabase(const String & database_name, const DatabasePtr & data
assertDatabaseDoesntExist(database_name);
shared->databases[database_name] = database;
auto dictionaries_repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(database, *this);
getExternalDictionariesLoader().addConfigRepository(std::move(dictionaries_repository), {});
}
@ -1045,6 +1043,7 @@ DatabasePtr Context::detachDatabase(const String & database_name)
auto lock = getLock();
auto res = getDatabase(database_name);
getExternalDictionariesLoader().removeConfigRepository(database_name);
shared->databases.erase(database_name);
return res;
}

View File

@ -11,7 +11,7 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(
: ExternalLoader("external dictionary", &Logger::get("ExternalDictionariesLoader"))
, context(context_)
{
addConfigRepository(std::move(config_repository), {"dictionary", "name"});
addConfigRepository("_XMLConfigRepository", std::move(config_repository), {"dictionary", "name"});
enableAsyncLoading(true);
enablePeriodicUpdates(true);
}

View File

@ -40,12 +40,19 @@ public:
}
~LoadablesConfigReader() = default;
void addConfigRepository(std::unique_ptr<IExternalLoaderConfigRepository> repository, const ExternalLoaderConfigSettings & settings)
void addConfigRepository(const String & name, std::unique_ptr<IExternalLoaderConfigRepository> repository, const ExternalLoaderConfigSettings & settings)
{
std::lock_guard lock{mutex};
repositories.emplace_back(std::move(repository), std::move(settings));
repositories.emplace(name, std::make_pair(std::move(repository), settings));
}
void removeConfigRepository(const String & name)
{
std::lock_guard lock{mutex};
repositories.erase(name);
}
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>;
/// Reads configuration files.
@ -100,22 +107,22 @@ private:
loadable_info.in_use = false;
}
for (const auto & [repository, settings] : repositories)
for (const auto & [name, repo_with_settings] : repositories)
{
const auto names = repository->getAllLoadablesDefinitionNames();
const auto names = repo_with_settings.first->getAllLoadablesDefinitionNames();
for (const auto & name : names)
{
auto it = loadables_infos.find(name);
if (it != loadables_infos.end())
{
LoadablesInfos & loadable_info = it->second;
if (readLoadablesInfo(*repository, name, settings, loadable_info))
if (readLoadablesInfo(*repo_with_settings.first, name, repo_with_settings.second, loadable_info))
changed = true;
}
else
{
LoadablesInfos loadable_info;
if (readLoadablesInfo(*repository, name, settings, loadable_info))
if (readLoadablesInfo(*repo_with_settings.first, name, repo_with_settings.second, loadable_info))
{
loadables_infos.emplace(name, std::move(loadable_info));
changed = true;
@ -147,7 +154,7 @@ private:
{
if (path.empty() || !repository.exists(path))
{
LOG_WARNING(log, "config file '" + path + "' does not exist");
LOG_WARNING(log, "Config file '" + path + "' does not exist");
return false;
}
@ -203,7 +210,9 @@ private:
Logger * log;
std::mutex mutex;
std::vector<std::pair<std::unique_ptr<IExternalLoaderConfigRepository>, ExternalLoaderConfigSettings>> repositories;
using RepositoryPtr = std::unique_ptr<IExternalLoaderConfigRepository>;
using RepositoryWithSettings = std::pair<RepositoryPtr, ExternalLoaderConfigSettings>;
std::unordered_map<String, RepositoryWithSettings> repositories;
ObjectConfigsPtr configs;
std::unordered_map<String /* config path */, LoadablesInfos> loadables_infos;
};
@ -956,9 +965,17 @@ ExternalLoader::ExternalLoader(const String & type_name_, Logger * log)
ExternalLoader::~ExternalLoader() = default;
void ExternalLoader::addConfigRepository(
std::unique_ptr<IExternalLoaderConfigRepository> config_repository, const ExternalLoaderConfigSettings & config_settings)
const std::string & repository_name,
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
const ExternalLoaderConfigSettings & config_settings)
{
config_files_reader->addConfigRepository(std::move(config_repository), config_settings);
config_files_reader->addConfigRepository(repository_name, std::move(config_repository), config_settings);
loading_dispatcher->setConfiguration(config_files_reader->read());
}
void ExternalLoader::removeConfigRepository(const std::string & repository_name)
{
config_files_reader->removeConfigRepository(repository_name);
loading_dispatcher->setConfiguration(config_files_reader->read());
}
@ -1040,13 +1057,13 @@ void ExternalLoader::load(Loadables & loaded_objects, Duration timeout) const
return loading_dispatcher->load(loaded_objects, timeout);
}
void ExternalLoader::reload(const String & name, bool load_never_loading)
void ExternalLoader::reload(const String & name, bool load_never_loading) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
loading_dispatcher->reload(name, load_never_loading);
}
void ExternalLoader::reload(bool load_never_loading)
void ExternalLoader::reload(bool load_never_loading) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
loading_dispatcher->reload(load_never_loading);

View File

@ -81,7 +81,12 @@ public:
/// Adds a repository which will be used to read configurations from.
void addConfigRepository(
std::unique_ptr<IExternalLoaderConfigRepository> config_repository, const ExternalLoaderConfigSettings & config_settings);
const std::string & repository_name,
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
const ExternalLoaderConfigSettings & config_settings);
/// Removes a repository which were used to read configurations.
void removeConfigRepository(const std::string & repository_name);
/// Sets whether all the objects from the configuration should be always loaded (even those which are never used).
void enableAlwaysLoadEverything(bool enable);
@ -140,12 +145,12 @@ public:
/// Starts reloading of a specified object.
/// `load_never_loading` specifies what to do if the object has never been loading before.
/// The function can either skip it (false) or load for the first time (true).
void reload(const String & name, bool load_never_loading = false);
void reload(const String & name, bool load_never_loading = false) const;
/// Starts reloading of all the objects.
/// `load_never_loading` specifies what to do with the objects which have never been loading before.
/// The function can either skip them (false) or load for the first time (true).
void reload(bool load_never_loading = false);
void reload(bool load_never_loading = false) const;
protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0;

View File

@ -19,7 +19,7 @@ String trimDatabaseName(const std::string & loadable_definition_name, const Data
"Loadable '" + loadable_definition_name + "' is not from database '" + database->getDatabaseName(), ErrorCodes::UNKNOWN_DICTIONARY);
/// dbname.loadable_name
///--> remove <---
return loadable_definition_name.substr(dbname.length());
return loadable_definition_name.substr(dbname.length() + 1);
}
}
@ -31,7 +31,12 @@ LoadablesConfigurationPtr ExternalLoaderDatabaseConfigRepository::load(const std
bool ExternalLoaderDatabaseConfigRepository::exists(const std::string & loadable_definition_name) const
{
return database->isDictionaryExist(context, trimDatabaseName(loadable_definition_name, database));
std::cerr << "IS EXISTS:"
<< loadable_definition_name << std::endl;
std::cerr << "CUTTED:"
<< trimDatabaseName(loadable_definition_name, database) << std::endl;
return database->isDictionaryExist(
context, trimDatabaseName(loadable_definition_name, database));
}
Poco::Timestamp ExternalLoaderDatabaseConfigRepository::getUpdateTime(const std::string & loadable_definition_name)
@ -49,6 +54,7 @@ std::set<std::string> ExternalLoaderDatabaseConfigRepository::getAllLoadablesDef
result.insert(dbname + "." + itr->name());
itr->next();
}
std::cerr << "RESULTSIZE:" << result.size() << std::endl;
return result;
}

View File

@ -15,7 +15,7 @@ ExternalModelsLoader::ExternalModelsLoader(
: ExternalLoader("external model", &Logger::get("ExternalModelsLoader"))
, context(context_)
{
addConfigRepository(std::move(config_repository), {"model", "name"});
addConfigRepository("_XMLConfigRepository", std::move(config_repository), {"model", "name"});
enablePeriodicUpdates(true);
}

View File

@ -161,6 +161,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
try
{
std::cerr << "ADDING DB NAME:" << database_name << std::endl;
context.addDatabase(database_name, database);
if (need_write_metadata)
@ -721,6 +722,13 @@ BlockIO InterpreterCreateQuery::createDictionary(ASTCreateQuery & create)
"Dictionary " + database_name + "." + dictionary_name + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
}
if (create.attach)
{
auto query = context.getCreateDictionaryQuery(database_name, dictionary_name);
create = query->as<ASTCreateQuery &>();
create.attach = true;
}
auto res = DictionaryFactory::instance().create(dictionary_name, create, context.getGlobalContext());
if (create.attach)
database->attachDictionary(dictionary_name, res);

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/InterpreterDropQuery.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Parsers/ASTDropQuery.h>
#include <Storages/IStorage.h>
#include <Common/escapeForFileName.h>

View File

@ -18,9 +18,7 @@ memory_db dict2
==DETACH DICTIONARY
0
==ATTACH DICTIONARY
dict2
1
memory_db dict2
0
==DROP DICTIONARY
0
=DICTIONARY in Dictionary DB

View File

@ -1,5 +1,22 @@
SET send_logs_level = 'none';
DROP DATABASE IF EXISTS database_for_dict;
CREATE DATABASE database_for_dict Engine = Ordinary;
DROP TABLE IF EXISTS database_for_dict.table_for_dict;
CREATE TABLE database_for_dict.table_for_dict
(
key_column UInt64,
second_column UInt8,
third_column String
)
ENGINE = MergeTree()
ORDER BY key_column;
INSERT INTO database_for_dict.table_for_dict VALUES (1, 100, 'Hello world');
DROP DATABASE IF EXISTS ordinary_db;
CREATE DATABASE ordinary_db ENGINE = Ordinary;
@ -15,7 +32,7 @@ CREATE DICTIONARY ordinary_db.dict1
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
@ -65,12 +82,12 @@ SELECT '=DICTIONARY in Memory DB';
CREATE DICTIONARY memory_db.dict2
(
key_column UInt64 DEFAULT 0 INJECTIVE HIERARCHICAL,
key_column UInt64 DEFAULT 0 INJECTIVE,
second_column UInt8 DEFAULT 1 EXPRESSION rand() % 222,
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
@ -93,7 +110,7 @@ SELECT database, name FROM system.dictionaries WHERE name LIKE 'dict2';
SELECT '==ATTACH DICTIONARY';
ATTACH DICTIONARY memory_db.dict2;
ATTACH DICTIONARY memory_db.dict2; --{serverError 485}
SHOW DICTIONARIES FROM memory_db LIKE 'dict2';
@ -121,14 +138,14 @@ SELECT '=DICTIONARY in Dictionary DB';
CREATE DICTIONARY dictionary_db.dict2
(
key_column UInt64 DEFAULT 0 INJECTIVE HIERARCHICAL,
key_column UInt64 DEFAULT 0 INJECTIVE,
second_column UInt8 DEFAULT 1 EXPRESSION rand() % 222,
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column, second_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT()); -- {serverError 1}
LAYOUT(COMPLEX_KEY_HASHED()); -- {serverError 1}
DROP DATABASE IF EXISTS dictionary_db;
@ -140,18 +157,17 @@ CREATE DATABASE lazy_db ENGINE = Lazy(1);
CREATE DICTIONARY lazy_db.dict3
(
key_column UInt64 DEFAULT 0 INJECTIVE HIERARCHICAL,
key_column UInt64 DEFAULT 0 INJECTIVE,
second_column UInt8 DEFAULT 1 EXPRESSION rand() % 222,
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column, second_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT()); -- {serverError 1}
LAYOUT(COMPLEX_KEY_HASHED()); -- {serverError 1}
DROP DATABASE IF EXISTS lazy_db;
SELECT '=DROP DATABASE WITH DICTIONARY';
DROP DATABASE IF EXISTS ordinary_db;
@ -165,7 +181,7 @@ CREATE DICTIONARY ordinary_db.dict4
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
@ -184,10 +200,14 @@ CREATE DICTIONARY ordinary_db.dict4
third_column String DEFAULT 'qqq'
)
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD ''))
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
SHOW DICTIONARIES FROM ordinary_db;
DROP DATABASE IF EXISTS ordinary_db;
DROP TABLE IF EXISTS database_for_dict.table_for_dict;
DROP DATABASE IF EXISTS database_for_dict;