UserDefinedExecutableFunction added support for process pool

This commit is contained in:
Maksim Kita 2021-09-09 22:35:07 +03:00
parent 55492cc9bf
commit bb3bc6722a
3 changed files with 128 additions and 22 deletions

View File

@ -19,26 +19,31 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int TIMEOUT_EXCEEDED;
}
class UserDefinedFunction final : public IFunction
{
public:
using GetProcessFunction = std::function<std::unique_ptr<ShellCommand> (void)>;
explicit UserDefinedFunction(
ContextPtr context_,
const UserDefinedExecutableFunction::Config & config_,
std::unique_ptr<ShellCommand> process_)
const UserDefinedExecutableFunction::Config & configuration_,
GetProcessFunction get_process_function_,
std::shared_ptr<ProcessPool> process_pool_)
: context(context_)
, config(config_)
, process(std::move(process_))
, configuration(configuration_)
, get_process_function(std::move(get_process_function_))
, process_pool(std::move(process_pool_))
{
}
String getName() const override { return config.name; }
String getName() const override { return configuration.name; }
bool isVariadic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
size_t getNumberOfArguments() const override { return config.argument_types.size(); }
size_t getNumberOfArguments() const override { return configuration.argument_types.size(); }
bool useDefaultImplementationForConstants() const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
@ -48,7 +53,7 @@ public:
{
for (size_t i = 0; i < arguments.size(); ++i)
{
const auto & expected_argument_type = config.argument_types[i];
const auto & expected_argument_type = configuration.argument_types[i];
if (!areTypesEqual(expected_argument_type, arguments[i]))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Function {} for {} argument expected {} actual {}",
@ -58,28 +63,48 @@ public:
arguments[i]->getName());
}
return config.result_type;
return configuration.result_type;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
Block arguments_block(arguments);
std::unique_ptr<ShellCommand> process = get_process_function();
ColumnWithTypeAndName result(result_type, "result");
Block result_block({result});
Block arguments_block(arguments);
auto * process_in = &process->in;
ShellCommandSource::SendDataTask task = {[process_in, arguments_block, this]()
bool is_executable_pool_function = (process_pool != nullptr);
ShellCommandSourceConfiguration shell_command_source_configuration;
if (is_executable_pool_function)
{
shell_command_source_configuration.read_fixed_number_of_rows = true;
shell_command_source_configuration.number_of_rows_to_read = input_rows_count;
}
ShellCommandSource::SendDataTask task = {[process_in, arguments_block, is_executable_pool_function, this]()
{
auto & out = *process_in;
auto output_stream = context->getOutputStream(config.format, out, arguments_block.cloneEmpty());
auto output_stream = context->getOutputStream(configuration.format, out, arguments_block.cloneEmpty());
formatBlock(output_stream, arguments_block);
out.close();
if (!is_executable_pool_function)
out.close();
}};
std::vector<ShellCommandSource::SendDataTask> tasks = {std::move(task)};
Pipe pipe(std::make_unique<ShellCommandSource>(context, config.format, result_block.cloneEmpty(), std::move(process), nullptr, std::move(tasks)));
Pipe pipe(std::make_unique<ShellCommandSource>(
context,
configuration.format,
result_block.cloneEmpty(),
std::move(process),
nullptr,
std::move(tasks),
shell_command_source_configuration,
process_pool));
QueryPipeline pipeline;
pipeline.init(std::move(pipe));
@ -87,6 +112,8 @@ public:
PullingPipelineExecutor executor(pipeline);
auto result_column = result_type->createColumn();
result_column->reserve(input_rows_count);
Block block;
while (executor.pull(block))
{
@ -107,8 +134,9 @@ public:
private:
ContextPtr context;
UserDefinedExecutableFunction::Config config;
mutable std::unique_ptr<ShellCommand> process;
UserDefinedExecutableFunction::Config configuration;
GetProcessFunction get_process_function;
mutable std::shared_ptr<ProcessPool> process_pool;
};
ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_)
@ -135,9 +163,36 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
const std::string & key_in_config,
const std::string &) const
{
String type = config.getString(key_in_config + ".type");
UserDefinedExecutableFunctionType function_type;
if (type == "executable")
function_type = UserDefinedExecutableFunctionType::executable;
else if (type == "executable_pool")
function_type = UserDefinedExecutableFunctionType::executable_pool;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Wrong user defined function type expected 'executable' or 'executable_pool' actual {}",
function_type);
String command = config.getString(key_in_config + ".command");
String format = config.getString(key_in_config + ".format");
DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
size_t pool_size = 0;
size_t command_termination_timeout = 0;
size_t max_command_execution_time = 0;
if (function_type == UserDefinedExecutableFunctionType::executable_pool)
{
pool_size = config.getUInt64(key_in_config + ".pool_size", 16);
command_termination_timeout = config.getUInt64(key_in_config + ".command_termination_timeout", 10);
max_command_execution_time = config.getUInt64(key_in_config + ".max_command_execution_time", 10);
size_t max_execution_time_seconds = static_cast<size_t>(getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;
}
ExternalLoadableLifetime lifetime(config, key_in_config + ".lifetime");
std::vector<DataTypePtr> argument_types;
@ -157,11 +212,15 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
UserDefinedExecutableFunction::Config function_config
{
.type = function_type,
.name = std::move(name),
.script_path = std::move(command),
.format = std::move(format),
.argument_types = std::move(argument_types),
.result_type = std::move(result_type),
.pool_size = pool_size,
.command_termination_timeout = command_termination_timeout,
.max_command_execution_time = max_command_execution_time
};
std::shared_ptr<scope_guard> function_deregister_ptr = std::make_shared<scope_guard>([function_name = function_config.name]()
@ -171,10 +230,42 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
auto function = std::make_shared<UserDefinedExecutableFunction>(function_config, std::move(function_deregister_ptr), lifetime);
UserDefinedExecutableFunctionFactory::instance().registerFunction(function_config.name, [function](ContextPtr function_context)
std::shared_ptr<ProcessPool> process_pool;
if (function_config.type == UserDefinedExecutableFunctionType::executable_pool)
process_pool = std::make_shared<ProcessPool>(function_config.pool_size == 0 ? std::numeric_limits<int>::max() : function_config.pool_size);
auto get_process_function = [function, process_pool]()
{
auto shell_command = ShellCommand::execute(function->getConfig().script_path);
std::shared_ptr<UserDefinedFunction> user_defined_function = std::make_shared<UserDefinedFunction>(function_context, function->getConfig(), std::move(shell_command));
const auto & executable_function_config = function->getConfig();
std::unique_ptr<ShellCommand> process;
bool is_executable_pool_function = (process_pool != nullptr);
if (is_executable_pool_function)
{
bool result = process_pool->tryBorrowObject(process, [&]()
{
ShellCommand::Config process_config(function->getConfig().script_path);
process_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy{ true /*terminate_in_destructor*/, executable_function_config.command_termination_timeout };
auto shell_command = ShellCommand::execute(process_config);
return shell_command;
}, executable_function_config.max_command_execution_time * 1000);
if (!result)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Could not get process from pool, max command execution timeout exceeded {} seconds",
executable_function_config.max_command_execution_time);
}
else
{
process = ShellCommand::execute(executable_function_config.script_path);
}
return process;
};
UserDefinedExecutableFunctionFactory::instance().registerFunction(function_config.name, [get_process_function, function, process_pool](ContextPtr function_context)
{
std::shared_ptr<UserDefinedFunction> user_defined_function = std::make_shared<UserDefinedFunction>(function_context, function->getConfig(), std::move(get_process_function), process_pool);
return std::make_unique<FunctionToOverloadResolverAdaptor>(user_defined_function);
});

View File

@ -11,16 +11,28 @@
namespace DB
{
enum class UserDefinedExecutableFunctionType
{
executable,
executable_pool
};
class UserDefinedExecutableFunction final : public IExternalLoadable
{
public:
struct Config
{
UserDefinedExecutableFunctionType type;
std::string name;
std::string script_path;
std::string format;
std::vector<DataTypePtr> argument_types;
DataTypePtr result_type;
/// Pool settings
size_t pool_size = 0;
size_t command_termination_timeout = 0;
size_t max_command_execution_time = 0;
};
UserDefinedExecutableFunction(

View File

@ -60,6 +60,7 @@ SRCS(
ExternalLoaderTempConfigRepository.cpp
ExternalLoaderXMLConfigRepository.cpp
ExternalModelsLoader.cpp
ExternalUserDefinedExecutableFunctionsLoader.cpp
ExtractExpressionInfoVisitor.cpp
FillingRow.cpp
FunctionNameNormalizer.cpp
@ -165,9 +166,11 @@ SRCS(
TranslateQualifiedNamesVisitor.cpp
TreeOptimizer.cpp
TreeRewriter.cpp
UserDefinedFunctionFactory.cpp
UserDefinedFunctionsVisitor.cpp
UserDefinedObjectsLoader.cpp
UserDefinedExecutableFunction.cpp
UserDefinedExecutableFunctionFactory.cpp
UserDefinedSQLFunctionFactory.cpp
UserDefinedSQLFunctionVisitor.cpp
UserDefinedSQLObjectsLoader.cpp
WindowDescription.cpp
ZooKeeperLog.cpp
addMissingDefaults.cpp