diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index c9b7bb3a8ee..6422f3a786d 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1109,6 +1109,8 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des progress_indication.setFileProgressCallback(global_context, true); } + bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof(); + /// If data fetched from file (maybe compressed file) if (parsed_insert_query->infile) { @@ -1156,7 +1158,8 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des global_context->getSettingsRef().max_block_size, getNumberOfPhysicalCPUCores() ), - parsed_query + parsed_query, + have_data_in_stdin ); } catch (Exception & e) @@ -1164,6 +1167,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des e.addMessage("data for INSERT was parsed from file"); throw; } + + if (have_data_in_stdin) + sendDataFromStdin(sample, columns_description_for_query, parsed_query); } else if (parsed_insert_query->data) { @@ -1171,7 +1177,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data); try { - sendDataFrom(data_in, sample, columns_description_for_query, parsed_query); + sendDataFrom(data_in, sample, columns_description_for_query, parsed_query, have_data_in_stdin); + if (have_data_in_stdin) + sendDataFromStdin(sample, columns_description_for_query, parsed_query); } catch (Exception & e) { @@ -1187,29 +1195,14 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des } else if (!is_interactive) { - if (need_render_progress) - { - /// Add callback to track reading from fd. - std_in.setProgressCallback(global_context); - } - - /// Send data read from stdin. - try - { - sendDataFrom(std_in, sample, columns_description_for_query, parsed_query); - } - catch (Exception & e) - { - e.addMessage("data for INSERT was parsed from stdin"); - throw; - } + sendDataFromStdin(sample, columns_description_for_query, parsed_query); } else throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT); } -void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query) +void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query, bool have_more_data) { String current_format = insert_format; @@ -1231,10 +1224,10 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes }); } - sendDataFromPipe(std::move(pipe), parsed_query); + sendDataFromPipe(std::move(pipe), parsed_query, have_more_data); } -void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query) +void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query, bool has_more_data) { QueryPipeline pipeline(std::move(pipe)); PullingAsyncPipelineExecutor executor(pipeline); @@ -1264,7 +1257,28 @@ void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query) } } - connection->sendData({}, "", false); + if (!has_more_data) + connection->sendData({}, "", false); +} + +void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query) +{ + if (need_render_progress) + { + /// Add callback to track reading from fd. + std_in.setProgressCallback(global_context); + } + + /// Send data read from stdin. + try + { + sendDataFrom(std_in, sample, columns_description, parsed_query); + } + catch (Exception & e) + { + e.addMessage("data for INSERT was parsed from stdin"); + throw; + } } diff --git a/src/Client/ClientBase.h b/src/Client/ClientBase.h index 6846fa247e8..639778ab53a 100644 --- a/src/Client/ClientBase.h +++ b/src/Client/ClientBase.h @@ -125,8 +125,9 @@ private: void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query); void sendDataFrom(ReadBuffer & buf, Block & sample, - const ColumnsDescription & columns_description, ASTPtr parsed_query); - void sendDataFromPipe(Pipe && pipe, ASTPtr parsed_query); + const ColumnsDescription & columns_description, ASTPtr parsed_query, bool have_more_data = false); + void sendDataFromPipe(Pipe && pipe, ASTPtr parsed_query, bool have_more_data = false); + void sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query); void sendExternalTables(ASTPtr parsed_query); void initBlockOutputStream(const Block & block, ASTPtr parsed_query); diff --git a/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.reference b/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.reference new file mode 100644 index 00000000000..05c0295fffc --- /dev/null +++ b/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.reference @@ -0,0 +1,6 @@ +24 +42 +24 +24 +42 +42 diff --git a/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.sh b/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.sh new file mode 100755 index 00000000000..626891c5d7c --- /dev/null +++ b/tests/queries/0_stateless/02270_stdin_with_query_or_infile_data.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Tags: no-parallel + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02270" +$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02270 (x UInt32) ENGINE=Memory" + +echo "(42)" | $CLICKHOUSE_CLIENT -q "INSERT INTO test_02270 FORMAT Values (24)" +$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02270 ORDER BY x" + +echo "(24)" > 02270_data.values +echo "(42)" | $CLICKHOUSE_CLIENT -q "INSERT INTO test_02270 FROM INFILE '02270_data.values'" +$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02270 ORDER BY x" + +$CLICKHOUSE_CLIENT -q "DROP TABLE test_02270" +rm 02270_data.values