ClickHouse/src/Storages/StorageExecutable.cpp

239 lines
8.6 KiB
C++
Raw Normal View History

2021-04-14 17:51:55 +00:00
#include <Storages/StorageExecutable.h>
2021-08-25 19:30:22 +00:00
#include <filesystem>
#include <unistd.h>
2021-08-25 19:30:22 +00:00
2021-12-22 15:20:36 +00:00
#include <boost/algorithm/string/split.hpp>
2021-10-18 22:04:07 +00:00
#include <Common/filesystemHelpers.h>
2021-08-25 19:30:22 +00:00
#include <Core/Block.h>
2021-09-02 11:53:20 +00:00
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
2021-09-02 11:53:20 +00:00
#include <Parsers/ASTCreateQuery.h>
2021-10-16 14:03:50 +00:00
#include <QueryPipeline/Pipe.h>
2021-09-16 17:40:42 +00:00
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
2021-04-14 17:51:55 +00:00
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
2021-04-14 17:51:55 +00:00
#include <Storages/StorageFactory.h>
2021-04-14 17:51:55 +00:00
namespace DB
{
2021-08-24 19:38:42 +00:00
2021-04-14 17:51:55 +00:00
namespace ErrorCodes
{
2021-08-25 19:30:22 +00:00
extern const int UNSUPPORTED_METHOD;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
2021-04-14 17:51:55 +00:00
}
2021-12-28 09:43:30 +00:00
namespace
2021-04-14 17:51:55 +00:00
{
2021-12-28 09:43:30 +00:00
void transformToSingleBlockSources(Pipes & inputs)
{
size_t inputs_size = inputs.size();
for (size_t i = 0; i < inputs_size; ++i)
{
auto && input = inputs[i];
QueryPipeline input_pipeline(std::move(input));
PullingPipelineExecutor input_pipeline_executor(input_pipeline);
auto header = input_pipeline_executor.getHeader();
auto result_block = header.cloneEmpty();
size_t result_block_columns = result_block.columns();
Block result;
while (input_pipeline_executor.pull(result))
{
for (size_t result_block_index = 0; result_block_index < result_block_columns; ++result_block_index)
{
auto & block_column = result.safeGetByPosition(result_block_index);
auto & result_block_column = result_block.safeGetByPosition(result_block_index);
result_block_column.column->assumeMutable()->insertRangeFrom(*block_column.column, 0, block_column.column->size());
}
}
auto source = std::make_shared<SourceFromSingleChunk>(std::move(result_block));
inputs[i] = Pipe(std::move(source));
}
}
2021-04-14 17:51:55 +00:00
}
2021-09-02 11:53:20 +00:00
StorageExecutable::StorageExecutable(
const StorageID & table_id_,
2021-11-01 11:22:21 +00:00
const String & format,
const ExecutableSettings & settings_,
2021-11-01 11:22:21 +00:00
const std::vector<ASTPtr> & input_queries_,
2021-09-02 11:53:20 +00:00
const ColumnsDescription & columns,
const ConstraintsDescription & constraints)
: IStorage(table_id_)
, settings(settings_)
2021-11-01 11:22:21 +00:00
, input_queries(input_queries_)
, log(settings.is_executable_pool ? &Poco::Logger::get("StorageExecutablePool") : &Poco::Logger::get("StorageExecutable"))
2021-09-02 11:53:20 +00:00
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns);
storage_metadata.setConstraints(constraints);
setInMemoryMetadata(storage_metadata);
2021-12-28 09:43:30 +00:00
ShellCommandSourceCoordinator::Configuration configuration
2021-09-16 17:40:42 +00:00
{
2021-11-01 11:22:21 +00:00
.format = format,
.command_termination_timeout_seconds = settings.command_termination_timeout,
.command_read_timeout_milliseconds = settings.command_read_timeout,
.command_write_timeout_milliseconds = settings.command_write_timeout,
2021-09-16 17:40:42 +00:00
2021-11-01 11:22:21 +00:00
.pool_size = settings.pool_size,
.max_command_execution_time_seconds = settings.max_command_execution_time,
2021-09-16 17:40:42 +00:00
2021-11-01 11:22:21 +00:00
.is_executable_pool = settings.is_executable_pool,
.send_chunk_header = settings.send_chunk_header,
.execute_direct = true
};
2021-09-16 17:40:42 +00:00
2021-12-28 09:43:30 +00:00
coordinator = std::make_unique<ShellCommandSourceCoordinator>(std::move(configuration));
2021-11-01 11:22:21 +00:00
}
2021-09-16 17:40:42 +00:00
2021-04-14 17:51:55 +00:00
Pipe StorageExecutable::read(
const Names & /*column_names*/,
2021-09-09 11:27:10 +00:00
const StorageSnapshotPtr & storage_snapshot,
2021-04-14 17:51:55 +00:00
SelectQueryInfo & /*query_info*/,
2021-08-25 19:30:22 +00:00
ContextPtr context,
2021-04-14 17:51:55 +00:00
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
2021-08-25 19:30:22 +00:00
unsigned /*threads*/)
2021-04-14 17:51:55 +00:00
{
2021-11-01 11:22:21 +00:00
auto & script_name = settings.script_name;
2021-08-25 19:30:22 +00:00
auto user_scripts_path = context->getUserScriptsPath();
auto script_path = user_scripts_path + '/' + script_name;
2021-10-18 22:04:07 +00:00
2021-12-22 15:20:36 +00:00
if (!fileOrSymlinkPathStartsWith(script_path, user_scripts_path))
2021-08-25 19:30:22 +00:00
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
2021-10-18 22:04:07 +00:00
"Executable file {} must be inside user scripts folder {}",
script_name,
user_scripts_path);
if (!FS::exists(script_path))
2021-10-18 22:04:07 +00:00
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
2021-10-19 14:41:59 +00:00
"Executable file {} does not exist inside user scripts folder {}",
2021-08-25 19:30:22 +00:00
script_name,
user_scripts_path);
if (!FS::canExecute(script_path))
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable file {} is not executable inside user scripts folder {}",
script_name,
user_scripts_path);
2021-11-01 11:22:21 +00:00
Pipes inputs;
inputs.reserve(input_queries.size());
for (auto & input_query : input_queries)
{
InterpreterSelectWithUnionQuery interpreter(input_query, context, {});
2021-11-01 11:22:21 +00:00
inputs.emplace_back(QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline()));
2021-09-02 11:53:20 +00:00
}
2021-08-25 19:30:22 +00:00
2021-12-28 09:43:30 +00:00
/// For executable pool we read data from input streams and convert it to single blocks streams.
if (settings.is_executable_pool)
2021-12-28 09:43:30 +00:00
transformToSingleBlockSources(inputs);
2021-08-25 19:30:22 +00:00
2021-09-09 11:27:10 +00:00
auto sample_block = storage_snapshot->metadata->getSampleBlock();
2021-09-02 11:53:20 +00:00
2021-09-03 10:00:40 +00:00
ShellCommandSourceConfiguration configuration;
configuration.max_block_size = max_block_size;
2021-12-28 09:43:30 +00:00
if (settings.is_executable_pool)
2021-09-02 11:53:20 +00:00
{
2021-09-03 10:00:40 +00:00
configuration.read_fixed_number_of_rows = true;
configuration.read_number_of_rows_from_process_output = true;
2021-09-02 11:53:20 +00:00
}
2021-09-03 10:00:40 +00:00
2021-12-22 15:20:36 +00:00
return coordinator->createPipe(script_path, settings.script_arguments, std::move(inputs), std::move(sample_block), context, configuration);
2021-04-14 17:51:55 +00:00
}
2021-08-28 19:47:59 +00:00
void registerStorageExecutable(StorageFactory & factory)
{
2021-09-02 11:53:20 +00:00
auto register_storage = [](const StorageFactory::Arguments & args, bool is_executable_pool) -> StoragePtr
{
auto local_context = args.getLocalContext();
if (args.engine_args.size() < 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"StorageExecutable requires minimum 2 arguments: script_name, format, [input_query...]");
for (size_t i = 0; i < 2; ++i)
args.engine_args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args.engine_args[i], local_context);
auto scipt_name_with_arguments_value = args.engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::vector<String> script_name_with_arguments;
2021-09-02 11:53:20 +00:00
boost::split(script_name_with_arguments, scipt_name_with_arguments_value, [](char c) { return c == ' '; });
auto script_name = script_name_with_arguments[0];
script_name_with_arguments.erase(script_name_with_arguments.begin());
auto format = args.engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::vector<ASTPtr> input_queries;
for (size_t i = 2; i < args.engine_args.size(); ++i)
{
ASTPtr query = args.engine_args[i]->children.at(0);
if (!query->as<ASTSelectWithUnionQuery>())
2021-09-02 11:53:20 +00:00
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "StorageExecutable argument is invalid input query {}",
query->formatForErrorMessage());
input_queries.emplace_back(std::move(query));
}
const auto & columns = args.columns;
const auto & constraints = args.constraints;
2021-11-01 11:22:21 +00:00
ExecutableSettings settings;
settings.script_name = script_name;
settings.script_arguments = script_name_with_arguments;
settings.is_executable_pool = is_executable_pool;
2021-09-02 11:53:20 +00:00
if (is_executable_pool)
{
size_t max_command_execution_time = 10;
size_t max_execution_time_seconds = static_cast<size_t>(args.getContext()->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
max_command_execution_time = max_execution_time_seconds;
2021-11-01 11:22:21 +00:00
settings.max_command_execution_time = max_command_execution_time;
2021-09-02 11:53:20 +00:00
}
2021-11-01 11:22:21 +00:00
if (args.storage_def->settings)
settings.loadFromQuery(*args.storage_def);
2021-11-01 11:22:21 +00:00
auto global_context = args.getContext()->getGlobalContext();
return std::make_shared<StorageExecutable>(args.table_id, format, settings, input_queries, columns, constraints);
2021-09-02 11:53:20 +00:00
};
StorageFactory::StorageFeatures storage_features;
storage_features.supports_settings = true;
2021-09-02 11:53:20 +00:00
factory.registerStorage("Executable", [&](const StorageFactory::Arguments & args)
{
return register_storage(args, false /*is_executable_pool*/);
}, storage_features);
2021-09-02 11:53:20 +00:00
factory.registerStorage("ExecutablePool", [&](const StorageFactory::Arguments & args)
{
return register_storage(args, true /*is_executable_pool*/);
}, storage_features);
}
2021-04-14 17:51:55 +00:00
};