Merge pull request #61356 from vitlibar/fix-replace-dictionary

Fix CREATE OR REPLACE DICTIONARY
This commit is contained in:
Vitaly Baranov 2024-03-21 16:45:06 +01:00 committed by GitHub
commit 1acc0ebe79
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 433 additions and 181 deletions

View File

@ -19,6 +19,62 @@ namespace
result += subkey;
return result;
}
bool isSameConfigurationImpl(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key,
const std::unordered_set<std::string_view> * ignore_keys)
{
if (&left == &right && left_key == right_key)
return true;
/// Get the subkeys of the left and right configurations.
Poco::Util::AbstractConfiguration::Keys left_subkeys;
Poco::Util::AbstractConfiguration::Keys right_subkeys;
left.keys(left_key, left_subkeys);
right.keys(right_key, right_subkeys);
if (ignore_keys)
{
std::erase_if(left_subkeys, [&](const String & key) { return ignore_keys->contains(key); });
std::erase_if(right_subkeys, [&](const String & key) { return ignore_keys->contains(key); });
#if defined(ABORT_ON_LOGICAL_ERROR)
/// Compound `ignore_keys` are not yet implemented.
for (const auto & ignore_key : *ignore_keys)
chassert(ignore_key.find('.') == std::string_view::npos);
#endif
}
/// Check that the right configuration has the same set of subkeys as the left configuration.
if (left_subkeys.size() != right_subkeys.size())
return false;
if (left_subkeys.empty())
{
if (left.hasProperty(left_key))
{
return right.hasProperty(right_key) && (left.getRawString(left_key) == right.getRawString(right_key));
}
else
{
return !right.hasProperty(right_key);
}
}
else
{
/// Go through all the subkeys and compare corresponding parts of the configurations.
std::unordered_set<std::string_view> left_subkeys_set{left_subkeys.begin(), left_subkeys.end()};
for (const auto & subkey : right_subkeys)
{
if (!left_subkeys_set.contains(subkey))
return false;
if (!isSameConfigurationImpl(left, concatKeyAndSubKey(left_key, subkey), right, concatKeyAndSubKey(right_key, subkey), nullptr))
return false;
}
return true;
}
}
}
@ -32,6 +88,14 @@ bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const P
return isSameConfiguration(left, key, right, key);
}
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key,
const std::unordered_set<std::string_view> & ignore_keys)
{
const auto * ignore_keys_ptr = !ignore_keys.empty() ? &ignore_keys : nullptr;
return isSameConfigurationImpl(left, left_key, right, right_key, ignore_keys_ptr);
}
bool isSameConfigurationWithMultipleKeys(const Poco::Util::AbstractConfiguration & left, const Poco::Util::AbstractConfiguration & right, const String & root, const String & name)
{
if (&left == &right)
@ -49,44 +113,4 @@ bool isSameConfigurationWithMultipleKeys(const Poco::Util::AbstractConfiguration
return true;
}
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key)
{
if (&left == &right && left_key == right_key)
return true;
bool has_property = left.hasProperty(left_key);
if (has_property != right.hasProperty(right_key))
return false;
if (has_property)
{
/// The left and right configurations contains values so we can compare them.
if (left.getRawString(left_key) != right.getRawString(right_key))
return false;
}
/// Get the subkeys of the left and right configurations.
Poco::Util::AbstractConfiguration::Keys subkeys;
left.keys(left_key, subkeys);
{
/// Check that the right configuration has the same set of subkeys as the left configuration.
Poco::Util::AbstractConfiguration::Keys right_subkeys;
right.keys(right_key, right_subkeys);
std::unordered_set<std::string_view> left_subkeys{subkeys.begin(), subkeys.end()};
if ((left_subkeys.size() != right_subkeys.size()) || (left_subkeys.size() != subkeys.size()))
return false;
for (const auto & right_subkey : right_subkeys)
if (!left_subkeys.contains(right_subkey))
return false;
}
/// Go through all the subkeys and compare corresponding parts of the configurations.
for (const auto & subkey : subkeys)
if (!isSameConfiguration(left, concatKeyAndSubKey(left_key, subkey), right, concatKeyAndSubKey(right_key, subkey)))
return false;
return true;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <base/types.h>
#include <unordered_set>
namespace Poco::Util
{
@ -10,6 +11,8 @@ namespace Poco::Util
namespace DB
{
/// Returns true if two configurations contains the same keys and values.
/// NOTE: These functions assume no configuration has items having both children and a value
/// (i.e. items like "<test>value<child1/></test>").
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left,
const Poco::Util::AbstractConfiguration & right);
@ -30,8 +33,11 @@ namespace DB
const String & key);
/// Returns true if specified subviews of the two configurations contains the same keys and values.
/// If `ignore_keys` is specified then the function skips those keys while comparing
/// (even if their values differ, they're considered to be the same.)
bool isSameConfiguration(const Poco::Util::AbstractConfiguration & left, const String & left_key,
const Poco::Util::AbstractConfiguration & right, const String & right_key);
const Poco::Util::AbstractConfiguration & right, const String & right_key,
const std::unordered_set<std::string_view> & ignore_keys = {});
inline bool operator==(const Poco::Util::AbstractConfiguration & left, const Poco::Util::AbstractConfiguration & right)
{

View File

@ -1,5 +1,7 @@
#include <Common/Config/ConfigHelper.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
namespace DB
{
@ -7,6 +9,33 @@ namespace DB
namespace ConfigHelper
{
namespace
{
void cloneImpl(Poco::Util::AbstractConfiguration & dest, const Poco::Util::AbstractConfiguration & src, const std::string & prefix = "")
{
std::vector<std::string> keys;
src.keys(prefix, keys);
if (!keys.empty())
{
std::string prefix_with_dot = prefix + ".";
for (const auto & key : keys)
cloneImpl(dest, src, prefix_with_dot + key);
}
else if (!prefix.empty())
{
dest.setString(prefix, src.getRawString(prefix));
}
}
}
Poco::AutoPtr<Poco::Util::AbstractConfiguration> clone(const Poco::Util::AbstractConfiguration & src)
{
Poco::AutoPtr<Poco::Util::AbstractConfiguration> res(new Poco::Util::XMLConfiguration());
cloneImpl(*res, src);
return res;
}
bool getBool(const Poco::Util::AbstractConfiguration & config, const std::string & key, bool default_, bool empty_as)
{
if (!config.has(key))

View File

@ -1,18 +1,23 @@
#pragma once
#include <Poco/AutoPtr.h>
#include <string>
namespace Poco
namespace Poco::Util
{
namespace Util
{
class AbstractConfiguration;
}
class AbstractConfiguration;
}
namespace DB::ConfigHelper
{
/// Clones a configuration.
/// NOTE: This function assumes the source configuration doesn't have items having both children and a value
/// (i.e. items like "<test>value<child1/></test>").
Poco::AutoPtr<Poco::Util::AbstractConfiguration> clone(const Poco::Util::AbstractConfiguration & src);
/// The behavior is like `config.getBool(key, default_)`,
/// except when the tag is empty (aka. self-closing), `empty_as` will be used instead of throwing Poco::Exception.
bool getBool(const Poco::Util::AbstractConfiguration & config, const std::string & key, bool default_ = false, bool empty_as = true);

View File

@ -98,7 +98,7 @@ public:
bool supportUpdates() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<CacheDictionary>(
getDictionaryID(),

View File

@ -55,11 +55,7 @@ DictionaryPtr DictionaryFactory::create(
if (found != registered_layouts.end())
{
const auto & layout_creator = found->second.layout_create_function;
auto result = layout_creator(name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
if (config.hasProperty(config_prefix + ".comment"))
result->setDictionaryComment(config.getString(config_prefix + ".comment"));
return result;
return layout_creator(name, dict_struct, config, config_prefix, std::move(source_ptr), global_context, created_from_ddl);
}
}

View File

@ -50,7 +50,7 @@ public:
double getLoadFactor() const override { return 0; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<DirectDictionary>(getDictionaryID(), dict_struct, source_ptr->clone());
}

View File

@ -57,7 +57,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<FlatDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
}

View File

@ -74,7 +74,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(total_element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<HashedArrayDictionary<dictionary_key_type, sharded>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
}

View File

@ -117,7 +117,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<HashedDictionary<dictionary_key_type, sparse, sharded>>(
getDictionaryID(),

View File

@ -75,13 +75,6 @@ public:
return dictionary_id;
}
void updateDictionaryName(const StorageID & new_name) const
{
std::lock_guard lock{mutex};
assert(new_name.uuid == dictionary_id.uuid && dictionary_id.uuid != UUIDHelpers::Nil);
dictionary_id = new_name;
}
std::string getLoadableName() const final
{
std::lock_guard lock{mutex};
@ -339,12 +332,6 @@ public:
return std::static_pointer_cast<const IDictionary>(IExternalLoadable::shared_from_this());
}
void setDictionaryComment(String new_comment)
{
std::lock_guard lock{mutex};
dictionary_comment = std::move(new_comment);
}
String getDictionaryComment() const
{
std::lock_guard lock{mutex};
@ -452,9 +439,26 @@ public:
return sample_block;
}
/// Internally called by ExternalDictionariesLoader.
/// In order to update the dictionary ID change its configuration first and then call ExternalDictionariesLoader::reloadConfig().
void updateDictionaryID(const StorageID & new_dictionary_id)
{
std::lock_guard lock{mutex};
assert((new_dictionary_id.uuid == dictionary_id.uuid) && (dictionary_id.uuid != UUIDHelpers::Nil));
dictionary_id = new_dictionary_id;
}
/// Internally called by ExternalDictionariesLoader.
/// In order to update the dictionary comment change its configuration first and then call ExternalDictionariesLoader::reloadConfig().
void updateDictionaryComment(const String & new_dictionary_comment)
{
std::lock_guard lock{mutex};
dictionary_comment = new_dictionary_comment;
}
private:
mutable std::mutex mutex;
mutable StorageID dictionary_id TSA_GUARDED_BY(mutex);
StorageID dictionary_id TSA_GUARDED_BY(mutex);
String dictionary_comment TSA_GUARDED_BY(mutex);
};

View File

@ -57,7 +57,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<IPAddressDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration);
}

View File

@ -29,7 +29,7 @@ PolygonDictionarySimple::PolygonDictionarySimple(
{
}
std::shared_ptr<const IExternalLoadable> PolygonDictionarySimple::clone() const
std::shared_ptr<IExternalLoadable> PolygonDictionarySimple::clone() const
{
return std::make_shared<PolygonDictionarySimple>(
this->getDictionaryID(),
@ -76,7 +76,7 @@ PolygonDictionaryIndexEach::PolygonDictionaryIndexEach(
}
}
std::shared_ptr<const IExternalLoadable> PolygonDictionaryIndexEach::clone() const
std::shared_ptr<IExternalLoadable> PolygonDictionaryIndexEach::clone() const
{
return std::make_shared<PolygonDictionaryIndexEach>(
this->getDictionaryID(),
@ -126,7 +126,7 @@ PolygonDictionaryIndexCell::PolygonDictionaryIndexCell(
{
}
std::shared_ptr<const IExternalLoadable> PolygonDictionaryIndexCell::clone() const
std::shared_ptr<IExternalLoadable> PolygonDictionaryIndexCell::clone() const
{
return std::make_shared<PolygonDictionaryIndexCell>(
this->getDictionaryID(),

View File

@ -23,7 +23,7 @@ public:
DictionaryLifetime dict_lifetime_,
Configuration configuration_);
std::shared_ptr<const IExternalLoadable> clone() const override;
std::shared_ptr<IExternalLoadable> clone() const override;
private:
bool find(const Point & point, size_t & polygon_index) const override;
@ -47,7 +47,7 @@ public:
int min_intersections_,
int max_depth_);
std::shared_ptr<const IExternalLoadable> clone() const override;
std::shared_ptr<IExternalLoadable> clone() const override;
static constexpr size_t kMinIntersectionsDefault = 1;
static constexpr size_t kMaxDepthDefault = 5;
@ -75,7 +75,7 @@ public:
size_t min_intersections_,
size_t max_depth_);
std::shared_ptr<const IExternalLoadable> clone() const override;
std::shared_ptr<IExternalLoadable> clone() const override;
static constexpr size_t kMinIntersectionsDefault = 1;
static constexpr size_t kMaxDepthDefault = 5;

View File

@ -97,7 +97,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
auto result = std::make_shared<RangeHashedDictionary>(
getDictionaryID(),

View File

@ -86,7 +86,7 @@ public:
bool hasHierarchy() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<RegExpTreeDictionary>(
getDictionaryID(), structure, source_ptr->clone(), configuration, use_vectorscan, flag_case_insensitive, flag_dotall);

View File

@ -120,7 +120,7 @@ void ExternalUserDefinedExecutableFunctionsLoader::reloadFunction(const std::str
loadOrReload(user_defined_function_name);
}
ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create(const std::string & name,
ExternalLoader::LoadableMutablePtr ExternalUserDefinedExecutableFunctionsLoader::createObject(const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config,
const std::string &) const

View File

@ -28,7 +28,7 @@ public:
void reloadFunction(const std::string & user_defined_function_name) const;
protected:
LoadablePtr create(const std::string & name,
LoadableMutablePtr createObject(const std::string & name,
const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config,
const std::string & repository_name) const override;

View File

@ -62,7 +62,7 @@ public:
return true;
}
std::shared_ptr<const IExternalLoadable> clone() const override
std::shared_ptr<IExternalLoadable> clone() const override
{
return std::make_shared<UserDefinedExecutableFunction>(configuration, coordinator, lifetime);
}

View File

@ -5,6 +5,7 @@
#include <Dictionaries/DictionaryStructure.h>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include "config.h"
@ -31,7 +32,7 @@ ExternalDictionariesLoader::ExternalDictionariesLoader(ContextPtr global_context
enablePeriodicUpdates(true);
}
ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
ExternalLoader::LoadableMutablePtr ExternalDictionariesLoader::createObject(
const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config, const std::string & repository_name) const
{
@ -41,6 +42,38 @@ ExternalLoader::LoadablePtr ExternalDictionariesLoader::create(
return DictionaryFactory::instance().create(name, config, key_in_config, getContext(), created_from_ddl);
}
bool ExternalDictionariesLoader::doesConfigChangeRequiresReloadingObject(const Poco::Util::AbstractConfiguration & old_config, const String & old_key_in_config,
const Poco::Util::AbstractConfiguration & new_config, const String & new_key_in_config) const
{
std::unordered_set<std::string_view> ignore_keys;
ignore_keys.insert("comment"); /// We always can change the comment without reloading a dictionary.
/// If the database is atomic then a dictionary can be renamed without reloading.
if (!old_config.getString(old_key_in_config + ".uuid", "").empty() && !new_config.getString(new_key_in_config + ".uuid", "").empty())
{
ignore_keys.insert("name");
ignore_keys.insert("database");
}
return !isSameConfiguration(old_config, old_key_in_config, new_config, new_key_in_config, ignore_keys);
}
void ExternalDictionariesLoader::updateObjectFromConfigWithoutReloading(IExternalLoadable & object, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const
{
IDictionary & dict = static_cast<IDictionary &>(object);
auto new_dictionary_id = StorageID::fromDictionaryConfig(config, key_in_config);
auto old_dictionary_id = dict.getDictionaryID();
if ((new_dictionary_id.table_name != old_dictionary_id.table_name) || (new_dictionary_id.database_name != old_dictionary_id.database_name))
{
/// We can update the dictionary ID without reloading only if it's in the atomic database.
if ((new_dictionary_id.uuid == old_dictionary_id.uuid) && (new_dictionary_id.uuid != UUIDHelpers::Nil))
dict.updateDictionaryID(new_dictionary_id);
}
dict.updateDictionaryComment(config.getString(key_in_config + ".comment", ""));
}
ExternalDictionariesLoader::DictPtr ExternalDictionariesLoader::getDictionary(const std::string & dictionary_name, ContextPtr local_context) const
{
std::string resolved_dictionary_name = resolveDictionaryName(dictionary_name, local_context->getCurrentDatabase());

View File

@ -40,8 +40,14 @@ public:
static void resetAll();
protected:
LoadablePtr create(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config, const std::string & repository_name) const override;
LoadableMutablePtr createObject(const std::string & name, const Poco::Util::AbstractConfiguration & config,
const std::string & key_in_config, const std::string & repository_name) const override;
bool doesConfigChangeRequiresReloadingObject(const Poco::Util::AbstractConfiguration & old_config, const String & old_key_in_config,
const Poco::Util::AbstractConfiguration & new_config, const String & new_key_in_config) const override;
void updateObjectFromConfigWithoutReloading(
IExternalLoadable & object, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const override;
std::string resolveDictionaryName(const std::string & dictionary_name, const std::string & current_database_name) const;

View File

@ -95,10 +95,7 @@ namespace
class ExternalLoader::LoadablesConfigReader : private boost::noncopyable
{
public:
LoadablesConfigReader(const String & type_name_, LoggerPtr log_)
: type_name(type_name_), log(log_)
{
}
LoadablesConfigReader(const String & type_name_, LoggerPtr log_) : type_name(type_name_), log(log_) { }
~LoadablesConfigReader() = default;
using Repository = IExternalLoaderConfigRepository;
@ -167,7 +164,7 @@ public:
private:
struct FileInfo
{
Poco::Timestamp last_update_time = 0;
std::optional<Poco::Timestamp> last_update_time;
bool in_use = true; // Whether the `FileInfo` should be destroyed because the correspondent file is deleted.
Poco::AutoPtr<Poco::Util::AbstractConfiguration> file_contents; // Parsed contents of the file.
std::unordered_map<String /* object name */, String /* key in file_contents */> objects;
@ -266,7 +263,7 @@ private:
// is updated, but in the same second).
// The solution to this is probably switching to std::filesystem
// -- the work is underway to do so.
if (update_time_from_repository == file_info.last_update_time)
if (update_time_from_repository && (update_time_from_repository == file_info.last_update_time))
{
file_info.in_use = true;
return false;
@ -377,7 +374,7 @@ private:
}
const String type_name;
LoggerPtr log;
const LoggerPtr log;
std::mutex mutex;
ExternalLoaderConfigSettings settings;
@ -394,17 +391,8 @@ private:
class ExternalLoader::LoadingDispatcher : private boost::noncopyable
{
public:
/// Called to load or reload an object.
using CreateObjectFunction = std::function<LoadablePtr(
const String & /* name */, const ObjectConfig & /* config */, const LoadablePtr & /* previous_version */)>;
LoadingDispatcher(
const CreateObjectFunction & create_object_function_,
const String & type_name_,
LoggerPtr log_)
: create_object(create_object_function_)
, type_name(type_name_)
, log(log_)
LoadingDispatcher(const String & type_name_, LoggerPtr log_, const ExternalLoader & external_loader_)
: type_name(type_name_), log(log_), external_loader(external_loader_)
{
}
@ -457,16 +445,25 @@ public:
else
{
const auto & new_config = new_config_it->second;
bool config_is_same = isSameConfiguration(*info.config->config, info.config->key_in_config, *new_config->config, new_config->key_in_config);
auto previous_config = info.config;
info.config = new_config;
if (!config_is_same)
bool config_changed = !isSameConfiguration(*previous_config->config, previous_config->key_in_config, *new_config->config, new_config->key_in_config);
if (config_changed)
{
if (info.object)
external_loader.updateObjectFromConfigWithoutReloading(*info.object, *new_config->config, new_config->key_in_config);
if (info.triedToLoad())
{
/// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config.
LOG_TRACE(log, "Will reload '{}' because its configuration has been changed and there were attempts to load it before", name);
startLoading(info, true);
bool config_change_requires_reloading = external_loader.doesConfigChangeRequiresReloadingObject(*previous_config->config, previous_config->key_in_config, *new_config->config, new_config->key_in_config);
if (config_change_requires_reloading)
{
/// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config.
LOG_TRACE(log, "Will reload '{}' because its configuration has been changed and there were attempts to load it before", name);
startLoading(info, true);
}
}
}
}
@ -770,7 +767,7 @@ private:
}
String name;
LoadablePtr object;
LoadableMutablePtr object;
std::shared_ptr<const ObjectConfig> config;
TimePoint loading_start_time;
TimePoint loading_end_time;
@ -1030,17 +1027,17 @@ private:
}
/// Load one object, returns object ptr or exception.
std::pair<LoadablePtr, std::exception_ptr>
std::pair<LoadableMutablePtr, std::exception_ptr>
loadSingleObject(const String & name, const ObjectConfig & config, LoadablePtr previous_version)
{
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
LoadablePtr new_object;
LoadableMutablePtr new_object;
std::exception_ptr new_exception;
try
{
new_object = create_object(name, config, previous_version);
new_object = external_loader.createOrCloneObject(name, config, previous_version);
}
catch (...)
{
@ -1054,7 +1051,7 @@ private:
const String & name,
size_t loading_id,
LoadablePtr previous_version,
LoadablePtr new_object,
LoadableMutablePtr new_object,
std::exception_ptr new_exception,
size_t error_count,
const LoadingGuardForAsyncLoad &)
@ -1117,7 +1114,10 @@ private:
}
if (new_object)
{
external_loader.updateObjectFromConfigWithoutReloading(*new_object, *info->config->config, info->config->key_in_config);
info->object = new_object;
}
info->exception = new_exception;
info->error_count = error_count;
@ -1191,9 +1191,9 @@ private:
}
}
const CreateObjectFunction create_object;
const String type_name;
LoggerPtr log;
const LoggerPtr log;
const ExternalLoader & external_loader;
mutable std::mutex mutex;
std::condition_variable event;
@ -1275,10 +1275,7 @@ private:
ExternalLoader::ExternalLoader(const String & type_name_, LoggerPtr log_)
: config_files_reader(std::make_unique<LoadablesConfigReader>(type_name_, log_))
, loading_dispatcher(std::make_unique<LoadingDispatcher>(
[this](auto && a, auto && b, auto && c) { return createObject(a, b, c); },
type_name_,
log_))
, loading_dispatcher(std::make_unique<LoadingDispatcher>(type_name_, log_, *this))
, periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher))
, type_name(type_name_)
, log(log_)
@ -1505,13 +1502,13 @@ void ExternalLoader::reloadConfig(const String & repository_name, const String &
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path));
}
ExternalLoader::LoadablePtr ExternalLoader::createObject(
ExternalLoader::LoadableMutablePtr ExternalLoader::createOrCloneObject(
const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const
{
if (previous_version)
return previous_version->clone();
return create(name, *config.config, config.key_in_config, config.repository_name);
return createObject(name, *config.config, config.key_in_config, config.repository_name);
}
template ExternalLoader::LoadablePtr ExternalLoader::getLoadResult<ExternalLoader::LoadablePtr>(const String &) const;

View File

@ -50,6 +50,7 @@ class ExternalLoader
{
public:
using LoadablePtr = std::shared_ptr<const IExternalLoadable>;
using LoadableMutablePtr = std::shared_ptr<IExternalLoadable>;
using Loadables = std::vector<LoadablePtr>;
using Status = ExternalLoaderStatus;
@ -211,7 +212,15 @@ public:
void reloadConfig(const String & repository_name, const String & path) const;
protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config, const String & repository_name) const = 0;
virtual LoadableMutablePtr createObject(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config, const String & repository_name) const = 0;
/// Returns whether the object must be reloaded after a specified change in its configuration.
virtual bool doesConfigChangeRequiresReloadingObject(const Poco::Util::AbstractConfiguration & /* old_config */, const String & /* old_key_in_config */,
const Poco::Util::AbstractConfiguration & /* new_config */, const String & /* new_key_in_config */) const { return true; /* always reload */ }
/// Updates the object from the configuration without reloading as much as possible.
virtual void updateObjectFromConfigWithoutReloading(
IExternalLoadable & /* object */, const Poco::Util::AbstractConfiguration & /* config */, const String & /* key_in_config */) const {}
private:
void checkLoaded(const LoadResult & result, bool check_no_errors) const;
@ -219,7 +228,7 @@ private:
Strings getAllTriedToLoadNames() const;
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
LoadableMutablePtr createOrCloneObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
class LoadablesConfigReader;
std::unique_ptr<LoadablesConfigReader> config_files_reader;
@ -231,7 +240,7 @@ private:
std::unique_ptr<PeriodicUpdater> periodic_updater;
const String type_name;
LoggerPtr log;
const LoggerPtr log;
};
}

View File

@ -26,11 +26,6 @@ bool ExternalLoaderDictionaryStorageConfigRepository::exists(const std::string &
return getName() == loadable_definition_name;
}
Poco::Timestamp ExternalLoaderDictionaryStorageConfigRepository::getUpdateTime(const std::string &)
{
return dictionary_storage.getUpdateTime();
}
LoadablesConfigurationPtr ExternalLoaderDictionaryStorageConfigRepository::load(const std::string &)
{
return dictionary_storage.getConfiguration();

View File

@ -19,8 +19,6 @@ public:
bool exists(const std::string & loadable_definition_name) override;
Poco::Timestamp getUpdateTime(const std::string & loadable_definition_name) override;
LoadablesConfigurationPtr load(const std::string & loadable_definition_name) override;
private:

View File

@ -28,14 +28,6 @@ bool ExternalLoaderTempConfigRepository::exists(const String & path_)
}
Poco::Timestamp ExternalLoaderTempConfigRepository::getUpdateTime(const String & path_)
{
if (!exists(path_))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Loadable {} not found", path_);
return creation_time;
}
LoadablesConfigurationPtr ExternalLoaderTempConfigRepository::load(const String & path_)
{
if (!exists(path_))

View File

@ -18,14 +18,12 @@ public:
std::set<String> getAllLoadablesDefinitionNames() override;
bool exists(const String & path) override;
Poco::Timestamp getUpdateTime(const String & path) override;
LoadablesConfigurationPtr load(const String & path) override;
private:
String name;
String path;
LoadablesConfigurationPtr config;
Poco::Timestamp creation_time;
};
}

View File

@ -24,7 +24,7 @@ ExternalLoaderXMLConfigRepository::ExternalLoaderXMLConfigRepository(
{
}
Poco::Timestamp ExternalLoaderXMLConfigRepository::getUpdateTime(const std::string & definition_entity_name)
std::optional<Poco::Timestamp> ExternalLoaderXMLConfigRepository::getUpdateTime(const std::string & definition_entity_name)
{
return FS::getModificationTimestamp(definition_entity_name);
}

View File

@ -26,7 +26,7 @@ public:
bool exists(const std::string & definition_entity_name) override;
/// Return xml-file modification time via stat call
Poco::Timestamp getUpdateTime(const std::string & definition_entity_name) override;
std::optional<Poco::Timestamp> getUpdateTime(const std::string & definition_entity_name) override;
/// May contain definition about several entities (several dictionaries in one .xml file)
LoadablesConfigurationPtr load(const std::string & config_file) override;

View File

@ -43,7 +43,7 @@ public:
/// If lifetime exceeded and isModified(), ExternalLoader replace current object with the result of clone().
virtual bool isModified() const = 0;
/// Returns new object with the same configuration. Is used to update modified object when lifetime exceeded.
virtual std::shared_ptr<const IExternalLoadable> clone() const = 0;
virtual std::shared_ptr<IExternalLoadable> clone() const = 0;
};
}

View File

@ -37,7 +37,7 @@ public:
virtual bool exists(const std::string & path) = 0;
/// Returns entity last update time
virtual Poco::Timestamp getUpdateTime(const std::string & path) = 0;
virtual std::optional<Poco::Timestamp> getUpdateTime(const std::string & /* path */) { return {}; }
/// Load configuration from some concrete source to AbstractConfiguration
virtual LoadablesConfigurationPtr load(const std::string & path) = 0;

View File

@ -7,6 +7,7 @@
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ExternalLoaderDictionaryStorageConfigRepository.h>
#include <Parsers/ASTLiteral.h>
#include <Common/Config/ConfigHelper.h>
#include <Common/quoteString.h>
#include <QueryPipeline/Pipe.h>
#include <IO/Operators.h>
@ -194,12 +195,6 @@ void StorageDictionary::removeDictionaryConfigurationFromRepository()
remove_repository_callback.reset();
}
Poco::Timestamp StorageDictionary::getUpdateTime() const
{
std::lock_guard lock(dictionary_config_mutex);
return update_time;
}
LoadablesConfigurationPtr StorageDictionary::getConfiguration() const
{
std::lock_guard lock(dictionary_config_mutex);
@ -221,42 +216,47 @@ void StorageDictionary::renameInMemory(const StorageID & new_table_id)
bool move_to_ordinary = old_table_id.uuid != UUIDHelpers::Nil && new_table_id.uuid == UUIDHelpers::Nil;
assert(old_table_id.uuid == new_table_id.uuid || move_to_atomic || move_to_ordinary);
/// It's better not to update an associated `IDictionary` directly here because it can be not loaded yet or
/// it can be in the process of loading or reloading right now.
/// The correct way is to update the dictionary's configuration first and then ask ExternalDictionariesLoader to reload our dictionary.
{
std::lock_guard lock(dictionary_config_mutex);
auto new_configuration = ConfigHelper::clone(*configuration);
new_configuration->setString("dictionary.database", new_table_id.database_name);
new_configuration->setString("dictionary.name", new_table_id.table_name);
configuration->setString("dictionary.database", new_table_id.database_name);
configuration->setString("dictionary.name", new_table_id.table_name);
if (move_to_atomic)
configuration->setString("dictionary.uuid", toString(new_table_id.uuid));
new_configuration->setString("dictionary.uuid", toString(new_table_id.uuid));
else if (move_to_ordinary)
configuration->remove("dictionary.uuid");
new_configuration->remove("dictionary.uuid");
configuration = new_configuration;
}
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
/// Dictionary is moving between databases of different engines or is renaming inside Ordinary database
bool recreate_dictionary = old_table_id.uuid == UUIDHelpers::Nil || new_table_id.uuid == UUIDHelpers::Nil;
if (recreate_dictionary)
{
/// It's too hard to update both name and uuid, better to reload dictionary with new name
/// For an ordinary database the config repositories of dictionaries are identified by the full name (database name + dictionary name),
/// so we cannot change the dictionary name or the database name on the fly (without extra reloading) and have to recreate the config repository.
removeDictionaryConfigurationFromRepository();
auto repository = std::make_unique<ExternalLoaderDictionaryStorageConfigRepository>(*this);
remove_repository_callback = getContext()->getExternalDictionariesLoader().addConfigRepository(std::move(repository));
/// Dictionary will be reloaded lazily to avoid exceptions in the middle of renaming
remove_repository_callback = external_dictionaries_loader.addConfigRepository(std::move(repository));
/// Dictionary will be now reloaded lazily.
}
else
{
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
auto result = external_dictionaries_loader.getLoadResult(old_table_id.getInternalDictionaryName());
if (result.object)
{
const auto dictionary = std::static_pointer_cast<const IDictionary>(result.object);
dictionary->updateDictionaryName(new_table_id);
}
/// For an atomic database dictionaries are identified inside the ExternalLoader by UUID,
/// so we can change the dictionary name or the database name on the fly (without extra reloading) because UUID doesn't change.
external_dictionaries_loader.reloadConfig(old_table_id.getInternalDictionaryName());
dictionary_name = new_table_id.getFullNameNotQuoted();
}
dictionary_name = new_table_id.getFullNameNotQuoted();
}
void StorageDictionary::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /* context */) const
@ -278,19 +278,19 @@ void StorageDictionary::alter(const AlterCommands & params, ContextPtr alter_con
auto new_comment = getInMemoryMetadataPtr()->comment;
auto storage_id = getStorageID();
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
auto result = external_dictionaries_loader.getLoadResult(storage_id.getInternalDictionaryName());
/// It's better not to update an associated `IDictionary` directly here because it can be not loaded yet or
/// it can be in the process of loading or reloading right now.
/// The correct way is to update the dictionary's configuration first and then ask ExternalDictionariesLoader to reload our dictionary.
if (result.object)
{
auto dictionary = std::static_pointer_cast<const IDictionary>(result.object);
auto * dictionary_non_const = const_cast<IDictionary *>(dictionary.get());
dictionary_non_const->setDictionaryComment(new_comment);
std::lock_guard lock(dictionary_config_mutex);
auto new_configuration = ConfigHelper::clone(*configuration);
new_configuration->setString("dictionary.comment", new_comment);
configuration = new_configuration;
}
std::lock_guard lock(dictionary_config_mutex);
configuration->setString("dictionary.comment", new_comment);
const auto & external_dictionaries_loader = getContext()->getExternalDictionariesLoader();
external_dictionaries_loader.reloadConfig(getStorageID().getInternalDictionaryName());
}
void registerStorageDictionary(StorageFactory & factory)

View File

@ -92,7 +92,6 @@ public:
void alter(const AlterCommands & params, ContextPtr alter_context, AlterLockHolder &) override;
Poco::Timestamp getUpdateTime() const;
LoadablesConfigurationPtr getConfiguration() const;
String getDictionaryName() const { return dictionary_name; }
@ -102,8 +101,7 @@ private:
const Location location;
mutable std::mutex dictionary_config_mutex;
Poco::Timestamp update_time;
LoadablesConfigurationPtr configuration;
LoadablesConfigurationPtr configuration TSA_GUARDED_BY(dictionary_config_mutex);
scope_guard remove_repository_callback;

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<allow_experimental_database_replicated>1</allow_experimental_database_replicated>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,3 @@
<clickhouse>
<dictionaries_lazy_load>0</dictionaries_lazy_load>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>node_ll</host>
<port>9000</port>
</replica>
<replica>
<host>node_no_ll</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,136 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
node_ll = cluster.add_instance(
"node_ll",
main_configs=[
"configs/remote_servers.xml",
],
user_configs=[
"configs/allow_database_types.xml",
],
macros={"replica": "node_ll", "shard": "shard"},
with_zookeeper=True,
)
node_no_ll = cluster.add_instance(
"node_no_ll",
main_configs=[
"configs/no_lazy_load.xml",
"configs/remote_servers.xml",
],
user_configs=[
"configs/allow_database_types.xml",
],
macros={"replica": "node_no_ll", "shard": "shard"},
with_zookeeper=True,
)
instances = [node_ll, node_no_ll]
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
# "sleep(1)" is here to make loading of dictionaries a bit slower for this test.
instances[0].query(
"CREATE VIEW src ON CLUSTER 'cluster' AS SELECT number AS key, number * number + 1 + sleep(1) AS value FROM numbers(10)"
)
# "dict_get_user" can only call dictGet(), nothing more.
instances[0].query("CREATE USER dictget_user ON CLUSTER 'cluster'")
instances[0].query(
"GRANT dictGet ON atomicdb.dict TO dictget_user ON CLUSTER 'cluster'"
)
instances[0].query(
"GRANT dictGet ON repldb.dict TO dictget_user ON CLUSTER 'cluster'"
)
instances[0].query("CREATE DATABASE atomicdb ON CLUSTER 'cluster'")
instances[0].query(
"CREATE DATABASE repldb ON CLUSTER 'cluster' ENGINE=Replicated('/clickhouse/path/','{shard}','{replica}')"
)
yield cluster
finally:
instances[0].query("DROP TABLE IF EXISTS src ON CLUSTER 'cluster'")
instances[0].query("DROP USER IF EXISTS dictget_user ON CLUSTER 'cluster'")
instances[0].query("DROP DATABASE IF EXISTS atomicdb ON CLUSTER 'cluster'")
instances[0].query("DROP DATABASE IF EXISTS repldb ON CLUSTER 'cluster'")
cluster.shutdown()
@pytest.fixture(autouse=True)
def cleanup_after_test():
try:
yield
finally:
instances[0].query("DROP DICTIONARY IF EXISTS dict ON CLUSTER 'cluster'")
def get_status(instance, dictionary_name):
return instance.query(
"SELECT status FROM system.dictionaries WHERE name='" + dictionary_name + "'"
).rstrip("\n")
@pytest.mark.parametrize(
"database, instance_to_create_dictionary, instances_to_check",
[
("atomicdb", node_ll, [node_ll]),
("atomicdb", node_no_ll, [node_no_ll]),
("repldb", node_ll, [node_ll, node_no_ll]),
("repldb", node_no_ll, [node_ll, node_no_ll]),
],
)
def test_create_or_replace(database, instance_to_create_dictionary, instances_to_check):
num_steps = 2
dict_uuids = {}
for step in range(0, num_steps):
create_dictionary_query = f"CREATE OR REPLACE DICTIONARY {database}.dict (key Int64, value Int64) PRIMARY KEY key SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'src' PASSWORD '' DB 'default')) LIFETIME(0) LAYOUT(FLAT())"
instance_to_create_dictionary.query(create_dictionary_query)
for instance in instances_to_check:
if instance != instance_to_create_dictionary:
instance.query(f"SYSTEM SYNC DATABASE REPLICA {database}")
dict_uuid = instance.query(
f"SELECT uuid FROM system.tables WHERE database='{database}' AND table='dict'"
).strip("\n")
dict_db, dict_name, dict_status = (
instance.query(
f"SELECT database, name, status FROM system.dictionaries WHERE uuid='{dict_uuid}'"
)
.strip("\n")
.split("\t")
)
assert dict_db == database
assert dict_name == "dict"
# "uuid" must be the same for all the dictionaries created at the same "step" and different for the dictionaries created at different steps.
if step in dict_uuids:
assert dict_uuids[step] == dict_uuid
dict_uuids[step] = dict_uuid
assert dict_uuid not in [
dict_uuids[prev_step] for prev_step in range(0, step)
]
expected_dict_status = (
["NOT_LOADED"] if instance == node_ll else ["LOADING", "LOADED"]
)
assert dict_status in expected_dict_status
for instance in instances_to_check:
select_query = f"SELECT arrayJoin([0, 5, 7, 11]) as key, dictGet({database}.dict, 'value', key)"
expected_result = TSV([[0, 1], [5, 26], [7, 50], [11, 0]])
assert instance.query(select_query) == expected_result
assert instance.query(select_query, user="dictget_user") == expected_result