Better exception message in client in case of exception while writing blocks

This commit is contained in:
Alexey Milovidov 2021-04-01 07:12:07 +03:00
parent b0539eb28f
commit 9d65d83c83

View File

@ -1409,22 +1409,49 @@ void TCPHandler::sendData(const Block & block)
{
initBlockOutput(block);
writeVarUInt(Protocol::Server::Data, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
auto prev_bytes_written_out = out->count();
auto prev_bytes_written_compressed_out = state.maybe_compressed_out->count();
/// For testing hedged requests
const Settings & settings = query_context->getSettingsRef();
if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds())
try
{
out->next();
std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
writeVarUInt(Protocol::Server::Data, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
state.block_out->write(block);
state.maybe_compressed_out->next();
out->next();
/// For testing hedged requests
const Settings & settings = query_context->getSettingsRef();
if (block.rows() > 0 && settings.sleep_in_send_data_ms.totalMilliseconds())
{
out->next();
std::chrono::milliseconds ms(settings.sleep_in_send_data_ms.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
state.block_out->write(block);
state.maybe_compressed_out->next();
out->next();
}
catch (...)
{
/// In case of unsuccessful write, if the buffer with written data was not flushed,
/// we will rollback write to avoid breaking the protocol.
/// (otherwise the client will not be able to receive exception after unfinished data
/// as it will expect the continuation of the data).
/// It looks like hangs on client side or a message like "Data compressed with different methods".
if (state.compression == Protocol::Compression::Enable)
{
auto extra_bytes_written_compressed = state.maybe_compressed_out->count() - prev_bytes_written_compressed_out;
if (state.maybe_compressed_out->offset() >= extra_bytes_written_compressed)
state.maybe_compressed_out->position() -= extra_bytes_written_compressed;
}
auto extra_bytes_written_out = out->count() - prev_bytes_written_out;
if (out->offset() >= extra_bytes_written_out)
out->position() -= extra_bytes_written_out;
throw;
}
}