UserDefinedExecutableFunction added send_chunk_header option

This commit is contained in:
Maksim Kita 2021-09-10 16:28:46 +03:00
parent 3852954b49
commit 769fab62e3
2 changed files with 45 additions and 32 deletions

View File

@ -5,6 +5,8 @@
#include <DataTypes/DataTypeFactory.h>
#include <IO/WriteHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
@ -29,7 +31,7 @@ public:
explicit UserDefinedFunction(
ContextPtr context_,
const UserDefinedExecutableFunction::Config & configuration_,
const UserDefinedExecutableFunctionConfiguration & configuration_,
GetProcessFunction get_process_function_,
std::shared_ptr<ProcessPool> process_pool_)
: context(context_)
@ -89,6 +91,13 @@ public:
ShellCommandSource::SendDataTask task = {[process_in, arguments_block, is_executable_pool_function, this]()
{
auto & out = *process_in;
if (configuration.send_chunk_header)
{
writeText(arguments_block.rows(), out);
writeChar('\n', out);
}
auto output_stream = context->getOutputStream(configuration.format, out, arguments_block.cloneEmpty());
formatBlock(output_stream, arguments_block);
if (!is_executable_pool_function)
@ -133,7 +142,7 @@ public:
private:
ContextPtr context;
UserDefinedExecutableFunction::Config configuration;
UserDefinedExecutableFunctionConfiguration configuration;
GetProcessFunction get_process_function;
mutable std::shared_ptr<ProcessPool> process_pool;
};
@ -177,6 +186,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
String command = config.getString(key_in_config + ".command");
String format = config.getString(key_in_config + ".format");
DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
bool send_chunk_header = config.getBool(key_in_config + ".send_chunk_header", false);
size_t pool_size = 0;
size_t command_termination_timeout = 0;
@ -209,7 +219,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
argument_types.emplace_back(std::move(argument_type));
}
UserDefinedExecutableFunction::Config function_config
UserDefinedExecutableFunctionConfiguration function_configuration
{
.type = function_type,
.name = std::move(name),
@ -219,23 +229,24 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
.result_type = std::move(result_type),
.pool_size = pool_size,
.command_termination_timeout = command_termination_timeout,
.max_command_execution_time = max_command_execution_time
.max_command_execution_time = max_command_execution_time,
.send_chunk_header = send_chunk_header
};
std::shared_ptr<scope_guard> function_deregister_ptr = std::make_shared<scope_guard>([function_name = function_config.name]()
std::shared_ptr<scope_guard> function_deregister_ptr = std::make_shared<scope_guard>([function_name = function_configuration.name]()
{
UserDefinedExecutableFunctionFactory::instance().unregisterFunction(function_name);
});
auto function = std::make_shared<UserDefinedExecutableFunction>(function_config, std::move(function_deregister_ptr), lifetime);
auto function = std::make_shared<UserDefinedExecutableFunction>(function_configuration, std::move(function_deregister_ptr), lifetime);
std::shared_ptr<ProcessPool> process_pool;
if (function_config.type == UserDefinedExecutableFunctionType::executable_pool)
process_pool = std::make_shared<ProcessPool>(function_config.pool_size == 0 ? std::numeric_limits<int>::max() : function_config.pool_size);
if (function_configuration.type == UserDefinedExecutableFunctionType::executable_pool)
process_pool = std::make_shared<ProcessPool>(function_configuration.pool_size == 0 ? std::numeric_limits<int>::max() : function_configuration.pool_size);
auto get_process_function = [function, process_pool]()
{
const auto & executable_function_config = function->getConfig();
const auto & executable_function_config = function->getConfiguration();
std::unique_ptr<ShellCommand> process;
bool is_executable_pool_function = (process_pool != nullptr);
@ -243,7 +254,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
{
bool result = process_pool->tryBorrowObject(process, [&]()
{
ShellCommand::Config process_config(function->getConfig().script_path);
ShellCommand::Config process_config(function->getConfiguration().script_path);
process_config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy{ true /*terminate_in_destructor*/, executable_function_config.command_termination_timeout };
auto shell_command = ShellCommand::execute(process_config);
return shell_command;
@ -262,9 +273,9 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
return process;
};
UserDefinedExecutableFunctionFactory::instance().registerFunction(function_config.name, [get_process_function, function, process_pool](ContextPtr function_context)
UserDefinedExecutableFunctionFactory::instance().registerFunction(function_configuration.name, [get_process_function, function, process_pool](ContextPtr function_context)
{
std::shared_ptr<UserDefinedFunction> user_defined_function = std::make_shared<UserDefinedFunction>(function_context, function->getConfig(), std::move(get_process_function), process_pool);
std::shared_ptr<UserDefinedFunction> user_defined_function = std::make_shared<UserDefinedFunction>(function_context, function->getConfiguration(), std::move(get_process_function), process_pool);
return std::make_unique<FunctionToOverloadResolverAdaptor>(user_defined_function);
});

View File

@ -17,26 +17,28 @@ enum class UserDefinedExecutableFunctionType
executable_pool
};
struct UserDefinedExecutableFunctionConfiguration
{
UserDefinedExecutableFunctionType type;
std::string name;
std::string script_path;
std::string format;
std::vector<DataTypePtr> argument_types;
DataTypePtr result_type;
/// Pool settings
size_t pool_size = 0;
size_t command_termination_timeout = 0;
size_t max_command_execution_time = 0;
/// Send number_of_rows\n before sending chunk to process
bool send_chunk_header = false;
};
class UserDefinedExecutableFunction final : public IExternalLoadable
{
public:
struct Config
{
UserDefinedExecutableFunctionType type;
std::string name;
std::string script_path;
std::string format;
std::vector<DataTypePtr> argument_types;
DataTypePtr result_type;
/// Pool settings
size_t pool_size = 0;
size_t command_termination_timeout = 0;
size_t max_command_execution_time = 0;
};
UserDefinedExecutableFunction(
const Config & config_,
const UserDefinedExecutableFunctionConfiguration & configuration_,
std::shared_ptr<scope_guard> function_deregister_,
const ExternalLoadableLifetime & lifetime_);
@ -47,7 +49,7 @@ public:
const std::string & getLoadableName() const override
{
return config.name;
return configuration.name;
}
bool supportUpdates() const override
@ -62,12 +64,12 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<UserDefinedExecutableFunction>(config, function_deregister, lifetime);
return std::make_shared<UserDefinedExecutableFunction>(configuration, function_deregister, lifetime);
}
const Config & getConfig() const
const UserDefinedExecutableFunctionConfiguration & getConfiguration() const
{
return config;
return configuration;
}
std::shared_ptr<UserDefinedExecutableFunction> shared_from_this()
@ -81,7 +83,7 @@ public:
}
private:
Config config;
UserDefinedExecutableFunctionConfiguration configuration;
std::shared_ptr<scope_guard> function_deregister;
ExternalLoadableLifetime lifetime;
};