Fix some races from lazy load and add ugly interface to loader (need to be refactored)

This commit is contained in:
alesapin 2019-10-18 18:44:32 +03:00
parent b222ec1209
commit cdc195727e
14 changed files with 409 additions and 36 deletions

View File

@ -41,6 +41,7 @@
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/SystemLog.cpp>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -920,6 +921,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->tryCreateEmbeddedDictionaries();
global_context->getExternalDictionariesLoader().enableAlwaysLoadEverything(true);
}
auto config_repository = std::make_unique<ExternalLoaderXMLConfigRepository>(config(), "dictionaries_config");
global_context->getExternalDictionariesLoader().addConfigRepository("", std::move(config_repository));
}
catch (...)
{

View File

@ -7,6 +7,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/formatAST.h>
@ -298,14 +299,15 @@ void DatabaseOnDisk::createDictionary(
{
/// Do not load it now
database.attachDictionary(dictionary_name, context, false);
/// Load dictionary
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
String dict_name = database.getDatabaseName() + "." + dictionary_name;
context.getExternalDictionariesLoader().reloadSingleDictionary(dict_name, database.getDatabaseName(), query->as<const ASTCreateQuery &>(), !lazy_load, !lazy_load);
/// If it was ATTACH query and file with table metadata already exist
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);
/// Load dictionary
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
context.getExternalDictionariesLoader().reload(database.getDatabaseName() + "." + dictionary_name, !lazy_load);
}
catch (...)
{

View File

@ -1321,15 +1321,13 @@ const ExternalDictionariesLoader & Context::getExternalDictionariesLoader() cons
return *shared->external_dictionaries_loader;
}
const auto & config = getConfigRef();
std::lock_guard lock(shared->external_dictionaries_mutex);
if (!shared->external_dictionaries_loader)
{
if (!this->global_context)
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
auto config_repository = std::make_unique<ExternalLoaderXMLConfigRepository>(config, "dictionaries_config");
shared->external_dictionaries_loader.emplace(std::move(config_repository), *this->global_context);
shared->external_dictionaries_loader.emplace(*this->global_context);
}
return *shared->external_dictionaries_loader;
}

View File

@ -1,17 +1,16 @@
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/Context.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
namespace DB
{
/// Must not acquire Context lock in constructor to avoid possibility of deadlocks.
ExternalDictionariesLoader::ExternalDictionariesLoader(
ExternalLoaderConfigRepositoryPtr config_repository, Context & context_)
ExternalDictionariesLoader::ExternalDictionariesLoader(Context & context_)
: ExternalLoader("external dictionary", &Logger::get("ExternalDictionariesLoader"))
, context(context_)
{
addConfigRepository("", std::move(config_repository));
enableAsyncLoading(true);
enablePeriodicUpdates(true);
}
@ -28,4 +27,19 @@ void ExternalDictionariesLoader::addConfigRepository(
{
ExternalLoader::addConfigRepository(repository_name, std::move(config_repository), {"dictionary", "name"});
}
void ExternalDictionariesLoader::reloadSingleDictionary(
const String & name,
const String & repo_name,
const ASTCreateQuery & query,
bool load_never_loading, bool sync) const
{
return ExternalLoader::reloadWithConfig(
name, /// names are equal
name,
repo_name,
getDictionaryConfigurationFromAST(query),
"dictionary", load_never_loading, sync);
}
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/IExternalLoaderConfigRepository.h>
#include <Interpreters/ExternalLoader.h>
#include <common/logger_useful.h>
#include <Parsers/ASTCreateQuery.h>
#include <memory>
@ -19,9 +20,7 @@ public:
using DictPtr = std::shared_ptr<const IDictionaryBase>;
/// Dictionaries will be loaded immediately and then will be updated in separate thread, each 'reload_period' seconds.
ExternalDictionariesLoader(
ExternalLoaderConfigRepositoryPtr config_repository,
Context & context_);
ExternalDictionariesLoader(Context & context_);
DictPtr getDictionary(const std::string & name) const
{
@ -38,6 +37,15 @@ public:
std::unique_ptr<IExternalLoaderConfigRepository> config_repository);
/// Starts reloading of a specified object.
void reloadSingleDictionary(
const String & name,
const String & repo_name,
const ASTCreateQuery & query,
bool load_never_loading = false,
bool sync = false) const;
protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config) const override;

View File

@ -41,7 +41,10 @@ public:
}
~LoadablesConfigReader() = default;
void addConfigRepository(const String & name, std::unique_ptr<IExternalLoaderConfigRepository> repository, const ExternalLoaderConfigSettings & settings)
void addConfigRepository(
const String & name,
std::unique_ptr<IExternalLoaderConfigRepository> repository,
const ExternalLoaderConfigSettings & settings)
{
std::lock_guard lock{mutex};
repositories.emplace(name, std::make_pair(std::move(repository), settings));
@ -53,18 +56,66 @@ public:
repositories.erase(name);
}
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, ObjectConfig>>;
/// Reads configurations.
ObjectConfigsPtr read()
{
std::lock_guard lock{mutex};
std::lock_guard lock(mutex);
// Check last modification times of files and read those files which are new or changed.
if (!readLoadablesInfos())
return configs; // Nothing changed, so we can return the previous result.
return collectConfigs();
}
ObjectConfigsPtr updateLoadableInfo(
const String & external_name,
const String & object_name,
const String & repo_name,
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key)
{
std::lock_guard lock(mutex);
auto it = loadables_infos.find(object_name);
if (it == loadables_infos.end())
{
LoadablesInfos loadable_info;
loadables_infos[object_name] = loadable_info;
}
auto & loadable_info = loadables_infos[object_name];
ObjectConfig object_config{object_name, config, key, repo_name};
bool found = false;
for (auto iter = loadable_info.configs.begin(); iter != loadable_info.configs.end(); ++iter)
{
if (iter->first == external_name)
{
iter->second = object_config;
found = true;
break;
}
}
if (!found)
loadable_info.configs.emplace_back(external_name, object_config);
loadable_info.last_update_time = Poco::Timestamp{}; /// now
loadable_info.in_use = true;
return collectConfigs();
}
private:
struct LoadablesInfos
{
Poco::Timestamp last_update_time = 0;
std::vector<std::pair<String, ObjectConfig>> configs; // Parsed loadable's contents.
bool in_use = true; // Whether the `LoadablesInfos` should be destroyed because the correspondent loadable is deleted.
};
/// Collect current configurations
ObjectConfigsPtr collectConfigs()
{
// Generate new result.
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();
for (const auto & [path, loadable_info] : loadables_infos)
@ -89,14 +140,6 @@ public:
return configs;
}
private:
struct LoadablesInfos
{
Poco::Timestamp last_update_time = 0;
std::vector<std::pair<String, ObjectConfig>> configs; // Parsed file's contents.
bool in_use = true; // Whether the ` LoadablesInfos` should be destroyed because the correspondent file is deleted.
};
/// Read files and store them to the map ` loadables_infos`.
bool readLoadablesInfos()
{
@ -208,6 +251,7 @@ private:
}
}
const String type_name;
Logger * log;
@ -337,7 +381,6 @@ public:
/// Sets whether the objects should be loaded asynchronously, each loading in a new thread (from the thread pool).
void enableAsyncLoading(bool enable)
{
std::lock_guard lock{mutex};
enable_async_loading = enable;
}
@ -456,18 +499,20 @@ public:
void load(LoadResults & loaded_results, Duration timeout = NO_TIMEOUT) { load(allNames, loaded_results, timeout); }
/// Starts reloading a specified object.
void reload(const String & name, bool load_never_loading = false)
void reload(const String & name, bool load_never_loading = false, bool sync = false)
{
std::lock_guard lock{mutex};
Info * info = getInfo(name);
if (!info)
{
return;
}
if (info->wasLoading() || load_never_loading)
{
cancelLoading(*info);
info->forced_to_reload = true;
startLoading(name, *info);
startLoading(name, *info, sync);
}
}
@ -690,7 +735,7 @@ private:
event.wait_for(lock, timeout, pred);
}
void startLoading(const String & name, Info & info)
void startLoading(const String & name, Info & info, bool sync = false)
{
if (info.loading())
return;
@ -701,7 +746,7 @@ private:
info.loading_start_time = std::chrono::system_clock::now();
info.loading_end_time = TimePoint{};
if (enable_async_loading)
if (enable_async_loading && !sync)
{
/// Put a job to the thread pool for the loading.
auto thread = ThreadFromGlobalPool{&LoadingDispatcher::doLoading, this, name, loading_id, true};
@ -710,6 +755,7 @@ private:
else
{
/// Perform the loading immediately.
/// Deadlock when we try to load dictionary from dictionary on localhost
doLoading(name, loading_id, false);
}
}
@ -773,6 +819,8 @@ private:
/// Lock the mutex again to store the changes.
if (async)
lock.lock();
else if (new_exception)
std::rethrow_exception(new_exception);
/// Calculate a new update time.
TimePoint next_update_time;
@ -895,7 +943,7 @@ private:
ObjectConfigsPtr configs;
std::unordered_map<String, Info> infos;
bool always_load_everything = false;
bool enable_async_loading = false;
std::atomic<bool> enable_async_loading = false;
std::unordered_map<size_t, ThreadFromGlobalPool> loading_ids;
size_t next_loading_id = 1; /// should always be > 0
mutable pcg64 rnd_engine{randomSeed()};
@ -992,7 +1040,6 @@ void ExternalLoader::addConfigRepository(
void ExternalLoader::removeConfigRepository(const std::string & repository_name)
{
config_files_reader->removeConfigRepository(repository_name);
loading_dispatcher->setConfiguration(config_files_reader->read());
}
void ExternalLoader::enableAlwaysLoadEverything(bool enable)
@ -1083,10 +1130,11 @@ void ExternalLoader::load(Loadables & loaded_objects, Duration timeout) const
return loading_dispatcher->load(loaded_objects, timeout);
}
void ExternalLoader::reload(const String & name, bool load_never_loading) const
void ExternalLoader::reload(const String & name, bool load_never_loading, bool sync) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
loading_dispatcher->reload(name, load_never_loading);
auto configs = config_files_reader->read();
loading_dispatcher->setConfiguration(configs);
loading_dispatcher->reload(name, load_never_loading, sync);
}
void ExternalLoader::reload(bool load_never_loading) const
@ -1095,6 +1143,21 @@ void ExternalLoader::reload(bool load_never_loading) const
loading_dispatcher->reload(load_never_loading);
}
void ExternalLoader::reloadWithConfig(
const String & name,
const String & external_name,
const String & repo_name,
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key,
bool load_never_loading,
bool sync) const
{
loading_dispatcher->setConfiguration(
config_files_reader->updateLoadableInfo(external_name, name, repo_name, config, key));
loading_dispatcher->reload(name, load_never_loading, sync);
}
ExternalLoader::LoadablePtr ExternalLoader::createObject(
const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const
{

View File

@ -147,7 +147,9 @@ public:
/// Starts reloading of a specified object.
/// `load_never_loading` specifies what to do if the object has never been loading before.
/// The function can either skip it (false) or load for the first time (true).
void reload(const String & name, bool load_never_loading = false) const;
/// Also function can load dictionary synchronously
void reload(const String & name, bool load_never_loading = false, bool sync = false) const;
/// Starts reloading of all the objects.
/// `load_never_loading` specifies what to do with the objects which have never been loading before.
@ -157,6 +159,16 @@ public:
protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0;
/// Reload object with already parsed configuration
void reloadWithConfig(
const String & name, /// name of dictionary
const String & external_name, /// name of source (example xml-file, may contain more than dictionary)
const String & repo_name, /// name of repository (database name, or all xml files)
const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
const String & key_in_config, /// key where we can start search of loadables (<dictionary>, <model>, etc)
bool load_never_loading = false,
bool sync = false) const;
private:
struct ObjectConfig;

View File

@ -773,6 +773,14 @@ void InterpreterCreateQuery::checkAccess(const ASTCreateQuery & create)
throw Exception("Cannot create database. DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
}
String object = "table";
if (create.is_dictionary)
{
if (readonly)
throw Exception("Cannot create dictionary in readonly mode", ErrorCodes::READONLY);
object = "dictionary";
}
if (create.temporary && readonly >= 2)
return;
@ -780,6 +788,7 @@ void InterpreterCreateQuery::checkAccess(const ASTCreateQuery & create)
if (readonly)
throw Exception("Cannot create table or dictionary in readonly mode", ErrorCodes::READONLY);
throw Exception("Cannot create table. DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
throw Exception("Cannot create " + object + ". DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED);
}
}

View File

@ -0,0 +1,19 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -0,0 +1,4 @@
<yandex>
<dictionaries_lazy_load>false</dictionaries_lazy_load>
</yandex>

View File

@ -0,0 +1,41 @@
<yandex>
<dictionary>
<name>xml_dictionary</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>test</db>
<table>xml_dictionary_table</table>
</clickhouse>
</source>
<lifetime>
<min>0</min>
<max>0</max>
</lifetime>
<layout>
<cache><size_in_cells>128</size_in_cells></cache>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>SomeValue1</name>
<type>UInt8</type>
<null_value>1</null_value>
</attribute>
<attribute>
<name>SomeValue2</name>
<type>String</type>
<null_value>''</null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -0,0 +1,36 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
<allow_databases>
<database>default</database>
<database>test</database>
</allow_databases>
</default>
<admin>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</admin>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,163 @@
import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, assert_eq_with_retry
from helpers.client import QueryRuntimeException
import pymysql
import warnings
import time
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
node1 = cluster.add_instance('node1', with_mysql=True, main_configs=['configs/dictionaries/simple_dictionary.xml'])
node2 = cluster.add_instance('node2', with_mysql=True, main_configs=['configs/dictionaries/simple_dictionary.xml', 'configs/dictionaries/lazy_load.xml'])
def create_mysql_conn(user, password, hostname, port):
return pymysql.connect(
user=user,
password=password,
host=hostname,
port=port)
def execute_mysql_query(connection, query):
with warnings.catch_warnings():
warnings.simplefilter("ignore")
with connection.cursor() as cursor:
cursor.execute(query)
connection.commit()
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for clickhouse in [node1, node2]:
clickhouse.query("CREATE DATABASE test", user="admin")
clickhouse.query("CREATE TABLE test.xml_dictionary_table (id UInt64, SomeValue1 UInt8, SomeValue2 String) ENGINE = MergeTree() ORDER BY id", user="admin")
clickhouse.query("INSERT INTO test.xml_dictionary_table SELECT number, number % 23, hex(number) from numbers(1000)", user="admin")
yield cluster
finally:
cluster.shutdown()
@pytest.mark.parametrize("clickhouse,name,layout", [
(node1, 'complex_node1_hashed', 'LAYOUT(COMPLEX_KEY_HASHED())'),
(node1, 'complex_node1_cache', 'LAYOUT(COMPLEX_KEY_CACHE(SIZE_IN_CELLS 10))'),
(node2, 'complex_node2_hashed', 'LAYOUT(COMPLEX_KEY_HASHED())'),
(node2, 'complex_node2_cache', 'LAYOUT(COMPLEX_KEY_CACHE(SIZE_IN_CELLS 10))'),
])
def test_crete_and_select_mysql(started_cluster, clickhouse, name, layout):
mysql_conn = create_mysql_conn("root", "clickhouse", "localhost", 3308)
execute_mysql_query(mysql_conn, "CREATE DATABASE IF NOT EXISTS clickhouse")
execute_mysql_query(mysql_conn, "CREATE TABLE clickhouse.{} (key_field1 int, key_field2 bigint, value1 text, value2 float, PRIMARY KEY (key_field1, key_field2))".format(name))
values = []
for i in range(1000):
values.append('(' + ','.join([str(i), str(i * i), str(i) * 5, str(i * 3.14)]) + ')')
execute_mysql_query(mysql_conn, "INSERT INTO clickhouse.{} VALUES ".format(name) + ','.join(values))
clickhouse.query("""
CREATE DICTIONARY default.{} (
key_field1 Int32,
key_field2 Int64,
value1 String DEFAULT 'xxx',
value2 Float32 DEFAULT 'yyy'
)
PRIMARY KEY key_field1, key_field2
SOURCE(MYSQL(
USER 'root'
PASSWORD 'clickhouse'
DB 'clickhouse'
TABLE '{}'
REPLICA(PRIORITY 1 HOST '127.0.0.1' PORT 3333)
REPLICA(PRIORITY 2 HOST 'mysql1' PORT 3306)
))
{}
LIFETIME(MIN 1 MAX 3)
""".format(name, name, layout))
for i in range(172, 200):
assert clickhouse.query("SELECT dictGetString('default.{}', 'value1', tuple(toInt32({}), toInt64({})))".format(name, i, i * i)) == str(i) * 5 + '\n'
stroka = clickhouse.query("SELECT dictGetFloat32('default.{}', 'value2', tuple(toInt32({}), toInt64({})))".format(name, i, i * i)).strip()
value = float(stroka)
assert int(value) == int(i * 3.14)
for i in range(1000):
values.append('(' + ','.join([str(i), str(i * i), str(i) * 3, str(i * 2.718)]) + ')')
execute_mysql_query(mysql_conn, "REPLACE INTO clickhouse.{} VALUES ".format(name) + ','.join(values))
clickhouse.query("SYSTEM RELOAD DICTIONARY 'default.{}'".format(name))
for i in range(172, 200):
assert clickhouse.query("SELECT dictGetString('default.{}', 'value1', tuple(toInt32({}), toInt64({})))".format(name, i, i * i)) == str(i) * 3 + '\n'
stroka = clickhouse.query("SELECT dictGetFloat32('default.{}', 'value2', tuple(toInt32({}), toInt64({})))".format(name, i, i * i)).strip()
value = float(stroka)
assert int(value) == int(i * 2.718)
clickhouse.query("select dictGetUInt8('xml_dictionary', 'SomeValue1', toUInt64(17))") == "17\n"
clickhouse.query("select dictGetString('xml_dictionary', 'SomeValue2', toUInt64(977))") == str(hex(977))[2:] + '\n'
def test_restricted_database(started_cluster):
for node in [node1, node2]:
node.query("CREATE DATABASE IF NOT EXISTS restricted_db", user="admin")
node.query("CREATE TABLE restricted_db.table_in_restricted_db AS test.xml_dictionary_table", user="admin")
with pytest.raises(QueryRuntimeException):
node1.query("""
CREATE DICTIONARY restricted_db.some_dict(
id UInt64,
SomeValue1 UInt8,
SomeValue2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_in_restricted_db' DB 'restricted_db'))
LIFETIME(MIN 1 MAX 10)
""")
with pytest.raises(QueryRuntimeException):
node1.query("""
CREATE DICTIONARY default.some_dict(
id UInt64,
SomeValue1 UInt8,
SomeValue2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_in_restricted_db' DB 'restricted_db'))
LIFETIME(MIN 1 MAX 10)
""")
node1.query("SELECT dictGetUInt8('default.some_dict', 'SomeValue1', toUInt64(17))") == "17\n"
# with lazy load we don't need query to get exception
with pytest.raises(QueryRuntimeException):
node2.query("""
CREATE DICTIONARY restricted_db.some_dict(
id UInt64,
SomeValue1 UInt8,
SomeValue2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_in_restricted_db' DB 'restricted_db'))
LIFETIME(MIN 1 MAX 10)
""")
with pytest.raises(QueryRuntimeException):
node2.query("""
CREATE DICTIONARY default.some_dict(
id UInt64,
SomeValue1 UInt8,
SomeValue2 String
)
PRIMARY KEY id
LAYOUT(FLAT())
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' TABLE 'table_in_restricted_db' DB 'restricted_db'))
LIFETIME(MIN 1 MAX 10)
""")