ClickHouse/src/Storages/StorageExecutable.cpp

261 lines
9.2 KiB
C++
Raw Normal View History

2021-04-14 17:51:55 +00:00
#include <Storages/StorageExecutable.h>
2021-08-25 19:30:22 +00:00
#include <filesystem>
#include <Common/ShellCommand.h>
#include <Core/Block.h>
2021-09-02 11:53:20 +00:00
2021-04-15 09:40:41 +00:00
#include <IO/ReadHelpers.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
2021-09-02 11:53:20 +00:00
#include <Parsers/ASTCreateQuery.h>
#include <DataStreams/IBlockInputStream.h>
2021-08-25 19:30:22 +00:00
#include <Processors/Pipe.h>
2021-04-14 17:51:55 +00:00
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
2021-04-14 17:51:55 +00:00
#include <Storages/StorageFactory.h>
2021-04-14 17:51:55 +00:00
namespace DB
{
2021-08-24 19:38:42 +00:00
2021-04-14 17:51:55 +00:00
namespace ErrorCodes
{
2021-08-25 19:30:22 +00:00
extern const int UNSUPPORTED_METHOD;
2021-08-28 19:47:59 +00:00
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
2021-09-02 11:53:20 +00:00
extern const int TIMEOUT_EXCEEDED;
2021-04-14 17:51:55 +00:00
}
StorageExecutable::StorageExecutable(
2021-04-15 09:40:41 +00:00
const StorageID & table_id_,
2021-08-25 19:30:22 +00:00
const String & script_name_,
const std::vector<String> & arguments_,
2021-04-15 21:15:54 +00:00
const String & format_,
const std::vector<ASTPtr> & input_queries_,
2021-04-14 17:51:55 +00:00
const ColumnsDescription & columns,
2021-08-24 19:38:42 +00:00
const ConstraintsDescription & constraints)
2021-04-15 09:40:41 +00:00
: IStorage(table_id_)
2021-08-25 19:30:22 +00:00
, script_name(script_name_)
, arguments(arguments_)
2021-04-15 21:15:54 +00:00
, format(format_)
, input_queries(input_queries_)
2021-08-24 19:38:42 +00:00
, log(&Poco::Logger::get("StorageExecutable"))
2021-04-14 17:51:55 +00:00
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
}
2021-09-02 11:53:20 +00:00
StorageExecutable::StorageExecutable(
const StorageID & table_id_,
const String & script_name_,
const std::vector<String> & arguments_,
const String & format_,
const std::vector<ASTPtr> & input_queries_,
const ExecutablePoolSettings & pool_settings_,
const ColumnsDescription & columns,
const ConstraintsDescription & constraints)
: IStorage(table_id_)
, script_name(script_name_)
, arguments(arguments_)
, format(format_)
, input_queries(input_queries_)
, pool_settings(pool_settings_)
/// If pool size == 0 then there is no size restrictions. Poco max size of semaphore is integer type.
, process_pool(std::make_shared<ProcessPool>(pool_settings.pool_size == 0 ? std::numeric_limits<int>::max() : pool_settings.pool_size))
, log(&Poco::Logger::get("StorageExecutablePool"))
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
}
2021-04-14 17:51:55 +00:00
Pipe StorageExecutable::read(
const Names & /*column_names*/,
const StorageMetadataPtr & metadata_snapshot,
SelectQueryInfo & /*query_info*/,
2021-08-25 19:30:22 +00:00
ContextPtr context,
2021-04-14 17:51:55 +00:00
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
2021-08-25 19:30:22 +00:00
unsigned /*threads*/)
2021-04-14 17:51:55 +00:00
{
2021-08-25 19:30:22 +00:00
auto user_scripts_path = context->getUserScriptsPath();
auto script_path = user_scripts_path + '/' + script_name;
if (!std::filesystem::exists(std::filesystem::path(script_path)))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} does not exists inside {}",
script_name,
user_scripts_path);
std::vector<BlockInputStreamPtr> inputs;
inputs.reserve(input_queries.size());
for (auto & input_query : input_queries)
{
InterpreterSelectWithUnionQuery interpreter(input_query, context, {});
auto input = interpreter.execute().getInputStream();
inputs.emplace_back(std::move(input));
}
2021-08-25 19:30:22 +00:00
ShellCommand::Config config(script_path);
config.arguments = arguments;
2021-08-28 19:47:59 +00:00
for (size_t i = 1; i < inputs.size(); ++i)
config.write_fds.emplace_back(i + 2);
2021-08-25 19:30:22 +00:00
2021-09-02 11:53:20 +00:00
std::unique_ptr<ShellCommand> process;
2021-09-03 10:00:40 +00:00
bool is_executable_pool = (process_pool != nullptr);
if (is_executable_pool)
2021-09-02 11:53:20 +00:00
{
bool result = process_pool->tryBorrowObject(process, [&config, this]()
{
config.terminate_in_destructor_strategy = ShellCommand::DestructorStrategy{ true /*terminate_in_destructor*/, pool_settings.command_termination_timeout };
auto shell_command = ShellCommand::execute(config);
return shell_command;
}, pool_settings.max_command_execution_time * 10000);
if (!result)
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Could not get process from pool, max command execution timeout exceeded {} seconds",
pool_settings.max_command_execution_time);
}
else
{
process = ShellCommand::executeDirect(config);
}
2021-08-25 19:30:22 +00:00
2021-08-28 19:47:59 +00:00
std::vector<ShellCommandSource::SendDataTask> tasks;
tasks.reserve(inputs.size());
2021-08-25 19:30:22 +00:00
2021-08-28 19:47:59 +00:00
for (size_t i = 0; i < inputs.size(); ++i)
2021-08-25 19:30:22 +00:00
{
2021-08-28 19:47:59 +00:00
BlockInputStreamPtr input_stream = inputs[i];
2021-08-30 18:41:36 +00:00
WriteBufferFromFile * write_buffer = nullptr;
2021-08-28 19:47:59 +00:00
if (i == 0)
{
write_buffer = &process->in;
}
else
{
auto descriptor = i + 2;
auto it = process->write_fds.find(descriptor);
if (it == process->write_fds.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Process does not contain descriptor to write {}", descriptor);
write_buffer = &it->second;
}
2021-09-03 10:00:40 +00:00
ShellCommandSource::SendDataTask task = [input_stream, write_buffer, context, is_executable_pool, this]()
2021-08-28 19:47:59 +00:00
{
auto output_stream = context->getOutputStream(format, *write_buffer, input_stream->getHeader().cloneEmpty());
input_stream->readPrefix();
output_stream->writePrefix();
while (auto block = input_stream->read())
output_stream->write(block);
input_stream->readSuffix();
output_stream->writeSuffix();
output_stream->flush();
2021-09-03 10:00:40 +00:00
if (!is_executable_pool)
write_buffer->close();
2021-08-28 19:47:59 +00:00
};
tasks.emplace_back(std::move(task));
2021-08-25 19:30:22 +00:00
}
auto sample_block = metadata_snapshot->getSampleBlock();
2021-09-02 11:53:20 +00:00
2021-09-03 10:00:40 +00:00
ShellCommandSourceConfiguration configuration;
configuration.max_block_size = max_block_size;
if (is_executable_pool)
2021-09-02 11:53:20 +00:00
{
2021-09-03 10:00:40 +00:00
configuration.read_fixed_number_of_rows = true;
configuration.read_number_of_rows_from_process_output = true;
2021-09-02 11:53:20 +00:00
}
2021-09-03 10:00:40 +00:00
Pipe pipe(std::make_unique<ShellCommandSource>(context, format, std::move(sample_block), std::move(process), log, std::move(tasks), configuration, process_pool));
return pipe;
2021-04-14 17:51:55 +00:00
}
2021-08-28 19:47:59 +00:00
void registerStorageExecutable(StorageFactory & factory)
{
2021-09-02 11:53:20 +00:00
auto register_storage = [](const StorageFactory::Arguments & args, bool is_executable_pool) -> StoragePtr
{
auto local_context = args.getLocalContext();
if (args.engine_args.size() < 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"StorageExecutable requires minimum 2 arguments: script_name, format, [input_query...]");
for (size_t i = 0; i < 2; ++i)
args.engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[i], local_context);
auto scipt_name_with_arguments_value = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::vector<String> script_name_with_arguments;
2021-09-02 11:53:20 +00:00
boost::split(script_name_with_arguments, scipt_name_with_arguments_value, [](char c) { return c == ' '; });
auto script_name = script_name_with_arguments[0];
script_name_with_arguments.erase(script_name_with_arguments.begin());
auto format = args.engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::vector<ASTPtr> input_queries;
for (size_t i = 2; i < args.engine_args.size(); ++i)
{
ASTPtr query = args.engine_args[i]->children.at(0);
if (!query->as<ASTSelectWithUnionQuery>())
2021-09-02 11:53:20 +00:00
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "StorageExecutable argument is invalid input query {}",
query->formatForErrorMessage());
input_queries.emplace_back(std::move(query));
}
const auto & columns = args.columns;
const auto & constraints = args.constraints;
2021-09-02 11:53:20 +00:00
if (is_executable_pool)
{
size_t max_command_execution_time = 10;
size_t max_execution_time_seconds = static_cast<size_t>(args.getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;
ExecutablePoolSettings pool_settings;
pool_settings.max_command_execution_time = max_command_execution_time;
if (args.storage_def->settings)
pool_settings.loadFromQuery(*args.storage_def);
return StorageExecutable::create(args.table_id, script_name, script_name_with_arguments, format, input_queries, pool_settings, columns, constraints);
}
else
{
return StorageExecutable::create(args.table_id, script_name, script_name_with_arguments, format, input_queries, columns, constraints);
}
};
factory.registerStorage("Executable", [&](const StorageFactory::Arguments & args)
{
return register_storage(args, false /*is_executable_pool*/);
});
factory.registerStorage("ExecutablePool", [&](const StorageFactory::Arguments & args)
{
return register_storage(args, true /*is_executable_pool*/);
});
}
2021-04-14 17:51:55 +00:00
};