mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Executable user defined functions support parameters
This commit is contained in:
parent
084dd6f3c2
commit
6c5ec68a7c
@ -953,13 +953,33 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
|
||||
if (AggregateFunctionFactory::instance().isAggregateFunctionName(node.name))
|
||||
return;
|
||||
|
||||
FunctionOverloadResolverPtr function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, data.getContext());
|
||||
FunctionOverloadResolverPtr function_builder;
|
||||
|
||||
auto current_context = data.getContext();
|
||||
|
||||
if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context))
|
||||
{
|
||||
Array parameters;
|
||||
if (node.parameters) {
|
||||
auto & node_parameters = node.parameters->children;
|
||||
size_t parameters_size = node_parameters.size();
|
||||
parameters.resize(parameters_size);
|
||||
|
||||
for (size_t i = 0; i < parameters_size; ++i)
|
||||
{
|
||||
ASTPtr literal = evaluateConstantExpressionAsLiteral(node_parameters[i], current_context);
|
||||
parameters[i] = literal->as<ASTLiteral>()->value;
|
||||
}
|
||||
}
|
||||
|
||||
function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, current_context, parameters);
|
||||
}
|
||||
|
||||
if (!function_builder)
|
||||
{
|
||||
try
|
||||
{
|
||||
function_builder = FunctionFactory::instance().get(node.name, data.getContext());
|
||||
function_builder = FunctionFactory::instance().get(node.name, current_context);
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -17,6 +17,7 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int FUNCTION_ALREADY_EXISTS;
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_)
|
||||
@ -111,6 +112,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
|
||||
lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime");
|
||||
|
||||
std::vector<UserDefinedExecutableFunctionArgument> arguments;
|
||||
std::vector<UserDefinedExecutableFunctionParameter> parameters;
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys config_elems;
|
||||
config.keys(key_in_config, config_elems);
|
||||
@ -137,12 +139,33 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
|
||||
arguments.emplace_back(std::move(argument));
|
||||
}
|
||||
|
||||
for (const auto & config_elem : config_elems)
|
||||
{
|
||||
if (!startsWith(config_elem, "parameter"))
|
||||
continue;
|
||||
|
||||
UserDefinedExecutableFunctionParameter parameter;
|
||||
|
||||
const auto parameter_prefix = key_in_config + '.' + config_elem + '.';
|
||||
|
||||
parameter.type = DataTypeFactory::instance().get(config.getString(parameter_prefix + "type"));
|
||||
parameter.name = config.getString(parameter_prefix + "name");
|
||||
|
||||
parameters.emplace_back(std::move(parameter));
|
||||
}
|
||||
|
||||
if (is_executable_pool && !parameters.empty()) {
|
||||
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
|
||||
"Executable user defined functions with `executable_pool` type does not support parameters");
|
||||
}
|
||||
|
||||
UserDefinedExecutableFunctionConfiguration function_configuration
|
||||
{
|
||||
.name = name,
|
||||
.command = std::move(command_value),
|
||||
.command_arguments = std::move(command_arguments),
|
||||
.arguments = std::move(arguments),
|
||||
.parameters = std::move(parameters),
|
||||
.result_type = std::move(result_type),
|
||||
.result_name = std::move(result_name),
|
||||
};
|
||||
|
@ -16,12 +16,19 @@ struct UserDefinedExecutableFunctionArgument
|
||||
String name;
|
||||
};
|
||||
|
||||
struct UserDefinedExecutableFunctionParameter
|
||||
{
|
||||
String name;
|
||||
DataTypePtr type;
|
||||
};
|
||||
|
||||
struct UserDefinedExecutableFunctionConfiguration
|
||||
{
|
||||
std::string name;
|
||||
std::string command;
|
||||
std::vector<std::string> command_arguments;
|
||||
std::vector<UserDefinedExecutableFunctionArgument> arguments;
|
||||
std::vector<UserDefinedExecutableFunctionParameter> parameters;
|
||||
DataTypePtr result_type;
|
||||
String result_name;
|
||||
};
|
||||
|
@ -3,6 +3,8 @@
|
||||
#include <filesystem>
|
||||
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Common/FieldVisitorToString.h>
|
||||
#include <DataTypes/FieldToDataType.h>
|
||||
|
||||
#include <Processors/Sources/ShellCommandSource.h>
|
||||
#include <Processors/Sources/SourceFromSingleChunk.h>
|
||||
@ -11,6 +13,7 @@
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <AggregateFunctions/AggregateFunctionFactory.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/castColumn.h>
|
||||
@ -22,6 +25,8 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int TYPE_MISMATCH;
|
||||
}
|
||||
|
||||
class UserDefinedFunction final : public IFunction
|
||||
@ -30,10 +35,59 @@ public:
|
||||
|
||||
explicit UserDefinedFunction(
|
||||
ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function_,
|
||||
ContextPtr context_)
|
||||
ContextPtr context_,
|
||||
Array parameters_)
|
||||
: executable_function(std::move(executable_function_))
|
||||
, context(context_)
|
||||
{
|
||||
const auto & configuration = executable_function->getConfiguration();
|
||||
size_t command_parameters_size = configuration.parameters.size();
|
||||
if (command_parameters_size != parameters_.size()) {
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Executable user defined function {} number of parameters does not match. Expected {}. Actual {}",
|
||||
configuration.name,
|
||||
command_parameters_size,
|
||||
parameters_.size());
|
||||
}
|
||||
|
||||
command_with_parameters = configuration.command;
|
||||
command_arguments_with_parameters = configuration.command_arguments;
|
||||
|
||||
for (size_t i = 0; i < command_parameters_size; ++i)
|
||||
{
|
||||
const auto & command_parameter = configuration.parameters[i];
|
||||
const auto & parameter_value = parameters_[i];
|
||||
auto converted_parameter = convertFieldToTypeOrThrow(parameter_value, *command_parameter.type);
|
||||
auto parameter_placeholder = "{" + command_parameter.name + "}";
|
||||
size_t parameter_placeholder_size = parameter_placeholder.size();
|
||||
|
||||
auto parameter_value_string = applyVisitor(FieldVisitorToString(), converted_parameter);
|
||||
bool find_placedholder = false;
|
||||
|
||||
for (auto & command_argument : command_arguments_with_parameters) {
|
||||
auto parameter_placeholder_position = command_argument.find(parameter_placeholder);
|
||||
if (parameter_placeholder_position == std::string::npos)
|
||||
continue;
|
||||
|
||||
command_argument.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string);
|
||||
find_placedholder = true;
|
||||
}
|
||||
|
||||
auto parameter_placeholder_position = command_with_parameters.find(parameter_placeholder);
|
||||
|
||||
if (parameter_placeholder_position != std::string::npos) {
|
||||
command_with_parameters.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string);
|
||||
find_placedholder = true;
|
||||
}
|
||||
|
||||
if (!find_placedholder)
|
||||
{
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS,
|
||||
"Executable user defined function {} no placeholder for parameter {}",
|
||||
configuration.name,
|
||||
command_parameter.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String getName() const override { return executable_function->getConfiguration().name; }
|
||||
@ -63,7 +117,7 @@ public:
|
||||
const auto & coordinator_configuration = coordinator->getConfiguration();
|
||||
const auto & configuration = executable_function->getConfiguration();
|
||||
|
||||
String command = configuration.command;
|
||||
String command = command_with_parameters;
|
||||
|
||||
if (coordinator_configuration.execute_direct)
|
||||
{
|
||||
@ -134,7 +188,7 @@ public:
|
||||
|
||||
Pipe pipe = coordinator->createPipe(
|
||||
command,
|
||||
configuration.command_arguments,
|
||||
command_arguments_with_parameters,
|
||||
std::move(shell_input_pipes),
|
||||
result_block,
|
||||
context,
|
||||
@ -165,9 +219,10 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function;
|
||||
ContextPtr context;
|
||||
String command_with_parameters;
|
||||
std::vector<std::string> command_arguments_with_parameters;
|
||||
};
|
||||
|
||||
UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::instance()
|
||||
@ -176,15 +231,15 @@ UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::ins
|
||||
return result;
|
||||
}
|
||||
|
||||
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context)
|
||||
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context, Array parameters)
|
||||
{
|
||||
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();
|
||||
auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(loader.load(function_name));
|
||||
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context));
|
||||
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters));
|
||||
return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function));
|
||||
}
|
||||
|
||||
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context)
|
||||
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context, Array parameters)
|
||||
{
|
||||
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();
|
||||
auto load_result = loader.getLoadResult(function_name);
|
||||
@ -192,7 +247,7 @@ FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const S
|
||||
if (load_result.object)
|
||||
{
|
||||
auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(load_result.object);
|
||||
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context));
|
||||
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters));
|
||||
return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function));
|
||||
}
|
||||
|
||||
|
@ -5,9 +5,9 @@
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <Common/NamePrompter.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Core/Field.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -20,9 +20,9 @@ public:
|
||||
|
||||
static UserDefinedExecutableFunctionFactory & instance();
|
||||
|
||||
static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context);
|
||||
static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context, Array parameters = {});
|
||||
|
||||
static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context);
|
||||
static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context, Array parameters = {});
|
||||
|
||||
static bool has(const String & function_name, ContextPtr context);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user