diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index ae9358c6159..efda9bbfec3 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -1409,22 +1409,49 @@ void TCPHandler::sendData(const Block & block) { initBlockOutput(block); - writeVarUInt(Protocol::Server::Data, *out); - /// Send external table name (empty name is the main table) - writeStringBinary("", *out); + auto prev_bytes_written_out = out->count(); + auto prev_bytes_written_compressed_out = state.maybe_compressed_out->count(); - /// For testing hedged requests - const Settings & settings = query_context->getSettingsRef(); - if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + try { - out->next(); - std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); - std::this_thread::sleep_for(ms); - } + writeVarUInt(Protocol::Server::Data, *out); + /// Send external table name (empty name is the main table) + writeStringBinary("", *out); - state.block_out->write(block); - state.maybe_compressed_out->next(); - out->next(); + /// For testing hedged requests + const Settings & settings = query_context->getSettingsRef(); + if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds()) + { + out->next(); + std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds()); + std::this_thread::sleep_for(ms); + } + + state.block_out->write(block); + state.maybe_compressed_out->next(); + out->next(); + } + catch (...) + { + /// In case of unsuccessful write, if the buffer with written data was not flushed, + /// we will rollback write to avoid breaking the protocol. + /// (otherwise the client will not be able to receive exception after unfinished data + /// as it will expect the continuation of the data). + /// It looks like hangs on client side or a message like "Data compressed with different methods". + + if (state.compression == Protocol::Compression::Enable) + { + auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out; + if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed) + state.maybe_compressed_out->position() -= extra_bytes_written_compressed; + } + + auto extra_bytes_written_out = out->count() - prev_bytes_written_out; + if (out->offset() >= extra_bytes_written_out) + out->position() -= extra_bytes_written_out; + + throw; + } }