mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Move everything to NamedCollecitonFactory
This commit is contained in:
parent
e864be5a66
commit
83c0c9bcde
@ -48,6 +48,7 @@
|
||||
#include <Common/FailPoint.h>
|
||||
#include <Common/CPUID.h>
|
||||
#include <Common/HTTPConnectionPool.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
#include <Server/waitServersToFinish.h>
|
||||
#include <Interpreters/Cache/FileCacheFactory.h>
|
||||
#include <Core/ServerUUID.h>
|
||||
@ -70,7 +71,6 @@
|
||||
#include <Storages/System/attachInformationSchemaTables.h>
|
||||
#include <Storages/Cache/ExternalDataSourceCache.h>
|
||||
#include <Storages/Cache/registerRemoteFileMetadatas.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <AggregateFunctions/registerAggregateFunctions.h>
|
||||
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
|
||||
#include <Functions/registerFunctions.h>
|
||||
@ -1337,7 +1337,7 @@ try
|
||||
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_max_size_in_bytes, compiled_expression_cache_max_elements);
|
||||
#endif
|
||||
|
||||
NamedCollectionUtils::loadIfNot();
|
||||
NamedCollectionFactory::instance().loadIfNot();
|
||||
|
||||
/// Initialize main config reloader.
|
||||
std::string include_from_path = config().getString("include_from", "/etc/metrika.xml");
|
||||
@ -1606,7 +1606,7 @@ try
|
||||
#if USE_SSL
|
||||
CertificateReloader::instance().tryLoad(*config);
|
||||
#endif
|
||||
NamedCollectionUtils::reloadFromConfig(*config);
|
||||
NamedCollectionFactory::instance().reloadFromConfig(*config);
|
||||
|
||||
FileCacheFactory::instance().updateSettingsFromConfig(*config);
|
||||
|
||||
|
@ -1,674 +0,0 @@
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/FieldVisitorToString.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Parsers/ASTCreateNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTAlterNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTDropNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <Common/NamedCollections/NamedCollections.h>
|
||||
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NAMED_COLLECTION_ALREADY_EXISTS;
|
||||
extern const int NAMED_COLLECTION_DOESNT_EXIST;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
}
|
||||
|
||||
namespace NamedCollectionUtils
|
||||
{
|
||||
|
||||
static std::atomic<bool> is_loaded_from_config = false;
|
||||
static std::atomic<bool> is_loaded_from_sql = false;
|
||||
|
||||
class LoadFromConfig
|
||||
{
|
||||
private:
|
||||
const Poco::Util::AbstractConfiguration & config;
|
||||
|
||||
public:
|
||||
explicit LoadFromConfig(const Poco::Util::AbstractConfiguration & config_)
|
||||
: config(config_) {}
|
||||
|
||||
std::vector<std::string> listCollections() const
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys collections_names;
|
||||
config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
|
||||
return collections_names;
|
||||
}
|
||||
|
||||
NamedCollectionsMap getAll() const
|
||||
{
|
||||
NamedCollectionsMap result;
|
||||
for (const auto & collection_name : listCollections())
|
||||
{
|
||||
if (result.contains(collection_name))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Found duplicate named collection `{}`",
|
||||
collection_name);
|
||||
}
|
||||
result.emplace(collection_name, get(collection_name));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr get(const std::string & collection_name) const
|
||||
{
|
||||
const auto collection_prefix = getCollectionPrefix(collection_name);
|
||||
std::queue<std::string> enumerate_input;
|
||||
std::set<std::string, std::less<>> enumerate_result;
|
||||
|
||||
enumerate_input.push(collection_prefix);
|
||||
NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
|
||||
|
||||
/// Collection does not have any keys.
|
||||
/// (`enumerate_result` == <collection_path>).
|
||||
const bool collection_is_empty = enumerate_result.size() == 1
|
||||
&& *enumerate_result.begin() == collection_prefix;
|
||||
std::set<std::string, std::less<>> keys;
|
||||
if (!collection_is_empty)
|
||||
{
|
||||
/// Skip collection prefix and add +1 to avoid '.' in the beginning.
|
||||
for (const auto & path : enumerate_result)
|
||||
keys.emplace(path.substr(collection_prefix.size() + 1));
|
||||
}
|
||||
|
||||
return NamedCollection::create(
|
||||
config, collection_name, collection_prefix, keys, SourceId::CONFIG, /* is_mutable */false);
|
||||
}
|
||||
|
||||
private:
|
||||
static constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
|
||||
|
||||
static std::string getCollectionPrefix(const std::string & collection_name)
|
||||
{
|
||||
return fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
|
||||
}
|
||||
};
|
||||
|
||||
class INamedCollectionsStorage
|
||||
{
|
||||
public:
|
||||
virtual ~INamedCollectionsStorage() = default;
|
||||
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
|
||||
virtual std::vector<std::string> list() const = 0;
|
||||
|
||||
virtual std::string read(const std::string & path) const = 0;
|
||||
|
||||
virtual void write(const std::string & path, const std::string & data, bool replace) = 0;
|
||||
|
||||
virtual void remove(const std::string & path) = 0;
|
||||
|
||||
virtual bool removeIfExists(const std::string & path) = 0;
|
||||
};
|
||||
|
||||
using NamedCollectionsStoragePtr = std::unique_ptr<INamedCollectionsStorage>;
|
||||
|
||||
|
||||
class NamedCollectionsMetadata : private WithContext
|
||||
{
|
||||
private:
|
||||
NamedCollectionsStoragePtr storage;
|
||||
|
||||
public:
|
||||
NamedCollectionsMetadata(NamedCollectionsStoragePtr storage_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, storage(std::move(storage_)) {}
|
||||
|
||||
std::vector<std::string> listCollections() const
|
||||
{
|
||||
auto paths = storage->list();
|
||||
std::vector<std::string> collections;
|
||||
collections.reserve(paths.size());
|
||||
for (const auto & path : paths)
|
||||
collections.push_back(fs::path(path).stem());
|
||||
return collections;
|
||||
}
|
||||
|
||||
NamedCollectionsMap getAll() const
|
||||
{
|
||||
NamedCollectionsMap result;
|
||||
for (const auto & collection_name : listCollections())
|
||||
{
|
||||
if (result.contains(collection_name))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Found duplicate named collection `{}`",
|
||||
collection_name);
|
||||
}
|
||||
result.emplace(collection_name, get(collection_name));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr get(const std::string & collection_name) const
|
||||
{
|
||||
const auto query = readCreateQuery(collection_name);
|
||||
return createNamedCollectionFromAST(query);
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query)
|
||||
{
|
||||
writeCreateQuery(query);
|
||||
return createNamedCollectionFromAST(query);
|
||||
}
|
||||
|
||||
void update(const ASTAlterNamedCollectionQuery & query)
|
||||
{
|
||||
auto create_query = readCreateQuery(query.collection_name);
|
||||
|
||||
std::unordered_map<std::string, Field> result_changes_map;
|
||||
for (const auto & [name, value] : query.changes)
|
||||
{
|
||||
auto [it, inserted] = result_changes_map.emplace(name, value);
|
||||
if (!inserted)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Value with key `{}` is used twice in the SET query (collection name: {})",
|
||||
name, query.collection_name);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & [name, value] : create_query.changes)
|
||||
result_changes_map.emplace(name, value);
|
||||
|
||||
std::unordered_map<std::string, bool> result_overridability_map;
|
||||
for (const auto & [name, value] : query.overridability)
|
||||
result_overridability_map.emplace(name, value);
|
||||
for (const auto & [name, value] : create_query.overridability)
|
||||
result_overridability_map.emplace(name, value);
|
||||
|
||||
for (const auto & delete_key : query.delete_keys)
|
||||
{
|
||||
auto it = result_changes_map.find(delete_key);
|
||||
if (it == result_changes_map.end())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Cannot delete key `{}` because it does not exist in collection",
|
||||
delete_key);
|
||||
}
|
||||
else
|
||||
{
|
||||
result_changes_map.erase(it);
|
||||
auto it_override = result_overridability_map.find(delete_key);
|
||||
if (it_override != result_overridability_map.end())
|
||||
result_overridability_map.erase(it_override);
|
||||
}
|
||||
}
|
||||
|
||||
create_query.changes.clear();
|
||||
for (const auto & [name, value] : result_changes_map)
|
||||
create_query.changes.emplace_back(name, value);
|
||||
create_query.overridability = std::move(result_overridability_map);
|
||||
|
||||
if (create_query.changes.empty())
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Named collection cannot be empty (collection name: {})",
|
||||
query.collection_name);
|
||||
|
||||
chassert(create_query.collection_name == query.collection_name);
|
||||
writeCreateQuery(create_query, true);
|
||||
}
|
||||
|
||||
void remove(const std::string & collection_name)
|
||||
{
|
||||
storage->remove(getFileName(collection_name));
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & collection_name)
|
||||
{
|
||||
return storage->removeIfExists(getFileName(collection_name));
|
||||
}
|
||||
|
||||
private:
|
||||
std::string getFileName(const std::string & collection_name) const
|
||||
{
|
||||
return escapeForFileName(collection_name) + ".sql";
|
||||
}
|
||||
|
||||
static MutableNamedCollectionPtr createNamedCollectionFromAST(const ASTCreateNamedCollectionQuery & query)
|
||||
{
|
||||
const auto & collection_name = query.collection_name;
|
||||
const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
|
||||
|
||||
std::set<std::string, std::less<>> keys;
|
||||
for (const auto & [name, _] : query.changes)
|
||||
keys.insert(name);
|
||||
|
||||
return NamedCollection::create(
|
||||
*config, collection_name, "", keys, SourceId::SQL, /* is_mutable */true);
|
||||
}
|
||||
|
||||
ASTCreateNamedCollectionQuery readCreateQuery(const std::string & collection_name) const
|
||||
{
|
||||
const auto path = getFileName(collection_name);
|
||||
auto query = storage->read(path);
|
||||
|
||||
ParserCreateNamedCollectionQuery parser;
|
||||
auto ast = parseQuery(parser, query, "in file " + path, 0, getContext()->getSettingsRef().max_parser_depth, getContext()->getSettingsRef().max_parser_backtracks);
|
||||
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
|
||||
return create_query;
|
||||
}
|
||||
|
||||
void writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace = false)
|
||||
{
|
||||
auto normalized_query = query.clone();
|
||||
auto & changes = typeid_cast<ASTCreateNamedCollectionQuery *>(normalized_query.get())->changes;
|
||||
::sort(
|
||||
changes.begin(), changes.end(),
|
||||
[](const SettingChange & lhs, const SettingChange & rhs) { return lhs.name < rhs.name; });
|
||||
|
||||
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
|
||||
}
|
||||
};
|
||||
|
||||
class NamedCollectionsLocalStorage : public INamedCollectionsStorage, private WithContext
|
||||
{
|
||||
private:
|
||||
std::string root_path;
|
||||
|
||||
public:
|
||||
NamedCollectionsLocalStorage(ContextPtr context_, const std::string & path_)
|
||||
: WithContext(context_)
|
||||
, root_path(path_)
|
||||
{
|
||||
if (fs::exists(root_path))
|
||||
cleanup();
|
||||
}
|
||||
|
||||
~NamedCollectionsLocalStorage() override = default;
|
||||
|
||||
std::vector<std::string> list() const override
|
||||
{
|
||||
if (!fs::exists(root_path))
|
||||
return {};
|
||||
|
||||
std::vector<std::string> elements;
|
||||
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
|
||||
{
|
||||
const auto & current_path = it->path();
|
||||
if (current_path.extension() == ".sql")
|
||||
{
|
||||
elements.push_back(it->path());
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(
|
||||
getLogger("NamedCollectionsLocalStorage"),
|
||||
"Unexpected file {} in named collections directory",
|
||||
current_path.filename().string());
|
||||
}
|
||||
}
|
||||
return elements;
|
||||
}
|
||||
|
||||
bool exists(const std::string & path) const override
|
||||
{
|
||||
return fs::exists(getPath(path));
|
||||
}
|
||||
|
||||
std::string read(const std::string & path) const override
|
||||
{
|
||||
ReadBufferFromFile in(getPath(path));
|
||||
std::string data;
|
||||
readStringUntilEOF(data, in);
|
||||
return data;
|
||||
}
|
||||
|
||||
void write(const std::string & path, const std::string & data, bool replace) override
|
||||
{
|
||||
if (!replace && fs::exists(path))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Metadata file {} for named collection already exists",
|
||||
path);
|
||||
}
|
||||
|
||||
fs::create_directories(root_path);
|
||||
|
||||
auto tmp_path = getPath(path + ".tmp");
|
||||
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||
writeString(data, out);
|
||||
|
||||
out.next();
|
||||
if (getContext()->getSettingsRef().fsync_metadata)
|
||||
out.sync();
|
||||
out.close();
|
||||
|
||||
fs::rename(tmp_path, getPath(path));
|
||||
}
|
||||
|
||||
void remove(const std::string & path) override
|
||||
{
|
||||
if (!removeIfExists(getPath(path)))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove `{}`, because it doesn't exist", path);
|
||||
}
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & path) override
|
||||
{
|
||||
return fs::remove(getPath(path));
|
||||
}
|
||||
|
||||
private:
|
||||
std::string getPath(const std::string & path) const
|
||||
{
|
||||
return fs::path(root_path) / path;
|
||||
}
|
||||
|
||||
/// Delete .tmp files. They could be left undeleted in case of
|
||||
/// some exception or abrupt server restart.
|
||||
void cleanup()
|
||||
{
|
||||
std::vector<std::string> files_to_remove;
|
||||
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
|
||||
{
|
||||
const auto & current_path = it->path();
|
||||
if (current_path.extension() == ".tmp")
|
||||
files_to_remove.push_back(current_path);
|
||||
}
|
||||
for (const auto & file : files_to_remove)
|
||||
fs::remove(file);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class NamedCollectionsZooKeeperStorage : public INamedCollectionsStorage, private WithContext
|
||||
{
|
||||
private:
|
||||
std::string root_path;
|
||||
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
|
||||
|
||||
public:
|
||||
NamedCollectionsZooKeeperStorage(ContextPtr context_, const std::string & path_)
|
||||
: WithContext(context_)
|
||||
, root_path(path_)
|
||||
{
|
||||
if (root_path.empty())
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Collections path cannot be empty");
|
||||
|
||||
if (root_path != "/" && root_path.back() == '/')
|
||||
root_path.resize(root_path.size() - 1);
|
||||
if (root_path.front() != '/')
|
||||
root_path = "/" + root_path;
|
||||
|
||||
auto client = getClient();
|
||||
if (root_path != "/" && !client->exists(root_path))
|
||||
{
|
||||
client->createAncestors(root_path);
|
||||
client->createIfNotExists(root_path, "");
|
||||
}
|
||||
}
|
||||
|
||||
~NamedCollectionsZooKeeperStorage() override = default;
|
||||
|
||||
std::vector<std::string> list() const override
|
||||
{
|
||||
return getClient()->getChildren(root_path);
|
||||
}
|
||||
|
||||
bool exists(const std::string & path) const override
|
||||
{
|
||||
return getClient()->exists(getPath(path));
|
||||
}
|
||||
|
||||
std::string read(const std::string & path) const override
|
||||
{
|
||||
return getClient()->get(getPath(path));
|
||||
}
|
||||
|
||||
void write(const std::string & path, const std::string & data, bool replace) override
|
||||
{
|
||||
if (replace)
|
||||
{
|
||||
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent);
|
||||
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Metadata file {} for named collection already exists",
|
||||
path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void remove(const std::string & path) override
|
||||
{
|
||||
getClient()->remove(getPath(path));
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & path) override
|
||||
{
|
||||
auto code = getClient()->tryRemove(getPath(path));
|
||||
if (code == Coordination::Error::ZOK)
|
||||
return true;
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
return false;
|
||||
throw Coordination::Exception::fromPath(code, getPath(path));
|
||||
}
|
||||
|
||||
private:
|
||||
zkutil::ZooKeeperPtr getClient() const
|
||||
{
|
||||
if (!zookeeper_client || zookeeper_client->expired())
|
||||
{
|
||||
zookeeper_client = getContext()->getZooKeeper();
|
||||
zookeeper_client->sync(root_path);
|
||||
}
|
||||
return zookeeper_client;
|
||||
}
|
||||
|
||||
std::string getPath(const std::string & path) const
|
||||
{
|
||||
return fs::path(root_path) / path;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
std::unique_lock<std::mutex> lockNamedCollectionsTransaction()
|
||||
{
|
||||
static std::mutex transaction_lock;
|
||||
return std::unique_lock(transaction_lock);
|
||||
}
|
||||
|
||||
void loadFromConfigUnlocked(const Poco::Util::AbstractConfiguration & config, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
auto named_collections = LoadFromConfig(config).getAll();
|
||||
LOG_TRACE(
|
||||
getLogger("NamedCollectionsUtils"),
|
||||
"Loaded {} collections from config", named_collections.size());
|
||||
|
||||
NamedCollectionFactory::instance().add(std::move(named_collections));
|
||||
is_loaded_from_config = true;
|
||||
}
|
||||
|
||||
void loadFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadFromConfigUnlocked(config, lock);
|
||||
}
|
||||
|
||||
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
auto collections = LoadFromConfig(config).getAll();
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
instance.removeById(SourceId::CONFIG);
|
||||
instance.add(collections);
|
||||
is_loaded_from_config = true;
|
||||
}
|
||||
|
||||
auto getNamedCollectionsStorage(ContextPtr context)
|
||||
{
|
||||
static const std::string storage_config_path = "named_collections_storage";
|
||||
|
||||
const auto & config = context->getConfigRef();
|
||||
const auto storage = config.getString(storage_config_path + ".type", "local");
|
||||
|
||||
if (storage == "local")
|
||||
{
|
||||
const auto path = config.getString(storage_config_path + ".path", fs::path(context->getPath()) / "named_collections");
|
||||
return NamedCollectionsMetadata(
|
||||
std::make_unique<NamedCollectionsLocalStorage>(context, path), context);
|
||||
}
|
||||
if (storage == "zookeeper")
|
||||
{
|
||||
return NamedCollectionsMetadata(
|
||||
std::make_unique<NamedCollectionsZooKeeperStorage>(
|
||||
context, config.getString(storage_config_path + ".path")),
|
||||
context);
|
||||
}
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
"Unknown storage for named collections: {}", storage);
|
||||
}
|
||||
|
||||
void loadFromSQLUnlocked(ContextPtr context, std::unique_lock<std::mutex> &)
|
||||
{
|
||||
auto named_collections = getNamedCollectionsStorage(context).getAll();
|
||||
LOG_TRACE(
|
||||
getLogger("NamedCollectionsUtils"),
|
||||
"Loaded {} collections from SQL", named_collections.size());
|
||||
|
||||
NamedCollectionFactory::instance().add(std::move(named_collections));
|
||||
is_loaded_from_sql = true;
|
||||
}
|
||||
|
||||
void loadFromSQL(ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadFromSQLUnlocked(context, lock);
|
||||
}
|
||||
|
||||
void loadIfNotUnlocked(std::unique_lock<std::mutex> & lock)
|
||||
{
|
||||
auto global_context = Context::getGlobalContextInstance();
|
||||
if (!is_loaded_from_config)
|
||||
loadFromConfigUnlocked(global_context->getConfigRef(), lock);
|
||||
if (!is_loaded_from_sql)
|
||||
loadFromSQLUnlocked(global_context, lock);
|
||||
}
|
||||
|
||||
void loadIfNot()
|
||||
{
|
||||
if (is_loaded_from_sql && is_loaded_from_config)
|
||||
return;
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
}
|
||||
|
||||
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (!instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
getNamedCollectionsStorage(context).remove(query.collection_name);
|
||||
instance.remove(query.collection_name);
|
||||
}
|
||||
|
||||
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_not_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"A named collection `{}` already exists",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
instance.add(query.collection_name, getNamedCollectionsStorage(context).create(query));
|
||||
}
|
||||
|
||||
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
auto lock = lockNamedCollectionsTransaction();
|
||||
loadIfNotUnlocked(lock);
|
||||
auto & instance = NamedCollectionFactory::instance();
|
||||
if (!instance.exists(query.collection_name))
|
||||
{
|
||||
if (!query.if_exists)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
getNamedCollectionsStorage(context).update(query);
|
||||
|
||||
auto collection = instance.getMutable(query.collection_name);
|
||||
auto collection_lock = collection->lock();
|
||||
|
||||
for (const auto & [name, value] : query.changes)
|
||||
{
|
||||
auto it_override = query.overridability.find(name);
|
||||
if (it_override != query.overridability.end())
|
||||
collection->setOrUpdate<String, true>(name, convertFieldToString(value), it_override->second);
|
||||
else
|
||||
collection->setOrUpdate<String, true>(name, convertFieldToString(value), {});
|
||||
}
|
||||
|
||||
for (const auto & key : query.delete_keys)
|
||||
collection->remove<true>(key);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
#pragma once
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
|
||||
namespace Poco { namespace Util { class AbstractConfiguration; } }
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ASTCreateNamedCollectionQuery;
|
||||
class ASTAlterNamedCollectionQuery;
|
||||
class ASTDropNamedCollectionQuery;
|
||||
|
||||
namespace NamedCollectionUtils
|
||||
{
|
||||
|
||||
enum class SourceId : uint8_t
|
||||
{
|
||||
NONE = 0,
|
||||
CONFIG = 1,
|
||||
SQL = 2,
|
||||
};
|
||||
|
||||
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
/// Load named collections from `context->getPath() / named_collections /`.
|
||||
void loadFromSQL(ContextPtr context);
|
||||
|
||||
/// Remove collection as well as its metadata from `context->getPath() / named_collections /`.
|
||||
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
/// Create a new collection from AST and put it to `context->getPath() / named_collections /`.
|
||||
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
/// Update definition of already existing collection from AST and update result in `context->getPath() / named_collections /`.
|
||||
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
void loadIfNot();
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -4,7 +4,6 @@
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/Operators.h>
|
||||
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
|
||||
@ -299,7 +298,7 @@ MutableNamedCollectionPtr NamedCollection::duplicate() const
|
||||
auto impl = pimpl->createCopy(collection_name);
|
||||
return std::unique_ptr<NamedCollection>(
|
||||
new NamedCollection(
|
||||
std::move(impl), collection_name, NamedCollectionUtils::SourceId::NONE, true));
|
||||
std::move(impl), collection_name, SourceId::NONE, true));
|
||||
}
|
||||
|
||||
NamedCollection::Keys NamedCollection::getKeys(ssize_t depth, const std::string & prefix) const
|
||||
|
@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Common/NamedCollections/NamedCollections_fwd.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
|
||||
namespace Poco { namespace Util { class AbstractConfiguration; } }
|
||||
|
||||
@ -23,7 +22,12 @@ class NamedCollection
|
||||
public:
|
||||
using Key = std::string;
|
||||
using Keys = std::set<Key, std::less<>>;
|
||||
using SourceId = NamedCollectionUtils::SourceId;
|
||||
enum class SourceId : uint8_t
|
||||
{
|
||||
NONE = 0,
|
||||
CONFIG = 1,
|
||||
SQL = 2,
|
||||
};
|
||||
|
||||
static MutableNamedCollectionPtr create(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsMetadata.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -152,7 +153,7 @@ bool NamedCollectionFactory::removeIfExistsUnlocked(
|
||||
return true;
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::removeById(NamedCollectionUtils::SourceId id)
|
||||
void NamedCollectionFactory::removeById(NamedCollection::SourceId id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
std::erase_if(
|
||||
@ -166,4 +167,150 @@ NamedCollectionsMap NamedCollectionFactory::getAll() const
|
||||
return loaded_named_collections;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
constexpr auto NAMED_COLLECTIONS_CONFIG_PREFIX = "named_collections";
|
||||
|
||||
std::vector<std::string> listCollections(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys collections_names;
|
||||
config.keys(NAMED_COLLECTIONS_CONFIG_PREFIX, collections_names);
|
||||
return collections_names;
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr getCollection(
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
const std::string & collection_name)
|
||||
{
|
||||
const auto collection_prefix = fmt::format("{}.{}", NAMED_COLLECTIONS_CONFIG_PREFIX, collection_name);
|
||||
std::queue<std::string> enumerate_input;
|
||||
std::set<std::string, std::less<>> enumerate_result;
|
||||
|
||||
enumerate_input.push(collection_prefix);
|
||||
NamedCollectionConfiguration::listKeys(config, std::move(enumerate_input), enumerate_result, -1);
|
||||
|
||||
/// Collection does not have any keys. (`enumerate_result` == <collection_path>).
|
||||
const bool collection_is_empty = enumerate_result.size() == 1
|
||||
&& *enumerate_result.begin() == collection_prefix;
|
||||
|
||||
std::set<std::string, std::less<>> keys;
|
||||
if (!collection_is_empty)
|
||||
{
|
||||
/// Skip collection prefix and add +1 to avoid '.' in the beginning.
|
||||
for (const auto & path : enumerate_result)
|
||||
keys.emplace(path.substr(collection_prefix.size() + 1));
|
||||
}
|
||||
|
||||
return NamedCollection::create(
|
||||
config, collection_name, collection_prefix, keys, NamedCollection::SourceId::CONFIG, /* is_mutable */false);
|
||||
}
|
||||
|
||||
NamedCollectionsMap getNamedCollections(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
NamedCollectionsMap result;
|
||||
for (const auto & collection_name : listCollections(config))
|
||||
{
|
||||
if (result.contains(collection_name))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Found duplicate named collection `{}`",
|
||||
collection_name);
|
||||
}
|
||||
result.emplace(collection_name, getCollection(config, collection_name));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::loadFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
add(getNamedCollections(config));
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::reloadFromConfig(const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto collections = getNamedCollections(config);
|
||||
removeById(NamedCollection::SourceId::CONFIG);
|
||||
add(collections);
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::loadFromSQL(const ContextPtr & context)
|
||||
{
|
||||
add(NamedCollectionsMetadata::create(context)->getAll());
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::loadIfNot()
|
||||
{
|
||||
if (loaded)
|
||||
return;
|
||||
auto global_context = Context::getGlobalContextInstance();
|
||||
loadFromConfig(global_context->getConfigRef());
|
||||
loadFromSQL(global_context);
|
||||
loaded = true;
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
loadIfNot();
|
||||
if (exists(query.collection_name))
|
||||
{
|
||||
if (query.if_not_exists)
|
||||
return;
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"A named collection `{}` already exists",
|
||||
query.collection_name);
|
||||
}
|
||||
add(query.collection_name, NamedCollectionsMetadata::create(context)->create(query));
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
loadIfNot();
|
||||
if (!exists(query.collection_name))
|
||||
{
|
||||
if (query.if_exists)
|
||||
return;
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
NamedCollectionsMetadata::create(context)->remove(query.collection_name);
|
||||
remove(query.collection_name);
|
||||
}
|
||||
|
||||
void NamedCollectionFactory::updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context)
|
||||
{
|
||||
loadIfNot();
|
||||
if (!exists(query.collection_name))
|
||||
{
|
||||
if (query.if_exists)
|
||||
return;
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove collection `{}`, because it doesn't exist",
|
||||
query.collection_name);
|
||||
}
|
||||
NamedCollectionsMetadata::create(context)->update(query);
|
||||
|
||||
auto collection = getMutable(query.collection_name);
|
||||
auto collection_lock = collection->lock();
|
||||
|
||||
for (const auto & [name, value] : query.changes)
|
||||
{
|
||||
auto it_override = query.overridability.find(name);
|
||||
if (it_override != query.overridability.end())
|
||||
collection->setOrUpdate<String, true>(name, convertFieldToString(value), it_override->second);
|
||||
else
|
||||
collection->setOrUpdate<String, true>(name, convertFieldToString(value), {});
|
||||
}
|
||||
|
||||
for (const auto & key : query.delete_keys)
|
||||
collection->remove<true>(key);
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,11 @@
|
||||
#include <Common/NamedCollections/NamedCollections.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
class ASTCreateNamedCollectionQuery;
|
||||
class ASTDropNamedCollectionQuery;
|
||||
class ASTAlterNamedCollectionQuery;
|
||||
|
||||
class NamedCollectionFactory : boost::noncopyable
|
||||
{
|
||||
@ -26,10 +30,23 @@ public:
|
||||
|
||||
void removeIfExists(const std::string & collection_name);
|
||||
|
||||
void removeById(NamedCollectionUtils::SourceId id);
|
||||
void removeById(NamedCollection::SourceId id);
|
||||
|
||||
NamedCollectionsMap getAll() const;
|
||||
|
||||
void loadIfNot();
|
||||
|
||||
void reloadFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void createFromSQL(const ASTCreateNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
void removeFromSQL(const ASTDropNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
void updateFromSQL(const ASTAlterNamedCollectionQuery & query, ContextPtr context);
|
||||
|
||||
/// This method is public only for unit tests.
|
||||
void loadFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
private:
|
||||
bool existsUnlocked(
|
||||
const std::string & collection_name,
|
||||
@ -50,8 +67,12 @@ private:
|
||||
|
||||
mutable NamedCollectionsMap loaded_named_collections;
|
||||
|
||||
LoggerPtr log = getLogger("NamedCollectionFactory");
|
||||
mutable std::mutex mutex;
|
||||
bool is_initialized = false;
|
||||
bool loaded = false;
|
||||
|
||||
void loadFromSQL(const ContextPtr & context);
|
||||
};
|
||||
|
||||
}
|
||||
|
438
src/Common/NamedCollections/NamedCollectionsMetadata.cpp
Normal file
438
src/Common/NamedCollections/NamedCollectionsMetadata.cpp
Normal file
@ -0,0 +1,438 @@
|
||||
#include <Common/NamedCollections/NamedCollectionsMetadata.h>
|
||||
#include <Common/NamedCollections/NamedCollectionConfiguration.h>
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
#include <Common/ZooKeeper/KeeperException.h>
|
||||
#include <Common/ZooKeeper/ZooKeeper.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/ParserCreateQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <filesystem>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NAMED_COLLECTION_ALREADY_EXISTS;
|
||||
extern const int NAMED_COLLECTION_DOESNT_EXIST;
|
||||
extern const int INVALID_CONFIG_PARAMETER;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
MutableNamedCollectionPtr createNamedCollectionFromAST(const ASTCreateNamedCollectionQuery & query)
|
||||
{
|
||||
const auto & collection_name = query.collection_name;
|
||||
const auto config = NamedCollectionConfiguration::createConfiguration(collection_name, query.changes, query.overridability);
|
||||
|
||||
std::set<std::string, std::less<>> keys;
|
||||
for (const auto & [name, _] : query.changes)
|
||||
keys.insert(name);
|
||||
|
||||
return NamedCollection::create(
|
||||
*config, collection_name, "", keys, NamedCollection::SourceId::SQL, /* is_mutable */true);
|
||||
}
|
||||
|
||||
std::string getFileName(const std::string & collection_name)
|
||||
{
|
||||
return escapeForFileName(collection_name) + ".sql";
|
||||
}
|
||||
}
|
||||
|
||||
class NamedCollectionsMetadata::INamedCollectionsStorage
|
||||
{
|
||||
public:
|
||||
virtual ~INamedCollectionsStorage() = default;
|
||||
|
||||
virtual bool exists(const std::string & path) const = 0;
|
||||
|
||||
virtual std::vector<std::string> list() const = 0;
|
||||
|
||||
virtual std::string read(const std::string & path) const = 0;
|
||||
|
||||
virtual void write(const std::string & path, const std::string & data, bool replace) = 0;
|
||||
|
||||
virtual void remove(const std::string & path) = 0;
|
||||
|
||||
virtual bool removeIfExists(const std::string & path) = 0;
|
||||
};
|
||||
|
||||
|
||||
class NamedCollectionsMetadata::LocalStorage : public INamedCollectionsStorage, private WithContext
|
||||
{
|
||||
private:
|
||||
std::string root_path;
|
||||
|
||||
public:
|
||||
LocalStorage(ContextPtr context_, const std::string & path_)
|
||||
: WithContext(context_)
|
||||
, root_path(path_)
|
||||
{
|
||||
if (fs::exists(root_path))
|
||||
cleanup();
|
||||
}
|
||||
|
||||
~LocalStorage() override = default;
|
||||
|
||||
std::vector<std::string> list() const override
|
||||
{
|
||||
if (!fs::exists(root_path))
|
||||
return {};
|
||||
|
||||
std::vector<std::string> elements;
|
||||
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
|
||||
{
|
||||
const auto & current_path = it->path();
|
||||
if (current_path.extension() == ".sql")
|
||||
{
|
||||
elements.push_back(it->path());
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_WARNING(
|
||||
getLogger("LocalStorage"),
|
||||
"Unexpected file {} in named collections directory",
|
||||
current_path.filename().string());
|
||||
}
|
||||
}
|
||||
return elements;
|
||||
}
|
||||
|
||||
bool exists(const std::string & path) const override
|
||||
{
|
||||
return fs::exists(getPath(path));
|
||||
}
|
||||
|
||||
std::string read(const std::string & path) const override
|
||||
{
|
||||
ReadBufferFromFile in(getPath(path));
|
||||
std::string data;
|
||||
readStringUntilEOF(data, in);
|
||||
return data;
|
||||
}
|
||||
|
||||
void write(const std::string & path, const std::string & data, bool replace) override
|
||||
{
|
||||
if (!replace && fs::exists(path))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Metadata file {} for named collection already exists",
|
||||
path);
|
||||
}
|
||||
|
||||
fs::create_directories(root_path);
|
||||
|
||||
auto tmp_path = getPath(path + ".tmp");
|
||||
WriteBufferFromFile out(tmp_path, data.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||
writeString(data, out);
|
||||
|
||||
out.next();
|
||||
if (getContext()->getSettingsRef().fsync_metadata)
|
||||
out.sync();
|
||||
out.close();
|
||||
|
||||
fs::rename(tmp_path, getPath(path));
|
||||
}
|
||||
|
||||
void remove(const std::string & path) override
|
||||
{
|
||||
if (!removeIfExists(getPath(path)))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_DOESNT_EXIST,
|
||||
"Cannot remove `{}`, because it doesn't exist", path);
|
||||
}
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & path) override
|
||||
{
|
||||
return fs::remove(getPath(path));
|
||||
}
|
||||
|
||||
private:
|
||||
std::string getPath(const std::string & path) const
|
||||
{
|
||||
return fs::path(root_path) / path;
|
||||
}
|
||||
|
||||
/// Delete .tmp files. They could be left undeleted in case of
|
||||
/// some exception or abrupt server restart.
|
||||
void cleanup()
|
||||
{
|
||||
std::vector<std::string> files_to_remove;
|
||||
for (fs::directory_iterator it{root_path}; it != fs::directory_iterator{}; ++it)
|
||||
{
|
||||
const auto & current_path = it->path();
|
||||
if (current_path.extension() == ".tmp")
|
||||
files_to_remove.push_back(current_path);
|
||||
}
|
||||
for (const auto & file : files_to_remove)
|
||||
fs::remove(file);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
class NamedCollectionsMetadata::ZooKeeperStorage : public INamedCollectionsStorage, private WithContext
|
||||
{
|
||||
private:
|
||||
std::string root_path;
|
||||
mutable zkutil::ZooKeeperPtr zookeeper_client{nullptr};
|
||||
|
||||
public:
|
||||
ZooKeeperStorage(ContextPtr context_, const std::string & path_)
|
||||
: WithContext(context_)
|
||||
, root_path(path_)
|
||||
{
|
||||
if (root_path.empty())
|
||||
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Collections path cannot be empty");
|
||||
|
||||
if (root_path != "/" && root_path.back() == '/')
|
||||
root_path.resize(root_path.size() - 1);
|
||||
if (root_path.front() != '/')
|
||||
root_path = "/" + root_path;
|
||||
|
||||
auto client = getClient();
|
||||
if (root_path != "/" && !client->exists(root_path))
|
||||
{
|
||||
client->createAncestors(root_path);
|
||||
client->createIfNotExists(root_path, "");
|
||||
}
|
||||
}
|
||||
|
||||
~ZooKeeperStorage() override = default;
|
||||
|
||||
std::vector<std::string> list() const override
|
||||
{
|
||||
return getClient()->getChildren(root_path);
|
||||
}
|
||||
|
||||
bool exists(const std::string & path) const override
|
||||
{
|
||||
return getClient()->exists(getPath(path));
|
||||
}
|
||||
|
||||
std::string read(const std::string & path) const override
|
||||
{
|
||||
return getClient()->get(getPath(path));
|
||||
}
|
||||
|
||||
void write(const std::string & path, const std::string & data, bool replace) override
|
||||
{
|
||||
if (replace)
|
||||
{
|
||||
getClient()->createOrUpdate(getPath(path), data, zkutil::CreateMode::Persistent);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto code = getClient()->tryCreate(getPath(path), data, zkutil::CreateMode::Persistent);
|
||||
|
||||
if (code == Coordination::Error::ZNODEEXISTS)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Metadata file {} for named collection already exists",
|
||||
path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void remove(const std::string & path) override
|
||||
{
|
||||
getClient()->remove(getPath(path));
|
||||
}
|
||||
|
||||
bool removeIfExists(const std::string & path) override
|
||||
{
|
||||
auto code = getClient()->tryRemove(getPath(path));
|
||||
if (code == Coordination::Error::ZOK)
|
||||
return true;
|
||||
if (code == Coordination::Error::ZNONODE)
|
||||
return false;
|
||||
throw Coordination::Exception::fromPath(code, getPath(path));
|
||||
}
|
||||
|
||||
private:
|
||||
zkutil::ZooKeeperPtr getClient() const
|
||||
{
|
||||
if (!zookeeper_client || zookeeper_client->expired())
|
||||
{
|
||||
zookeeper_client = getContext()->getZooKeeper();
|
||||
zookeeper_client->sync(root_path);
|
||||
}
|
||||
return zookeeper_client;
|
||||
}
|
||||
|
||||
std::string getPath(const std::string & path) const
|
||||
{
|
||||
return fs::path(root_path) / path;
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<NamedCollectionsMetadata> NamedCollectionsMetadata::create(const ContextPtr & context_)
|
||||
{
|
||||
static const std::string storage_config_path = "named_collections_storage";
|
||||
|
||||
const auto & config = context_->getConfigRef();
|
||||
const auto storage_type = config.getString(storage_config_path + ".type", "local");
|
||||
|
||||
if (storage_type == "local")
|
||||
{
|
||||
const auto path = config.getString(
|
||||
storage_config_path + ".path",
|
||||
std::filesystem::path(context_->getPath()) / "named_collections");
|
||||
|
||||
auto local_storage = std::make_unique<NamedCollectionsMetadata::LocalStorage>(context_, path);
|
||||
return std::make_unique<NamedCollectionsMetadata>(std::move(local_storage), context_);
|
||||
}
|
||||
if (storage_type == "zookeeper")
|
||||
{
|
||||
auto zk_storage = std::make_unique<NamedCollectionsMetadata::ZooKeeperStorage>(context_, config.getString(storage_config_path + ".path"));
|
||||
return std::make_unique<NamedCollectionsMetadata>(std::move(zk_storage), context_);
|
||||
}
|
||||
|
||||
throw Exception(
|
||||
ErrorCodes::INVALID_CONFIG_PARAMETER,
|
||||
"Unknown storage for named collections: {}", storage_type);
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr NamedCollectionsMetadata::get(const std::string & collection_name) const
|
||||
{
|
||||
const auto query = readCreateQuery(collection_name);
|
||||
return createNamedCollectionFromAST(query);
|
||||
}
|
||||
|
||||
NamedCollectionsMap NamedCollectionsMetadata::getAll() const
|
||||
{
|
||||
NamedCollectionsMap result;
|
||||
for (const auto & collection_name : listCollections())
|
||||
{
|
||||
if (result.contains(collection_name))
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::NAMED_COLLECTION_ALREADY_EXISTS,
|
||||
"Found duplicate named collection `{}`",
|
||||
collection_name);
|
||||
}
|
||||
result.emplace(collection_name, get(collection_name));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
MutableNamedCollectionPtr NamedCollectionsMetadata::create(const ASTCreateNamedCollectionQuery & query)
|
||||
{
|
||||
writeCreateQuery(query);
|
||||
return createNamedCollectionFromAST(query);
|
||||
}
|
||||
|
||||
void NamedCollectionsMetadata::remove(const std::string & collection_name)
|
||||
{
|
||||
storage->remove(getFileName(collection_name));
|
||||
}
|
||||
|
||||
bool NamedCollectionsMetadata::removeIfExists(const std::string & collection_name)
|
||||
{
|
||||
return storage->removeIfExists(getFileName(collection_name));
|
||||
}
|
||||
|
||||
void NamedCollectionsMetadata::update(const ASTAlterNamedCollectionQuery & query)
|
||||
{
|
||||
auto create_query = readCreateQuery(query.collection_name);
|
||||
|
||||
std::unordered_map<std::string, Field> result_changes_map;
|
||||
for (const auto & [name, value] : query.changes)
|
||||
{
|
||||
auto [it, inserted] = result_changes_map.emplace(name, value);
|
||||
if (!inserted)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Value with key `{}` is used twice in the SET query (collection name: {})",
|
||||
name, query.collection_name);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto & [name, value] : create_query.changes)
|
||||
result_changes_map.emplace(name, value);
|
||||
|
||||
std::unordered_map<std::string, bool> result_overridability_map;
|
||||
for (const auto & [name, value] : query.overridability)
|
||||
result_overridability_map.emplace(name, value);
|
||||
for (const auto & [name, value] : create_query.overridability)
|
||||
result_overridability_map.emplace(name, value);
|
||||
|
||||
for (const auto & delete_key : query.delete_keys)
|
||||
{
|
||||
auto it = result_changes_map.find(delete_key);
|
||||
if (it == result_changes_map.end())
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Cannot delete key `{}` because it does not exist in collection",
|
||||
delete_key);
|
||||
}
|
||||
else
|
||||
{
|
||||
result_changes_map.erase(it);
|
||||
auto it_override = result_overridability_map.find(delete_key);
|
||||
if (it_override != result_overridability_map.end())
|
||||
result_overridability_map.erase(it_override);
|
||||
}
|
||||
}
|
||||
|
||||
create_query.changes.clear();
|
||||
for (const auto & [name, value] : result_changes_map)
|
||||
create_query.changes.emplace_back(name, value);
|
||||
create_query.overridability = std::move(result_overridability_map);
|
||||
|
||||
if (create_query.changes.empty())
|
||||
throw Exception(
|
||||
ErrorCodes::BAD_ARGUMENTS,
|
||||
"Named collection cannot be empty (collection name: {})",
|
||||
query.collection_name);
|
||||
|
||||
chassert(create_query.collection_name == query.collection_name);
|
||||
writeCreateQuery(create_query, true);
|
||||
}
|
||||
|
||||
std::vector<std::string> NamedCollectionsMetadata::listCollections() const
|
||||
{
|
||||
auto paths = storage->list();
|
||||
std::vector<std::string> collections;
|
||||
collections.reserve(paths.size());
|
||||
for (const auto & path : paths)
|
||||
collections.push_back(std::filesystem::path(path).stem());
|
||||
return collections;
|
||||
}
|
||||
|
||||
ASTCreateNamedCollectionQuery NamedCollectionsMetadata::readCreateQuery(const std::string & collection_name) const
|
||||
{
|
||||
const auto path = getFileName(collection_name);
|
||||
auto query = storage->read(path);
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
|
||||
ParserCreateNamedCollectionQuery parser;
|
||||
auto ast = parseQuery(parser, query, "in file " + path, 0, settings.max_parser_depth, settings.max_parser_backtracks);
|
||||
const auto & create_query = ast->as<const ASTCreateNamedCollectionQuery &>();
|
||||
return create_query;
|
||||
}
|
||||
|
||||
void NamedCollectionsMetadata::writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace)
|
||||
{
|
||||
auto normalized_query = query.clone();
|
||||
auto & changes = typeid_cast<ASTCreateNamedCollectionQuery *>(normalized_query.get())->changes;
|
||||
::sort(
|
||||
changes.begin(), changes.end(),
|
||||
[](const SettingChange & lhs, const SettingChange & rhs) { return lhs.name < rhs.name; });
|
||||
|
||||
storage->write(getFileName(query.collection_name), serializeAST(*normalized_query), replace);
|
||||
}
|
||||
|
||||
}
|
49
src/Common/NamedCollections/NamedCollectionsMetadata.h
Normal file
49
src/Common/NamedCollections/NamedCollectionsMetadata.h
Normal file
@ -0,0 +1,49 @@
|
||||
#pragma once
|
||||
#include <Parsers/ASTCreateNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTAlterNamedCollectionQuery.h>
|
||||
#include <Parsers/ASTDropNamedCollectionQuery.h>
|
||||
#include <Common/NamedCollections/NamedCollections.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class NamedCollectionsMetadata : private WithContext
|
||||
{
|
||||
public:
|
||||
static std::unique_ptr<NamedCollectionsMetadata> create(const ContextPtr & context);
|
||||
|
||||
~NamedCollectionsMetadata() = default;
|
||||
|
||||
NamedCollectionsMap getAll() const;
|
||||
|
||||
MutableNamedCollectionPtr get(const std::string & collection_name) const;
|
||||
|
||||
MutableNamedCollectionPtr create(const ASTCreateNamedCollectionQuery & query);
|
||||
|
||||
void remove(const std::string & collection_name);
|
||||
|
||||
bool removeIfExists(const std::string & collection_name);
|
||||
|
||||
void update(const ASTAlterNamedCollectionQuery & query);
|
||||
|
||||
class INamedCollectionsStorage;
|
||||
NamedCollectionsMetadata(std::shared_ptr<INamedCollectionsStorage> storage_, ContextPtr context_)
|
||||
: WithContext(context_)
|
||||
, storage(std::move(storage_)) {}
|
||||
/// FIXME: It should be a protected constructor, but I failed make create() method a proper friend.
|
||||
|
||||
private:
|
||||
class LocalStorage;
|
||||
class ZooKeeperStorage;
|
||||
|
||||
std::shared_ptr<INamedCollectionsStorage> storage;
|
||||
|
||||
std::vector<std::string> listCollections() const;
|
||||
|
||||
ASTCreateNamedCollectionQuery readCreateQuery(const std::string & collection_name) const;
|
||||
|
||||
void writeCreateQuery(const ASTCreateNamedCollectionQuery & query, bool replace = false);
|
||||
};
|
||||
|
||||
|
||||
}
|
@ -1,6 +1,5 @@
|
||||
#include <Common/tests/gtest_global_context.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Poco/Util/XMLConfiguration.h>
|
||||
#include <Poco/DOM/DOMParser.h>
|
||||
#include <gtest/gtest.h>
|
||||
@ -29,7 +28,7 @@ TEST(NamedCollections, SimpleConfig)
|
||||
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
|
||||
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
|
||||
|
||||
NamedCollectionUtils::loadFromConfig(*config);
|
||||
NamedCollectionFactory::instance().loadFromConfig(*config);
|
||||
|
||||
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection1"));
|
||||
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection2"));
|
||||
@ -119,7 +118,7 @@ TEST(NamedCollections, NestedConfig)
|
||||
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
|
||||
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
|
||||
|
||||
NamedCollectionUtils::loadFromConfig(*config);
|
||||
NamedCollectionFactory::instance().loadFromConfig(*config);
|
||||
|
||||
ASSERT_TRUE(NamedCollectionFactory::instance().exists("collection3"));
|
||||
|
||||
@ -171,7 +170,7 @@ TEST(NamedCollections, NestedConfigDuplicateKeys)
|
||||
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
|
||||
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
|
||||
|
||||
NamedCollectionUtils::loadFromConfig(*config);
|
||||
NamedCollectionFactory::instance().loadFromConfig(*config);
|
||||
auto collection = NamedCollectionFactory::instance().get("collection");
|
||||
|
||||
auto keys = collection->getKeys();
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -23,7 +23,7 @@ BlockIO InterpreterAlterNamedCollectionQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionUtils::updateFromSQL(query, current_context);
|
||||
NamedCollectionFactory::instance().updateFromSQL(query, current_context);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -23,7 +23,7 @@ BlockIO InterpreterCreateNamedCollectionQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionUtils::createFromSQL(query, current_context);
|
||||
NamedCollectionFactory::instance().createFromSQL(query, current_context);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -4,7 +4,7 @@
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
#include <Common/NamedCollections/NamedCollectionUtils.h>
|
||||
#include <Common/NamedCollections/NamedCollectionsFactory.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -23,7 +23,7 @@ BlockIO InterpreterDropNamedCollectionQuery::execute()
|
||||
return executeDDLQueryOnCluster(query_ptr, current_context, params);
|
||||
}
|
||||
|
||||
NamedCollectionUtils::removeFromSQL(query, current_context);
|
||||
NamedCollectionFactory::instance().removeFromSQL(query, current_context);
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -95,7 +95,7 @@ MutableNamedCollectionPtr tryGetNamedCollectionWithOverrides(
|
||||
if (asts.empty())
|
||||
return nullptr;
|
||||
|
||||
NamedCollectionUtils::loadIfNot();
|
||||
NamedCollectionFactory::instance().loadIfNot();
|
||||
|
||||
auto collection_name = getCollectionName(asts);
|
||||
if (!collection_name.has_value())
|
||||
|
@ -33,7 +33,7 @@ void StorageSystemNamedCollections::fillData(MutableColumns & res_columns, Conte
|
||||
{
|
||||
const auto & access = context->getAccess();
|
||||
|
||||
NamedCollectionUtils::loadIfNot();
|
||||
NamedCollectionFactory::instance().loadIfNot();
|
||||
|
||||
auto collections = NamedCollectionFactory::instance().getAll();
|
||||
for (const auto & [name, collection] : collections)
|
||||
|
Loading…
Reference in New Issue
Block a user