add settings for executable table func

SELECT * FROM executable('<script name>', '<format>', '<columns>', (SELECT * FROM table), SETTINGS max_command_execution_time=100, command_termination_timeout=100)

fixes #38908
This commit is contained in:
Constantine Peresypkin 2022-07-28 19:54:46 +02:00
parent ee515b8862
commit b9d7cd6a5d
4 changed files with 50 additions and 9 deletions

View File

@ -1,6 +1,7 @@
#include <string_view> #include <string_view>
#include <Parsers/ExpressionListParsers.h> #include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Parsers/ASTAsterisk.h> #include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTExpressionList.h> #include <Parsers/ASTExpressionList.h>
@ -9,6 +10,7 @@
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h> #include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h> #include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h> #include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h> #include <Parsers/ASTTablesInSelectQuery.h>
@ -603,6 +605,13 @@ bool ParserTableFunctionExpression::parseImpl(Pos & pos, ASTPtr & node, Expected
{ {
if (ParserTableFunctionView().parse(pos, node, expected)) if (ParserTableFunctionView().parse(pos, node, expected))
return true; return true;
ParserKeyword s_settings("SETTINGS");
if (s_settings.ignore(pos, expected))
{
ParserSetQuery parser_settings(true);
if (parser_settings.parse(pos, node, expected))
return true;
}
return elem_parser.parse(pos, node, expected); return elem_parser.parse(pos, node, expected);
} }

View File

@ -4,7 +4,10 @@
#include <TableFunctions/TableFunctionFactory.h> #include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/parseColumnsListForTableFunction.h> #include <TableFunctions/parseColumnsListForTableFunction.h>
#include <Parsers/ASTFunction.h> #include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectWithUnionQuery.h> #include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/checkAndGetLiteralArgument.h> #include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageExecutable.h> #include <Storages/StorageExecutable.h>
#include <DataTypes/DataTypeFactory.h> #include <DataTypes/DataTypeFactory.h>
@ -48,7 +51,7 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, Contex
std::vector<String> script_name_with_arguments; std::vector<String> script_name_with_arguments;
boost::split(script_name_with_arguments, script_name_with_arguments_value, [](char c){ return c == ' '; }); boost::split(script_name_with_arguments, script_name_with_arguments_value, [](char c){ return c == ' '; });
script_name = script_name_with_arguments[0]; script_name = std::move(script_name_with_arguments[0]);
script_name_with_arguments.erase(script_name_with_arguments.begin()); script_name_with_arguments.erase(script_name_with_arguments.begin());
arguments = std::move(script_name_with_arguments); arguments = std::move(script_name_with_arguments);
format = checkAndGetLiteralArgument<String>(args[1], "format"); format = checkAndGetLiteralArgument<String>(args[1], "format");
@ -56,14 +59,26 @@ void TableFunctionExecutable::parseArguments(const ASTPtr & ast_function, Contex
for (size_t i = 3; i < args.size(); ++i) for (size_t i = 3; i < args.size(); ++i)
{ {
ASTPtr query = args[i]->children.at(0); if (args[i]->as<ASTSetQuery>())
if (!query->as<ASTSelectWithUnionQuery>()) {
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, settings_query = std::move(args[i]);
"Table function '{}' argument is invalid input query {}", }
getName(), else
query->formatForErrorMessage()); {
ASTPtr query = args[i]->children.at(0);
input_queries.emplace_back(std::move(query)); if (query->as<ASTSelectWithUnionQuery>())
{
input_queries.emplace_back(std::move(query));
}
else
{
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Table function '{}' argument is invalid {}",
getName(),
args[i]->formatForErrorMessage());
}
}
} }
} }
@ -79,6 +94,8 @@ StoragePtr TableFunctionExecutable::executeImpl(const ASTPtr & /*ast_function*/,
ExecutableSettings settings; ExecutableSettings settings;
settings.script_name = script_name; settings.script_name = script_name;
settings.script_arguments = arguments; settings.script_arguments = arguments;
if (settings_query != nullptr)
settings.applyChanges(settings_query->as<ASTSetQuery>()->changes);
auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context), ConstraintsDescription{}); auto storage = std::make_shared<StorageExecutable>(storage_id, format, settings, input_queries, getActualTableStructure(context), ConstraintsDescription{});
storage->startup(); storage->startup();

View File

@ -6,6 +6,7 @@ namespace DB
{ {
class Context; class Context;
class ASTSetQuery;
/* executable(script_name_optional_arguments, format, structure, input_query) - creates a temporary storage from executable file /* executable(script_name_optional_arguments, format, structure, input_query) - creates a temporary storage from executable file
* *
@ -32,5 +33,6 @@ private:
String format; String format;
String structure; String structure;
std::vector<ASTPtr> input_queries; std::vector<ASTPtr> input_queries;
ASTPtr settings_query = nullptr;
}; };
} }

View File

@ -163,6 +163,19 @@ def test_executable_function_input_multiple_pipes_python(started_cluster):
assert actual == expected assert actual == expected
def test_executable_function_input_slow_python_timeout_increased(started_cluster):
skip_test_msan(node)
query = "SELECT * FROM executable('input_slow.py', 'TabSeparated', 'value String', {source}, SETTINGS {settings})"
settings = "command_termination_timeout = 26, command_read_timeout = 26000, command_write_timeout = 26000"
assert node.query(query.format(source="(SELECT 1)", settings=settings)) == "Key 1\n"
assert (
node.query(
query.format(source="(SELECT id FROM test_data_table)", settings=settings)
)
== "Key 0\nKey 1\nKey 2\n"
)
def test_executable_storage_no_input_bash(started_cluster): def test_executable_storage_no_input_bash(started_cluster):
skip_test_msan(node) skip_test_msan(node)
node.query("DROP TABLE IF EXISTS test_table") node.query("DROP TABLE IF EXISTS test_table")