diff --git a/dbms/programs/client/Client.cpp b/dbms/programs/client/Client.cpp index b14b2b72d3d..7f389fc478f 100644 --- a/dbms/programs/client/Client.cpp +++ b/dbms/programs/client/Client.cpp @@ -829,9 +829,17 @@ private: connection->forceConnected(); - /// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately. - if (insert && (!insert->select || insert->input_function)) + ASTPtr input_function; + if (insert && insert->select) + insert->tryFindInputFunction(insert->select, input_function); + + /// INSERT query for which data transfer is needed (not an INSERT SELECT or input()) is processed separately. + if (insert && (!insert->select || input_function)) + { + if (input_function && insert->format.empty()) + throw Exception("FORMAT must be specified for function input()", ErrorCodes::LOGICAL_ERROR); processInsertQuery(); + } else processOrdinaryQuery(); } diff --git a/dbms/programs/server/TCPHandler.cpp b/dbms/programs/server/TCPHandler.cpp index cb8f3eb2c67..bd1d365d6ce 100644 --- a/dbms/programs/server/TCPHandler.cpp +++ b/dbms/programs/server/TCPHandler.cpp @@ -188,7 +188,7 @@ void TCPHandler::runImpl() /// Send structure of columns to client for function input() query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage) { - if (&context != &*query_context) + if (&context != &query_context.value()) throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR); state.need_receive_data_for_input = true; @@ -207,7 +207,7 @@ void TCPHandler::runImpl() query_context->setInputBlocksReaderCallback([&global_settings, this] (Context & context) -> Block { - if (&context != &*query_context) + if (&context != &query_context.value()) throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR); size_t poll_interval; diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp index aab0467037e..e31c6772f9c 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.cpp @@ -20,7 +20,7 @@ namespace ErrorCodes InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( - const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context) + const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context, const ASTPtr & input_function) { const auto * ast_insert_query = ast->as(); @@ -29,7 +29,11 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( String format = ast_insert_query->format; if (format.empty()) + { + if (input_function) + throw Exception("FORMAT must be specified for function input()", ErrorCodes::LOGICAL_ERROR); format = "Values"; + } /// Data could be in parsed (ast_insert_query.data) and in not parsed yet (input_buffer_tail_part) part of query. @@ -51,7 +55,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery( res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size); - if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->input_function) + if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !input_function) { StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table); auto column_defaults = storage->getColumns().getDefaults(); diff --git a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h index 3ecda33289e..a57e9199603 100644 --- a/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h +++ b/dbms/src/DataStreams/InputStreamFromASTInsertQuery.h @@ -19,7 +19,11 @@ class Context; class InputStreamFromASTInsertQuery : public IBlockInputStream { public: - InputStreamFromASTInsertQuery(const ASTPtr & ast, ReadBuffer * input_buffer_tail_part, const Block & header, const Context & context); + InputStreamFromASTInsertQuery(const ASTPtr & ast, + ReadBuffer * input_buffer_tail_part, + const Block & header, + const Context & context, + const ASTPtr & input_function); Block readImpl() override { return res_stream->read(); } void readPrefixImpl() override { return res_stream->readPrefix(); } diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index e4391f52247..733a6a3e87c 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -147,7 +147,7 @@ BlockIO InterpreterInsertQuery::execute() } else if (query.data && !query.has_tail) /// can execute without additional data { - res.in = std::make_shared(query_ptr, nullptr, query_sample_block, context); + res.in = std::make_shared(query_ptr, nullptr, query_sample_block, context, nullptr); res.in = std::make_shared(res.in, res.out); res.out = nullptr; } diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index ac9f0ef2c35..2866a1f7410 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -224,18 +224,22 @@ static std::tuple executeQueryImpl( /// Load external tables if they were provided context.initializeExternalTablesIfSet(); - /// Prepare Input storage before executing interpreter. auto * insert_query = ast->as(); - if (insert_query && insert_query->input_function) + if (insert_query && insert_query->select) { - /// If we already got a buffer with data then initialize input stream. + /// Prepare Input storage before executing interpreter if we already got a buffer with data. if (istr) { - StoragePtr storage = context.executeTableFunction(insert_query->input_function); - auto * input_storage = dynamic_cast(storage.get()); - BlockInputStreamPtr input_stream = std::make_shared(ast, istr, - input_storage->getSampleBlock(), context); - input_storage->setInputStream(input_stream); + ASTPtr input_function; + insert_query->tryFindInputFunction(insert_query->select, input_function); + if (input_function) + { + StoragePtr storage = context.executeTableFunction(input_function); + auto & input_storage = dynamic_cast(*storage); + BlockInputStreamPtr input_stream = std::make_shared(ast, istr, + input_storage.getSampleBlock(), context, input_function); + input_storage.setInputStream(input_stream); + } } } else @@ -513,7 +517,7 @@ void executeQuery( { if (streams.out) { - InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context); + InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr); copyData(in, *streams.out); } diff --git a/dbms/src/Parsers/ASTInsertQuery.cpp b/dbms/src/Parsers/ASTInsertQuery.cpp index f4f85ce38f3..e1497e64880 100644 --- a/dbms/src/Parsers/ASTInsertQuery.cpp +++ b/dbms/src/Parsers/ASTInsertQuery.cpp @@ -1,10 +1,17 @@ #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + + void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { frame.need_parens = false; @@ -50,4 +57,23 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s } } + +void ASTInsertQuery::tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function) const +{ + if (!ast) + return; + for (const auto & child : ast->children) + tryFindInputFunction(child, input_function); + + if (const auto * table_function = ast->as()) + { + if (table_function->name == "input") + { + if (input_function) + throw Exception("You can use 'input()' function only once per request.", ErrorCodes::LOGICAL_ERROR); + input_function = ast; + } + } +} + } diff --git a/dbms/src/Parsers/ASTInsertQuery.h b/dbms/src/Parsers/ASTInsertQuery.h index 235d53d29cf..f8b427dd9e5 100644 --- a/dbms/src/Parsers/ASTInsertQuery.h +++ b/dbms/src/Parsers/ASTInsertQuery.h @@ -17,7 +17,6 @@ public: ASTPtr columns; String format; ASTPtr select; - ASTPtr input_function; ASTPtr table_function; ASTPtr settings_ast; @@ -31,6 +30,9 @@ public: /// Query has additional data, which will be sent later bool has_tail = false; + /// Try to find table function input() in SELECT part + void tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function) const; + /** Get the text that identifies this element. */ String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; } @@ -41,7 +43,6 @@ public: if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); } if (select) { res->select = select->clone(); res->children.push_back(res->select); } - if (input_function) { res->input_function = input_function->clone(); res->children.push_back(res->input_function); } if (table_function) { res->table_function = table_function->clone(); res->children.push_back(res->table_function); } if (settings_ast) { res->settings_ast = settings_ast->clone(); res->children.push_back(res->settings_ast); } diff --git a/dbms/src/Parsers/ParserInsertQuery.cpp b/dbms/src/Parsers/ParserInsertQuery.cpp index afa14858a0d..5587e595f46 100644 --- a/dbms/src/Parsers/ParserInsertQuery.cpp +++ b/dbms/src/Parsers/ParserInsertQuery.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include @@ -20,24 +19,6 @@ namespace ErrorCodes extern const int SYNTAX_ERROR; } -void tryFindInputFunction(const ASTPtr & ast, ASTPtr & input_function) -{ - if (!ast) - return; - for (const auto & child : ast->children) - tryFindInputFunction(child, input_function); - - if (const auto * table_function = ast->as()) - { - if (table_function->name == "input") - { - if (input_function) - throw Exception("You can use 'input()' function only once per request.", ErrorCodes::SYNTAX_ERROR); - input_function = ast; - } - } -} - bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { @@ -63,7 +44,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ASTPtr select; ASTPtr table_function; ASTPtr settings_ast; - ASTPtr input_function; /// Insertion data const char * data = nullptr; @@ -118,12 +98,9 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserSelectWithUnionQuery select_p; select_p.parse(pos, select, expected); - /// Check if we have INSERT SELECT FROM input(). - tryFindInputFunction(select, input_function); - /// FORMAT section is required if we have input() in SELECT part - if (input_function) - if (!s_format.ignore(pos, expected) || !name_p.parse(pos, format, expected)) - return false; + /// FORMAT section is expected if we have input() in SELECT part + if (s_format.ignore(pos, expected) && !name_p.parse(pos, format, expected)) + return false; } else { @@ -182,7 +159,6 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) query->columns = columns; query->select = select; - query->input_function = input_function; query->settings_ast = settings_ast; query->data = data != end ? data : nullptr; query->end = end;