Fix deadlock, add test and refactoring

This commit is contained in:
alesapin 2019-10-17 20:18:54 +03:00
parent 6dfe5c7142
commit 05392fd882
10 changed files with 187 additions and 43 deletions

View File

@ -184,10 +184,11 @@ void DatabaseOrdinary::loadStoredObjects(
/// After all tables was basically initialized, startup them.
startupTables(pool);
/// Add database as repository
auto dictionaries_repository = std::make_unique<ExternalLoaderDatabaseConfigRepository>(shared_from_this(), context);
context.getExternalDictionariesLoader().addConfigRepository(
getDatabaseName(), std::move(dictionaries_repository), {"dictionary", "name"});
auto & external_loader = context.getExternalDictionariesLoader();
external_loader.addConfigRepository(getDatabaseName(), std::move(dictionaries_repository));
}

View File

@ -37,7 +37,8 @@ namespace
StoragePtr getDictionaryStorage(const Context & context, const String & table_name, const String & db_name)
{
auto dict_name = db_name + "." + table_name;
auto dict_ptr = context.getExternalDictionariesLoader().tryGetDictionary(dict_name);
const auto & external_loader = context.getExternalDictionariesLoader();
auto dict_ptr = external_loader.tryGetDictionary(dict_name);
if (dict_ptr)
{
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
@ -69,39 +70,41 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(
const Context & context,
const String & table_name) const
{
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it == tables.end())
{
if (dictionaries.count(table_name))
return getDictionaryStorage(context, table_name, getDatabaseName());
return {};
std::lock_guard lock(mutex);
auto it = tables.find(table_name);
if (it != tables.end())
return it->second;
}
return it->second;
if (isDictionaryExist(context, table_name))
/// We don't need lock database here, because database doesn't store dictionary itself
/// just metadata
return getDictionaryStorage(context, table_name, getDatabaseName());
return {};
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesWithDictionaryTablesIterator(const Context & context, const FilterByNameFunction & filter_by_name)
{
std::lock_guard lock(mutex);
Tables tables_copy = tables;
if (!filter_by_table_name)
auto tables_it = getTablesIterator(context, filter_by_name);
auto dictionaries_it = getDictionariesIterator(context, filter_by_name);
Tables result;
while (tables_it && tables_it->isValid())
{
for (const String & dictionary_name : dictionaries)
if (auto dictionary_storage = getDictionaryStorage(context, dictionary_name, getDatabaseName()); dictionary_storage)
tables_copy.emplace(dictionary_name, dictionary_storage);
return std::make_unique<DatabaseTablesSnapshotIterator>(tables_copy);
result.emplace(tables_it->name(), tables_it->table());
tables_it->next();
}
Tables filtered_tables;
for (const auto & [table_name, storage] : tables)
if (filter_by_table_name(table_name))
filtered_tables.emplace(table_name, storage);
for (const String & dictionary_name : dictionaries)
if (filter_by_table_name(dictionary_name))
if (auto dictionary_storage = getDictionaryStorage(context, dictionary_name, getDatabaseName()); dictionary_storage)
tables_copy.emplace(dictionary_name, dictionary_storage);
while (dictionaries_it && dictionaries_it->isValid())
{
auto table_name = dictionaries_it->name();
result.emplace(table_name, getDictionaryStorage(context, table_name, getDatabaseName()));
dictionaries_it->next();
}
return std::make_unique<DatabaseTablesSnapshotIterator>(std::move(filtered_tables));
return std::make_unique<DatabaseTablesSnapshotIterator>(result);
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(const Context & /*context*/, const FilterByNameFunction & filter_by_table_name)

View File

@ -1041,10 +1041,10 @@ void Context::addDatabase(const String & database_name, const DatabasePtr & data
DatabasePtr Context::detachDatabase(const String & database_name)
{
auto lock = getLock();
auto res = getDatabase(database_name);
getExternalDictionariesLoader().removeConfigRepository(database_name);
shared->databases.erase(database_name);
return res;
}
@ -1342,13 +1342,20 @@ ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
const ExternalModelsLoader & Context::getExternalModelsLoader() const
{
{
std::lock_guard lock(shared->external_models_mutex);
if (shared->external_models_loader)
return *shared->external_models_loader;
}
const auto & config = getConfigRef();
std::lock_guard lock(shared->external_models_mutex);
if (!shared->external_models_loader)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
auto config_repository = std::make_unique<ExternalLoaderXMLConfigRepository>(getConfigRef(), "models_config");
auto config_repository = std::make_unique<ExternalLoaderXMLConfigRepository>(config, "models_config");
shared->external_models_loader.emplace(std::move(config_repository), *this->global_context);
}
return *shared->external_models_loader;

View File

@ -11,7 +11,7 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(
: ExternalLoader("external dictionary", &Logger::get("ExternalDictionariesLoader"))
, context(context_)
{
addConfigRepository("", std::move(config_repository), {"dictionary", "name"});
addConfigRepository("", std::move(config_repository));
enableAsyncLoading(true);
enablePeriodicUpdates(true);
}
@ -23,4 +23,9 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
return DictionaryFactory::instance().create(name, config, key_in_config, context);
}
void ExternalDictionariesLoader::addConfigRepository(
const std::string & repository_name, std::unique_ptr<IExternalLoaderConfigRepository> config_repository)
{
ExternalLoader::addConfigRepository(repository_name, std::move(config_repository), {"dictionary", "name"});
}
}

View File

@ -33,6 +33,11 @@ public:
return std::static_pointer_cast<const IDictionaryBase>(tryGetLoadable(name));
}
void addConfigRepository(
const std::string & repository_name,
std::unique_ptr<IExternalLoaderConfigRepository> config_repository);
protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config) const override;

View File

@ -67,7 +67,7 @@ public:
// Generate new result.
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();
for (const auto & [path, loadable_info] : loadables_infos)
for (const auto & [path, loadable_info] : loadables_infos)
{
for (const auto & [name, config] : loadable_info.configs)
{
@ -148,19 +148,19 @@ private:
bool readLoadablesInfo(
const String & repo_name,
IExternalLoaderConfigRepository & repository,
const String & name,
const String & object_name,
const ExternalLoaderConfigSettings & settings,
LoadablesInfos & loadable_info) const
{
try
{
if (name.empty() || !repository.exists(name))
if (object_name.empty() || !repository.exists(object_name))
{
LOG_WARNING(log, "Config file '" + name + "' does not exist");
LOG_WARNING(log, "Config file '" + object_name + "' does not exist");
return false;
}
auto update_time_from_repository = repository.getUpdateTime(name);
auto update_time_from_repository = repository.getUpdateTime(object_name);
/// Actually it can't be less, but for sure we check less or equal
if (update_time_from_repository <= loadable_info.last_update_time)
@ -169,31 +169,31 @@ private:
return false;
}
auto file_contents = repository.load(name);
auto file_contents = repository.load(object_name);
/// get all objects' definitions
Poco::Util::AbstractConfiguration::Keys keys;
file_contents->keys(keys);
/// for each object defined in xml config
/// for each object defined in repositories
std::vector<std::pair<String, ObjectConfig>> configs_from_file;
for (const auto & key : keys)
{
if (!startsWith(key, settings.external_config))
{
if (!startsWith(key, "comment") && !startsWith(key, "include_from"))
LOG_WARNING(log, name << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'");
LOG_WARNING(log, object_name << ": file contains unknown node '" << key << "', expected '" << settings.external_config << "'");
continue;
}
String external_name = file_contents->getString(key + "." + settings.external_name);
if (external_name.empty())
{
LOG_WARNING(log, name << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed");
LOG_WARNING(log, object_name << ": node '" << key << "' defines " << type_name << " with an empty name. It's not allowed");
continue;
}
configs_from_file.emplace_back(external_name, ObjectConfig{name, file_contents, key, repo_name});
configs_from_file.emplace_back(external_name, ObjectConfig{object_name, file_contents, key, repo_name});
}
loadable_info.configs = std::move(configs_from_file);
@ -203,7 +203,7 @@ private:
}
catch (...)
{
tryLogCurrentException(log, "Failed to load config for dictionary '" + name + "'");
tryLogCurrentException(log, "Failed to load config for dictionary '" + object_name + "'");
return false;
}
}

View File

@ -778,7 +778,7 @@ void InterpreterCreateQuery::checkAccess(const ASTCreateQuery & create)
return;
if (readonly)
throw Exception("Cannot create table in readonly mode", ErrorCodes::READONLY);
throw Exception("Cannot create table or dictionary in readonly mode", ErrorCodes::READONLY);
throw Exception("Cannot create table. DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
}

View File

@ -0,0 +1,122 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT -n -q "
DROP DATABASE IF EXISTS database_for_dict;
DROP TABLE IF EXISTS table_for_dict1;
DROP TABLE IF EXISTS table_for_dict2;
CREATE TABLE table_for_dict1 (key_column UInt64, value_column String) ENGINE = MergeTree ORDER BY key_column;
CREATE TABLE table_for_dict2 (key_column UInt64, value_column String) ENGINE = MergeTree ORDER BY key_column;
INSERT INTO table_for_dict1 SELECT number, toString(number) from numbers(1000);
INSERT INTO table_for_dict2 SELECT number, toString(number) from numbers(1000, 1000);
CREATE DATABASE database_for_dict;
CREATE DICTIONARY database_for_dict.dict1 (key_column UInt64, value_column String) PRIMARY KEY key_column SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict1' PASSWORD '' DB '$CLICKHOUSE_DATABASE')) LIFETIME(MIN 1 MAX 5) LAYOUT(FLAT());
CREATE DICTIONARY database_for_dict.dict2 (key_column UInt64, value_column String) PRIMARY KEY key_column SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_for_dict2' PASSWORD '' DB '$CLICKHOUSE_DATABASE')) LIFETIME(MIN 1 MAX 5) LAYOUT(CACHE(SIZE_IN_CELLS 150));
"
function thread1()
{
while true; do $CLICKHOUSE_CLIENT --query "SELECT * FROM system.dictionaries FORMAT Null"; done
}
function thread2()
{
while true; do CLICKHOUSE_CLIENT --query "ATTACH DICTIONARY database_for_dict.dict1" ||: ; done
}
function thread3()
{
while true; do CLICKHOUSE_CLIENT --query "ATTACH DICTIONARY database_for_dict.dict2" ||:; done
}
function thread4()
{
while true; do $CLICKHOUSE_CLIENT -n -q "
SELECT * FROM database_for_dict.dict1 FORMAT Null;
SELECT * FROM database_for_dict.dict2 FORMAT Null;
" ||: ; done
}
function thread5()
{
while true; do $CLICKHOUSE_CLIENT -n -q "
SELECT dictGetString('database_for_dict.dict1', 'value_column', toUInt64(number)) from numbers(1000) FROM FORMAT Null;
SELECT dictGetString('database_for_dict.dict2', 'value_column', toUInt64(number)) from numbers(1000) FROM FORMAT Null;
" ||: ; done
}
function thread6()
{
while true; do $CLICKHOUSE_CLIENT -q "DETACH DICTIONARY database_for_dict.dict1"; done
}
function thread7()
{
while true; do $CLICKHOUSE_CLIENT -q "DETACH DICTIONARY database_for_dict.dict2"; done
}
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
export -f thread5;
export -f thread6;
export -f thread7;
TIMEOUT=10
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread7 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread7 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread7 2> /dev/null &
timeout $TIMEOUT bash -c thread1 2> /dev/null &
timeout $TIMEOUT bash -c thread2 2> /dev/null &
timeout $TIMEOUT bash -c thread3 2> /dev/null &
timeout $TIMEOUT bash -c thread4 2> /dev/null &
timeout $TIMEOUT bash -c thread5 2> /dev/null &
timeout $TIMEOUT bash -c thread6 2> /dev/null &
timeout $TIMEOUT bash -c thread7 2> /dev/null &
wait
$CLICKHOUSE_CLIENT -q "SELECT 'Still alive'"
$CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY database_for_dict.dict1"
$CLICKHOUSE_CLIENT -q "ATTACH DICTIONARY database_for_dict.dict2"
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE table_for_dict1;
DROP TABLE table_for_dict2;
DROP DATABASE database_for_dict;
"

View File

@ -37,7 +37,7 @@ SELECT third_column FROM database_for_dict.dict1 WHERE key_column = 12;
SELECT dictGetFloat64('database_for_dict.dict1', 'fourth_column', toUInt64(14));
SELECT fourth_column FROM database_for_dict.dict1 WHERE key_column = 14;
select count(distinct(dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(number)))) from numbers(100);
SELECT count(distinct(dictGetUInt8('database_for_dict.dict1', 'second_column', toUInt64(number)))) from numbers(100);
DETACH DICTIONARY database_for_dict.dict1;