Updated UserDefinedObjectsLoader

This commit is contained in:
Maksim Kita 2021-08-18 12:29:52 +03:00
parent ce72a0c9c3
commit 6b2c249adc
11 changed files with 235 additions and 240 deletions

View File

@ -12,7 +12,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/UserDefinedObjectsOnDisk.h>
#include <Interpreters/UserDefinedObjectsLoader.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/Config/ConfigProcessor.h>
@ -290,7 +290,7 @@ try
fs::create_directories(fs::path(path) / "user_defined/");
LOG_DEBUG(log, "Loading user defined objects from {}", path);
Poco::File(path + "user_defined/").createDirectories();
UserDefinedObjectsOnDisk::instance().loadUserDefinedObjects(global_context);
UserDefinedObjectsLoader::instance().loadObjects(global_context);
LOG_DEBUG(log, "Loaded user defined objects.");
LOG_DEBUG(log, "Loading metadata from {}", path);

View File

@ -52,7 +52,7 @@
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/UserDefinedObjectsOnDisk.h>
#include <Interpreters/UserDefinedObjectsLoader.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/AccessControlManager.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -1093,7 +1093,7 @@ if (ThreadFuzzer::instance().isEffective())
LOG_INFO(log, "Loading user defined objects from {}", path);
try
{
UserDefinedObjectsOnDisk::instance().loadUserDefinedObjects(global_context);
UserDefinedObjectsLoader::instance().loadObjects(global_context);
}
catch (...)
{

View File

@ -570,8 +570,8 @@
M(591, FUNCTION_ALREADY_EXISTS) \
M(592, CANNOT_DROP_SYSTEM_FUNCTION) \
M(593, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(594, FUNCTION_ALREADY_STORED_ON_DISK) \
M(595, FUNCTION_WAS_NOT_STORED_ON_DISK) \
M(594, OBJECT_ALREADY_STORED_ON_DISK) \
M(595, OBJECT_WAS_NOT_STORED_ON_DISK) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -4,7 +4,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/UserDefinedObjectsOnDisk.h>
#include <Interpreters/UserDefinedObjectsLoader.h>
#include <Functions/FunctionFactory.h>
#include <Parsers/ASTIdentifier.h>
@ -15,28 +15,39 @@ namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int CANNOT_CREATE_RECURSIVE_FUNCTION;
// extern const int UNSUPPORTED_OPERATION;
}
BlockIO InterpreterCreateFunctionQuery::execute()
{
getContext()->checkAccess(AccessType::CREATE_FUNCTION);
FunctionNameNormalizer().visit(query_ptr.get());
auto & create_function_query = query_ptr->as<ASTCreateFunctionQuery &>();
validateFunction(create_function_query.function_core, create_function_query.function_name);
FunctionFactory::instance().registerUserDefinedFunction(create_function_query);
if (!internal)
auto * create_function_query = query_ptr->as<ASTCreateFunctionQuery>();
// if (!create_function_query)
// throw Exception(ErrorCodes::UNSUPPORTED_OPERATION, "Expected CREATE FUNCTION query");
auto & function_name = create_function_query->function_name;
validateFunction(create_function_query->function_core, function_name);
if (is_internal)
{
FunctionFactory::instance().registerUserDefinedFunction(*create_function_query);
}
else
{
try
{
UserDefinedObjectsOnDisk::instance().storeUserDefinedFunction(getContext(), create_function_query);
UserDefinedObjectsLoader::instance().storeObject(getContext(), UserDefinedObjectType::Function, function_name, *query_ptr);
FunctionFactory::instance().registerUserDefinedFunction(*create_function_query);
}
catch (Exception & e)
{
FunctionFactory::instance().unregisterUserDefinedFunction(create_function_query.function_name);
e.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(create_function_query.function_name)));
e.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name)));
throw;
}
}
return {};
}
@ -54,11 +65,7 @@ void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const Str
for (const auto & identifier : identifiers_in_body)
{
if (!arguments.contains(identifier))
{
WriteBufferFromOwnString s;
s << "Identifier '" << identifier << "' does not exist in arguments";
throw Exception(s.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Identifier {} does not exist in arguments", backQuote(identifier));
}
validateFunctionRecursiveness(function_body, name);
@ -82,15 +89,10 @@ void InterpreterCreateFunctionQuery::validateFunctionRecursiveness(ASTPtr node,
{
auto function_name_opt = tryGetFunctionName(child);
if (function_name_opt && function_name_opt.value() == function_to_create)
throw Exception("You cannot create recursive function", ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION);
throw Exception(ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION, "You cannot create recursive function");
validateFunctionRecursiveness(child, function_to_create);
}
}
void InterpreterCreateFunctionQuery::setInternal(bool internal_)
{
internal = internal_;
}
}

View File

@ -9,10 +9,13 @@ namespace DB
class ASTCreateFunctionQuery;
class Context;
class InterpreterCreateFunctionQuery : public IInterpreter, WithMutableContext
class InterpreterCreateFunctionQuery : public IInterpreter, WithContext
{
public:
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) : WithMutableContext(context_), query_ptr(query_ptr_) {}
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextPtr context_, bool is_internal_)
: WithContext(context_)
, query_ptr(query_ptr_)
, is_internal(is_internal_) {}
BlockIO execute() override;
@ -23,11 +26,8 @@ private:
static void getIdentifiers(ASTPtr node, std::set<String> & identifiers);
static void validateFunctionRecursiveness(ASTPtr node, const String & function_to_create);
private:
ASTPtr query_ptr;
/// Is this an internal query - not from the user.
bool internal = false;
bool is_internal;
};
}

View File

@ -2,7 +2,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/UserDefinedObjectsOnDisk.h>
#include <Interpreters/UserDefinedObjectsLoader.h>
#include <Functions/FunctionFactory.h>
#include <Parsers/ASTDropFunctionQuery.h>
@ -16,7 +16,8 @@ BlockIO InterpreterDropFunctionQuery::execute()
FunctionNameNormalizer().visit(query_ptr.get());
auto & drop_function_query = query_ptr->as<ASTDropFunctionQuery &>();
FunctionFactory::instance().unregisterUserDefinedFunction(drop_function_query.function_name);
UserDefinedObjectsOnDisk::instance().removeUserDefinedFunction(getContext(), drop_function_query.function_name);
UserDefinedObjectsLoader::instance().removeObject(getContext(), UserDefinedObjectType::Function, drop_function_query.function_name);
return {};
}

View File

@ -275,7 +275,7 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
}
else if (query->as<ASTCreateFunctionQuery>())
{
return std::make_unique<InterpreterCreateFunctionQuery>(query, context);
return std::make_unique<InterpreterCreateFunctionQuery>(query, context, false /*is_internal*/);
}
else if (query->as<ASTDropFunctionQuery>())
{

View File

@ -0,0 +1,165 @@
#include "UserDefinedObjectsLoader.h"
#include <filesystem>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Logger.h>
namespace DB
{
namespace ErrorCodes
{
extern const int OBJECT_ALREADY_STORED_ON_DISK;
extern const int OBJECT_WAS_NOT_STORED_ON_DISK;
}
/// Returns the shared loader, which lives in a function-local static.
UserDefinedObjectsLoader & UserDefinedObjectsLoader::instance()
{
    static UserDefinedObjectsLoader loader;
    return loader;
}
UserDefinedObjectsLoader::UserDefinedObjectsLoader()
: log(&Poco::Logger::get("UserDefinedObjectsLoader"))
{}
/// Reads the creation statement stored in the file at `path`, parses it and executes it
/// as an internal query. `name` is only used for log and error messages.
/// Any DB::Exception raised while parsing or executing is annotated with the object name
/// and the file path before being rethrown.
void UserDefinedObjectsLoader::loadUserDefinedObject(ContextPtr context, UserDefinedObjectType object_type, const std::string_view & name, const String & path)
{
    StringRef object_name_ref(name.data(), name.size());
    LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name_ref), path);

    /// The .sql file holds the statement that creates the object; read it whole.
    String create_query_text;
    ReadBufferFromFile file_buf(path, 1024);
    readStringUntilEOF(create_query_text, file_buf);

    try
    {
        switch (object_type)
        {
            case UserDefinedObjectType::Function:
            {
                ParserCreateFunctionQuery parser;

                const char * query_begin = create_query_text.data();
                const char * query_end = query_begin + create_query_text.size();
                ASTPtr ast = parseQuery(parser, query_begin, query_end, "in file " + path, 0, context->getSettingsRef().max_parser_depth);

                /// Executed as an internal query (is_internal = true).
                InterpreterCreateFunctionQuery interpreter(ast, context, true /*is internal*/);
                interpreter.execute();
                break;
            }
        }
    }
    catch (Exception & e)
    {
        e.addMessage(fmt::format("while loading user defined objects {} from path {}", backQuote(object_name_ref), path));
        throw;
    }
}
/// Scans the "user_defined/" directory under the context path and loads every regular
/// file named "function_<name>.sql" as a user defined function.
/// Symlinks, directories, names starting with '.', files without the ".sql" suffix and
/// ".sql" files without the "function_" prefix are skipped.
/// Exceptions thrown while loading an individual object propagate to the caller.
void UserDefinedObjectsLoader::loadObjects(ContextPtr context)
{
    static constexpr std::string_view function_prefix = "function_";
    static constexpr std::string_view sql_suffix = ".sql";

    LOG_DEBUG(log, "loading user defined objects");

    String dir_path = context->getPath() + "user_defined/";

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
    {
        if (it->isLink())
            continue;

        const auto & file_name = it.name();

        /// For '.svn', '.gitignore' directory and similar.
        if (file_name.empty() || file_name.front() == '.')
            continue;

        if (it->isDirectory() || !endsWith(file_name, ".sql"))
            continue;

        std::string_view object_name = file_name;
        object_name.remove_suffix(sql_suffix.size());

        /// remove_prefix() has undefined behaviour when asked to drop more characters than the
        /// view holds, and would silently mangle the name of any .sql file that does not follow
        /// the "function_<name>.sql" scheme used by storeObject. Skip such files instead.
        if (object_name.substr(0, function_prefix.size()) != function_prefix)
        {
            LOG_DEBUG(log, "Skipping file {}: its name does not start with {}", dir_path + file_name, function_prefix);
            continue;
        }
        object_name.remove_prefix(function_prefix.size());

        loadUserDefinedObject(context, UserDefinedObjectType::Function, object_name, dir_path + file_name);
    }
}
/// Persists a user defined object: writes the formatted `ast`, followed by a newline, to a
/// .sql file in the "user_defined/" directory under the context path.
/// For UserDefinedObjectType::Function the file is named "function_<escaped name>.sql".
/// Throws OBJECT_ALREADY_STORED_ON_DISK if that file already exists.
/// The file is fsync'ed when the `fsync_metadata` setting is enabled.
void UserDefinedObjectsLoader::storeObject(ContextPtr context, UserDefinedObjectType object_type, const String & object_name, const IAST & ast)
{
    String dir_path = context->getPath() + "user_defined/";
    String file_path;

    switch (object_type)
    {
        case UserDefinedObjectType::Function:
        {
            file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
            break;
        }
    }

    if (std::filesystem::exists(file_path))
        throw Exception(ErrorCodes::OBJECT_ALREADY_STORED_ON_DISK, "User defined object {} already stored on disk", backQuote(file_path));

    LOG_DEBUG(log, "Storing object {} to file {}", backQuote(object_name), file_path);

    WriteBufferFromOwnString create_statement_buf;
    formatAST(ast, create_statement_buf, false);
    writeChar('\n', create_statement_buf);
    String create_statement = create_statement_buf.str();

    /// O_EXCL makes the open fail if the file appeared after the existence check above.
    WriteBufferFromFile out(file_path, create_statement.size(), O_WRONLY | O_CREAT | O_EXCL);
    writeString(create_statement, out);
    out.next();
    if (context->getSettingsRef().fsync_metadata)
        out.sync();
    out.close();

    LOG_DEBUG(log, "Stored object {}", backQuote(object_name));
}
/// Deletes the .sql file that stores a user defined object in the "user_defined/" directory
/// under the context path. For UserDefinedObjectType::Function the file is named
/// "function_<escaped name>.sql".
/// Throws OBJECT_WAS_NOT_STORED_ON_DISK if there was no such file to delete.
void UserDefinedObjectsLoader::removeObject(ContextPtr context, UserDefinedObjectType object_type, const String & object_name)
{
    String dir_path = context->getPath() + "user_defined/";
    LOG_DEBUG(log, "Removing file for user defined object {} from {}", backQuote(object_name), dir_path);

    String file_path_name;
    switch (object_type)
    {
        case UserDefinedObjectType::Function:
        {
            file_path_name = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
            break;
        }
    }

    std::filesystem::path file_path(file_path_name);

    /// std::filesystem::remove returns false when the file did not exist, so one call both
    /// deletes the file and detects its absence, with no gap between a check and the removal.
    if (!std::filesystem::remove(file_path))
        throw Exception(ErrorCodes::OBJECT_WAS_NOT_STORED_ON_DISK, "User defined object {} was not stored on disk", backQuote(file_path.string()));
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST.h>
#include <boost/noncopyable.hpp>
namespace DB
{
/// Kinds of user defined objects handled by UserDefinedObjectsLoader.
/// Function is the only kind declared here.
enum class UserDefinedObjectType
{
Function
};
/// Stores, removes and loads the creation statements of user defined objects,
/// kept as .sql files in the "user_defined/" directory under the context path.
class UserDefinedObjectsLoader : private boost::noncopyable
{
public:
/// Returns the shared loader object.
static UserDefinedObjectsLoader & instance();
UserDefinedObjectsLoader();
/// Loads the objects found as .sql files in the "user_defined/" directory.
void loadObjects(ContextPtr context);
/// Writes the formatted `ast` to the object's file; throws if that file already exists.
void storeObject(ContextPtr context, UserDefinedObjectType object_type, const String & object_name, const IAST & ast);
/// Deletes the object's file; throws if the file does not exist.
void removeObject(ContextPtr context, UserDefinedObjectType object_type, const String & object_name);
private:
/// Reads one creation statement from `file_path`, parses it and executes it as an internal query.
void loadUserDefinedObject(ContextPtr context, UserDefinedObjectType object_type, const std::string_view & object_name, const String & file_path);
/// Logger named "UserDefinedObjectsLoader".
Poco::Logger * log;
};
}

View File

@ -1,176 +0,0 @@
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Interpreters/UserDefinedObjectsOnDisk.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Logger.h>
#include <re2/re2.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_STORED_ON_DISK;
extern const int FUNCTION_WAS_NOT_STORED_ON_DISK;
}
/// Returns the shared object, which lives in a function-local static.
UserDefinedObjectsOnDisk & UserDefinedObjectsOnDisk::instance()
{
    static UserDefinedObjectsOnDisk singleton;
    return singleton;
}
void UserDefinedObjectsOnDisk::executeCreateFunctionQuery(
const String & query,
ContextMutablePtr context,
const String & file_name)
{
ParserCreateFunctionQuery parser;
ASTPtr ast = parseQuery(
parser, query.data(), query.data() + query.size(), "in file " + file_name, 0, context->getSettingsRef().max_parser_depth);
InterpreterCreateFunctionQuery interpreter(ast, context);
interpreter.setInternal(true);
interpreter.execute();
}
/// Loads one user defined function from the file at `path`.
/// If the file does not exist the query text stays empty and is still handed to the parser.
/// Any DB::Exception raised while executing the statement is annotated with the function
/// name and the path before being rethrown.
void UserDefinedObjectsOnDisk::loadUserDefinedObject(ContextMutablePtr context, const String & name, const String & path)
{
    Poco::Logger * logger = &Poco::Logger::get("LoadUserDefinedObject");
    String create_query_text;

    LOG_DEBUG(logger, "Loading function {} from file {}", backQuote(name), path);

    const bool file_exists = Poco::File(path).exists();
    if (file_exists)
    {
        /// There is .sql file with user defined object creation statement.
        ReadBufferFromFile file_buf(path, 1024);
        readStringUntilEOF(create_query_text, file_buf);
    }

    try
    {
        executeCreateFunctionQuery(create_query_text, context, path);
        LOG_DEBUG(logger, "Loaded function {}", backQuote(name));
    }
    catch (Exception & e)
    {
        e.addMessage(fmt::format("while loading user defined function {} from path {}", backQuote(name), path));
        throw;
    }
}
/// Loads all user defined functions stored as "<priority>_<name>.sql" files in the
/// "user_defined/" directory under the context path, in ascending priority order, and
/// sets the object counter to the highest priority found (0 if there were none).
/// Symlinks, directories, names starting with '.', files without the ".sql" suffix and
/// ".sql" files whose prefix is not a number are skipped.
void UserDefinedObjectsOnDisk::loadUserDefinedObjects(ContextMutablePtr context)
{
    String dir_path = context->getPath() + "user_defined/";
    std::vector<std::pair<int, String>> user_defined_objects_with_priority;

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
    {
        if (it->isLink())
            continue;

        const String & file_name = it.name();

        /// For '.svn', '.gitignore' directory and similar.
        /// Checked before the ".sql" test so that hidden files never reach the priority parser.
        if (file_name.empty() || file_name.front() == '.')
            continue;

        if (it->isDirectory() || !endsWith(file_name, ".sql"))
            continue;

        int priority = 0;
        try
        {
            priority = std::stoi(file_name.substr(0, file_name.find('_')));
        }
        catch (const std::exception &)
        {
            /// std::stoi throws std::invalid_argument / std::out_of_range when the prefix is not
            /// a number that fits in int. Such a file was not written by storeUserDefinedFunction;
            /// skip it rather than abort loading of all other objects.
            continue;
        }

        user_defined_objects_with_priority.emplace_back(priority, file_name);
    }

    std::sort(user_defined_objects_with_priority.begin(), user_defined_objects_with_priority.end());

    for (const auto & [priority, file_name] : user_defined_objects_with_priority)
    {
        /// The function name sits between the first '_' and the trailing ".sql" (4 characters).
        int name_start_index = file_name.find('_') + 1;
        String name = file_name.substr(name_start_index, file_name.size() - 4 - name_start_index);
        loadUserDefinedObject(context, name, dir_path + file_name);
    }

    if (user_defined_objects_with_priority.empty())
        user_defined_objects_count.store(0);
    else
        user_defined_objects_count.store(user_defined_objects_with_priority.back().first);
}
/// Writes the formatted CREATE FUNCTION statement, followed by a newline, to
/// "<priority>_<escaped name>.sql" in the "user_defined/" directory under the context path,
/// where <priority> is the incremented object counter.
/// Throws FUNCTION_ALREADY_STORED_ON_DISK if a file for the same function name already exists
/// under any priority. The file is fsync'ed when the `fsync_metadata` setting is enabled.
void UserDefinedObjectsOnDisk::storeUserDefinedFunction(ContextPtr context, const ASTCreateFunctionQuery & ast)
{
    Poco::Logger * log = &Poco::Logger::get("StoreUserDefinedFunction");

    String dir_path = context->getPath() + "user_defined/";

    /// The pattern does not depend on the directory entry, so compile it once rather than
    /// once per file.
    const re2::RE2 stored_file_re("[0-9]+_" + escapeForFileName(ast.function_name) + "\\.sql");

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
    {
        re2::StringPiece input(it.name());
        if (re2::RE2::FullMatch(input, stored_file_re))
        {
            throw Exception("User defined function " + backQuote(it.name()) + " already stored on disk", ErrorCodes::FUNCTION_ALREADY_STORED_ON_DISK);
        }
    }

    int object_priority = ++user_defined_objects_count;
    String new_file_path = dir_path + toString(object_priority) + "_" + escapeForFileName(ast.function_name) + ".sql";

    LOG_DEBUG(log, "Storing function {} to file {}", backQuote(ast.function_name), new_file_path);

    WriteBufferFromOwnString create_statement_buf;
    formatAST(ast, create_statement_buf, false);
    writeChar('\n', create_statement_buf);
    String create_statement = create_statement_buf.str();

    WriteBufferFromFile out(new_file_path, create_statement.size(), O_WRONLY | O_CREAT | O_EXCL);
    writeString(create_statement, out);
    out.next();
    if (context->getSettingsRef().fsync_metadata)
        out.sync();
    out.close();

    LOG_DEBUG(log, "Stored function {}", backQuote(ast.function_name));
}
/// Deletes the first file in the "user_defined/" directory under the context path whose name
/// matches "<digits>_<escaped name>.sql".
/// Throws FUNCTION_WAS_NOT_STORED_ON_DISK if no such file is found.
void UserDefinedObjectsOnDisk::removeUserDefinedFunction(ContextPtr context, const String & name)
{
    Poco::Logger * log = &Poco::Logger::get("RemoveUserDefinedFunction");

    String dir_path = context->getPath() + "user_defined/";

    LOG_DEBUG(log, "Removing file for function {} from {}", backQuote(name), dir_path);

    /// The pattern does not depend on the directory entry, so compile it once rather than
    /// once per file.
    const re2::RE2 stored_file_re("[0-9]+_" + escapeForFileName(name) + "\\.sql");

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
    {
        /// Copy the name: it is still needed for the log message after the file is removed.
        String file_name = it.name();
        re2::StringPiece input(file_name);
        if (re2::RE2::FullMatch(input, stored_file_re))
        {
            it->remove();
            LOG_DEBUG(log, "Removed file {}", dir_path + file_name);
            return;
        }
    }

    throw Exception("Stored file for user defined function " + backQuote(name) + " was not found", ErrorCodes::FUNCTION_WAS_NOT_STORED_ON_DISK);
}
}

View File

@ -1,30 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <boost/noncopyable.hpp>
#include <atomic>
namespace DB
{
/// Stores, removes and loads user defined functions kept as "<priority>_<name>.sql" files
/// in the "user_defined/" directory under the context path.
class UserDefinedObjectsOnDisk : private boost::noncopyable
{
public:
/// Returns the shared object.
static UserDefinedObjectsOnDisk & instance();
/// Loads all stored functions in ascending priority order and resets the counter below.
void loadUserDefinedObjects(ContextMutablePtr context);
/// Writes the CREATE FUNCTION statement to a new file; throws if the function is already stored.
void storeUserDefinedFunction(ContextPtr context, const ASTCreateFunctionQuery & ast);
/// Deletes the file that stores the function; throws if no such file is found.
static void removeUserDefinedFunction(ContextPtr context, const String & name);
private:
/// Reads one creation statement from `path` and executes it.
static void loadUserDefinedObject(ContextMutablePtr context, const String & name, const String & path);
/// Parses `query` as CREATE FUNCTION and runs it as an internal query.
static void executeCreateFunctionQuery(const String & query, ContextMutablePtr context, const String & file_name);
private:
/// Set to the highest priority seen by loadUserDefinedObjects (0 if none) and incremented
/// for each stored function to produce the numeric prefix of its file name.
std::atomic_int user_defined_objects_count = 0;
};
}