Merge pull request #57752 from chizhonkova/refactor_sql_udf_loader

Refactoring for UserDefinedSQLObjectsLoader
This commit is contained in:
pufit 2023-12-18 16:08:46 -05:00 committed by GitHub
commit 36d1905881
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 508 additions and 294 deletions

View File

@ -43,7 +43,7 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/ErrorHandlers.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -757,7 +757,7 @@ void LocalServer::processConfig()
}
/// For ClickHouse local if path is not set the loader will be disabled.
global_context->getUserDefinedSQLObjectsLoader().loadObjects();
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
LOG_DEBUG(log, "Loaded metadata.");
}

View File

@ -67,7 +67,7 @@
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Common/NamedCollections/NamedCollectionUtils.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
@ -1756,7 +1756,7 @@ try
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.
global_context->getUserDefinedSQLObjectsLoader().loadObjects();
global_context->getUserDefinedSQLObjectsStorage().loadObjects();
}
catch (...)
{

View File

@ -1,47 +0,0 @@
#pragma once
#include <base/types.h>
namespace DB
{
class IAST;
struct Settings;
enum class UserDefinedSQLObjectType;
/// Interface for a loader of user-defined SQL objects.
/// Implementations: UserDefinedSQLLoaderFromDisk, UserDefinedSQLLoaderFromZooKeeper
class IUserDefinedSQLObjectsLoader
{
public:
virtual ~IUserDefinedSQLObjectsLoader() = default;
/// Whether this loader can replicate SQL objects to another node.
virtual bool isReplicated() const { return false; }
virtual String getReplicationID() const { return ""; }
/// Loads all objects. Can be called once - if objects are already loaded the function does nothing.
virtual void loadObjects() = 0;
/// Stops watching.
virtual void stopWatching() {}
/// Immediately reloads all objects, throws an exception if failed.
virtual void reloadObjects() = 0;
/// Immediately reloads a specified object only.
virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0;
/// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction).
virtual bool storeObject(
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
/// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction).
virtual bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) = 0;
};
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <base/types.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class IAST;
struct Settings;
enum class UserDefinedSQLObjectType;
/// Interface for a storage of user-defined SQL objects.
/// Implementations: UserDefinedSQLObjectsDiskStorage, UserDefinedSQLObjectsZooKeeperStorage
class IUserDefinedSQLObjectsStorage
{
public:
virtual ~IUserDefinedSQLObjectsStorage() = default;
/// Whether this loader can replicate SQL objects to another node.
virtual bool isReplicated() const { return false; }
virtual String getReplicationID() const { return ""; }
/// Loads all objects. Can be called once - if objects are already loaded the function does nothing.
virtual void loadObjects() = 0;
/// Get object by name. If no object stored with object_name throws exception.
virtual ASTPtr get(const String & object_name) const = 0;
/// Get object by name. If no object stored with object_name return nullptr.
virtual ASTPtr tryGet(const String & object_name) const = 0;
/// Check if object with object_name is stored.
virtual bool has(const String & object_name) const = 0;
/// Get all user defined object names.
virtual std::vector<String> getAllObjectNames() const = 0;
/// Get all user defined objects.
virtual std::vector<std::pair<String, ASTPtr>> getAllObjects() const = 0;
/// Check whether any UDFs have been stored.
virtual bool empty() const = 0;
/// Stops watching.
virtual void stopWatching() {}
/// Immediately reloads all objects, throws an exception if failed.
virtual void reloadObjects() = 0;
/// Immediately reloads a specified object only.
virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0;
/// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction).
virtual bool storeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
/// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction).
virtual bool removeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists) = 0;
};
}

View File

@ -3,7 +3,7 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Backups/RestorerFromBackup.h>
#include <Functions/FunctionFactory.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
@ -14,8 +14,6 @@
#include <Parsers/ASTIdentifier.h>
#include <Common/quoteString.h>
#include <boost/container/flat_set.hpp>
namespace DB
{
@ -23,7 +21,6 @@ namespace DB
namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
extern const int CANNOT_DROP_FUNCTION;
extern const int CANNOT_CREATE_RECURSIVE_FUNCTION;
extern const int UNSUPPORTED_METHOD;
@ -130,20 +127,17 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c
checkCanBeRegistered(context, function_name, *create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query);
std::lock_guard lock{mutex};
auto it = function_name_to_create_query_map.find(function_name);
if (it != function_name_to_create_query_map.end())
{
if (throw_if_exists)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", function_name);
else if (!replace_if_exists)
return false;
}
try
{
auto & loader = context->getUserDefinedSQLObjectsLoader();
bool stored = loader.storeObject(UserDefinedSQLObjectType::Function, function_name, *create_function_query, throw_if_exists, replace_if_exists, context->getSettingsRef());
auto & loader = context->getUserDefinedSQLObjectsStorage();
bool stored = loader.storeObject(
context,
UserDefinedSQLObjectType::Function,
function_name,
create_function_query,
throw_if_exists,
replace_if_exists,
context->getSettingsRef());
if (!stored)
return false;
}
@ -153,7 +147,6 @@ bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & c
throw;
}
function_name_to_create_query_map[function_name] = create_function_query;
return true;
}
@ -161,20 +154,14 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &
{
checkCanBeUnregistered(context, function_name);
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", function_name);
else
return false;
}
try
{
auto & loader = context->getUserDefinedSQLObjectsLoader();
bool removed = loader.removeObject(UserDefinedSQLObjectType::Function, function_name, throw_if_not_exists);
auto & storage = context->getUserDefinedSQLObjectsStorage();
bool removed = storage.removeObject(
context,
UserDefinedSQLObjectType::Function,
function_name,
throw_if_not_exists);
if (!removed)
return false;
}
@ -184,61 +171,41 @@ bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr &
throw;
}
function_name_to_create_query_map.erase(function_name);
return true;
}
ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The function name '{}' is not registered",
function_name);
return it->second;
return global_context->getUserDefinedSQLObjectsStorage().get(function_name);
}
ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
return nullptr;
return it->second;
return global_context->getUserDefinedSQLObjectsStorage().tryGet(function_name);
}
bool UserDefinedSQLFunctionFactory::has(const String & function_name) const
{
return tryGet(function_name) != nullptr;
return global_context->getUserDefinedSQLObjectsStorage().has(function_name);
}
std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const
{
std::vector<std::string> registered_names;
std::lock_guard lock(mutex);
registered_names.reserve(function_name_to_create_query_map.size());
for (const auto & [name, _] : function_name_to_create_query_map)
registered_names.emplace_back(name);
return registered_names;
return global_context->getUserDefinedSQLObjectsStorage().getAllObjectNames();
}
bool UserDefinedSQLFunctionFactory::empty() const
{
std::lock_guard lock(mutex);
return function_name_to_create_query_map.empty();
return global_context->getUserDefinedSQLObjectsStorage().empty();
}
void UserDefinedSQLFunctionFactory::backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const
{
backupUserDefinedSQLObjects(backup_entries_collector, data_path_in_backup, UserDefinedSQLObjectType::Function, getAllFunctions());
backupUserDefinedSQLObjects(
backup_entries_collector,
data_path_in_backup,
UserDefinedSQLObjectType::Function,
global_context->getUserDefinedSQLObjectsStorage().getAllObjects());
}
void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const String & data_path_in_backup)
@ -252,52 +219,4 @@ void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const
registerFunction(context, function_name, create_function_query, throw_if_exists, replace_if_exists);
}
void UserDefinedSQLFunctionFactory::setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions)
{
std::unordered_map<String, ASTPtr> normalized_functions;
for (const auto & [function_name, create_query] : new_functions)
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
std::lock_guard lock(mutex);
function_name_to_create_query_map = std::move(normalized_functions);
}
std::vector<std::pair<String, ASTPtr>> UserDefinedSQLFunctionFactory::getAllFunctions() const
{
std::lock_guard lock{mutex};
std::vector<std::pair<String, ASTPtr>> all_functions;
all_functions.reserve(function_name_to_create_query_map.size());
std::copy(function_name_to_create_query_map.begin(), function_name_to_create_query_map.end(), std::back_inserter(all_functions));
return all_functions;
}
void UserDefinedSQLFunctionFactory::setFunction(const String & function_name, const IAST & create_function_query)
{
std::lock_guard lock(mutex);
function_name_to_create_query_map[function_name] = normalizeCreateFunctionQuery(create_function_query);
}
void UserDefinedSQLFunctionFactory::removeFunction(const String & function_name)
{
std::lock_guard lock(mutex);
function_name_to_create_query_map.erase(function_name);
}
void UserDefinedSQLFunctionFactory::removeAllFunctionsExcept(const Strings & function_names_to_keep)
{
boost::container::flat_set<std::string_view> names_set_to_keep{function_names_to_keep.begin(), function_names_to_keep.end()};
std::lock_guard lock(mutex);
for (auto it = function_name_to_create_query_map.begin(); it != function_name_to_create_query_map.end();)
{
auto current = it++;
if (!names_set_to_keep.contains(current->first))
function_name_to_create_query_map.erase(current);
}
}
std::unique_lock<std::recursive_mutex> UserDefinedSQLFunctionFactory::getLock() const
{
return std::unique_lock{mutex};
}
}

View File

@ -6,7 +6,7 @@
#include <Common/NamePrompter.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/Context.h>
namespace DB
@ -48,23 +48,11 @@ public:
void restore(RestorerFromBackup & restorer, const String & data_path_in_backup);
private:
friend class UserDefinedSQLObjectsLoaderFromDisk;
friend class UserDefinedSQLObjectsLoaderFromZooKeeper;
/// Checks that a specified function can be registered, throws an exception if not.
static void checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query);
static void checkCanBeUnregistered(const ContextPtr & context, const String & function_name);
/// The following functions must be called only by the loader.
void setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions);
std::vector<std::pair<String, ASTPtr>> getAllFunctions() const;
void setFunction(const String & function_name, const IAST & create_function_query);
void removeFunction(const String & function_name);
void removeAllFunctionsExcept(const Strings & function_names_to_keep);
std::unique_lock<std::recursive_mutex> getLock() const;
std::unordered_map<String, ASTPtr> function_name_to_create_query_map;
mutable std::recursive_mutex mutex;
ContextPtr global_context = Context::getGlobalContextInstance();
};
}

View File

@ -6,7 +6,7 @@
#include <Backups/IBackupCoordination.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Interpreters/Context.h>
#include <Parsers/ParserCreateFunctionQuery.h>
@ -37,9 +37,9 @@ void backupUserDefinedSQLObjects(
escapeForFileName(object_name) + ".sql", std::make_shared<BackupEntryFromMemory>(queryToString(create_object_query)));
auto context = backup_entries_collector.getContext();
const auto & loader = context->getUserDefinedSQLObjectsLoader();
const auto & storage = context->getUserDefinedSQLObjectsStorage();
if (!loader.isReplicated())
if (!storage.isReplicated())
{
fs::path data_path_in_backup_fs{data_path_in_backup};
for (const auto & [file_name, entry] : backup_entries)
@ -47,7 +47,7 @@ void backupUserDefinedSQLObjects(
return;
}
String replication_id = loader.getReplicationID();
String replication_id = storage.getReplicationID();
auto backup_coordination = backup_entries_collector.getBackupCoordination();
backup_coordination->addReplicatedSQLObjectsDir(replication_id, object_type, data_path_in_backup);
@ -80,9 +80,9 @@ std::vector<std::pair<String, ASTPtr>>
restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type)
{
auto context = restorer.getContext();
const auto & loader = context->getUserDefinedSQLObjectsLoader();
const auto & storage = context->getUserDefinedSQLObjectsStorage();
if (loader.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(loader.getReplicationID(), object_type))
if (storage.isReplicated() && !restorer.getRestoreCoordination()->acquireReplicatedSQLObjects(storage.getReplicationID(), object_type))
return {}; /// Other replica is already restoring user-defined SQL objects.
auto backup = restorer.getBackup();

View File

@ -1,4 +1,4 @@
#include "Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h"
#include "Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h"
#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
@ -51,7 +51,7 @@ namespace
}
}
UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_)
UserDefinedSQLObjectsDiskStorage::UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_)
: global_context(global_context_)
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")}
@ -60,13 +60,13 @@ UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const C
}
ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name)
ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true);
}
ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists)
ASTPtr UserDefinedSQLObjectsDiskStorage::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists)
{
LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path);
@ -93,7 +93,6 @@ ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectTy
"",
0,
global_context->getSettingsRef().max_parser_depth);
UserDefinedSQLFunctionFactory::checkCanBeRegistered(global_context, object_name, *ast);
return ast;
}
}
@ -106,20 +105,20 @@ ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectTy
}
void UserDefinedSQLObjectsLoaderFromDisk::loadObjects()
void UserDefinedSQLObjectsDiskStorage::loadObjects()
{
if (!objects_loaded)
loadObjectsImpl();
}
void UserDefinedSQLObjectsLoaderFromDisk::reloadObjects()
void UserDefinedSQLObjectsDiskStorage::reloadObjects()
{
loadObjectsImpl();
}
void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl()
void UserDefinedSQLObjectsDiskStorage::loadObjectsImpl()
{
LOG_INFO(log, "Loading user defined objects from {}", dir_path);
createDirectory();
@ -148,26 +147,25 @@ void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl()
function_names_and_queries.emplace_back(function_name, ast);
}
UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_queries);
setAllObjects(function_names_and_queries);
objects_loaded = true;
LOG_DEBUG(log, "User defined objects loaded");
}
void UserDefinedSQLObjectsLoaderFromDisk::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
void UserDefinedSQLObjectsDiskStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
createDirectory();
auto ast = tryLoadObject(object_type, object_name);
auto & factory = UserDefinedSQLFunctionFactory::instance();
if (ast)
factory.setFunction(object_name, *ast);
setObject(object_name, *ast);
else
factory.removeFunction(object_name);
removeObject(object_name);
}
void UserDefinedSQLObjectsLoaderFromDisk::createDirectory()
void UserDefinedSQLObjectsDiskStorage::createDirectory()
{
std::error_code create_dir_error_code;
fs::create_directories(dir_path, create_dir_error_code);
@ -177,10 +175,11 @@ void UserDefinedSQLObjectsLoaderFromDisk::createDirectory()
}
bool UserDefinedSQLObjectsLoaderFromDisk::storeObject(
bool UserDefinedSQLObjectsDiskStorage::storeObjectImpl(
const ContextPtr & /*current_context*/,
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings)
@ -197,7 +196,7 @@ bool UserDefinedSQLObjectsLoaderFromDisk::storeObject(
}
WriteBufferFromOwnString create_statement_buf;
formatAST(create_object_query, create_statement_buf, false);
formatAST(*create_object_query, create_statement_buf, false);
writeChar('\n', create_statement_buf);
String create_statement = create_statement_buf.str();
@ -228,8 +227,11 @@ bool UserDefinedSQLObjectsLoaderFromDisk::storeObject(
}
bool UserDefinedSQLObjectsLoaderFromDisk::removeObject(
UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists)
bool UserDefinedSQLObjectsDiskStorage::removeObjectImpl(
const ContextPtr & /*current_context*/,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists)
{
String file_path = getFilePath(object_type, object_name);
LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path);
@ -249,7 +251,7 @@ bool UserDefinedSQLObjectsLoaderFromDisk::removeObject(
}
String UserDefinedSQLObjectsLoaderFromDisk::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const
String UserDefinedSQLObjectsDiskStorage::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const
{
String file_path;
switch (object_type)

View File

@ -1,6 +1,6 @@
#pragma once
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
@ -9,10 +9,10 @@ namespace DB
{
/// Loads user-defined sql objects from a specified folder.
class UserDefinedSQLObjectsLoaderFromDisk : public IUserDefinedSQLObjectsLoader
class UserDefinedSQLObjectsDiskStorage : public UserDefinedSQLObjectsStorageBase
{
public:
UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_);
UserDefinedSQLObjectsDiskStorage(const ContextPtr & global_context_, const String & dir_path_);
void loadObjects() override;
@ -20,17 +20,22 @@ public:
void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override;
bool storeObject(
private:
bool storeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override;
bool removeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists) override;
private:
void createDirectory();
void loadObjectsImpl();
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name);

View File

@ -0,0 +1,190 @@
#include "Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h"
#include <boost/container/flat_set.hpp>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateFunctionQuery.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
}
namespace
{
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer().visit(res.function_core.get());
return ptr;
}
}
ASTPtr UserDefinedSQLObjectsStorageBase::get(const String & object_name) const
{
std::lock_guard lock(mutex);
auto it = object_name_to_create_object_map.find(object_name);
if (it == object_name_to_create_object_map.end())
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The object name '{}' is not saved",
object_name);
return it->second;
}
ASTPtr UserDefinedSQLObjectsStorageBase::tryGet(const std::string & object_name) const
{
std::lock_guard lock(mutex);
auto it = object_name_to_create_object_map.find(object_name);
if (it == object_name_to_create_object_map.end())
return nullptr;
return it->second;
}
bool UserDefinedSQLObjectsStorageBase::has(const String & object_name) const
{
return tryGet(object_name) != nullptr;
}
std::vector<std::string> UserDefinedSQLObjectsStorageBase::getAllObjectNames() const
{
std::vector<std::string> object_names;
std::lock_guard lock(mutex);
object_names.reserve(object_name_to_create_object_map.size());
for (const auto & [name, _] : object_name_to_create_object_map)
object_names.emplace_back(name);
return object_names;
}
bool UserDefinedSQLObjectsStorageBase::empty() const
{
std::lock_guard lock(mutex);
return object_name_to_create_object_map.empty();
}
bool UserDefinedSQLObjectsStorageBase::storeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings)
{
std::lock_guard lock{mutex};
auto it = object_name_to_create_object_map.find(object_name);
if (it != object_name_to_create_object_map.end())
{
if (throw_if_exists)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined object '{}' already exists", object_name);
else if (!replace_if_exists)
return false;
}
bool stored = storeObjectImpl(
current_context,
object_type,
object_name,
create_object_query,
throw_if_exists,
replace_if_exists,
settings);
if (stored)
object_name_to_create_object_map[object_name] = create_object_query;
return stored;
}
bool UserDefinedSQLObjectsStorageBase::removeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists)
{
std::lock_guard lock(mutex);
auto it = object_name_to_create_object_map.find(object_name);
if (it == object_name_to_create_object_map.end())
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined object '{}' doesn't exist", object_name);
else
return false;
}
bool removed = removeObjectImpl(
current_context,
object_type,
object_name,
throw_if_not_exists);
if (removed)
object_name_to_create_object_map.erase(object_name);
return removed;
}
std::unique_lock<std::recursive_mutex> UserDefinedSQLObjectsStorageBase::getLock() const
{
return std::unique_lock{mutex};
}
void UserDefinedSQLObjectsStorageBase::setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects)
{
std::unordered_map<String, ASTPtr> normalized_functions;
for (const auto & [function_name, create_query] : new_objects)
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
std::lock_guard lock(mutex);
object_name_to_create_object_map = std::move(normalized_functions);
}
std::vector<std::pair<String, ASTPtr>> UserDefinedSQLObjectsStorageBase::getAllObjects() const
{
std::lock_guard lock{mutex};
std::vector<std::pair<String, ASTPtr>> all_objects;
all_objects.reserve(object_name_to_create_object_map.size());
std::copy(object_name_to_create_object_map.begin(), object_name_to_create_object_map.end(), std::back_inserter(all_objects));
return all_objects;
}
void UserDefinedSQLObjectsStorageBase::setObject(const String & object_name, const IAST & create_object_query)
{
std::lock_guard lock(mutex);
object_name_to_create_object_map[object_name] = normalizeCreateFunctionQuery(create_object_query);
}
void UserDefinedSQLObjectsStorageBase::removeObject(const String & object_name)
{
std::lock_guard lock(mutex);
object_name_to_create_object_map.erase(object_name);
}
void UserDefinedSQLObjectsStorageBase::removeAllObjectsExcept(const Strings & object_names_to_keep)
{
boost::container::flat_set<std::string_view> names_set_to_keep{object_names_to_keep.begin(), object_names_to_keep.end()};
std::lock_guard lock(mutex);
for (auto it = object_name_to_create_object_map.begin(); it != object_name_to_create_object_map.end();)
{
auto current = it++;
if (!names_set_to_keep.contains(current->first))
object_name_to_create_object_map.erase(current);
}
}
}

View File

@ -0,0 +1,69 @@
#pragma once
#include <unordered_map>
#include <mutex>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Parsers/IAST.h>
namespace DB
{
class UserDefinedSQLObjectsStorageBase : public IUserDefinedSQLObjectsStorage
{
public:
ASTPtr get(const String & object_name) const override;
ASTPtr tryGet(const String & object_name) const override;
bool has(const String & object_name) const override;
std::vector<String> getAllObjectNames() const override;
std::vector<std::pair<String, ASTPtr>> getAllObjects() const override;
bool empty() const override;
bool storeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
bool removeObject(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists) override;
protected:
virtual bool storeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
virtual bool removeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists) = 0;
std::unique_lock<std::recursive_mutex> getLock() const;
void setAllObjects(const std::vector<std::pair<String, ASTPtr>> & new_objects);
void setObject(const String & object_name, const IAST & create_object_query);
void removeObject(const String & object_name);
void removeAllObjectsExcept(const Strings & object_names_to_keep);
std::unordered_map<String, ASTPtr> object_name_to_create_object_map;
mutable std::recursive_mutex mutex;
};
}

View File

@ -1,4 +1,4 @@
#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
@ -47,7 +47,7 @@ namespace
}
UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeeper(
UserDefinedSQLObjectsZooKeeperStorage::UserDefinedSQLObjectsZooKeeperStorage(
const ContextPtr & global_context_, const String & zookeeper_path_)
: global_context{global_context_}
, zookeeper_getter{[global_context_]() { return global_context_->getZooKeeper(); }}
@ -66,20 +66,20 @@ UserDefinedSQLObjectsLoaderFromZooKeeper::UserDefinedSQLObjectsLoaderFromZooKeep
zookeeper_path = "/" + zookeeper_path;
}
UserDefinedSQLObjectsLoaderFromZooKeeper::~UserDefinedSQLObjectsLoaderFromZooKeeper()
UserDefinedSQLObjectsZooKeeperStorage::~UserDefinedSQLObjectsZooKeeperStorage()
{
SCOPE_EXIT_SAFE(stopWatchingThread());
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::startWatchingThread()
void UserDefinedSQLObjectsZooKeeperStorage::startWatchingThread()
{
if (!watching_flag.exchange(true))
{
watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue, this);
watching_thread = ThreadFromGlobalPool(&UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue, this);
}
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatchingThread()
void UserDefinedSQLObjectsZooKeeperStorage::stopWatchingThread()
{
if (watching_flag.exchange(false))
{
@ -89,7 +89,7 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatchingThread()
}
}
zkutil::ZooKeeperPtr UserDefinedSQLObjectsLoaderFromZooKeeper::getZooKeeper()
zkutil::ZooKeeperPtr UserDefinedSQLObjectsZooKeeperStorage::getZooKeeper()
{
auto [zookeeper, session_status] = zookeeper_getter.getZooKeeper();
@ -106,18 +106,18 @@ zkutil::ZooKeeperPtr UserDefinedSQLObjectsLoaderFromZooKeeper::getZooKeeper()
return zookeeper;
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::initZooKeeperIfNeeded()
void UserDefinedSQLObjectsZooKeeperStorage::initZooKeeperIfNeeded()
{
getZooKeeper();
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::resetAfterError()
void UserDefinedSQLObjectsZooKeeperStorage::resetAfterError()
{
zookeeper_getter.resetCache();
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::loadObjects()
void UserDefinedSQLObjectsZooKeeperStorage::loadObjects()
{
/// loadObjects() is called at start from Server::main(), so it's better not to stop here on no connection to ZooKeeper or any other error.
/// However the watching thread must be started anyway in case the connection will be established later.
@ -136,7 +136,7 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::loadObjects()
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue()
void UserDefinedSQLObjectsZooKeeperStorage::processWatchQueue()
{
LOG_DEBUG(log, "Started watching thread");
setThreadName("UserDefObjWatch");
@ -173,13 +173,13 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::processWatchQueue()
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::stopWatching()
void UserDefinedSQLObjectsZooKeeperStorage::stopWatching()
{
stopWatchingThread();
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObjects()
void UserDefinedSQLObjectsZooKeeperStorage::reloadObjects()
{
auto zookeeper = getZooKeeper();
refreshAllObjects(zookeeper);
@ -187,23 +187,24 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObjects()
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
void UserDefinedSQLObjectsZooKeeperStorage::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
auto zookeeper = getZooKeeper();
refreshObject(zookeeper, object_type, object_name);
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
void UserDefinedSQLObjectsZooKeeperStorage::createRootNodes(const zkutil::ZooKeeperPtr & zookeeper)
{
zookeeper->createAncestors(zookeeper_path);
zookeeper->createIfNotExists(zookeeper_path, "");
}
bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject(
bool UserDefinedSQLObjectsZooKeeperStorage::storeObjectImpl(
const ContextPtr & /*current_context*/,
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings &)
@ -212,7 +213,7 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject(
LOG_DEBUG(log, "Storing user-defined object {} at zk path {}", backQuote(object_name), path);
WriteBufferFromOwnString create_statement_buf;
formatAST(create_object_query, create_statement_buf, false);
formatAST(*create_object_query, create_statement_buf, false);
writeChar('\n', create_statement_buf);
String create_statement = create_statement_buf.str();
@ -252,8 +253,11 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::storeObject(
}
bool UserDefinedSQLObjectsLoaderFromZooKeeper::removeObject(
UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists)
bool UserDefinedSQLObjectsZooKeeperStorage::removeObjectImpl(
const ContextPtr & /*current_context*/,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists)
{
String path = getNodePath(zookeeper_path, object_type, object_name);
LOG_DEBUG(log, "Removing user-defined object {} at zk path {}", backQuote(object_name), path);
@ -276,7 +280,7 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::removeObject(
return true;
}
bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch(
bool UserDefinedSQLObjectsZooKeeperStorage::getObjectDataAndSetWatch(
const zkutil::ZooKeeperPtr & zookeeper,
String & data,
const String & path,
@ -298,7 +302,7 @@ bool UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectDataAndSetWatch(
return zookeeper->tryGetWatch(path, data, &entity_stat, object_watcher);
}
ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type)
ASTPtr UserDefinedSQLObjectsZooKeeperStorage::parseObjectData(const String & object_data, UserDefinedSQLObjectType object_type)
{
switch (object_type)
{
@ -317,7 +321,7 @@ ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::parseObjectData(const String &
UNREACHABLE();
}
ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject(
ASTPtr UserDefinedSQLObjectsZooKeeperStorage::tryLoadObject(
const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
{
String path = getNodePath(zookeeper_path, object_type, object_name);
@ -343,7 +347,7 @@ ASTPtr UserDefinedSQLObjectsLoaderFromZooKeeper::tryLoadObject(
}
}
Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
Strings UserDefinedSQLObjectsZooKeeperStorage::getObjectNamesAndSetWatch(
const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
{
auto object_list_watcher = [my_watch_queue = watch_queue, object_type](const Coordination::WatchResponse &)
@ -371,7 +375,7 @@ Strings UserDefinedSQLObjectsLoaderFromZooKeeper::getObjectNamesAndSetWatch(
return object_names;
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper)
void UserDefinedSQLObjectsZooKeeperStorage::refreshAllObjects(const zkutil::ZooKeeperPtr & zookeeper)
{
/// It doesn't make sense to keep the old watch events because we will reread everything in this function.
watch_queue->clear();
@ -380,7 +384,7 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshAllObjects(const zkutil::Z
objects_loaded = true;
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
void UserDefinedSQLObjectsZooKeeperStorage::refreshObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
{
LOG_DEBUG(log, "Refreshing all user-defined {} objects", object_type);
Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
@ -393,21 +397,20 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObjects(const zkutil::ZooK
function_names_and_asts.emplace_back(function_name, ast);
}
UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_asts);
setAllObjects(function_names_and_asts);
LOG_DEBUG(log, "All user-defined {} objects refreshed", object_type);
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
void UserDefinedSQLObjectsZooKeeperStorage::syncObjects(const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type)
{
LOG_DEBUG(log, "Syncing user-defined {} objects", object_type);
Strings object_names = getObjectNamesAndSetWatch(zookeeper, object_type);
auto & factory = UserDefinedSQLFunctionFactory::instance();
auto lock = factory.getLock();
getLock();
/// Remove stale objects
factory.removeAllFunctionsExcept(object_names);
removeAllObjectsExcept(object_names);
/// Read & parse only new SQL objects from ZooKeeper
for (const auto & function_name : object_names)
{
@ -418,16 +421,15 @@ void UserDefinedSQLObjectsLoaderFromZooKeeper::syncObjects(const zkutil::ZooKeep
LOG_DEBUG(log, "User-defined {} objects synced", object_type);
}
void UserDefinedSQLObjectsLoaderFromZooKeeper::refreshObject(
void UserDefinedSQLObjectsZooKeeperStorage::refreshObject(
const zkutil::ZooKeeperPtr & zookeeper, UserDefinedSQLObjectType object_type, const String & object_name)
{
auto ast = tryLoadObject(zookeeper, object_type, object_name);
auto & factory = UserDefinedSQLFunctionFactory::instance();
if (ast)
factory.setFunction(object_name, *ast);
setObject(object_name, *ast);
else
factory.removeFunction(object_name);
removeObject(object_name);
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsStorageBase.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
#include <Common/ConcurrentBoundedQueue.h>
@ -12,11 +12,11 @@ namespace DB
{
/// Loads user-defined sql objects from ZooKeeper.
class UserDefinedSQLObjectsLoaderFromZooKeeper : public IUserDefinedSQLObjectsLoader
class UserDefinedSQLObjectsZooKeeperStorage : public UserDefinedSQLObjectsStorageBase
{
public:
UserDefinedSQLObjectsLoaderFromZooKeeper(const ContextPtr & global_context_, const String & zookeeper_path_);
~UserDefinedSQLObjectsLoaderFromZooKeeper() override;
UserDefinedSQLObjectsZooKeeperStorage(const ContextPtr & global_context_, const String & zookeeper_path_);
~UserDefinedSQLObjectsZooKeeperStorage() override;
bool isReplicated() const override { return true; }
String getReplicationID() const override { return zookeeper_path; }
@ -26,16 +26,21 @@ public:
void reloadObjects() override;
void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override;
bool storeObject(
private:
bool storeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
ASTPtr create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override;
bool removeObjectImpl(
const ContextPtr & current_context,
UserDefinedSQLObjectType object_type,
const String & object_name,
bool throw_if_not_exists) override;
private:
void processWatchQueue();
zkutil::ZooKeeperPtr getZooKeeper();

View File

@ -1,12 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
namespace DB
{
class IUserDefinedSQLObjectsLoader;
std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context);
}

View File

@ -1,6 +1,6 @@
#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromZooKeeper.h>
#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsDiskStorage.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsZooKeeperStorage.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
@ -17,7 +17,7 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}
std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context)
std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context)
{
const String zookeeper_path_key = "user_defined_zookeeper_path";
const String disk_path_key = "user_defined_path";
@ -33,12 +33,12 @@ std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(
zookeeper_path_key,
disk_path_key);
}
return std::make_unique<UserDefinedSQLObjectsLoaderFromZooKeeper>(global_context, config.getString(zookeeper_path_key));
return std::make_unique<UserDefinedSQLObjectsZooKeeperStorage>(global_context, config.getString(zookeeper_path_key));
}
String default_path = fs::path{global_context->getPath()} / "user_defined/";
String path = config.getString(disk_path_key, default_path);
return std::make_unique<UserDefinedSQLObjectsLoaderFromDisk>(global_context, path);
return std::make_unique<UserDefinedSQLObjectsDiskStorage>(global_context, path);
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <Interpreters/Context_fwd.h>
namespace DB
{
class IUserDefinedSQLObjectsStorage;
std::unique_ptr<IUserDefinedSQLObjectsStorage> createUserDefinedSQLObjectsStorage(const ContextMutablePtr & global_context);
}

View File

@ -62,8 +62,8 @@
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/createUserDefinedSQLObjectsStorage.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/Cluster.h>
@ -251,8 +251,8 @@ struct ContextSharedPart : boost::noncopyable
ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository TSA_GUARDED_BY(external_user_defined_executable_functions_mutex) = nullptr;
scope_guard user_defined_executable_functions_xmls TSA_GUARDED_BY(external_user_defined_executable_functions_mutex);
mutable OnceFlag user_defined_sql_objects_loader_initialized;
mutable std::unique_ptr<IUserDefinedSQLObjectsLoader> user_defined_sql_objects_loader;
mutable OnceFlag user_defined_sql_objects_storage_initialized;
mutable std::unique_ptr<IUserDefinedSQLObjectsStorage> user_defined_sql_objects_storage;
#if USE_NLP
mutable OnceFlag synonyms_extensions_initialized;
@ -550,7 +550,7 @@ struct ContextSharedPart : boost::noncopyable
SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "another UDFs loader", user_defined_sql_objects_loader, stopWatching());
SHUTDOWN(log, "another UDFs storage", user_defined_sql_objects_storage, stopWatching());
LOG_TRACE(log, "Shutting down named sessions");
Session::shutdownNamedSessions();
@ -577,7 +577,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<IUserDefinedSQLObjectsLoader> delete_user_defined_sql_objects_loader;
std::unique_ptr<IUserDefinedSQLObjectsStorage> delete_user_defined_sql_objects_storage;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
@ -657,7 +657,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries = std::move(embedded_dictionaries);
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_user_defined_sql_objects_loader = std::move(user_defined_sql_objects_loader);
delete_user_defined_sql_objects_storage = std::move(user_defined_sql_objects_storage);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
@ -675,7 +675,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries.reset();
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_user_defined_sql_objects_loader.reset();
delete_user_defined_sql_objects_storage.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
@ -2459,24 +2459,30 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr
shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository));
}
const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const
const IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage() const
{
callOnce(shared->user_defined_sql_objects_loader_initialized, [&] {
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
callOnce(shared->user_defined_sql_objects_storage_initialized, [&] {
shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext());
});
SharedLockGuard lock(shared->mutex);
return *shared->user_defined_sql_objects_loader;
return *shared->user_defined_sql_objects_storage;
}
IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
IUserDefinedSQLObjectsStorage & Context::getUserDefinedSQLObjectsStorage()
{
callOnce(shared->user_defined_sql_objects_loader_initialized, [&] {
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
callOnce(shared->user_defined_sql_objects_storage_initialized, [&] {
shared->user_defined_sql_objects_storage = createUserDefinedSQLObjectsStorage(getGlobalContext());
});
SharedLockGuard lock(shared->mutex);
return *shared->user_defined_sql_objects_loader;
std::lock_guard lock(shared->mutex);
return *shared->user_defined_sql_objects_storage;
}
void Context::setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage)
{
std::lock_guard lock(shared->mutex);
shared->user_defined_sql_objects_storage = std::move(storage);
}
#if USE_NLP

View File

@ -66,7 +66,7 @@ enum class RowPolicyFilterType;
class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalUserDefinedExecutableFunctionsLoader;
class IUserDefinedSQLObjectsLoader;
class IUserDefinedSQLObjectsStorage;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
@ -801,8 +801,9 @@ public:
const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();
const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const;
IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader();
const IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage() const;
IUserDefinedSQLObjectsStorage & getUserDefinedSQLObjectsStorage();
void setUserDefinedSQLObjectsStorage(std::unique_ptr<IUserDefinedSQLObjectsStorage> storage);
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
#if USE_NLP

View File

@ -1,7 +1,7 @@
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
@ -32,7 +32,7 @@ BlockIO InterpreterCreateFunctionQuery::execute()
if (!create_function_query.cluster.empty())
{
if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;

View File

@ -1,7 +1,7 @@
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
@ -32,7 +32,7 @@ BlockIO InterpreterDropFunctionQuery::execute()
if (!drop_function_query.cluster.empty())
{
if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
if (current_context->getUserDefinedSQLObjectsStorage().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;

View File

@ -3,7 +3,7 @@
#include <Access/AccessControl.h>
#include <Access/ReplicatedAccessStorage.h>
#include <Common/logger_useful.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsStorage.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTDropFunctionQuery.h>
@ -47,7 +47,7 @@ ASTPtr removeOnClusterClauseIfNeeded(const ASTPtr & query, ContextPtr context, c
if ((isUserDefinedFunctionQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_udf_queries
&& context->getUserDefinedSQLObjectsLoader().isReplicated())
&& context->getUserDefinedSQLObjectsStorage().isReplicated())
|| (isAccessControlQuery(query)
&& context->getSettings().ignore_on_cluster_for_replicated_access_entities_queries
&& context->getAccessControl().containsStorage(ReplicatedAccessStorage::STORAGE_TYPE)))

View File

@ -116,7 +116,7 @@ def test_create_and_replace():
node1.query("CREATE FUNCTION f1 AS (x, y) -> x + y")
assert node1.query("SELECT f1(12, 3)") == "15\n"
expected_error = "User-defined function 'f1' already exists"
expected_error = "User-defined object 'f1' already exists"
assert expected_error in node1.query_and_get_error(
"CREATE FUNCTION f1 AS (x, y) -> x + 2 * y"
)
@ -135,7 +135,7 @@ def test_drop_if_exists():
node1.query("DROP FUNCTION IF EXISTS f1")
node1.query("DROP FUNCTION IF EXISTS f1")
expected_error = "User-defined function 'f1' doesn't exist"
expected_error = "User-defined object 'f1' doesn't exist"
assert expected_error in node1.query_and_get_error("DROP FUNCTION f1")