diff --git a/src/DataStreams/NativeBlockOutputStream.cpp b/src/DataStreams/NativeBlockOutputStream.cpp index da68376201f..2a016c9a0c8 100644 --- a/src/DataStreams/NativeBlockOutputStream.cpp +++ b/src/DataStreams/NativeBlockOutputStream.cpp @@ -41,7 +41,7 @@ void NativeBlockOutputStream::flush() } -void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) +static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit) { /** If there are columns-constants - then we materialize them. * (Since the data type does not know how to serialize / deserialize constants.) diff --git a/src/DataStreams/NativeBlockOutputStream.h b/src/DataStreams/NativeBlockOutputStream.h index 64ccd267634..c47d7b2f1c3 100644 --- a/src/DataStreams/NativeBlockOutputStream.h +++ b/src/DataStreams/NativeBlockOutputStream.h @@ -30,8 +30,6 @@ public: void write(const Block & block) override; void flush() override; - static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuffer & ostr, UInt64 offset, UInt64 limit); - String getContentType() const override { return "application/octet-stream"; } private: diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp index 0ebca3661b4..ce7dd1abd51 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.cpp @@ -80,9 +80,11 @@ namespace DB } - void ParallelFormattingOutputFormat::collectorThreadFunction() + void ParallelFormattingOutputFormat::collectorThreadFunction(const ThreadGroupStatusPtr & thread_group) { setThreadName("Collector"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); try { @@ -135,9 +137,11 @@ namespace DB } - void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number) + void ParallelFormattingOutputFormat::formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group) { setThreadName("Formatter"); + if (thread_group) + CurrentThread::attachToIfDetached(thread_group); try { diff --git a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h index 584aa364d27..8b9e8293c69 100644 --- a/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h +++ b/src/Processors/Formats/Impl/ParallelFormattingOutputFormat.h @@ -76,7 +76,10 @@ public: /// Just heuristic. We need one thread for collecting, one thread for receiving chunks /// and n threads for formatting. processing_units.resize(params.max_threads_for_parallel_formatting + 2); - collector_thread = ThreadFromGlobalPool([&] { collectorThreadFunction(); }); + collector_thread = ThreadFromGlobalPool([thread_group = CurrentThread::getGroup(), this] + { + collectorThreadFunction(thread_group); + }); LOG_TRACE(&Poco::Logger::get("ParallelFormattingOutputFormat"), "Parallel formatting is being used"); } @@ -200,14 +203,17 @@ private: void scheduleFormatterThreadForUnitWithNumber(size_t ticket_number) { - pool.scheduleOrThrowOnError([this, ticket_number] { formatterThreadFunction(ticket_number); }); + pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup(), ticket_number] + { + formatterThreadFunction(ticket_number, thread_group); + }); } /// Collects all temporary buffers into main WriteBuffer. - void collectorThreadFunction(); + void collectorThreadFunction(const ThreadGroupStatusPtr & thread_group); /// This function is executed in ThreadPool and the only purpose of it is to format one Chunk into a continuous buffer in memory. - void formatterThreadFunction(size_t current_unit_number); + void formatterThreadFunction(size_t current_unit_number, const ThreadGroupStatusPtr & thread_group); }; } diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index ae9358c6159..efda9bbfec3 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1409,22 +1409,49 @@ void TCPHandler::sendData(const Block & block) { initBlockOutput(block); - writeVarUInt(Protocol::Server::Data, *out); - /// Send external table name (empty name is the main table) - writeStringBinary("", *out); + auto prev_bytes_written_out = out->count(); + auto prev_bytes_written_compressed_out = state.maybe_compressed_out->count(); - /// For testing hedged requests - const Settings & settings = query_context->getSettingsRef(); - if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + try { - out->next(); - std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); - std::this_thread::sleep_for(ms); - } + writeVarUInt(Protocol::Server::Data, *out); + /// Send external table name (empty name is the main table) + writeStringBinary("", *out); - state.block_out->write(block); - state.maybe_compressed_out->next(); - out->next(); + /// For testing hedged requests + const Settings & settings = query_context->getSettingsRef(); + if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + { + out->next(); + std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } + + state.block_out->write(block); + state.maybe_compressed_out->next(); + out->next(); + } + catch (...) + { + /// In case of unsuccessful write, if the buffer with written data was not flushed, + /// we will rollback write to avoid breaking the protocol. + /// (otherwise the client will not be able to receive exception after unfinished data + /// as it will expect the continuation of the data). + /// It looks like hangs on client side or a message like "Data compressed with different methods". + + if (state.compression == Protocol::Compression::Enable) + { + auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out; + if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed) + state.maybe_compressed_out->position() -= extra_bytes_written_compressed; + } + + auto extra_bytes_written_out = out->count() - prev_bytes_written_out; + if (out->offset() >= extra_bytes_written_out) + out->position() -= extra_bytes_written_out; + + throw; + } } diff --git a/tests/queries/0_stateless/01783_parallel_formatting_memory.reference b/tests/queries/0_stateless/01783_parallel_formatting_memory.reference new file mode 100644 index 00000000000..c5cdc5cf0bb --- /dev/null +++ b/tests/queries/0_stateless/01783_parallel_formatting_memory.reference @@ -0,0 +1 @@ +Code: 241 diff --git a/tests/queries/0_stateless/01783_parallel_formatting_memory.sh b/tests/queries/0_stateless/01783_parallel_formatting_memory.sh new file mode 100755 index 00000000000..0b8cb0bc6be --- /dev/null +++ b/tests/queries/0_stateless/01783_parallel_formatting_memory.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL&max_memory_usage=1G" -d "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" | grep -oF 'Code: 241' diff --git a/tests/queries/0_stateless/01784_parallel_formatting_memory.reference b/tests/queries/0_stateless/01784_parallel_formatting_memory.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/01784_parallel_formatting_memory.sql b/tests/queries/0_stateless/01784_parallel_formatting_memory.sql new file mode 100644 index 00000000000..35dc063f895 --- /dev/null +++ b/tests/queries/0_stateless/01784_parallel_formatting_memory.sql @@ -0,0 +1,2 @@ +SET max_memory_usage = '1G'; +SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number; -- { serverError 241 } diff --git a/tests/queries/0_stateless/01785_parallel_formatting_memory.reference b/tests/queries/0_stateless/01785_parallel_formatting_memory.reference new file mode 100644 index 00000000000..0ec7fc54b01 --- /dev/null +++ b/tests/queries/0_stateless/01785_parallel_formatting_memory.reference @@ -0,0 +1,2 @@ +Code: 241 +Code: 241 diff --git a/tests/queries/0_stateless/01785_parallel_formatting_memory.sh b/tests/queries/0_stateless/01785_parallel_formatting_memory.sh new file mode 100755 index 00000000000..6d081c61fd3 --- /dev/null +++ b/tests/queries/0_stateless/01785_parallel_formatting_memory.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$CLICKHOUSE_CLIENT --compress 0 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1 +$CLICKHOUSE_CLIENT --compress 1 --max_memory_usage 1G --query "SELECT range(65535) FROM system.one ARRAY JOIN range(65536) AS number" 2>&1 | grep -oF 'Code: 241' | head -n1