diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index ae9358c6159..efda9bbfec3 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1409,22 +1409,49 @@ void TCPHandler::sendData(const Block & block) { initBlockOutput(block); - writeVarUInt(Protocol::Server::Data, *out); - /// Send external table name (empty name is the main table) - writeStringBinary("", *out); + auto prev_bytes_written_out = out->count(); + auto prev_bytes_written_compressed_out = state.maybe_compressed_out->count(); - /// For testing hedged requests - const Settings & settings = query_context->getSettingsRef(); - if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + try { - out->next(); - std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); - std::this_thread::sleep_for(ms); - } + writeVarUInt(Protocol::Server::Data, *out); + /// Send external table name (empty name is the main table) + writeStringBinary("", *out); - state.block_out->write(block); - state.maybe_compressed_out->next(); - out->next(); + /// For testing hedged requests + const Settings & settings = query_context->getSettingsRef(); + if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + { + out->next(); + std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } + + state.block_out->write(block); + state.maybe_compressed_out->next(); + out->next(); + } + catch (...) + { + /// In case of unsuccessful write, if the buffer with written data was not flushed, + /// we will rollback write to avoid breaking the protocol. + /// (otherwise the client will not be able to receive exception after unfinished data + /// as it will expect the continuation of the data). + /// It looks like hangs on client side or a message like "Data compressed with different methods". + + if (state.compression == Protocol::Compression::Enable) + { + auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out; + if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed) + state.maybe_compressed_out->position() -= extra_bytes_written_compressed; + } + + auto extra_bytes_written_out = out->count() - prev_bytes_written_out; + if (out->offset() >= extra_bytes_written_out) + out->position() -= extra_bytes_written_out; + + throw; + } } diff --git a/tests/queries/0_stateless/01784_parallel_formatting_memory.reference b/tests/queries/0_stateless/01784_parallel_formatting_memory.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01784_parallel_formatting_memory.sql b/tests/queries/0_stateless/01784_parallel_formatting_memory.sql new file mode 100644 index 00000000000..35dc063f895 --- /dev/null +++ b/tests/queries/0_stateless/01784_parallel_formatting_memory.sql @@ -0,0 +1,2 @@ +SET max_memory_usage = '1G'; +SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number; -- { serverError 241 } diff --git a/tests/queries/0_stateless/01785_parallel_formatting_memory.reference b/tests/queries/0_stateless/01785_parallel_formatting_memory.reference new file mode 100644 index 00000000000..0ec7fc54b01 --- /dev/null +++ b/tests/queries/0_stateless/01785_parallel_formatting_memory.reference @@ -0,0 +1,2 @@ +Code: 241 +Code: 241 diff --git a/tests/queries/0_stateless/01785_parallel_formatting_memory.sh b/tests/queries/0_stateless/01785_parallel_formatting_memory.sh new file mode 100755 index 00000000000..6d081c61fd3 --- /dev/null +++ b/tests/queries/0_stateless/01785_parallel_formatting_memory.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT --compress 0 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1 +$CLICKHOUSE_CLIENT --compress 1 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1