Code refactoring.

This commit is contained in:
Amos Bird 2020-10-10 09:43:07 +08:00
parent d42ffa02bd
commit dbc14655fc
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
4 changed files with 15 additions and 17 deletions

View File

@ -300,8 +300,7 @@ Block InterpreterKillQueryQuery::getSelectResult(const String & columns, const S
if (where_expression)
select_query += " WHERE " + queryToString(where_expression);
BlockIO block_io = executeQuery(select_query, context.getGlobalContext(), true);
auto stream = block_io.getInputStream();
auto stream = executeQuery(select_query, context.getGlobalContext(), true).getInputStream();
Block res = stream->read();
if (res && stream->read())

View File

@ -64,6 +64,7 @@ namespace DB
namespace ErrorCodes
{
extern const int INTO_OUTFILE_NOT_ALLOWED;
extern const int LOGICAL_ERROR;
extern const int QUERY_WAS_CANCELLED;
}
@ -804,8 +805,7 @@ void executeQuery(
InputStreamFromASTInsertQuery in(ast, &istr, streams.out->getHeader(), context, nullptr);
copyData(in, *streams.out);
}
if (streams.in)
else if (streams.in)
{
/// FIXME: try to prettify this cast using `as<>()`
const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
@ -847,8 +847,7 @@ void executeQuery(
copyData(*streams.in, *out, [](){ return false; }, [&out](const Block &) { out->flush(); });
}
if (pipeline.initialized())
else if (pipeline.initialized())
{
const ASTQueryWithOutput * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
@ -907,6 +906,8 @@ void executeQuery(
executor->execute(pipeline.getNumThreads());
}
}
else
throw Exception("BlockIO is empty", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{

View File

@ -218,10 +218,7 @@ void PostgreSQLHandler::cancelRequest()
String query = Poco::format("KILL QUERY WHERE query_id = 'postgres:%d:%d'", msg->process_id, msg->secret_key);
ReadBufferFromString replacement(query);
executeQuery(
replacement, *out, true, connection_context,
[](const String &, const String &, const String &, const String &) {}
);
executeQuery(replacement, *out, true, connection_context, {});
}
inline std::unique_ptr<PostgreSQLProtocol::Messaging::StartupMessage> PostgreSQLHandler::receiveStartupMessage(int payload_size)

View File

@ -253,16 +253,15 @@ void TCPHandler::runImpl()
/// Processing Query
state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);
if (state.io.out)
state.need_receive_data_for_insert = true;
after_check_cancelled.restart();
after_send_progress.restart();
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
if (state.io.out)
{
state.need_receive_data_for_insert = true;
processInsertQuery(connection_settings);
else if (state.need_receive_data_for_input)
}
else if (state.need_receive_data_for_input) // It implies pipeline execution
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
@ -271,8 +270,10 @@ void TCPHandler::runImpl()
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors();
else
else if (state.io.in)
processOrdinaryQuery();
else
throw Exception("BlockIO is empty", ErrorCodes::LOGICAL_ERROR);
/// Do it before sending end of stream, to have a chance to show log message in client.
query_scope->logPeakMemoryUsage();