Better introspection

This commit is contained in:
alesapin 2019-10-17 16:05:12 +03:00
parent 3987a45ea2
commit 6dfe5c7142
17 changed files with 160 additions and 127 deletions

View File

@ -35,25 +35,29 @@ void DatabaseDictionary::loadStoredObjects(Context &, bool)
Tables DatabaseDictionary::listTables(const Context & context, const FilterByNameFunction & filter_by_name)
{
Tables tables;
ExternalLoader::Loadables loadables;
ExternalLoader::LoadResults load_results;
if (filter_by_name)
{
/// If `filter_by_name` is set, we iterate through all dictionaries with such names. That's why we need to load all of them.
loadables = context.getExternalDictionariesLoader().loadAndGet(filter_by_name);
context.getExternalDictionariesLoader().load(filter_by_name, load_results);
}
else
{
/// If `filter_by_name` isn't set, we iterate through only already loaded dictionaries. We don't try to load all dictionaries in this case.
loadables = context.getExternalDictionariesLoader().getCurrentlyLoadedObjects();
load_results = context.getExternalDictionariesLoader().getCurrentLoadResults();
}
for (const auto & loadable : loadables)
for (const auto & [object_name, info]: load_results)
{
auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(loadable);
auto dict_name = dict_ptr->getName();
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[dict_name] = StorageDictionary::create(getDatabaseName(), dict_name, ColumnsDescription{columns}, context, true, dict_name);
/// Load tables only from XML dictionaries, don't touch other
if (info.object != nullptr && info.repository_name.empty())
{
auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(info.object);
auto dict_name = dict_ptr->getName();
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
tables[dict_name] = StorageDictionary::create(getDatabaseName(), dict_name, ColumnsDescription{columns}, context, true, dict_name);
}
}
return tables;
}

View File

@ -10,6 +10,7 @@ namespace ErrorCodes
{
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
extern const int UNSUPPORTED_METHOD;
}
DatabaseMemory::DatabaseMemory(String name_)
@ -34,12 +35,17 @@ void DatabaseMemory::createTable(
}
void DatabaseMemory::attachDictionary(const String & /*name*/, const Context & /*context*/, bool /*load*/)
{
throw Exception("There is no ATTACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
}
void DatabaseMemory::createDictionary(
const Context & context,
const String & dictionary_name,
const Context & /*context*/,
const String & /*dictionary_name*/,
const ASTPtr & /*query*/)
{
attachDictionary(dictionary_name, context, true);
throw Exception("There is no CREATE DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
}
@ -51,11 +57,17 @@ void DatabaseMemory::removeTable(
}
void DatabaseMemory::removeDictionary(
const Context & context,
const String & dictionary_name)
void DatabaseMemory::detachDictionary(const String & /*name*/, const Context & /*context*/)
{
detachDictionary(dictionary_name, context);
throw Exception("There is no DETACH DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
}
void DatabaseMemory::removeDictionary(
const Context & /*context*/,
const String & /*dictionary_name*/)
{
throw Exception("There is no DROP DICTIONARY query for DatabaseMemory", ErrorCodes::UNSUPPORTED_METHOD);
}

View File

@ -38,6 +38,11 @@ public:
const String & dictionary_name,
const ASTPtr & query) override;
void attachDictionary(
const String & name,
const Context & context,
bool load) override;
void removeTable(
const Context & context,
const String & table_name) override;
@ -46,6 +51,10 @@ public:
const Context & context,
const String & dictionary_name) override;
void detachDictionary(
const String & name,
const Context & context) override;
time_t getObjectMetadataModificationTime(const Context & context, const String & table_name) override;
ASTPtr getCreateTableQuery(const Context & context, const String & table_name) const override;

View File

@ -10,6 +10,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
#include <Storages/IStorage.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageFactory.h>
#include <Common/typeid_cast.h>
#include <TableFunctions/TableFunctionFactory.h>
@ -30,12 +31,30 @@ namespace ErrorCodes
extern const int DICTIONARY_ALREADY_EXISTS;
}
namespace
{
StoragePtr getDictionaryStorage(const Context & context, const String & table_name, const String & db_name)
{
auto dict_name = db_name + "." + table_name;
auto dict_ptr = context.getExternalDictionariesLoader().tryGetDictionary(dict_name);
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
return StorageDictionary::create(db_name, table_name, ColumnsDescription{columns}, context, true, dict_name);
}
return nullptr;
}
}
bool DatabaseWithOwnTablesBase::isTableExist(
const Context & /*context*/,
const String & table_name) const
{
std::lock_guard lock(mutex);
return tables.find(table_name) != tables.end();
return tables.find(table_name) != tables.end() || dictionaries.find(table_name) != dictionaries.end();
}
bool DatabaseWithOwnTablesBase::isDictionaryExist(
@ -47,25 +66,55 @@ bool DatabaseWithOwnTablesBase::isDictionaryExist(
}
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
const Context & /*context*/,
const Context & context,
const String & table_name) const
{
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
{
if (dictionaries.count(table_name))
return getDictionaryStorage(context, table_name, getDatabaseName());
return {};
}
return it->second;
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
{
std::lock_guard lock(mutex);
Tables tables_copy = tables;
if (!filter_by_table_name)
{
for (const String & dictionary_name : dictionaries)
if (auto dictionary_storage = getDictionaryStorage(context, dictionary_name, getDatabaseName()); dictionary_storage)
tables_copy.emplace(dictionary_name, dictionary_storage);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables_copy);
}
Tables filtered_tables;
for (const auto & [table_name, storage] : tables)
if (filter_by_table_name(table_name))
filtered_tables.emplace(table_name, storage);
for (const String & dictionary_name : dictionaries)
if (filter_by_table_name(dictionary_name))
if (auto dictionary_storage = getDictionaryStorage(context, dictionary_name, getDatabaseName()); dictionary_storage)
tables_copy.emplace(dictionary_name, dictionary_storage);
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables));
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const Context & /*context*/, const FilterByNameFunction & filter_by_table_name)
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
return std::make_unique<DatabaseTablesSnapshotIterator>(tables);
Tables filtered_tables;
for (const auto & [table_name, storage] : tables)
if (filter_by_table_name(table_name))
filtered_tables.emplace(table_name, storage);
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables));
}
@ -94,6 +143,9 @@ StoragePtr DatabaseWithOwnTablesBase::detachTable(const String & table_name)
StoragePtr res;
{
std::lock_guard lock(mutex);
if (dictionaries.count(table_name))
throw Exception("Cannot detach dictionary " + name + "." + table_name + " as table, use DETACH DICTIONARY query.", ErrorCodes::UNKNOWN_TABLE);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);

View File

@ -43,6 +43,8 @@ public:
DatabaseDictionariesIteratorPtr getDictionariesIterator(const Context & context, const FilterByNameFunction & filter_by_dictionary_name = {}) override;
DatabaseTablesIteratorPtr getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_dictionary_name = {}) override;
void shutdown() override;
virtual ~DatabaseWithOwnTablesBase() override;

View File

@ -129,6 +129,12 @@ public:
/// Get an iterator to pass through all the dictionaries.
virtual DatabaseDictionariesIteratorPtr getDictionariesIterator(const Context & context, const FilterByNameFunction & filter_by_dictionary_name = {}) = 0;
/// Get an iterator to pass through all the tables and dictionary tables.
virtual DatabaseTablesIteratorPtr getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_name = {})
{
return getTablesIterator(context, filter_by_name);
}
/// Is the database empty.
virtual bool empty(const Context & context) const = 0;

View File

@ -11,7 +11,7 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(
: ExternalLoader("external dictionary", &Logger::get("ExternalDictionariesLoader"))
, context(context_)
{
addConfigRepository("_XMLConfigRepository", std::move(config_repository), {"dictionary", "name"});
addConfigRepository("", std::move(config_repository), {"dictionary", "name"});
enableAsyncLoading(true);
enablePeriodicUpdates(true);
}

View File

@ -26,6 +26,7 @@ struct ExternalLoader::ObjectConfig
String config_path;
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
String key_in_config;
String repository_name;
};
@ -107,7 +108,7 @@ private:
loadable_info.in_use = false;
}
for (const auto & [name, repo_with_settings] : repositories)
for (const auto & [repo_name, repo_with_settings] : repositories)
{
const auto names = repo_with_settings.first->getAllLoadablesDefinitionNames();
for (const auto & loadable_name : names)
@ -116,13 +117,13 @@ private:
if (it != loadables_infos.end())
{
LoadablesInfos & loadable_info = it->second;
if (readLoadablesInfo(*repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info))
if (readLoadablesInfo(repo_name, *repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info))
changed = true;
}
else
{
LoadablesInfos loadable_info;
if (readLoadablesInfo(*repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info))
if (readLoadablesInfo(repo_name, *repo_with_settings.first, loadable_name, repo_with_settings.second, loadable_info))
{
loadables_infos.emplace(loadable_name, std::move(loadable_info));
changed = true;
@ -145,6 +146,7 @@ private:
}
bool readLoadablesInfo(
const String & repo_name,
IExternalLoaderConfigRepository & repository,
const String & name,
const ExternalLoaderConfigSettings & settings,
@ -191,7 +193,7 @@ private:
continue;
}
configs_from_file.emplace_back(external_name, ObjectConfig{name, file_contents, key});
configs_from_file.emplace_back(external_name, ObjectConfig{name, file_contents, key, repo_name});
}
loadable_info.configs = std::move(configs_from_file);
@ -441,8 +443,17 @@ public:
loaded_objects = collectLoadedObjects(filter_by_name);
}
/// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name, LoadResults & loaded_results, Duration timeout = NO_TIMEOUT)
{
std::unique_lock lock{mutex};
loadImpl(filter_by_name, timeout, lock);
loaded_results = collectLoadResults(filter_by_name);
}
/// Tries to finish loading of all the objects during the timeout.
void load(Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) { load(allNames, loaded_objects, timeout); }
void load(LoadResults & loaded_results, Duration timeout = NO_TIMEOUT) { load(allNames, loaded_results, timeout); }
/// Starts reloading a specified object.
void reload(const String & name, bool load_never_loading = false)
@ -581,6 +592,7 @@ private:
result.loading_start_time = loading_start_time;
result.loading_duration = loadingDuration();
result.origin = config.config_path;
result.repository_name = config.repository_name;
return result;
}
@ -627,8 +639,10 @@ private:
LoadResults load_results;
load_results.reserve(infos.size());
for (const auto & [name, info] : infos)
{
if (filter_by_name(name))
load_results.emplace_back(name, info.loadResult());
}
return load_results;
}
@ -1052,6 +1066,16 @@ void ExternalLoader::load(const FilterByNameFunction & filter_by_name, Loadables
loading_dispatcher->load(loaded_objects, timeout);
}
void ExternalLoader::load(const FilterByNameFunction & filter_by_name, LoadResults & loaded_objects, Duration timeout) const
{
if (filter_by_name)
loading_dispatcher->load(filter_by_name, loaded_objects, timeout);
else
loading_dispatcher->load(loaded_objects, timeout);
}
void ExternalLoader::load(Loadables & loaded_objects, Duration timeout) const
{
return loading_dispatcher->load(loaded_objects, timeout);

View File

@ -72,6 +72,7 @@ public:
TimePoint loading_start_time;
Duration loading_duration;
std::exception_ptr exception;
std::string repository_name;
};
using LoadResults = std::vector<std::pair<String, LoadResult>>;
@ -137,6 +138,7 @@ public:
/// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) const;
void load(const FilterByNameFunction & filter_by_name, LoadResults & load_results, Duration timeout = NO_TIMEOUT) const;
Loadables loadAndGet(const FilterByNameFunction & filter_by_name, Duration timeout = NO_TIMEOUT) const { Loadables loaded_objects; load(filter_by_name, loaded_objects, timeout); return loaded_objects; }
/// Tries to finish loading of all the objects during the timeout.

View File

@ -13,6 +13,7 @@ namespace DB
using LoadablesConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Base interface for configurations source for Loadble objects, which can be
/// loaded with ExternalLoader. Configurations may came from filesystem (XML-files),
/// server memory (from database), etc. It's important that main result of this class

View File

@ -292,7 +292,7 @@ BlockInputStreams StorageSystemColumns::read(
const DatabasePtr database = databases.at(database_name);
offsets[i] = i ? offsets[i - 1] : 0;
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesWithDictionaryTablesIterator(context); iterator->isValid(); iterator->next())
{
const String & table_name = iterator->name();
storages.emplace(std::piecewise_construct,

View File

@ -19,44 +19,6 @@
namespace DB
{
namespace
{
NameSet getFilteredDatabases(const ASTPtr & query, const Context & context)
{
MutableColumnPtr column = ColumnString::create();
for (const auto & db : context.getDatabases())
column->insert(db.first);
Block block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database")};
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
NameSet result;
for (size_t i = 0; i < block.rows(); ++i)
result.insert(block.getByPosition(0).column->getDataAt(i).toString());
return result;
}
NameSet getFilteredDictionaries(const ASTPtr & query, const Context & context, const DatabasePtr & database)
{
MutableColumnPtr column = ColumnString::create();
auto dicts_it = database->getDictionariesIterator(context);
while (dicts_it->isValid())
{
column->insert(dicts_it->name());
dicts_it->next();
}
Block block{ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "dictionary")};
VirtualColumnUtils::filterBlockWithQuery(query, block, context);
NameSet result;
for (size_t i = 0; i < block.rows(); ++i)
result.insert(block.getByPosition(0).column->getDataAt(i).toString());
return result;
}
}
NamesAndTypesList StorageSystemDictionaries::getNamesAndTypes()
{
return {
@ -81,15 +43,19 @@ NamesAndTypesList StorageSystemDictionaries::getNamesAndTypes()
};
}
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & /*query_info*/) const
{
const auto & external_dictionaries = context.getExternalDictionariesLoader();
for (const auto & [dict_name, load_result] : external_dictionaries.getCurrentLoadResults())
{
size_t i = 0;
res_columns[i++]->insert("");
res_columns[i++]->insert(dict_name);
res_columns[i++]->insert(load_result.repository_name);
if (!load_result.repository_name.empty())
res_columns[i++]->insert(dict_name.substr(load_result.repository_name.length() + 1));
else
res_columns[i++]->insert(dict_name);
res_columns[i++]->insert(static_cast<Int8>(load_result.status));
res_columns[i++]->insert(load_result.origin);
@ -128,27 +94,6 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
else
res_columns[i++]->insertDefault();
}
/// Temporary code for testing TODO(alesapin)
NameSet databases = getFilteredDatabases(query_info.query, context);
for (auto database : databases)
{
DatabasePtr database_ptr = context.getDatabase(database);
auto dictionaries_set = getFilteredDictionaries(query_info.query, context, database_ptr);
auto filter = [&dictionaries_set](const String & dict_name) { return dictionaries_set.count(dict_name); };
auto dictionaries_it = database_ptr->getDictionariesIterator(context, filter);
while (dictionaries_it->isValid())
{
size_t i = 0;
res_columns[i++]->insert(database);
res_columns[i++]->insert(dictionaries_it->name());
for (size_t j = 0; j < getNamesAndTypes().size() - 2; ++j)
res_columns[i++]->insertDefault();
dictionaries_it->next();
}
}
}
}

View File

@ -193,7 +193,7 @@ protected:
}
if (!tables_it || !tables_it->isValid())
tables_it = database->getTablesIterator(context);
tables_it = database->getTablesWithDictionaryTablesIterator(context);
const bool need_lock_structure = needLockStructure(database, header);

View File

@ -12,14 +12,6 @@ ordinary_db dict1
==DROP DICTIONARY
0
=DICTIONARY in Memory DB
dict2
1
memory_db dict2
==DETACH DICTIONARY
0
==ATTACH DICTIONARY
0
==DROP DICTIONARY
0
=DICTIONARY in Dictionary DB
=DICTIONARY in Lazy DB

View File

@ -89,7 +89,7 @@ CREATE DICTIONARY memory_db.dict2
PRIMARY KEY key_column
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict' PASSWORD '' DB 'database_for_dict'))
LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
LAYOUT(FLAT()); -- {serverError 1}
SHOW CREATE DICTIONARY memory_db.dict2; -- {serverError 485}
@ -99,37 +99,6 @@ EXISTS DICTIONARY memory_db.dict2;
SELECT database, name FROM system.dictionaries WHERE name LIKE 'dict2';
SELECT '==DETACH DICTIONARY';
DETACH DICTIONARY memory_db.dict2;
SHOW DICTIONARIES FROM memory_db LIKE 'dict2';
EXISTS DICTIONARY memory_db.dict2;
SELECT database, name FROM system.dictionaries WHERE name LIKE 'dict2';
SELECT '==ATTACH DICTIONARY';
ATTACH DICTIONARY memory_db.dict2; --{serverError 485}
SHOW DICTIONARIES FROM memory_db LIKE 'dict2';
EXISTS DICTIONARY memory_db.dict2;
SELECT database, name FROM system.dictionaries WHERE name LIKE 'dict2';
SELECT '==DROP DICTIONARY';
DROP DICTIONARY IF EXISTS memory_db.dict2;
SHOW DICTIONARIES FROM memory_db LIKE 'dict2';
EXISTS DICTIONARY memory_db.dict2;
SELECT database, name FROM system.dictionaries WHERE name LIKE 'dict2';
DROP DATABASE IF EXISTS memory_db;
DROP DATABASE IF EXISTS dictionary_db;
CREATE DATABASE dictionary_db ENGINE = Dictionary;

View File

@ -1,5 +1,8 @@
11
11
144
144
7
7
17
11
@ -7,3 +10,8 @@
7
11
6
dict1 Dictionary
dict2 Dictionary
table_for_dict MergeTree
database_for_dict dict1 ComplexKeyCache
database_for_dict dict2 Hashed

View File

@ -31,8 +31,11 @@ LIFETIME(MIN 1 MAX 10)
LAYOUT(FLAT());
SELECT dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(11));
SELECT second_column FROM database_for_dict.dict1 WHERE key_column = 11;
SELECT dictGetString('database_for_dict.dict1', 'third_column', toUInt64(12));
SELECT third_column FROM database_for_dict.dict1 WHERE key_column = 12;
SELECT dictGetFloat64('database_for_dict.dict1', 'fourth_column', toUInt64(14));
SELECT fourth_column FROM database_for_dict.dict1 WHERE key_column = 14;
select count(distinct(dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(number)))) from numbers(100);
@ -84,4 +87,8 @@ LAYOUT(HASHED());
SELECT dictGetString('database_for_dict.dict2', 'some_column', toUInt64(12));
SELECT name, engine FROM system.tables WHERE database = 'database_for_dict' ORDER BY name;
SELECT database, name, type FROM system.dictionaries WHERE database = 'database_for_dict' ORDER BY name;
DROP DATABASE IF EXISTS database_for_dict;