Send both stdin data and data from query/data from infile in client

This commit is contained in:
avogar 2022-04-14 11:30:52 +00:00
parent 3545eba318
commit a4e3a5e05e
4 changed files with 64 additions and 24 deletions

View File

@ -1109,6 +1109,8 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
progress_indication.setFileProgressCallback(global_context, true);
}
bool have_data_in_stdin = !is_interactive && !stdin_is_a_tty && !std_in.eof();
/// If data fetched from file (maybe compressed file)
if (parsed_insert_query->infile)
{
@ -1156,7 +1158,8 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
global_context->getSettingsRef().max_block_size,
getNumberOfPhysicalCPUCores()
),
parsed_query
parsed_query,
have_data_in_stdin
);
}
catch (Exception & e)
@ -1164,6 +1167,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
e.addMessage("data for INSERT was parsed from file");
throw;
}
if (have_data_in_stdin)
sendDataFromStdin(sample, columns_description_for_query, parsed_query);
}
else if (parsed_insert_query->data)
{
@ -1171,7 +1177,9 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
try
{
sendDataFrom(data_in, sample, columns_description_for_query, parsed_query);
sendDataFrom(data_in, sample, columns_description_for_query, parsed_query, have_data_in_stdin);
if (have_data_in_stdin)
sendDataFromStdin(sample, columns_description_for_query, parsed_query);
}
catch (Exception & e)
{
@ -1187,29 +1195,14 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des
}
else if (!is_interactive)
{
if (need_render_progress)
{
/// Add callback to track reading from fd.
std_in.setProgressCallback(global_context);
}
/// Send data read from stdin.
try
{
sendDataFrom(std_in, sample, columns_description_for_query, parsed_query);
}
catch (Exception & e)
{
e.addMessage("data for INSERT was parsed from stdin");
throw;
}
sendDataFromStdin(sample, columns_description_for_query, parsed_query);
}
else
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
}
void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query)
void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query, bool have_more_data)
{
String current_format = insert_format;
@ -1231,10 +1224,10 @@ void ClientBase::sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDes
});
}
sendDataFromPipe(std::move(pipe), parsed_query);
sendDataFromPipe(std::move(pipe), parsed_query, have_more_data);
}
void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query)
void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query, bool has_more_data)
{
QueryPipeline pipeline(std::move(pipe));
PullingAsyncPipelineExecutor executor(pipeline);
@ -1264,7 +1257,28 @@ void ClientBase::sendDataFromPipe(Pipe&& pipe, ASTPtr parsed_query)
}
}
connection->sendData({}, "", false);
if (!has_more_data)
connection->sendData({}, "", false);
}
void ClientBase::sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query)
{
if (need_render_progress)
{
/// Add callback to track reading from fd.
std_in.setProgressCallback(global_context);
}
/// Send data read from stdin.
try
{
sendDataFrom(std_in, sample, columns_description, parsed_query);
}
catch (Exception & e)
{
e.addMessage("data for INSERT was parsed from stdin");
throw;
}
}

View File

@ -125,8 +125,9 @@ private:
void sendData(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFrom(ReadBuffer & buf, Block & sample,
const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendDataFromPipe(Pipe && pipe, ASTPtr parsed_query);
const ColumnsDescription & columns_description, ASTPtr parsed_query, bool have_more_data = false);
void sendDataFromPipe(Pipe && pipe, ASTPtr parsed_query, bool have_more_data = false);
void sendDataFromStdin(Block & sample, const ColumnsDescription & columns_description, ASTPtr parsed_query);
void sendExternalTables(ASTPtr parsed_query);
void initBlockOutputStream(const Block & block, ASTPtr parsed_query);

View File

@ -0,0 +1,6 @@
24
42
24
24
42
42

View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# Tags: no-parallel
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS test_02270"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_02270 (x UInt32) ENGINE=Memory"
echo "(42)" | $CLICKHOUSE_CLIENT -q "INSERT INTO test_02270 FORMAT Values (24)"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02270 ORDER BY x"
echo "(24)" > 02270_data.values
echo "(42)" | $CLICKHOUSE_CLIENT -q "INSERT INTO test_02270 FROM INFILE '02270_data.values'"
$CLICKHOUSE_CLIENT -q "SELECT * FROM test_02270 ORDER BY x"
$CLICKHOUSE_CLIENT -q "DROP TABLE test_02270"
rm 02270_data.values