Merge pull request #30734 from kitaisreal/sql-user-defined-functions-on-cluster-support

SQLUserDefinedFunctions support ON CLUSTER
This commit is contained in:
Maksim Kita 2021-10-28 18:08:38 +03:00 committed by GitHub
commit a8ef6cc3b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 214 additions and 60 deletions

View File

@ -577,7 +577,7 @@
M(607, BACKUP_ELEMENT_DUPLICATE) \
M(608, CANNOT_RESTORE_TABLE) \
M(609, FUNCTION_ALREADY_EXISTS) \
M(610, CANNOT_DROP_SYSTEM_FUNCTION) \
M(610, CANNOT_DROP_FUNCTION) \
M(611, CANNOT_CREATE_RECURSIVE_FUNCTION) \
M(612, OBJECT_ALREADY_STORED_ON_DISK) \
M(613, OBJECT_WAS_NOT_STORED_ON_DISK) \

View File

@ -1,7 +1,5 @@
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <stack>
#include <Access/ContextAccess.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -11,6 +9,7 @@
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
namespace DB
@ -24,45 +23,33 @@ namespace ErrorCodes
BlockIO InterpreterCreateFunctionQuery::execute()
{
auto current_context = getContext();
current_context->checkAccess(AccessType::CREATE_FUNCTION);
FunctionNameNormalizer().visit(query_ptr.get());
auto * create_function_query = query_ptr->as<ASTCreateFunctionQuery>();
ASTCreateFunctionQuery & create_function_query = query_ptr->as<ASTCreateFunctionQuery &>();
if (!create_function_query)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected CREATE FUNCTION query");
AccessRightsElements access_rights_elements;
access_rights_elements.emplace_back(AccessType::CREATE_FUNCTION);
if (create_function_query.or_replace)
access_rights_elements.emplace_back(AccessType::DROP_FUNCTION);
if (!create_function_query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), access_rights_elements);
auto current_context = getContext();
current_context->checkAccess(access_rights_elements);
auto & user_defined_function_factory = UserDefinedSQLFunctionFactory::instance();
auto & function_name = create_function_query->function_name;
auto & function_name = create_function_query.function_name;
bool if_not_exists = create_function_query->if_not_exists;
bool replace = create_function_query->or_replace;
bool if_not_exists = create_function_query.if_not_exists;
bool replace = create_function_query.or_replace;
create_function_query->if_not_exists = false;
create_function_query->or_replace = false;
create_function_query.if_not_exists = false;
create_function_query.or_replace = false;
if (if_not_exists && user_defined_function_factory.tryGet(function_name) != nullptr)
return {};
validateFunction(create_function_query->function_core, function_name);
user_defined_function_factory.registerFunction(function_name, query_ptr, replace);
if (persist_function)
{
try
{
UserDefinedSQLObjectsLoader::instance().storeObject(current_context, UserDefinedSQLObjectType::Function, function_name, *query_ptr, replace);
}
catch (Exception & exception)
{
user_defined_function_factory.unregisterFunction(function_name);
exception.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name)));
throw;
}
}
validateFunction(create_function_query.function_core, function_name);
user_defined_function_factory.registerFunction(current_context, function_name, query_ptr, replace, if_not_exists, persist_function);
return {};
}

View File

@ -1,10 +1,12 @@
#include <Parsers/ASTDropFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Parsers/ASTDropFunctionQuery.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
namespace DB
@ -12,19 +14,19 @@ namespace DB
BlockIO InterpreterDropFunctionQuery::execute()
{
auto current_context = getContext();
current_context->checkAccess(AccessType::DROP_FUNCTION);
FunctionNameNormalizer().visit(query_ptr.get());
auto & drop_function_query = query_ptr->as<ASTDropFunctionQuery &>();
ASTDropFunctionQuery & drop_function_query = query_ptr->as<ASTDropFunctionQuery &>();
auto & user_defined_functions_factory = UserDefinedSQLFunctionFactory::instance();
AccessRightsElements access_rights_elements;
access_rights_elements.emplace_back(AccessType::DROP_FUNCTION);
if (drop_function_query.if_exists && !user_defined_functions_factory.has(drop_function_query.function_name))
return {};
if (!drop_function_query.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, getContext(), access_rights_elements);
UserDefinedSQLFunctionFactory::instance().unregisterFunction(drop_function_query.function_name);
UserDefinedSQLObjectsLoader::instance().removeObject(current_context, UserDefinedSQLObjectType::Function, drop_function_query.function_name);
auto current_context = getContext();
current_context->checkAccess(access_rights_elements);
UserDefinedSQLFunctionFactory::instance().unregisterFunction(current_context, drop_function_query.function_name, drop_function_query.if_exists);
return {};
}

View File

@ -206,6 +206,15 @@ FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const S
return nullptr;
}
bool UserDefinedExecutableFunctionFactory::has(const String & function_name, ContextPtr context)
{
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();
auto load_result = loader.getLoadResult(function_name);
bool result = load_result.object != nullptr;
return result;
}
std::vector<String> UserDefinedExecutableFunctionFactory::getRegisteredNames(ContextPtr context)
{
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();

View File

@ -24,6 +24,8 @@ public:
static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context);
static bool has(const String & function_name, ContextPtr context);
static std::vector<String> getRegisteredNames(ContextPtr context);
};

View File

@ -1,7 +1,13 @@
#include "UserDefinedSQLFunctionFactory.h"
#include <Common/quoteString.h>
#include <Functions/FunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -10,7 +16,7 @@ namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
extern const int CANNOT_DROP_SYSTEM_FUNCTION;
extern const int CANNOT_DROP_FUNCTION;
}
UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance()
@ -19,13 +25,31 @@ UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance()
return result;
}
void UserDefinedSQLFunctionFactory::registerFunction(const String & function_name, ASTPtr create_function_query, bool replace)
void UserDefinedSQLFunctionFactory::registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name);
}
if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name);
}
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "User defined executable function '{}'", function_name);
}
std::lock_guard lock(mutex);
@ -33,28 +57,63 @@ void UserDefinedSQLFunctionFactory::registerFunction(const String & function_nam
if (!inserted)
{
if (if_not_exists)
return;
if (replace)
it->second = std::move(create_function_query);
it->second = create_function_query;
else
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS,
"The function name '{}' is not unique",
function_name);
}
if (persist)
{
try
{
UserDefinedSQLObjectsLoader::instance().storeObject(context, UserDefinedSQLObjectType::Function, function_name, *create_function_query, replace);
}
catch (Exception & exception)
{
function_name_to_create_query.erase(it);
exception.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name)));
throw;
}
}
}
void UserDefinedSQLFunctionFactory::unregisterFunction(const String & function_name)
void UserDefinedSQLFunctionFactory::unregisterFunction(ContextPtr context, const String & function_name, bool if_exists)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name) ||
AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::CANNOT_DROP_SYSTEM_FUNCTION, "Cannot drop system function '{}'", function_name);
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name);
std::lock_guard lock(mutex);
auto it = function_name_to_create_query.find(function_name);
if (it == function_name_to_create_query.end())
{
if (if_exists)
return;
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The function name '{}' is not registered",
function_name);
}
try
{
UserDefinedSQLObjectsLoader::instance().removeObject(context, UserDefinedSQLObjectType::Function, function_name);
}
catch (Exception & exception)
{
exception.addMessage(fmt::format("while removing user defined function {} from disk", backQuote(function_name)));
throw;
}
function_name_to_create_query.erase(it);
}

View File

@ -6,6 +6,8 @@
#include <Common/NamePrompter.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
@ -17,13 +19,17 @@ public:
static UserDefinedSQLFunctionFactory & instance();
/** Register function for function_name in factory for specified create_function_query.
* If replace = true and function with function_name already exists replace it with create_function_query.
* Otherwise throws exception.
* If function exists and if_not_exists = false and replace = false throws exception.
* If replace = true and sql user defined function with function_name already exists replace it with create_function_query.
* If persist = true persist function on disk.
*/
void registerFunction(const String & function_name, ASTPtr create_function_query, bool replace);
void registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist);
/// Unregister function for function_name
void unregisterFunction(const String & function_name);
/** Unregister function for function_name.
* If if_exists = true then do not throw exception if function is not registered.
* If if_exists = false then throw exception if function is not registered.
*/
void unregisterFunction(ContextPtr context, const String & function_name, bool if_exists);
/// Get function create query for function_name. If no function registered with function_name throws exception.
ASTPtr get(const String & function_name) const;

View File

@ -1,6 +1,9 @@
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
@ -25,6 +28,9 @@ void ASTCreateFunctionQuery::formatImpl(const IAST::FormatSettings & settings, I
settings.ostr << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(function_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
function_core->formatImpl(settings, state, frame);
}

View File

@ -1,12 +1,13 @@
#pragma once
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
class ASTCreateFunctionQuery : public IAST
class ASTCreateFunctionQuery : public IAST, public ASTQueryWithOnCluster
{
public:
String function_name;
@ -20,6 +21,8 @@ public:
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &) const override { return removeOnCluster<ASTCreateFunctionQuery>(clone()); }
};
}

View File

@ -19,6 +19,7 @@ void ASTDropFunctionQuery::formatImpl(const IAST::FormatSettings & settings, IAS
settings.ostr << (settings.hilite ? hilite_none : "");
settings.ostr << (settings.hilite ? hilite_identifier : "") << backQuoteIfNeed(function_name) << (settings.hilite ? hilite_none : "");
formatOnCluster(settings);
}
}

View File

@ -1,11 +1,13 @@
#pragma once
#include "IAST.h"
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithOnCluster.h>
namespace DB
{
class ASTDropFunctionQuery : public IAST
class ASTDropFunctionQuery : public IAST, public ASTQueryWithOnCluster
{
public:
String function_name;
@ -17,6 +19,8 @@ public:
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
ASTPtr getRewrittenASTWithoutOnCluster(const std::string &) const override { return removeOnCluster<ASTDropFunctionQuery>(clone()); }
};
}

View File

@ -17,6 +17,7 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ParserKeyword s_function("FUNCTION");
ParserKeyword s_or_replace("OR REPLACE");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier function_name_p;
ParserKeyword s_as("AS");
ParserLambdaExpression lambda_p;
@ -24,6 +25,7 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
ASTPtr function_name;
ASTPtr function_core;
String cluster_str;
bool or_replace = false;
bool if_not_exists = false;
@ -42,6 +44,12 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
if (!function_name_p.parse(pos, function_name, expected))
return false;
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
if (!s_as.ignore(pos, expected))
return false;
@ -55,6 +63,7 @@ bool ParserCreateFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Exp
create_function_query->function_core = function_core;
create_function_query->or_replace = or_replace;
create_function_query->if_not_exists = if_not_exists;
create_function_query->cluster = std::move(cluster_str);
return true;
}

View File

@ -12,8 +12,10 @@ bool ParserDropFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expec
ParserKeyword s_drop("DROP");
ParserKeyword s_function("FUNCTION");
ParserKeyword s_if_exists("IF EXISTS");
ParserKeyword s_on("ON");
ParserIdentifier function_name_p;
String cluster_str;
bool if_exists = false;
ASTPtr function_name;
@ -30,8 +32,16 @@ bool ParserDropFunctionQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expec
if (!function_name_p.parse(pos, function_name, expected))
return false;
if (s_on.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
return false;
}
auto drop_function_query = std::make_shared<ASTDropFunctionQuery>();
drop_function_query->if_exists = if_exists;
drop_function_query->cluster = std::move(cluster_str);
node = drop_function_query;
drop_function_query->function_name = function_name->as<ASTIdentifier &>().name();

View File

@ -0,0 +1,22 @@
<clickhouse>
<remote_servers>
<cluster>
<shard>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</clickhouse>

View File

@ -0,0 +1,34 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance('ch1', main_configs=["configs/config.d/clusters.xml"], with_zookeeper=True)
ch2 = cluster.add_instance('ch2', main_configs=["configs/config.d/clusters.xml"], with_zookeeper=True)
ch3 = cluster.add_instance('ch3', main_configs=["configs/config.d/clusters.xml"], with_zookeeper=True)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_sql_user_defined_functions_on_cluster():
assert "Unknown function test_function" in ch1.query_and_get_error("SELECT test_function(1);")
assert "Unknown function test_function" in ch2.query_and_get_error("SELECT test_function(1);")
assert "Unknown function test_function" in ch3.query_and_get_error("SELECT test_function(1);")
ch1.query_with_retry("CREATE FUNCTION test_function ON CLUSTER 'cluster' AS x -> x + 1;")
assert ch1.query("SELECT test_function(1);") == "2\n"
assert ch2.query("SELECT test_function(1);") == "2\n"
assert ch3.query("SELECT test_function(1);") == "2\n"
ch2.query_with_retry("DROP FUNCTION test_function ON CLUSTER 'cluster'")
assert "Unknown function test_function" in ch1.query_and_get_error("SELECT test_function(1);")
assert "Unknown function test_function" in ch2.query_and_get_error("SELECT test_function(1);")
assert "Unknown function test_function" in ch3.query_and_get_error("SELECT test_function(1);")