From dbc14655fccbf82ca0e9efc2ab9c46f73505b086 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sat, 10 Oct 2020 09:43:07 +0800 Subject: [PATCH] Code refactoring. --- src/Interpreters/InterpreterKillQueryQuery.cpp | 3 +-- src/Interpreters/executeQuery.cpp | 9 +++++---- src/Server/PostgreSQLHandler.cpp | 5 +---- src/Server/TCPHandler.cpp | 15 ++++++++------- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/Interpreters/InterpreterKillQueryQuery.cpp b/src/Interpreters/InterpreterKillQueryQuery.cpp index 80710600db6..0f7da8f1f58 100644 --- a/src/Interpreters/InterpreterKillQueryQuery.cpp +++ b/src/Interpreters/InterpreterKillQueryQuery.cpp @@ -300,8 +300,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S if (where_expression) select_query += " WHERE " + queryToString(where_expression); - BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true); - auto stream = block_io.getInputStream(); + auto stream = executeQuery(select_query, context.getGlobalContext(), true).getInputStream(); Block res = stream->read(); if (res && stream->read()) diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index d7191a83971..76f4ebeb0a7 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -64,6 +64,7 @@ namespace DB namespace ErrorCodes { extern const int INTO_OUTFILE_NOT_ALLOWED; + extern const int LOGICAL_ERROR; extern const int QUERY_WAS_CANCELLED; } @@ -804,8 +805,7 @@ void executeQuery( InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr); copyData(in, *streams.out); } - - if (streams.in) + else if (streams.in) { /// FIXME: try to prettify this cast using `as<>()` const auto * ast_query_with_output = dynamic_cast(ast.get()); @@ -847,8 +847,7 @@ void executeQuery( copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); }); } - - if (pipeline.initialized()) + else if (pipeline.initialized()) { const ASTQueryWithOutput * ast_query_with_output = dynamic_cast(ast.get()); @@ -907,6 +906,8 @@ void executeQuery( executor->execute(pipeline.getNumThreads()); } } + else + throw Exception("BlockIO is empty", ErrorCodes::LOGICAL_ERROR); } catch (...) { diff --git a/src/Server/PostgreSQLHandler.cpp b/src/Server/PostgreSQLHandler.cpp index 2b8591d85d8..eacda6b1206 100644 --- a/src/Server/PostgreSQLHandler.cpp +++ b/src/Server/PostgreSQLHandler.cpp @@ -218,10 +218,7 @@ void PostgreSQLHandler::cancelRequest() String query = Poco::format("KILL QUERY WHERE query_id = 'postgres:%d:%d'", msg->process_id, msg->secret_key); ReadBufferFromString replacement(query); - executeQuery( - replacement, *out, true, connection_context, - [](const String &, const String &, const String &, const String &) {} - ); + executeQuery(replacement, *out, true, connection_context, {}); } inline std::unique_ptr PostgreSQLHandler::receiveStartupMessage(int payload_size) diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index 0dcf1227c30..608abb02148 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -253,16 +253,15 @@ void TCPHandler::runImpl() /// Processing Query state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data); - if (state.io.out) - state.need_receive_data_for_insert = true; - after_check_cancelled.restart(); after_send_progress.restart(); - /// Does the request require receive data from client? - if (state.need_receive_data_for_insert) + if (state.io.out) + { + state.need_receive_data_for_insert = true; processInsertQuery(connection_settings); - else if (state.need_receive_data_for_input) + } + else if (state.need_receive_data_for_input) // It implies pipeline execution { /// It is special case for input(), all works for reading data from client will be done in callbacks. auto executor = state.io.pipeline.execute(); @@ -271,8 +270,10 @@ void TCPHandler::runImpl() } else if (state.io.pipeline.initialized()) processOrdinaryQueryWithProcessors(); - else + else if (state.io.in) processOrdinaryQuery(); + else + throw Exception("BlockIO is empty", ErrorCodes::LOGICAL_ERROR); /// Do it before sending end of stream, to have a chance to show log message in client. query_scope->logPeakMemoryUsage();